From 52573baf22ef8989c9f7d5682707a3dcdc18a1a0 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 10 Jun 2024 20:14:08 -0500 Subject: [PATCH 1/2] Add atree-inlined-status command to util program atree-inlined-status command examines atree payloads and outputs number of payloads inlined, not inlined, etc. in JSON format. Among several other flags, the program accepts - n-workers (default=8) - n-payloads (default=all, number of payloads to sample) This can be used for testing and debugging migration results. --- .../atree_inlined_status_test.go | 133 ++++++ cmd/util/cmd/atree_inlined_status/cmd.go | 391 ++++++++++++++++++ cmd/util/cmd/root.go | 2 + cmd/util/ledger/util/atree_util.go | 59 +++ ledger/complete/ledger.go | 5 + model/flow/ledger.go | 4 + 6 files changed, 594 insertions(+) create mode 100644 cmd/util/cmd/atree_inlined_status/atree_inlined_status_test.go create mode 100644 cmd/util/cmd/atree_inlined_status/cmd.go create mode 100644 cmd/util/ledger/util/atree_util.go diff --git a/cmd/util/cmd/atree_inlined_status/atree_inlined_status_test.go b/cmd/util/cmd/atree_inlined_status/atree_inlined_status_test.go new file mode 100644 index 00000000000..3a73ab5c212 --- /dev/null +++ b/cmd/util/cmd/atree_inlined_status/atree_inlined_status_test.go @@ -0,0 +1,133 @@ +package atree_inlined_status + +import ( + crand "crypto/rand" + "math/rand" + "testing" + + "github.com/fxamacker/cbor/v2" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/model/flow" +) + +func TestCheckAtreeInlinedStatus(t *testing.T) { + const nWorkers = 10 + + t.Run("no payloads", func(t *testing.T) { + var payloads []*ledger.Payload + atreeInlinedPayloadCount, atreeNonInlinedPayloadCount, err := checkAtreeInlinedStatus(payloads, nWorkers) + require.NoError(t, err) + require.Equal(t, 0, atreeInlinedPayloadCount) + require.Equal(t, 0, atreeNonInlinedPayloadCount) + }) + + t.Run("payloads no goroutine", func(t *testing.T) { + payloadCount := rand.Intn(50) + testCheckAtreeInlinedStatus(t, payloadCount, nWorkers) + }) + + t.Run("payloads using goroutine", func(t *testing.T) { + payloadCount := rand.Intn(numOfPayloadPerJob) + numOfPayloadPerJob + testCheckAtreeInlinedStatus(t, payloadCount, nWorkers) + }) +} + +func testCheckAtreeInlinedStatus(t *testing.T, payloadCount int, nWorkers int) { + atreeNoninlinedPayloadCount := rand.Intn(payloadCount + 1) + atreeInlinedPayloadCount := payloadCount - atreeNoninlinedPayloadCount + + payloads := make([]*ledger.Payload, 0, payloadCount) + for i := 0; i < atreeInlinedPayloadCount; i++ { + key := getRandomKey() + value := getAtreeInlinedPayload(t) + payloads = append(payloads, ledger.NewPayload(key, value)) + } + for i := 0; i < atreeNoninlinedPayloadCount; i++ { + key := getRandomKey() + value := getAtreeNoninlinedPayload(t) + payloads = append(payloads, ledger.NewPayload(key, value)) + } + + rand.Shuffle(len(payloads), func(i, j int) { + payloads[i], payloads[j] = payloads[j], payloads[i] + }) + + gotAtreeInlinedPayloadCount, gotAtreeNoninlinedPayloadCount, err := checkAtreeInlinedStatus(payloads, nWorkers) + require.NoError(t, err) + require.Equal(t, atreeNoninlinedPayloadCount, gotAtreeNoninlinedPayloadCount) + require.Equal(t, atreeInlinedPayloadCount, gotAtreeInlinedPayloadCount) +} + +func getAtreeNoninlinedPayload(t *testing.T) []byte { + num := rand.Uint64() + encodedNum, err := cbor.Marshal(num) + require.NoError(t, err) + + data := []byte{ + // extra data + // version + 
0x00, + // extra data flag + 0x80, + // array of extra data + 0x81, + // type info + 0x18, 0x2a, + + // version + 0x00, + // array data slab flag + 0x80, + // CBOR encoded array head (fixed size 3 byte) + 0x99, 0x00, 0x01, + } + + return append(data, encodedNum...) +} + +func getAtreeInlinedPayload(t *testing.T) []byte { + num := rand.Uint64() + encodedNum, err := cbor.Marshal(num) + require.NoError(t, err) + + data := []byte{ + // version + 0x10, + // flag + 0x80, + + // extra data + // array of extra data + 0x81, + // type info + 0x18, 0x2a, + + // CBOR encoded array head (fixed size 3 byte) + 0x99, 0x00, 0x01, + } + + return append(data, encodedNum...) +} + +func getRandomKey() ledger.Key { + var address [8]byte + _, err := crand.Read(address[:]) + if err != nil { + panic(err) + } + + var key [9]byte + key[0] = flow.SlabIndexPrefix + _, err = crand.Read(key[1:]) + if err != nil { + panic(err) + } + + return ledger.Key{ + KeyParts: []ledger.KeyPart{ + {Type: uint16(0), Value: address[:]}, + {Type: uint16(2), Value: key[:]}, + }} +} diff --git a/cmd/util/cmd/atree_inlined_status/cmd.go b/cmd/util/cmd/atree_inlined_status/cmd.go new file mode 100644 index 00000000000..66d7ecfaa9b --- /dev/null +++ b/cmd/util/cmd/atree_inlined_status/cmd.go @@ -0,0 +1,391 @@ +package atree_inlined_status + +import ( + "context" + "encoding/hex" + "fmt" + "math" + + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + + "github.com/onflow/flow-go/cmd/util/ledger/reporters" + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/ledger/common/pathfinder" + "github.com/onflow/flow-go/ledger/complete" + "github.com/onflow/flow-go/ledger/complete/wal" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/metrics" +) + +var ( + flagOutputDirectory string + flagPayloads string + flagState string + flagStateCommitment string + flagNumOfPayloadToSample int + flagNWorker int +) + +var Cmd = &cobra.Command{ + Use: "atree-inlined-status", + Short: "Check if atree payloads are inlined in given state", + Run: run, +} + +const ( + ReporterName = "atree-inlined-status" + + numOfPayloadPerJob = 1_000 +) + +func init() { + + Cmd.Flags().StringVar( + &flagPayloads, + "payloads", + "", + "Input payload file name", + ) + + Cmd.Flags().StringVar( + &flagState, + "state", + "", + "Input state file name", + ) + + Cmd.Flags().StringVar( + &flagStateCommitment, + "state-commitment", + "", + "Input state commitment", + ) + + Cmd.Flags().StringVar( + &flagOutputDirectory, + "output-directory", + "", + "Output directory", + ) + + _ = Cmd.MarkFlagRequired("output-directory") + + Cmd.Flags().IntVar( + &flagNWorker, + "n-workers", + 8, + "number of workers to use", + ) + + Cmd.Flags().IntVar( + &flagNumOfPayloadToSample, + "n-payloads", + -1, + "number of payloads to sample for inlined status (sample all payloads by default)", + ) +} + +func run(*cobra.Command, []string) { + + if flagPayloads == "" && flagState == "" { + log.Fatal().Msg("Either --payloads or --state must be provided") + } else if flagPayloads != "" && flagState != "" { + log.Fatal().Msg("Only one of --payloads or --state must be provided") + } + if flagState != "" && flagStateCommitment == "" { + log.Fatal().Msg("--state-commitment must be provided when --state is provided") + } + + if flagNumOfPayloadToSample == 0 { + log.Fatal().Msg("--n-payloads must be either > 0 or -1 (check 
all payloads)") + } + + rw := reporters.NewReportFileWriterFactory(flagOutputDirectory, log.Logger). + ReportWriter(ReporterName) + defer rw.Close() + + var payloads []*ledger.Payload + var err error + + if flagPayloads != "" { + log.Info().Msgf("Reading payloads from %s", flagPayloads) + + _, payloads, err = util.ReadPayloadFile(log.Logger, flagPayloads) + if err != nil { + log.Fatal().Err(err).Msg("failed to read payloads") + } + } else { + log.Info().Msgf("Reading trie %s", flagStateCommitment) + + stateCommitment := parseStateCommitment(flagStateCommitment) + payloads, err = readTrie(flagState, stateCommitment) + if err != nil { + log.Fatal().Err(err).Msg("failed to read state") + } + } + + totalPayloadCount := len(payloads) + samplePayloadCount := len(payloads) + + if flagNumOfPayloadToSample > 0 && flagNumOfPayloadToSample < len(payloads) { + samplePayloadCount = flagNumOfPayloadToSample + } + + payloadsToSample := payloads + + if samplePayloadCount < totalPayloadCount { + atreePayloadCount := 0 + i := 0 + for ; atreePayloadCount < samplePayloadCount; i++ { + registerID, _, err := convert.PayloadToRegister(payloads[i]) + if err != nil { + log.Fatal().Err(err).Msg("failed to convert payload to register") + } + + if flow.IsSlabIndexKey(registerID.Key) { + atreePayloadCount++ + } + } + + payloadsToSample = payloads[:i] + } + + atreeInlinedPayloadCount, atreeNonInlinedPayloadCount, err := checkAtreeInlinedStatus(payloadsToSample, flagNWorker) + if err != nil { + log.Fatal().Err(err).Msg("failed to check atree inlined status") + } + + rw.Write(stateStatus{ + InputPayloadFile: flagPayloads, + InputState: flagState, + InputStateCommitment: flagStateCommitment, + TotalPayloadCount: len(payloads), + SamplePayloadCount: len(payloadsToSample), + AtreeInlinedPayloadCount: atreeInlinedPayloadCount, + AtreeNonInlinedPayloadCount: atreeNonInlinedPayloadCount, + }) +} + +func checkAtreeInlinedStatus(payloads []*ledger.Payload, nWorkers int) ( + atreeInlinedPayloadCount int, + atreeNonInlinedPayloadCount int, + err error, +) { + + if len(payloads)/numOfPayloadPerJob < nWorkers { + nWorkers = len(payloads) / numOfPayloadPerJob + } + + log.Info().Msgf("checking atree payload inlined status...") + + if nWorkers <= 1 { + // Skip goroutine to avoid overhead + for _, p := range payloads { + isAtreeSlab, isInlined, err := util.IsPayloadAtreeInlined(p) + if err != nil { + return 0, 0, err + } + + if !isAtreeSlab { + continue + } + + if isInlined { + atreeInlinedPayloadCount++ + } else { + atreeNonInlinedPayloadCount++ + } + } + return + } + + type job struct { + payloads []*ledger.Payload + } + + type result struct { + atreeInlinedPayloadCount int + atreeNonInlinedPayloadCount int + } + + numOfJobs := (len(payloads) + numOfPayloadPerJob - 1) / numOfPayloadPerJob + + jobs := make(chan job, numOfJobs) + + results := make(chan result, numOfJobs) + + g, ctx := errgroup.WithContext(context.Background()) + + // Launch goroutine to check atree register inlined state + for i := 0; i < nWorkers; i++ { + g.Go(func() error { + for job := range jobs { + var result result + + for _, p := range job.payloads { + isAtreeSlab, isInlined, err := util.IsPayloadAtreeInlined(p) + if err != nil { + return err + } + + if !isAtreeSlab { + continue + } + + if isInlined { + result.atreeInlinedPayloadCount++ + } else { + result.atreeNonInlinedPayloadCount++ + } + } + + select { + case results <- result: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + + // Launch goroutine to wait for workers and close output 
channel + go func() { + _ = g.Wait() + close(results) + }() + + // Send job to jobs channel + payloadStartIndex := 0 + for { + if payloadStartIndex == len(payloads) { + close(jobs) + break + } + + endIndex := payloadStartIndex + numOfPayloadPerJob + if endIndex > len(payloads) { + endIndex = len(payloads) + } + + jobs <- job{payloads: payloads[payloadStartIndex:endIndex]} + + payloadStartIndex = endIndex + } + + // Gather results + for result := range results { + atreeInlinedPayloadCount += result.atreeInlinedPayloadCount + atreeNonInlinedPayloadCount += result.atreeNonInlinedPayloadCount + } + + log.Info().Msgf("waiting for goroutines...") + + if err := g.Wait(); err != nil { + return 0, 0, err + } + + return atreeInlinedPayloadCount, atreeNonInlinedPayloadCount, nil +} + +type stateStatus struct { + InputPayloadFile string `json:",omitempty"` + InputState string `json:",omitempty"` + InputStateCommitment string `json:",omitempty"` + TotalPayloadCount int + SamplePayloadCount int + AtreeInlinedPayloadCount int + AtreeNonInlinedPayloadCount int +} + +func readTrie(dir string, targetHash flow.StateCommitment) ([]*ledger.Payload, error) { + log.Info().Msg("init WAL") + + diskWal, err := wal.NewDiskWAL( + log.Logger, + nil, + metrics.NewNoopCollector(), + dir, + complete.DefaultCacheSize, + pathfinder.PathByteSize, + wal.SegmentSize, + ) + if err != nil { + return nil, fmt.Errorf("cannot create disk WAL: %w", err) + } + + log.Info().Msg("init ledger") + + led, err := complete.NewLedger( + diskWal, + complete.DefaultCacheSize, + &metrics.NoopCollector{}, + log.Logger, + complete.DefaultPathFinderVersion) + if err != nil { + return nil, fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err) + } + + const ( + checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation. + checkpointsToKeep = 1 + ) + + log.Info().Msg("init compactor") + + compactor, err := complete.NewCompactor( + led, + diskWal, + log.Logger, + complete.DefaultCacheSize, + checkpointDistance, + checkpointsToKeep, + atomic.NewBool(false), + &metrics.NoopCollector{}, + ) + if err != nil { + return nil, fmt.Errorf("cannot create compactor: %w", err) + } + + log.Info().Msgf("waiting for compactor to load checkpoint and WAL") + + <-compactor.Ready() + + defer func() { + <-led.Done() + <-compactor.Done() + }() + + state := ledger.State(targetHash) + + trie, err := led.Trie(ledger.RootHash(state)) + if err != nil { + s, _ := led.MostRecentTouchedState() + log.Info(). + Str("hash", s.String()). 
+ Msgf("Most recently touched state") + return nil, fmt.Errorf("cannot get trie at the given state commitment: %w", err) + } + + return trie.AllPayloads(), nil +} + +func parseStateCommitment(stateCommitmentHex string) flow.StateCommitment { + var err error + stateCommitmentBytes, err := hex.DecodeString(stateCommitmentHex) + if err != nil { + log.Fatal().Err(err).Msg("cannot get decode the state commitment") + } + + stateCommitment, err := flow.ToStateCommitment(stateCommitmentBytes) + if err != nil { + log.Fatal().Err(err).Msg("invalid state commitment length") + } + + return stateCommitment +} diff --git a/cmd/util/cmd/root.go b/cmd/util/cmd/root.go index 7b2833ade11..0c576715c62 100644 --- a/cmd/util/cmd/root.go +++ b/cmd/util/cmd/root.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/viper" "github.com/onflow/flow-go/cmd/util/cmd/addresses" + "github.com/onflow/flow-go/cmd/util/cmd/atree_inlined_status" bootstrap_execution_state_payloads "github.com/onflow/flow-go/cmd/util/cmd/bootstrap-execution-state-payloads" checkpoint_collect_stats "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-collect-stats" checkpoint_list_tries "github.com/onflow/flow-go/cmd/util/cmd/checkpoint-list-tries" @@ -86,6 +87,7 @@ func addCommands() { rootCmd.AddCommand(addresses.Cmd) rootCmd.AddCommand(bootstrap_execution_state_payloads.Cmd) rootCmd.AddCommand(extractpayloads.Cmd) + rootCmd.AddCommand(atree_inlined_status.Cmd) } func initConfig() { diff --git a/cmd/util/ledger/util/atree_util.go b/cmd/util/ledger/util/atree_util.go new file mode 100644 index 00000000000..8b29a6735ad --- /dev/null +++ b/cmd/util/ledger/util/atree_util.go @@ -0,0 +1,59 @@ +package util + +import ( + "fmt" + + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" +) + +func IsPayloadAtreeInlined(payload *ledger.Payload) (isAtreeSlab bool, isInlined bool, err error) { + registerID, registerValue, err := convert.PayloadToRegister(payload) + if err != nil { + return false, false, fmt.Errorf("failed to convert payload to register: %w", err) + } + return IsRegisterAtreeInlined(registerID.Key, registerValue) +} + +func IsRegisterAtreeInlined(key string, value []byte) (isAtreeSlab bool, isInlined bool, err error) { + if !flow.IsSlabIndexKey(key) { + return false, false, nil + } + + // Check Atree register version + + head, err := newHeadFromData(value) + if err != nil { + return false, false, err + } + + version := head.version() + if version > maxSupportedVersion { + return false, false, fmt.Errorf("atree slab version %d, max supported version %d", version, maxSupportedVersion) + } + + return true, version == inlinedVersion, nil +} + +const ( + maskVersion byte = 0b1111_0000 + + noninlinedVersion = 0 + inlinedVersion = 1 + maxSupportedVersion = inlinedVersion +) + +type head [2]byte + +func newHeadFromData(data []byte) (head, error) { + if len(data) < 2 { + return head{}, fmt.Errorf("atree slab must be at least 2 bytes, got %d bytes", len(data)) + } + + return head{data[0], data[1]}, nil +} + +func (h *head) version() byte { + return (h[0] & maskVersion) >> 4 +} diff --git a/ledger/complete/ledger.go b/ledger/complete/ledger.go index 4ddd327f308..cfb6b5d2503 100644 --- a/ledger/complete/ledger.go +++ b/ledger/complete/ledger.go @@ -324,6 +324,11 @@ func (l *Ledger) Tries() ([]*trie.MTrie, error) { return l.forest.GetTries() } +// Trie returns the trie stored in the forest +func (l *Ledger) Trie(rootHash ledger.RootHash) (*trie.MTrie, error) { + return 
l.forest.GetTrie(rootHash) +} + // Checkpointer returns a checkpointer instance func (l *Ledger) Checkpointer() (*realWAL.Checkpointer, error) { checkpointer, err := l.wal.NewCheckpointer() diff --git a/model/flow/ledger.go b/model/flow/ledger.go index 182bf33e623..77393207664 100644 --- a/model/flow/ledger.go +++ b/model/flow/ledger.go @@ -138,6 +138,10 @@ func (id RegisterID) IsSlabIndex() bool { return len(id.Key) == 9 && id.Key[0] == SlabIndexPrefix } +func IsSlabIndexKey(key string) bool { + return len(key) == 9 && key[0] == SlabIndexPrefix +} + // String returns formatted string representation of the RegisterID. func (id RegisterID) String() string { formattedKey := "" From a9d1bb4eac054ee2c7da33bb6c200db4b62da49e Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 11 Jun 2024 12:18:49 -0500 Subject: [PATCH 2/2] Extract readTrie and parseStateCommitment funcs --- cmd/util/cmd/atree_inlined_status/cmd.go | 99 +-------------------- cmd/util/ledger/util/state.go | 104 +++++++++++++++++++++++ 2 files changed, 106 insertions(+), 97 deletions(-) create mode 100644 cmd/util/ledger/util/state.go diff --git a/cmd/util/cmd/atree_inlined_status/cmd.go b/cmd/util/cmd/atree_inlined_status/cmd.go index 66d7ecfaa9b..453b566ff7a 100644 --- a/cmd/util/cmd/atree_inlined_status/cmd.go +++ b/cmd/util/cmd/atree_inlined_status/cmd.go @@ -2,24 +2,16 @@ package atree_inlined_status import ( "context" - "encoding/hex" - "fmt" - "math" "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/onflow/flow-go/cmd/util/ledger/reporters" "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" - "github.com/onflow/flow-go/ledger/common/pathfinder" - "github.com/onflow/flow-go/ledger/complete" - "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/metrics" ) var ( @@ -122,8 +114,8 @@ func run(*cobra.Command, []string) { } else { log.Info().Msgf("Reading trie %s", flagStateCommitment) - stateCommitment := parseStateCommitment(flagStateCommitment) - payloads, err = readTrie(flagState, stateCommitment) + stateCommitment := util.ParseStateCommitment(flagStateCommitment) + payloads, err = util.ReadTrie(flagState, stateCommitment) if err != nil { log.Fatal().Err(err).Msg("failed to read state") } @@ -302,90 +294,3 @@ type stateStatus struct { AtreeInlinedPayloadCount int AtreeNonInlinedPayloadCount int } - -func readTrie(dir string, targetHash flow.StateCommitment) ([]*ledger.Payload, error) { - log.Info().Msg("init WAL") - - diskWal, err := wal.NewDiskWAL( - log.Logger, - nil, - metrics.NewNoopCollector(), - dir, - complete.DefaultCacheSize, - pathfinder.PathByteSize, - wal.SegmentSize, - ) - if err != nil { - return nil, fmt.Errorf("cannot create disk WAL: %w", err) - } - - log.Info().Msg("init ledger") - - led, err := complete.NewLedger( - diskWal, - complete.DefaultCacheSize, - &metrics.NoopCollector{}, - log.Logger, - complete.DefaultPathFinderVersion) - if err != nil { - return nil, fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err) - } - - const ( - checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation. 
- checkpointsToKeep = 1 - ) - - log.Info().Msg("init compactor") - - compactor, err := complete.NewCompactor( - led, - diskWal, - log.Logger, - complete.DefaultCacheSize, - checkpointDistance, - checkpointsToKeep, - atomic.NewBool(false), - &metrics.NoopCollector{}, - ) - if err != nil { - return nil, fmt.Errorf("cannot create compactor: %w", err) - } - - log.Info().Msgf("waiting for compactor to load checkpoint and WAL") - - <-compactor.Ready() - - defer func() { - <-led.Done() - <-compactor.Done() - }() - - state := ledger.State(targetHash) - - trie, err := led.Trie(ledger.RootHash(state)) - if err != nil { - s, _ := led.MostRecentTouchedState() - log.Info(). - Str("hash", s.String()). - Msgf("Most recently touched state") - return nil, fmt.Errorf("cannot get trie at the given state commitment: %w", err) - } - - return trie.AllPayloads(), nil -} - -func parseStateCommitment(stateCommitmentHex string) flow.StateCommitment { - var err error - stateCommitmentBytes, err := hex.DecodeString(stateCommitmentHex) - if err != nil { - log.Fatal().Err(err).Msg("cannot get decode the state commitment") - } - - stateCommitment, err := flow.ToStateCommitment(stateCommitmentBytes) - if err != nil { - log.Fatal().Err(err).Msg("invalid state commitment length") - } - - return stateCommitment -} diff --git a/cmd/util/ledger/util/state.go b/cmd/util/ledger/util/state.go new file mode 100644 index 00000000000..9e56a461985 --- /dev/null +++ b/cmd/util/ledger/util/state.go @@ -0,0 +1,104 @@ +package util + +import ( + "encoding/hex" + "fmt" + "math" + + "github.com/rs/zerolog/log" + "go.uber.org/atomic" + + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/pathfinder" + "github.com/onflow/flow-go/ledger/complete" + "github.com/onflow/flow-go/ledger/complete/wal" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/metrics" +) + +func ReadTrie(dir string, targetHash flow.StateCommitment) ([]*ledger.Payload, error) { + log.Info().Msg("init WAL") + + diskWal, err := wal.NewDiskWAL( + log.Logger, + nil, + metrics.NewNoopCollector(), + dir, + complete.DefaultCacheSize, + pathfinder.PathByteSize, + wal.SegmentSize, + ) + if err != nil { + return nil, fmt.Errorf("cannot create disk WAL: %w", err) + } + + log.Info().Msg("init ledger") + + led, err := complete.NewLedger( + diskWal, + complete.DefaultCacheSize, + &metrics.NoopCollector{}, + log.Logger, + complete.DefaultPathFinderVersion) + if err != nil { + return nil, fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err) + } + + const ( + checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation. + checkpointsToKeep = 1 + ) + + log.Info().Msg("init compactor") + + compactor, err := complete.NewCompactor( + led, + diskWal, + log.Logger, + complete.DefaultCacheSize, + checkpointDistance, + checkpointsToKeep, + atomic.NewBool(false), + &metrics.NoopCollector{}, + ) + if err != nil { + return nil, fmt.Errorf("cannot create compactor: %w", err) + } + + log.Info().Msgf("waiting for compactor to load checkpoint and WAL") + + <-compactor.Ready() + + defer func() { + <-led.Done() + <-compactor.Done() + }() + + state := ledger.State(targetHash) + + trie, err := led.Trie(ledger.RootHash(state)) + if err != nil { + s, _ := led.MostRecentTouchedState() + log.Info(). + Str("hash", s.String()). 
+			Msgf("Most recently touched state")
+		return nil, fmt.Errorf("cannot get trie at the given state commitment: %w", err)
+	}
+
+	return trie.AllPayloads(), nil
+}
+
+func ParseStateCommitment(stateCommitmentHex string) flow.StateCommitment {
+	var err error
+	stateCommitmentBytes, err := hex.DecodeString(stateCommitmentHex)
+	if err != nil {
+		log.Fatal().Err(err).Msg("cannot decode the state commitment")
+	}
+
+	stateCommitment, err := flow.ToStateCommitment(stateCommitmentBytes)
+	if err != nil {
+		log.Fatal().Err(err).Msg("invalid state commitment length")
+	}
+
+	return stateCommitment
+}
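
Example (illustrative only, not part of the patches above): a minimal, self-contained sketch of how the new util.IsPayloadAtreeInlined helper classifies payloads. It hand-builds two slab payloads using the same register-key layout as getRandomKey in the test (a 9-byte key whose first byte is flow.SlabIndexPrefix); the two-byte values are only enough to exercise the version-nibble check in the slab head and are not complete atree slabs.

package main

import (
	"fmt"

	"github.com/onflow/flow-go/cmd/util/ledger/util"
	"github.com/onflow/flow-go/ledger"
	"github.com/onflow/flow-go/model/flow"
)

func main() {
	// Atree slab register key: 9 bytes, first byte is flow.SlabIndexPrefix,
	// mirroring getRandomKey in the test (owner part left as zeros here).
	var owner [8]byte
	var slabKey [9]byte
	slabKey[0] = flow.SlabIndexPrefix

	key := ledger.Key{
		KeyParts: []ledger.KeyPart{
			{Type: uint16(0), Value: owner[:]},
			{Type: uint16(2), Value: slabKey[:]},
		},
	}

	// Two-byte slab heads: the high nibble of the first byte is the version.
	// Version 1 means the slab was encoded with inlining, version 0 without.
	// These bytes only satisfy the head parser; they are not valid full slabs.
	values := map[string][]byte{
		"inlined":     {0x10, 0x80},
		"non-inlined": {0x00, 0x80},
	}

	for name, value := range values {
		payload := ledger.NewPayload(key, value)

		isAtreeSlab, isInlined, err := util.IsPayloadAtreeInlined(payload)
		if err != nil {
			panic(err)
		}

		fmt.Printf("%s payload: isAtreeSlab=%t isInlined=%t\n", name, isAtreeSlab, isInlined)
	}
}

The atree-inlined-status command applies this same check across all sampled payloads and writes the totals (TotalPayloadCount, SamplePayloadCount, AtreeInlinedPayloadCount, AtreeNonInlinedPayloadCount) to the JSON report in the output directory.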