消息队列实现原理

消息队列实现原理

消息队列创建QueueCreate.c

//队列实现,实际是xQueueGenericCreate函数,为什么呢

//队列的创建类型
#define queueQUEUE_TYPE_BASE				( ( uint8_t ) 0U )
#define queueQUEUE_TYPE_SET					( ( uint8_t ) 0U )
#define queueQUEUE_TYPE_MUTEX 				( ( uint8_t ) 1U )
#define queueQUEUE_TYPE_COUNTING_SEMAPHORE	( ( uint8_t ) 2U )
#define queueQUEUE_TYPE_BINARY_SEMAPHORE	( ( uint8_t ) 3U )
#define queueQUEUE_TYPE_RECURSIVE_MUTEX		( ( uint8_t ) 4U )


#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
	#define xQueueCreate( uxQueueLength, uxItemSize ) 
	xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
#endif

	/*
		参数:
			uxQueueLength
			uxItemSize
			ucQueueType
		返回值
			QueueHandle_t  队列的句柄 其实是队列控制块地址
			
	*/
	QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType )
	{
	Queue_t *pxNewQueue;
	size_t xQueueSizeInBytes;
	uint8_t *pucQueueStorage;


		if( uxItemSize == ( UBaseType_t ) 0 )//队列内存空间为空
		{
			/* 队列字节大小 赋值为0 */
			xQueueSizeInBytes = ( size_t ) 0;
		}
		else
		{
			/* 队列字节大小 赋值为  长度*每个队列项大小*/
			xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /*lint !e961 MISRA exception as the casts are only redundant for some ports. */
		}
		// 申请内存空间  消息队列控制块大小+消息队列空间大小
		pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );

		if( pxNewQueue != NULL )
		{
			/* 找到消息队列操作空间的首地址 */
			pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );
			//初始化一个新的消息队列
			prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
		}

		return pxNewQueue;
	}

/*

	参数:
		pucQueueStorage :队列操作空间的首地址
		pxNewQueue : 队列的句柄
	返回值:
	
*/
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue )
{
	/* Remove compiler warnings about unused parameters should
	configUSE_TRACE_FACILITY not be set to 1. */
	( void ) ucQueueType;

	if( uxItemSize == ( UBaseType_t ) 0 )
	{
		/* 把队列控制块首地址赋值到队列头指针上  ????? 这是互斥信号使用,后面再分析 */
		pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
	}
	else
	{
		/* 把队列空间首地址赋值给队列头指针上 */
		pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
	}

	/* 长度 单元大小 */
	pxNewQueue->uxLength = uxQueueLength;
	pxNewQueue->uxItemSize = uxItemSize;
	//队列重置函数
	( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
}

/*
	参数:
		xQueue 队列句柄
		xNewQueue 操作队列的状态是什么,新建pdTRUE还是已经创建好了
	返回值:
*/
BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue )
{
Queue_t * const pxQueue = ( Queue_t * ) xQueue;

	//进入临界段,这是时候操作队列控制块,不允许打断
	taskENTER_CRITICAL();
	{
	/*
		1、头地址赋值
		2、等待处理的消息个数为0
		3、写入指针赋值为队列头指针
		4、读出指针写入最后一个可用消息
		5、赋值队列锁为解锁状态
	*/
		pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
		pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
		pxQueue->pcWriteTo = pxQueue->pcHead;
		pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
		pxQueue->cRxLock = queueUNLOCKED;
		pxQueue->cTxLock = queueUNLOCKED;

		//判断是否为新建队列
		if( xNewQueue == pdFALSE )
		{
			/* 判断发送等待列表里面是否有任务 */
			if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
			{
				//移除事件列表中的任务
				if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
				{
					//进行上下文切换
					queueYIELD_IF_USING_PREEMPTION();
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}
			}
			else
			{
				mtCOVERAGE_TEST_MARKER();
			}
		}
		else//新建队列,直接初始化 发送和接受 列表项
		{
			/* Ensure the event queues start in the correct state. */
			vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
			vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
		}
	}
	taskEXIT_CRITICAL();

	/* A value is returned for calling semantic consistency with previous
	versions. */
	return pdPASS;
}

消息队列删除QueueDelete.c

void vQueueDelete( QueueHandle_t xQueue )
{
Queue_t * const pxQueue = ( Queue_t * ) xQueue;

	configASSERT( pxQueue );
	traceQUEUE_DELETE( pxQueue );

	#if ( configQUEUE_REGISTRY_SIZE > 0 )
	{
		vQueueUnregisterQueue( pxQueue );
	}
	#endif

	#if( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) )
	{
		/*其实就是释放消息队列的内存空间*/
		vPortFree( pxQueue );
	}
	#elif( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) )
	{
		/* The queue could have been allocated statically or dynamically, so
		check before attempting to free the memory. */
		if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE )
		{
			/*其实就是释放消息队列的内存空间*/
			vPortFree( pxQueue );
		}
		else
		{
			mtCOVERAGE_TEST_MARKER();
		}
	}
	#else
	{
		/* The queue must have been statically allocated, so is not going to be
		deleted.  Avoid compiler warnings about the unused parameter. */
		( void ) pxQueue;
	}
	#endif /* configSUPPORT_DYNAMIC_ALLOCATION */
}

消息队列接收queuereceive.c

/*
	最终调用发送消息接口是xQueueGenericReceive???为什么
	队列出队,有两种模式
		一种是:出队后,删除已经读取到队列项或者消息空间
		另一种是:出队后,不删除,然后恢复出队记录地址,让其他任务或者中断,继续读取使用
	类型:
		pdFALSE		删除
		pdTRUE		不删除

*/
#define xQueueReceive( xQueue, pvBuffer, xTicksToWait ) 
xQueueGenericReceive( ( xQueue ), ( pvBuffer ), ( xTicksToWait ), pdFALSE )

BaseType_t xQueueGenericReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait, const BaseType_t xJustPeeking )

				//重点分析这一个,其他流程和发送流程差不多
				//判断是否删除已经接收到的消息空间
 				if( xJustPeeking == pdFALSE )
 				{
 					traceQUEUE_RECEIVE( pxQueue );
						
					//更新消息等待的记录值,让它减一
 					pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;
 				}
 				else
 				{
					//不删除 就重新赋值未读取消息之前的地址,到出队指针
 					pxQueue->u.pcReadFrom = pcOriginalReadPosition;
 
 				}

消息队列发送queuesend.c

/*
	最终调用发送消息接口是xQueueGenericSend???为什么
	由于信号量都是有消息队列实现的,这个时候,操作系统,定义了消息队列的类型
	类型:
	#define	queueSEND_TO_BACK		( ( BaseType_t ) 0 ) 1、从队尾加入
	#define	queueSEND_TO_FRONT		( ( BaseType_t ) 1 ) 2、从队头加入
	#define queueOVERWRITE			( ( BaseType_t ) 2 ) 3、覆盖入队
*/
#define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) 
xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )
/*
	参数:
		xQueue
		pvItemToQueue
		xTicksToWait
		queueSEND_TO_BACK
	返回值:BaseType_t 

*/
BaseType_t xQueueGenericSend( QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait, const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;


	/* 
		使用for死循环的目的,为了快速的处理消息拷贝,消息处理功能
	*/
	for( ;; )
	{	
		//进入了临界段
		taskENTER_CRITICAL();
		{
			/* 
				1、判断消息队列是否满了
				2、是否允许覆盖入队
				这两个条件任何一个成立,都执行入队操作
			*/
			if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
			{
				//拷贝数据到队列操作空间内
				xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

				#if ( configUSE_QUEUE_SETS == 1 )
				
				#else /* configUSE_QUEUE_SETS */
				{
					/* 判断等待接收的列表是否为空. */
					if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
					{
						//溢出列表,改变等待任务状态为就绪态
						if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
						{
							/* 进行上下文切换*/
							queueYIELD_IF_USING_PREEMPTION();
						}
						else
						{
							mtCOVERAGE_TEST_MARKER();
						}
					}
					else if( xYieldRequired != pdFALSE )
					{
						/* 再次进行上下文切换 */
						queueYIELD_IF_USING_PREEMPTION();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				#endif /* configUSE_QUEUE_SETS */
				//退出临界段,返回成功
				taskEXIT_CRITICAL();
				return pdPASS;
			}
			else
			{
				//不允许入队,先判断是否需要阻塞
				if( xTicksToWait == ( TickType_t ) 0 )
				{
					/* 不许阻塞,退出临界段,之后返回队满 */
					taskEXIT_CRITICAL();
					traceQUEUE_SEND_FAILED( pxQueue );
					return errQUEUE_FULL;
				}
				//超时结构体是否操作过
				else if( xEntryTimeSet == pdFALSE )
				{
					/* 超时结构体初始化*/
					vTaskSetTimeOutState( &xTimeOut );
					xEntryTimeSet = pdTRUE;
				}
				else
				{
					/* Entry time was already set. */
					mtCOVERAGE_TEST_MARKER();
				}
			}
		}
		退出临界段
		taskEXIT_CRITICAL();
		
		//后面的代码都是允许阻塞处理

		/* 
			1、挂起了调度器 ----不让其他任务打断
			2、队列上锁------不让中断打断 (因为之前已经退出临界段了)

		*/

		vTaskSuspendAll();
		prvLockQueue( pxQueue );

		/* 判断阻塞时间是否超时了 */
		if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
		{
			//判断队列是否满
			if( prvIsQueueFull( pxQueue ) != pdFALSE )
			{
				//队满,把当前任务,添加到等待发送的事件列表中,内部还把任务添加到延时列表中去
				traceBLOCKING_ON_QUEUE_SEND( pxQueue );
				vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );

				/* 解锁*/
				prvUnlockQueue( pxQueue );

				/* 恢复调度器 */
				if( xTaskResumeAll() == pdFALSE )
				{
					//进行上下文切换
					portYIELD_WITHIN_API();
				}
			}
			else
			{
				//队列未满 解锁,恢复调度器,重新进行入队操作
				/* Try again. */
				prvUnlockQueue( pxQueue );
				( void ) xTaskResumeAll();
			}
		}//
		//已经超时了,解锁,开始调度器 返回队满
		else
		{
			/* The timeout has expired. */
			prvUnlockQueue( pxQueue );
			( void ) xTaskResumeAll();

			traceQUEUE_SEND_FAILED( pxQueue );
			return errQUEUE_FULL;
		}
	}
}

在中断中发送queuesendformIsr.c

/*
	最终调用发送消息接口是xQueueGenericSendFromISR???为什么
	由于信号量都是有消息队列实现的,这个时候,操作系统,定义了消息队列的类型
	类型:
	#define	queueSEND_TO_BACK		( ( BaseType_t ) 0 ) 1、从队尾加入
	#define	queueSEND_TO_FRONT		( ( BaseType_t ) 1 ) 2、从队头加入
	#define queueOVERWRITE			( ( BaseType_t ) 2 ) 3、覆盖入队
*/
#define xQueueSendFromISR( xQueue, pvItemToQueue, pxHigherPriorityTaskWoken ) 
xQueueGenericSendFromISR( ( xQueue ), ( pvItemToQueue ), ( pxHigherPriorityTaskWoken ), queueSEND_TO_BACK )
/*
	参数:
		xQueue
		pvItemToQueue
		NULL
		queueSEND_TO_BACK
	返回值:BaseType_t 

*/
BaseType_t xQueueGenericSendFromISR( QueueHandle_t xQueue, const void * const pvItemToQueue, BaseType_t * const pxHigherPriorityTaskWoken, const BaseType_t xCopyPosition )
{
BaseType_t xReturn;
UBaseType_t uxSavedInterruptStatus;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;


	/*带返回值的关闭中断,需要保存上次关闭中断的状态值,恢复时候,写入 */
	uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
	{
		//队满?覆盖入队?
		if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
		{
			//获取了队列发送锁的状态值
			const int8_t cTxLock = pxQueue->cTxLock;
			//拷贝数据到队列操作空间内
			( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

			/* 判断队列是否上锁 */
			/*
				#define queueUNLOCKED					( ( int8_t ) -1 ) 解锁状态
				#define queueLOCKED_UNMODIFIED			( ( int8_t ) 0 )  上锁状态初值
			*/
			if( cTxLock == queueUNLOCKED )
			{
				#if ( configUSE_QUEUE_SETS == 1 )
				
				#else /* configUSE_QUEUE_SETS */
				{
					//恢复等待消息任务,中断内没有进行上下文切换
					if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
					{
						if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
						{
							/* The task waiting has a higher priority so record that a
							context	switch is required. */
							if( pxHigherPriorityTaskWoken != NULL )
							{
								*pxHigherPriorityTaskWoken = pdTRUE;
							}
							else
							{
								mtCOVERAGE_TEST_MARKER();
							}
						}
						else
						{
							mtCOVERAGE_TEST_MARKER();
						}
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				#endif /* configUSE_QUEUE_SETS */
			}
			else //队列已经上锁
			{
				/* 发送锁加一 */
				pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
			}
			//返回成功
			xReturn = pdPASS;
		}
		else
		{	
			//返回队满
			traceQUEUE_SEND_FROM_ISR_FAILED( pxQueue );
			xReturn = errQUEUE_FULL;
		}
	}
	//开启中断,保存上次状态值
	portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );

	return xReturn;
}
//队列上锁
把发送和接受锁都赋值为上锁的初始值
#define prvLockQueue( pxQueue )								\
	taskENTER_CRITICAL();									\
	{														\
		if( ( pxQueue )->cRxLock == queueUNLOCKED )			\
		{													\
			( pxQueue )->cRxLock = queueLOCKED_UNMODIFIED;	\
		}													\
		if( ( pxQueue )->cTxLock == queueUNLOCKED )			\
		{													\
			( pxQueue )->cTxLock = queueLOCKED_UNMODIFIED;	\
		}													\
	}														\
	taskEXIT_CRITICAL()
	
/*
	队列解锁
	参数:消息队列句柄
*/
static void prvUnlockQueue( Queue_t * const pxQueue )
{
	/* THIS FUNCTION MUST BE CALLED WITH THE SCHEDULER SUSPENDED. */

	/* 进入临界段 */
	taskENTER_CRITICAL();
	{
		//获取发送锁的状态值
		int8_t cTxLock = pxQueue->cTxLock;

		/* 遍历解锁  直到解锁为止 */
		while( cTxLock > queueLOCKED_UNMODIFIED )
		{
			/* Data was posted while the queue was locked.  Are any tasks
			blocked waiting for data to become available? */
			#if ( configUSE_QUEUE_SETS == 1 )
		
			#else /* configUSE_QUEUE_SETS */
			{
				/* 解除等待消息任务,进行上下文切换 */
				if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
				{
					if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
					{
						/* The task waiting has a higher priority so record that
						a context switch is required. */
						vTaskMissedYield();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				else
				{
					break;
				}
			}
			#endif /* configUSE_QUEUE_SETS */
			//队列发送锁减一
			--cTxLock;
		}
		//最后,解除发送锁
		pxQueue->cTxLock = queueUNLOCKED;
	}
	//退出临界段
	taskEXIT_CRITICAL();

	/* Do the same for the Rx lock. */
	taskENTER_CRITICAL();
	{
		int8_t cRxLock = pxQueue->cRxLock;

		while( cRxLock > queueLOCKED_UNMODIFIED )
		{
			if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
			{
				if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
				{
					vTaskMissedYield();
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}

				--cRxLock;
			}
			else
			{
				break;
			}
		}

		pxQueue->cRxLock = queueUNLOCKED;
	}
	taskEXIT_CRITICAL();
}

消息队列在中断中接收