diff --git a/examples/go.mod b/examples/go.mod index af45d927b..746218352 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -42,7 +42,7 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect - github.com/gammazero/chanqueue v1.0.0 // indirect + github.com/gammazero/chanqueue v1.1.0 // indirect github.com/gammazero/deque v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/examples/go.sum b/examples/go.sum index 83b8ea5af..fa3febd28 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -79,8 +79,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= -github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= -github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/chanqueue v1.1.0 h1:yiwtloc1azhgGLFo2gMloJtQvkYD936Ai7tBfa+rYJw= +github.com/gammazero/chanqueue v1.1.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= diff --git a/go.mod b/go.mod index b1f02c7cf..e372ca364 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/filecoin-project/go-clock v0.1.0 github.com/gabriel-vasile/mimetype v1.4.6 - github.com/gammazero/chanqueue v1.0.0 + github.com/gammazero/chanqueue v1.1.0 github.com/gammazero/deque v1.0.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 diff --git a/go.sum b/go.sum index 742dc02b9..a90d5e64a 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= -github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= -github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/chanqueue v1.1.0 h1:yiwtloc1azhgGLFo2gMloJtQvkYD936Ai7tBfa+rYJw= +github.com/gammazero/chanqueue v1.1.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= diff --git a/provider/provider.go b/provider/provider.go index 4197f3dae..485a3ff4c 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -1,8 +1,15 @@ +// Package provider provides interfaces and tooling for (Re)providers. +// +// This includes methods to provide streams of CIDs (i.e. from pinned +// merkledags, from blockstores, from single dags etc.). These methods can be +// used for other purposes, but are usually fed to the Reprovider to announce +// CIDs. package provider import ( "context" + "github.com/gammazero/chanqueue" blocks "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/fetcher" fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" @@ -46,7 +53,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { } } -// NewPinnedProvider returns provider supplying pinned keys +// NewPinnedProvider returns a KeyChanFunc supplying pinned keys. The Provider +// will block when writing to the channel and there are no readers. func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc { return func(ctx context.Context) (<-chan cid.Cid, error) { set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) @@ -70,6 +78,23 @@ func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.F } } +// NewBufferedProvider returns a KeyChanFunc supplying keys from a given +// KeyChanFunction, but buffering keys in memory if we can read them faster +// they are consumed. This allows the underlying KeyChanFunc to finish +// listing pins as soon as possible releasing any resources, locks, at the +// expense of memory usage. +func NewBufferedProvider(pinsF KeyChanFunc) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + pins, err := pinsF(ctx) + if err != nil { + return nil, err + } + + queue := chanqueue.New(chanqueue.WithInputRdOnly[cid.Cid](pins)) + return queue.Out(), nil + } +} + func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { set := cidutil.NewStreamingSet() recursivePins := cidutil.NewSet() diff --git a/provider/provider_test.go b/provider/provider_test.go new file mode 100644 index 000000000..f585ead0c --- /dev/null +++ b/provider/provider_test.go @@ -0,0 +1,91 @@ +package provider + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange/offline" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" + "github.com/ipfs/boxo/ipld/merkledag" + mdutils "github.com/ipfs/boxo/ipld/merkledag/test" + ipinner "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/pinning/pinner/dspinner" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" +) + +// TestBufferedPinProvider checks that we can modify a pinset while reading +// from the provider, as all elements of the pinset have been placed in +// memory. +func TestBufferedPinProvider(t *testing.T) { + ctx := context.Background() + + // Setup + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + bs := blockstore.NewBlockstore(ds) + bserv := blockservice.New(bs, offline.Exchange(bs)) + fetcher := bsfetcher.NewFetcherConfig(bserv) + dserv := merkledag.NewDAGService(bserv) + pinner, err := dspinner.New(ctx, ds, dserv) + require.NoError(t, err) + daggen := mdutils.NewDAGGenerator() + root, _, err := daggen.MakeDagNode(dserv.Add, 1, 64) + require.NoError(t, err) + root2, _, err := daggen.MakeDagNode(dserv.Add, 1, 64) + require.NoError(t, err) + + // test with 0 pins to ensure things work. + zeroProv := NewPinnedProvider(false, pinner, fetcher) + zeroKeyChanF := NewBufferedProvider(zeroProv) + zeroPins, err := zeroKeyChanF(ctx) + require.NoError(t, err) + for range zeroPins { + t.Error("There should not be any pins") + } + + // Pin the first DAG. + err = pinner.PinWithMode(ctx, root, ipinner.Recursive, "test") + require.NoError(t, err) + + // Then open the keyChanF to read the pins. This should trigger the + // pin query, but we don't read from it, so in normal condiditions + // it would block. + pinProv := NewPinnedProvider(false, pinner, fetcher) + keyChanF := NewBufferedProvider(pinProv) + root1pins, err := keyChanF(ctx) + require.NoError(t, err) + + // Give time to buffer all the results as this is happening in the + // background. + time.Sleep(200 * time.Millisecond) + + // If the previous query was blocking the pinset under a read-lock, + // we would not be able to write a second pin: + err = pinner.PinWithMode(ctx, root2, ipinner.Recursive, "test") + require.NoError(t, err) + + // Now we trigger a second query. + pinProv2 := NewPinnedProvider(false, pinner, fetcher) + keyChanF2 := NewBufferedProvider(pinProv2) + root2pins, err := keyChanF2(ctx) + require.NoError(t, err) + + // And finally proceed to read pins. The second keyChan should contain + // both root and root2 pins, while the first keyChan contains only the + // elements from the first pin because they were all cached before the + // second pin happened. + root1count := 0 + root2count := 0 + for range root2pins { + root2count++ + } + for range root1pins { + root1count++ + } + require.Equal(t, 64, root1count, "first pin should have provided 2048 cids") + require.Equal(t, 64+64, root2count, "second pin should have provided 4096 cids") +}