Skip to content

Commit

Permalink
Transition to Ready state after all initial messages are received
Browse files Browse the repository at this point in the history
Previously, we transitioned to Ready state as soon as we received
the first message (since that meant that communication with the
broker had been established).
Now, we transition to Readying state once we receive the first
message, and only transition to Ready state once we have received
all of the initial messages triggered by the first KeepAlive.

This ensures that data is in a coherent state when Ready.

Also, construct timers as children of the producer, to ensure that
timer events are serviced on the producer's thread.
  • Loading branch information
chriadam committed Apr 28, 2023
1 parent 20290f2 commit 395fd9b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
5 changes: 4 additions & 1 deletion inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class VeQItemMqttProducer : public VeQItemProducer
Idle,
Connecting,
Connected,
Readying,
Ready,
Disconnected,
Reconnecting,
Expand Down Expand Up @@ -103,7 +104,9 @@ private Q_SLOTS:
void setError(QMqttClient::ClientError error);
void parseMessage(const QString &path, const QByteArray &message);

QTimer mKeepAliveTimer;
QTimer *mKeepAliveTimer;
QTimer *mReadyStateTimer;
QTimer *mReadyStateFallbackTimer;
QMqttClient *mMqttConnection;
#ifdef MQTT_WEBSOCKETS_ENABLED
WebSocketDevice *mWebSocket = nullptr;
Expand Down
49 changes: 41 additions & 8 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ VeQItemMqtt::VeQItemMqtt(VeQItemMqttProducer *producer)
this, [this] {
const VeQItemMqttProducer::ConnectionState state = mqttProducer()->connectionState();
switch (state) {
case VeQItemMqttProducer::Ready:
case VeQItemMqttProducer::Readying:
// No need to requestValue(), since we subscribe to everything
// and send keepalive to read all values when Ready.
// One possible issue: the VRM broker will provide retained
Expand Down Expand Up @@ -54,6 +54,9 @@ VeQItemMqttProducer *VeQItemMqtt::mqttProducer() const
VeQItemMqttProducer::VeQItemMqttProducer(
VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent)
: VeQItemProducer(root, id, parent),
mKeepAliveTimer(new QTimer(this)),
mReadyStateTimer(new QTimer(this)),
mReadyStateFallbackTimer(new QTimer(this)),
mMqttConnection(nullptr),
mPort(0),
mConnectionState(Idle),
Expand All @@ -76,10 +79,30 @@ VeQItemMqttProducer::VeQItemMqttProducer(
const quint64 uniqueId = QRandomGenerator::global()->generate64();
mClientId.append(QStringLiteral("%1").arg(uniqueId, 16, 16, QLatin1Char('0')));

mKeepAliveTimer.setInterval(1000 * 30);
connect(&mKeepAliveTimer, &QTimer::timeout,
mKeepAliveTimer->setInterval(1000 * 30);
connect(mKeepAliveTimer, &QTimer::timeout,
this, &VeQItemMqttProducer::doKeepAlive);
mKeepAliveTimer.start();
mKeepAliveTimer->start();

mReadyStateTimer->setSingleShot(true);
mReadyStateTimer->setInterval(500);
connect(mReadyStateTimer, &QTimer::timeout,
this, [this] {
mReadyStateTimer->stop();
if (connectionState() == Readying) {
setConnectionState(Ready);
}
});

mReadyStateFallbackTimer->setSingleShot(true);
mReadyStateFallbackTimer->setInterval(5000);
connect(mReadyStateFallbackTimer, &QTimer::timeout,
this, [this] {
mReadyStateTimer->stop();
if (connectionState() == Readying) {
setConnectionState(Ready);
}
});
}

VeQItem *VeQItemMqttProducer::createItem()
Expand Down Expand Up @@ -217,7 +240,7 @@ void VeQItemMqttProducer::onConnected()
doKeepAlive();
}

mKeepAliveTimer.start();
mKeepAliveTimer->start();
}

void VeQItemMqttProducer::onDisconnected()
Expand All @@ -228,7 +251,8 @@ void VeQItemMqttProducer::onDisconnected()
&& mMqttConnection->error() != QMqttClient::NoError) {
setError(mMqttConnection->error());
}
mKeepAliveTimer.stop();
mKeepAliveTimer->stop();
mReadyStateTimer->stop();
mReceivedMessage = false;

if (mAutoReconnectAttemptCounter < mAutoReconnectMaxAttempts) {
Expand Down Expand Up @@ -329,10 +353,19 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq
}
}

// Once we have received a message, transition to Ready state.
// Once we have received a message, perform KeepAlive.
if (!mReceivedMessage) {
mReceivedMessage = true;
doKeepAlive();
} else if (connectionState() == VeQItemMqttProducer::Readying) {
// We will receive a flurry of messages upon initial connection.
// Once they subside we should transition to Ready state.
if (!mReadyStateTimer->isActive()) {
// transition to Ready state after 5 seconds
// even if we are still receiving initial messages.
mReadyStateFallbackTimer->start();
}
mReadyStateTimer->start(); // restart the timer.
}
}
}
Expand Down Expand Up @@ -362,7 +395,7 @@ void VeQItemMqttProducer::doKeepAlive()
&& mMqttConnection->state() == QMqttClient::Connected
&& !mPortalId.isEmpty()) {
if (mReceivedMessage) {
setConnectionState(Ready);
setConnectionState(Readying);
}
mMqttConnection->publish(QMqttTopicName(QStringLiteral("R/%1/keepalive").arg(mPortalId)), QByteArray());
}
Expand Down

0 comments on commit 395fd9b

Please sign in to comment.