消息队列
概述
多任务系统中,任务间互相同步等待共享资源,我们一般会使用信号量,如果需要互斥,则使用互斥量。而任务间互相收发消息则可以使用消息队列。消息队列(queue)使用类似信号量的机制进行任务间的同步,并使用环形缓冲池(ring buffer)来进行消息的队列缓冲管理,以达到任务间收发消息的阻塞和通知管理。Queue的实现目的在于任务间互相收发消息。一般如果有信号量机制,用户就可以自己实现一套任务间的阻塞和通知收发功能,其本质在于接收方通过信号量的获取来开始接收消息,发送方通过信号量的释放来通知接收方处理。接收任务在无消息时被阻塞,消息到来时被唤醒处理。Queue就是基于这样一种类信号量机制来进行消息的收发。再加上ring buffer的缓冲机制来缓存任务间的消息队列,就组合成了本章的消息队列(queue)。其既包含消息的缓冲队列,又包含了消息的通知机制。
消息队列模块整体受宏RHINO_CONFIG_BUF_QUEUE
开关控制,
对应的AOS API接口实现位于:aos_rhino.c
中,由RHINO_CONFIG_BUF_QUEUE
宏定义包含实现;
对应的krhino内部实现位于:·k_buf_queue.c
。
包含头文件
#include <aos/aos.h> //总头文件
#include <aos/kernel.h> //直接对应头文件
接口定义
创建一个队列
int aos_queue_new(aos_queue_t *queue, void* buf, unsigned int size, int max_msg)
参数:
- queue:队列描述结构体指针;需要用户定义一个queue结构体;例:‘aos_queue_t g_queue’; 传入&g_queue
- buf:此queue队列的缓冲区起点;例:char buf[1000];传入buf
- size:此queue队列的缓冲区大小
- max_msg:一次存入缓冲区的最大数据单元
返回值:
- 0:执行成功;其他:返回失败
删除一个队列
void aos_queue_free(aos_queue_t *queue)
删除一个队列,并释放阻塞在其中的任务
参数:
- queue:队列描述结构体指针
返回值:
- 无
向queue内发送数据
int aos_queue_send(aos_queue_t *queue, void* msg, unsigned int size)
参数:
- queue:队列描述结构体指针
- msg:发送数据起始内存
- size:发送数据大小
返回值
- 0:执行成功;其他:返回失败
从queue内收取数据
int aos_queue_recv(aos_queue_t *queue, unsigned int ms, void* msg, unsigned int*size)
参数:
- queue:队列描述结构体指针
- ms: 传入0表示不超时,立即返回;AOS_WAIT_FOREVER表示永久等待;其他数值表示超时时间,单位ms
- msg:返回获取到的数据的内存指针
- size:返回获取到的数据大小
返回值:
- 0:执行成功;其他:返回失败
判断一个队列queue是否有效
int aos_queue_is_valid(aos_queue_t *queue)
参数:
- queue: 判断一个队列queue是否有效
返回值
- 0:queue无效;1:queue有效
获取一个队列queue的缓冲区起点
void *aos_queue_buf_ptr(aos_queue_t* queue)
- 参数: *queue:队列描述结构体指针
- 返回值:
- NULL: 获取失败;其他:返回队列queue的缓冲区起点
示例代码
#define QUEUE_BUF_LEN 100
static aos_queue_t test_queue;
static char queue_buf[QUEUE_BUF_LEN];
static void task1_entry(void *arg)
{
char *msg_send = "Hello, Queue!";
aos_msleep(3000); // 任务休眠3000ms
printf("task1 send msg\n");
/*发送消息*/
aos_queue_send(&test_queue, msg_send, strlen(msg_send));
}
static void task2_entry(void *arg)
{
char msg_recv[16] = {0};
unsigned int size_recv = 16;
printf("task2 wait msg\n");
memset(msg_recv, 0, size_recv);
aos_queue_recv(&test_queue, 100000, msg_recv, &size_recv);
printf("task2 get msg: ", size_recv);
for (int i = 0; i < size_recv; i++) {
printf("%c", msg_recv[i]);
}
printf("\n");
/*获取到消息,当前任务继续执行下去*/
printf("queue test successfully!\n");
/*删除消息队列*/
aos_queue_free(&test_queue);
}
void test_queue_start(void)
{
int ret = -1;
aos_msleep(1000); // 任务休眠1000ms
/*当前任务:创建消息队列,消息队列最大为长度为100,单条消息最大为20*/
ret = aos_queue_new(&test_queue, queue_buf, QUEUE_BUF_LEN, 20);
if (ret != 0) {
printf("queue create failed\n");
return;
}
aos_task_new("task1", task1_entry, NULL, 512);
aos_task_new("task2", task2_entry, NULL, 512);
}