diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go
index 3841f70f5f5..2f1795cd8e0 100644
--- a/cmd/execution_builder.go
+++ b/cmd/execution_builder.go
@@ -691,7 +691,7 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
 	return nil
 }
 
-func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
+func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
 	log := sutil.NewLogger(logger)
 
 	opts := badgerDB.
@@ -722,17 +722,19 @@ func (exeNode *ExecutionNode) LoadExecutionState(
 	error,
 ) {
 
-	chunkDataPackDB, err := openChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger)
+	chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(exeNode.exeConf.chunkDataPackDir)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("could not open chunk data pack database: %w", err)
 	}
+
 	exeNode.builder.ShutdownFunc(func() error {
 		if err := chunkDataPackDB.Close(); err != nil {
 			return fmt.Errorf("error closing chunk data pack database: %w", err)
 		}
 		return nil
 	})
-	chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
+	chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache,
+		chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
 
 	// Needed for gRPC server, make sure to assign to main scoped vars
 	exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
diff --git a/model/flow/chunk.go b/model/flow/chunk.go
index e8aeafea8bf..83eabde4b1e 100644
--- a/model/flow/chunk.go
+++ b/model/flow/chunk.go
@@ -1,9 +1,11 @@
 package flow
 
 import (
+	"fmt"
 	"log"
 
 	"github.com/ipfs/go-cid"
+	"github.com/vmihailenco/msgpack/v4"
 )
 
 var EmptyEventCollectionID Identifier
@@ -203,3 +205,65 @@ type BlockExecutionDataRoot struct {
 	// associated with this block.
 	ChunkExecutionDataIDs []cid.Cid
 }
+
+// MarshalMsgpack implements the msgpack.Marshaler interface
+func (b BlockExecutionDataRoot) MarshalMsgpack() ([]byte, error) {
+	return msgpack.Marshal(struct {
+		BlockID               Identifier
+		ChunkExecutionDataIDs []string
+	}{
+		BlockID:               b.BlockID,
+		ChunkExecutionDataIDs: cidsToStrings(b.ChunkExecutionDataIDs),
+	})
+}
+
+// UnmarshalMsgpack implements the msgpack.Unmarshaler interface
+func (b *BlockExecutionDataRoot) UnmarshalMsgpack(data []byte) error {
+	var temp struct {
+		BlockID               Identifier
+		ChunkExecutionDataIDs []string
+	}
+
+	if err := msgpack.Unmarshal(data, &temp); err != nil {
+		return err
+	}
+
+	b.BlockID = temp.BlockID
+	cids, err := stringsToCids(temp.ChunkExecutionDataIDs)
+
+	if err != nil {
+		return fmt.Errorf("failed to decode chunk execution data ids: %w", err)
+	}
+
+	b.ChunkExecutionDataIDs = cids
+
+	return nil
+}
+
+// cidsToStrings converts a slice of cid.Cid to a slice of strings
+func cidsToStrings(cids []cid.Cid) []string {
+	if cids == nil {
+		return nil
+	}
+	strs := make([]string, len(cids))
+	for i, c := range cids {
+		strs[i] = c.String()
+	}
+	return strs
+}
+
+// stringsToCids converts a slice of strings to a slice of cid.Cid
+func stringsToCids(strs []string) ([]cid.Cid, error) {
+	if strs == nil {
+		return nil, nil
+	}
+	cids := make([]cid.Cid, len(strs))
+	for i, s := range strs {
+		c, err := cid.Decode(s)
+		if err != nil {
+			return nil, fmt.Errorf("failed to decode cid %v: %w", s, err)
+		}
+		cids[i] = c
+	}
+	return cids, nil
+}
diff --git a/storage/badger/chunkDataPacks.go b/storage/badger/chunkDataPacks.go
index 63865a574ce..05f42cf7856 100644
--- a/storage/badger/chunkDataPacks.go
+++ b/storage/badger/chunkDataPacks.go
@@ -9,7 +9,6 @@ import (
 	"github.com/onflow/flow-go/module"
 	"github.com/onflow/flow-go/module/metrics"
 	"github.com/onflow/flow-go/storage"
-	badgermodel "github.com/onflow/flow-go/storage/badger/model"
 	"github.com/onflow/flow-go/storage/badger/operation"
 	"github.com/onflow/flow-go/storage/badger/transaction"
 )
@@ -17,25 +16,25 @@ import (
 type ChunkDataPacks struct {
 	db             *badger.DB
 	collections    storage.Collections
-	byChunkIDCache *Cache[flow.Identifier, *badgermodel.StoredChunkDataPack]
+	byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack]
 }
 
 func NewChunkDataPacks(collector module.CacheMetrics, db *badger.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks {
 
-	store := func(key flow.Identifier, val *badgermodel.StoredChunkDataPack) func(*transaction.Tx) error {
+	store := func(key flow.Identifier, val *storage.StoredChunkDataPack) func(*transaction.Tx) error {
 		return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val)))
 	}
 
-	retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
-		return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
-			var c badgermodel.StoredChunkDataPack
+	retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
+		return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
+			var c storage.StoredChunkDataPack
 			err := operation.RetrieveChunkDataPack(key, &c)(tx)
 			return &c, err
 		}
 	}
 
 	cache := newCache(collector, metrics.ResourceChunkDataPack,
-		withLimit[flow.Identifier, *badgermodel.StoredChunkDataPack](byChunkIDCacheSize),
+		withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize),
 		withStore(store),
 		withRetrieve(retrieve),
 	)
@@ -71,7 +70,7 @@ func (ch *ChunkDataPacks) Remove(chunkIDs []flow.Identifier) error {
 // No errors are expected during normal operation, but it may return generic error
 // if entity is not serializable or Badger unexpectedly fails to process request
 func (ch *ChunkDataPacks) BatchStore(c *flow.ChunkDataPack, batch storage.BatchStorage) error {
-	sc := toStoredChunkDataPack(c)
+	sc := storage.ToStoredChunkDataPack(c)
 	writeBatch := batch.GetWriter()
 	batch.OnSucceed(func() {
 		ch.byChunkIDCache.Insert(sc.ChunkID, sc)
@@ -133,7 +132,7 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac
 	return chdp, nil
 }
 
-func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.StoredChunkDataPack, error) {
+func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*storage.StoredChunkDataPack, error) {
 	tx := ch.db.NewTransaction(false)
 	defer tx.Discard()
 
@@ -145,8 +144,8 @@ func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.Store
 	return schdp, nil
 }
 
-func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
-	return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
+func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*storage.StoredChunkDataPack, error) {
+	return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
 		val, err := ch.byChunkIDCache.Get(chunkID)(tx)
 		if err != nil {
 			return nil, err
@@ -154,22 +153,3 @@ func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn
 		return val, nil
 	}
 }
-
-func toStoredChunkDataPack(c *flow.ChunkDataPack) *badgermodel.StoredChunkDataPack {
-	sc := &badgermodel.StoredChunkDataPack{
-		ChunkID:           c.ChunkID,
-		StartState:        c.StartState,
-		Proof:             c.Proof,
-		SystemChunk:       false,
-		ExecutionDataRoot: c.ExecutionDataRoot,
-	}
-
-	if c.Collection != nil {
-		// non system chunk
-		sc.CollectionID = c.Collection.ID()
-	} else {
-		sc.SystemChunk = true
-	}
-
-	return sc
-}
diff --git a/storage/badger/model/storedChunkDataPack.go b/storage/badger/model/storedChunkDataPack.go
deleted file mode 100644
index 31349604070..00000000000
--- a/storage/badger/model/storedChunkDataPack.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package badgermodel
-
-import (
-	"github.com/onflow/flow-go/model/flow"
-)
-
-// StoredChunkDataPack is an in-storage representation of chunk data pack.
-// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining
-// the collection on a secondary storage.
-type StoredChunkDataPack struct {
-	ChunkID           flow.Identifier
-	StartState        flow.StateCommitment
-	Proof             flow.StorageProof
-	CollectionID      flow.Identifier
-	SystemChunk       bool
-	ExecutionDataRoot flow.BlockExecutionDataRoot
-}
diff --git a/storage/badger/operation/chunkDataPacks.go b/storage/badger/operation/chunkDataPacks.go
index 687712985d4..e0f2deb2ce2 100644
--- a/storage/badger/operation/chunkDataPacks.go
+++ b/storage/badger/operation/chunkDataPacks.go
@@ -4,16 +4,16 @@ import (
 	"github.com/dgraph-io/badger/v2"
 
 	"github.com/onflow/flow-go/model/flow"
-	badgermodel "github.com/onflow/flow-go/storage/badger/model"
+	"github.com/onflow/flow-go/storage"
 )
 
 // InsertChunkDataPack inserts a chunk data pack keyed by chunk ID.
-func InsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error {
+func InsertChunkDataPack(c *storage.StoredChunkDataPack) func(*badger.Txn) error {
 	return insert(makePrefix(codeChunkDataPack, c.ChunkID), c)
 }
 
 // BatchInsertChunkDataPack inserts a chunk data pack keyed by chunk ID into a batch
-func BatchInsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(batch *badger.WriteBatch) error {
+func BatchInsertChunkDataPack(c *storage.StoredChunkDataPack) func(batch *badger.WriteBatch) error {
 	return batchWrite(makePrefix(codeChunkDataPack, c.ChunkID), c)
 }
 
@@ -25,7 +25,7 @@ func BatchRemoveChunkDataPack(chunkID flow.Identifier) func(batch *badger.WriteB
 }
 
 // RetrieveChunkDataPack retrieves a chunk data pack by chunk ID.
-func RetrieveChunkDataPack(chunkID flow.Identifier, c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error {
+func RetrieveChunkDataPack(chunkID flow.Identifier, c *storage.StoredChunkDataPack) func(*badger.Txn) error {
 	return retrieve(makePrefix(codeChunkDataPack, chunkID), c)
 }
diff --git a/storage/badger/operation/chunkDataPacks_test.go b/storage/badger/operation/chunkDataPacks_test.go
index 0dc79ef7266..f3a90af8d00 100644
--- a/storage/badger/operation/chunkDataPacks_test.go
+++ b/storage/badger/operation/chunkDataPacks_test.go
@@ -7,14 +7,14 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	storagemodel "github.com/onflow/flow-go/storage/badger/model"
+	"github.com/onflow/flow-go/storage"
 	"github.com/onflow/flow-go/utils/unittest"
 )
 
 func TestChunkDataPack(t *testing.T) {
 	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
 		collectionID := unittest.IdentifierFixture()
-		expected := &storagemodel.StoredChunkDataPack{
+		expected := &storage.StoredChunkDataPack{
 			ChunkID:    unittest.IdentifierFixture(),
 			StartState: unittest.StateCommitmentFixture(),
 			Proof:      []byte{'p'},
@@ -22,7 +22,7 @@ func TestChunkDataPack(t *testing.T) {
 		}
 
 		t.Run("Retrieve non-existent", func(t *testing.T) {
-			var actual storagemodel.StoredChunkDataPack
+			var actual storage.StoredChunkDataPack
 			err := db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
 			assert.Error(t, err)
 		})
@@ -31,7 +31,7 @@ func TestChunkDataPack(t *testing.T) {
 			err := db.Update(InsertChunkDataPack(expected))
 			require.NoError(t, err)
 
-			var actual storagemodel.StoredChunkDataPack
+			var actual storage.StoredChunkDataPack
 			err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
 			assert.NoError(t, err)
 
@@ -42,7 +42,7 @@ func TestChunkDataPack(t *testing.T) {
 			err := db.Update(RemoveChunkDataPack(expected.ChunkID))
 			require.NoError(t, err)
 
-			var actual storagemodel.StoredChunkDataPack
+			var actual storage.StoredChunkDataPack
 			err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
 			assert.Error(t, err)
 		})
diff --git a/storage/chunkDataPacks.go b/storage/chunkDataPacks.go
index d9f49735d90..60d4fe2375d 100644
--- a/storage/chunkDataPacks.go
+++ b/storage/chunkDataPacks.go
@@ -15,9 +15,6 @@ type ChunkDataPacks interface {
 	// No errors are expected during normal operation, but it may return generic error
 	Remove(cs []flow.Identifier) error
 
-	// BatchStore inserts the chunk header, keyed by chunk ID into a given batch
-	BatchStore(c *flow.ChunkDataPack, batch BatchStorage) error
-
 	// ByChunkID returns the chunk data for the given a chunk ID.
 	ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error)
 
@@ -26,3 +23,34 @@ type ChunkDataPacks interface {
 	// If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.
 	BatchRemove(chunkID flow.Identifier, batch BatchStorage) error
 }
+
+// StoredChunkDataPack is the in-storage representation of a chunk data pack.
+// Its prime difference is that instead of an actual collection it keeps a collection ID,
+// relying on the collection being maintained in a secondary storage.
+type StoredChunkDataPack struct {
+	ChunkID           flow.Identifier
+	StartState        flow.StateCommitment
+	Proof             flow.StorageProof
+	CollectionID      flow.Identifier
+	SystemChunk       bool
+	ExecutionDataRoot flow.BlockExecutionDataRoot
+}
+
+func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
+	sc := &StoredChunkDataPack{
+		ChunkID:           c.ChunkID,
+		StartState:        c.StartState,
+		Proof:             c.Proof,
+		SystemChunk:       false,
+		ExecutionDataRoot: c.ExecutionDataRoot,
+	}
+
+	if c.Collection != nil {
+		// non system chunk
+		sc.CollectionID = c.Collection.ID()
+	} else {
+		sc.SystemChunk = true
+	}
+
+	return sc
+}
diff --git a/storage/mock/chunk_data_packs.go b/storage/mock/chunk_data_packs.go
index 3fbacab10d8..2db2f6dbe63 100644
--- a/storage/mock/chunk_data_packs.go
+++ b/storage/mock/chunk_data_packs.go
@@ -32,24 +32,6 @@ func (_m *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.Bat
 	return r0
 }
 
-// BatchStore provides a mock function with given fields: c, batch
-func (_m *ChunkDataPacks) BatchStore(c *flow.ChunkDataPack, batch storage.BatchStorage) error {
-	ret := _m.Called(c, batch)
-
-	if len(ret) == 0 {
-		panic("no return value specified for BatchStore")
-	}
-
-	var r0 error
-	if rf, ok := ret.Get(0).(func(*flow.ChunkDataPack, storage.BatchStorage) error); ok {
-		r0 = rf(c, batch)
-	} else {
-		r0 = ret.Error(0)
-	}
-
-	return r0
-}
-
 // ByChunkID provides a mock function with given fields: chunkID
 func (_m *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) {
 	ret := _m.Called(chunkID)
diff --git a/storage/pebble/batch.go b/storage/pebble/batch.go
new file mode 100644
index 00000000000..9a45e55bc02
--- /dev/null
+++ b/storage/pebble/batch.go
@@ -0,0 +1,58 @@
+package pebble
+
+import (
+	"sync"
+
+	"github.com/cockroachdb/pebble"
+)
+
+type Batch struct {
+	writer *pebble.Batch
+
+	lock      sync.RWMutex
+	callbacks []func()
+}
+
+func NewBatch(db *pebble.DB) *Batch {
+	batch := db.NewBatch()
+	return &Batch{
+		writer:    batch,
+		callbacks: make([]func(), 0),
+	}
+}
+
+func (b *Batch) GetWriter() *pebble.Batch {
+	return b.writer
+}
+
+// OnSucceed adds a callback to execute after the batch has
+// been successfully flushed.
+// This is useful for implementing a cache that is only
+// populated after the batch has been successfully flushed.
+func (b *Batch) OnSucceed(callback func()) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+	b.callbacks = append(b.callbacks, callback)
+}
+
+// Flush commits the underlying pebble batch; in
+// addition, it calls the callbacks added by
+// OnSucceed.
+// Any errors are exceptions.
+func (b *Batch) Flush() error {
+	err := b.writer.Commit(nil)
+	if err != nil {
+		return err
+	}
+
+	b.lock.RLock()
+	defer b.lock.RUnlock()
+	for _, callback := range b.callbacks {
+		callback()
+	}
+	return nil
+}
+
+func (b *Batch) Close() error {
+	return b.writer.Close()
+}
diff --git a/storage/pebble/chunk_data_packs.go b/storage/pebble/chunk_data_packs.go
new file mode 100644
index 00000000000..c0b5b47eeab
--- /dev/null
+++ b/storage/pebble/chunk_data_packs.go
@@ -0,0 +1,144 @@
+package pebble
+
+import (
+	"fmt"
+
+	"github.com/cockroachdb/pebble"
+	"github.com/rs/zerolog/log"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/module/metrics"
+	"github.com/onflow/flow-go/storage"
+	"github.com/onflow/flow-go/storage/pebble/operation"
+)
+
+type ChunkDataPacks struct {
+	db             *pebble.DB
+	collections    storage.Collections
+	byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack]
+}
+
+var _ storage.ChunkDataPacks = (*ChunkDataPacks)(nil)
+
+func NewChunkDataPacks(collector module.CacheMetrics, db *pebble.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks {
+
+	retrieve := func(key flow.Identifier) func(pebble.Reader) (*storage.StoredChunkDataPack, error) {
+		return func(r pebble.Reader) (*storage.StoredChunkDataPack, error) {
+			var c storage.StoredChunkDataPack
+			err := operation.RetrieveChunkDataPack(key, &c)(r)
+			return &c, err
+		}
+	}
+
+	cache := newCache(collector, metrics.ResourceChunkDataPack,
+		withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize),
+		withRetrieve(retrieve),
+	)
+
+	return &ChunkDataPacks{
+		db:             db,
+		collections:    collections,
+		byChunkIDCache: cache,
+	}
+}
+
+// Store stores the given chunk data packs atomically.
+// Any errors are exceptions.
+func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error {
+	batch := NewBatch(ch.db)
+	defer func() {
+		err := batch.Close()
+		if err != nil {
+			log.Error().Err(err).Msgf("failed to close batch when storing chunk data pack")
+		}
+	}()
+
+	for _, c := range cs {
+		err := ch.batchStore(c, batch)
+		if err != nil {
+			return fmt.Errorf("cannot store chunk data pack: %w", err)
+		}
+	}
+
+	err := batch.Flush()
+	if err != nil {
+		return fmt.Errorf("cannot commit batch: %w", err)
+	}
+
+	return nil
+}
+
+// Remove removes the chunk data packs with the given chunk IDs atomically.
+// Any errors are exceptions.
+func (ch *ChunkDataPacks) Remove(cs []flow.Identifier) error {
+	batch := ch.db.NewBatch()
+
+	for _, c := range cs {
+		err := ch.batchRemove(c, batch)
+		if err != nil {
+			return fmt.Errorf("cannot remove chunk data pack: %w", err)
+		}
+	}
+
+	err := batch.Commit(pebble.Sync)
+	if err != nil {
+		return fmt.Errorf("cannot commit batch: %w", err)
+	}
+
+	for _, c := range cs {
+		ch.byChunkIDCache.Remove(c)
+	}
+
+	return nil
+}
+
+// ByChunkID finds the chunk data pack by chunk ID.
+// It returns storage.ErrNotFound if not found;
+// other errors are exceptions.
+func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error) {
+	var sc storage.StoredChunkDataPack
+	err := operation.RetrieveChunkDataPack(chunkID, &sc)(ch.db)
+	if err != nil {
+		return nil, fmt.Errorf("could not retrieve stored chunk data pack: %w", err)
+	}
+
+	chdp := &flow.ChunkDataPack{
+		ChunkID:           sc.ChunkID,
+		StartState:        sc.StartState,
+		Proof:             sc.Proof,
+		Collection:        nil, // to be filled in later
+		ExecutionDataRoot: sc.ExecutionDataRoot,
+	}
+	if !sc.SystemChunk {
+		collection, err := ch.collections.ByID(sc.CollectionID)
+		if err != nil {
+			return nil, fmt.Errorf("could not retrieve collection (id: %x) for stored chunk data pack: %w", sc.CollectionID, err)
+		}
+
+		chdp.Collection = collection
+	}
+	return chdp, nil
+}
+
+// BatchRemove is not used in the pebble implementation
+func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, batch storage.BatchStorage) error {
+	return fmt.Errorf("not implemented")
+}
+
+func (ch *ChunkDataPacks) batchRemove(chunkID flow.Identifier, batch pebble.Writer) error {
+	return operation.RemoveChunkDataPack(chunkID)(batch)
+}
+
+func (ch *ChunkDataPacks) batchStore(c *flow.ChunkDataPack, batch *Batch) error {
+	sc := storage.ToStoredChunkDataPack(c)
+	writer := batch.GetWriter()
+	batch.OnSucceed(func() {
+		ch.byChunkIDCache.Insert(sc.ChunkID, sc)
+	})
+	err := operation.InsertChunkDataPack(sc)(writer)
+	if err != nil {
+		return fmt.Errorf("failed to store chunk data pack: %w", err)
+	}
+	return nil
+}
diff --git a/storage/pebble/chunk_data_packs_test.go b/storage/pebble/chunk_data_packs_test.go
new file mode 100644
index 00000000000..f170b22114c
--- /dev/null
+++ b/storage/pebble/chunk_data_packs_test.go
@@ -0,0 +1,116 @@
+package pebble
+
+import (
+	"errors"
+	"path/filepath"
+	"testing"
+
+	"github.com/cockroachdb/pebble"
+	"github.com/dgraph-io/badger/v2"
+	"github.com/stretchr/testify/require"
+	"github.com/vmihailenco/msgpack"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module/metrics"
+	"github.com/onflow/flow-go/storage"
+	badgerstorage "github.com/onflow/flow-go/storage/badger"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+func TestMsgPacks(t *testing.T) {
+	chunkDataPacks := unittest.ChunkDataPacksFixture(10)
+	for _, chunkDataPack := range chunkDataPacks {
+		sc := storage.ToStoredChunkDataPack(chunkDataPack)
+		value, err := msgpack.Marshal(sc)
+		require.NoError(t, err)
+
+		var actual storage.StoredChunkDataPack
+		err = msgpack.Unmarshal(value, &actual)
+		require.NoError(t, err)
+
+		require.Equal(t, *sc, actual)
+	}
+}
+
+// TestChunkDataPacks_Store evaluates correct storage and retrieval of chunk data packs in storage.
+// It also evaluates that re-inserting is idempotent.
+func TestChunkDataPacks_Store(t *testing.T) {
+	WithChunkDataPacks(t, 100, func(t *testing.T, chunkDataPacks []*flow.ChunkDataPack, chunkDataPackStore storage.ChunkDataPacks, _ *pebble.DB) {
+		// can store
+		require.NoError(t, chunkDataPackStore.Store(chunkDataPacks))
+
+		// can read back
+		for _, c := range chunkDataPacks {
+			c2, err := chunkDataPackStore.ByChunkID(c.ChunkID)
+			require.NoError(t, err)
+			require.Equal(t, c, c2)
+		}
+
+		// can store again
+		require.NoError(t, chunkDataPackStore.Store(chunkDataPacks))
+
+		cids := make([]flow.Identifier, 0, len(chunkDataPacks))
+		for i, c := range chunkDataPacks {
+			// remove everything except the first one
+			if i > 0 {
+				cids = append(cids, c.ChunkID)
+			}
+		}
+		// can remove
+		require.NoError(t, chunkDataPackStore.Remove(cids))
+		for i, c := range chunkDataPacks {
+			if i == 0 {
+				// the first one is not removed
+				_, err := chunkDataPackStore.ByChunkID(c.ChunkID)
+				require.NoError(t, err)
+				continue
+			}
+			// the rest are removed
+			_, err := chunkDataPackStore.ByChunkID(c.ChunkID)
+			require.True(t, errors.Is(err, storage.ErrNotFound))
+		}
+
+		// can remove again
+		require.NoError(t, chunkDataPackStore.Remove(cids))
+	})
+}
+
+// WithChunkDataPacks is a test helper that generates the specified number of chunk data packs, stores them using the storeFunc, and
+// then evaluates whether they are successfully retrieved from storage.
+func WithChunkDataPacks(t *testing.T, chunks int, storeFunc func(*testing.T, []*flow.ChunkDataPack, storage.ChunkDataPacks, *pebble.DB)) {
+	RunWithBadgerDBAndPebbleDB(t, func(badgerDB *badger.DB, db *pebble.DB) {
+		transactions := badgerstorage.NewTransactions(&metrics.NoopCollector{}, badgerDB)
+		collections := badgerstorage.NewCollections(badgerDB, transactions)
+		// keep the cache size at 1 to make sure that entries are written to and read from storage itself.
+		store := NewChunkDataPacks(&metrics.NoopCollector{}, db, collections, 1)
+
+		chunkDataPacks := unittest.ChunkDataPacksFixture(chunks)
+		for _, chunkDataPack := range chunkDataPacks {
+			// stores collection in Collections storage (which ChunkDataPacks store uses internally)
+			err := collections.Store(chunkDataPack.Collection)
+			require.NoError(t, err)
+		}
+
+		// stores chunk data packs using the provided store function.
+		storeFunc(t, chunkDataPacks, store, db)
+	})
+}
+
+func RunWithBadgerDBAndPebbleDB(t *testing.T, fn func(*badger.DB, *pebble.DB)) {
+	unittest.RunWithTempDir(t, func(dir string) {
+		badgerDB := unittest.BadgerDB(t, filepath.Join(dir, "badger"))
+		defer func() {
+			require.NoError(t, badgerDB.Close())
+		}()
+
+		cache := pebble.NewCache(1 << 20)
+		defer cache.Unref()
+		// the default comparer is sufficient for chunk data packs
+		opts := DefaultPebbleOptions(cache, pebble.DefaultComparer)
+		pebbledb, err := pebble.Open(filepath.Join(dir, "pebble"), opts)
+		require.NoError(t, err)
+		defer pebbledb.Close()
+
+		fn(badgerDB, pebbledb)
+	})
+}
diff --git a/storage/pebble/open.go b/storage/pebble/open.go
index a0d7ea6c0d5..80f328ce87a 100644
--- a/storage/pebble/open.go
+++ b/storage/pebble/open.go
@@ -12,6 +12,8 @@ import (
 	"github.com/onflow/flow-go/storage/pebble/registers"
 )
 
+const DefaultPebbleCacheSize = 1 << 20
+
 // NewBootstrappedRegistersWithPath initializes a new Registers instance with a pebble db
 // if the database is not initialized, it close the database and return storage.ErrNotBootstrapped
 func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error) {
@@ -34,8 +36,12 @@ func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error
 }
 
 // OpenRegisterPebbleDB opens the database
+// The difference from OpenDefaultPebbleDB is that it uses
+// a customized comparer (NewMVCCComparer) which is needed to
+// implement finding register values at any given height using
+// pebble's SeekPrefixGE function
 func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) {
-	cache := pebble.NewCache(1 << 20)
+	cache := pebble.NewCache(DefaultPebbleCacheSize)
 	defer cache.Unref()
 	// currently pebble is only used for registers
 	opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
@@ -47,6 +53,20 @@ func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) {
 	return db, nil
 }
 
+// OpenDefaultPebbleDB opens a pebble database using default options,
+// such as cache size and comparer
+func OpenDefaultPebbleDB(dir string) (*pebble.DB, error) {
+	cache := pebble.NewCache(DefaultPebbleCacheSize)
+	defer cache.Unref()
+	opts := DefaultPebbleOptions(cache, pebble.DefaultComparer)
+	db, err := pebble.Open(dir, opts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open db: %w", err)
+	}
+
+	return db, nil
+}
+
 // ReadHeightsFromBootstrappedDB reads the first and latest height from a bootstrapped register db
 // If the register db is not bootstrapped, it returns storage.ErrNotBootstrapped
 // If the register db is corrupted, it returns an error
diff --git a/storage/pebble/operation/chunk_data_pack.go b/storage/pebble/operation/chunk_data_pack.go
new file mode 100644
index 00000000000..f5cec13cdbe
--- /dev/null
+++ b/storage/pebble/operation/chunk_data_pack.go
@@ -0,0 +1,35 @@
+package operation
+
+import (
+	"github.com/cockroachdb/pebble"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/storage"
+)
+
+// InsertChunkDataPack inserts a chunk data pack keyed by chunk ID.
+// Any errors are exceptions.
+func InsertChunkDataPack(sc *storage.StoredChunkDataPack) func(w pebble.Writer) error {
+	key := makeKey(codeChunkDataPack, sc.ChunkID)
+	return insert(key, sc)
+}
+
+// RetrieveChunkDataPack retrieves a chunk data pack by chunk ID.
+// It returns storage.ErrNotFound if the chunk data pack is not found.
+func RetrieveChunkDataPack(chunkID flow.Identifier, sc *storage.StoredChunkDataPack) func(r pebble.Reader) error {
+	key := makeKey(codeChunkDataPack, chunkID)
+	return retrieve(key, sc)
+}
+
+// RemoveChunkDataPack removes the chunk data pack with the given chunk ID.
+// Any errors are exceptions.
+func RemoveChunkDataPack(chunkID flow.Identifier) func(w pebble.Writer) error {
+	key := makeKey(codeChunkDataPack, chunkID)
+	return func(w pebble.Writer) error {
+		return w.Delete(key, nil)
+	}
+}
+
+func makeKey(prefix byte, identifier flow.Identifier) []byte {
+	return append([]byte{prefix}, identifier[:]...)
+}
diff --git a/storage/pebble/operation/codes.go b/storage/pebble/operation/codes.go
new file mode 100644
index 00000000000..1d9057646c3
--- /dev/null
+++ b/storage/pebble/operation/codes.go
@@ -0,0 +1,5 @@
+package operation
+
+const (
+	codeChunkDataPack = 100
+)
diff --git a/storage/pebble/operation/common.go b/storage/pebble/operation/common.go
new file mode 100644
index 00000000000..ad9e96c2c8b
--- /dev/null
+++ b/storage/pebble/operation/common.go
@@ -0,0 +1,50 @@
+package operation
+
+import (
+	"errors"
+
+	"github.com/cockroachdb/pebble"
+	"github.com/vmihailenco/msgpack"
+
+	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/storage"
+)
+
+func insert(key []byte, val interface{}) func(pebble.Writer) error {
+	return func(w pebble.Writer) error {
+		value, err := msgpack.Marshal(val)
+		if err != nil {
+			return irrecoverable.NewExceptionf("failed to encode value: %w", err)
+		}
+
+		err = w.Set(key, value, nil)
+		if err != nil {
+			return irrecoverable.NewExceptionf("failed to store data: %w", err)
+		}
+
+		return nil
+	}
+}
+
+func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error {
+	return func(r pebble.Reader) error {
+		val, closer, err := r.Get(key)
+		if err != nil {
+			return convertNotFoundError(err)
+		}
+		defer closer.Close()
+
+		err = msgpack.Unmarshal(val, &sc)
+		if err != nil {
+			return irrecoverable.NewExceptionf("failed to decode value: %w", err)
+		}
+		return nil
+	}
+}
+
+func convertNotFoundError(err error) error {
+	if errors.Is(err, pebble.ErrNotFound) {
+		return storage.ErrNotFound
+	}
+	return err
+}
diff --git a/storage/pebble/value_cache.go b/storage/pebble/value_cache.go
new file mode 100644
index 00000000000..38f1f394910
--- /dev/null
+++ b/storage/pebble/value_cache.go
@@ -0,0 +1,142 @@
+package pebble
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/cockroachdb/pebble"
+	lru "github.com/hashicorp/golang-lru/v2"
+
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/storage"
+)
+
+func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.limit = limit
+	}
+}
+
+type storeFunc[K comparable, V any] func(key K, val V) func(pebble.Writer) error
+
+func noStore[K comparable, V any](_ K, _ V) func(pebble.Writer) error {
+	return func(pebble.Writer) error {
+		return fmt.Errorf("no store function for cache put available")
+	}
+}
+
+type retrieveFunc[K comparable, V any] func(key K) func(pebble.Reader) (V, error)
+
+func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.retrieve = retrieve
+	}
+}
+
+func noRetrieve[K comparable, V any](_ K) func(pebble.Reader) (V, error) {
+	return func(pebble.Reader) (V, error) {
+		var nullV V
+		return nullV, fmt.Errorf("no retrieve function for cache get available")
+	}
+}
+
+// Cache is a read-through cache for the underlying storage layer.
+// Note: when a resource is found neither in the cache nor the underlying storage,
+// it will not be cached. In other words, looking up the missing item again will
+// query the underlying storage again.
+type Cache[K comparable, V any] struct {
+	metrics  module.CacheMetrics
+	limit    uint
+	store    storeFunc[K, V]
+	retrieve retrieveFunc[K, V]
+	resource string
+	cache    *lru.Cache[K, V]
+}
+
+func newCache[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*Cache[K, V])) *Cache[K, V] {
+	c := Cache[K, V]{
+		metrics:  collector,
+		limit:    1000,
+		store:    noStore[K, V],
+		retrieve: noRetrieve[K, V],
+		resource: resourceName,
+	}
+	for _, option := range options {
+		option(&c)
+	}
+	c.cache, _ = lru.New[K, V](int(c.limit))
+	c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	return &c
+}
+
+// IsCached returns true if the key exists in the cache.
+// It DOES NOT check whether the key exists in the underlying data store.
+func (c *Cache[K, V]) IsCached(key K) bool {
+	return c.cache.Contains(key)
+}
+
+// Get will try to retrieve the resource from the cache first, and then from
+// the injected retrieve function. During normal operations, the following error returns are expected:
+//   - `storage.ErrNotFound` if key is unknown.
+func (c *Cache[K, V]) Get(key K) func(pebble.Reader) (V, error) {
+	return func(r pebble.Reader) (V, error) {
+
+		// check if we have it in the cache
+		resource, cached := c.cache.Get(key)
+		if cached {
+			c.metrics.CacheHit(c.resource)
+			return resource, nil
+		}
+
+		// get it from the database
+		resource, err := c.retrieve(key)(r)
+		if err != nil {
+			if errors.Is(err, storage.ErrNotFound) {
+				c.metrics.CacheNotFound(c.resource)
+			}
+			var nullV V
+			return nullV, fmt.Errorf("could not retrieve resource: %w", err)
+		}
+
+		c.metrics.CacheMiss(c.resource)
+
+		// cache the resource and evict the least recently used one if we reached the limit
+		evicted := c.cache.Add(key, resource)
+		if !evicted {
+			c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+		}
+
+		return resource, nil
+	}
+}
+
+func (c *Cache[K, V]) Remove(key K) {
+	c.cache.Remove(key)
+}
+
+// Insert will add a resource directly to the cache with the given key,
+// assuming the resource has been added to storage already.
+func (c *Cache[K, V]) Insert(key K, resource V) {
+	// cache the resource and evict the least recently used one if we reached the limit
+	evicted := c.cache.Add(key, resource)
+	if !evicted {
+		c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	}
+}
+
+// PutTx returns a functor which stores the resource in the database and adds it to the cache under the given key.
+func (c *Cache[K, V]) PutTx(key K, resource V) func(pebble.Writer) error {
+	storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution)
+
+	return func(w pebble.Writer) error {
+		// the storeOps must be a synchronous operation
+		err := storeOps(w) // execute operations to store resource
+		if err != nil {
+			return fmt.Errorf("could not store resource: %w", err)
+		}
+
+		c.Insert(key, resource)
+
+		return nil
+	}
+}
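
Reviewer note: below is a minimal wiring sketch (not part of the diff) showing how the pieces introduced above fit together, mirroring what LoadExecutionState now does in cmd/execution_builder.go. The path, the NoopCollector, and the nil collections dependency are illustrative placeholders, not code from this PR.

	package main

	import (
		"log"

		"github.com/onflow/flow-go/module/metrics"
		storagepebble "github.com/onflow/flow-go/storage/pebble"
	)

	func main() {
		// OpenDefaultPebbleDB uses the default comparer and cache size,
		// which is all the chunk data pack store needs; only registers
		// require the customized MVCC comparer.
		db, err := storagepebble.OpenDefaultPebbleDB("/tmp/chunk-data-packs") // placeholder path
		if err != nil {
			log.Fatalf("could not open chunk data pack database: %v", err)
		}
		defer db.Close()

		// In the execution node, collections is the badger-backed
		// node.Storage.Collections; nil is a placeholder here and would
		// only serve system chunks, which carry no collection.
		chunkDataPacks := storagepebble.NewChunkDataPacks(metrics.NewNoopCollector(), db, nil, 100)
		_ = chunkDataPacks
	}

The full Store/ByChunkID/Remove round-trip is exercised by TestChunkDataPacks_Store above.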