Skip to content

Commit a63faef

Browse files
authored
piece-directory: make sure we process recs correctly (#1828)
1 parent 3342a5f commit a63faef

File tree

5 files changed

+51
-9
lines changed

5 files changed

+51
-9
lines changed

Makefile

+1 -1
Original file line number | Diff line number | Diff line change
@@ -217,7 +217,7 @@ docsgen-openrpc-boost: docsgen-openrpc-bin
217217

218218
## DOCKER IMAGES
219219
docker_user?=filecoin
220-
lotus_version?=v1.25.0-rc1
220+
lotus_version?=v1.25.0
221221
ffi_from_source?=0
222222
build_lotus?=0
223223
build_boost?=1

extern/boostd-data/yugabyte/service.go

+35 -4
Original file line number | Diff line number | Diff line change
@@ -550,6 +550,12 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re
550550

551551
threadBatch := len(recs) / s.settings.InsertConcurrency // split the slice into go-routine batches
552552

553+
if threadBatch == 0 {
554+
threadBatch = len(recs)
555+
}
556+
557+
log.Debugw("addMultihashesToPieces call", "threadBatch", threadBatch, "len(recs)", len(recs))
558+
553559
var eg errgroup.Group
554560
for i := 0; i < len(recs); i += threadBatch {
555561
i := i
@@ -576,9 +582,18 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re
576582
})
577583

578584
if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
579-
err := s.session.ExecuteBatch(batch)
585+
err := func() error {
586+
defer func(start time.Time) {
587+
log.Debugw("addMultihashesToPieces executeBatch", "took", time.Since(start), "entries", len(batch.Entries))
588+
}(time.Now())
589+
err := s.session.ExecuteBatch(batch)
590+
if err != nil {
591+
return fmt.Errorf("inserting into PayloadToPieces: %w", err)
592+
}
593+
return nil
594+
}()
580595
if err != nil {
581-
return fmt.Errorf("inserting into PayloadToPieces: %w", err)
596+
return err
582597
}
583598
batch = nil
584599

@@ -609,6 +624,12 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode
609624

610625
threadBatch := len(recs) / s.settings.InsertConcurrency // split the slice into go-routine batches
611626

627+
if threadBatch == 0 {
628+
threadBatch = len(recs)
629+
}
630+
631+
log.Debugw("addPieceInfos call", "threadBatch", threadBatch, "len(recs)", len(recs))
632+
612633
var eg errgroup.Group
613634
for i := 0; i < len(recs); i += threadBatch {
614635
i := i
@@ -635,9 +656,19 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode
635656
})
636657

637658
if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
638-
err := s.session.ExecuteBatch(batch)
659+
err := func() error {
660+
defer func(start time.Time) {
661+
log.Debugw("addPieceInfos executeBatch", "took", time.Since(start), "entries", len(batch.Entries))
662+
}(time.Now())
663+
664+
err := s.session.ExecuteBatch(batch)
665+
if err != nil {
666+
return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err)
667+
}
668+
return nil
669+
}()
639670
if err != nil {
640-
return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err)
671+
return err
641672
}
642673
batch = nil
643674

go.mod

+1 -1
Original file line number | Diff line number | Diff line change
@@ -320,7 +320,7 @@ require (
320320
require (
321321
github.com/filecoin-project/boost-gfm v1.26.7
322322
github.com/filecoin-project/boost-graphsync v0.13.9
323-
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231101173716-1622d0ce2581
323+
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f
324324
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
325325
github.com/filecoin-project/go-fil-markets v1.28.3
326326
github.com/filecoin-project/lotus v1.25.0

go.sum

+2 -2
Original file line number | Diff line number | Diff line change
@@ -313,8 +313,8 @@ github.com/filecoin-project/boost-gfm v1.26.7 h1:ENJEqx1OzY072QnUP37YrGVmUiCewRw
313313
github.com/filecoin-project/boost-gfm v1.26.7/go.mod h1:OhG2y7WeDx3KU9DPjgWllS+3/ospPjm8/XDrvN6uOfk=
314314
github.com/filecoin-project/boost-graphsync v0.13.9 h1:RQepfTlffLGUmp3Ff7VosYrWUKPLiz++GGV2D/gIfuw=
315315
github.com/filecoin-project/boost-graphsync v0.13.9/go.mod h1:bc2M5ZLZJtXHl8kjnqtn4L1MsdEqpJErDaIeY0bJ9wk=
316-
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231101173716-1622d0ce2581 h1:a2rhYL8QBTxmP1/E2VhfsvUB6CoCuS4/jEvSpOl68tU=
317-
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231101173716-1622d0ce2581/go.mod h1:/d9yXj2CqhPPtM+m8GiH7rQHSGRqXYFg3V82Qsk34NA=
316+
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f h1:8dd0yAadyeOL5Qd42XhEwD60UKvIFkY2MLhef/IaeOk=
317+
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f/go.mod h1:MyzvfYWAH0OAyf95TLUWYq3cO3vm/TVzDS57GKQi47o=
318318
github.com/filecoin-project/dagstore v0.7.0 h1:IS0R+69za8dguYWeqz/MI+nb7ONpk03tAkxPCBXEKm0=
319319
github.com/filecoin-project/dagstore v0.7.0/go.mod h1:YKn4qXih+/2xQWpfJsaKGOi4POw5vH5grDmfPCCnx8g=
320320
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f/go.mod h1:+If3s2VxyjZn+KGGZIoRXBDSFQ9xL404JBJGf4WhEj0=

piecedirectory/piecedirectory.go

+12 -1
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,12 @@ func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThro
8888
opt(pd.settings)
8989
}
9090

91+
if pd.settings.addIndexConcurrency == 0 {
92+
pd.settings.addIndexConcurrency = config.DefaultAddIndexConcurrency
93+
}
94+
95+
log.Infow("new piece directory", "add-index-concurrency", pd.settings.addIndexConcurrency, "add-idx-throttle-size", pd.addIdxThrottleSize)
96+
9197
expireCallback := func(key string, reason ttlcache.EvictionReason, value interface{}) {
9298
log.Debugw("expire callback", "piececid", key, "reason", reason)
9399

@@ -317,6 +323,11 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
317323
return fmt.Errorf("generating index for piece %s: %w", pieceCid, err)
318324
}
319325

326+
if len(recs) == 0 {
327+
log.Warnw("add index: generated index with 0 recs", "pieceCid", pieceCid)
328+
return nil
329+
}
330+
320331
// Transferring a large number of records over the wire can take a significant amount of time.
321332
// Split the transfer into multiple concurrent parts to speed it up. Note that the index can't be generated by boost-data itself
322333
// as it doesn't possess the actual file.
@@ -339,7 +350,7 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
339350
}
340351
// Add mh => piece index to store: "which piece contains the multihash?"
341352
// Add mh => offset index to store: "what is the offset of the multihash within the piece?"
342-
log.Debugw("add index: store index in local index directory", "pieceCid", pieceCid, "chunk", i, "chunksTotal", concurrency)
353+
log.Debugw("add index: store index in local index directory", "pieceCid", pieceCid, "chunk", i, "chunksTotal", concurrency, "len(recs)", len(recs[start:end]), "start", start, "end", end)
343354
if err := ps.store.AddIndex(ctx, pieceCid, recs[start:end], true); err != nil {
344355
return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err)
345356
}

0 commit comments

Comments
 (0)