Skip to content

Commit

Permalink
Merge branch 'AndriiDiachuk/6819-make-response-for-data-providers-con…
Browse files Browse the repository at this point in the history
…sistent' of github.com:The-K-R-O-K/flow-go into AndriiDiachuk/6819-make-response-for-data-providers-consistent
  • Loading branch information
AndriiDiachuk committed Jan 22, 2025
2 parents cc5412a + 1cb77a3 commit 0de77c4
Show file tree
Hide file tree
Showing 21 changed files with 654 additions and 959 deletions.
8 changes: 4 additions & 4 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ import (
storageerr "github.com/onflow/flow-go/storage"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/procedure"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
sutil "github.com/onflow/flow-go/storage/util"
)

Expand Down Expand Up @@ -734,10 +736,8 @@ func (exeNode *ExecutionNode) LoadExecutionState(
}
return nil
})
// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache,
// chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache,
chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
pebbleimpl.ToDB(chunkDataPackDB), node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import (
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
)

var (
flagHeight uint64
flagDataDir string
flagHeight uint64
flagDataDir string
flagChunkDataPackDir string
)

var Cmd = &cobra.Command{
Expand All @@ -36,11 +40,16 @@ func init() {
Cmd.Flags().StringVar(&flagDataDir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagChunkDataPackDir, "chunk_data_pack_dir", "/var/flow/data/chunk_data_pack",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("chunk_data_pack_dir")
}

func run(*cobra.Command, []string) {
log.Info().
Str("datadir", flagDataDir).
Str("chunk_data_pack_dir", flagChunkDataPackDir).
Uint64("height", flagHeight).
Msg("flags")

Expand All @@ -60,18 +69,28 @@ func run(*cobra.Command, []string) {
metrics := &metrics.NoopCollector{}
transactionResults := badger.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
commits := badger.NewCommits(metrics, db)
chunkDataPacks := badger.NewChunkDataPacks(metrics, db, badger.NewCollections(db, badger.NewTransactions(metrics, db)), badger.DefaultCacheSize)
collections := badger.NewCollections(db, badger.NewTransactions(metrics, db))
results := badger.NewExecutionResults(metrics, db)
receipts := badger.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize)
myReceipts := badger.NewMyExecutionReceipts(metrics, db, receipts)
headers := badger.NewHeaders(metrics, db)
events := badger.NewEvents(metrics, db)
serviceEvents := badger.NewServiceEvents(metrics, db)

// require that the chunk data pack data exists before returning the storage module
chunkDataPacksPebbleDB, err := storagepebble.MustOpenDefaultPebbleDB(flagChunkDataPackDir)
if err != nil {
log.Fatal().Err(err).Msgf("could not open chunk data pack DB at %v", flagChunkDataPackDir)
}
chunkDataPacksDB := pebbleimpl.ToDB(chunkDataPacksPebbleDB)
chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, collections, 1000)

writeBatch := badger.NewBatch(db)
chunkBatch := chunkDataPacksDB.NewBatch()

err = removeExecutionResultsFromHeight(
writeBatch,
chunkBatch,
state,
headers,
transactionResults,
Expand All @@ -86,6 +105,13 @@ func run(*cobra.Command, []string) {
if err != nil {
log.Fatal().Err(err).Msgf("could not remove result from height %v", flagHeight)
}

// commit the chunk data pack removals first, because otherwise the index used to find the chunk data packs would already be removed.
err = chunkBatch.Commit()
if err != nil {
log.Fatal().Err(err).Msgf("could not commit chunk batch at %v", flagHeight)
}

err = writeBatch.Flush()
if err != nil {
log.Fatal().Err(err).Msgf("could not flush write batch at %v", flagHeight)
Expand All @@ -109,11 +135,12 @@ func run(*cobra.Command, []string) {
// need to include the Remove methods
func removeExecutionResultsFromHeight(
writeBatch *badger.Batch,
chunkBatch storage.Batch,
protoState protocol.State,
headers *badger.Headers,
transactionResults *badger.TransactionResults,
commits *badger.Commits,
chunkDataPacks *badger.ChunkDataPacks,
chunkDataPacks storage.ChunkDataPacks,
results *badger.ExecutionResults,
myReceipts *badger.MyExecutionReceipts,
events *badger.Events,
Expand Down Expand Up @@ -148,7 +175,7 @@ func removeExecutionResultsFromHeight(

blockID := head.ID()

err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
err = removeForBlockID(writeBatch, chunkBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
if err != nil {
return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err)
}
Expand All @@ -167,7 +194,7 @@ func removeExecutionResultsFromHeight(
total = len(pendings)

for _, pending := range pendings {
err = removeForBlockID(writeBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)
err = removeForBlockID(writeBatch, chunkBatch, headers, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)

if err != nil {
return fmt.Errorf("could not remove result for pending block %v: %w", pending, err)
Expand All @@ -188,11 +215,12 @@ func removeExecutionResultsFromHeight(
// It bubbles up any error encountered
func removeForBlockID(
writeBatch *badger.Batch,
chunkBatch storage.Batch,
headers *badger.Headers,
commits *badger.Commits,
transactionResults *badger.TransactionResults,
results *badger.ExecutionResults,
chunks *badger.ChunkDataPacks,
chunks storage.ChunkDataPacks,
myReceipts *badger.MyExecutionReceipts,
events *badger.Events,
serviceEvents *badger.ServiceEvents,
Expand All @@ -211,12 +239,7 @@ func removeForBlockID(
for _, chunk := range result.Chunks {
chunkID := chunk.ID()
// remove chunk data pack
err := chunks.BatchRemove(chunkID, writeBatch)
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Msgf("chunk %v not found for block %v", chunkID, blockID)
continue
}

err := chunks.BatchRemove(chunkID, chunkBatch)
if err != nil {
return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err)
}
Expand Down
Loading

0 comments on commit 0de77c4

Please sign in to comment.