@@ -132,23 +132,58 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
132
132
// associated callback (or the defaultHandler, if one exists and no other route matched). If
133
133
// anything is sent down the stop channel the function will end.
134
134
func (r * router ) matchAndDispatch (messages <- chan * packets.PublishPacket , order bool , client * client ) <- chan * PacketAndToken {
135
- ackChan := make (chan * PacketAndToken )
136
- go func () {
135
+ var wg sync.WaitGroup
136
+ ackOutChan := make (chan * PacketAndToken ) // Channel returned to caller; closed when messages channel closed
137
+ var ackInChan chan * PacketAndToken // ACKs generated by ackFunc get put onto this channel
138
+
139
+ stopAckCopy := make (chan struct {}) // Closure requests stop of go routine copying ackInChan to ackOutChan
140
+ ackCopyStopped := make (chan struct {}) // Closure indicates that it is safe to close ackOutChan
141
+ goRoutinesDone := make (chan struct {}) // closed on wg.Done()
142
+ if order {
143
+ ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
144
+ } else {
145
+ // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
146
+ ackInChan = make (chan * PacketAndToken )
147
+ go func () { // go routine to copy from ackInChan to ackOutChan until stopped
148
+ for {
149
+ select {
150
+ case a := <- ackInChan :
151
+ ackOutChan <- a
152
+ case <- stopAckCopy :
153
+ close (ackCopyStopped ) // Signal main go routine that it is safe to close ackOutChan
154
+ for {
155
+ select {
156
+ case <- ackInChan : // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
157
+ DEBUG .Println (ROU , "matchAndDispatch received acknowledgment after processing stopped (ACK dropped)." )
158
+ case <- goRoutinesDone :
159
+ close (ackInChan ) // Nothing further should be sent (a panic is probably better than silent failure)
160
+ DEBUG .Println (ROU , "matchAndDispatch order=false copy goroutine exiting." )
161
+ return
162
+ }
163
+ }
164
+ }
165
+ }
166
+ }()
167
+ }
168
+
169
+ go func () { // Main go routine handling inbound messages
137
170
for message := range messages {
138
171
// DEBUG.Println(ROU, "matchAndDispatch received message")
139
172
sent := false
140
173
r .RLock ()
141
- m := messageFromPublish (message , ackFunc (ackChan , client .persist , message ))
174
+ m := messageFromPublish (message , ackFunc (ackInChan , client .persist , message ))
142
175
var handlers []MessageHandler
143
176
for e := r .routes .Front (); e != nil ; e = e .Next () {
144
177
if e .Value .(* route ).match (message .TopicName ) {
145
178
if order {
146
179
handlers = append (handlers , e .Value .(* route ).callback )
147
180
} else {
148
181
hd := e .Value .(* route ).callback
182
+ wg .Add (1 )
149
183
go func () {
150
184
hd (client , m )
151
185
m .Ack ()
186
+ wg .Done ()
152
187
}()
153
188
}
154
189
sent = true
@@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
159
194
if order {
160
195
handlers = append (handlers , r .defaultHandler )
161
196
} else {
197
+ wg .Add (1 )
162
198
go func () {
163
199
r .defaultHandler (client , m )
164
200
m .Ack ()
201
+ wg .Done ()
165
202
}()
166
203
}
167
204
} else {
@@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
175
212
}
176
213
// DEBUG.Println(ROU, "matchAndDispatch handled message")
177
214
}
178
- close (ackChan )
215
+ if order {
216
+ close (ackOutChan )
217
+ } else { // Ensure that nothing further will be written to ackOutChan before closing it
218
+ close (stopAckCopy )
219
+ <- ackCopyStopped
220
+ close (ackOutChan )
221
+ go func () {
222
+ wg .Wait () // Note: If this remains running then the user has handlers that are not returning
223
+ close (goRoutinesDone )
224
+ }()
225
+ }
179
226
DEBUG .Println (ROU , "matchAndDispatch exiting" )
180
227
}()
181
- return ackChan
228
+ return ackOutChan
182
229
}
0 commit comments