Skip to content

Commit

Permalink
No longer service MQTT message queue asynchronously
Browse files Browse the repository at this point in the history
Qt For WebAssembly doesn't process the events in a timely manner
(unless the event queue is kicked by a timer event), so abandon
the asynchronous queue.  This may be fixed in a future version of
Qt, at which point in time it may be beneficial to revert this
commit, if stutters/UI stalls are noticed when receiving large
batches of MQTT notifications.
  • Loading branch information
chriadam committed Mar 21, 2023
1 parent b3b429a commit 671678f
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 23 deletions.
3 changes: 0 additions & 3 deletions inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <QHostAddress>
#include <QTimer>
#include <QSet>
#include <QQueue>
#include <QString>
#include <QByteArray>
#include <QUrl>
Expand Down Expand Up @@ -101,7 +100,6 @@ private Q_SLOTS:
void setConnectionState(ConnectionState connectionState);
void setError(QMqttClient::ClientError error);
void parseMessage(const QString &path, const QByteArray &message);
void serviceQueue();

QTimer mKeepAliveTimer;
QMqttClient *mMqttConnection;
Expand All @@ -113,7 +111,6 @@ private Q_SLOTS:
QUrl mUrl;
QString mHostName;
int mPort;
QQueue<QPair<QString, QByteArray> > mMessageQueue;
ConnectionState mConnectionState;
const int mReconnectAttemptIntervals[6] = { 250, 1000, 2000, 5000, 10000, 30000 };
quint16 mAutoReconnectAttemptCounter;
Expand Down
21 changes: 1 addition & 20 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ void VeQItemMqttProducer::onDisconnected()
{
setConnectionState(Disconnected);
mKeepAliveTimer.stop();
mMessageQueue.clear();
mReceivedMessage = false;

if (mAutoReconnectAttemptCounter < mAutoReconnectMaxAttempts) {
Expand Down Expand Up @@ -268,15 +267,8 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq
// ignore keepalive topic.
} else {
// we have a topic message which we need to expose via VeQItem.
// service the queue via a QueuedConnection to ensure
// that we don't block the UI.
const QString path = topicName.mid(notificationPrefix.size() + 1);
mMessageQueue.enqueue(QPair<QString, QByteArray>(path, message));
if (mMessageQueue.size() == 1) {
QMetaObject::invokeMethod(this, [this] {
serviceQueue();
}, Qt::QueuedConnection);
}
parseMessage(path, message);
}
}

Expand All @@ -288,17 +280,6 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq
}
}

void VeQItemMqttProducer::serviceQueue()
{
if (mMessageQueue.size()) {
const QPair<QString, QByteArray> message = mMessageQueue.dequeue();
parseMessage(message.first, message.second);
QMetaObject::invokeMethod(this, [this] {
serviceQueue();
}, Qt::QueuedConnection);
}
}

void VeQItemMqttProducer::parseMessage(const QString &path, const QByteArray &message)
{
VeQItemMqtt *item = qobject_cast<VeQItemMqtt*>(mProducerRoot->itemGetOrCreate(path, true, true));
Expand Down

0 comments on commit 671678f

Please sign in to comment.