Skip to content

Commit 5c55c95

Browse files
committed
Reconnect on zmq errors.
1 parent 2bad989 commit 5c55c95

File tree

1 file changed

+56
-12
lines changed

1 file changed

+56
-12
lines changed

internal/backend/concentratord/concentratord.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package concentratord
33
import (
44
"context"
55
"sync"
6+
"time"
67

78
"github.com/go-zeromq/zmq4"
89
"github.com/gofrs/uuid"
@@ -28,12 +29,16 @@ type Backend struct {
2829
subscribeEventChan chan events.Subscribe
2930
disconnectChan chan lorawan.EUI64
3031

32+
eventURL string
33+
commandURL string
34+
35+
gatewayID lorawan.EUI64
36+
3137
crcCheck bool
3238
}
3339

3440
// NewBackend creates a new Backend.
3541
func NewBackend(conf config.Config) (*Backend, error) {
36-
var err error
3742
log.WithFields(log.Fields{
3843
"event_url": conf.Backend.Concentratord.EventURL,
3944
"command_url": conf.Backend.Concentratord.CommandURL,
@@ -48,34 +53,70 @@ func NewBackend(conf config.Config) (*Backend, error) {
4853
gatewayStatsChan: make(chan gw.GatewayStats, 1),
4954
subscribeEventChan: make(chan events.Subscribe, 1),
5055

56+
eventURL: conf.Backend.Concentratord.EventURL,
57+
commandURL: conf.Backend.Concentratord.CommandURL,
58+
5159
crcCheck: conf.Backend.Concentratord.CRCCheck,
5260
}
5361

54-
err = b.eventSock.Dial(conf.Backend.Concentratord.EventURL)
62+
b.dialEventSockLoop()
63+
b.dialCommandSockLoop()
64+
65+
b.subscribeEventChan <- events.Subscribe{Subscribe: true, GatewayID: b.gatewayID}
66+
67+
go b.eventLoop()
68+
69+
return &b, nil
70+
}
71+
72+
func (b *Backend) dialEventSock() error {
73+
err := b.eventSock.Dial(b.eventURL)
5574
if err != nil {
56-
return nil, errors.Wrap(err, "dial event api url error")
75+
return errors.Wrap(err, "dial event api url error")
5776
}
5877

5978
err = b.eventSock.SetOption(zmq4.OptionSubscribe, "")
6079
if err != nil {
61-
return nil, errors.Wrap(err, "set event option error")
80+
return errors.Wrap(err, "set event option error")
6281
}
6382

64-
err = b.commandSock.Dial(conf.Backend.Concentratord.CommandURL)
83+
return nil
84+
}
85+
86+
func (b *Backend) dialCommandSock() error {
87+
err := b.commandSock.Dial(b.commandURL)
6588
if err != nil {
66-
return nil, errors.Wrap(err, "dial command api url error")
89+
return errors.Wrap(err, "dial command api url error")
6790
}
6891

69-
gatewayID, err := b.getGatewayID()
92+
b.gatewayID, err = b.getGatewayID()
7093
if err != nil {
71-
return nil, errors.Wrap(err, "get gateway id error")
94+
return errors.Wrap(err, "get gateway id error")
7295
}
7396

74-
b.subscribeEventChan <- events.Subscribe{Subscribe: true, GatewayID: gatewayID}
97+
return nil
98+
}
7599

76-
go b.eventLoop()
100+
func (b *Backend) dialCommandSockLoop() {
101+
for {
102+
if err := b.dialCommandSock(); err != nil {
103+
log.WithError(err).Error("backend/concentratord: command socket dial error")
104+
time.Sleep(time.Second)
105+
continue
106+
}
107+
break
108+
}
109+
}
77110

78-
return &b, nil
111+
func (b *Backend) dialEventSockLoop() {
112+
for {
113+
if err := b.dialEventSock(); err != nil {
114+
log.WithError(err).Error("backend/concentratord: event socket dial error")
115+
time.Sleep(time.Second)
116+
continue
117+
}
118+
break
119+
}
79120
}
80121

81122
func (b *Backend) getGatewayID() (lorawan.EUI64, error) {
@@ -182,11 +223,13 @@ func (b *Backend) commandRequest(command string, v proto.Message) ([]byte, error
182223

183224
msg := zmq4.NewMsgFrom([]byte(command), bb)
184225
if err = b.commandSock.SendMulti(msg); err != nil {
226+
b.dialCommandSock()
185227
return nil, errors.Wrap(err, "send command request error")
186228
}
187229

188230
reply, err := b.commandSock.Recv()
189231
if err != nil {
232+
b.dialCommandSock()
190233
return nil, errors.Wrap(err, "receive command request reply error")
191234
}
192235

@@ -197,7 +240,8 @@ func (b *Backend) eventLoop() {
197240
for {
198241
msg, err := b.eventSock.Recv()
199242
if err != nil {
200-
log.WithError(err).Fatal("backend/concentratord: receive event message error")
243+
log.WithError(err).Error("backend/concentratord: receive event message error")
244+
b.dialEventSockLoop()
201245
continue
202246
}
203247

0 commit comments

Comments
 (0)