Skip to content

Commit 326aee1

Browse files
authored
Merge pull request #119 from D4ryl00/fix/events
fix: directChannel not working
2 parents 3a5a22b + 3301a16 commit 326aee1

File tree

7 files changed

+204
-137
lines changed

7 files changed

+204
-137
lines changed

baseorbitdb/events.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package baseorbitdb
2+
3+
import (
4+
"berty.tech/go-orbit-db/iface"
5+
"github.com/libp2p/go-libp2p/core/peer"
6+
)
7+
8+
type EventExchangeHeads struct {
9+
Peer peer.ID
10+
Message *iface.MessageExchangeHeads
11+
}
12+
13+
func NewEventExchangeHeads(p peer.ID, msg *iface.MessageExchangeHeads) EventExchangeHeads {
14+
return EventExchangeHeads{
15+
Peer: p,
16+
Message: msg,
17+
}
18+
}

baseorbitdb/events_handler.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package baseorbitdb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
ipfslog "berty.tech/go-ipfs-log"
8+
"berty.tech/go-orbit-db/iface"
9+
)
10+
11+
func (o *orbitDB) handleEventExchangeHeads(ctx context.Context, e *iface.MessageExchangeHeads, store iface.Store) error {
12+
untypedHeads := make([]ipfslog.Entry, len(e.Heads))
13+
for i, h := range e.Heads {
14+
untypedHeads[i] = h
15+
}
16+
17+
o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(untypedHeads), e.Address))
18+
19+
if len(untypedHeads) > 0 {
20+
if err := store.Sync(ctx, untypedHeads); err != nil {
21+
return fmt.Errorf("unable to sync heads: %w", err)
22+
}
23+
}
24+
25+
return nil
26+
}

baseorbitdb/orbitdb.go

+138-23
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
"berty.tech/go-orbit-db/cache/cacheleveldown"
1919
"berty.tech/go-orbit-db/iface"
2020
_ "berty.tech/go-orbit-db/internal/buildconstraints" // fail for bad go version
21-
"berty.tech/go-orbit-db/stores"
21+
"berty.tech/go-orbit-db/messagemarshaler"
22+
"berty.tech/go-orbit-db/pubsub"
23+
"berty.tech/go-orbit-db/pubsub/oneonone"
2224
"berty.tech/go-orbit-db/utils"
2325
cid "github.com/ipfs/go-cid"
2426
datastore "github.com/ipfs/go-datastore"
@@ -108,7 +110,7 @@ type orbitDB struct {
108110
cache cache.Interface
109111
logger *zap.Logger
110112
tracer trace.Tracer
111-
directChannelFactory iface.DirectChannelFactory
113+
directChannel iface.DirectChannel
112114
messageMarshaler iface.MessageMarshaler
113115

114116
// emitters
@@ -178,17 +180,36 @@ func (o *orbitDB) setStore(address string, store iface.Store) {
178180
o.stores[address] = store
179181
}
180182

181-
func (o *orbitDB) closeAllStores() {
183+
func (o *orbitDB) deleteStore(address string) {
182184
o.muStores.Lock()
183185
defer o.muStores.Unlock()
184186

187+
delete(o.stores, address)
188+
}
189+
190+
func (o *orbitDB) getStore(address string) (iface.Store, bool) {
191+
o.muStores.RLock()
192+
defer o.muStores.RUnlock()
193+
194+
store, ok := o.stores[address]
195+
196+
return store, ok
197+
}
198+
199+
func (o *orbitDB) closeAllStores() {
200+
stores := []Store{}
201+
202+
o.muStores.Lock()
185203
for _, store := range o.stores {
204+
stores = append(stores, store)
205+
}
206+
o.muStores.Unlock()
207+
208+
for _, store := range stores {
186209
if err := store.Close(); err != nil {
187210
o.logger.Error("unable to close store", zap.Error(err))
188211
}
189212
}
190-
191-
o.stores = map[string]Store{}
192213
}
193214

194215
func (o *orbitDB) closeCache() {
@@ -200,6 +221,12 @@ func (o *orbitDB) closeCache() {
200221
}
201222
}
202223

224+
func (o *orbitDB) closeDirectConnections() {
225+
if err := o.directChannel.Close(); err != nil {
226+
o.logger.Error("unable to close connection", zap.Error(err))
227+
}
228+
}
229+
203230
func (o *orbitDB) closeKeyStore() {
204231
o.muKeyStore.Lock()
205232
defer o.muKeyStore.Unlock()
@@ -314,6 +341,16 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
314341
options.EventBus = eventbus.NewBus()
315342
}
316343

344+
if options.DirectChannelFactory == nil {
345+
options.DirectChannelFactory = oneonone.NewChannelFactory(is)
346+
}
347+
directConnections, err := makeDirectChannel(ctx, options.EventBus, options.DirectChannelFactory, &iface.DirectChannelOptions{
348+
Logger: options.Logger,
349+
})
350+
if err != nil {
351+
return nil, fmt.Errorf("unable to create a direct connection with peer: %w", err)
352+
}
353+
317354
k, err := is.Key().Self(ctx)
318355
if err != nil {
319356
return nil, err
@@ -324,6 +361,10 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
324361
options.PeerID = id
325362
}
326363

364+
if options.MessageMarshaler == nil {
365+
options.MessageMarshaler = &messagemarshaler.JSONMarshaler{}
366+
}
367+
327368
if options.Cache == nil {
328369
options.Cache = cacheleveldown.New(&cache.Options{Logger: options.Logger})
329370
}
@@ -344,7 +385,7 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
344385
directory: *options.Directory,
345386
eventBus: options.EventBus,
346387
stores: map[string]Store{},
347-
directChannelFactory: options.DirectChannelFactory,
388+
directChannel: directConnections,
348389
closeKeystore: options.CloseKeystore,
349390
storeTypes: map[string]iface.StoreConstructor{},
350391
accessControllerTypes: map[string]iface.AccessControllerConstructor{},
@@ -354,10 +395,15 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
354395
}
355396

356397
// set new heads as stateful, so newly subscriber can replay last event in case they missed it
357-
odb.emitters.newHeads, err = options.EventBus.Emitter(new(stores.EventExchangeHeads), eventbus.Stateful)
398+
odb.emitters.newHeads, err = options.EventBus.Emitter(new(EventExchangeHeads), eventbus.Stateful)
358399
if err != nil {
359400
return nil, fmt.Errorf("unable to create global emitter: %w", err)
360401
}
402+
403+
if err := odb.monitorDirectChannel(ctx, options.EventBus); err != nil {
404+
return nil, fmt.Errorf("unable to monitor direct channel: %w", err)
405+
}
406+
361407
return odb, nil
362408
}
363409

@@ -430,9 +476,14 @@ func NewOrbitDB(ctx context.Context, ipfs coreapi.CoreAPI, options *NewOrbitDBOp
430476

431477
func (o *orbitDB) Close() error {
432478
o.closeAllStores()
479+
o.closeDirectConnections()
433480
o.closeCache()
434481
o.closeKeyStore()
435482

483+
if err := o.emitters.newHeads.Close(); err != nil {
484+
o.logger.Warn("unable to close emitter", zap.Error(err))
485+
}
486+
436487
o.cancel()
437488
return nil
438489
}
@@ -718,23 +769,35 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd
718769
}
719770
}
720771

772+
if options.Logger == nil {
773+
options.Logger = o.logger
774+
}
775+
776+
if options.CloseFunc == nil {
777+
options.CloseFunc = func() {}
778+
}
779+
closeFunc := func() {
780+
options.CloseFunc()
781+
o.deleteStore(parsedDBAddress.String())
782+
}
783+
721784
store, err := storeFunc(o.IPFS(), identity, parsedDBAddress, &iface.NewStoreOptions{
722-
EventBus: options.EventBus,
723-
AccessController: accessController,
724-
Cache: options.Cache,
725-
Replicate: options.Replicate,
726-
Directory: *options.Directory,
727-
SortFn: options.SortFn,
728-
CacheDestroy: func() error { return o.cache.Destroy(o.directory, parsedDBAddress) },
729-
Logger: o.logger,
730-
Tracer: o.tracer,
731-
IO: options.IO,
732-
StoreSpecificOpts: options.StoreSpecificOpts,
733-
PubSub: o.pubsub,
734-
MessageMarshaler: o.messageMarshaler,
735-
PeerID: o.id,
736-
DirectChannelFactory: o.directChannelFactory,
737-
NewHeadsEmitter: o.emitters.newHeads,
785+
EventBus: options.EventBus,
786+
AccessController: accessController,
787+
Cache: options.Cache,
788+
Replicate: options.Replicate,
789+
Directory: *options.Directory,
790+
SortFn: options.SortFn,
791+
CacheDestroy: func() error { return o.cache.Destroy(o.directory, parsedDBAddress) },
792+
Logger: options.Logger,
793+
Tracer: o.tracer,
794+
IO: options.IO,
795+
StoreSpecificOpts: options.StoreSpecificOpts,
796+
PubSub: o.pubsub,
797+
MessageMarshaler: o.messageMarshaler,
798+
PeerID: o.id,
799+
DirectChannel: o.directChannel,
800+
CloseFunc: closeFunc,
738801
})
739802
if err != nil {
740803
return nil, fmt.Errorf("unable to instantiate store: %w", err)
@@ -749,4 +812,56 @@ func (o *orbitDB) EventBus() event.Bus {
749812
return o.eventBus
750813
}
751814

815+
func (o *orbitDB) monitorDirectChannel(ctx context.Context, bus event.Bus) error {
816+
sub, err := bus.Subscribe(new(iface.EventPubSubPayload), eventbus.BufSize(128))
817+
if err != nil {
818+
return fmt.Errorf("unable to init pubsub subscriber: %w", err)
819+
}
820+
821+
go func() {
822+
for {
823+
var e interface{}
824+
select {
825+
case <-ctx.Done():
826+
return
827+
case e = <-sub.Out():
828+
}
829+
830+
evt := e.(iface.EventPubSubPayload)
831+
832+
msg := iface.MessageExchangeHeads{}
833+
if err := o.messageMarshaler.Unmarshal(evt.Payload, &msg); err != nil {
834+
o.logger.Error("unable to unmarshal message payload", zap.Error(err))
835+
continue
836+
}
837+
838+
store, ok := o.getStore(msg.Address)
839+
if !ok {
840+
o.logger.Error("unable to get store from address", zap.Error(err))
841+
continue
842+
}
843+
844+
if err := o.handleEventExchangeHeads(ctx, &msg, store); err != nil {
845+
o.logger.Error("unable to handle pubsub payload", zap.Error(err))
846+
continue
847+
}
848+
849+
if err := o.emitters.newHeads.Emit(NewEventExchangeHeads(evt.Peer, &msg)); err != nil {
850+
o.logger.Warn("unable to emit new heads", zap.Error(err))
851+
}
852+
}
853+
}()
854+
855+
return nil
856+
}
857+
858+
func makeDirectChannel(ctx context.Context, bus event.Bus, df iface.DirectChannelFactory, opts *iface.DirectChannelOptions) (iface.DirectChannel, error) {
859+
emitter, err := pubsub.NewPayloadEmitter(bus)
860+
if err != nil {
861+
return nil, fmt.Errorf("unable to init pubsub emitter: %w", err)
862+
}
863+
864+
return df(ctx, emitter, opts)
865+
}
866+
752867
var _ BaseOrbitDB = &orbitDB{}

iface/interface.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type CreateDBOptions struct {
5353
IO ipfslog.IO
5454
Timeout time.Duration
5555
MessageMarshaler MessageMarshaler
56+
Logger *zap.Logger
57+
CloseFunc func()
5658
StoreSpecificOpts interface{}
5759
}
5860

@@ -345,12 +347,12 @@ type NewStoreOptions struct {
345347
Logger *zap.Logger
346348
Tracer trace.Tracer
347349
IO ipfslog.IO
348-
StoreSpecificOpts interface{}
349350
PubSub PubSubInterface
350351
MessageMarshaler MessageMarshaler
351352
PeerID peer.ID
352-
DirectChannelFactory DirectChannelFactory
353-
NewHeadsEmitter event.Emitter
353+
DirectChannel DirectChannel
354+
CloseFunc func()
355+
StoreSpecificOpts interface{}
354356
}
355357

356358
type DirectChannelOptions struct {

pubsub/directchannel/channel.go

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func (d *directChannel) Connect(ctx context.Context, pid peer.ID) (err error) {
8282
// @NOTE(gfanton): we dont need this on direct channel
8383
// Close Closes the connection
8484
func (d *directChannel) Close() error {
85+
d.host.RemoveStreamHandler(PROTOCOL)
8586
return d.emitter.Close()
8687
}
8788

0 commit comments

Comments
 (0)