Skip to content

Commit

Permalink
Service MQTT message queue asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
chriadam committed Feb 23, 2023
1 parent 887e9d9 commit d8250e9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
5 changes: 5 additions & 0 deletions inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <QHostAddress>
#include <QTimer>
#include <QSet>
#include <QQueue>
#include <QString>
#include <QByteArray>

#ifdef MQTT_WEBSOCKETS_ENABLED
#include <QUrl>
Expand Down Expand Up @@ -93,6 +96,7 @@ private Q_SLOTS:
void setConnectionState(ConnectionState connectionState);
void setError(QMqttClient::ClientError error);
void parseMessage(const QString &path, const QByteArray &message);
void serviceQueue();

QTimer mKeepAliveTimer;
QMqttClient *mMqttConnection;
Expand All @@ -101,6 +105,7 @@ private Q_SLOTS:
#endif // MQTT_WEBSOCKETS_ENABLED
QString mClientId;
QString mPortalId;
QQueue<QPair<QString, QByteArray> > mMessageQueue;
ConnectionState mConnectionState;
const int mReconnectAttemptIntervals[6] = { 250, 1000, 2000, 5000, 10000, 30000 };
quint16 mAutoReconnectAttemptCounter;
Expand Down
23 changes: 18 additions & 5 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,31 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq
// ignore keepalive topic.
} else {
// we have a topic message which we need to expose via VeQItem.
// invoke the parse method via a QueuedConnection to ensure
// service the queue via a QueuedConnection to ensure
// that we don't block the UI.
const QString path = topicName.mid(notificationPrefix.size() + 1);
QMetaObject::invokeMethod(this, [this, path, message] {
parseMessage(path, message);
}, Qt::QueuedConnection);
mMessageQueue.enqueue(QPair<QString, QByteArray>(path, message));
if (mMessageQueue.size() == 1) {
QMetaObject::invokeMethod(this, [this] {
serviceQueue();
}, Qt::QueuedConnection);
}
}
}
}
}

void VeQItemMqttProducer::serviceQueue()
{
const QPair<QString, QByteArray> message = mMessageQueue.dequeue();
parseMessage(message.first, message.second);
if (mMessageQueue.size()) {
QMetaObject::invokeMethod(this, [this] {
serviceQueue();
}, Qt::QueuedConnection);
}
}

void VeQItemMqttProducer::parseMessage(const QString &path, const QByteArray &message)
{
VeQItemMqtt *item = qobject_cast<VeQItemMqtt*>(mProducerRoot->itemGetOrCreate(path, true, true));
Expand All @@ -258,7 +272,6 @@ void VeQItemMqttProducer::doKeepAlive()
if (mMqttConnection
&& mMqttConnection->state() == QMqttClient::Connected
&& !mPortalId.isEmpty()) {
// TODO: just specific topics? Or all topics as done here?
setConnectionState(Ready);
mMqttConnection->publish(QMqttTopicName(QStringLiteral("R/%1/keepalive").arg(mPortalId)), QByteArray());
}
Expand Down

0 comments on commit d8250e9

Please sign in to comment.