Skip to content

Commit 9995b72

Browse files
committed
Resolve rare deadlock that could occur when network error occurs while multiple other operations are in progress.
Ref #509
1 parent 1c925c4 commit 9995b72

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

client.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
607607
commsIncomingPub = nil
608608
continue
609609
}
610-
incomingPubChan <- pub
610+
// Care is needed here because an error elsewhere could trigger a deadlock
611+
sendPubLoop:
612+
for {
613+
select {
614+
case incomingPubChan <- pub:
615+
break sendPubLoop
616+
case err, ok := <-commsErrors:
617+
if !ok { // commsErrors has been closed so we can ignore it
618+
commsErrors = nil
619+
continue
620+
}
621+
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
622+
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
623+
continue
624+
}
625+
}
611626
case err, ok := <-commsErrors:
612627
if !ok {
613628
commsErrors = nil

fvt_client_test.go

+101
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ package mqtt
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"fmt"
21+
"runtime"
22+
"sync"
2023
"testing"
2124
"time"
2225

@@ -1444,3 +1447,101 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {
14441447

14451448
c.Disconnect(250)
14461449
}
1450+
1451+
// Issue 209 - occasional deadlock when connections are lost unexpectedly
1452+
// This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
1453+
// replicating this but the below would cause it to happen fairly consistently (when the test was run a decent number
1454+
// of times). Following the fix it ran 10,000 times without issue.
1455+
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
1456+
func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
1457+
topic := "/test/DisconnectWhileProcessingIncomingPublish"
1458+
1459+
pops := NewClientOptions()
1460+
pops.AddBroker(FVTTCP)
1461+
// pops.SetOrderMatters(false) // Not really needed but consistent...
1462+
pops.SetClientID("dwpip-pub")
1463+
p := NewClient(pops)
1464+
1465+
sops := NewClientOptions()
1466+
sops.AddBroker(FVTTCP)
1467+
sops.SetAutoReconnect(false) // We dont want the connection to be re-established
1468+
sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
1469+
// sops.SetOrderMatters(false)
1470+
sops.SetClientID("dwpip-sub")
1471+
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occured)
1472+
sDisconnected := make(chan struct{})
1473+
sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })
1474+
1475+
msgReceived := make(chan struct{})
1476+
var oneMsgReceived sync.Once
1477+
var f MessageHandler = func(client Client, msg Message) {
1478+
// No need to do anything when message received (just want ACK sent ASAP)
1479+
oneMsgReceived.Do(func() { close(msgReceived) })
1480+
}
1481+
1482+
s := NewClient(sops).(*client) // s = subscriber
1483+
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
1484+
t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error())
1485+
}
1486+
1487+
if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
1488+
t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error())
1489+
}
1490+
1491+
// Use a go routine to swamp the broker with messages
1492+
if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // p = publisher
1493+
t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error())
1494+
}
1495+
// We will hammer both the publisher and subscriber with messages
1496+
ctx, cancel := context.WithCancel(context.Background())
1497+
pubDone := make(chan struct{})
1498+
go func() {
1499+
defer close(pubDone)
1500+
i := 0
1501+
for {
1502+
p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
1503+
// After the connection goes down s.Publish will start blocking (this is not ideal but fixing its a problem for another time)
1504+
go func() { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }()
1505+
i++
1506+
1507+
if ctx.Err() != nil {
1508+
return
1509+
}
1510+
}
1511+
}()
1512+
1513+
// Wait until we have received a message (ensuring that the stream of messages has started)
1514+
select {
1515+
case <-msgReceived: // All good
1516+
case <-time.After(time.Second):
1517+
t.Errorf("no messages received")
1518+
}
1519+
1520+
// We need the connection to drop; unfortunately using any internal method (`s.conn.Close()` etc) will hide the
1521+
// behaviour because any calls to Read/Write will return immediately. So we just ask the broker to disconnect..
1522+
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
1523+
err := dm.Write(s.conn)
1524+
if err != nil {
1525+
t.Fatalf("error dending disconnect packet: %s", err)
1526+
}
1527+
1528+
// Lets give the library up to a second to shutdown (indicated by the status changing)
1529+
select {
1530+
case <-sDisconnected: // All good
1531+
case <-time.After(time.Second):
1532+
cancel() // no point leaving publisher running
1533+
time.Sleep(time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
1534+
buf := make([]byte, 1<<20)
1535+
stacklen := runtime.Stack(buf, true)
1536+
t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
1537+
}
1538+
1539+
cancel() // no point leaving publisher running
1540+
1541+
select {
1542+
case <-pubDone:
1543+
case <-time.After(time.Second):
1544+
t.Errorf("pubdone not closed within a second")
1545+
}
1546+
p.Disconnect(250) // Close publisher
1547+
}

0 commit comments

Comments
 (0)