Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement VeQItemMqtt and VeQItemMqttProducer #2

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
afc55df
Implement VeQItemMqtt and VeQItemMqttProducer
chriadam Jan 5, 2023
2fe8ffd
Improve MQTT producer
chriadam Feb 16, 2023
2ef20a5
Service MQTT message queue asynchronously
chriadam Feb 23, 2023
356da73
Support VRM portal connections
chriadam Mar 7, 2023
63eb644
Handle WebSocket disconnection more gracefully
chriadam Mar 9, 2023
431c3d3
No longer service MQTT message queue asynchronously
chriadam Mar 21, 2023
83023d8
Invalidate MQTT item values on disconnect / null
chriadam Mar 24, 2023
7fcae3d
Fix websocket disconnection handling
chriadam Apr 20, 2023
0b3c094
Don't trigger requestValue() from every MQTT item on reconnect
chriadam Apr 28, 2023
38ed766
Transition to Ready state after all initial messages are received
chriadam Apr 28, 2023
38df65d
Only transition from Connected to Initializing
chriadam May 18, 2023
21aaebf
Make WebSocketDevice a sequential QIODevice, report bytesAvailable
chriadam May 23, 2023
930e8ed
Ignore MQTT messages with the retain flag set
chriadam May 30, 2023
1af7c4a
Improve keepalive handling to support FlashMQ broker
chriadam Jun 21, 2023
e8a3872
Only ignore messages with 'retain' flag if not on VRM broker
chriadam Jun 21, 2023
ddd3441
VeQItemMqtt: allow values to be polled via getValue(true)
blammit Aug 2, 2023
2e093c9
Close websocket device upon disconnection
chriadam Aug 4, 2023
cd724a3
Don't ignore retained messages
chriadam Aug 4, 2023
d4994b8
Fix some warnings and whitespace errors
chriadam Aug 16, 2023
72dc03a
Support heartbeat status detection
chriadam Aug 18, 2023
f95312a
Use Q_DECL_DEPRECATED instead of gcc deprecated attributes
chriadam Oct 30, 2023
e44d60f
MQTT producer: mark produced values as synchronized (seen)
chriadam Oct 30, 2023
775bf8e
MQTT producer: produce min/max/default values if available
chriadam Oct 30, 2023
8b8c773
MQTT: reduce initializing->ready fallback timer timeout to 4s
chriadam Nov 13, 2023
95318ac
VeQuickItem: add 'isValid' property
blammit Nov 20, 2023
f5a3dd7
Detect and work-around stuck-reconnection issue in QtMqtt
chriadam Nov 3, 2023
e731a43
MQTT: transition to Ready state after receiving "finished" message
chriadam Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions examples/mqtt_producer/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include <QGuiApplication>
#include <QScopedPointer>
#include <QQmlEngine>
#include <QQmlComponent>
#include <QQuickWindow>
#include <QtDebug>

#ifdef Q_OS_WASM
#include <emscripten/val.h>
#include <emscripten.h>
#include <QWebSocketProtocol>
#include <QUrl>
#include <QUrlQuery>
#else
#include <QHostAddress>
#endif

#include <veutil/qt/ve_qitems_mqtt.hpp>
#include <veutil/qt/ve_quick_item.hpp>

int main(int argc, char *argv[])
{
qmlRegisterType<VeQuickItem>("Victron.VeUtil", 1, 0, "VeQuickItem");

QGuiApplication app(argc, argv);
QQmlEngine engine;
QQmlComponent component(&engine, QUrl(QStringLiteral("qrc:/main.qml")));

bool running = false;
VeQItemMqttProducer producer(VeQItems::getRoot(), QStringLiteral("mqtt"), QStringLiteral("example"));
QObject::connect(&producer, &VeQItemMqttProducer::aboutToConnect,
[&producer] {
producer.setCredentials(QString(), QString());
producer.continueConnect();
});
QObject::connect(&producer, &VeQItemMqttProducer::connectionStateChanged,
[&running, &producer] {
if (producer.connectionState() == VeQItemMqttProducer::Ready) {
running = true;
}
});

QTimer::singleShot(10000,
[&app, &running] {
if (!running) {
qWarning() << "MQTT producer never became ready, unable to connect?";
app.quit();
}
});

QQuickWindow *window = qobject_cast<QQuickWindow*>(component.create());
if (!window) {
qWarning() << "Unable to create main.qml component! " << component.errors();
return 1;
}

window->showFullScreen();
#ifdef Q_OS_WASM
emscripten::val webLocation = emscripten::val::global("location");
const QUrl webLocationUrl = QUrl(QString::fromStdString(webLocation["href"].as<std::string>()));
const QUrlQuery query(webLocationUrl);
const QString ip(query.queryItemValue("mqtt"));
if (ip.isEmpty()) {
qWarning() << "Please specify the broker's IP address via mqtt query parameter, e.g. ?mqtt=192.168.5.96";
} else {
const QString mqttUrl(QStringLiteral("ws://%1:9001/").arg(ip)); // e.g.: "ws://192.168.5.96:9001/"
producer.open(QUrl(mqttUrl), QMqttClient::MQTT_3_1);
}
#else
producer.open(QHostAddress::LocalHost, 1883);
#endif
return app.exec();
}
26 changes: 26 additions & 0 deletions examples/mqtt_producer/main.qml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import QtQuick
import QtQuick.Window
import Victron.VeUtil

Window {
id: root

title: "Testing MQTT VeQItem Support"
color: "lightsteelblue"

width: 600
height: 600

Text {
anchors.centerIn: parent
text: "Voltage = " + batteryVoltage.value
font.family: "Helvetica"
font.pointSize: 24
color: "black"
}

VeQuickItem {
id: batteryVoltage
uid: "mqtt/system/0/Dc/Battery/Voltage"
}
}
16 changes: 16 additions & 0 deletions examples/mqtt_producer/mqtt_producer.pro
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
TARGET = mqtt_producer
TEMPLATE = app
QT += core network gui widgets qml quick mqtt

SOURCES += main.cpp
RESOURCES += qml.qrc

# uncomment the following for websockets support
DEFINES += MQTT_WEBSOCKETS_ENABLED
QT += websockets

CONFIG += ve-qitems-mqtt
VEUTIL = ../..
INCLUDEPATH += $${VEUTIL}/inc
include($${VEUTIL}/src/qt/veqitem.pri)
include($${VEUTIL}/common.pri)
6 changes: 6 additions & 0 deletions examples/mqtt_producer/qml.qrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<RCC>
<qresource prefix="/">
<file>main.qml</file>
</qresource>
</RCC>

4 changes: 2 additions & 2 deletions inc/veutil/qt/ve_qitem.hpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class VE_QITEM_EXPORT VeQItem : public QObject
* will free the whole tree. Its only usefull for items which are not part of
* a tree, and that is never the case. So lets deprecate that constructor.
*/
explicit VeQItem(VeQItemProducer *producer, QObject *parent) __attribute((deprecated));
Q_DECL_DEPRECATED explicit VeQItem(VeQItemProducer *producer, QObject *parent);
explicit VeQItem(VeQItemProducer *producer);
~VeQItem();

Expand Down Expand Up @@ -417,7 +417,7 @@ class VE_QITEM_EXPORT VeQItemLocal : public VeQItem
Q_OBJECT

public:
explicit VeQItemLocal(VeQItemProducer *producer, QObject *parent) __attribute((deprecated));
Q_DECL_DEPRECATED explicit VeQItemLocal(VeQItemProducer *producer, QObject *parent);
explicit VeQItemLocal(VeQItemProducer *producer);

int setValue(QVariant const &value) override
Expand Down
193 changes: 193 additions & 0 deletions inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#pragma once

#include <veutil/qt/ve_qitem.hpp>

#include <QMqttClient>
#include <QMqttSubscription>

#include <QPointer>
#include <QHostAddress>
#include <QTimer>
#include <QSet>
#include <QString>
#include <QByteArray>
#include <QUrl>

#ifdef MQTT_WEBSOCKETS_ENABLED
#include <QWebSocket>
#include <QWebSocketProtocol>
#include <QIODevice>
class WebSocketDevice;
#endif // MQTT_WEBSOCKETS_ENABLED

class VeQItemMqttProducer;

// Implementation of a VeQItem over MQTT
class VeQItemMqtt : public VeQItem
{
Q_OBJECT

public:
VeQItemMqtt(VeQItemMqttProducer *producer);

int setValue(QVariant const &value) override;

QVariant getValue() override;
QVariant getValue(bool force) override;

protected:
void setParent(QObject *parent) override;

private:
VeQItemMqttProducer *mqttProducer() const;
};

class VeQItemMqttProducer : public VeQItemProducer
{
Q_OBJECT
Q_PROPERTY(HeartbeatState heartbeatState READ heartbeatState NOTIFY heartbeatStateChanged)
Q_PROPERTY(ConnectionState connectionState READ connectionState NOTIFY connectionStateChanged)
Q_PROPERTY(QMqttClient::ClientError error READ error NOTIFY errorChanged)
Q_PROPERTY(QString portalId READ portalId WRITE setPortalId NOTIFY portalIdChanged)

public:

enum ConnectionState {
Idle,
Connecting,
Connected,
Initializing,
Ready,
Disconnected,
Reconnecting,
Failed
};
Q_ENUM(ConnectionState)

// The heartbeat state tracks whether the CerboGX is actively alive
// and writing heartbeat timestamp updates to the MQTT broker.
// We might be actively connected to the VRM broker, but the actual
// device itself might have gone offline and so the data is "not live".
enum HeartbeatState {
HeartbeatActive,
HeartbeatMissing,
HeartbeatInactive,
};
Q_ENUM(HeartbeatState)

VeQItemMqttProducer(VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent = nullptr);

VeQItem *createItem() override;

void open(const QHostAddress &host, int port);

#ifdef MQTT_WEBSOCKETS_ENABLED
void open(
const QUrl &url,
QMqttClient::ProtocolVersion protocolVersion);
#endif // MQTT_WEBSOCKETS_ENABLED

QMqttClient *mqttConnection() const;
void setCredentials(const QString &username, const QString &password);
bool publishValue(const QString &uid, const QVariant &value);
bool requestValue(const QString &uid);
HeartbeatState heartbeatState() const;
ConnectionState connectionState() const;
QMqttClient::ClientError error() const;

QString portalId() const;
void setPortalId(const QString &portalId);

Q_SIGNALS:
void heartbeatStateChanged();
void connectionStateChanged();
void errorChanged();
void portalIdChanged();
void messageReceived(const QString &path, const QVariant &value);
void nullMessageReceived(const QString &path);
void aboutToConnect(); // client should handle this by calling setCredentials() and calling continueConnect

public Q_SLOTS:
void continueConnect();

private Q_SLOTS:
void onConnected();
void onDisconnected();
void onErrorChanged(QMqttClient::ClientError error);
void onStateChanged(QMqttClient::ClientState state);
void onMessageReceived(const QByteArray &message, const QMqttTopicName &topic);
void onSubscriptionMessageReceived(const QMqttMessage &message);
void doKeepAlive(bool suppressRepublish = false);

private:
void setHeartbeatState(HeartbeatState heartbeatState);
void setConnectionState(ConnectionState connectionState);
void setError(QMqttClient::ClientError error);
void parseMessage(const QString &path, const QByteArray &message);

QTimer *mKeepAliveTimer;
QTimer *mHeartBeatTimer;
QTimer *mReadyStateFallbackTimer;
QMqttClient *mMqttConnection;
QPointer<QMqttSubscription> mMqttSubscription;
#ifdef MQTT_WEBSOCKETS_ENABLED
WebSocketDevice *mWebSocket = nullptr;
#endif // MQTT_WEBSOCKETS_ENABLED
QString mClientId;
QString mPortalId;
QUrl mUrl;
QString mHostName;
int mPort;
HeartbeatState mHeartbeatState;
ConnectionState mConnectionState;
const int mReconnectAttemptIntervals[6] = { 250, 1000, 2000, 5000, 10000, 30000 };
quint16 mAutoReconnectAttemptCounter;
const quint16 mAutoReconnectMaxAttempts;
QMqttClient::ClientError mError;
QMqttClient::ProtocolVersion mProtocolVersion;
int mMissedHeartbeats;
bool mReceivedMessage;
bool mIsVrmBroker;
};

#ifdef MQTT_WEBSOCKETS_ENABLED
class WebSocketDevice : public QIODevice
{
Q_OBJECT
Q_PROPERTY(QUrl url READ url WRITE setUrl NOTIFY urlChanged)
Q_PROPERTY(QByteArray protocol READ protocol WRITE setProtocol NOTIFY protocolChanged)

public:
WebSocketDevice(QObject *parent = nullptr);

void setUrl(const QUrl &url);
QUrl url() const;

void setProtocol(const QByteArray &protocol);
QByteArray protocol() const;

bool isValid() const;
bool open(QIODeviceBase::OpenMode mode) override;
void close() override;
qint64 readData(char *data, qint64 maxSize) override;
qint64 writeData(const char *data, qint64 maxSize) override;
qint64 bytesAvailable() const override;
bool isSequential() const override;

Q_SIGNALS:
void urlChanged();
void protocolChanged();
void connected();
void disconnected();

public Q_SLOTS:
void onBinaryMessageReceived(const QByteArray &message);

private:
QUrl mUrl;
QByteArray mProtocol;
QByteArray mData;
QWebSocket mWebSocket;
};
#endif // MQTT_WEBSOCKETS_ENABLED

10 changes: 10 additions & 0 deletions inc/veutil/qt/ve_quick_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class VE_QITEM_EXPORT VeQuickItem : public QObject {
Q_PROPERTY(double defaultSourceMin READ getDefaultSourceMin NOTIFY defaultMinChanged)
Q_PROPERTY(double defaultSourceMax READ getDefaultSourceMax NOTIFY defaultMaxChanged)
Q_PROPERTY(bool isSetting READ getIsSetting NOTIFY isSettingChanged WRITE setIsSetting)
Q_PROPERTY(bool isValid READ getIsValid NOTIFY isValidChanged)
Q_PROPERTY(bool invalidate READ getInvalidate NOTIFY invalidateChanged WRITE setInvalidate)

public:
Expand All @@ -47,6 +48,7 @@ class VE_QITEM_EXPORT VeQuickItem : public QObject {
mMaxInDisplayUnits(0),
mMinInDisplayUnits(0),
mIsSetting(isSetting),
mIsValid(0),
mInvalidate(1)
{
setUid("");
Expand Down Expand Up @@ -147,6 +149,8 @@ class VE_QITEM_EXPORT VeQuickItem : public QObject {
bool getIsSetting() { return mIsSetting; }
void setIsSetting(bool value);

bool getIsValid() const { return mIsValid; }

signals:
void defaultChanged();
void minChanged();
Expand All @@ -159,6 +163,7 @@ class VE_QITEM_EXPORT VeQuickItem : public QObject {
void uidChanged();
void valueChanged();
void stateChanged();
void isValidChanged();

void unitChanged();
void decimalsChanged();
Expand Down Expand Up @@ -187,7 +192,11 @@ protected slots:
}

void onValueChanged() {
const bool prevIsValid = mIsValid;
mIsValid = getValue().isValid();
emit valueChanged();
if (prevIsValid != mIsValid)
emit isValidChanged();
if (mTextMode == TextMode::Format)
emit textChanged();
}
Expand All @@ -206,6 +215,7 @@ protected slots:
uint32_t mMaxInDisplayUnits:1;
uint32_t mMinInDisplayUnits:1;
uint32_t mIsSetting:1;
uint32_t mIsValid:1;
uint32_t mIsAllocated:1;
uint32_t mInvalidate:1;

Expand Down
Loading