Skip to content

Commit 3a5a22b

Browse files
authoredNov 17, 2022
Merge pull request #114 from berty/d4ryl00/fix/lifecycle
fix: close of stores
2 parents b503985 + 4f24a13 commit 3a5a22b

File tree

16 files changed

+531
-457
lines changed

16 files changed

+531
-457
lines changed
 

‎.github/workflows/go.yml

+17-12
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@ jobs:
1616
golang:
1717
- 1.18.x
1818
steps:
19-
- uses: actions/checkout@v3
20-
- name: Set up Go
21-
uses: actions/setup-go@v2
22-
with:
23-
go-version: ${{ matrix.golang }}
24-
- name: golangci-lint
25-
uses: golangci/golangci-lint-action@v3.3.0
26-
with:
27-
go-version: ${{ matrix.golang }}
28-
version: v1.50.1
29-
args: --timeout=10m
30-
# only-new-issues: true
19+
- name: Checkout
20+
uses: actions/checkout@v3
21+
22+
- name: Setup asdf
23+
uses: asdf-vm/actions/setup@v1
24+
25+
- name: Setup golang
26+
run: |
27+
asdf plugin add golang
28+
asdf install golang
29+
30+
- name: Setup golangci-lint
31+
run: |
32+
asdf plugin add golangci-lint
33+
asdf install golangci-lint
3134
35+
- name: Run golangci-lint
36+
run: make lint
3237

3338
go-tests-on-linux:
3439
runs-on: ubuntu-latest

‎.tool-versions

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
golang 1.18
2+
golangci-lint 1.50.1

‎baseorbitdb/events.go

-18
This file was deleted.

‎baseorbitdb/events_handler.go

-72
This file was deleted.

‎baseorbitdb/orbitdb.go

+43-320
Large diffs are not rendered by default.

‎iface/interface.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,11 @@ type NewStoreOptions struct {
346346
Tracer trace.Tracer
347347
IO ipfslog.IO
348348
StoreSpecificOpts interface{}
349+
PubSub PubSubInterface
350+
MessageMarshaler MessageMarshaler
351+
PeerID peer.ID
352+
DirectChannelFactory DirectChannelFactory
353+
NewHeadsEmitter event.Emitter
349354
}
350355

351356
type DirectChannelOptions struct {
@@ -371,7 +376,7 @@ type DirectChannelEmitter interface {
371376
type DirectChannelFactory func(ctx context.Context, emitter DirectChannelEmitter, opts *DirectChannelOptions) (DirectChannel, error)
372377

373378
// StoreConstructor Defines the expected constructor for a custom store
374-
type StoreConstructor func(context.Context, coreapi.CoreAPI, *identityprovider.Identity, address.Address, *NewStoreOptions) (Store, error)
379+
type StoreConstructor func(coreapi.CoreAPI, *identityprovider.Identity, address.Address, *NewStoreOptions) (Store, error)
375380

376381
// IndexConstructor Defines the expected constructor for a custom index
377382
type IndexConstructor func(publicKey []byte) StoreIndex

‎messagemarshaler/doc.go

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package messagemarshaler is a package for marshal messages
2+
package messagemarshaler // import "berty.tech/go-orbit-db/messagemarshaler"

‎messagemarshaler/json_marshaler.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package messagemarshaler
2+
3+
import (
4+
"encoding/json"
5+
6+
"berty.tech/go-orbit-db/iface"
7+
)
8+
9+
type JSONMarshaler struct{}
10+
11+
func (JSONMarshaler) Marshal(m *iface.MessageExchangeHeads) ([]byte, error) {
12+
return json.Marshal(m)
13+
}
14+
15+
func (JSONMarshaler) Unmarshal(data []byte, m *iface.MessageExchangeHeads) error {
16+
return json.Unmarshal(data, m)
17+
}

‎pubsub/oneonone/channel.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ const (
2323
)
2424

2525
type channel struct {
26-
id string
27-
sub coreapi.PubSubSubscription
26+
ctx context.Context
27+
cancel context.CancelFunc
28+
id string
29+
sub coreapi.PubSubSubscription
2830
}
2931

3032
type channels struct {
@@ -52,12 +54,16 @@ func (c *channels) Connect(ctx context.Context, target peer.ID) error {
5254
return fmt.Errorf("unable to subscribe to pubsub: %w", err)
5355
}
5456

57+
ctx, cancel := context.WithCancel(ctx)
58+
5559
c.subs[target] = &channel{
56-
sub: sub,
57-
id: id,
60+
ctx: ctx,
61+
cancel: cancel,
62+
sub: sub,
63+
id: id,
5864
}
5965
go func() {
60-
c.monitorTopic(sub, target)
66+
c.monitorTopic(ctx, sub, target)
6167

6268
// if monitor topic is done, remove target from cache
6369
c.muSubs.Lock()
@@ -127,9 +133,9 @@ func (c *channels) getChannelID(p peer.ID) string {
127133
return fmt.Sprintf("/%s/%s", PROTOCOL, strings.Join(channelIDPeers, "/"))
128134
}
129135

130-
func (c *channels) monitorTopic(sub coreapi.PubSubSubscription, p peer.ID) {
136+
func (c *channels) monitorTopic(ctx context.Context, sub coreapi.PubSubSubscription, p peer.ID) {
131137
for {
132-
msg, err := sub.Next(c.ctx)
138+
msg, err := sub.Next(ctx)
133139
switch err {
134140
case nil:
135141
case context.Canceled, context.DeadlineExceeded:

‎pubsub/pubsubraw/pubsub.go

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) {
3636

3737
ch := make(chan events.Event, 32)
3838
go func() {
39+
defer ph.Cancel()
3940
defer close(ch)
4041
for {
4142
evt, err := ph.NextPeerEvent(ctx)
@@ -70,6 +71,7 @@ func (p *psTopic) WatchMessages(ctx context.Context) (<-chan *iface.EventPubSubM
7071
ch := make(chan *iface.EventPubSubMessage, 32)
7172
go func() {
7273
defer close(ch)
74+
defer sub.Cancel()
7375
for {
7476
msg, err := sub.Next(ctx)
7577
if err != nil {

‎stores/basestore/base_store.go

+404-17
Large diffs are not rendered by default.

‎stores/documentstore/document.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func DefaultStoreOptsForMap(keyField string) *iface.CreateDocumentDBOptions {
242242
}
243243

244244
// NewOrbitDBDocumentStore Instantiates a new DocumentStore
245-
func NewOrbitDBDocumentStore(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (iface.Store, error) {
245+
func NewOrbitDBDocumentStore(ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (iface.Store, error) {
246246
if options.StoreSpecificOpts == nil {
247247
options.StoreSpecificOpts = DefaultStoreOptsForMap("_id")
248248
}
@@ -271,7 +271,7 @@ func NewOrbitDBDocumentStore(ctx context.Context, ipfs coreapi.CoreAPI, identity
271271
store := &orbitDBDocumentStore{docOpts: docOpts}
272272
options.Index = func(_ []byte) iface.StoreIndex { return newDocumentIndex(docOpts) }
273273

274-
err := store.InitBaseStore(ctx, ipfs, identity, addr, options)
274+
err := store.InitBaseStore(ipfs, identity, addr, options)
275275
if err != nil {
276276
return nil, fmt.Errorf("unable to initialize document store: %w", err)
277277
}

‎stores/eventlogstore/log.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,12 @@ func (o *orbitDBEventLogStore) Type() string {
201201
}
202202

203203
// NewOrbitDBEventLogStore Instantiates a new EventLogStore
204-
func NewOrbitDBEventLogStore(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (i iface.Store, e error) {
204+
func NewOrbitDBEventLogStore(ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (i iface.Store, e error) {
205+
205206
store := &orbitDBEventLogStore{}
206207
options.Index = NewEventIndex
207208

208-
err := store.InitBaseStore(ctx, ipfs, identity, addr, options)
209+
err := store.InitBaseStore(ipfs, identity, addr, options)
209210
if err != nil {
210211
return nil, fmt.Errorf("unable to initialize base store: %w", err)
211212
}

‎stores/events.go

+15
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package stores
33
import (
44
ipfslog "berty.tech/go-ipfs-log"
55
"berty.tech/go-orbit-db/address"
6+
"berty.tech/go-orbit-db/iface"
67
"berty.tech/go-orbit-db/stores/replicator"
78
cid "github.com/ipfs/go-cid"
89
peer "github.com/libp2p/go-libp2p/core/peer"
@@ -15,6 +16,7 @@ var Events = []interface{}{
1516
new(EventLoad),
1617
new(EventReplicated),
1718
new(EventReplicate),
19+
new(EventExchangeHeads),
1820
}
1921

2022
type EventReplicate struct {
@@ -137,3 +139,16 @@ func NewEventNewPeer(p peer.ID) EventNewPeer {
137139
Peer: p,
138140
}
139141
}
142+
143+
// EventExchangeHeadsset An event as stateful, sent when new exchange head is done, so newly subscriber can replay last event in case they missed it
144+
type EventExchangeHeads struct {
145+
Peer peer.ID
146+
Message *iface.MessageExchangeHeads
147+
}
148+
149+
func NewEventExchangeHeads(p peer.ID, msg *iface.MessageExchangeHeads) EventExchangeHeads {
150+
return EventExchangeHeads{
151+
Peer: p,
152+
Message: msg,
153+
}
154+
}

‎stores/kvstore/keyvalue.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ func (o *orbitDBKeyValue) Type() string {
8585
}
8686

8787
// NewOrbitDBKeyValue Instantiates a new KeyValueStore
88-
func NewOrbitDBKeyValue(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (i iface.Store, e error) {
88+
func NewOrbitDBKeyValue(ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (i iface.Store, e error) {
8989
store := &orbitDBKeyValue{}
9090

9191
options.Index = NewKVIndex
9292

93-
err := store.InitBaseStore(ctx, ipfs, identity, addr, options)
93+
err := store.InitBaseStore(ipfs, identity, addr, options)
9494
if err != nil {
9595
return nil, fmt.Errorf("unable to initialize base store: %w", err)
9696
}

‎tests/replicate_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -384,11 +384,11 @@ func testDirectChannelNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (or
384384

385385
orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{
386386
Directory: &dbPath1,
387+
Logger: logger,
387388
DirectChannelFactory: directchannel.InitDirectChannelFactory(zap.NewNop(), node1.PeerHost),
388389
// @NOTE(gfanton): use raw pubsub here, we need a higher buffer
389390
// for subscribe to make the test works on CI
390391
PubSub: pubsubraw.NewPubSub(node1.PubSub, node1.Identity, nil, nil),
391-
Logger: logger,
392392
})
393393
require.NoError(t, err)
394394

@@ -422,11 +422,11 @@ func testDefaultNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.
422422
logger.Named("orbitdb.tests").Debug(fmt.Sprintf("node%d is %s", i, node1.Identity.String()))
423423

424424
orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{
425-
// @NOTE(gfanton): use raw pubsub here, we need a higher buffer
426-
// for subscribe to make the test works on CI
427-
PubSub: pubsubraw.NewPubSub(node1.PubSub, node1.Identity, logger, nil),
428425
Directory: &dbPath1,
429426
Logger: logger,
427+
// @NOTE(gfanton): use raw pubsub here, we need a higher buffer
428+
// for subscribe to make the test works on CI
429+
PubSub: pubsubraw.NewPubSub(node1.PubSub, node1.Identity, nil, nil),
430430
})
431431
require.NoError(t, err)
432432

0 commit comments

Comments
 (0)
Please sign in to comment.