From 363be6f25dcdd8e55d9c2911548dac448758933a Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 29 Jun 2022 17:03:29 +0200 Subject: [PATCH 1/7] Tmp --- .../domain/blobs/replication/downloader.go | 5 ++ service/domain/blobs/replication/manager.go | 65 +++++++++++++++++-- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/service/domain/blobs/replication/downloader.go b/service/domain/blobs/replication/downloader.go index 29c1a977..2dd88386 100644 --- a/service/domain/blobs/replication/downloader.go +++ b/service/domain/blobs/replication/downloader.go @@ -14,8 +14,13 @@ import ( "github.com/planetary-social/go-ssb/service/domain/transport/rpc" ) +var ErrBlobNotFound = errors.New("blob not found") + type BlobStorage interface { Store(id refs.Blob, r io.Reader) error + + // Size returns the size of the blob. If the blob is not found it returns ErrBlobNotFound. + Size(id refs.Blob) (blobs.Size, error) } type BlobsGetDownloader struct { diff --git a/service/domain/blobs/replication/manager.go b/service/domain/blobs/replication/manager.go index 1998f426..9ce9d49d 100644 --- a/service/domain/blobs/replication/manager.go +++ b/service/domain/blobs/replication/manager.go @@ -2,6 +2,7 @@ package replication import ( "context" + "sync" "time" "github.com/boreq/errors" @@ -22,16 +23,18 @@ type Downloader interface { } type Manager struct { - storage WantListStorage - downloader Downloader - logger logging.Logger + storage WantListStorage + downloader Downloader + connectionStreams *connectionStreams + logger logging.Logger } func NewManager(storage WantListStorage, downloader Downloader, logger logging.Logger) *Manager { return &Manager{ - storage: storage, - downloader: downloader, - logger: logger.New("replication_manager"), + storage: storage, + downloader: downloader, + connectionStreams: newConnectionStreams(), + logger: logger.New("replication_manager"), } } @@ -43,6 +46,7 @@ func (r *Manager) HandleIncomingCreateWantsRequest(ctx context.Context) (<-chan r.logger.WithField("connectionId", connectionId).Debug("incoming create wants") ch := make(chan messages.BlobWithSizeOrWantDistance) + r.connectionStreams.AddIncoming(connectionId, newIncomingStream(ctx, ch)) go r.sendWantListPeriodically(ctx, ch) return ch, nil } @@ -117,3 +121,52 @@ func (r *Manager) sendWantListPeriodically(ctx context.Context, ch chan<- messag } } } + +type connectionStreams struct { + m map[rpc.ConnectionId]*streams + lock sync.Mutex +} + +func newConnectionStreams() *connectionStreams { + return &connectionStreams{ + m: make(map[rpc.ConnectionId]*streams), + } +} + +func (cs *connectionStreams) AddIncoming(id rpc.ConnectionId, stream incomingStream) { + cs.lock.Lock() + defer cs.lock.Unlock() + + streams, ok := cs.m[id] + if !ok { + streams = newStreams() + cs.m[id] = streams + } + + streams.AddIncoming(stream) +} + +type streams struct { + lock sync.Mutex + incoming []incomingStream +} + +func newStreams() *streams { + return &streams{} +} + +func (s *streams) AddIncoming(stream incomingStream) { + s.lock.Lock() + defer s.lock.Unlock() + + s.incoming = append(s.incoming, stream) +} + +type incomingStream struct { + ctx context.Context + ch <-chan messages.BlobWithSizeOrWantDistance +} + +func newIncomingStream(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance) incomingStream { + return incomingStream{ctx: ctx, ch: ch} +} From 7a98547bf1c2cbcb261fb393c19913d519a58a87 Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 29 Jun 2022 18:04:10 +0200 Subject: [PATCH 2/7] Concept --- cmd/ssb-test/main.go | 
44 +++-- di/wire_gen.go | 2 +- service/adapters/blobs/storage.go | 4 + service/adapters/blobs/storage_test.go | 16 ++ service/domain/blobs/replication/common.go | 29 +++ .../domain/blobs/replication/downloader.go | 9 - service/domain/blobs/replication/manager.go | 170 ++++-------------- service/domain/blobs/replication/wants.go | 160 +++++++++++++++++ service/domain/messages/blobs_createWants.go | 14 ++ 9 files changed, 283 insertions(+), 165 deletions(-) create mode 100644 service/domain/blobs/replication/common.go create mode 100644 service/domain/blobs/replication/wants.go diff --git a/cmd/ssb-test/main.go b/cmd/ssb-test/main.go index caea42a3..df20249c 100644 --- a/cmd/ssb-test/main.go +++ b/cmd/ssb-test/main.go @@ -19,8 +19,6 @@ import ( "github.com/planetary-social/go-ssb/service/domain/feeds/formats" "github.com/planetary-social/go-ssb/service/domain/identity" "github.com/planetary-social/go-ssb/service/domain/invites" - "github.com/planetary-social/go-ssb/service/domain/network" - "github.com/planetary-social/go-ssb/service/domain/refs" "github.com/planetary-social/go-ssb/service/domain/transport/boxstream" "github.com/sirupsen/logrus" ) @@ -33,27 +31,27 @@ func main() { } var ( - myPatchwork = refs.MustNewIdentity("@qFtLJ6P5Eh9vKxnj7Rsh8SkE6B6Z36DVLP7ZOKNeQ/Y=.ed25519") - myPatchworkConnect = commands.Connect{ - Remote: myPatchwork.Identity(), - Address: network.NewAddress("127.0.0.1:8008"), - } - - //localGoSSB = refs.MustNewIdentity("@ln1Bdt8lEy4/F/szWlFVAIAIdCBKmzH2MNEVad8BWus=.ed25519") - //localGoSSBConnect = commands.Connect{ - // Remote: localGoSSB.Identity(), - // Address: network.NewAddress("127.0.0.1:8008"), - //} - - mainnetPub = invites.MustNewInviteFromString("one.planetary.pub:8008:@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519~KVvak/aZeQJQUrn1imLIvwU+EVTkCzGW8TJWTmK8lOk=") - - //soapdog = refs.MustNewIdentity("@qv10rF4IsmxRZb7g5ekJ33EakYBpdrmV/vtP1ij5BS4=.ed25519") - - //pub = refs.MustNewIdentity("@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519") - //hubConnect = commands2.Connect{ - // Remote: pub.Identity(), - // Address: network2.NewAddress("one.planetary.pub:8008"), - //} +//myPatchwork = refs.MustNewIdentity("@qFtLJ6P5Eh9vKxnj7Rsh8SkE6B6Z36DVLP7ZOKNeQ/Y=.ed25519") +//myPatchworkConnect = commands.Connect{ +// Remote: myPatchwork.Identity(), +// Address: network.NewAddress("127.0.0.1:8008"), +//} + +//localGoSSB = refs.MustNewIdentity("@ln1Bdt8lEy4/F/szWlFVAIAIdCBKmzH2MNEVad8BWus=.ed25519") +//localGoSSBConnect = commands.Connect{ +// Remote: localGoSSB.Identity(), +// Address: network.NewAddress("127.0.0.1:8008"), +//} + +//mainnetPub = invites.MustNewInviteFromString("one.planetary.pub:8008:@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519~KVvak/aZeQJQUrn1imLIvwU+EVTkCzGW8TJWTmK8lOk=") + +//soapdog = refs.MustNewIdentity("@qv10rF4IsmxRZb7g5ekJ33EakYBpdrmV/vtP1ij5BS4=.ed25519") + +//pub = refs.MustNewIdentity("@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519") +//hubConnect = commands2.Connect{ +// Remote: pub.Identity(), +// Address: network2.NewAddress("one.planetary.pub:8008"), +//} ) var ( diff --git a/di/wire_gen.go b/di/wire_gen.go index c7019b38..70f8989f 100644 --- a/di/wire_gen.go +++ b/di/wire_gen.go @@ -248,7 +248,7 @@ func BuildService(contextContext context.Context, private identity.Private, conf return Service{}, err } blobsGetDownloader := replication2.NewBlobsGetDownloader(filesystemStorage, logger) - replicationManager := replication2.NewManager(readWantListRepository, blobsGetDownloader, logger) + replicationManager := 
replication2.NewManager(readWantListRepository, filesystemStorage, blobsGetDownloader, logger) replicator := replication2.NewReplicator(replicationManager) peerManager := domain.NewPeerManager(contextContext, peerManagerConfig, gossipReplicator, replicator, dialer, logger) connectHandler := commands.NewConnectHandler(peerManager, logger) diff --git a/service/adapters/blobs/storage.go b/service/adapters/blobs/storage.go index cbe740c1..52a6e0b6 100644 --- a/service/adapters/blobs/storage.go +++ b/service/adapters/blobs/storage.go @@ -13,6 +13,7 @@ import ( "github.com/boreq/errors" "github.com/planetary-social/go-ssb/logging" "github.com/planetary-social/go-ssb/service/domain/blobs" + "github.com/planetary-social/go-ssb/service/domain/blobs/replication" "github.com/planetary-social/go-ssb/service/domain/refs" ) @@ -93,6 +94,9 @@ func (f FilesystemStorage) Size(id refs.Blob) (blobs.Size, error) { name := f.pathStorage(id) fi, err := os.Stat(name) if err != nil { + if os.IsNotExist(err) { + return blobs.Size{}, replication.ErrBlobNotFound + } return blobs.Size{}, errors.Wrap(err, "stat failed") } return blobs.NewSize(fi.Size()) diff --git a/service/adapters/blobs/storage_test.go b/service/adapters/blobs/storage_test.go index 16fe9894..2811a827 100644 --- a/service/adapters/blobs/storage_test.go +++ b/service/adapters/blobs/storage_test.go @@ -9,6 +9,7 @@ import ( "github.com/planetary-social/go-ssb/logging" "github.com/planetary-social/go-ssb/service/adapters/blobs" blobsdomain "github.com/planetary-social/go-ssb/service/domain/blobs" + blobReplication "github.com/planetary-social/go-ssb/service/domain/blobs/replication" "github.com/planetary-social/go-ssb/service/domain/refs" "github.com/stretchr/testify/require" ) @@ -25,6 +26,10 @@ func TestStorage(t *testing.T) { err = storage.Store(id, r) require.NoError(t, err) + size, err := storage.Size(id) + require.NoError(t, err) + require.Equal(t, len(data), size.InBytes()) + rc, err := storage.Get(id) require.NoError(t, err) defer rc.Close() @@ -35,6 +40,17 @@ func TestStorage(t *testing.T) { require.Equal(t, data, readData) } +func TestSizeReturnsBlobNotFound(t *testing.T) { + directory := fixtures.Directory(t) + logger := logging.NewDevNullLogger() + + storage, err := blobs.NewFilesystemStorage(directory, logger) + require.NoError(t, err) + + _, err = storage.Size(fixtures.SomeRefBlob()) + require.ErrorIs(t, err, blobReplication.ErrBlobNotFound) +} + func newFakeBlob(t *testing.T) (refs.Blob, io.Reader, []byte) { buf := &bytes.Buffer{} diff --git a/service/domain/blobs/replication/common.go b/service/domain/blobs/replication/common.go new file mode 100644 index 00000000..3c6b5ba2 --- /dev/null +++ b/service/domain/blobs/replication/common.go @@ -0,0 +1,29 @@ +package replication + +import ( + "context" + "io" + + "github.com/boreq/errors" + "github.com/planetary-social/go-ssb/service/domain/blobs" + "github.com/planetary-social/go-ssb/service/domain/refs" + "github.com/planetary-social/go-ssb/service/domain/transport" +) + +type WantListStorage interface { + GetWantList() (blobs.WantList, error) +} + +type Downloader interface { + OnHasReceived(ctx context.Context, peer transport.Peer, blob refs.Blob, size blobs.Size) +} + +var ErrBlobNotFound = errors.New("blob not found") + +type BlobStorage interface { + Store(id refs.Blob, r io.Reader) error + + // Size returns the size of the blob. If the blob is not found it returns + // ErrBlobNotFound. 
+ Size(id refs.Blob) (blobs.Size, error) +} diff --git a/service/domain/blobs/replication/downloader.go b/service/domain/blobs/replication/downloader.go index 2dd88386..296dbfaa 100644 --- a/service/domain/blobs/replication/downloader.go +++ b/service/domain/blobs/replication/downloader.go @@ -14,15 +14,6 @@ import ( "github.com/planetary-social/go-ssb/service/domain/transport/rpc" ) -var ErrBlobNotFound = errors.New("blob not found") - -type BlobStorage interface { - Store(id refs.Blob, r io.Reader) error - - // Size returns the size of the blob. If the blob is not found it returns ErrBlobNotFound. - Size(id refs.Blob) (blobs.Size, error) -} - type BlobsGetDownloader struct { storage BlobStorage logger logging.Logger diff --git a/service/domain/blobs/replication/manager.go b/service/domain/blobs/replication/manager.go index 9ce9d49d..43441cf8 100644 --- a/service/domain/blobs/replication/manager.go +++ b/service/domain/blobs/replication/manager.go @@ -3,170 +3,76 @@ package replication import ( "context" "sync" - "time" "github.com/boreq/errors" "github.com/planetary-social/go-ssb/logging" - "github.com/planetary-social/go-ssb/service/domain/blobs" "github.com/planetary-social/go-ssb/service/domain/messages" - "github.com/planetary-social/go-ssb/service/domain/refs" "github.com/planetary-social/go-ssb/service/domain/transport" "github.com/planetary-social/go-ssb/service/domain/transport/rpc" ) -type WantListStorage interface { - GetWantList() (blobs.WantList, error) -} - -type Downloader interface { - OnHasReceived(ctx context.Context, peer transport.Peer, blob refs.Blob, size blobs.Size) -} - type Manager struct { - storage WantListStorage - downloader Downloader - connectionStreams *connectionStreams - logger logging.Logger + wantListStorage WantListStorage + blobStorage BlobStorage + downloader Downloader + logger logging.Logger + + processes map[rpc.ConnectionId]*WantsProcess + lock sync.Mutex // guards processes } -func NewManager(storage WantListStorage, downloader Downloader, logger logging.Logger) *Manager { +func NewManager( + wantListStorage WantListStorage, + blobStorage BlobStorage, + downloader Downloader, + logger logging.Logger, +) *Manager { return &Manager{ - storage: storage, - downloader: downloader, - connectionStreams: newConnectionStreams(), - logger: logger.New("replication_manager"), + wantListStorage: wantListStorage, + blobStorage: blobStorage, + downloader: downloader, + processes: make(map[rpc.ConnectionId]*WantsProcess), + logger: logger.New("replication_manager"), } } -func (r *Manager) HandleIncomingCreateWantsRequest(ctx context.Context) (<-chan messages.BlobWithSizeOrWantDistance, error) { +func (m *Manager) HandleIncomingCreateWantsRequest(ctx context.Context) (<-chan messages.BlobWithSizeOrWantDistance, error) { connectionId, ok := rpc.GetConnectionIdFromContext(ctx) if !ok { return nil, errors.New("connection id not found in context") } - r.logger.WithField("connectionId", connectionId).Debug("incoming create wants") + + m.lock.Lock() + defer m.lock.Unlock() ch := make(chan messages.BlobWithSizeOrWantDistance) - r.connectionStreams.AddIncoming(connectionId, newIncomingStream(ctx, ch)) - go r.sendWantListPeriodically(ctx, ch) + m.getOrCreateProcess(connectionId).AddIncoming(NewIncomingStream(ctx, ch)) return ch, nil } -func (r *Manager) HandleOutgoingCreateWantsRequest(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) error { +func (m *Manager) HandleOutgoingCreateWantsRequest(ctx context.Context, ch <-chan 
messages.BlobWithSizeOrWantDistance, peer transport.Peer) error { connectionId, ok := rpc.GetConnectionIdFromContext(ctx) if !ok { return errors.New("connection id not found in context") } - r.logger.WithField("connectionId", connectionId).Debug("outgoing create wants") - - go r.handleOutgoing(ctx, connectionId, ch, peer) - return nil -} - -func (r *Manager) handleOutgoing(ctx context.Context, id rpc.ConnectionId, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) { - for blobWithSizeOrWantDistance := range ch { - logger := r.logger.WithField("connection_id", id).WithField("blob", blobWithSizeOrWantDistance.Id().String()) - - if size, ok := blobWithSizeOrWantDistance.SizeOrWantDistance().Size(); ok { - logger.WithField("size", size.InBytes()).Debug("got size") - go r.downloader.OnHasReceived(ctx, peer, blobWithSizeOrWantDistance.Id(), size) - continue - } - - if distance, ok := blobWithSizeOrWantDistance.SizeOrWantDistance().WantDistance(); ok { - // peer wants a blob - // todo tell it that we have it if we have it - logger.WithField("distance", distance.Int()).Debug("got distance") - continue - } - - panic("logic error") - } - - // todo channel closed -} - -func (r *Manager) sendWantListPeriodically(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) { - defer close(ch) - defer r.logger.Debug("terminating sending want list") - - for { - wl, err := r.storage.GetWantList() - if err != nil { - r.logger.WithError(err).Error("could not get the want list") - continue - } - - for _, v := range wl.List() { - v, err := messages.NewBlobWithWantDistance(v.Id, v.Distance) - if err != nil { - r.logger.WithError(err).Error("could not create a blob with want distance") - continue - } - - r.logger.WithField("blob", v.Id()).Debug("sending wants") - - select { - case ch <- v: - continue - case <-ctx.Done(): - return - } - } - - select { - case <-ctx.Done(): - return - case <-time.After(10 * time.Second): // todo change - continue - } - } -} -type connectionStreams struct { - m map[rpc.ConnectionId]*streams - lock sync.Mutex -} + m.lock.Lock() + defer m.lock.Unlock() -func newConnectionStreams() *connectionStreams { - return &connectionStreams{ - m: make(map[rpc.ConnectionId]*streams), - } + m.getOrCreateProcess(connectionId).AddOutgoing(ctx, ch, peer) + return nil } -func (cs *connectionStreams) AddIncoming(id rpc.ConnectionId, stream incomingStream) { - cs.lock.Lock() - defer cs.lock.Unlock() - - streams, ok := cs.m[id] +func (m *Manager) getOrCreateProcess(id rpc.ConnectionId) *WantsProcess { + v, ok := m.processes[id] if !ok { - streams = newStreams() - cs.m[id] = streams + v = NewWantsProcess( + m.wantListStorage, + m.blobStorage, + m.downloader, + m.logger.WithField("connection_id", id), + ) + m.processes[id] = v } - - streams.AddIncoming(stream) -} - -type streams struct { - lock sync.Mutex - incoming []incomingStream -} - -func newStreams() *streams { - return &streams{} -} - -func (s *streams) AddIncoming(stream incomingStream) { - s.lock.Lock() - defer s.lock.Unlock() - - s.incoming = append(s.incoming, stream) -} - -type incomingStream struct { - ctx context.Context - ch <-chan messages.BlobWithSizeOrWantDistance -} - -func newIncomingStream(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance) incomingStream { - return incomingStream{ctx: ctx, ch: ch} + return v } diff --git a/service/domain/blobs/replication/wants.go b/service/domain/blobs/replication/wants.go new file mode 100644 index 00000000..b1de3644 --- /dev/null +++ 
b/service/domain/blobs/replication/wants.go @@ -0,0 +1,160 @@ +package replication + +import ( + "context" + "sync" + "time" + + "github.com/boreq/errors" + "github.com/planetary-social/go-ssb/logging" + "github.com/planetary-social/go-ssb/service/domain/messages" + "github.com/planetary-social/go-ssb/service/domain/refs" + "github.com/planetary-social/go-ssb/service/domain/transport" +) + +type WantsProcess struct { + lock sync.Mutex + incoming []IncomingStream + + wantListStorage WantListStorage + blobStorage BlobStorage + downloader Downloader + logger logging.Logger +} + +func NewWantsProcess( + wantListStorage WantListStorage, + blobStorage BlobStorage, + downloader Downloader, + logger logging.Logger, +) *WantsProcess { + return &WantsProcess{ + wantListStorage: wantListStorage, + blobStorage: blobStorage, + downloader: downloader, + logger: logger.New("wants_process"), + } +} + +func (p *WantsProcess) AddIncoming(stream IncomingStream) { + p.logger.Debug("adding incoming") + + p.lock.Lock() + defer p.lock.Unlock() + + p.incoming = append(p.incoming, stream) + go p.incomingLoop(stream) +} + +func (p *WantsProcess) AddOutgoing(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) { + p.logger.Debug("adding outgoing") + go p.outgoingLoop(ctx, ch, peer) +} + +func (p *WantsProcess) incomingLoop(stream IncomingStream) { + defer close(stream.ch) + // todo cleanup? + + for { + wl, err := p.wantListStorage.GetWantList() + if err != nil { + p.logger.WithError(err).Error("could not get the want list") + continue + } + + for _, v := range wl.List() { + v, err := messages.NewBlobWithWantDistance(v.Id, v.Distance) + if err != nil { + p.logger.WithError(err).Error("could not create a blob with want distance") + continue + } + + p.logger.WithField("blob", v.Id()).Debug("sending wants") + + select { + case stream.ch <- v: + continue + case <-stream.ctx.Done(): + return + } + } + + select { + case <-stream.ctx.Done(): + return + case <-time.After(10 * time.Second): // todo change + continue + } + } +} + +func (p *WantsProcess) outgoingLoop(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) { + for hasOrWant := range ch { + logger := p.logger.WithField("blob", hasOrWant.Id().String()) + + if size, ok := hasOrWant.SizeOrWantDistance().Size(); ok { + logger.WithField("size", size.InBytes()).Debug("received has") + go p.downloader.OnHasReceived(ctx, peer, hasOrWant.Id(), size) + continue + } + + if distance, ok := hasOrWant.SizeOrWantDistance().WantDistance(); ok { + logger.WithField("distance", distance.Int()).Debug("received want") + + if err := p.onReceiveWant(hasOrWant.Id()); err != nil { + logger.WithError(err).Error("error processing a want") + } + + continue + } + + panic("logic error") + } + + // todo channel closed, cleanup? +} + +func (p *WantsProcess) onReceiveWant(id refs.Blob) error { + size, err := p.blobStorage.Size(id) + if err != nil { + if errors.Is(err, ErrBlobNotFound) { + p.logger.WithField("blob", id).Debug("we don't have this blob") + return nil + } + return errors.Wrap(err, "could not get blob size") + } + + has, err := messages.NewBlobWithSize(id, size) + if err != nil { + return errors.Wrap(err, "could not create a has") + } + + p.sendToAll(has) + return nil +} + +func (p *WantsProcess) sendToAll(wantOrHas messages.BlobWithSizeOrWantDistance) { + p.lock.Lock() + defer p.lock.Unlock() + + p.logger. + WithField("v", wantOrHas). + WithField("num_incoming", len(p.incoming)). 
+ Debug("sending want or has") + + for _, incoming := range p.incoming { + incoming.ch <- wantOrHas // todo what if this is slow + } +} + +type IncomingStream struct { + ctx context.Context + ch chan<- messages.BlobWithSizeOrWantDistance +} + +func NewIncomingStream(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) IncomingStream { + return IncomingStream{ + ctx: ctx, + ch: ch, + } +} diff --git a/service/domain/messages/blobs_createWants.go b/service/domain/messages/blobs_createWants.go index 239ff668..ee280cce 100644 --- a/service/domain/messages/blobs_createWants.go +++ b/service/domain/messages/blobs_createWants.go @@ -2,6 +2,8 @@ package messages import ( "encoding/json" + "fmt" + "strings" "github.com/boreq/errors" "github.com/planetary-social/go-ssb/service/domain/blobs" @@ -145,3 +147,15 @@ func (b BlobWithSizeOrWantDistance) Id() refs.Blob { func (b BlobWithSizeOrWantDistance) SizeOrWantDistance() blobs.SizeOrWantDistance { return b.sizeOrWantDistance } + +func (b BlobWithSizeOrWantDistance) String() string { + var s []string + s = append(s, fmt.Sprintf("id=%s", b.id.String())) + if distance, ok := b.sizeOrWantDistance.WantDistance(); ok { + s = append(s, fmt.Sprintf("distance=%d", distance.Int())) + } + if size, ok := b.sizeOrWantDistance.Size(); ok { + s = append(s, fmt.Sprintf("size=%d", size.InBytes())) + } + return fmt.Sprintf("<%s>", strings.Join(s, " ")) +} From 039913b4eee304a7020316f394b005ec7ad62977 Mon Sep 17 00:00:00 2001 From: boreq Date: Thu, 30 Jun 2022 15:46:28 +0200 Subject: [PATCH 3/7] Tests --- service/domain/blobs/replication/manager.go | 6 +- .../domain/blobs/replication/manager_test.go | 3 + service/domain/blobs/replication/wants.go | 26 ++-- .../domain/blobs/replication/wants_test.go | 111 ++++++++++++++++++ service/domain/messages/blobs_createWants.go | 16 +++ 5 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 service/domain/blobs/replication/wants_test.go diff --git a/service/domain/blobs/replication/manager.go b/service/domain/blobs/replication/manager.go index 43441cf8..b1f301f3 100644 --- a/service/domain/blobs/replication/manager.go +++ b/service/domain/blobs/replication/manager.go @@ -13,7 +13,7 @@ import ( type Manager struct { wantListStorage WantListStorage - blobStorage BlobStorage + blobStorage BlobSizeRepository downloader Downloader logger logging.Logger @@ -23,7 +23,7 @@ type Manager struct { func NewManager( wantListStorage WantListStorage, - blobStorage BlobStorage, + blobStorage BlobSizeRepository, downloader Downloader, logger logging.Logger, ) *Manager { @@ -46,7 +46,7 @@ func (m *Manager) HandleIncomingCreateWantsRequest(ctx context.Context) (<-chan defer m.lock.Unlock() ch := make(chan messages.BlobWithSizeOrWantDistance) - m.getOrCreateProcess(connectionId).AddIncoming(NewIncomingStream(ctx, ch)) + m.getOrCreateProcess(connectionId).AddIncoming(ctx, ch) return ch, nil } diff --git a/service/domain/blobs/replication/manager_test.go b/service/domain/blobs/replication/manager_test.go index 1a78ace0..b49b8695 100644 --- a/service/domain/blobs/replication/manager_test.go +++ b/service/domain/blobs/replication/manager_test.go @@ -8,6 +8,7 @@ import ( "github.com/planetary-social/go-ssb/fixtures" "github.com/planetary-social/go-ssb/logging" + "github.com/planetary-social/go-ssb/service/adapters/mocks" "github.com/planetary-social/go-ssb/service/domain/blobs" "github.com/planetary-social/go-ssb/service/domain/blobs/replication" "github.com/planetary-social/go-ssb/service/domain/messages" @@ -90,11 +91,13 
@@ type testManager struct { func newTestManager(t *testing.T) testManager { wantListStorage := newWantListStorageMock() + blobStorage := mocks.NewBlobStorageMock() downloader := newDownloaderMock() logger := logging.NewDevNullLogger() manager := replication.NewManager( wantListStorage, + blobStorage, downloader, logger, ) diff --git a/service/domain/blobs/replication/wants.go b/service/domain/blobs/replication/wants.go index b1de3644..a3cfa823 100644 --- a/service/domain/blobs/replication/wants.go +++ b/service/domain/blobs/replication/wants.go @@ -7,24 +7,31 @@ import ( "github.com/boreq/errors" "github.com/planetary-social/go-ssb/logging" + "github.com/planetary-social/go-ssb/service/domain/blobs" "github.com/planetary-social/go-ssb/service/domain/messages" "github.com/planetary-social/go-ssb/service/domain/refs" "github.com/planetary-social/go-ssb/service/domain/transport" ) +type BlobSizeRepository interface { + // Size returns the size of the blob. If the blob is not found it returns + // ErrBlobNotFound. + Size(id refs.Blob) (blobs.Size, error) +} + type WantsProcess struct { lock sync.Mutex - incoming []IncomingStream + incoming []incomingStream wantListStorage WantListStorage - blobStorage BlobStorage + blobStorage BlobSizeRepository downloader Downloader logger logging.Logger } func NewWantsProcess( wantListStorage WantListStorage, - blobStorage BlobStorage, + blobStorage BlobSizeRepository, downloader Downloader, logger logging.Logger, ) *WantsProcess { @@ -36,12 +43,13 @@ func NewWantsProcess( } } -func (p *WantsProcess) AddIncoming(stream IncomingStream) { +func (p *WantsProcess) AddIncoming(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) { p.logger.Debug("adding incoming") p.lock.Lock() defer p.lock.Unlock() + stream := newIncomingStream(ctx, ch) p.incoming = append(p.incoming, stream) go p.incomingLoop(stream) } @@ -51,7 +59,7 @@ func (p *WantsProcess) AddOutgoing(ctx context.Context, ch <-chan messages.BlobW go p.outgoingLoop(ctx, ch, peer) } -func (p *WantsProcess) incomingLoop(stream IncomingStream) { +func (p *WantsProcess) incomingLoop(stream incomingStream) { defer close(stream.ch) // todo cleanup? 
@@ -94,6 +102,7 @@ func (p *WantsProcess) outgoingLoop(ctx context.Context, ch <-chan messages.Blob if size, ok := hasOrWant.SizeOrWantDistance().Size(); ok { logger.WithField("size", size.InBytes()).Debug("received has") + go p.downloader.OnHasReceived(ctx, peer, hasOrWant.Id(), size) continue } @@ -104,7 +113,6 @@ func (p *WantsProcess) outgoingLoop(ctx context.Context, ch <-chan messages.Blob if err := p.onReceiveWant(hasOrWant.Id()); err != nil { logger.WithError(err).Error("error processing a want") } - continue } @@ -147,13 +155,13 @@ func (p *WantsProcess) sendToAll(wantOrHas messages.BlobWithSizeOrWantDistance) } } -type IncomingStream struct { +type incomingStream struct { ctx context.Context ch chan<- messages.BlobWithSizeOrWantDistance } -func NewIncomingStream(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) IncomingStream { - return IncomingStream{ +func newIncomingStream(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) incomingStream { + return incomingStream{ ctx: ctx, ch: ch, } diff --git a/service/domain/blobs/replication/wants_test.go b/service/domain/blobs/replication/wants_test.go new file mode 100644 index 00000000..9cdf48b3 --- /dev/null +++ b/service/domain/blobs/replication/wants_test.go @@ -0,0 +1,111 @@ +package replication_test + +import ( + "context" + "testing" + "time" + + "github.com/planetary-social/go-ssb/fixtures" + "github.com/planetary-social/go-ssb/logging" + "github.com/planetary-social/go-ssb/service/adapters/mocks" + "github.com/planetary-social/go-ssb/service/domain/blobs" + "github.com/planetary-social/go-ssb/service/domain/blobs/replication" + "github.com/planetary-social/go-ssb/service/domain/messages" + "github.com/planetary-social/go-ssb/service/domain/transport" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRepliesToWantsWithHas(t *testing.T) { + p := newTestWantsProcess() + + ctx := fixtures.TestContext(t) + outgoingCh := make(chan messages.BlobWithSizeOrWantDistance) + peer := transport.NewPeer(fixtures.SomePublicIdentity(), nil) + p.WantsProcess.AddOutgoing(ctx, outgoingCh, peer) + + incomingCh := make(chan messages.BlobWithSizeOrWantDistance) + p.WantsProcess.AddIncoming(ctx, incomingCh) + + blobId := fixtures.SomeRefBlob() + p.BlobStorage.MockBlob(blobId, fixtures.SomeBytes()) + + go func() { + select { + case outgoingCh <- messages.MustNewBlobWithWantDistance(blobId, blobs.NewWantDistanceLocal()): + return + case <-ctx.Done(): + t.Fatal("context done") + } + }() + + select { + case sent := <-incomingCh: + require.Equal(t, blobId, sent.Id()) + case <-time.After(1 * time.Second): + t.Fatal("timeout") + } +} + +func TestPassesHasToDownloader(t *testing.T) { + p := newTestWantsProcess() + + ctx, cancel := context.WithTimeout(fixtures.TestContext(t), 5*time.Second) + defer cancel() + + outgoingCh := make(chan messages.BlobWithSizeOrWantDistance) + peer := transport.NewPeer(fixtures.SomePublicIdentity(), nil) + p.WantsProcess.AddOutgoing(ctx, outgoingCh, peer) + + blobId := fixtures.SomeRefBlob() + blobSize := fixtures.SomeSize() + + select { + case outgoingCh <- messages.MustNewBlobWithSize(blobId, blobSize): + case <-ctx.Done(): + t.Fatal("context done") + } + + require.Eventually(t, + func() bool { + return assert.ObjectsAreEqual( + []onHasReceivedCall{ + { + Peer: peer, + Blob: blobId, + Size: blobSize, + }, + }, + p.Downloader.OnHasReceivedCalls(), + ) + }, + 1*time.Second, + 10*time.Millisecond, + ) +} + +type TestWantsProcess struct { + WantsProcess 
*replication.WantsProcess + BlobStorage *mocks.BlobStorageMock + Downloader *downloaderMock +} + +func newTestWantsProcess() TestWantsProcess { + wantListStorage := newWantListStorageMock() + downloader := newDownloaderMock() + blobStorage := mocks.NewBlobStorageMock() + logger := logging.NewDevNullLogger() + + process := replication.NewWantsProcess( + wantListStorage, + blobStorage, + downloader, + logger, + ) + + return TestWantsProcess{ + WantsProcess: process, + BlobStorage: blobStorage, + Downloader: downloader, + } +} diff --git a/service/domain/messages/blobs_createWants.go b/service/domain/messages/blobs_createWants.go index ee280cce..98bf968f 100644 --- a/service/domain/messages/blobs_createWants.go +++ b/service/domain/messages/blobs_createWants.go @@ -131,6 +131,14 @@ func NewBlobWithWantDistance(id refs.Blob, wantDistance blobs.WantDistance) (Blo return NewBlobWithSizeOrWantDistance(id, v) } +func MustNewBlobWithWantDistance(id refs.Blob, wantDistance blobs.WantDistance) BlobWithSizeOrWantDistance { + v, err := NewBlobWithWantDistance(id, wantDistance) + if err != nil { + panic(err) + } + return v +} + func NewBlobWithSize(id refs.Blob, size blobs.Size) (BlobWithSizeOrWantDistance, error) { v, err := blobs.NewSizeOrWantDistanceContainingSize(size) if err != nil { @@ -140,6 +148,14 @@ func NewBlobWithSize(id refs.Blob, size blobs.Size) (BlobWithSizeOrWantDistance, return NewBlobWithSizeOrWantDistance(id, v) } +func MustNewBlobWithSize(id refs.Blob, size blobs.Size) BlobWithSizeOrWantDistance { + v, err := NewBlobWithSize(id, size) + if err != nil { + panic(err) + } + return v +} + func (b BlobWithSizeOrWantDistance) Id() refs.Blob { return b.id } From a3cf15ccd54366ef78a122221f8072129998cbbd Mon Sep 17 00:00:00 2001 From: boreq Date: Thu, 30 Jun 2022 15:48:35 +0200 Subject: [PATCH 4/7] Pipeline --- di/inject_adapters.go | 1 + service/adapters/blobs/storage_test.go | 2 +- service/domain/blobs/replication/wants_test.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/di/inject_adapters.go b/di/inject_adapters.go index 573925ff..9cf46cd5 100644 --- a/di/inject_adapters.go +++ b/di/inject_adapters.go @@ -74,6 +74,7 @@ var blobsAdaptersSet = wire.NewSet( newFilesystemStorage, wire.Bind(new(blobReplication.BlobStorage), new(*blobs.FilesystemStorage)), wire.Bind(new(queries.BlobStorage), new(*blobs.FilesystemStorage)), + wire.Bind(new(blobReplication.BlobSizeRepository), new(*blobs.FilesystemStorage)), ) func newFilesystemStorage(logger logging.Logger, config Config) (*blobs.FilesystemStorage, error) { diff --git a/service/adapters/blobs/storage_test.go b/service/adapters/blobs/storage_test.go index 2811a827..814f8489 100644 --- a/service/adapters/blobs/storage_test.go +++ b/service/adapters/blobs/storage_test.go @@ -28,7 +28,7 @@ func TestStorage(t *testing.T) { size, err := storage.Size(id) require.NoError(t, err) - require.Equal(t, len(data), size.InBytes()) + require.EqualValues(t, len(data), size.InBytes()) rc, err := storage.Get(id) require.NoError(t, err) diff --git a/service/domain/blobs/replication/wants_test.go b/service/domain/blobs/replication/wants_test.go index 9cdf48b3..092b95ad 100644 --- a/service/domain/blobs/replication/wants_test.go +++ b/service/domain/blobs/replication/wants_test.go @@ -35,7 +35,7 @@ func TestRepliesToWantsWithHas(t *testing.T) { case outgoingCh <- messages.MustNewBlobWithWantDistance(blobId, blobs.NewWantDistanceLocal()): return case <-ctx.Done(): - t.Fatal("context done") + panic("context done") } }() From 
163860dbaa8680b1cde8ec7eb53d59158e7fa363 Mon Sep 17 00:00:00 2001 From: boreq Date: Thu, 30 Jun 2022 16:02:49 +0200 Subject: [PATCH 5/7] Respond to wants if incoming channel connects later --- service/domain/blobs/replication/wants.go | 58 ++++++++++++++++--- .../domain/blobs/replication/wants_test.go | 28 +++++++++ 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/service/domain/blobs/replication/wants.go b/service/domain/blobs/replication/wants.go index a3cfa823..cbec1c84 100644 --- a/service/domain/blobs/replication/wants.go +++ b/service/domain/blobs/replication/wants.go @@ -20,8 +20,11 @@ type BlobSizeRepository interface { } type WantsProcess struct { - lock sync.Mutex - incoming []incomingStream + incomingLock sync.Mutex + incoming []incomingStream + + remoteWantsLock sync.Mutex + remoteWants map[string]struct{} wantListStorage WantListStorage blobStorage BlobSizeRepository @@ -36,6 +39,8 @@ func NewWantsProcess( logger logging.Logger, ) *WantsProcess { return &WantsProcess{ + remoteWants: make(map[string]struct{}), + wantListStorage: wantListStorage, blobStorage: blobStorage, downloader: downloader, @@ -46,8 +51,8 @@ func NewWantsProcess( func (p *WantsProcess) AddIncoming(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) { p.logger.Debug("adding incoming") - p.lock.Lock() - defer p.lock.Unlock() + p.incomingLock.Lock() + defer p.incomingLock.Unlock() stream := newIncomingStream(ctx, ch) p.incoming = append(p.incoming, stream) @@ -63,6 +68,10 @@ func (p *WantsProcess) incomingLoop(stream incomingStream) { defer close(stream.ch) // todo cleanup? + if err := p.respondToPreviousWants(); err != nil { + p.logger.WithError(err).Error("failed to respond to previous wants") + } + for { wl, err := p.wantListStorage.GetWantList() if err != nil { @@ -123,6 +132,33 @@ func (p *WantsProcess) outgoingLoop(ctx context.Context, ch <-chan messages.Blob } func (p *WantsProcess) onReceiveWant(id refs.Blob) error { + p.addRemoteWants(id) + if err := p.respondToWant(id); err != nil { + return errors.Wrap(err, "could not respond to want") + } + return nil +} + +func (p *WantsProcess) addRemoteWants(id refs.Blob) { + p.remoteWantsLock.Lock() + defer p.remoteWantsLock.Unlock() + p.remoteWants[id.String()] = struct{}{} +} + +func (p *WantsProcess) respondToPreviousWants() error { + p.remoteWantsLock.Lock() + defer p.remoteWantsLock.Unlock() + + for refString := range p.remoteWants { + if err := p.respondToWant(refs.MustNewBlob(refString)); err != nil { + return errors.Wrap(err, "failed to respond to a want") + } + } + + return nil +} + +func (p *WantsProcess) respondToWant(id refs.Blob) error { size, err := p.blobStorage.Size(id) if err != nil { if errors.Is(err, ErrBlobNotFound) { @@ -137,13 +173,13 @@ func (p *WantsProcess) onReceiveWant(id refs.Blob) error { return errors.Wrap(err, "could not create a has") } - p.sendToAll(has) + p.sendToAllIncomingStreams(has) return nil } -func (p *WantsProcess) sendToAll(wantOrHas messages.BlobWithSizeOrWantDistance) { - p.lock.Lock() - defer p.lock.Unlock() +func (p *WantsProcess) sendToAllIncomingStreams(wantOrHas messages.BlobWithSizeOrWantDistance) { + p.incomingLock.Lock() + defer p.incomingLock.Unlock() p.logger. WithField("v", wantOrHas). 
@@ -151,7 +187,11 @@ func (p *WantsProcess) sendToAll(wantOrHas messages.BlobWithSizeOrWantDistance) Debug("sending want or has") for _, incoming := range p.incoming { - incoming.ch <- wantOrHas // todo what if this is slow + select { + case incoming.ch <- wantOrHas: // todo what if this is slow + case <-incoming.ctx.Done(): + continue + } } } diff --git a/service/domain/blobs/replication/wants_test.go b/service/domain/blobs/replication/wants_test.go index 092b95ad..2063539f 100644 --- a/service/domain/blobs/replication/wants_test.go +++ b/service/domain/blobs/replication/wants_test.go @@ -47,6 +47,34 @@ func TestRepliesToWantsWithHas(t *testing.T) { } } +func TestRepliesToWantsWithHasIfIncomingChannelConnectedLater(t *testing.T) { + p := newTestWantsProcess() + + ctx := fixtures.TestContext(t) + outgoingCh := make(chan messages.BlobWithSizeOrWantDistance) + peer := transport.NewPeer(fixtures.SomePublicIdentity(), nil) + p.WantsProcess.AddOutgoing(ctx, outgoingCh, peer) + + blobId := fixtures.SomeRefBlob() + p.BlobStorage.MockBlob(blobId, fixtures.SomeBytes()) + + select { + case outgoingCh <- messages.MustNewBlobWithWantDistance(blobId, blobs.NewWantDistanceLocal()): + case <-ctx.Done(): + panic("context done") + } + + incomingCh := make(chan messages.BlobWithSizeOrWantDistance) + p.WantsProcess.AddIncoming(ctx, incomingCh) + + select { + case sent := <-incomingCh: + require.Equal(t, blobId, sent.Id()) + case <-time.After(1 * time.Second): + t.Fatal("timeout") + } +} + func TestPassesHasToDownloader(t *testing.T) { p := newTestWantsProcess() From 593a870b947864d68e7ed7b47e570243f04f7ffe Mon Sep 17 00:00:00 2001 From: boreq Date: Thu, 30 Jun 2022 16:32:29 +0200 Subject: [PATCH 6/7] Cleanup incoming processes --- service/domain/blobs/replication/manager.go | 1 + service/domain/blobs/replication/wants.go | 39 +++++++++++++++------ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/service/domain/blobs/replication/manager.go b/service/domain/blobs/replication/manager.go index b1f301f3..c5355121 100644 --- a/service/domain/blobs/replication/manager.go +++ b/service/domain/blobs/replication/manager.go @@ -17,6 +17,7 @@ type Manager struct { downloader Downloader logger logging.Logger + // todo cleanup processes processes map[rpc.ConnectionId]*WantsProcess lock sync.Mutex // guards processes } diff --git a/service/domain/blobs/replication/wants.go b/service/domain/blobs/replication/wants.go index cbec1c84..35e02276 100644 --- a/service/domain/blobs/replication/wants.go +++ b/service/domain/blobs/replication/wants.go @@ -56,7 +56,16 @@ func (p *WantsProcess) AddIncoming(ctx context.Context, ch chan<- messages.BlobW stream := newIncomingStream(ctx, ch) p.incoming = append(p.incoming, stream) - go p.incomingLoop(stream) + + go func() { + defer close(ch) + defer func() { + if err := p.cleanupIncomingStream(stream); err != nil { + p.logger.WithError(err).Error("could not clean up a stream") + } + }() + p.incomingLoop(stream.ctx, stream.ch) + }() } func (p *WantsProcess) AddOutgoing(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) { @@ -64,10 +73,7 @@ func (p *WantsProcess) AddOutgoing(ctx context.Context, ch <-chan messages.BlobW go p.outgoingLoop(ctx, ch, peer) } -func (p *WantsProcess) incomingLoop(stream incomingStream) { - defer close(stream.ch) - // todo cleanup? 
- +func (p *WantsProcess) incomingLoop(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) { if err := p.respondToPreviousWants(); err != nil { p.logger.WithError(err).Error("failed to respond to previous wants") } @@ -89,20 +95,33 @@ func (p *WantsProcess) incomingLoop(stream incomingStream) { p.logger.WithField("blob", v.Id()).Debug("sending wants") select { - case stream.ch <- v: + case ch <- v: continue - case <-stream.ctx.Done(): + case <-ctx.Done(): return } } select { - case <-stream.ctx.Done(): - return case <-time.After(10 * time.Second): // todo change continue + case <-ctx.Done(): + return + } + } +} + +func (p *WantsProcess) cleanupIncomingStream(stream incomingStream) error { + p.incomingLock.Lock() + defer p.incomingLock.Unlock() + + for i := range p.incoming { + if p.incoming[i] == stream { + p.incoming = append(p.incoming[:i], p.incoming[i+1:]...) + return nil } } + return errors.New("incoming stream not found") } func (p *WantsProcess) outgoingLoop(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) { @@ -127,8 +146,6 @@ func (p *WantsProcess) outgoingLoop(ctx context.Context, ch <-chan messages.Blob panic("logic error") } - - // todo channel closed, cleanup? } func (p *WantsProcess) onReceiveWant(id refs.Blob) error { From be29d6a620a82d4884145f3817d9a3cb5589c45f Mon Sep 17 00:00:00 2001 From: boreq Date: Thu, 30 Jun 2022 16:42:06 +0200 Subject: [PATCH 7/7] Cleanup interfaces --- service/domain/blobs/replication/common.go | 2 ++ service/domain/blobs/replication/wants.go | 7 ------- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/service/domain/blobs/replication/common.go b/service/domain/blobs/replication/common.go index 3c6b5ba2..a20113f0 100644 --- a/service/domain/blobs/replication/common.go +++ b/service/domain/blobs/replication/common.go @@ -22,7 +22,9 @@ var ErrBlobNotFound = errors.New("blob not found") type BlobStorage interface { Store(id refs.Blob, r io.Reader) error +} +type BlobSizeRepository interface { // Size returns the size of the blob. If the blob is not found it returns // ErrBlobNotFound. Size(id refs.Blob) (blobs.Size, error) diff --git a/service/domain/blobs/replication/wants.go b/service/domain/blobs/replication/wants.go index 35e02276..04e411dc 100644 --- a/service/domain/blobs/replication/wants.go +++ b/service/domain/blobs/replication/wants.go @@ -7,18 +7,11 @@ import ( "github.com/boreq/errors" "github.com/planetary-social/go-ssb/logging" - "github.com/planetary-social/go-ssb/service/domain/blobs" "github.com/planetary-social/go-ssb/service/domain/messages" "github.com/planetary-social/go-ssb/service/domain/refs" "github.com/planetary-social/go-ssb/service/domain/transport" ) -type BlobSizeRepository interface { - // Size returns the size of the blob. If the blob is not found it returns - // ErrBlobNotFound. - Size(id refs.Blob) (blobs.Size, error) -} - type WantsProcess struct { incomingLock sync.Mutex incoming []incomingStream
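---

The sketch below is not part of the patch series; it is a minimal illustration of how the two Manager entry points introduced above map onto the two directions of the blobs.createWants procedure. Only the replication.Manager methods, the messages and transport types, and the boreq/errors package come from the patches; the package name, function names, and the send/recv callbacks are assumptions standing in for whatever the rpc layer actually provides, and the context passed in must already carry a connection id (see rpc.GetConnectionIdFromContext in manager.go).

// Illustrative sketch only, assuming hypothetical send/recv callbacks.
package replicationsketch

import (
	"context"

	"github.com/boreq/errors"
	"github.com/planetary-social/go-ssb/service/domain/blobs/replication"
	"github.com/planetary-social/go-ssb/service/domain/messages"
	"github.com/planetary-social/go-ssb/service/domain/transport"
)

// servePeerInitiatedCreateWants handles a createWants request opened by the
// remote peer. The channel returned by the manager carries our want list and
// our "has" replies, which are forwarded to the peer via the send callback.
func servePeerInitiatedCreateWants(
	ctx context.Context,
	m *replication.Manager,
	send func(messages.BlobWithSizeOrWantDistance) error, // hypothetical rpc send
) error {
	ch, err := m.HandleIncomingCreateWantsRequest(ctx)
	if err != nil {
		return errors.Wrap(err, "manager refused the incoming stream")
	}
	for msg := range ch {
		if err := send(msg); err != nil {
			return errors.Wrap(err, "could not send to the peer")
		}
	}
	return nil
}

// startOurCreateWants feeds the responses to a createWants request that we
// opened towards the peer into the manager: "has" messages are handed to the
// downloader, wants are remembered and answered over any incoming stream.
func startOurCreateWants(
	ctx context.Context,
	m *replication.Manager,
	peer transport.Peer,
	recv func() (messages.BlobWithSizeOrWantDistance, error), // hypothetical rpc receive
) error {
	ch := make(chan messages.BlobWithSizeOrWantDistance)
	if err := m.HandleOutgoingCreateWantsRequest(ctx, ch, peer); err != nil {
		return errors.Wrap(err, "manager refused the outgoing stream")
	}
	go func() {
		defer close(ch)
		for {
			msg, err := recv()
			if err != nil {
				return
			}
			select {
			case ch <- msg:
			case <-ctx.Done():
				return
			}
		}
	}()
	return nil
}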
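Likewise illustrative only: a tiny in-memory implementation of the BlobStorage and BlobSizeRepository interfaces split out into common.go, honoring the ErrBlobNotFound contract that WantsProcess.respondToWant and FilesystemStorage.Size rely on. The type and package names are made up for the example; the repository already ships the FilesystemStorage adapter and a BlobStorageMock for tests.

// Illustrative sketch only; not part of the patches.
package replicationsketch

import (
	"io"
	"sync"

	"github.com/boreq/errors"
	"github.com/planetary-social/go-ssb/service/domain/blobs"
	"github.com/planetary-social/go-ssb/service/domain/blobs/replication"
	"github.com/planetary-social/go-ssb/service/domain/refs"
)

type inMemoryBlobStorage struct {
	lock  sync.Mutex
	blobs map[string][]byte
}

func newInMemoryBlobStorage() *inMemoryBlobStorage {
	return &inMemoryBlobStorage{blobs: make(map[string][]byte)}
}

// Store reads the whole blob into memory, keyed by its ref.
func (s *inMemoryBlobStorage) Store(id refs.Blob, r io.Reader) error {
	data, err := io.ReadAll(r)
	if err != nil {
		return errors.Wrap(err, "could not read the blob")
	}
	s.lock.Lock()
	defer s.lock.Unlock()
	s.blobs[id.String()] = data
	return nil
}

// Size returns ErrBlobNotFound for unknown blobs, mirroring what
// FilesystemStorage.Size does when os.Stat reports a missing file.
func (s *inMemoryBlobStorage) Size(id refs.Blob) (blobs.Size, error) {
	s.lock.Lock()
	defer s.lock.Unlock()
	data, ok := s.blobs[id.String()]
	if !ok {
		return blobs.Size{}, replication.ErrBlobNotFound
	}
	return blobs.NewSize(int64(len(data)))
}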