Skip to content

Commit 93ea580

Browse files
authored
Merge pull request #870 from ipfs/feat/buffered-provider-KeyChanFunc
provider: add a buffered KeyChanFunc.
2 parents c4b514e + e380c9f commit 93ea580

File tree

6 files changed

+123
-7
lines changed

6 files changed

+123
-7
lines changed

examples/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ require (
4242
github.com/flynn/noise v1.1.0 // indirect
4343
github.com/francoispqt/gojay v1.2.13 // indirect
4444
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
45-
github.com/gammazero/chanqueue v1.0.0 // indirect
45+
github.com/gammazero/chanqueue v1.1.0 // indirect
4646
github.com/gammazero/deque v1.0.0 // indirect
4747
github.com/go-logr/logr v1.4.2 // indirect
4848
github.com/go-logr/stdr v1.2.2 // indirect

examples/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
7979
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
8080
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
8181
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
82-
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
83-
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
82+
github.com/gammazero/chanqueue v1.1.0 h1:yiwtloc1azhgGLFo2gMloJtQvkYD936Ai7tBfa+rYJw=
83+
github.com/gammazero/chanqueue v1.1.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
8484
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
8585
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
8686
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/dustin/go-humanize v1.0.1
1111
github.com/filecoin-project/go-clock v0.1.0
1212
github.com/gabriel-vasile/mimetype v1.4.6
13-
github.com/gammazero/chanqueue v1.0.0
13+
github.com/gammazero/chanqueue v1.1.0
1414
github.com/gammazero/deque v1.0.0
1515
github.com/google/uuid v1.6.0
1616
github.com/gorilla/mux v1.8.1

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
7979
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
8080
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
8181
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
82-
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
83-
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
82+
github.com/gammazero/chanqueue v1.1.0 h1:yiwtloc1azhgGLFo2gMloJtQvkYD936Ai7tBfa+rYJw=
83+
github.com/gammazero/chanqueue v1.1.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
8484
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
8585
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
8686
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=

provider/provider.go

+26-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
// Package provider provides interfaces and tooling for (Re)providers.
2+
//
3+
// This includes methods to provide streams of CIDs (i.e. from pinned
4+
// merkledags, from blockstores, from single dags etc.). These methods can be
5+
// used for other purposes, but are usually fed to the Reprovider to announce
6+
// CIDs.
17
package provider
28

39
import (
410
"context"
511

12+
"github.com/gammazero/chanqueue"
613
blocks "github.com/ipfs/boxo/blockstore"
714
"github.com/ipfs/boxo/fetcher"
815
fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers"
@@ -46,7 +53,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
4653
}
4754
}
4855

49-
// NewPinnedProvider returns provider supplying pinned keys
56+
// NewPinnedProvider returns a KeyChanFunc supplying pinned keys. The Provider
57+
// will block when writing to the channel and there are no readers.
5058
func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc {
5159
return func(ctx context.Context) (<-chan cid.Cid, error) {
5260
set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots)
@@ -70,6 +78,23 @@ func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.F
7078
}
7179
}
7280

81+
// NewBufferedProvider returns a KeyChanFunc supplying keys from a given
82+
// KeyChanFunc, but buffering keys in memory if we can read them faster than
83+
// they are consumed. This allows the underlying KeyChanFunc to finish
84+
// listing pins as soon as possible releasing any resources, locks, at the
85+
// expense of memory usage.
86+
func NewBufferedProvider(pinsF KeyChanFunc) KeyChanFunc {
87+
return func(ctx context.Context) (<-chan cid.Cid, error) {
88+
// Invoke the wrapped KeyChanFunc; if it fails, its error is returned
// unchanged together with a nil channel.
pins, err := pinsF(ctx)
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
// The wrapped channel is handed to the queue as its read-only input and
// the caller receives the queue's output channel instead.
// NOTE(review): no capacity option is passed to chanqueue.New, so the
// buffer is presumably unbounded — confirm against the chanqueue docs.
queue := chanqueue.New(chanqueue.WithInputRdOnly[cid.Cid](pins))
94+
return queue.Out(), nil
95+
}
96+
}
97+
7398
func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) {
7499
set := cidutil.NewStreamingSet()
75100
recursivePins := cidutil.NewSet()

provider/provider_test.go

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package provider
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/ipfs/boxo/blockservice"
9+
"github.com/ipfs/boxo/blockstore"
10+
"github.com/ipfs/boxo/exchange/offline"
11+
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
12+
"github.com/ipfs/boxo/ipld/merkledag"
13+
mdutils "github.com/ipfs/boxo/ipld/merkledag/test"
14+
ipinner "github.com/ipfs/boxo/pinning/pinner"
15+
"github.com/ipfs/boxo/pinning/pinner/dspinner"
16+
"github.com/ipfs/go-datastore"
17+
dssync "github.com/ipfs/go-datastore/sync"
18+
"github.com/stretchr/testify/require"
19+
)
20+
21+
// TestBufferedPinProvider checks that we can modify a pinset while reading
22+
// from the provider, as all elements of the pinset have been placed in
23+
// memory.
24+
func TestBufferedPinProvider(t *testing.T) {
25+
ctx := context.Background()
26+
27+
// Setup
28+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
29+
bs := blockstore.NewBlockstore(ds)
30+
bserv := blockservice.New(bs, offline.Exchange(bs))
31+
fetcher := bsfetcher.NewFetcherConfig(bserv)
32+
dserv := merkledag.NewDAGService(bserv)
33+
pinner, err := dspinner.New(ctx, ds, dserv)
34+
require.NoError(t, err)
35+
daggen := mdutils.NewDAGGenerator()
36+
root, _, err := daggen.MakeDagNode(dserv.Add, 1, 64)
37+
require.NoError(t, err)
38+
root2, _, err := daggen.MakeDagNode(dserv.Add, 1, 64)
39+
require.NoError(t, err)
40+
41+
// test with 0 pins to ensure things work.
42+
zeroProv := NewPinnedProvider(false, pinner, fetcher)
43+
zeroKeyChanF := NewBufferedProvider(zeroProv)
44+
zeroPins, err := zeroKeyChanF(ctx)
45+
require.NoError(t, err)
46+
for range zeroPins {
47+
t.Error("There should not be any pins")
48+
}
49+
50+
// Pin the first DAG.
51+
err = pinner.PinWithMode(ctx, root, ipinner.Recursive, "test")
52+
require.NoError(t, err)
53+
54+
// Then open the keyChanF to read the pins. This should trigger the
55+
// pin query, but we don't read from it, so in normal conditions
56+
// it would block.
57+
pinProv := NewPinnedProvider(false, pinner, fetcher)
58+
keyChanF := NewBufferedProvider(pinProv)
59+
root1pins, err := keyChanF(ctx)
60+
require.NoError(t, err)
61+
62+
// Give time to buffer all the results as this is happening in the
63+
// background.
64+
time.Sleep(200 * time.Millisecond)
65+
66+
// If the previous query was blocking the pinset under a read-lock,
67+
// we would not be able to write a second pin:
68+
err = pinner.PinWithMode(ctx, root2, ipinner.Recursive, "test")
69+
require.NoError(t, err)
70+
71+
// Now we trigger a second query.
72+
pinProv2 := NewPinnedProvider(false, pinner, fetcher)
73+
keyChanF2 := NewBufferedProvider(pinProv2)
74+
root2pins, err := keyChanF2(ctx)
75+
require.NoError(t, err)
76+
77+
// And finally proceed to read pins. The second keyChan should contain
78+
// both root and root2 pins, while the first keyChan contains only the
79+
// elements from the first pin because they were all cached before the
80+
// second pin happened.
81+
root1count := 0
82+
root2count := 0
83+
for range root2pins {
84+
root2count++
85+
}
86+
for range root1pins {
87+
root1count++
88+
}
89+
require.Equal(t, 64, root1count, "first keyChan should have provided 64 cids")
90+
require.Equal(t, 64+64, root2count, "second keyChan should have provided 128 cids")
91+
}

0 commit comments

Comments
 (0)