Skip to content

Commit

Permalink
Fix websocket disconnection handling
Browse files Browse the repository at this point in the history
Qt MQTT already emits the appropriate disconnected signal and
closes the websocket when it detects that the websocket has
disconnected, so the previous manual handler caused the signals
to be emitted twice.

However, in non-websocket case, we also need to handle the case
where the initial connection fails (and manually trigger our
onDisconnected() handler, to perform reconnect etc).
We detect this by handling error change and state change from
the MQTT client.

Finally, fix the reconnect count (as it was previously reset in
a queued connection).
  • Loading branch information
chriadam committed Apr 20, 2023
1 parent 6d4a655 commit 0661ff7
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
1 change: 1 addition & 0 deletions inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private Q_SLOTS:
void onConnected();
void onDisconnected();
void onErrorChanged(QMqttClient::ClientError error);
void onStateChanged(QMqttClient::ClientState state);
void onMessageReceived(const QByteArray &message, const QMqttTopicName &topic);
void doKeepAlive();

Expand Down
67 changes: 53 additions & 14 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ void VeQItemMqttProducer::open(
const QUrl &url,
QMqttClient::ProtocolVersion protocolVersion)
{
mAutoReconnectAttemptCounter = 0;
setError(QMqttClient::NoError);

// Invoke via queued connection to ensure that the children
// are created in the appropriate thread.
QMetaObject::invokeMethod(this, [this, url, protocolVersion] {
Expand All @@ -103,14 +106,16 @@ void VeQItemMqttProducer::open(

mMqttConnection = new QMqttClient(this);
mMqttConnection->setClientId(mClientId);
mAutoReconnectAttemptCounter = 0;

connect(mMqttConnection, &QMqttClient::connected,
this, &VeQItemMqttProducer::onConnected);
connect(mMqttConnection, &QMqttClient::disconnected,
this, &VeQItemMqttProducer::onDisconnected);
connect(mMqttConnection, &QMqttClient::errorChanged,
this, &VeQItemMqttProducer::onErrorChanged);
connect(mMqttConnection, &QMqttClient::stateChanged,
this, &VeQItemMqttProducer::onStateChanged,
Qt::QueuedConnection); // Queued to avoid double onDisconnected().
connect(mMqttConnection, &QMqttClient::messageReceived,
this, &VeQItemMqttProducer::onMessageReceived);

Expand All @@ -129,14 +134,6 @@ void VeQItemMqttProducer::open(
mMqttConnection->setTransport(mWebSocket, QMqttClient::IODevice);
QMetaObject::invokeMethod(this, [this] { aboutToConnect(); }, Qt::QueuedConnection);
});
connect(mWebSocket, &WebSocketDevice::disconnected,
this, [this] {
// TODO: does QMqttClient handle this already?
// Or do I need to manually close()?
qWarning() << "WebSocket disconnected!";
mWebSocket->close();
mMqttConnection->disconnected();
});

setConnectionState(Connecting);
mWebSocket->open(QIODeviceBase::ReadWrite);
Expand All @@ -146,6 +143,9 @@ void VeQItemMqttProducer::open(

void VeQItemMqttProducer::open(const QHostAddress &host, int port)
{
mAutoReconnectAttemptCounter = 0;
setError(QMqttClient::NoError);

// Invoke via queued connection to ensure that the children
// are created in the appropriate thread.
QMetaObject::invokeMethod(this, [this, host, port] {
Expand All @@ -168,10 +168,12 @@ void VeQItemMqttProducer::open(const QHostAddress &host, int port)
this, &VeQItemMqttProducer::onDisconnected);
connect(mMqttConnection, &QMqttClient::errorChanged,
this, &VeQItemMqttProducer::onErrorChanged);
connect(mMqttConnection, &QMqttClient::stateChanged,
this, &VeQItemMqttProducer::onStateChanged,
Qt::QueuedConnection); // Queued to avoid double onDisconnected().
connect(mMqttConnection, &QMqttClient::messageReceived,
this, &VeQItemMqttProducer::onMessageReceived);

mAutoReconnectAttemptCounter = 0;
QMetaObject::invokeMethod(this, [this] { aboutToConnect(); }, Qt::QueuedConnection);
setConnectionState(Connecting);
}, Qt::QueuedConnection);
Expand Down Expand Up @@ -218,41 +220,78 @@ void VeQItemMqttProducer::onConnected()
void VeQItemMqttProducer::onDisconnected()
{
setConnectionState(Disconnected);
if (error() == QMqttClient::NoError
&& mMqttConnection
&& mMqttConnection->error() != QMqttClient::NoError) {
setError(mMqttConnection->error());
}
mKeepAliveTimer.stop();
mReceivedMessage = false;

if (mAutoReconnectAttemptCounter < mAutoReconnectMaxAttempts) {
// Attempt to reconnect. We use a staggered exponential backoff interval.
setConnectionState(Reconnecting);
const int interval = mReconnectAttemptIntervals[mAutoReconnectAttemptCounter++];
#ifdef MQTT_WEBSOCKETS_ENABLED
if (!mWebSocket || !mWebSocket->isValid()) {
QTimer::singleShot(interval + QRandomGenerator::global()->bounded(interval/2),
this, [this] {
setConnectionState(Reconnecting);
quint16 count = mAutoReconnectAttemptCounter;
if (mHostName.isEmpty()) {
open(mUrl, mProtocolVersion);
} else {
open(QHostAddress(mHostName), mPort);
}
mAutoReconnectAttemptCounter= count;
mAutoReconnectAttemptCounter = count;
});
} else {
QTimer::singleShot(interval + QRandomGenerator::global()->bounded(interval/2),
this, &VeQItemMqttProducer::aboutToConnect);
this, [this] {
setConnectionState(Reconnecting);
emit aboutToConnect();
});
}
#else
QTimer::singleShot(interval + QRandomGenerator::global()->bounded(interval/2),
this, &VeQItemMqttProducer::aboutToConnect);
this, [this] {
setConnectionState(Reconnecting);
emit aboutToConnect();
});
#endif
} else {
// Failed to connect. Wait one minute and then start the connection process again.
setConnectionState(Failed);
mAutoReconnectAttemptCounter = 0;
QTimer::singleShot(60000, this, &VeQItemMqttProducer::onDisconnected);
}
}

void VeQItemMqttProducer::onErrorChanged(QMqttClient::ClientError error)
{
setError(error);

if (mMqttConnection && mMqttConnection->state() == QMqttClient::Disconnected
&& (mConnectionState == VeQItemMqttProducer::Connecting
|| mConnectionState == VeQItemMqttProducer::Reconnecting)) {
// If the initial connection failed, QMqttClient might fail
// to emit the state change correctly. Force it,
// but use a queued connection to avoid the possibility
// that the state change signal gets emitted after the
// error change signal.
QMetaObject::invokeMethod(this, [this] {
onStateChanged(QMqttClient::Disconnected);
}, Qt::QueuedConnection);
}
}

void VeQItemMqttProducer::onStateChanged(QMqttClient::ClientState state)
{
if (mMqttConnection && mMqttConnection->state() == QMqttClient::Disconnected
&& (mConnectionState == VeQItemMqttProducer::Connecting
|| mConnectionState == VeQItemMqttProducer::Reconnecting)) {
// if the connection attempt failed, trigger our normal onDisconnected handler.
onDisconnected();
}
}

void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMqttTopicName &topic)
Expand Down

0 comments on commit 0661ff7

Please sign in to comment.