Handle QoS2 messages when the library loses state - store state in NVM #271

Open
ademyankov opened this issue Dec 21, 2023 · 5 comments
Labels
enhancement New feature or request help wanted Extra attention is needed

Comments

@ademyankov

It is a very narrow corner case, but it still breaks the client and renders it unusable.

  1. The coreMQTT client is running on an embedded device and has subscribed to a topic with QoS2.
  2. A message is received, and then the device reboots (for reasons unrelated to coreMQTT).
  3. When the device boots again and reconnects to the broker, it immediately receives MQTT_PACKET_TYPE_PUBREL. Because the device was rebooted, the library has no knowledge of the packet it received before: outgoingPublishRecords is empty, and MQTT_ProcessLoop returns MQTTBadResponse.

Q: How would you suggest handling a situation like this? The library has no public functions to get the failed packet ID or to access the outgoingPublishRecords structure, through which one could simply inject a record with that ID and the MQTT_PACKET_TYPE_PUBCOMP type to release it. In a corner case like this, I guess it's OK to lose the record, since there is not much you can do if the device reboots.
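One way to "let go" of such a record is sketched below in C. MQTT 3.1.1 requires a receiver to answer a PUBREL with a PUBCOMP carrying the same packet identifier, so a receiver that has lost its state could still reply with a PUBCOMP instead of failing. This is a minimal sketch under stated assumptions, not coreMQTT code: `incomingIds`, `EncodePubcomp`, and `HandlePubrel` are hypothetical names, and the table stands in for the library's internal records.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the receiver's table of QoS2 packet IDs that are
 * waiting for a PUBREL. 0 marks an empty slot (MQTT never uses packet ID 0). */
#define MAX_INCOMING_RECORDS 4U
static uint16_t incomingIds[ MAX_INCOMING_RECORDS ];

/* Encode an MQTT 3.1.1 PUBCOMP packet: fixed header 0x70, remaining length 2,
 * then the packet identifier in big-endian order. Returns the bytes written,
 * or 0 if the output buffer is too small. */
static size_t EncodePubcomp( uint16_t packetId, uint8_t * pOut, size_t outLen )
{
    assert( pOut != NULL );

    if( outLen < 4U )
    {
        return 0U;
    }

    pOut[ 0 ] = 0x70U;
    pOut[ 1 ] = 0x02U;
    pOut[ 2 ] = ( uint8_t ) ( packetId >> 8 );
    pOut[ 3 ] = ( uint8_t ) ( packetId & 0xFFU );
    return 4U;
}

/* Handle an incoming PUBREL. A known ID frees its record; an unknown ID (for
 * example, state lost in a reboot) is reported through *pWasKnown but is still
 * answered with a PUBCOMP, so the broker stops retransmitting the PUBREL. */
static size_t HandlePubrel( uint16_t packetId,
                            uint8_t * pOut,
                            size_t outLen,
                            int * pWasKnown )
{
    size_t i;

    assert( pWasKnown != NULL );
    *pWasKnown = 0;

    for( i = 0U; ( packetId != 0U ) && ( i < MAX_INCOMING_RECORDS ); i++ )
    {
        if( incomingIds[ i ] == packetId )
        {
            incomingIds[ i ] = 0U; /* QoS2 flow complete: free the slot. */
            *pWasKnown = 1;
            break;
        }
    }

    return EncodePubcomp( packetId, pOut, outLen );
}
```

Whether to accept an unknown PUBREL silently is a policy choice; reporting it through `pWasKnown` lets the application at least log it.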

@n9wxu
Member

n9wxu commented Dec 22, 2023

I have restated the failure case to try and be crystal clear on what is happening.

sender       receiver
publish  ->
         <-  pubrec
     ** REBOOT **
pubrel   ->
             ??????
pubrel   ->
pubrel   ->
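The failure above can be modelled with a few lines of C. In this hypothetical sketch (not coreMQTT's implementation), the receiver keeps its QoS2 bookkeeping only in RAM, so after `SimulateReboot()` a retransmitted PUBREL finds no record and is rejected; `STATUS_BAD_RESPONSE` stands in for `MQTTBadResponse`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical status codes; STATUS_BAD_RESPONSE plays the role of
 * coreMQTT's MQTTBadResponse in this model. */
typedef enum
{
    STATUS_OK = 0,
    STATUS_NO_MEMORY,
    STATUS_BAD_RESPONSE
} Status_t;

#define MAX_RECORDS 4U

/* Receiver-side QoS2 bookkeeping held only in RAM (0 = empty slot). */
static uint16_t awaitingPubrel[ MAX_RECORDS ];

/* QoS2 PUBLISH received: remember its ID; a PUBREC would be sent next. */
static Status_t OnQos2Publish( uint16_t packetId )
{
    size_t i;

    assert( packetId != 0U );

    for( i = 0U; i < MAX_RECORDS; i++ )
    {
        if( awaitingPubrel[ i ] == 0U )
        {
            awaitingPubrel[ i ] = packetId;
            return STATUS_OK;
        }
    }

    return STATUS_NO_MEMORY;
}

/* PUBREL received: a strict receiver rejects IDs it has no record of. */
static Status_t OnPubrel( uint16_t packetId )
{
    size_t i;

    for( i = 0U; i < MAX_RECORDS; i++ )
    {
        if( ( packetId != 0U ) && ( awaitingPubrel[ i ] == packetId ) )
        {
            awaitingPubrel[ i ] = 0U; /* A PUBCOMP would be sent here. */
            return STATUS_OK;
        }
    }

    return STATUS_BAD_RESPONSE;
}

/* A reboot wipes RAM, so every in-flight QoS2 record is forgotten. */
static void SimulateReboot( void )
{
    memset( awaitingPubrel, 0, sizeof( awaitingPubrel ) );
}
```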

@AniruddhaKanhere
Member

AniruddhaKanhere commented Jan 9, 2024

Hello @ademyankov,

Thank you for taking the time to report this bug to us.

Yes, you are absolutely correct. This bug will manifest itself when the device loses power (or otherwise reboots) in the middle of a QoS2 transaction.

To fix this, we believe the state must be stored in persistent memory when an incoming QoS2 PUBLISH is acknowledged with a PUBREC, that is, before the broker's PUBREL arrives. When the device boots up, a callback would allow the application to restore the state of the MQTT 'system'. This way, when the broker sends a PUBREL after the reboot, the library can handle it correctly.
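A minimal sketch of that idea, assuming a single write-through persistence hook: the record is written to (simulated) NVM as soon as the QoS2 PUBLISH is accepted, before the PUBREC would go out, and reloaded at boot. `fakeNvm`, `PersistRecords`, and `RebootAndRestore` are hypothetical names; persisting before the PUBREC is one reasonable ordering, not something the library prescribes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_RECORDS 4U

/* Receiver's QoS2 records in RAM (0 = empty slot). */
static uint16_t ramRecords[ MAX_RECORDS ];

/* Hypothetical stand-in for non-volatile memory: it survives a reboot. */
static uint16_t fakeNvm[ MAX_RECORDS ];

/* Hypothetical persistence hook; a real one would write flash or EEPROM. */
static void PersistRecords( void )
{
    memcpy( fakeNvm, ramRecords, sizeof( ramRecords ) );
}

/* QoS2 PUBLISH received: store the record durably BEFORE the PUBREC goes out,
 * so the broker never holds a PUBREC for an ID that exists only in RAM. */
static bool OnQos2Publish( uint16_t packetId )
{
    size_t i;

    assert( packetId != 0U );

    for( i = 0U; i < MAX_RECORDS; i++ )
    {
        if( ramRecords[ i ] == 0U )
        {
            ramRecords[ i ] = packetId;
            PersistRecords();
            /* SendPubrec( packetId ) would follow here (not modelled). */
            return true;
        }
    }

    return false;
}

/* PUBREL received: release the record and persist the removal; a PUBCOMP
 * would then be sent (not modelled). */
static bool OnPubrel( uint16_t packetId )
{
    size_t i;

    for( i = 0U; i < MAX_RECORDS; i++ )
    {
        if( ( packetId != 0U ) && ( ramRecords[ i ] == packetId ) )
        {
            ramRecords[ i ] = 0U;
            PersistRecords();
            return true;
        }
    }

    return false;
}

/* Reboot: RAM is wiped, then the boot-time restore step reloads it from NVM. */
static void RebootAndRestore( void )
{
    memset( ramRecords, 0, sizeof( ramRecords ) );
    memcpy( ramRecords, fakeNvm, sizeof( ramRecords ) );
}
```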

However, the library does not currently support this. Also, since the library is hardware-agnostic, we would like the storing and restoring of state to be done by the application, through callbacks from the library.

We think this can be achieved with setter and getter functions that the library would call whenever it receives an incoming PUBLISH/PUBACK/PUBREL/PUBREC/PUBCOMP and needs to store the state. This way, the application is free to choose how to implement the persistent storage, for example so that it does not wear out the same block of memory and still responds quickly.

We also think that, since this is very specific to QoS2 publishes, we should separate the processing and handling of these packets so that anyone not using QoS2 does not have to implement the setter and getter functions.
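The callback design described above could look roughly like the following hypothetical C interface. None of these names exist in coreMQTT: `PersistenceHooks_t`, `StoreRecordIfNeeded`, and `RestoreRecords` illustrate the library side, while `AppSetRecord` and `AppGetRecord` illustrate an application backend. Leaving the hooks `NULL` models the opt-out for applications that do not use QoS2.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical record type, loosely modelled on coreMQTT's MQTTPubAckInfo_t. */
typedef struct
{
    uint16_t packetId;
    uint8_t qos;
    uint8_t publishState;
} PubRecord_t;

/* Hypothetical hooks the application registers with the library. Leaving
 * them NULL is how an application that never uses QoS2 would opt out. */
typedef struct
{
    bool ( * setRecord )( size_t index, const PubRecord_t * pRecord, void * pUserCtx );
    bool ( * getRecord )( size_t index, PubRecord_t * pRecord, void * pUserCtx );
    void * pUserCtx;
} PersistenceHooks_t;

/* Library side: called after slot `index` of the record table changes.
 * Only QoS2 records are persisted, and only when a setter is registered. */
static bool StoreRecordIfNeeded( const PersistenceHooks_t * pHooks,
                                 size_t index,
                                 const PubRecord_t * pRecord )
{
    assert( pRecord != NULL );

    if( ( pRecord->qos != 2U ) || ( pHooks == NULL ) || ( pHooks->setRecord == NULL ) )
    {
        return true; /* Nothing to persist. */
    }

    return pHooks->setRecord( index, pRecord, pHooks->pUserCtx );
}

/* Library side: called once at start-up to reload every slot. */
static size_t RestoreRecords( const PersistenceHooks_t * pHooks,
                              PubRecord_t * pTable,
                              size_t count )
{
    size_t i;
    size_t restored = 0U;

    if( ( pHooks == NULL ) || ( pHooks->getRecord == NULL ) )
    {
        return 0U;
    }

    for( i = 0U; i < count; i++ )
    {
        if( pHooks->getRecord( i, &pTable[ i ], pHooks->pUserCtx ) )
        {
            restored++;
        }
    }

    return restored;
}

/* Application side: a RAM array stands in for flash here. A real
 * implementation would choose its own wear-levelling strategy. */
#define APP_SLOTS 4U
static PubRecord_t appStore[ APP_SLOTS ];
static size_t appWrites;

static bool AppSetRecord( size_t index, const PubRecord_t * pRecord, void * pUserCtx )
{
    ( void ) pUserCtx;
    if( index >= APP_SLOTS )
    {
        return false;
    }
    appStore[ index ] = *pRecord;
    appWrites++;
    return true;
}

static bool AppGetRecord( size_t index, PubRecord_t * pRecord, void * pUserCtx )
{
    ( void ) pUserCtx;
    if( index >= APP_SLOTS )
    {
        return false;
    }
    *pRecord = appStore[ index ];
    return true;
}
```

Passing a slot index rather than the whole table lets the application decide how records map onto flash pages.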

Does the above sound like a solution to you?

We do not currently have plans to implement this, as we are working on some exciting things that we want to release (see here). Thus, I am marking this as a feature request, and we'd be grateful to receive contributions from the community!

Thanks,
Aniruddha

@AniruddhaKanhere AniruddhaKanhere added enhancement New feature or request help wanted Extra attention is needed labels Jan 9, 2024
@AniruddhaKanhere AniruddhaKanhere moved this to 🆕 Input Queue in Roadmap Jan 9, 2024
@AniruddhaKanhere AniruddhaKanhere changed the title An issue with handling QoS2 messages when the library loses state Handle QoS2 messages when the library loses state - store state in NVM Jan 9, 2024
@AniruddhaKanhere
Member

@ademyankov I also modified the title of this issue to reflect that this is now a feature request. If you would like, I am happy to revert it.

@ademyankov
Author

@AniruddhaKanhere yes, thank you, that sounds like a good solution.

vishwamartur added a commit to vishwamartur/coreMQTT that referenced this issue Oct 31, 2024
Related to FreeRTOS#271

Add support for storing and restoring MQTT state in persistent memory to handle QoS2 messages after a device reboot.

* Add function prototypes for `MQTT_SetOutgoingPublishRecord`, `MQTT_GetOutgoingPublishRecord`, and `MQTT_GetFailedPacketId` in `source/include/core_mqtt_state.h`.
* Implement `MQTT_SetOutgoingPublishRecord`, `MQTT_GetOutgoingPublishRecord`, and `MQTT_GetFailedPacketId` functions in `source/core_mqtt_state.c`.
* Update `README.md` to include instructions on using the new setter and getter functions.
* Add unit tests for `MQTT_SetOutgoingPublishRecord`, `MQTT_GetOutgoingPublishRecord`, and `MQTT_GetFailedPacketId` functions in `test/unit-test/core_mqtt_state_utest.c`.
@DakshitBabbar
Member

Hey @ademyankov, here is a possible solution.

If we provide the user with an API like the following:

/**
 * Copy the publish records from local storage to user-provided buffers in a thread-safe manner.
 * Each user-provided buffer should be at least as large as the maximum count of the publish
 * records stored locally.
 *
 * @param[in]  pContext Initialised MQTT context.
 * @param[out] pBufferForOutgoingPublishRecords Buffer to store outgoing records.
 * @param[in]  outgoingPublishCount Size of the buffer to store outgoing records.
 * @param[out] pBufferForIncomingPublishRecords Buffer to store incoming records.
 * @param[in]  incomingPublishCount Size of the buffer to store incoming records.
 */

MQTTStatus_t MQTT_GetPublishRecords( const MQTTContext_t * pContext,
                                     MQTTPubAckInfo_t * pBufferForOutgoingPublishRecords,
                                     size_t outgoingPublishCount,
                                     MQTTPubAckInfo_t * pBufferForIncomingPublishRecords,
                                     size_t incomingPublishCount );
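The intended semantics of the proposed API can be illustrated with a self-contained C sketch. The `Sketch*` types are stand-ins for `MQTTContext_t`, `MQTTPubAckInfo_t`, and `MQTTStatus_t`, and `GetPublishRecordsSketch` is a hypothetical model, not the proposed implementation; its size check follows the doc comment's requirement that each buffer be at least as large as the local record table.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins for coreMQTT types so the sketch compiles on its own. */
typedef enum
{
    SKETCH_SUCCESS = 0,
    SKETCH_BAD_PARAMETER,
    SKETCH_NO_MEMORY
} SketchStatus_t;

typedef struct
{
    uint16_t packetId;
    uint8_t qos;
    uint8_t publishState;
} SketchPubAckInfo_t;

/* Only the parts of the context this sketch needs: the record arrays and
 * their capacities, as handed to MQTT_InitStatefulQoS(). */
typedef struct
{
    const SketchPubAckInfo_t * pOutgoing;
    size_t outgoingMax;
    const SketchPubAckInfo_t * pIncoming;
    size_t incomingMax;
} SketchContext_t;

/* Copy both record tables into caller-provided buffers. Each buffer must hold
 * at least as many entries as the corresponding table's capacity. */
static SketchStatus_t GetPublishRecordsSketch( const SketchContext_t * pContext,
                                               SketchPubAckInfo_t * pOutgoingBuf,
                                               size_t outgoingCount,
                                               SketchPubAckInfo_t * pIncomingBuf,
                                               size_t incomingCount )
{
    if( ( pContext == NULL ) || ( pContext->pOutgoing == NULL ) ||
        ( pContext->pIncoming == NULL ) || ( pOutgoingBuf == NULL ) ||
        ( pIncomingBuf == NULL ) )
    {
        return SKETCH_BAD_PARAMETER;
    }

    if( ( outgoingCount < pContext->outgoingMax ) ||
        ( incomingCount < pContext->incomingMax ) )
    {
        return SKETCH_NO_MEMORY;
    }

    /* A real implementation would need some form of locking here so that the
     * snapshot cannot interleave with MQTT_ProcessLoop() updating the tables. */
    memcpy( pOutgoingBuf, pContext->pOutgoing, pContext->outgoingMax * sizeof( *pOutgoingBuf ) );
    memcpy( pIncomingBuf, pContext->pIncoming, pContext->incomingMax * sizeof( *pIncomingBuf ) );

    return SKETCH_SUCCESS;
}
```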

To handle a planned reboot, the user could call the MQTT_GetPublishRecords API, store the retrieved records in NVM, stop receiving packets (to avoid any further local state changes), and shut down. On reboot, the application would load the state from NVM and pass it to MQTT_InitStatefulQoS() before reconnecting.
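Storing the records in NVM also needs an on-flash format. The sketch below assumes a hypothetical layout (a magic byte, a record count, 4 bytes per record, and a 2-byte checksum) so that a torn or corrupted write is rejected at boot instead of being fed to MQTT_InitStatefulQoS(). Fields are encoded byte by byte to avoid depending on struct padding or endianness; `Record_t` is a stand-in for `MQTTPubAckInfo_t`, and the additive checksum is an assumption where a CRC would be stronger.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct
{
    uint16_t packetId;
    uint8_t qos;
    uint8_t publishState;
} Record_t;

/* Hypothetical image layout: magic byte, count byte, 4 bytes per record
 * (ID big-endian, QoS, state), then a 2-byte big-endian checksum. */
#define IMAGE_MAGIC   0x51U
#define HEADER_LEN    2U
#define RECORD_LEN    4U
#define CHECKSUM_LEN  2U

/* Simple additive checksum; a CRC would catch more corruption patterns. */
static uint16_t Checksum16( const uint8_t * pData, size_t len )
{
    uint16_t sum = 0U;
    size_t i;
    for( i = 0U; i < len; i++ )
    {
        sum = ( uint16_t ) ( sum + pData[ i ] );
    }
    return sum;
}

/* Returns the image length, or 0 if the buffer is too small or count > 255. */
static size_t SerializeRecords( const Record_t * pRecords, size_t count,
                                uint8_t * pImage, size_t imageLen )
{
    size_t i;
    size_t pos = HEADER_LEN;
    uint16_t sum;

    if( ( count > 255U ) || ( imageLen < HEADER_LEN + ( count * RECORD_LEN ) + CHECKSUM_LEN ) )
    {
        return 0U;
    }
    pImage[ 0 ] = IMAGE_MAGIC;
    pImage[ 1 ] = ( uint8_t ) count;
    for( i = 0U; i < count; i++ )
    {
        pImage[ pos++ ] = ( uint8_t ) ( pRecords[ i ].packetId >> 8 );
        pImage[ pos++ ] = ( uint8_t ) ( pRecords[ i ].packetId & 0xFFU );
        pImage[ pos++ ] = pRecords[ i ].qos;
        pImage[ pos++ ] = pRecords[ i ].publishState;
    }
    sum = Checksum16( pImage, pos );
    pImage[ pos++ ] = ( uint8_t ) ( sum >> 8 );
    pImage[ pos++ ] = ( uint8_t ) ( sum & 0xFFU );
    return pos;
}

/* Returns the number of records restored, or -1 if the image is invalid
 * (wrong magic, truncated, too many records, or checksum mismatch). */
static int DeserializeRecords( const uint8_t * pImage, size_t imageLen,
                               Record_t * pRecords, size_t maxCount )
{
    size_t i, count, needed;
    size_t pos = HEADER_LEN;
    uint16_t stored;

    if( ( imageLen < HEADER_LEN + CHECKSUM_LEN ) || ( pImage[ 0 ] != IMAGE_MAGIC ) )
    {
        return -1;
    }
    count = pImage[ 1 ];
    needed = HEADER_LEN + ( count * RECORD_LEN ) + CHECKSUM_LEN;
    if( ( imageLen < needed ) || ( count > maxCount ) )
    {
        return -1;
    }
    stored = ( uint16_t ) ( ( pImage[ needed - 2U ] << 8 ) | pImage[ needed - 1U ] );
    if( stored != Checksum16( pImage, needed - CHECKSUM_LEN ) )
    {
        return -1;
    }
    for( i = 0U; i < count; i++ )
    {
        pRecords[ i ].packetId = ( uint16_t ) ( ( pImage[ pos ] << 8 ) | pImage[ pos + 1U ] );
        pRecords[ i ].qos = pImage[ pos + 2U ];
        pRecords[ i ].publishState = pImage[ pos + 3U ];
        pos += RECORD_LEN;
    }
    return ( int ) count;
}
```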

Unplanned reboots cannot be fully mitigated by the library: whatever we do, there will always be a race condition between the library's in-memory state and the state stored in NVM (as @AniruddhaKanhere rightly pointed out). The user can, however, reduce their impact as follows:

They can call this API to get the system state (all the publish records) and store it in NVM periodically, storing more frequently as the likelihood of an unplanned reboot increases. On a reboot, they can then load the state from NVM and pass it to MQTT_InitStatefulQoS() before reconnecting. Pseudo-code for this could look as follows:

/* Max number of publish records */
#define PUBLISH_RECORD_LEN  20

/* User-configurable period for storing the publish records in NVM */
#define STORING_PERIOD_MS  10000

/* User-provided memory for the library's publish records */
MQTTPubAckInfo_t pOutgoingPublishRecords[ PUBLISH_RECORD_LEN ];
MQTTPubAckInfo_t pIncomingPublishRecords[ PUBLISH_RECORD_LEN ];

/* User-provided buffers that receive a copy of the publish records for storing in NVM */
MQTTPubAckInfo_t pBufferOutgoingPublishRecords[ PUBLISH_RECORD_LEN ];
MQTTPubAckInfo_t pBufferIncomingPublishRecords[ PUBLISH_RECORD_LEN ];

/* Global MQTT context */
MQTTContext_t xMQTTContext = { 0 };

/* Application task function */
void MQTTApplication(...)
{
    MQTTStatus_t xResult;

    MQTT_Init( &xMQTTContext, ... );

    /* xRestoreFromNVM is an application-defined condition (hypothetical name),
     * e.g. true when a valid snapshot exists in NVM. */
    if( xRestoreFromNVM )
    {
        LoadMQTTPublishRecordsFromNVM(); /* defined below */
    }

    MQTT_InitStatefulQoS( &xMQTTContext,
                          pOutgoingPublishRecords,
                          PUBLISH_RECORD_LEN,
                          pIncomingPublishRecords,
                          PUBLISH_RECORD_LEN );

    xResult = MQTT_Connect( &xMQTTContext, ... );

    ...
}

User-defined load and store functions:

/* Load publish records function */
void LoadMQTTPublishRecordsFromNVM( void )
{
    /* Load publish records from NVM into pOutgoingPublishRecords and pIncomingPublishRecords. */
}

/* Store publish records function */
/* A separate thread could run this function to store the publish records periodically. */
void StoreMQTTPublishRecordsInNVM( MQTTContext_t * pxMQTTContext )
{
    for( ;; )
    {
        /* Take a thread-safe snapshot of the current publish records. */
        if( MQTT_GetPublishRecords( pxMQTTContext,
                                    pBufferOutgoingPublishRecords,
                                    PUBLISH_RECORD_LEN,
                                    pBufferIncomingPublishRecords,
                                    PUBLISH_RECORD_LEN ) == MQTTSuccess )
        {
            /* Copy the records from pBufferOutgoingPublishRecords and
             * pBufferIncomingPublishRecords to NVM. */
        }

        vTaskDelay( pdMS_TO_TICKS( STORING_PERIOD_MS ) );
    }
}
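Because the store task above may run often, the NVM write itself can be made cheaper and safer. The following hypothetical sketch (the slot layout, `StoreSnapshotIfChanged`, and `LoadNewestSnapshot` are assumptions, not part of the proposal) skips the write when the snapshot has not changed and alternates between two slots with a sequence number, so a reboot in the middle of a write still leaves the previous snapshot intact. `Record_t` stands in for `MQTTPubAckInfo_t`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define RECORD_COUNT 4U

typedef struct
{
    uint16_t packetId;
    uint8_t qos;
    uint8_t publishState;
} Record_t;

/* One NVM slot: a sequence number, a validity flag, and a full snapshot. */
typedef struct
{
    uint32_t sequence;
    bool valid;
    Record_t records[ RECORD_COUNT ];
} Slot_t;

/* Two slots standing in for two separate flash sectors (hypothetical). */
static Slot_t nvmSlots[ 2 ];
static uint32_t nvmWriteCount;

/* RAM copy of the last snapshot written, used to skip redundant writes. */
static Record_t lastStored[ RECORD_COUNT ];
static bool haveLastStored;
static uint32_t nextSequence = 1U;

/* Called from the periodic store task with a fresh snapshot. Returns true if
 * NVM was written. Unchanged snapshots are skipped to save flash wear (the
 * bytewise compare assumes Record_t has no padding), and writes alternate
 * between slots so the previous copy survives an interrupted write. */
static bool StoreSnapshotIfChanged( const Record_t * pSnapshot )
{
    Slot_t * pSlot;

    assert( pSnapshot != NULL );

    if( haveLastStored &&
        ( memcmp( lastStored, pSnapshot, sizeof( lastStored ) ) == 0 ) )
    {
        return false;
    }

    pSlot = &nvmSlots[ nextSequence % 2U ];
    pSlot->valid = false; /* Invalidate before overwriting. */
    memcpy( pSlot->records, pSnapshot, sizeof( pSlot->records ) );
    pSlot->sequence = nextSequence;
    pSlot->valid = true;  /* Mark valid only once the copy is complete. */

    nextSequence++;
    memcpy( lastStored, pSnapshot, sizeof( lastStored ) );
    haveLastStored = true;
    nvmWriteCount++;
    return true;
}

/* Called at boot: choose the valid slot with the highest sequence number. */
static const Slot_t * LoadNewestSnapshot( void )
{
    const Slot_t * pBest = NULL;
    size_t i;

    for( i = 0U; i < 2U; i++ )
    {
        if( nvmSlots[ i ].valid &&
            ( ( pBest == NULL ) || ( nvmSlots[ i ].sequence > pBest->sequence ) ) )
        {
            pBest = &nvmSlots[ i ];
        }
    }

    return pBest;
}
```

In the pseudo-code, `StoreSnapshotIfChanged()` would take the place of the copy-to-NVM step, and `LoadNewestSnapshot()` would back `LoadMQTTPublishRecordsFromNVM()`.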

Let us know if you have any concerns regarding this.

Thanks.

Projects
Status: 🆕 Input Queue
Development

No branches or pull requests

4 participants