Skip to content

Commit 05c5c66

Browse files
authored
Merge pull request ipfs/go-unixfs#39 from ipfs/features/streaming-ls-5600
feat(Directory): Add EnumLinksAsync method This commit was moved from ipfs/go-unixfs@a3eae7f
2 parents 27aab7c + 4fb5d04 commit 05c5c66

File tree

5 files changed

+179
-41
lines changed

5 files changed

+179
-41
lines changed

unixfs/ipld-merkledag/hamt/hamt.go

+46-19
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"context"
2525
"fmt"
2626
"os"
27-
"sync"
2827

2928
bitfield "github.com/Stebalien/go-bitfield"
3029
cid "github.com/ipfs/go-cid"
@@ -400,21 +399,16 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func
400399
// EnumLinks collects all links in the Shard.
401400
func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
402401
var links []*ipld.Link
403-
var setlk sync.Mutex
404402

405-
getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *Shard) error {
406-
lnk := sv.val
407-
lnk.Name = sv.key
408-
setlk.Lock()
409-
links = append(links, lnk)
410-
setlk.Unlock()
411-
return nil
412-
})
413-
414-
cset := cid.NewSet()
403+
linkResults := ds.EnumLinksAsync(ctx)
415404

416-
err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
417-
return links, err
405+
for linkResult := range linkResults {
406+
if linkResult.Err != nil {
407+
return links, linkResult.Err
408+
}
409+
links = append(links, linkResult.Link)
410+
}
411+
return links, nil
418412
}
419413

420414
// ForEachLink walks the Shard and calls the given function.
@@ -427,10 +421,28 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro
427421
})
428422
}
429423

424+
// EnumLinksAsync returns a channel which will receive Links in the directory
425+
// as they are enumerated, where order is not gauranteed
426+
func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
427+
linkResults := make(chan format.LinkResult)
428+
ctx, cancel := context.WithCancel(ctx)
429+
go func() {
430+
defer close(linkResults)
431+
defer cancel()
432+
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
433+
cset := cid.NewSet()
434+
err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
435+
if err != nil {
436+
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
437+
}
438+
}()
439+
return linkResults
440+
}
441+
430442
// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
431443
// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
432444
// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
433-
func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *Shard) error) dag.GetLinks {
445+
func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {
434446

435447
return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
436448
node, err := dagService.Get(ctx, currentCid)
@@ -458,16 +470,31 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *
458470
if err != nil {
459471
return nil, err
460472
}
461-
err = onShardValue(sv)
462-
if err != nil {
463-
return nil, err
464-
}
473+
formattedLink := sv.val
474+
formattedLink.Name = sv.key
475+
emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
465476
}
466477
}
467478
return childShards, nil
468479
}
469480
}
470481

482+
func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
483+
// make sure that context cancel is processed first
484+
// the reason is due to the concurrency of EnumerateChildrenAsync
485+
// it's possible for EnumLinksAsync to complete and close the linkResults
486+
// channel before this code runs
487+
select {
488+
case <-ctx.Done():
489+
return
490+
default:
491+
}
492+
select {
493+
case linkResults <- r:
494+
case <-ctx.Done():
495+
}
496+
}
497+
471498
func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error {
472499
for idx := range ds.children {
473500
c, err := ds.getChild(ctx, idx)

unixfs/ipld-merkledag/hamt/hamt_test.go

+67-22
Original file line numberDiff line numberDiff line change
@@ -74,28 +74,7 @@ func assertLink(s *Shard, name string, found bool) error {
7474
}
7575
}
7676

77-
func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
78-
ctx, cancel := context.WithCancel(context.Background())
79-
defer cancel()
80-
nd, err := s.Node()
81-
if err != nil {
82-
return err
83-
}
84-
85-
nds, err := NewHamtFromDag(ds, nd)
86-
if err != nil {
87-
return err
88-
}
89-
90-
linksA, err := s.EnumLinks(ctx)
91-
if err != nil {
92-
return err
93-
}
94-
95-
linksB, err := nds.EnumLinks(ctx)
96-
if err != nil {
97-
return err
98-
}
77+
func assertLinksEqual(linksA []*ipld.Link, linksB []*ipld.Link) error {
9978

10079
if len(linksA) != len(linksB) {
10180
return fmt.Errorf("links arrays are different sizes")
@@ -121,6 +100,32 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
121100
return nil
122101
}
123102

103+
func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
104+
ctx, cancel := context.WithCancel(context.Background())
105+
defer cancel()
106+
nd, err := s.Node()
107+
if err != nil {
108+
return err
109+
}
110+
111+
nds, err := NewHamtFromDag(ds, nd)
112+
if err != nil {
113+
return err
114+
}
115+
116+
linksA, err := s.EnumLinks(ctx)
117+
if err != nil {
118+
return err
119+
}
120+
121+
linksB, err := nds.EnumLinks(ctx)
122+
if err != nil {
123+
return err
124+
}
125+
126+
return assertLinksEqual(linksA, linksB)
127+
}
128+
124129
func TestBasicSet(t *testing.T) {
125130
ds := mdtest.Mock()
126131
for _, w := range []int{128, 256, 512, 1024, 2048, 4096} {
@@ -309,6 +314,46 @@ func TestSetAfterMarshal(t *testing.T) {
309314
}
310315
}
311316

317+
func TestEnumLinksAsync(t *testing.T) {
318+
ds := mdtest.Mock()
319+
_, s, err := makeDir(ds, 300)
320+
if err != nil {
321+
t.Fatal(err)
322+
}
323+
ctx := context.Background()
324+
325+
nd, err := s.Node()
326+
if err != nil {
327+
t.Fatal(err)
328+
}
329+
330+
nds, err := NewHamtFromDag(ds, nd)
331+
if err != nil {
332+
t.Fatal(err)
333+
}
334+
335+
linksA, err := nds.EnumLinks(ctx)
336+
if err != nil {
337+
t.Fatal(err)
338+
}
339+
340+
linkResults := nds.EnumLinksAsync(ctx)
341+
342+
var linksB []*ipld.Link
343+
344+
for linkResult := range linkResults {
345+
if linkResult.Err != nil {
346+
t.Fatal(linkResult.Err)
347+
}
348+
linksB = append(linksB, linkResult.Link)
349+
}
350+
351+
err = assertLinksEqual(linksA, linksB)
352+
if err != nil {
353+
t.Fatal(err)
354+
}
355+
}
356+
312357
func TestDuplicateAddShard(t *testing.T) {
313358
ds := mdtest.Mock()
314359
dir, _ := NewShard(ds, 256)

unixfs/ipld-merkledag/io/directory.go

+31
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77

88
mdag "github.com/ipfs/go-merkledag"
9+
910
format "github.com/ipfs/go-unixfs"
1011
hamt "github.com/ipfs/go-unixfs/hamt"
1112

@@ -38,6 +39,10 @@ type Directory interface {
3839
// ForEachLink applies the given function to Links in the directory.
3940
ForEachLink(context.Context, func(*ipld.Link) error) error
4041

42+
// EnumLinksAsync returns a channel which will receive Links in the directory
43+
// as they are enumerated, where order is not gauranteed
44+
EnumLinksAsync(context.Context) <-chan format.LinkResult
45+
4146
// Links returns the all the links in the directory node.
4247
Links(context.Context) ([]*ipld.Link, error)
4348

@@ -141,6 +146,26 @@ func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.No
141146
return d.node.AddNodeLink(name, node)
142147
}
143148

149+
// EnumLinksAsync returns a channel which will receive Links in the directory
150+
// as they are enumerated, where order is not gauranteed
151+
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
152+
linkResults := make(chan format.LinkResult)
153+
go func() {
154+
defer close(linkResults)
155+
for _, l := range d.node.Links() {
156+
select {
157+
case linkResults <- format.LinkResult{
158+
Link: l,
159+
Err: nil,
160+
}:
161+
case <-ctx.Done():
162+
return
163+
}
164+
}
165+
}()
166+
return linkResults
167+
}
168+
144169
// ForEachLink implements the `Directory` interface.
145170
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
146171
for _, l := range d.node.Links() {
@@ -226,6 +251,12 @@ func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) erro
226251
return d.shard.ForEachLink(ctx, f)
227252
}
228253

254+
// EnumLinksAsync returns a channel which will receive Links in the directory
255+
// as they are enumerated, where order is not gauranteed
256+
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
257+
return d.shard.EnumLinksAsync(ctx)
258+
}
259+
229260
// Links implements the `Directory` interface.
230261
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
231262
return d.shard.EnumLinks(ctx)

unixfs/ipld-merkledag/io/directory_test.go

+26
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"fmt"
66
"testing"
77

8+
ipld "github.com/ipfs/go-ipld-format"
89
mdtest "github.com/ipfs/go-merkledag/test"
10+
911
ft "github.com/ipfs/go-unixfs"
1012
)
1113

@@ -155,4 +157,28 @@ func TestDirBuilder(t *testing.T) {
155157
if len(links) != count {
156158
t.Fatal("wrong number of links", len(links), count)
157159
}
160+
161+
linkResults := dir.EnumLinksAsync(ctx)
162+
163+
asyncNames := make(map[string]bool)
164+
var asyncLinks []*ipld.Link
165+
166+
for linkResult := range linkResults {
167+
if linkResult.Err != nil {
168+
t.Fatal(linkResult.Err)
169+
}
170+
asyncNames[linkResult.Link.Name] = true
171+
asyncLinks = append(asyncLinks, linkResult.Link)
172+
}
173+
174+
for i := 0; i < count; i++ {
175+
n := fmt.Sprintf("entry %d", i)
176+
if !asyncNames[n] {
177+
t.Fatal("COULDNT FIND: ", n)
178+
}
179+
}
180+
181+
if len(asyncLinks) != count {
182+
t.Fatal("wrong number of links", len(asyncLinks), count)
183+
}
158184
}

unixfs/ipld-merkledag/unixfs.go

+9
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,18 @@ import (
99
proto "github.com/gogo/protobuf/proto"
1010

1111
dag "github.com/ipfs/go-merkledag"
12+
13+
ipld "github.com/ipfs/go-ipld-format"
1214
pb "github.com/ipfs/go-unixfs/pb"
1315
)
1416

17+
// A LinkResult for any parallel enumeration of links
18+
// TODO: Should this live in go-ipld-format?
19+
type LinkResult struct {
20+
Link *ipld.Link
21+
Err error
22+
}
23+
1524
// Shorthands for protobuffer types
1625
const (
1726
TRaw = pb.Data_Raw

0 commit comments

Comments
 (0)