8
8
"fmt"
9
9
"io"
10
10
"os"
11
+ "runtime"
11
12
"strconv"
12
13
"sync"
13
14
"sync/atomic"
@@ -40,6 +41,7 @@ import (
40
41
mh "github.com/multiformats/go-multihash"
41
42
"go.opentelemetry.io/otel/attribute"
42
43
"golang.org/x/sync/errgroup"
44
+ "golang.org/x/xerrors"
43
45
)
44
46
45
47
var log = logging .Logger ("piecedirectory" )
@@ -403,52 +405,98 @@ func (cr *countingReader) Read(p []byte) (n int, err error) {
403
405
404
406
func parsePieceWithDataSegmentIndex (pieceCid cid.Cid , unpaddedSize int64 , r types.SectionReader ) ([]model.Record , error ) {
405
407
406
- ps := abi .UnpaddedPieceSize (unpaddedSize ).Padded ()
407
- dsis := datasegment .DataSegmentIndexStartOffset (ps )
408
-
409
- var readsCnt int32
410
- cr := & countingReader {
411
- Reader : r ,
412
- cnt : & readsCnt ,
413
- }
414
-
415
408
useBufferedReader := os .Getenv ("PODSI_USE_BUFFERED_READER" ) == "true"
416
409
bufferSize , err := strconv .Atoi (os .Getenv ("PODSI_BUFFER_SIZE" ))
417
410
if err != nil {
418
411
bufferSize = int (4e6 )
419
412
}
420
- log .Infow ("podsi: " , "userBufferedReader" , useBufferedReader , "bufferSize" , bufferSize )
413
+ concurrency , err := strconv .Atoi (os .Getenv ("PODSI_VALIDATION_CONCURRENCY" ))
414
+ if err != nil {
415
+ concurrency = runtime .NumCPU ()
416
+ }
417
+ if concurrency < 1 {
418
+ concurrency = 1
419
+ }
420
+ if concurrency > 16 {
421
+ concurrency = 16
422
+ }
423
+
424
+ log .Infow ("podsi: " , "userBufferedReader" , useBufferedReader , "bufferSize" , bufferSize , "validationConcurrency" , concurrency )
425
+ start := time .Now ()
421
426
427
+ ps := abi .UnpaddedPieceSize (unpaddedSize ).Padded ()
428
+ dsis := datasegment .DataSegmentIndexStartOffset (ps )
422
429
if _ , err := r .Seek (int64 (dsis ), io .SeekStart ); err != nil {
423
430
return nil , fmt .Errorf ("could not seek to data segment index: %w" , err )
424
431
}
425
432
426
433
var rr io.Reader
434
+ var readsCnt int32
435
+ cr := & countingReader {
436
+ Reader : r ,
437
+ cnt : & readsCnt ,
438
+ }
427
439
if useBufferedReader {
428
440
rr = bufio .NewReaderSize (cr , bufferSize )
429
441
} else {
430
442
rr = cr
431
443
}
432
444
433
- start := time .Now ()
434
- dataSegments , err := datasegment .ParseDataSegmentIndex (rr )
445
+ indexData , err := datasegment .ParseDataSegmentIndex (rr )
435
446
if err != nil {
436
447
return nil , fmt .Errorf ("could not parse data segment index: %w" , err )
437
448
}
438
- segments , err := dataSegments .ValidEntries ()
439
- if err != nil {
449
+
450
+ log .Infow ("podsi: parsed data segment index" , "segments" , len (indexData .Entries ), "reads" , readsCnt , "time" , time .Since (start ).String ())
451
+ start = time .Now ()
452
+
453
+ if len (indexData .Entries ) < concurrency {
454
+ concurrency = len (indexData .Entries )
455
+ }
456
+
457
+ chunkSize := len (indexData .Entries ) / concurrency
458
+ results := make ([][]datasegment.SegmentDesc , concurrency )
459
+
460
+ var eg errgroup.Group
461
+ for i := 0 ; i < concurrency ; i ++ {
462
+ i := i
463
+ eg .Go (func () error {
464
+ start := i * chunkSize
465
+ end := start + chunkSize
466
+ if i == concurrency - 1 {
467
+ end = len (indexData .Entries )
468
+ }
469
+
470
+ res , err := validateEntries (indexData .Entries [start :end ])
471
+ if err != nil {
472
+ return err
473
+ }
474
+
475
+ results [i ] = res
476
+
477
+ return nil
478
+ })
479
+ }
480
+
481
+ if err := eg .Wait (); err != nil {
440
482
return nil , fmt .Errorf ("could not calculate valid entries: %w" , err )
441
483
}
442
- if len (segments ) == 0 {
443
- return nil , fmt .Errorf ("no data segments found" )
484
+
485
+ validSegments := make ([]datasegment.SegmentDesc , 0 , len (indexData .Entries ))
486
+ for _ , res := range results {
487
+ validSegments = append (validSegments , res ... )
444
488
}
445
- log .Infow ("podsi: parsed and validated data segment index" , "reads" , readsCnt , "time" , time .Since (start ).String ())
446
489
447
- recs := make ([]model.Record , 0 )
490
+ if len (validSegments ) == 0 {
491
+ return nil , fmt .Errorf ("no data segments found" )
492
+ }
448
493
449
- readsCnt = 0
494
+ log . Infow ( "podsi: validated data segment index" , "validSegments" , len ( validSegments ), "time" , time . Since ( start ). String ())
450
495
start = time .Now ()
451
- for _ , s := range segments {
496
+ readsCnt = 0
497
+
498
+ recs := make ([]model.Record , 0 )
499
+ for _ , s := range validSegments {
452
500
segOffset := s .UnpaddedOffest ()
453
501
segSize := s .UnpaddedLength ()
454
502
@@ -475,11 +523,27 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type
475
523
recs = append (recs , subRecs ... )
476
524
}
477
525
478
- log .Infow ("podsi: parsed records from data segments" , "reads" , readsCnt , "time" , time .Since (start ).String ())
526
+ log .Infow ("podsi: parsed records from data segments" , "recs" , len ( recs ), " reads" , readsCnt , "time" , time .Since (start ).String ())
479
527
480
528
return recs , nil
481
529
}
482
530
531
+ func validateEntries (entries []datasegment.SegmentDesc ) ([]datasegment.SegmentDesc , error ) {
532
+ res := make ([]datasegment.SegmentDesc , 0 , len (entries ))
533
+ for i , e := range entries {
534
+
535
+ if err := e .Validate (); err != nil {
536
+ if errors .Is (err , datasegment .ErrValidation ) {
537
+ continue
538
+ } else {
539
+ return nil , xerrors .Errorf ("got unknown error for entry %d: %w" , i , err )
540
+ }
541
+ }
542
+ res = append (res , e )
543
+ }
544
+ return res , nil
545
+ }
546
+
483
547
// BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal
484
548
// corresponding to an unsealed sector for this method to work. It will try to build index
485
549
// using all available deals and will exit as soon as it succeeds for one of the deals
0 commit comments