Skip to content

Commit 799c708

Browse files
committed
switch to async reading
1 parent 2f2bac2 commit 799c708

File tree

3 files changed

+31
-7
lines changed

3 files changed

+31
-7
lines changed

Diff for: go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -377,3 +377,5 @@ require (
377377
gonum.org/v1/gonum v0.13.0 // indirect
378378
gopkg.in/inf.v0 v0.9.1 // indirect
379379
)
380+
381+
replace github.com/filecoin-project/go-data-segment v0.0.1 => github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69

Diff for: itests/dummydeal_podsi_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestDummyPodsiDealOnline(t *testing.T) {
3838
os.Setenv("PODSI_USE_BUFFERED_READER", "true")
3939
os.Setenv("PODSI_BUFFER_SIZE", "10000")
4040

41-
randomFileSize := int(4e6)
41+
randomFileSize := int(1e6)
4242

4343
ctx := context.Background()
4444
log := framework.Log

Diff for: piecedirectory/piecedirectory.go

+28-6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
mh "github.com/multiformats/go-multihash"
4141
"go.opentelemetry.io/otel/attribute"
4242
"golang.org/x/sync/errgroup"
43+
"golang.org/x/xerrors"
4344
)
4445

4546
var log = logging.Logger("piecedirectory")
@@ -431,13 +432,34 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type
431432
}
432433

433434
start := time.Now()
434-
dataSegments, err := datasegment.ParseDataSegmentIndex(rr)
435-
if err != nil {
436-
return nil, fmt.Errorf("could not parse data segment index: %w", err)
435+
436+
results := make(chan *datasegment.SegmentDesc)
437+
var parseIndexErr error
438+
ctx, cancel := context.WithCancel(context.Background())
439+
go func() {
440+
parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, rr, results)
441+
close(results)
442+
}()
443+
444+
var segments []datasegment.SegmentDesc
445+
cnt := -1
446+
for res := range results {
447+
cnt++
448+
if err := res.Validate(); err != nil {
449+
if errors.Is(err, datasegment.ErrValidation) {
450+
continue
451+
} else {
452+
cancel()
453+
return nil, xerrors.Errorf("could not calculate valid entries: got unknown error for entry %d: %w", cnt, err)
454+
}
455+
}
456+
segments = append(segments, *res)
437457
}
438-
segments, err := dataSegments.ValidEntries()
439-
if err != nil {
440-
return nil, fmt.Errorf("could not calculate valid entries: %w", err)
458+
459+
cancel()
460+
461+
if parseIndexErr != nil {
462+
return nil, fmt.Errorf("could not parse data segment index: %w", parseIndexErr)
441463
}
442464
if len(segments) == 0 {
443465
return nil, fmt.Errorf("no data segments found")

0 commit comments

Comments
 (0)