Skip to content

Commit

Permalink
Handle WebSocket disconnection more gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
chriadam committed Mar 9, 2023
1 parent c1b0c14 commit e66bbce
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 28 deletions.
7 changes: 6 additions & 1 deletion inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
#include <QQueue>
#include <QString>
#include <QByteArray>
#include <QUrl>

#ifdef MQTT_WEBSOCKETS_ENABLED
#include <QUrl>
#include <QWebSocket>
#include <QWebSocketProtocol>
#include <QIODevice>
Expand Down Expand Up @@ -110,12 +110,16 @@ private Q_SLOTS:
#endif // MQTT_WEBSOCKETS_ENABLED
QString mClientId;
QString mPortalId;
QUrl mUrl;
QString mHostName;
int mPort;
QQueue<QPair<QString, QByteArray> > mMessageQueue;
ConnectionState mConnectionState;
const int mReconnectAttemptIntervals[6] = { 250, 1000, 2000, 5000, 10000, 30000 };
quint16 mAutoReconnectAttemptCounter;
const quint16 mAutoReconnectMaxAttempts;
QMqttClient::ClientError mError;
QMqttClient::ProtocolVersion mProtocolVersion;
bool mReceivedMessage;
};

Expand All @@ -135,6 +139,7 @@ class WebSocketDevice : public QIODevice
void setProtocol(const QByteArray &protocol);
QByteArray protocol() const;

bool isValid() const;
bool open(QIODeviceBase::OpenMode mode) override;
void close() override;
qint64 readData(char *data, qint64 maxSize) override;
Expand Down
92 changes: 65 additions & 27 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ VeQItemMqttProducer::VeQItemMqttProducer(
VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent)
: VeQItemProducer(root, id, parent),
mMqttConnection(nullptr),
mPort(0),
mConnectionState(Idle),
mAutoReconnectAttemptCounter(0),
mAutoReconnectMaxAttempts(sizeof(mReconnectAttemptIntervals)/sizeof(mReconnectAttemptIntervals[0])),
mError(QMqttClient::NoError),
mProtocolVersion(QMqttClient::MQTT_3_1_1),
mReceivedMessage(false)
{
// Create a sanitized clientId. MQTT v3.1 spec states that the clientId must be
Expand Down Expand Up @@ -72,6 +74,11 @@ bool VeQItemMqttProducer::open(
const QUrl &url,
QMqttClient::ProtocolVersion protocolVersion)
{
mUrl = url;
mProtocolVersion = protocolVersion;
mHostName = QString();
mPort = 0;

if (mMqttConnection) {
mMqttConnection->deleteLater();
}
Expand All @@ -89,6 +96,10 @@ bool VeQItemMqttProducer::open(
connect(mMqttConnection, &QMqttClient::messageReceived,
this, &VeQItemMqttProducer::onMessageReceived);

if (mWebSocket) {
mWebSocket->deleteLater();
}

mWebSocket = new WebSocketDevice(mMqttConnection);
mWebSocket->setUrl(url);
mWebSocket->setProtocol(
Expand Down Expand Up @@ -116,14 +127,18 @@ bool VeQItemMqttProducer::open(

bool VeQItemMqttProducer::open(const QHostAddress &host, int port)
{
mHostName = host.toString();
mPort = port;
mUrl = QUrl();

if (mMqttConnection) {
mMqttConnection->deleteLater();
}

mMqttConnection = new QMqttClient(this);
mMqttConnection->setClientId(mClientId);
mMqttConnection->setHostname(host.toString());
mMqttConnection->setPort(port);
mMqttConnection->setHostname(mHostName);
mMqttConnection->setPort(mPort);

connect(mMqttConnection, &QMqttClient::connected,
this, &VeQItemMqttProducer::onConnected);
Expand Down Expand Up @@ -189,8 +204,26 @@ void VeQItemMqttProducer::onDisconnected()
// Attempt to reconnect. We use a staggered exponential backoff interval.
setConnectionState(Reconnecting);
const int interval = mReconnectAttemptIntervals[mAutoReconnectAttemptCounter++];
#ifdef MQTT_WEBSOCKETS_ENABLED
if (!mWebSocket || !mWebSocket->isValid()) {
QTimer::singleShot(interval + QRandomGenerator::global()->bounded(interval/2),
this, [this] {
quint16 count = mAutoReconnectAttemptCounter;
if (mHostName.isEmpty()) {
open(mUrl, mProtocolVersion);
} else {
open(QHostAddress(mHostName), mPort);
}
mAutoReconnectAttemptCounter= count;
});
} else {
QTimer::singleShot(interval + QRandomGenerator::global()->bounded(interval/2),
this, &VeQItemMqttProducer::aboutToConnect);
}
#else
QTimer::singleShot(interval + QRandomGenerator::global()->bounded(interval/2),
this, &VeQItemMqttProducer::aboutToConnect);
#endif
} else {
setConnectionState(Failed);
}
Expand Down Expand Up @@ -351,56 +384,61 @@ bool VeQItemMqttProducer::publishValue(const QString &uid, const QVariant &value

#ifdef MQTT_WEBSOCKETS_ENABLED
WebSocketDevice::WebSocketDevice(QObject *parent)
: QIODevice(parent)
: QIODevice(parent)
{
connect(&mWebSocket, &QWebSocket::connected,
this, &WebSocketDevice::connected);
connect(&mWebSocket, &QWebSocket::disconnected,
this, &WebSocketDevice::disconnected);
connect(&mWebSocket, &QWebSocket::binaryMessageReceived,
this, &WebSocketDevice::onBinaryMessageReceived);
connect(&mWebSocket, &QWebSocket::connected,
this, &WebSocketDevice::connected);
connect(&mWebSocket, &QWebSocket::disconnected,
this, &WebSocketDevice::disconnected);
connect(&mWebSocket, &QWebSocket::binaryMessageReceived,
this, &WebSocketDevice::onBinaryMessageReceived);
}

void WebSocketDevice::setUrl(const QUrl &url)
{
if (mUrl != url) {
mUrl = url;
emit urlChanged();
}
if (mUrl != url) {
mUrl = url;
emit urlChanged();
}
}

QUrl WebSocketDevice::url() const
{
return mUrl;
return mUrl;
}

void WebSocketDevice::setProtocol(const QByteArray &protocol)
{
if (mProtocol != protocol) {
mProtocol = protocol;
emit protocolChanged();
}
if (mProtocol != protocol) {
mProtocol = protocol;
emit protocolChanged();
}
}

QByteArray WebSocketDevice::protocol() const
{
return mProtocol;
return mProtocol;
}

bool WebSocketDevice::isValid() const
{
return mWebSocket.isValid();
}

bool WebSocketDevice::open(QIODeviceBase::OpenMode mode)
{
// FIXME: Qt 6.4 supports websocket subprotocols, but until then...
QNetworkRequest r;
r.setUrl(mUrl);
r.setRawHeader("Sec-WebSocket-Protocol", mProtocol.constData());
mWebSocket.open(r);
return QIODevice::open(mode);
// FIXME: Qt 6.4 supports websocket subprotocols, but until then...
QNetworkRequest r;
r.setUrl(mUrl);
r.setRawHeader("Sec-WebSocket-Protocol", mProtocol.constData());
mWebSocket.open(r);
return QIODevice::open(mode);
}

void WebSocketDevice::close()
{
mWebSocket.close();
QIODevice::close();
mWebSocket.close();
QIODevice::close();
}

qint64 WebSocketDevice::readData(char *data, qint64 maxSize)
Expand Down

0 comments on commit e66bbce

Please sign in to comment.