Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provider: add a buffered KeyChanFunc. #870

Merged
merged 4 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/gammazero/chanqueue v1.0.0 // indirect
github.com/gammazero/chanqueue v1.1.0 // indirect
github.com/gammazero/deque v1.0.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
github.com/gammazero/chanqueue v1.1.0 h1:yiwtloc1azhgGLFo2gMloJtQvkYD936Ai7tBfa+rYJw=
github.com/gammazero/chanqueue v1.1.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/filecoin-project/go-clock v0.1.0
github.com/gabriel-vasile/mimetype v1.4.6
github.com/gammazero/chanqueue v1.0.0
github.com/gammazero/chanqueue v1.1.0
github.com/gammazero/deque v1.0.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
github.com/gammazero/chanqueue v1.1.0 h1:yiwtloc1azhgGLFo2gMloJtQvkYD936Ai7tBfa+rYJw=
github.com/gammazero/chanqueue v1.1.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down
27 changes: 26 additions & 1 deletion provider/provider.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
// Package provider provides interfaces and tooling for (Re)providers.
//
// This includes methods to provide streams of CIDs (i.e. from pinned
// merkledags, from blockstores, from single dags etc.). These methods can be
// used for other purposes, but are usually fed to the Reprovider to announce
// CIDs.
package provider

import (
"context"

"github.com/gammazero/chanqueue"
blocks "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/fetcher"
fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers"
Expand Down Expand Up @@ -46,7 +53,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
}
}

// NewPinnedProvider returns provider supplying pinned keys
// NewPinnedProvider returns a KeyChanFunc supplying pinned keys. The Provider
// will block when writing to the channel and there are no readers.
func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots)
Expand All @@ -70,6 +78,23 @@ func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.F
}
}

// NewBufferedProvider wraps the given KeyChanFunc in a KeyChanFunc that
// buffers keys in memory whenever they are produced faster than they are
// consumed. This lets the wrapped KeyChanFunc finish listing its keys as soon
// as possible, releasing any resources or locks it holds, at the expense of
// memory usage.
//
// Any error returned by the wrapped KeyChanFunc is returned unchanged, with a
// nil channel.
func NewBufferedProvider(pinsF KeyChanFunc) KeyChanFunc {
	buffered := func(ctx context.Context) (<-chan cid.Cid, error) {
		in, err := pinsF(ctx)
		if err != nil {
			return nil, err
		}

		// The wrapped channel is handed to the queue as its read-only input;
		// callers only ever see the queue's output side.
		return chanqueue.New(chanqueue.WithInputRdOnly[cid.Cid](in)).Out(), nil
	}
	return buffered
}

func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) {
set := cidutil.NewStreamingSet()
recursivePins := cidutil.NewSet()
Expand Down
91 changes: 91 additions & 0 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package provider

import (
"context"
"testing"
"time"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange/offline"
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/boxo/ipld/merkledag"
mdutils "github.com/ipfs/boxo/ipld/merkledag/test"
ipinner "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/boxo/pinning/pinner/dspinner"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)

// TestBufferedPinProvider checks that we can modify a pinset while reading
// from the provider, as all elements of the pinset have been placed in
// memory.
func TestBufferedPinProvider(t *testing.T) {
	ctx := context.Background()

	// Setup: an in-memory datastore backing the blockstore, DAG service and
	// pinner, plus two independent DAGs to pin.
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	bs := blockstore.NewBlockstore(ds)
	bserv := blockservice.New(bs, offline.Exchange(bs))
	fetcher := bsfetcher.NewFetcherConfig(bserv)
	dserv := merkledag.NewDAGService(bserv)
	pinner, err := dspinner.New(ctx, ds, dserv)
	require.NoError(t, err)
	daggen := mdutils.NewDAGGenerator()
	root, _, err := daggen.MakeDagNode(dserv.Add, 1, 64)
	require.NoError(t, err)
	root2, _, err := daggen.MakeDagNode(dserv.Add, 1, 64)
	require.NoError(t, err)

	// Test with 0 pins to ensure things work.
	zeroProv := NewPinnedProvider(false, pinner, fetcher)
	zeroKeyChanF := NewBufferedProvider(zeroProv)
	zeroPins, err := zeroKeyChanF(ctx)
	require.NoError(t, err)
	for range zeroPins {
		t.Error("There should not be any pins")
	}

	// Pin the first DAG.
	err = pinner.PinWithMode(ctx, root, ipinner.Recursive, "test")
	require.NoError(t, err)

	// Then open the keyChanF to read the pins. This should trigger the
	// pin query, but we don't read from it, so in normal conditions
	// it would block.
	pinProv := NewPinnedProvider(false, pinner, fetcher)
	keyChanF := NewBufferedProvider(pinProv)
	root1pins, err := keyChanF(ctx)
	require.NoError(t, err)

	// Give time to buffer all the results as this is happening in the
	// background.
	time.Sleep(200 * time.Millisecond)

	// If the previous query was blocking the pinset under a read-lock,
	// we would not be able to write a second pin:
	err = pinner.PinWithMode(ctx, root2, ipinner.Recursive, "test")
	require.NoError(t, err)

	// Now we trigger a second query.
	pinProv2 := NewPinnedProvider(false, pinner, fetcher)
	keyChanF2 := NewBufferedProvider(pinProv2)
	root2pins, err := keyChanF2(ctx)
	require.NoError(t, err)

	// And finally proceed to read pins. The second keyChan should contain
	// both root and root2 pins, while the first keyChan contains only the
	// elements from the first pin because they were all cached before the
	// second pin happened.
	root1count := 0
	root2count := 0
	for range root2pins {
		root2count++
	}
	for range root1pins {
		root1count++
	}
	// The failure messages state the same counts that are asserted, so a
	// failing run reports the real expectation.
	require.Equal(t, 64, root1count, "first query should have provided 64 cids")
	require.Equal(t, 64+64, root2count, "second query should have provided 128 cids")
}
Loading