Skip to content

Commit 8ceba39

Browse files
committed
refactor: overhaul transport controller to eliminate deadlock bugs
Signed-off-by: Christian Stewart <[email protected]>
1 parent e74d2de commit 8ceba39

29 files changed

+764
-821
lines changed

go.mod

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ module github.com/aperturerobotics/bifrost
33
go 1.22
44

55
require (
6-
github.com/aperturerobotics/common v0.18.0 // latest
7-
github.com/aperturerobotics/controllerbus v0.47.0 // latest
6+
github.com/aperturerobotics/common v0.18.1 // latest
7+
github.com/aperturerobotics/controllerbus v0.47.2 // latest
88
github.com/aperturerobotics/entitygraph v0.9.1 // latest
99
github.com/aperturerobotics/protobuf-go-lite v0.6.5 // latest
1010
github.com/aperturerobotics/starpc v0.33.6 // latest
11-
github.com/aperturerobotics/util v1.25.3 // latest
11+
github.com/aperturerobotics/util v1.25.7-0.20240802075949-372f05d3a13f // latest
1212
)
1313

1414
// aperture: use compatibility forks
@@ -22,7 +22,7 @@ replace (
2222
github.com/nats-io/nats-server/v2 => github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f // aperture-2.0
2323
github.com/nats-io/nats.go => github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d0464e58 // aperture-2.0
2424
github.com/nats-io/nkeys => github.com/nats-io/nkeys v0.3.0 // indirect: used by bifrost-nats-server
25-
github.com/quic-go/quic-go => github.com/aperturerobotics/quic-go v0.45.1-0.20240701204210-82dc570e7aa0 // aperture
25+
github.com/quic-go/quic-go => github.com/aperturerobotics/quic-go v0.45.1-0.20240802054753-f83427ffc2c6 // aperture
2626
github.com/sirupsen/logrus => github.com/aperturerobotics/logrus v1.9.4-0.20240119050608-13332fb58195 // aperture
2727
)
2828

go.sum

+8-8
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d04
44
github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d0464e58/go.mod h1:ougcjYEZDYV8pVtaNbA5sgYDukkYHyKtSsW/T3B13j0=
55
github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f h1:bmScByQNGDPPy9T+zdwu816XaCbFtD5UDyqZMRiHJ80=
66
github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f/go.mod h1:kIcZtLpq4UIZzOqduYLm1mYU1nuMBtN6XuDCtQ21QT8=
7-
github.com/aperturerobotics/common v0.18.0 h1:AoeDkZcpg83+OjwUXGQD2VlUEDUl03IZBJiHW7pXtHE=
8-
github.com/aperturerobotics/common v0.18.0/go.mod h1:qLWqm2T9MQmxDtE1ddNdWCHnUj1Z66GzNxllkZ3Fj7Y=
9-
github.com/aperturerobotics/controllerbus v0.47.0 h1:rQ9f0nd53GEYoZYqQ+lN9CaKcFot4RBATY/TosT8+j8=
10-
github.com/aperturerobotics/controllerbus v0.47.0/go.mod h1:pT96tRLE9bCk73Osc/DAhYr7rK4wDcypdy8FqAUunk8=
7+
github.com/aperturerobotics/common v0.18.1 h1:zXFzVFOPe/11YZv1EUevvTmoWRVhX+B+S/5oUk9emGQ=
8+
github.com/aperturerobotics/common v0.18.1/go.mod h1:qLWqm2T9MQmxDtE1ddNdWCHnUj1Z66GzNxllkZ3Fj7Y=
9+
github.com/aperturerobotics/controllerbus v0.47.2 h1:BKPoRY5FRhS0AWTe0DgpsyvKy+invnFgC8ZHSuz8W9M=
10+
github.com/aperturerobotics/controllerbus v0.47.2/go.mod h1:bbLYOlppStRVjtRbbXSB72W4aFrPMKx7oUb814A8/dI=
1111
github.com/aperturerobotics/entitygraph v0.9.1 h1:bCl0Q9N1hkQnOtdCYz7pNSYqH+5CY3EUCuD19dQmJeI=
1212
github.com/aperturerobotics/entitygraph v0.9.1/go.mod h1:40VoNXSslWQieSS4V3HyzqOOZnRiGxItj/qZYSHg8Rw=
1313
github.com/aperturerobotics/go-libp2p v0.33.1-0.20240511223728-e0b67c111765 h1:KTSK6vPrh6gaT6CH0G2yAnmlcdjsc9Bk5JCqSeePrUw=
@@ -24,12 +24,12 @@ github.com/aperturerobotics/logrus v1.9.4-0.20240119050608-13332fb58195 h1:uyeD1
2424
github.com/aperturerobotics/logrus v1.9.4-0.20240119050608-13332fb58195/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
2525
github.com/aperturerobotics/protobuf-go-lite v0.6.5 h1:AuPPcZ7ZaJe9ZYYC4gF7/5/Xbn9Mt9uXyV3+ADWy+Ys=
2626
github.com/aperturerobotics/protobuf-go-lite v0.6.5/go.mod h1:YTbfnUj3feSULhs8VgepAHFnF3wUc0CPj4jd2axy21I=
27-
github.com/aperturerobotics/quic-go v0.45.1-0.20240701204210-82dc570e7aa0 h1:KH1Zk+zgI+jKmc/oRUEXEdlW00sMpy/WZbnOChW+nPk=
28-
github.com/aperturerobotics/quic-go v0.45.1-0.20240701204210-82dc570e7aa0/go.mod h1:X095EBMI8M7riYQRvUgegHFkEkgM2QKLvyGHyAcOw/Q=
27+
github.com/aperturerobotics/quic-go v0.45.1-0.20240802054753-f83427ffc2c6 h1:MZNodbX53cJQ6UuSIut4FvJWstxn+8/E1TpF1ceYjy0=
28+
github.com/aperturerobotics/quic-go v0.45.1-0.20240802054753-f83427ffc2c6/go.mod h1:X095EBMI8M7riYQRvUgegHFkEkgM2QKLvyGHyAcOw/Q=
2929
github.com/aperturerobotics/starpc v0.33.6 h1:noc/MnmIMTek9bdEvd88QiD1p9KzEV8CUOBIoKmGgm0=
3030
github.com/aperturerobotics/starpc v0.33.6/go.mod h1:4IYcbulEzqhPT5jKaDeL1BJPFd8WVWZ7Ugu0/348/Is=
31-
github.com/aperturerobotics/util v1.25.3 h1:8YwVRPnRYs/s8dRRhCPWCH0zF19fk8P4zVQtaIec73o=
32-
github.com/aperturerobotics/util v1.25.3/go.mod h1:m/paprtgaTiGfc4X3LkXpeseK9hfQA7QBI3cKsE/h3Y=
31+
github.com/aperturerobotics/util v1.25.7-0.20240802075949-372f05d3a13f h1:YIqFbDLHyly/Nmwo0+EYA/W8YEkopcMHDWhih0BNajw=
32+
github.com/aperturerobotics/util v1.25.7-0.20240802075949-372f05d3a13f/go.mod h1:DdflF5HvWF01S4MNnSV9O5LR0SGajkx0wDIiXTsNtFc=
3333
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
3434
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
3535
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=

peer/api/api_srpc.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pubsub/api/api_srpc.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pubsub/controller/controller.go

+45-52
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package pubsub_controller
22

33
import (
44
"context"
5-
"sync"
65

76
"github.com/aperturerobotics/bifrost/link"
87
"github.com/aperturerobotics/bifrost/peer"
@@ -47,9 +46,7 @@ type Controller struct {
4746
// peerCtr holds the peer
4847
peerCtr *ccontainer.CContainer[*peer.Peer]
4948

50-
// mtx guards below fields
51-
mtx sync.Mutex
52-
// bcast wakes the controller
49+
// bcast guards below fields
5350
bcast broadcast.Broadcast
5451
// cleanupRefs are the refs to cleanup
5552
cleanupRefs []directive.Reference
@@ -141,46 +138,47 @@ func (c *Controller) Execute(ctx context.Context) error {
141138
}()
142139

143140
for {
144-
c.mtx.Lock()
145-
for _, vl := range c.incLinks {
146-
tpl := pubsub.NewPeerLinkTuple(vl)
147-
if e, ok := c.links[tpl]; ok {
148-
e.ctxCancel()
149-
}
150-
tlCtx, tlCtxCancel := context.WithCancel(ctx)
151-
tl := &trackedLink{
152-
c: c,
153-
ctxCancel: tlCtxCancel,
154-
tpl: tpl,
155-
lnk: vl,
156-
le: c.le.
157-
WithField("link-uuid", vl.GetUUID()).
158-
WithField("link-remote-peer", vl.GetRemotePeer().String()),
159-
}
160-
c.links[tpl] = tl
161-
go func() {
162-
err := tl.trackLink(tlCtx)
163-
tlCtxCancel()
164-
if err != context.Canceled && err != nil {
165-
tl.le.WithError(err).Warn("link tracker returned fatal error")
141+
var waitCh <-chan struct{}
142+
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
143+
for _, vl := range c.incLinks {
144+
tpl := pubsub.NewPeerLinkTuple(vl)
145+
if e, ok := c.links[tpl]; ok {
146+
e.ctxCancel()
166147
}
167-
c.mtx.Lock()
168-
if ol, ok := c.links[tpl]; ok && ol == tl {
169-
delete(c.links, tpl)
148+
tlCtx, tlCtxCancel := context.WithCancel(ctx)
149+
tl := &trackedLink{
150+
c: c,
151+
ctxCancel: tlCtxCancel,
152+
tpl: tpl,
153+
lnk: vl,
154+
le: c.le.
155+
WithField("link-uuid", vl.GetUUID()).
156+
WithField("link-remote-peer", vl.GetRemotePeer().String()),
170157
}
171-
c.mtx.Unlock()
172-
}()
173-
}
174-
c.incLinks = nil
175-
wake := c.bcast.GetWaitCh()
176-
c.mtx.Unlock()
158+
c.links[tpl] = tl
159+
go func() {
160+
err := tl.trackLink(tlCtx)
161+
tlCtxCancel()
162+
if err != context.Canceled && err != nil {
163+
tl.le.WithError(err).Warn("link tracker returned fatal error")
164+
}
165+
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
166+
if ol, ok := c.links[tpl]; ok && ol == tl {
167+
delete(c.links, tpl)
168+
}
169+
})
170+
}()
171+
}
172+
c.incLinks = nil
173+
waitCh = getWaitCh()
174+
})
177175

178176
select {
179177
case <-ctx.Done():
180178
return ctx.Err()
181179
case err := <-psErr:
182180
return err
183-
case <-wake:
181+
case <-waitCh:
184182
}
185183
}
186184
}
@@ -211,7 +209,7 @@ func (c *Controller) HandleDirective(ctx context.Context, di directive.Instance)
211209
dir := di.GetDirective()
212210
switch d := dir.(type) {
213211
case link.EstablishLinkWithPeer:
214-
c.handleEstablishLink(ctx, di, d)
212+
c.handleEstablishLink(di)
215213
case link.HandleMountedStream:
216214
return c.handleMountedStream(ctx, di, d)
217215
case pubsub.BuildChannelSubscription:
@@ -224,16 +222,16 @@ func (c *Controller) HandleDirective(ctx context.Context, di directive.Instance)
224222
// Close releases any resources used by the controller.
225223
// Error indicates any issue encountered releasing.
226224
func (c *Controller) Close() error {
227-
c.mtx.Lock()
228-
for _, ref := range c.cleanupRefs {
229-
ref.Release()
230-
}
231-
c.cleanupRefs = nil
232-
for k, l := range c.links {
233-
l.ctxCancel()
234-
delete(c.links, k)
235-
}
236-
c.mtx.Unlock()
225+
c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
226+
for _, ref := range c.cleanupRefs {
227+
ref.Release()
228+
}
229+
c.cleanupRefs = nil
230+
for k, l := range c.links {
231+
l.ctxCancel()
232+
delete(c.links, k)
233+
}
234+
})
237235

238236
_ = c.pubSubCtr.SwapValue(func(val *pubsub.PubSub) *pubsub.PubSub {
239237
if val != nil {
@@ -245,10 +243,5 @@ func (c *Controller) Close() error {
245243
return nil
246244
}
247245

248-
// wake wakes the controller
249-
func (c *Controller) wake() {
250-
c.bcast.Broadcast()
251-
}
252-
253246
// _ is a type assertion
254247
var _ pubsub.Controller = ((*Controller)(nil))

pubsub/controller/establish-link.go

+28-34
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package pubsub_controller
22

33
import (
4-
"context"
5-
64
"github.com/aperturerobotics/bifrost/link"
75
"github.com/aperturerobotics/bifrost/pubsub"
86
"github.com/aperturerobotics/controllerbus/directive"
@@ -24,11 +22,7 @@ func newEstablishLinkHandler(c *Controller) *establishLinkHandler {
2422
}
2523

2624
// handleEstablishLink handles an EstablishLink directive.
27-
func (c *Controller) handleEstablishLink(
28-
ctx context.Context,
29-
di directive.Instance,
30-
d link.EstablishLinkWithPeer,
31-
) {
25+
func (c *Controller) handleEstablishLink(di directive.Instance) {
3226
handler := newEstablishLinkHandler(c)
3327
ref := di.AddReference(handler, true)
3428
if ref == nil {
@@ -48,10 +42,10 @@ func (e *establishLinkHandler) HandleValueAdded(inst directive.Instance, val dir
4842
e.c.le.Debugf("got link with uuid %v", vl.GetUUID())
4943

5044
// Attempt to open the stream.
51-
e.c.mtx.Lock()
52-
e.c.incLinks = append(e.c.incLinks, vl)
53-
e.c.mtx.Unlock()
54-
e.c.wake()
45+
e.c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
46+
e.c.incLinks = append(e.c.incLinks, vl)
47+
broadcast()
48+
})
5549
}
5650

5751
// HandleValueRemoved is called when a value is removed from the directive.
@@ -62,19 +56,19 @@ func (e *establishLinkHandler) HandleValueRemoved(inst directive.Instance, val d
6256
}
6357
e.c.le.Debugf("lost link with uuid %v", vl.GetUUID())
6458
tpl := pubsub.NewPeerLinkTuple(vl)
65-
e.c.mtx.Lock()
66-
for i, l := range e.c.incLinks {
67-
if l == vl {
68-
e.c.incLinks[i] = e.c.incLinks[len(e.c.incLinks)-1]
69-
e.c.incLinks[len(e.c.incLinks)-1] = nil
70-
e.c.incLinks = e.c.incLinks[:len(e.c.incLinks)-1]
71-
break
59+
e.c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
60+
for i, l := range e.c.incLinks {
61+
if l == vl {
62+
e.c.incLinks[i] = e.c.incLinks[len(e.c.incLinks)-1]
63+
e.c.incLinks[len(e.c.incLinks)-1] = nil
64+
e.c.incLinks = e.c.incLinks[:len(e.c.incLinks)-1]
65+
break
66+
}
7267
}
73-
}
74-
if v, ok := e.c.links[tpl]; ok {
75-
v.ctxCancel()
76-
}
77-
e.c.mtx.Unlock()
68+
if v, ok := e.c.links[tpl]; ok {
69+
v.ctxCancel()
70+
}
71+
})
7872
}
7973

8074
// HandleInstanceDisposed is called when a directive instance is disposed.
@@ -86,18 +80,18 @@ func (e *establishLinkHandler) HandleInstanceDisposed(inst directive.Instance) {
8680
}
8781
e.ref = nil
8882

89-
e.c.mtx.Lock()
90-
for i, ref := range e.c.cleanupRefs {
91-
if ref == eref {
92-
a := e.c.cleanupRefs
93-
a[i] = a[len(a)-1]
94-
a[len(a)-1] = nil
95-
a = a[:len(a)-1]
96-
e.c.cleanupRefs = a
97-
break
83+
e.c.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
84+
for i, ref := range e.c.cleanupRefs {
85+
if ref == eref {
86+
a := e.c.cleanupRefs
87+
a[i] = a[len(a)-1]
88+
a[len(a)-1] = nil
89+
a = a[:len(a)-1]
90+
e.c.cleanupRefs = a
91+
break
92+
}
9893
}
99-
}
100-
e.c.mtx.Unlock()
94+
})
10195
}
10296

10397
var _ directive.ReferenceHandler = ((*establishLinkHandler)(nil))

rpc/access/access_srpc.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/access/server.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,12 @@ func (s *AccessRpcServiceServer) LookupRpcService(
5353
}
5454
}
5555

56+
var waitCh <-chan struct{}
57+
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
58+
waitCh = getWaitCh()
59+
})
60+
5661
dir := bifrost_rpc.NewLookupRpcService(req.GetServiceId(), serverID)
57-
waitCh := bcast.GetWaitCh()
5862
vals := make(map[uint32]struct{})
5963
di, ref, err := s.b.AddDirective(dir, bus.NewCallbackHandler(
6064
func(av directive.AttachedValue) {

signaling/rpc/signaling_srpc.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

stream/api/api_srpc.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)