Skip to content

Commit

Permalink
Transition to Ready state after all initial messages are received
Browse files Browse the repository at this point in the history
Previously, we transitioned to Ready state as soon as we received
the first message (since that meant that communication with the
broker had been established).

Now we transition to Initializing state once we receive the first
message, and only transition to Ready state once we have received
all of the initial messages triggered by the first KeepAlive.

This ensures that data is in a coherent state when Ready.

Also, construct timers as children of the producer, to ensure that
timer events are serviced on the producer's thread.
  • Loading branch information
chriadam committed May 9, 2023
1 parent 20290f2 commit 77167ce
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
5 changes: 4 additions & 1 deletion inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class VeQItemMqttProducer : public VeQItemProducer
Idle,
Connecting,
Connected,
Initializing,
Ready,
Disconnected,
Reconnecting,
Expand Down Expand Up @@ -103,7 +104,9 @@ private Q_SLOTS:
void setError(QMqttClient::ClientError error);
void parseMessage(const QString &path, const QByteArray &message);

QTimer mKeepAliveTimer;
QTimer *mKeepAliveTimer;
QTimer *mReadyStateTimer;
QTimer *mReadyStateFallbackTimer;
QMqttClient *mMqttConnection;
#ifdef MQTT_WEBSOCKETS_ENABLED
WebSocketDevice *mWebSocket = nullptr;
Expand Down
50 changes: 42 additions & 8 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ VeQItemMqtt::VeQItemMqtt(VeQItemMqttProducer *producer)
this, [this] {
const VeQItemMqttProducer::ConnectionState state = mqttProducer()->connectionState();
switch (state) {
case VeQItemMqttProducer::Ready:
case VeQItemMqttProducer::Initializing:
// No need to requestValue(), since we subscribe to everything
// and send keepalive to read all values when Ready.
// One possible issue: the VRM broker will provide retained
Expand Down Expand Up @@ -54,6 +54,9 @@ VeQItemMqttProducer *VeQItemMqtt::mqttProducer() const
VeQItemMqttProducer::VeQItemMqttProducer(
VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent)
: VeQItemProducer(root, id, parent),
mKeepAliveTimer(new QTimer(this)),
mReadyStateTimer(new QTimer(this)),
mReadyStateFallbackTimer(new QTimer(this)),
mMqttConnection(nullptr),
mPort(0),
mConnectionState(Idle),
Expand All @@ -76,10 +79,30 @@ VeQItemMqttProducer::VeQItemMqttProducer(
const quint64 uniqueId = QRandomGenerator::global()->generate64();
mClientId.append(QStringLiteral("%1").arg(uniqueId, 16, 16, QLatin1Char('0')));

mKeepAliveTimer.setInterval(1000 * 30);
connect(&mKeepAliveTimer, &QTimer::timeout,
mKeepAliveTimer->setInterval(1000 * 30);
connect(mKeepAliveTimer, &QTimer::timeout,
this, &VeQItemMqttProducer::doKeepAlive);
mKeepAliveTimer.start();
mKeepAliveTimer->start();

mReadyStateTimer->setSingleShot(true);
mReadyStateTimer->setInterval(500);
connect(mReadyStateTimer, &QTimer::timeout,
this, [this] {
mReadyStateTimer->stop();
if (connectionState() == Initializing) {
setConnectionState(Ready);
}
});

mReadyStateFallbackTimer->setSingleShot(true);
mReadyStateFallbackTimer->setInterval(5000);
connect(mReadyStateFallbackTimer, &QTimer::timeout,
this, [this] {
mReadyStateTimer->stop();
if (connectionState() == Initializing) {
setConnectionState(Ready);
}
});
}

VeQItem *VeQItemMqttProducer::createItem()
Expand Down Expand Up @@ -217,7 +240,7 @@ void VeQItemMqttProducer::onConnected()
doKeepAlive();
}

mKeepAliveTimer.start();
mKeepAliveTimer->start();
}

void VeQItemMqttProducer::onDisconnected()
Expand All @@ -228,7 +251,8 @@ void VeQItemMqttProducer::onDisconnected()
&& mMqttConnection->error() != QMqttClient::NoError) {
setError(mMqttConnection->error());
}
mKeepAliveTimer.stop();
mKeepAliveTimer->stop();
mReadyStateTimer->stop();
mReceivedMessage = false;

if (mAutoReconnectAttemptCounter < mAutoReconnectMaxAttempts) {
Expand Down Expand Up @@ -329,10 +353,19 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq
}
}

// Once we have received a message, transition to Ready state.
// Once we have received a message, perform KeepAlive.
if (!mReceivedMessage) {
mReceivedMessage = true;
doKeepAlive();
} else if (connectionState() == VeQItemMqttProducer::Initializing) {
// We will receive a flurry of messages upon initial connection.
// Once they subside we should transition to Ready state.
if (!mReadyStateTimer->isActive()) {
// transition to Ready state after 5 seconds
// even if we are still receiving initial messages.
mReadyStateFallbackTimer->start();
}
mReadyStateTimer->start(); // restart the timer.
}
}
}
Expand Down Expand Up @@ -362,7 +395,8 @@ void VeQItemMqttProducer::doKeepAlive()
&& mMqttConnection->state() == QMqttClient::Connected
&& !mPortalId.isEmpty()) {
if (mReceivedMessage) {
setConnectionState(Ready);
// transition to Initializing state while we wait for the flurry of initial messages to end.
setConnectionState(Initializing);
}
mMqttConnection->publish(QMqttTopicName(QStringLiteral("R/%1/keepalive").arg(mPortalId)), QByteArray());
}
Expand Down

0 comments on commit 77167ce

Please sign in to comment.