Skip to content

Commit

Permalink
change(freertos/smp): Update stream_buffer.c locking
Browse files Browse the repository at this point in the history
Updated stream_buffer.c to use granular locking

- Added xTaskSpinlock and xISRSpinlock
- Replaced critical section macros with data group locking macros
such as taskENTER/EXIT_CRITICAL() with sbENTER/EXIT_CRITICAL().
- Added prvLockStreamBufferForTasks() and prvUnlockStreamBufferForTasks() to suspend the stream
buffer when executing non-deterministic code.

Co-authored-by: Sudeep Mohanty <[email protected]>
  • Loading branch information
Dazza0 and sudeep-mohanty committed Jan 14, 2025
1 parent d881cfa commit a714137
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 19 deletions.
3 changes: 3 additions & 0 deletions include/FreeRTOS.h
Original file line number Diff line number Diff line change
Expand Up @@ -3488,6 +3488,9 @@ typedef struct xSTATIC_STREAM_BUFFER
void * pvDummy5[ 2 ];
#endif
UBaseType_t uxDummy6;
#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
portSPINLOCK_TYPE xDummySpinlock[ 2 ];
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */
} StaticStreamBuffer_t;

/* Message buffers are built on stream buffers. */
Expand Down
150 changes: 131 additions & 19 deletions stream_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,54 @@
#error INCLUDE_xTaskGetCurrentTaskHandle must be set to 1 to build stream_buffer.c
#endif


/*
* Macro to mark the start of a critical code region.
*/
#if ( portUSING_GRANULAR_LOCKS == 1 )
#define sbENTER_CRITICAL( pxStreamBuffer ) portLOCK_DATA_GROUP( ( portSPINLOCK_TYPE * ) &( pxStreamBuffer->xTaskSpinlock ), &( pxStreamBuffer->xISRSpinlock ) )
#define sbENTER_CRITICAL_FROM_ISR( pxStreamBuffer ) portLOCK_DATA_GROUP_FROM_ISR( ( portSPINLOCK_TYPE * ) &( pxStreamBuffer->xISRSpinlock ) )
#else /* #if ( portUSING_GRANULAR_LOCKS == 1 ) */
#define sbENTER_CRITICAL( pxEventBits ) do { ( void ) pxStreamBuffer; taskENTER_CRITICAL(); } while( 0 )
#define sbENTER_CRITICAL_FROM_ISR( pxEventBits ) do { ( void ) pxStreamBuffer; taskENTER_CRITICAL_FROM_ISR(); } while( 0 )
#endif /* #if ( portUSING_GRANULAR_LOCKS == 1 ) */

/*
* Macro to mark the end of a critical code region.
*/
#if ( portUSING_GRANULAR_LOCKS == 1 )
#define sbEXIT_CRITICAL( pxEventBits ) portUNLOCK_DATA_GROUP( ( portSPINLOCK_TYPE * ) &( pxStreamBuffer->xTaskSpinlock ), &( pxStreamBuffer->xISRSpinlock ) )
#define sbEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus, pxEventBits ) portUNLOCK_DATA_GROUP_FROM_ISR( uxSavedInterruptStatus, ( portSPINLOCK_TYPE * ) &( pxStreamBuffer->xISRSpinlock ) )
#else /* #if ( portUSING_GRANULAR_LOCKS == 1 ) */
#define egEXIT_CRITICAL( pxEventBits ) do { ( void ) pxEventBits; taskEXIT_CRITICAL(); } while( 0 )
#define egEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus, pxStreamBuffer ) do { ( void ) pxStreamBuffer; taskEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus ); } while( 0 )
#endif /* #if ( portUSING_GRANULAR_LOCKS == 1 ) */

/*
* Macro used to lock and unlock a stream buffer. When a task locks a stream
* buffer, the task will have thread safe non-deterministic access to the stream
* buffer.
* - Concurrent access from other tasks will be blocked by the xTaskSpinlock
* - Concurrent access from ISRs will be pended
*
* When the task unlocks the stream buffer, all pended access attempts are handled.
*/
#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
#define sbLOCK( pxStreamBuffer ) prvLockStreamBufferForTasks( pxStreamBuffer )
#define sbUNLOCK( pxStreamBuffer ) prvUnlockStreamBufferForTasks( pxStreamBuffer )
#else /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */
#define sbLOCK( pxStreamBuffer ) vTaskSuspendAll()
#define sbUNLOCK( pxStreamBuffer ) ( void ) xTaskResumeAll()
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */

/* If the user has not provided application specific Rx notification macros,
* or #defined the notification macros away, then provide default implementations
* that uses task notifications. */
#ifndef sbRECEIVE_COMPLETED
#define sbRECEIVE_COMPLETED( pxStreamBuffer ) \
do \
{ \
vTaskSuspendAll(); \
sbLOCK( pxStreamBuffer ); \
{ \
if( ( pxStreamBuffer )->xTaskWaitingToSend != NULL ) \
{ \
Expand All @@ -76,7 +116,7 @@
( pxStreamBuffer )->xTaskWaitingToSend = NULL; \
} \
} \
( void ) xTaskResumeAll(); \
( void ) sbUNLOCK( pxStreamBuffer ); \
} while( 0 )
#endif /* sbRECEIVE_COMPLETED */

Expand Down Expand Up @@ -105,7 +145,7 @@
do { \
UBaseType_t uxSavedInterruptStatus; \
\
uxSavedInterruptStatus = taskENTER_CRITICAL_FROM_ISR(); \
uxSavedInterruptStatus = sbENTER_CRITICAL_FROM_ISR( pxStreamBuffer ); \
{ \
if( ( pxStreamBuffer )->xTaskWaitingToSend != NULL ) \
{ \
Expand All @@ -117,7 +157,7 @@
( pxStreamBuffer )->xTaskWaitingToSend = NULL; \
} \
} \
taskEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus ); \
sbEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus, pxStreamBuffer ); \
} while( 0 )
#endif /* sbRECEIVE_COMPLETED_FROM_ISR */

Expand Down Expand Up @@ -145,7 +185,7 @@
*/
#ifndef sbSEND_COMPLETED
#define sbSEND_COMPLETED( pxStreamBuffer ) \
vTaskSuspendAll(); \
sbLOCK( pxStreamBuffer ); \
{ \
if( ( pxStreamBuffer )->xTaskWaitingToReceive != NULL ) \
{ \
Expand All @@ -156,7 +196,7 @@
( pxStreamBuffer )->xTaskWaitingToReceive = NULL; \
} \
} \
( void ) xTaskResumeAll()
( void ) sbUNLOCK( pxStreamBuffer )
#endif /* sbSEND_COMPLETED */

/* If user has provided a per-instance send completed callback, then
Expand Down Expand Up @@ -184,7 +224,7 @@
do { \
UBaseType_t uxSavedInterruptStatus; \
\
uxSavedInterruptStatus = taskENTER_CRITICAL_FROM_ISR(); \
uxSavedInterruptStatus = sbENTER_CRITICAL_FROM_ISR( pxStreamBuffer ); \
{ \
if( ( pxStreamBuffer )->xTaskWaitingToReceive != NULL ) \
{ \
Expand All @@ -196,7 +236,7 @@
( pxStreamBuffer )->xTaskWaitingToReceive = NULL; \
} \
} \
taskEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus ); \
sbEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus, pxStreamBuffer ); \
} while( 0 )
#endif /* sbSEND_COMPLETE_FROM_ISR */

Expand Down Expand Up @@ -249,8 +289,30 @@ typedef struct StreamBufferDef_t
StreamBufferCallbackFunction_t pxReceiveCompletedCallback; /* Optional callback called on receive complete. sbRECEIVE_COMPLETED is called if this is NULL. */
#endif
UBaseType_t uxNotificationIndex; /* The index we are using for notification, by default tskDEFAULT_INDEX_TO_NOTIFY. */
#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
portSPINLOCK_TYPE xTaskSpinlock;
portSPINLOCK_TYPE xISRSpinlock;
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */
} StreamBuffer_t;

/*
* Locks a stream buffer for tasks. Prevents other tasks from accessing the stream buffer
* but allows ISRs to pend access to the stream buffer. Caller cannot be preempted
* by other tasks after locking the stream buffer, thus allowing the caller to
* execute non-deterministic operations.
*/
#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
static void prvLockStreamBufferForTasks( StreamBuffer_t * const pxStreamBuffer ) PRIVILEGED_FUNCTION;
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */

/*
* Unlocks a stream buffer for tasks. Handles all pended access from ISRs, then reenables preemption
* for the caller.
*/
#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
static void prvUnlockStreamBufferForTasks( StreamBuffer_t * const pxStreamBuffer ) PRIVILEGED_FUNCTION;
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */

/*
* The number of bytes available to be read from the buffer.
*/
Expand Down Expand Up @@ -327,6 +389,42 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
StreamBufferCallbackFunction_t pxReceiveCompletedCallback ) PRIVILEGED_FUNCTION;

/*-----------------------------------------------------------*/

#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
static void prvLockStreamBufferForTasks( StreamBuffer_t * const pxStreamBuffer )
{
/* Disable preempt so that current task cannot be preempted by another task */
vTaskPreemptionDisable( NULL );

/* Lock the stream buffer data group so that we can suspend the stream buffer atomically */
sbENTER_CRITICAL( pxStreamBuffer );

/* Keep holding xTaskSpinlock after unlocking the data group to prevent tasks
* on other cores from accessing the stream buffer while it is suspended. */
portGET_SPINLOCK( &( pxStreamBuffer->xTaskSpinlock ) );

sbEXIT_CRITICAL( pxStreamBuffer );
}
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */
/*-----------------------------------------------------------*/

#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
static void prvUnlockStreamBufferForTasks( StreamBuffer_t * const pxStreamBuffer )
{
/* Lock the stream buffer data group so that we can handle any pended accesses atomically */
sbENTER_CRITICAL( pxStreamBuffer );

/* Release the previously held task spinlock */
portRELEASE_SPINLOCK( &( pxStreamBuffer->xTaskSpinlock ) );

sbEXIT_CRITICAL( pxStreamBuffer );

/* Re-enable preemption so that current task cannot be preempted by other tasks */
vTaskPreemptionEnable( NULL );
}
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */
/*-----------------------------------------------------------*/

#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
StreamBufferHandle_t xStreamBufferGenericCreate( size_t xBufferSizeBytes,
size_t xTriggerLevelBytes,
Expand Down Expand Up @@ -405,6 +503,13 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
pxSendCompletedCallback,
pxReceiveCompletedCallback );

#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
{
portINIT_STREAM_BUFFER_TASK_SPINLOCK( &( ( ( StreamBuffer_t * ) pvAllocatedMemory )->xTaskSpinlock ) );
portINIT_STREAM_BUFFER_ISR_SPINLOCK( &( ( ( StreamBuffer_t * ) pvAllocatedMemory )->xISRSpinlock ) );
}
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */

traceSTREAM_BUFFER_CREATE( ( ( StreamBuffer_t * ) pvAllocatedMemory ), xStreamBufferType );
}
else
Expand Down Expand Up @@ -499,6 +604,13 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
* again. */
pxStreamBuffer->ucFlags |= sbFLAGS_IS_STATICALLY_ALLOCATED;

#if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) )
{
portINIT_STREAM_BUFFER_TASK_SPINLOCK( &( pxStreamBuffer->xTaskSpinlock ) );
portINIT_STREAM_BUFFER_ISR_SPINLOCK( &( pxStreamBuffer->xISRSpinlock ) );
}
#endif /* #if ( ( portUSING_GRANULAR_LOCKS == 1 ) && ( configNUMBER_OF_CORES > 1 ) ) */

traceSTREAM_BUFFER_CREATE( pxStreamBuffer, xStreamBufferType );

/* MISRA Ref 11.3.1 [Misaligned access] */
Expand Down Expand Up @@ -614,7 +726,7 @@ BaseType_t xStreamBufferReset( StreamBufferHandle_t xStreamBuffer )
#endif

/* Can only reset a message buffer if there are no tasks blocked on it. */
taskENTER_CRITICAL();
sbENTER_CRITICAL( pxStreamBuffer );
{
if( ( pxStreamBuffer->xTaskWaitingToReceive == NULL ) && ( pxStreamBuffer->xTaskWaitingToSend == NULL ) )
{
Expand Down Expand Up @@ -644,7 +756,7 @@ BaseType_t xStreamBufferReset( StreamBufferHandle_t xStreamBuffer )
xReturn = pdPASS;
}
}
taskEXIT_CRITICAL();
sbEXIT_CRITICAL( pxStreamBuffer );

traceRETURN_xStreamBufferReset( xReturn );

Expand Down Expand Up @@ -872,7 +984,7 @@ size_t xStreamBufferSend( StreamBufferHandle_t xStreamBuffer,
{
/* Wait until the required number of bytes are free in the message
* buffer. */
taskENTER_CRITICAL();
sbENTER_CRITICAL( pxStreamBuffer );
{
xSpace = xStreamBufferSpacesAvailable( pxStreamBuffer );

Expand All @@ -887,11 +999,11 @@ size_t xStreamBufferSend( StreamBufferHandle_t xStreamBuffer,
}
else
{
taskEXIT_CRITICAL();
sbEXIT_CRITICAL( pxStreamBuffer );
break;
}
}
taskEXIT_CRITICAL();
sbEXIT_CRITICAL( pxStreamBuffer );

traceBLOCKING_ON_STREAM_BUFFER_SEND( xStreamBuffer );
( void ) xTaskNotifyWaitIndexed( pxStreamBuffer->uxNotificationIndex, ( uint32_t ) 0, ( uint32_t ) 0, NULL, xTicksToWait );
Expand Down Expand Up @@ -1087,7 +1199,7 @@ size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,
{
/* Checking if there is data and clearing the notification state must be
* performed atomically. */
taskENTER_CRITICAL();
sbENTER_CRITICAL( pxStreamBuffer );
{
xBytesAvailable = prvBytesInBuffer( pxStreamBuffer );

Expand All @@ -1112,7 +1224,7 @@ size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,
mtCOVERAGE_TEST_MARKER();
}
}
taskEXIT_CRITICAL();
sbEXIT_CRITICAL( pxStreamBuffer );

if( xBytesAvailable <= xBytesToStoreMessageLength )
{
Expand Down Expand Up @@ -1409,7 +1521,7 @@ BaseType_t xStreamBufferSendCompletedFromISR( StreamBufferHandle_t xStreamBuffer
/* MISRA Ref 4.7.1 [Return value shall be checked] */
/* More details at: https://github.com/FreeRTOS/FreeRTOS-Kernel/blob/main/MISRA.md#dir-47 */
/* coverity[misra_c_2012_directive_4_7_violation] */
uxSavedInterruptStatus = taskENTER_CRITICAL_FROM_ISR();
uxSavedInterruptStatus = sbENTER_CRITICAL_FROM_ISR( pxStreamBuffer );
{
if( ( pxStreamBuffer )->xTaskWaitingToReceive != NULL )
{
Expand All @@ -1426,7 +1538,7 @@ BaseType_t xStreamBufferSendCompletedFromISR( StreamBufferHandle_t xStreamBuffer
xReturn = pdFALSE;
}
}
taskEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus );
sbEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus, pxStreamBuffer );

traceRETURN_xStreamBufferSendCompletedFromISR( xReturn );

Expand All @@ -1448,7 +1560,7 @@ BaseType_t xStreamBufferReceiveCompletedFromISR( StreamBufferHandle_t xStreamBuf
/* MISRA Ref 4.7.1 [Return value shall be checked] */
/* More details at: https://github.com/FreeRTOS/FreeRTOS-Kernel/blob/main/MISRA.md#dir-47 */
/* coverity[misra_c_2012_directive_4_7_violation] */
uxSavedInterruptStatus = taskENTER_CRITICAL_FROM_ISR();
uxSavedInterruptStatus = sbENTER_CRITICAL_FROM_ISR( pxStreamBuffer );
{
if( ( pxStreamBuffer )->xTaskWaitingToSend != NULL )
{
Expand All @@ -1465,7 +1577,7 @@ BaseType_t xStreamBufferReceiveCompletedFromISR( StreamBufferHandle_t xStreamBuf
xReturn = pdFALSE;
}
}
taskEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus );
sbEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus, pxStreamBuffer );

traceRETURN_xStreamBufferReceiveCompletedFromISR( xReturn );

Expand Down

0 comments on commit a714137

Please sign in to comment.