Skip to content

Commit b99ebe6

Browse files
authored
Merge pull request #104 from gfanton/fix/new-peer-event
2 parents 3130869 + 1077e23 commit b99ebe6

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

baseorbitdb/orbitdb.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ type orbitDB struct {
117117

118118
// emitters
119119
emitters struct {
120-
newPeer event.Emitter
121120
newHeads event.Emitter
122121
}
123122

@@ -395,11 +394,6 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
395394
messageMarshaler: options.MessageMarshaler,
396395
}
397396

398-
odb.emitters.newPeer, err = eventBus.Emitter(new(stores.EventNewPeer))
399-
if err != nil {
400-
return nil, errors.Wrap(err, "unable to create global emitter")
401-
}
402-
403397
// set new heads as stateful, so newly subscriber can replay last event in case they missed it
404398
odb.emitters.newHeads, err = eventBus.Emitter(new(EventExchangeHeads), eventbus.Stateful)
405399
if err != nil {
@@ -850,10 +844,23 @@ func (o *orbitDB) pubSubChanListener(ctx context.Context, store Store, topic ifa
850844
return err
851845
}
852846

847+
newPeerEmitter, err := store.EventBus().Emitter(new(stores.EventNewPeer))
848+
if err != nil {
849+
return fmt.Errorf("unable to init emitter: %w", err)
850+
}
851+
853852
go func() {
853+
defer newPeerEmitter.Close()
854+
854855
for e := range chPeers {
855856
switch evt := e.(type) {
856857
case *iface.EventPubSubJoin:
858+
// notify store that we have a new peers
859+
if err := newPeerEmitter.Emit(stores.NewEventNewPeer(evt.Peer)); err != nil {
860+
o.logger.Error("unable to emit event new peer", zap.Error(err))
861+
}
862+
863+
// handle new peers
857864
go o.onNewPeerJoined(ctx, evt.Peer, store)
858865
o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
859866

@@ -912,10 +919,6 @@ func (o *orbitDB) onNewPeerJoined(ctx context.Context, p peer.ID, store Store) {
912919
}
913920
return
914921
}
915-
916-
if err := o.emitters.newPeer.Emit(stores.NewEventNewPeer(p)); err != nil {
917-
o.logger.Error("unable emit NewPeer event", zap.Error(err))
918-
}
919922
}
920923

921924
func (o *orbitDB) exchangeHeads(ctx context.Context, p peer.ID, store Store) error {

0 commit comments

Comments
 (0)