Skip to content

Commit 6a72a39

Browse files
authored
Merge pull request #228 from thirdweb-dev/06-16-parallelize_staging_cleanup
Make staging cleanup async and conditional
2 parents a5ed360 + f1dc1f5 commit 6a72a39

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

internal/orchestrator/committer.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,16 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
259259
}
260260
}()
261261

262-
stagingDeleteStart := time.Now()
263-
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
264-
return fmt.Errorf("error deleting data from staging storage: %v", err)
262+
if c.workMode == WorkModeBackfill {
263+
go func() {
264+
stagingDeleteStart := time.Now()
265+
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
266+
log.Error().Err(err).Msg("Failed to delete staging data")
267+
}
268+
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
269+
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
270+
}()
265271
}
266-
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
267-
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
268272

269273
// Find highest block number from committed blocks
270274
highestBlock := blockData[0].Block

internal/orchestrator/committer_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,12 +329,25 @@ func TestCommit(t *testing.T) {
329329
{Block: common.Block{Number: big.NewInt(102)}},
330330
}
331331

332+
// Create a channel to signal when DeleteStagingData is called
333+
deleteDone := make(chan struct{})
334+
332335
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
333-
mockStagingStorage.EXPECT().DeleteStagingData(blockData).Return(nil)
336+
mockStagingStorage.EXPECT().DeleteStagingData(blockData).RunAndReturn(func(data []common.BlockData) error {
337+
close(deleteDone)
338+
return nil
339+
})
334340

335341
err := committer.commit(context.Background(), blockData)
336-
337342
assert.NoError(t, err)
343+
344+
// Wait for DeleteStagingData to be called with a timeout
345+
select {
346+
case <-deleteDone:
347+
// Success - DeleteStagingData was called
348+
case <-time.After(2 * time.Second):
349+
t.Fatal("DeleteStagingData was not called within timeout period")
350+
}
338351
}
339352

340353
func TestHandleGap(t *testing.T) {

0 commit comments

Comments
 (0)