diff --git a/extern/boostd-data/yugabyte/service.go b/extern/boostd-data/yugabyte/service.go index 33f787064..df0208150 100644 --- a/extern/boostd-data/yugabyte/service.go +++ b/extern/boostd-data/yugabyte/service.go @@ -537,28 +537,59 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re insertPieceOffsetsQry := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)` pieceCidBytes := pieceCid.Bytes() - var batch *gocql.Batch - for allIdx, rec := range recs { - if batch == nil { - batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + // split the slice into go-routine batches of 2M records + threadBatch := 2000000 + + log.Warnw("about to add multihashes to pieces", "piececid", pieceCid, "recs", len(recs), "threadBatch", threadBatch, "threads", len(recs)/threadBatch+1) + + var eg errgroup.Group + eg.SetLimit(128) + for i := 0; i < len(recs); i += threadBatch { + i := i + log.Warnw("fire thread", "i", i) + j := i + threadBatch + if j >= len(recs) { + j = len(recs) } - batch.Entries = append(batch.Entries, gocql.BatchEntry{ - Stmt: insertPieceOffsetsQry, - Args: []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes}, - Idempotent: true, - }) + // Process batch recs[i:j] - if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize { - err := s.session.ExecuteBatch(batch) - if err != nil { - return fmt.Errorf("inserting into PayloadToPieces: %w", err) + eg.Go(func() error { + var batch *gocql.Batch + recsb := recs[i:j] + for allIdx, rec := range recsb { + if batch == nil { + batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + } + + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: insertPieceOffsetsQry, + Args: []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes}, + Idempotent: true, + }) + + if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize { + err := s.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("inserting into PayloadToPieces: %w", err) + } + batch = nil + + // emit progress only from batch 0 + if i == 0 { + numberOfGoroutines := len(recs)/threadBatch + 1 // TODO: confirm this is ok + progress(float64(numberOfGoroutines) * float64(allIdx+1) / float64(len(recs))) + } + } } - batch = nil + return nil + }) + } - progress(float64(allIdx+1) / float64(len(recs))) - } + err := eg.Wait() + if err != nil { + return err } return nil } @@ -570,28 +601,59 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode insertPieceOffsetsQry := `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)` pieceCidBytes := pieceCid.Bytes() - var batch *gocql.Batch - for allIdx, rec := range recs { - if batch == nil { - batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + // split the slice into go-routine batches of 2M records + threadBatch := 2000000 + + log.Warnw("about to add piece infos", "piececid", pieceCid, "recs", len(recs), "threadBatch", threadBatch, "threads", len(recs)/threadBatch+1) + + var eg errgroup.Group + eg.SetLimit(128) + for i := 0; i < len(recs); i += threadBatch { + i := i + log.Warnw("fire thread", "i", i) + j := i + threadBatch + if j >= len(recs) { + j = len(recs) } - batch.Entries = append(batch.Entries, gocql.BatchEntry{ - Stmt: insertPieceOffsetsQry, - Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, - Idempotent: true, - }) + // Process batch recs[i:j] - if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize { - err := s.session.ExecuteBatch(batch) - if err != nil { - return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err) + eg.Go(func() error { + var batch *gocql.Batch + recsb := recs[i:j] + for allIdx, rec := range recsb { + if batch == nil { + batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + } + + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: insertPieceOffsetsQry, + Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, + Idempotent: true, + }) + + if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize { + err := s.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err) + } + batch = nil + + // emit progress only from batch 0 + if i == 0 { + numberOfGoroutines := len(recs)/threadBatch + 1 // TODO: confirm this is ok + progress(float64(numberOfGoroutines) * float64(allIdx+1) / float64(len(recs))) + } + } } - batch = nil + return nil + }) + } - progress(float64(allIdx+1) / float64(len(recs))) - } + err := eg.Wait() + if err != nil { + return err } return nil @@ -841,3 +903,36 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error { failureMetrics = false return nil } + +func (s *Store) execParallel(ctx context.Context, recs []model.Record, parallelism int, f func(record model.Record) error) error { + queue := make(chan model.Record, len(recs)) + for _, rec := range recs { + queue <- rec + } + close(queue) + + var eg errgroup.Group + for i := 0; i < parallelism; i++ { + eg.Go(func() error { + for ctx.Err() == nil { + select { + case <-ctx.Done(): + return ctx.Err() + case rec, ok := <-queue: + if !ok { + // Finished adding all the queued items, exit the thread + return nil + } + + err := f(rec) + if err != nil { + return err + } + } + } + + return ctx.Err() + }) + } + return eg.Wait() +}