Skip to content

Commit

Permalink
Merge pull request #6042 from onflow/leo/pebble-chunk-data-packs
Browse files Browse the repository at this point in the history
Pebble based chunk data packs storage
  • Loading branch information
zhangchiqing authored Jun 12, 2024
2 parents 7a6d8a7 + 22daffd commit ab907b9
Show file tree
Hide file tree
Showing 16 changed files with 703 additions and 82 deletions.
12 changes: 8 additions & 4 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
return nil
}

func openChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
log := sutil.NewLogger(logger)

opts := badgerDB.
Expand Down Expand Up @@ -722,17 +722,21 @@ func (exeNode *ExecutionNode) LoadExecutionState(
error,
) {

chunkDataPackDB, err := openChunkDataPackDB(exeNode.exeConf.chunkDataPackDir, node.Logger)
chunkDataPackDB, err := storagepebble.OpenDefaultPebbleDB(exeNode.exeConf.chunkDataPackDir)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not open chunk data pack database: %w", err)
}

exeNode.builder.ShutdownFunc(func() error {
if err := chunkDataPackDB.Close(); err != nil {
return fmt.Errorf("error closing chunk data pack database: %w", err)
}
return nil
})
chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache, chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
// chunkDataPacks := storage.NewChunkDataPacks(node.Metrics.Cache,
// chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)
chunkDataPacks := storagepebble.NewChunkDataPacks(node.Metrics.Cache,
chunkDataPackDB, node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
Expand Down
64 changes: 64 additions & 0 deletions model/flow/chunk.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package flow

import (
"fmt"
"log"

"github.com/ipfs/go-cid"
"github.com/vmihailenco/msgpack/v4"
)

var EmptyEventCollectionID Identifier
Expand Down Expand Up @@ -203,3 +205,65 @@ type BlockExecutionDataRoot struct {
// associated with this block.
ChunkExecutionDataIDs []cid.Cid
}

// MarshalMsgpack implements the msgpack.Marshaler interface
func (b BlockExecutionDataRoot) MarshalMsgpack() ([]byte, error) {
return msgpack.Marshal(struct {
BlockID Identifier
ChunkExecutionDataIDs []string
}{
BlockID: b.BlockID,
ChunkExecutionDataIDs: cidsToStrings(b.ChunkExecutionDataIDs),
})
}

// UnmarshalMsgpack implements the msgpack.Unmarshaler interface
func (b *BlockExecutionDataRoot) UnmarshalMsgpack(data []byte) error {
var temp struct {
BlockID Identifier
ChunkExecutionDataIDs []string
}

if err := msgpack.Unmarshal(data, &temp); err != nil {
return err
}

b.BlockID = temp.BlockID
cids, err := stringsToCids(temp.ChunkExecutionDataIDs)

if err != nil {
return fmt.Errorf("failed to decode chunk execution data ids: %w", err)
}

b.ChunkExecutionDataIDs = cids

return nil
}

// Helper function to convert a slice of cid.Cid to a slice of strings
func cidsToStrings(cids []cid.Cid) []string {
if cids == nil {
return nil
}
strs := make([]string, len(cids))
for i, c := range cids {
strs[i] = c.String()
}
return strs
}

// Helper function to convert a slice of strings to a slice of cid.Cid
func stringsToCids(strs []string) ([]cid.Cid, error) {
if strs == nil {
return nil, nil
}
cids := make([]cid.Cid, len(strs))
for i, s := range strs {
c, err := cid.Decode(s)
if err != nil {
return nil, fmt.Errorf("failed to decode cid %v: %w", s, err)
}
cids[i] = c
}
return cids, nil
}
40 changes: 10 additions & 30 deletions storage/badger/chunkDataPacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,32 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
badgermodel "github.com/onflow/flow-go/storage/badger/model"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
)

type ChunkDataPacks struct {
db *badger.DB
collections storage.Collections
byChunkIDCache *Cache[flow.Identifier, *badgermodel.StoredChunkDataPack]
byChunkIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack]
}

func NewChunkDataPacks(collector module.CacheMetrics, db *badger.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks {

store := func(key flow.Identifier, val *badgermodel.StoredChunkDataPack) func(*transaction.Tx) error {
store := func(key flow.Identifier, val *storage.StoredChunkDataPack) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val)))
}

retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
var c badgermodel.StoredChunkDataPack
retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
var c storage.StoredChunkDataPack
err := operation.RetrieveChunkDataPack(key, &c)(tx)
return &c, err
}
}

cache := newCache(collector, metrics.ResourceChunkDataPack,
withLimit[flow.Identifier, *badgermodel.StoredChunkDataPack](byChunkIDCacheSize),
withLimit[flow.Identifier, *storage.StoredChunkDataPack](byChunkIDCacheSize),
withStore(store),
withRetrieve(retrieve),
)
Expand Down Expand Up @@ -71,7 +70,7 @@ func (ch *ChunkDataPacks) Remove(chunkIDs []flow.Identifier) error {
// No errors are expected during normal operation, but it may return generic error
// if entity is not serializable or Badger unexpectedly fails to process request
func (ch *ChunkDataPacks) BatchStore(c *flow.ChunkDataPack, batch storage.BatchStorage) error {
sc := toStoredChunkDataPack(c)
sc := storage.ToStoredChunkDataPack(c)
writeBatch := batch.GetWriter()
batch.OnSucceed(func() {
ch.byChunkIDCache.Insert(sc.ChunkID, sc)
Expand Down Expand Up @@ -133,7 +132,7 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac
return chdp, nil
}

func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.StoredChunkDataPack, error) {
func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*storage.StoredChunkDataPack, error) {
tx := ch.db.NewTransaction(false)
defer tx.Discard()

Expand All @@ -145,31 +144,12 @@ func (ch *ChunkDataPacks) byChunkID(chunkID flow.Identifier) (*badgermodel.Store
return schdp, nil
}

func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn) (*storage.StoredChunkDataPack, error) {
return func(tx *badger.Txn) (*storage.StoredChunkDataPack, error) {
val, err := ch.byChunkIDCache.Get(chunkID)(tx)
if err != nil {
return nil, err
}
return val, nil
}
}

func toStoredChunkDataPack(c *flow.ChunkDataPack) *badgermodel.StoredChunkDataPack {
sc := &badgermodel.StoredChunkDataPack{
ChunkID: c.ChunkID,
StartState: c.StartState,
Proof: c.Proof,
SystemChunk: false,
ExecutionDataRoot: c.ExecutionDataRoot,
}

if c.Collection != nil {
// non system chunk
sc.CollectionID = c.Collection.ID()
} else {
sc.SystemChunk = true
}

return sc
}
17 changes: 0 additions & 17 deletions storage/badger/model/storedChunkDataPack.go

This file was deleted.

8 changes: 4 additions & 4 deletions storage/badger/operation/chunkDataPacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/model/flow"
badgermodel "github.com/onflow/flow-go/storage/badger/model"
"github.com/onflow/flow-go/storage"
)

// InsertChunkDataPack inserts a chunk data pack keyed by chunk ID.
func InsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error {
func InsertChunkDataPack(c *storage.StoredChunkDataPack) func(*badger.Txn) error {
return insert(makePrefix(codeChunkDataPack, c.ChunkID), c)
}

// BatchInsertChunkDataPack inserts a chunk data pack keyed by chunk ID into a batch
func BatchInsertChunkDataPack(c *badgermodel.StoredChunkDataPack) func(batch *badger.WriteBatch) error {
func BatchInsertChunkDataPack(c *storage.StoredChunkDataPack) func(batch *badger.WriteBatch) error {
return batchWrite(makePrefix(codeChunkDataPack, c.ChunkID), c)
}

Expand All @@ -25,7 +25,7 @@ func BatchRemoveChunkDataPack(chunkID flow.Identifier) func(batch *badger.WriteB
}

// RetrieveChunkDataPack retrieves a chunk data pack by chunk ID.
func RetrieveChunkDataPack(chunkID flow.Identifier, c *badgermodel.StoredChunkDataPack) func(*badger.Txn) error {
func RetrieveChunkDataPack(chunkID flow.Identifier, c *storage.StoredChunkDataPack) func(*badger.Txn) error {
return retrieve(makePrefix(codeChunkDataPack, chunkID), c)
}

Expand Down
10 changes: 5 additions & 5 deletions storage/badger/operation/chunkDataPacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

storagemodel "github.com/onflow/flow-go/storage/badger/model"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/unittest"
)

func TestChunkDataPack(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
collectionID := unittest.IdentifierFixture()
expected := &storagemodel.StoredChunkDataPack{
expected := &storage.StoredChunkDataPack{
ChunkID: unittest.IdentifierFixture(),
StartState: unittest.StateCommitmentFixture(),
Proof: []byte{'p'},
CollectionID: collectionID,
}

t.Run("Retrieve non-existent", func(t *testing.T) {
var actual storagemodel.StoredChunkDataPack
var actual storage.StoredChunkDataPack
err := db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
assert.Error(t, err)
})
Expand All @@ -31,7 +31,7 @@ func TestChunkDataPack(t *testing.T) {
err := db.Update(InsertChunkDataPack(expected))
require.NoError(t, err)

var actual storagemodel.StoredChunkDataPack
var actual storage.StoredChunkDataPack
err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
assert.NoError(t, err)

Expand All @@ -42,7 +42,7 @@ func TestChunkDataPack(t *testing.T) {
err := db.Update(RemoveChunkDataPack(expected.ChunkID))
require.NoError(t, err)

var actual storagemodel.StoredChunkDataPack
var actual storage.StoredChunkDataPack
err = db.View(RetrieveChunkDataPack(expected.ChunkID, &actual))
assert.Error(t, err)
})
Expand Down
34 changes: 31 additions & 3 deletions storage/chunkDataPacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ type ChunkDataPacks interface {
// No errors are expected during normal operation, but it may return generic error
Remove(cs []flow.Identifier) error

// BatchStore inserts the chunk header, keyed by chunk ID into a given batch
BatchStore(c *flow.ChunkDataPack, batch BatchStorage) error

// ByChunkID returns the chunk data for the given a chunk ID.
ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error)

Expand All @@ -26,3 +23,34 @@ type ChunkDataPacks interface {
// If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.
BatchRemove(chunkID flow.Identifier, batch BatchStorage) error
}

// StoredChunkDataPack is an in-storage representation of chunk data pack.
// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining
// the collection on a secondary storage.
type StoredChunkDataPack struct {
ChunkID flow.Identifier
StartState flow.StateCommitment
Proof flow.StorageProof
CollectionID flow.Identifier
SystemChunk bool
ExecutionDataRoot flow.BlockExecutionDataRoot
}

func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
sc := &StoredChunkDataPack{
ChunkID: c.ChunkID,
StartState: c.StartState,
Proof: c.Proof,
SystemChunk: false,
ExecutionDataRoot: c.ExecutionDataRoot,
}

if c.Collection != nil {
// non system chunk
sc.CollectionID = c.Collection.ID()
} else {
sc.SystemChunk = true
}

return sc
}
18 changes: 0 additions & 18 deletions storage/mock/chunk_data_packs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ab907b9

Please sign in to comment.