Skip to content

Commit f534551

Browse files
committed
fix issue with multiple topics using SendMessage
1 parent d8207fe commit f534551

File tree

5 files changed

+37
-22
lines changed

5 files changed

+37
-22
lines changed

pkg/mqtt/client.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ func NewClient(o Options) (*Client, error) {
4141
opts.SetPassword(o.Password)
4242
}
4343

44-
opts.SetPingTimeout(10 * time.Second)
45-
opts.SetKeepAlive(10 * time.Second)
44+
opts.SetPingTimeout(60 * time.Second)
45+
opts.SetKeepAlive(60 * time.Second)
4646
opts.SetAutoReconnect(true)
4747
opts.SetMaxReconnectInterval(10 * time.Second)
4848
opts.SetConnectionLostHandler(func(c paho.Client, err error) {
49-
log.DefaultLogger.Error(fmt.Sprintf("MQTT Connection Lost: %s\n" + err.Error()))
49+
log.DefaultLogger.Error(fmt.Sprintf("MQTT Connection Lost: %s", err.Error()))
5050
})
5151
opts.SetReconnectingHandler(func(c paho.Client, options *paho.ClientOptions) {
5252
log.DefaultLogger.Debug("MQTT Reconnecting")
@@ -61,7 +61,7 @@ func NewClient(o Options) (*Client, error) {
6161

6262
return &Client{
6363
client: &client,
64-
stream: make(chan StreamMessage),
64+
stream: make(chan StreamMessage, 1000),
6565
}, nil
6666
}
6767

@@ -83,12 +83,12 @@ func (c *Client) Messages(path string) ([]Message, bool) {
8383
return topic.messages, true
8484
}
8585

86-
func (c *Client) Stream() *chan StreamMessage {
87-
return &c.stream
86+
func (c *Client) Stream() chan StreamMessage {
87+
return c.stream
8888
}
8989

9090
func (c *Client) HandleMessage(client paho.Client, msg paho.Message) {
91-
log.DefaultLogger.Debug(fmt.Sprintf("Received MQTT Message %+v", msg))
91+
log.DefaultLogger.Debug(fmt.Sprintf("Received MQTT Message for topic %s", msg.Topic()))
9292
topic, ok := c.topics.Load(msg.Topic())
9393

9494
if ok {
@@ -129,6 +129,13 @@ func (c *Client) Subscribe(t string) {
129129
}
130130
}
131131

132+
func (c *Client) Unsubscribe(t string) {
133+
log.DefaultLogger.Debug(fmt.Sprintf("Unsubscribing from MQTT topic: %s", t))
134+
client := *c.client
135+
client.Unsubscribe(t)
136+
c.topics.Delete(t)
137+
}
138+
132139
func (c *Client) Dispose() {
133140
client := *c.client
134141
log.DefaultLogger.Info("MQTT Disconnecting")

pkg/mqtt/topic.go

+4
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,7 @@ func (tm *TopicMap) Load(path string) (*Topic, bool) {
3232
func (tm *TopicMap) Store(topic *Topic) {
3333
tm.Map.Store(topic.path, topic)
3434
}
35+
36+
func (tm *TopicMap) Delete(path string) {
37+
tm.Map.Delete(path)
38+
}

pkg/plugin/datasource.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@ import (
77
"time"
88

99
"github.com/grafana/grafana-plugin-sdk-go/backend"
10+
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1011
"github.com/grafana/grafana-plugin-sdk-go/data"
1112
"github.com/toddtreece/mqtt-datasource/pkg/mqtt"
1213
)
1314

1415
type MQTTClient interface {
15-
Stream() *chan mqtt.StreamMessage
16+
Stream() chan mqtt.StreamMessage
1617
IsConnected() bool
1718
IsSubscribed(topic string) bool
1819
Messages(topic string) ([]mqtt.Message, bool)
1920
Subscribe(topic string)
21+
Unsubscribe(topic string)
2022
}
2123

2224
type MQTTDatasource struct {
@@ -97,16 +99,19 @@ func (m *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse {
9799
}
98100

99101
frame := ToFrame(messages)
100-
frame.SetMeta(&data.FrameMeta{
101-
Channel: m.channelPrefix + qm.Topic,
102-
})
102+
103+
if qm.Topic != "" {
104+
frame.SetMeta(&data.FrameMeta{
105+
Channel: m.channelPrefix + qm.Topic,
106+
})
107+
}
103108

104109
response.Frames = append(response.Frames, frame)
105110
return response
106111
}
107112

108113
func (m *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
109-
if msg.Topic != req.Path {
114+
if !m.Client.IsSubscribed(req.Path) {
110115
return nil
111116
}
112117

@@ -125,6 +130,7 @@ func (m *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunStr
125130
Data: bytes,
126131
}
127132

133+
log.DefaultLogger.Debug(fmt.Sprintf("Sending message to client for topic %s", msg.Topic))
128134
return sender.Send(packet)
129135
}
130136

pkg/plugin/handler.go

+5-9
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@ package plugin
33
import (
44
"context"
55
"errors"
6-
"fmt"
76

87
"github.com/grafana/grafana-plugin-sdk-go/backend"
98
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
109
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
11-
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1210
"github.com/grafana/grafana-plugin-sdk-go/data"
1311
"github.com/toddtreece/mqtt-datasource/pkg/mqtt"
1412
)
@@ -59,7 +57,7 @@ func (h *Handler) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
5957
}, nil
6058
}
6159

62-
if ds.Client.IsConnected() {
60+
if !ds.Client.IsConnected() {
6361
return &backend.CheckHealthResult{
6462
Status: backend.HealthStatusError,
6563
Message: "MQTT Disconnected",
@@ -97,17 +95,15 @@ func (h *Handler) RunStream(ctx context.Context, req *backend.RunStreamRequest,
9795
return err
9896
}
9997

98+
defer ds.Client.Unsubscribe(req.Path)
99+
100100
for {
101101
select {
102102
case <-ctx.Done():
103103
backend.Logger.Info("stop streaming (context canceled)")
104104
return nil
105-
case message := <-*ds.Client.Stream():
106-
err := ds.SendMessage(message, req, sender)
107-
if err != nil {
108-
log.DefaultLogger.Error(fmt.Sprintf("unable to send message: %s", err.Error()))
109-
}
110-
105+
case message := <-ds.Client.Stream():
106+
go ds.SendMessage(message, req, sender)
111107
}
112108
}
113109
}

test_broker.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ const aedes = require('aedes')();
22
const server = require('net').createServer(aedes.handle);
33

44
const PORT = 1883;
5-
65
const publishers = {};
76

87
const toMillis = {
@@ -41,4 +40,7 @@ const createPublisher = ({ topic, qos }) => {
4140
server.listen(PORT, () => {
4241
console.log('server started and listening on port ', PORT);
4342
aedes.on('subscribe', subscriptions => subscriptions.forEach(createPublisher));
43+
aedes.on('connectionError', console.error);
44+
aedes.on('clientDisconnect', client => console.log(`disconnect: ${client.id}`));
45+
aedes.on('client', client => console.log(`connect: ${client.id}`));
4446
});

0 commit comments

Comments
 (0)