Commit 1edf2b6

process recs in parallel
1 parent 274df41 commit 1edf2b6

extern/boostd-data/yugabyte/service.go

Lines changed: 129 additions & 34 deletions
@@ -537,28 +537,59 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re
     insertPieceOffsetsQry := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)`
     pieceCidBytes := pieceCid.Bytes()

-    var batch *gocql.Batch
-    for allIdx, rec := range recs {
-        if batch == nil {
-            batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
-            batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize)
+    // split the slice into go-routine batches of 2M records
+    threadBatch := 2000000
+
+    log.Warnw("about to add multihashes to pieces", "piececid", pieceCid, "recs", len(recs), "threadBatch", threadBatch, "threads", len(recs)/threadBatch+1)
+
+    var eg errgroup.Group
+    eg.SetLimit(128)
+    for i := 0; i < len(recs); i += threadBatch {
+        i := i
+        log.Warnw("fire thread", "i", i)
+        j := i + threadBatch
+        if j >= len(recs) {
+            j = len(recs)
         }

-        batch.Entries = append(batch.Entries, gocql.BatchEntry{
-            Stmt:       insertPieceOffsetsQry,
-            Args:       []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes},
-            Idempotent: true,
-        })
+        // Process batch recs[i:j]

-        if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
-            err := s.session.ExecuteBatch(batch)
-            if err != nil {
-                return fmt.Errorf("inserting into PayloadToPieces: %w", err)
+        eg.Go(func() error {
+            var batch *gocql.Batch
+            recsb := recs[i:j]
+            for allIdx, rec := range recsb {
+                if batch == nil {
+                    batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
+                    batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize)
+                }
+
+                batch.Entries = append(batch.Entries, gocql.BatchEntry{
+                    Stmt:       insertPieceOffsetsQry,
+                    Args:       []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes},
+                    Idempotent: true,
+                })
+
+                if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
+                    err := s.session.ExecuteBatch(batch)
+                    if err != nil {
+                        return fmt.Errorf("inserting into PayloadToPieces: %w", err)
+                    }
+                    batch = nil
+
+                    // emit progress only from batch 0
+                    if i == 0 {
+                        numberOfGoroutines := len(recs)/threadBatch + 1 // TODO: confirm this is ok
+                        progress(float64(numberOfGoroutines) * float64(allIdx+1) / float64(len(recs)))
+                    }
+                }
             }
-            batch = nil
+            return nil
+        })
+    }

-            progress(float64(allIdx+1) / float64(len(recs)))
-        }
+    err := eg.Wait()
+    if err != nil {
+        return err
     }
     return nil
 }
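Both functions changed in this commit follow the same shape: split the record slice into fixed-size chunks and hand each chunk to a goroutine managed by an errgroup, with SetLimit capping how many run at once. Below is a minimal, self-contained sketch of that pattern using the same chunk size and limit as the diff; processChunk and processInParallel are hypothetical stand-ins for the gocql batch loop, not code from this commit.

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

// processChunk stands in for the per-chunk work in the diff (building and
// executing gocql batches); here it just reports the chunk size.
func processChunk(chunk []int) error {
    fmt.Println("processed", len(chunk), "records")
    return nil
}

// processInParallel splits recs into fixed-size chunks and processes each
// chunk in its own goroutine, bounding the number of concurrent goroutines.
func processInParallel(recs []int) error {
    threadBatch := 2000000 // chunk size used in the diff

    var eg errgroup.Group
    eg.SetLimit(128) // at most 128 chunk goroutines run at once

    for i := 0; i < len(recs); i += threadBatch {
        j := i + threadBatch
        if j > len(recs) {
            j = len(recs)
        }
        chunk := recs[i:j] // each goroutine gets its own sub-slice
        eg.Go(func() error {
            return processChunk(chunk)
        })
    }

    // Wait blocks until every goroutine returns and yields the first error.
    return eg.Wait()
}

func main() {
    recs := make([]int, 5000000)
    if err := processInParallel(recs); err != nil {
        fmt.Println("error:", err)
    }
}

Note that with a 2,000,000-record chunk size the SetLimit(128) cap only starts to bind above roughly 256 million records, so in most cases the goroutine count is simply len(recs)/threadBatch+1.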
@@ -570,28 +601,59 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode
     insertPieceOffsetsQry := `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)`
     pieceCidBytes := pieceCid.Bytes()

-    var batch *gocql.Batch
-    for allIdx, rec := range recs {
-        if batch == nil {
-            batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
-            batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize)
+    // split the slice into go-routine batches of 2M records
+    threadBatch := 2000000
+
+    log.Warnw("about to add piece infos", "piececid", pieceCid, "recs", len(recs), "threadBatch", threadBatch, "threads", len(recs)/threadBatch+1)
+
+    var eg errgroup.Group
+    eg.SetLimit(128)
+    for i := 0; i < len(recs); i += threadBatch {
+        i := i
+        log.Warnw("fire thread", "i", i)
+        j := i + threadBatch
+        if j >= len(recs) {
+            j = len(recs)
         }

-        batch.Entries = append(batch.Entries, gocql.BatchEntry{
-            Stmt:       insertPieceOffsetsQry,
-            Args:       []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size},
-            Idempotent: true,
-        })
+        // Process batch recs[i:j]

-        if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
-            err := s.session.ExecuteBatch(batch)
-            if err != nil {
-                return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err)
+        eg.Go(func() error {
+            var batch *gocql.Batch
+            recsb := recs[i:j]
+            for allIdx, rec := range recsb {
+                if batch == nil {
+                    batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
+                    batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize)
+                }
+
+                batch.Entries = append(batch.Entries, gocql.BatchEntry{
+                    Stmt:       insertPieceOffsetsQry,
+                    Args:       []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size},
+                    Idempotent: true,
+                })
+
+                if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
+                    err := s.session.ExecuteBatch(batch)
+                    if err != nil {
+                        return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err)
+                    }
+                    batch = nil
+
+                    // emit progress only from batch 0
+                    if i == 0 {
+                        numberOfGoroutines := len(recs)/threadBatch + 1 // TODO: confirm this is ok
+                        progress(float64(numberOfGoroutines) * float64(allIdx+1) / float64(len(recs)))
+                    }
+                }
             }
-            batch = nil
+            return nil
+        })
+    }

-            progress(float64(allIdx+1) / float64(len(recs)))
-        }
+    err := eg.Wait()
+    if err != nil {
+        return err
     }

     return nil
@@ -841,3 +903,36 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
     failureMetrics = false
     return nil
 }
+
+func (s *Store) execParallel(ctx context.Context, recs []model.Record, parallelism int, f func(record model.Record) error) error {
+    queue := make(chan model.Record, len(recs))
+    for _, rec := range recs {
+        queue <- rec
+    }
+    close(queue)
+
+    var eg errgroup.Group
+    for i := 0; i < parallelism; i++ {
+        eg.Go(func() error {
+            for ctx.Err() == nil {
+                select {
+                case <-ctx.Done():
+                    return ctx.Err()
+                case rec, ok := <-queue:
+                    if !ok {
+                        // Finished adding all the queued items, exit the thread
+                        return nil
+                    }
+
+                    err := f(rec)
+                    if err != nil {
+                        return err
+                    }
+                }
+            }
+
+            return ctx.Err()
+        })
+    }
+    return eg.Wait()
+}
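The new execParallel helper is defined here but not called anywhere in this commit. As a rough illustration of how it could be wired in, here is a hypothetical caller that inserts one row per record; the method name, the parallelism of 16 and the single-row query are assumptions for illustration, while s, model.Record, trimMultihash and the gocql session come from the existing file.

// Hypothetical usage sketch, not part of this commit: the method name,
// the parallelism of 16 and the single-row insert are illustrative only.
func (s *Store) addMultihashesToPiecesParallel(ctx context.Context, pieceCid cid.Cid, recs []model.Record) error {
    insertQry := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)`
    pieceCidBytes := pieceCid.Bytes()

    // Fan the records out over 16 workers; execParallel returns the first
    // worker error, or ctx.Err() if the context is cancelled.
    return s.execParallel(ctx, recs, 16, func(rec model.Record) error {
        return s.session.Query(insertQry, trimMultihash(rec.Cid.Hash()), pieceCidBytes).
            WithContext(ctx).Exec()
    })
}

Compared with the batched errgroup loops above, this helper trades gocql batch efficiency for a simple bounded worker pool fed from a closed channel, with cancellation checked between records via ctx.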
