From b4f7f371737ac39f02624b1271c55304ba0b6a20 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 24 Jan 2025 00:02:15 +0100 Subject: [PATCH 01/33] chore: chain builder cosmetics --- tools/chainbuilder/README.md | 7 +++---- tools/chainbuilder/main.go | 2 ++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tools/chainbuilder/README.md b/tools/chainbuilder/README.md index 50ceb3c06d..eee54c36bc 100644 --- a/tools/chainbuilder/README.md +++ b/tools/chainbuilder/README.md @@ -10,7 +10,7 @@ Use `go` to run the binary as follows: go run ./tools/chainbuilder ``` -This will create a directory with the name `testnode-{chainID}`. All files will be populated and blocks generated based on specified input. You can run a validator on the file system afterwards by calling: +This will create a directory with the name `testnode-{chainID}`. All files will be populated and blocks generated based on specified input. You can run a validator on the file system afterward by calling: ```shell celestia-appd start --home /path/to/testnode-{chainID} @@ -19,9 +19,8 @@ celestia-appd start --home /path/to/testnode-{chainID} The following are the set of options when generating a chain: - `num-blocks` the number of blocks to be generated (default: 100) -- `block-size` the size of the blocks to be generated (default <2MB). This will be a single PFB transaction -- `square-size` the size of the max square (default: 128) +- `block-size` the size of the blocks to be generated in bytes (default <2MB). This will be a single PFB transaction - `existing-dir` point this to a directory if you want to extend an existing chain rather than create a new one -- `namespace` allows you to pick a custom v0 namespace. By default "test" will be chosen. +- `namespace` allows you to pick a custom v0 namespace. By default, "test" will be chosen. This tool takes roughly 60-70ms per 2MB block. diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 914706ba01..a23be44fb7 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -459,6 +459,7 @@ func generateSquareRoutine( dataCh chan<- *tmproto.Data, ) error { for i := 0; i < cfg.NumBlocks; i++ { + fmt.Printf("generating block %d\n", i) select { case <-ctx.Done(): return ctx.Err() @@ -501,6 +502,7 @@ func generateSquareRoutine( return err } + fmt.Printf("generated block size %d", dah.SquareSize()) select { case dataCh <- &tmproto.Data{ Txs: txs, From 5257eb263884ed791efa9285aac1aa89f65f4877 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 24 Jan 2025 00:04:33 +0100 Subject: [PATCH 02/33] chore: chain builder cosmetics --- tools/chainbuilder/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index a23be44fb7..4406860d1f 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -502,7 +502,7 @@ func generateSquareRoutine( return err } - fmt.Printf("generated block size %d", dah.SquareSize()) + fmt.Printf("generated block size %d\n", dah.SquareSize()) select { case dataCh <- &tmproto.Data{ Txs: txs, From 18e8a43dfc629532d9e2536501966c14807b56b9 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 24 Jan 2025 11:17:54 +0100 Subject: [PATCH 03/33] chore: try to parallelize --- tools/chainbuilder/main.go | 479 +++++++++++++++++++++---------------- 1 file changed, 271 insertions(+), 208 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 4406860d1f..c155bdeb99 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "time" "github.com/celestiaorg/go-square/v2" @@ -61,6 +62,8 @@ func main() { upToTime, _ := cmd.Flags().GetBool("up-to-now") appVersion, _ := cmd.Flags().GetUint64("app-version") chainID, _ := cmd.Flags().GetString("chain-id") + numWorkers, _ := cmd.Flags().GetInt("num-workers") + var namespace share.Namespace if namespaceStr == "" { namespace = defaultNamespace @@ -81,6 +84,7 @@ func main() { ChainID: tmrand.Str(6), UpToTime: upToTime, AppVersion: appVersion, + NumWorkers: numWorkers, } if chainID != "" { @@ -104,6 +108,8 @@ func main() { rootCmd.Flags().Bool("up-to-now", false, "Tool will terminate if the block time reaches the current time") rootCmd.Flags().Uint64("app-version", appconsts.LatestVersion, "App version to use for the chain") rootCmd.Flags().String("chain-id", "", "Chain ID to use for the chain. Defaults to a random 6 character string") + rootCmd.Flags().Int("num-workers", 1, "Number of goroutines to use in parallel for block-data generation") + rootCmd.SilenceUsage = true rootCmd.SilenceErrors = true if err := rootCmd.Execute(); err != nil { @@ -121,6 +127,9 @@ type BuilderConfig struct { ChainID string AppVersion uint64 UpToTime bool + + // Added for parallel generation + NumWorkers int } func Run(ctx context.Context, cfg BuilderConfig, dir string) error { @@ -129,11 +138,13 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { encCfg := encoding.MakeConfig(app.ModuleBasics) tmCfg := app.DefaultConsensusConfig() + var ( gen *genesis.Genesis kr keyring.Keyring err error ) + if cfg.ExistingDir == "" { dir = filepath.Join(dir, fmt.Sprintf("testnode-%s", cfg.ChainID)) kr, err = keyring.New(app.Name, keyring.BackendTest, dir, nil, encCfg.Codec) @@ -147,7 +158,9 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { appCfg.StateSync.SnapshotInterval = 0 cp := app.DefaultConsensusParams() - cp.Version.AppVersion = cfg.AppVersion // set the app version + // Set the app version + cp.Version.AppVersion = cfg.AppVersion + gen = genesis.NewDefaultGenesis(). WithConsensusParams(cp). WithKeyring(kr). @@ -179,6 +192,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { if err != nil { return fmt.Errorf("failed to create block database: %w", err) } + defer blockDB.Close() blockStore := store.NewBlockStore(blockDB) @@ -186,6 +200,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { if err != nil { return fmt.Errorf("failed to create state database: %w", err) } + defer stateDB.Close() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ DiscardABCIResponses: true, @@ -195,6 +210,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { if err != nil { return fmt.Errorf("failed to create application database: %w", err) } + defer appDB.Close() simApp := app.New( log.NewNopLogger(), @@ -202,7 +218,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { nil, 0, encCfg, - 0, // upgrade height v2 + 0, // upgrade height 0, // timeout commit util.EmptyAppOptions{}, baseapp.SetMinGasPrices(fmt.Sprintf("%f%s", appconsts.DefaultMinGasPrice, appconsts.BondDenom)), @@ -216,6 +232,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { } if lastHeight == 0 { + // brand new chain if gen == nil { return fmt.Errorf("non empty directory but no blocks found") } @@ -254,6 +271,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1) state.AppHash = res.AppHash state.LastResultsHash = merkle.HashFromByteSlices(nil) + if err := stateStore.Save(state); err != nil { return fmt.Errorf("failed to save initial state: %w", err) } @@ -261,26 +279,28 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { } else { fmt.Println("Starting from height", lastHeight) } + state, err := stateStore.Load() if err != nil { return fmt.Errorf("failed to load state: %w", err) } if cfg.ExistingDir != "" { - // if this is extending an existing chain, we want to start - // the time to be where the existing chain left off + // extending an existing chain, start from the chain's last known block time currentTime = state.LastBlockTime.Add(cfg.BlockInterval) } if state.ConsensusParams.Version.AppVersion != cfg.AppVersion { - return fmt.Errorf("app version mismatch: state has %d, but cfg has %d", state.ConsensusParams.Version.AppVersion, cfg.AppVersion) + return fmt.Errorf("app version mismatch: state has %d, but cfg has %d", + state.ConsensusParams.Version.AppVersion, cfg.AppVersion) } - if state.LastBlockHeight != lastHeight { return fmt.Errorf("last block height mismatch: state has %d, but block store has %d", state.LastBlockHeight, lastHeight) } validatorPower := state.Validators.Validators[0].VotingPower + // We retrieve the current account for the validator + // We'll assume account number = 0, and the sequence starts at (lastHeight + 1). signer, err := user.NewSigner( kr, encCfg.TxConfig, @@ -292,12 +312,11 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { return fmt.Errorf("failed to create new signer: %w", err) } - var ( - errCh = make(chan error, 2) - dataCh = make(chan *tmproto.Data, 100) - persistCh = make(chan persistData, 100) - commit = types.NewCommit(0, 0, types.BlockID{}, nil) - ) + // We'll generate data in parallel, but we still build blocks sequentially. + // The code below sets up the concurrency pipeline. + + // commit at lastHeight + commit := types.NewCommit(0, 0, types.BlockID{}, nil) if lastHeight > 0 { commit = blockStore.LoadSeenCommit(lastHeight) } @@ -305,246 +324,290 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - go func() { - errCh <- generateSquareRoutine(ctx, signer, cfg, dataCh) - }() + // Number of blocks to produce + totalBlocks := cfg.NumBlocks + + // We will generate results in random completion order, so we’ll store them + // in a map and process them sequentially (by index) in the main loop. + resultsCh := make(chan generatedBlockData, cfg.NumWorkers) + errCh := make(chan error, 1) + + // We set the base sequence to the signer's current sequence + baseSequence := signer.Accounts()[0].Sequence() + + // Launch worker pool + var wg sync.WaitGroup + jobCh := make(chan int) // block index for each job + + for w := 0; w < cfg.NumWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range jobCh { + data, err := generateBlockData(i, baseSequence+uint64(i), cfg, kr, encCfg, state) + select { + case <-ctx.Done(): + return + case resultsCh <- generatedBlockData{Index: i, Data: data, Err: err}: + } + } + }() + } + // Feed jobs go func() { - errCh <- persistDataRoutine(ctx, stateStore, blockStore, persistCh) + defer close(resultsCh) + defer cancel() // ensure workers get canceled once done + for i := 0; i < totalBlocks; i++ { + select { + case <-ctx.Done(): + return + case jobCh <- i: + } + } + close(jobCh) + wg.Wait() }() - lastBlock := blockStore.LoadBlock(blockStore.Height()) - - for height := lastHeight + 1; height <= int64(cfg.NumBlocks)+lastHeight; height++ { - if cfg.UpToTime && lastBlock != nil && lastBlock.Time.Add(cfg.BlockInterval).After(time.Now().UTC()) { - fmt.Printf("blocks cannot be generated into the future, stopping at height %d\n", lastBlock.Height) - break - } + // Main loop: read from resultsCh, build blocks in sequential order + nextIndex := 0 + pending := make(map[int]*tmproto.Data) + lastBlock := blockStore.LoadBlock(lastHeight) +mainLoop: + for nextIndex < totalBlocks { select { case <-ctx.Done(): return ctx.Err() - case dataPB := <-dataCh: - data, err := types.DataFromProto(dataPB) - if err != nil { - return fmt.Errorf("failed to convert data from protobuf: %w", err) - } - block, blockParts := state.MakeBlock(height, data, commit, nil, validatorAddr) - blockID := types.BlockID{ - Hash: block.Hash(), - PartSetHeader: blockParts.Header(), - } - precommitVote := &tmproto.Vote{ - Height: height, - Round: 0, - Type: tmproto.PrecommitType, - BlockID: blockID.ToProto(), - ValidatorAddress: validatorAddr, - Timestamp: currentTime, - Signature: nil, + case res, ok := <-resultsCh: + if !ok { + // no more results will arrive + break mainLoop } - - if err := validatorKey.SignVote(state.ChainID, precommitVote); err != nil { - return fmt.Errorf("failed to sign precommit vote (%s): %w", precommitVote.String(), err) + if res.Err != nil && errCh != nil { + // capture the first error + errCh <- res.Err + errCh = nil + cancel() + continue } + pending[res.Index] = res.Data - commitSig := types.CommitSig{ - BlockIDFlag: types.BlockIDFlagCommit, - ValidatorAddress: validatorAddr, - Timestamp: currentTime, - Signature: precommitVote.Signature, - } - commit = types.NewCommit(height, 0, blockID, []types.CommitSig{commitSig}) - - var lastCommitInfo abci.LastCommitInfo - if height > 1 { - lastCommitInfo = abci.LastCommitInfo{ - Round: 0, - Votes: []abci.VoteInfo{ - { - Validator: abci.Validator{ - Address: validatorAddr, - Power: validatorPower, - }, - SignedLastBlock: true, - }, - }, + // See if we can build any consecutive blocks now + for { + dataProto, ok := pending[nextIndex] + if !ok { + break + } + delete(pending, nextIndex) + + // If we might be generating blocks "up to now" + if cfg.UpToTime && lastBlock != nil && lastBlock.Time.Add(cfg.BlockInterval).After(time.Now().UTC()) { + fmt.Printf("Cannot generate block %d into the future, stopping.\n", lastBlock.Height+1) + break mainLoop } - } - beginBlockResp := simApp.BeginBlock(abci.RequestBeginBlock{ - Hash: block.Hash(), - Header: *block.Header.ToProto(), - LastCommitInfo: lastCommitInfo, - }) + // Build the next block + height := lastHeight + 1 + int64(nextIndex) + data, err := types.DataFromProto(dataProto) + if err != nil { + return fmt.Errorf("failed to convert data from protobuf: %w", err) + } - deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Data.Txs)) + block, blockParts := state.MakeBlock(height, data, commit, nil, validatorAddr) + blockID := types.BlockID{ + Hash: block.Hash(), + PartSetHeader: blockParts.Header(), + } - for idx, tx := range block.Data.Txs { - blobTx, isBlobTx := types.UnmarshalBlobTx(tx) - if isBlobTx { - tx = blobTx.Tx + precommitVote := &tmproto.Vote{ + Height: height, + Round: 0, + Type: tmproto.PrecommitType, + BlockID: blockID.ToProto(), + ValidatorAddress: validatorAddr, + Timestamp: currentTime, + Signature: nil, + } + if err := validatorKey.SignVote(state.ChainID, precommitVote); err != nil { + return fmt.Errorf("failed to sign precommit vote (%s): %w", precommitVote.String(), err) + } + commitSig := types.CommitSig{ + BlockIDFlag: types.BlockIDFlagCommit, + ValidatorAddress: validatorAddr, + Timestamp: currentTime, + Signature: precommitVote.Signature, } - deliverTxResponse := simApp.DeliverTx(abci.RequestDeliverTx{ - Tx: tx, + commit = types.NewCommit(height, 0, blockID, []types.CommitSig{commitSig}) + + var lastCommitInfo abci.LastCommitInfo + if height > 1 { + lastCommitInfo = abci.LastCommitInfo{ + Round: 0, + Votes: []abci.VoteInfo{ + { + Validator: abci.Validator{ + Address: validatorAddr, + Power: validatorPower, + }, + SignedLastBlock: true, + }, + }, + } + } + beginBlockResp := simApp.BeginBlock(abci.RequestBeginBlock{ + Hash: block.Hash(), + Header: *block.Header.ToProto(), + LastCommitInfo: lastCommitInfo, }) - if deliverTxResponse.Code != abci.CodeTypeOK { - return fmt.Errorf("failed to deliver tx: %s", deliverTxResponse.Log) + + deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Data.Txs)) + for idx, tx := range block.Data.Txs { + blobTx, isBlobTx := types.UnmarshalBlobTx(tx) + if isBlobTx { + tx = blobTx.Tx + } + deliverTxResponse := simApp.DeliverTx(abci.RequestDeliverTx{Tx: tx}) + if deliverTxResponse.Code != abci.CodeTypeOK { + return fmt.Errorf("failed to deliver tx: %s", deliverTxResponse.Log) + } + deliverTxResponses[idx] = &deliverTxResponse } - deliverTxResponses[idx] = &deliverTxResponse - } - endBlockResp := simApp.EndBlock(abci.RequestEndBlock{ - Height: block.Height, - }) - - commitResp := simApp.Commit() - state.LastBlockHeight = height - state.LastBlockID = blockID - state.LastBlockTime = block.Time - state.LastValidators = state.Validators - state.Validators = state.NextValidators - state.NextValidators = state.NextValidators.CopyIncrementProposerPriority(1) - state.AppHash = commitResp.Data - state.LastResultsHash = sm.ABCIResponsesResultsHash(&smproto.ABCIResponses{ - DeliverTxs: deliverTxResponses, - BeginBlock: &beginBlockResp, - EndBlock: &endBlockResp, - }) - currentTime = currentTime.Add(cfg.BlockInterval) - persistCh <- persistData{ - state: state.Copy(), - block: block, - seenCommit: &types.Commit{ + endBlockResp := simApp.EndBlock(abci.RequestEndBlock{Height: block.Height}) + commitResp := simApp.Commit() + + // Update state + state.LastBlockHeight = height + state.LastBlockID = blockID + state.LastBlockTime = block.Time + state.LastValidators = state.Validators + state.Validators = state.NextValidators + state.NextValidators = state.NextValidators.CopyIncrementProposerPriority(1) + state.AppHash = commitResp.Data + state.LastResultsHash = sm.ABCIResponsesResultsHash(&smproto.ABCIResponses{ + DeliverTxs: deliverTxResponses, + BeginBlock: &beginBlockResp, + EndBlock: &endBlockResp, + }) + + // Persist block and state + ps := &types.Commit{ Height: commit.Height, Round: commit.Round, BlockID: commit.BlockID, Signatures: []types.CommitSig{commitSig}, - }, + } + blockParts = block.MakePartSet(types.BlockPartSizeBytes) + blockStore.SaveBlock(block, blockParts, ps) + if blockStore.Height()%100 == 0 { + fmt.Println("Reached height", blockStore.Height()) + } + if err := stateStore.Save(state); err != nil { + return fmt.Errorf("failed to save state at height %d: %w", blockStore.Height(), err) + } + + lastBlock = block + currentTime = currentTime.Add(cfg.BlockInterval) + nextIndex++ } } } - close(dataCh) - close(persistCh) - - var firstErr error - for i := 0; i < cap(errCh); i++ { + // If there was an error in the generation goroutines, retrieve it + close(errCh) + if len(errCh) > 0 { err := <-errCh - if err != nil && firstErr == nil { - firstErr = err + if err != nil { + return err } } - if err := blockDB.Close(); err != nil { - return fmt.Errorf("failed to close block database: %w", err) - } - if err := stateDB.Close(); err != nil { - return fmt.Errorf("failed to close state database: %w", err) - } - if err := appDB.Close(); err != nil { - return fmt.Errorf("failed to close application database: %w", err) - } - - fmt.Println("Chain built successfully", state.LastBlockHeight) + fmt.Println("Chain built successfully at height", state.LastBlockHeight) + return nil +} - return firstErr +// generatedBlockData holds the result of a parallel “generate block data” job. +type generatedBlockData struct { + Index int + Data *tmproto.Data + Err error } -func generateSquareRoutine( - ctx context.Context, - signer *user.Signer, +// generateBlockData performs the same logic that was originally in +// generateSquareRoutine, but for a single block index. We allow specifying the +// sequence number directly instead of calling IncrementSequence on the signer. +func generateBlockData( + blockIndex int, + seq uint64, cfg BuilderConfig, - dataCh chan<- *tmproto.Data, -) error { - for i := 0; i < cfg.NumBlocks; i++ { - fmt.Printf("generating block %d\n", i) - select { - case <-ctx.Done(): - return ctx.Err() - default: - } + kr keyring.Keyring, + encCfg encoding.Config, + state sm.State, +) (*tmproto.Data, error) { + fmt.Printf("[Worker] generating block data %d\n", blockIndex) - account := signer.Accounts()[0] + signer, err := user.NewSigner( + kr, + encCfg.TxConfig, + state.ChainID, + state.ConsensusParams.Version.AppVersion, + user.NewAccount(testnode.DefaultValidatorAccountName, 0, seq), + ) + if err != nil { + return nil, fmt.Errorf("failed to create new signer: %w", err) + } - blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) - if err != nil { - return err - } + // We only use a single account: + account := signer.Accounts()[0] // default validator account + accountName := account.Name() - blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) - fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 - tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) - if err != nil { - return err - } - if err := signer.IncrementSequence(account.Name()); err != nil { - return err - } - - dataSquare, txs, err := square.Build( - [][]byte{tx}, - maxSquareSize, - appconsts.SubtreeRootThreshold(1), - ) - if err != nil { - return err - } + // Create a random blob + blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) + if err != nil { + return nil, err + } - eds, err := da.ExtendShares(share.ToBytes(dataSquare)) - if err != nil { - return err - } + // Build the single transaction with PFB + blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) + fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 - dah, err := da.NewDataAvailabilityHeader(eds) - if err != nil { - return err - } - - fmt.Printf("generated block size %d\n", dah.SquareSize()) - select { - case dataCh <- &tmproto.Data{ - Txs: txs, - Hash: dah.Hash(), - SquareSize: uint64(dataSquare.Size()), - }: - case <-ctx.Done(): - return ctx.Err() - } + err = signer.SetSequence(accountName, seq) + if err != nil { + return nil, err + } + // Instead of signer.IncrementSequence, we directly set the sequence. + tx, _, err := signer.CreatePayForBlobs( + accountName, + []*share.Blob{blob}, + user.SetGasLimit(blobGas), + user.SetFee(uint64(fee)), + ) + if err != nil { + return nil, err } - return nil -} -type persistData struct { - state sm.State - block *types.Block - seenCommit *types.Commit -} + // Build data square for the single TX + dataSquare, txs, err := square.Build([][]byte{tx}, maxSquareSize, appconsts.SubtreeRootThreshold(1)) + if err != nil { + return nil, err + } -func persistDataRoutine( - ctx context.Context, - stateStore sm.Store, - blockStore *store.BlockStore, - dataCh <-chan persistData, -) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case data, ok := <-dataCh: - if !ok { - return nil - } - blockParts := data.block.MakePartSet(types.BlockPartSizeBytes) - blockStore.SaveBlock(data.block, blockParts, data.seenCommit) - if blockStore.Height()%100 == 0 { - fmt.Println("Reached height", blockStore.Height()) - } + eds, err := da.ExtendShares(share.ToBytes(dataSquare)) + if err != nil { + return nil, err + } - if err := stateStore.Save(data.state); err != nil { - return err - } - } + dah, err := da.NewDataAvailabilityHeader(eds) + if err != nil { + return nil, err } + + fmt.Printf("[Worker] generated block data %d with square size %d\n", blockIndex, dah.SquareSize()) + return &tmproto.Data{ + Txs: txs, + Hash: dah.Hash(), + SquareSize: uint64(dataSquare.Size()), + }, nil } From 5ae985f9c430b4c751995d860d8240464afd1ea5 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 24 Jan 2025 11:29:23 +0100 Subject: [PATCH 04/33] chore: remove cancells --- tools/chainbuilder/main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index c155bdeb99..d7ead4983b 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -321,8 +321,8 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { commit = blockStore.LoadSeenCommit(lastHeight) } - ctx, cancel := context.WithCancel(ctx) - defer cancel() + //ctx, cancel := context.WithCancel(ctx) + //defer cancel() // Number of blocks to produce totalBlocks := cfg.NumBlocks @@ -357,7 +357,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { // Feed jobs go func() { defer close(resultsCh) - defer cancel() // ensure workers get canceled once done + //defer cancel() // ensure workers get canceled once done for i := 0; i < totalBlocks; i++ { select { case <-ctx.Done(): @@ -389,7 +389,7 @@ mainLoop: // capture the first error errCh <- res.Err errCh = nil - cancel() + //cancel() continue } pending[res.Index] = res.Data From c48ecbbfd5366e3c5f5a264030fd5d15374e7787 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 24 Jan 2025 11:30:53 +0100 Subject: [PATCH 05/33] chore: reintroduce cancel --- tools/chainbuilder/main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index d7ead4983b..c155bdeb99 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -321,8 +321,8 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { commit = blockStore.LoadSeenCommit(lastHeight) } - //ctx, cancel := context.WithCancel(ctx) - //defer cancel() + ctx, cancel := context.WithCancel(ctx) + defer cancel() // Number of blocks to produce totalBlocks := cfg.NumBlocks @@ -357,7 +357,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { // Feed jobs go func() { defer close(resultsCh) - //defer cancel() // ensure workers get canceled once done + defer cancel() // ensure workers get canceled once done for i := 0; i < totalBlocks; i++ { select { case <-ctx.Done(): @@ -389,7 +389,7 @@ mainLoop: // capture the first error errCh <- res.Err errCh = nil - //cancel() + cancel() continue } pending[res.Index] = res.Data From ffff0e9318fa7a18aa59cb586c203cdb47849359 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 12:47:47 +0100 Subject: [PATCH 06/33] chore: use other DBs --- tools/chainbuilder/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index c155bdeb99..8735b285fd 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -188,7 +188,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { validatorKey := privval.LoadFilePV(tmCfg.PrivValidatorKeyFile(), tmCfg.PrivValidatorStateFile()) validatorAddr := validatorKey.Key.Address - blockDB, err := dbm.NewDB("blockstore", dbm.GoLevelDBBackend, tmCfg.DBDir()) + blockDB, err := dbm.NewDB("blockstore", dbm.PebbleDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create block database: %w", err) } @@ -196,7 +196,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { blockStore := store.NewBlockStore(blockDB) - stateDB, err := dbm.NewDB("state", dbm.GoLevelDBBackend, tmCfg.DBDir()) + stateDB, err := dbm.NewDB("state", dbm.PebbleDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create state database: %w", err) } From 4ce4699d5771347cc1ba6188e73a9a889d527970 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 12:55:05 +0100 Subject: [PATCH 07/33] chore: use the synchronous version but with different databases --- tools/chainbuilder/main.go | 477 ++++++++++++++++--------------------- 1 file changed, 206 insertions(+), 271 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 8735b285fd..8c81998495 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "path/filepath" - "sync" "time" "github.com/celestiaorg/go-square/v2" @@ -62,8 +61,6 @@ func main() { upToTime, _ := cmd.Flags().GetBool("up-to-now") appVersion, _ := cmd.Flags().GetUint64("app-version") chainID, _ := cmd.Flags().GetString("chain-id") - numWorkers, _ := cmd.Flags().GetInt("num-workers") - var namespace share.Namespace if namespaceStr == "" { namespace = defaultNamespace @@ -84,7 +81,6 @@ func main() { ChainID: tmrand.Str(6), UpToTime: upToTime, AppVersion: appVersion, - NumWorkers: numWorkers, } if chainID != "" { @@ -108,8 +104,6 @@ func main() { rootCmd.Flags().Bool("up-to-now", false, "Tool will terminate if the block time reaches the current time") rootCmd.Flags().Uint64("app-version", appconsts.LatestVersion, "App version to use for the chain") rootCmd.Flags().String("chain-id", "", "Chain ID to use for the chain. Defaults to a random 6 character string") - rootCmd.Flags().Int("num-workers", 1, "Number of goroutines to use in parallel for block-data generation") - rootCmd.SilenceUsage = true rootCmd.SilenceErrors = true if err := rootCmd.Execute(); err != nil { @@ -127,9 +121,6 @@ type BuilderConfig struct { ChainID string AppVersion uint64 UpToTime bool - - // Added for parallel generation - NumWorkers int } func Run(ctx context.Context, cfg BuilderConfig, dir string) error { @@ -138,13 +129,11 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { encCfg := encoding.MakeConfig(app.ModuleBasics) tmCfg := app.DefaultConsensusConfig() - var ( gen *genesis.Genesis kr keyring.Keyring err error ) - if cfg.ExistingDir == "" { dir = filepath.Join(dir, fmt.Sprintf("testnode-%s", cfg.ChainID)) kr, err = keyring.New(app.Name, keyring.BackendTest, dir, nil, encCfg.Codec) @@ -158,9 +147,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { appCfg.StateSync.SnapshotInterval = 0 cp := app.DefaultConsensusParams() - // Set the app version - cp.Version.AppVersion = cfg.AppVersion - + cp.Version.AppVersion = cfg.AppVersion // set the app version gen = genesis.NewDefaultGenesis(). WithConsensusParams(cp). WithKeyring(kr). @@ -192,7 +179,6 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { if err != nil { return fmt.Errorf("failed to create block database: %w", err) } - defer blockDB.Close() blockStore := store.NewBlockStore(blockDB) @@ -200,7 +186,6 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { if err != nil { return fmt.Errorf("failed to create state database: %w", err) } - defer stateDB.Close() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ DiscardABCIResponses: true, @@ -210,7 +195,6 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { if err != nil { return fmt.Errorf("failed to create application database: %w", err) } - defer appDB.Close() simApp := app.New( log.NewNopLogger(), @@ -218,7 +202,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { nil, 0, encCfg, - 0, // upgrade height + 0, // upgrade height v2 0, // timeout commit util.EmptyAppOptions{}, baseapp.SetMinGasPrices(fmt.Sprintf("%f%s", appconsts.DefaultMinGasPrice, appconsts.BondDenom)), @@ -232,7 +216,6 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { } if lastHeight == 0 { - // brand new chain if gen == nil { return fmt.Errorf("non empty directory but no blocks found") } @@ -271,7 +254,6 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1) state.AppHash = res.AppHash state.LastResultsHash = merkle.HashFromByteSlices(nil) - if err := stateStore.Save(state); err != nil { return fmt.Errorf("failed to save initial state: %w", err) } @@ -279,28 +261,26 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { } else { fmt.Println("Starting from height", lastHeight) } - state, err := stateStore.Load() if err != nil { return fmt.Errorf("failed to load state: %w", err) } if cfg.ExistingDir != "" { - // extending an existing chain, start from the chain's last known block time + // if this is extending an existing chain, we want to start + // the time to be where the existing chain left off currentTime = state.LastBlockTime.Add(cfg.BlockInterval) } if state.ConsensusParams.Version.AppVersion != cfg.AppVersion { - return fmt.Errorf("app version mismatch: state has %d, but cfg has %d", - state.ConsensusParams.Version.AppVersion, cfg.AppVersion) + return fmt.Errorf("app version mismatch: state has %d, but cfg has %d", state.ConsensusParams.Version.AppVersion, cfg.AppVersion) } + if state.LastBlockHeight != lastHeight { return fmt.Errorf("last block height mismatch: state has %d, but block store has %d", state.LastBlockHeight, lastHeight) } validatorPower := state.Validators.Validators[0].VotingPower - // We retrieve the current account for the validator - // We'll assume account number = 0, and the sequence starts at (lastHeight + 1). signer, err := user.NewSigner( kr, encCfg.TxConfig, @@ -312,11 +292,12 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { return fmt.Errorf("failed to create new signer: %w", err) } - // We'll generate data in parallel, but we still build blocks sequentially. - // The code below sets up the concurrency pipeline. - - // commit at lastHeight - commit := types.NewCommit(0, 0, types.BlockID{}, nil) + var ( + errCh = make(chan error, 2) + dataCh = make(chan *tmproto.Data, 100) + persistCh = make(chan persistData, 100) + commit = types.NewCommit(0, 0, types.BlockID{}, nil) + ) if lastHeight > 0 { commit = blockStore.LoadSeenCommit(lastHeight) } @@ -324,290 +305,244 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - // Number of blocks to produce - totalBlocks := cfg.NumBlocks - - // We will generate results in random completion order, so we’ll store them - // in a map and process them sequentially (by index) in the main loop. - resultsCh := make(chan generatedBlockData, cfg.NumWorkers) - errCh := make(chan error, 1) - - // We set the base sequence to the signer's current sequence - baseSequence := signer.Accounts()[0].Sequence() - - // Launch worker pool - var wg sync.WaitGroup - jobCh := make(chan int) // block index for each job - - for w := 0; w < cfg.NumWorkers; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := range jobCh { - data, err := generateBlockData(i, baseSequence+uint64(i), cfg, kr, encCfg, state) - select { - case <-ctx.Done(): - return - case resultsCh <- generatedBlockData{Index: i, Data: data, Err: err}: - } - } - }() - } + go func() { + errCh <- generateSquareRoutine(ctx, signer, cfg, dataCh) + }() - // Feed jobs go func() { - defer close(resultsCh) - defer cancel() // ensure workers get canceled once done - for i := 0; i < totalBlocks; i++ { - select { - case <-ctx.Done(): - return - case jobCh <- i: - } - } - close(jobCh) - wg.Wait() + errCh <- persistDataRoutine(ctx, stateStore, blockStore, persistCh) }() - // Main loop: read from resultsCh, build blocks in sequential order - nextIndex := 0 - pending := make(map[int]*tmproto.Data) - lastBlock := blockStore.LoadBlock(lastHeight) + lastBlock := blockStore.LoadBlock(blockStore.Height()) + + for height := lastHeight + 1; height <= int64(cfg.NumBlocks)+lastHeight; height++ { + if cfg.UpToTime && lastBlock != nil && lastBlock.Time.Add(cfg.BlockInterval).After(time.Now().UTC()) { + fmt.Printf("blocks cannot be generated into the future, stopping at height %d\n", lastBlock.Height) + break + } -mainLoop: - for nextIndex < totalBlocks { select { case <-ctx.Done(): return ctx.Err() - - case res, ok := <-resultsCh: - if !ok { - // no more results will arrive - break mainLoop + case dataPB := <-dataCh: + data, err := types.DataFromProto(dataPB) + if err != nil { + return fmt.Errorf("failed to convert data from protobuf: %w", err) } - if res.Err != nil && errCh != nil { - // capture the first error - errCh <- res.Err - errCh = nil - cancel() - continue + block, blockParts := state.MakeBlock(height, data, commit, nil, validatorAddr) + blockID := types.BlockID{ + Hash: block.Hash(), + PartSetHeader: blockParts.Header(), } - pending[res.Index] = res.Data - - // See if we can build any consecutive blocks now - for { - dataProto, ok := pending[nextIndex] - if !ok { - break - } - delete(pending, nextIndex) - // If we might be generating blocks "up to now" - if cfg.UpToTime && lastBlock != nil && lastBlock.Time.Add(cfg.BlockInterval).After(time.Now().UTC()) { - fmt.Printf("Cannot generate block %d into the future, stopping.\n", lastBlock.Height+1) - break mainLoop - } - - // Build the next block - height := lastHeight + 1 + int64(nextIndex) - data, err := types.DataFromProto(dataProto) - if err != nil { - return fmt.Errorf("failed to convert data from protobuf: %w", err) - } + precommitVote := &tmproto.Vote{ + Height: height, + Round: 0, + Type: tmproto.PrecommitType, + BlockID: blockID.ToProto(), + ValidatorAddress: validatorAddr, + Timestamp: currentTime, + Signature: nil, + } - block, blockParts := state.MakeBlock(height, data, commit, nil, validatorAddr) - blockID := types.BlockID{ - Hash: block.Hash(), - PartSetHeader: blockParts.Header(), - } + if err := validatorKey.SignVote(state.ChainID, precommitVote); err != nil { + return fmt.Errorf("failed to sign precommit vote (%s): %w", precommitVote.String(), err) + } - precommitVote := &tmproto.Vote{ - Height: height, - Round: 0, - Type: tmproto.PrecommitType, - BlockID: blockID.ToProto(), - ValidatorAddress: validatorAddr, - Timestamp: currentTime, - Signature: nil, - } - if err := validatorKey.SignVote(state.ChainID, precommitVote); err != nil { - return fmt.Errorf("failed to sign precommit vote (%s): %w", precommitVote.String(), err) - } - commitSig := types.CommitSig{ - BlockIDFlag: types.BlockIDFlagCommit, - ValidatorAddress: validatorAddr, - Timestamp: currentTime, - Signature: precommitVote.Signature, - } - commit = types.NewCommit(height, 0, blockID, []types.CommitSig{commitSig}) - - var lastCommitInfo abci.LastCommitInfo - if height > 1 { - lastCommitInfo = abci.LastCommitInfo{ - Round: 0, - Votes: []abci.VoteInfo{ - { - Validator: abci.Validator{ - Address: validatorAddr, - Power: validatorPower, - }, - SignedLastBlock: true, + commitSig := types.CommitSig{ + BlockIDFlag: types.BlockIDFlagCommit, + ValidatorAddress: validatorAddr, + Timestamp: currentTime, + Signature: precommitVote.Signature, + } + commit = types.NewCommit(height, 0, blockID, []types.CommitSig{commitSig}) + + var lastCommitInfo abci.LastCommitInfo + if height > 1 { + lastCommitInfo = abci.LastCommitInfo{ + Round: 0, + Votes: []abci.VoteInfo{ + { + Validator: abci.Validator{ + Address: validatorAddr, + Power: validatorPower, }, + SignedLastBlock: true, }, - } + }, } - beginBlockResp := simApp.BeginBlock(abci.RequestBeginBlock{ - Hash: block.Hash(), - Header: *block.Header.ToProto(), - LastCommitInfo: lastCommitInfo, - }) + } - deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Data.Txs)) - for idx, tx := range block.Data.Txs { - blobTx, isBlobTx := types.UnmarshalBlobTx(tx) - if isBlobTx { - tx = blobTx.Tx - } - deliverTxResponse := simApp.DeliverTx(abci.RequestDeliverTx{Tx: tx}) - if deliverTxResponse.Code != abci.CodeTypeOK { - return fmt.Errorf("failed to deliver tx: %s", deliverTxResponse.Log) - } - deliverTxResponses[idx] = &deliverTxResponse - } + beginBlockResp := simApp.BeginBlock(abci.RequestBeginBlock{ + Hash: block.Hash(), + Header: *block.Header.ToProto(), + LastCommitInfo: lastCommitInfo, + }) + + deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Data.Txs)) - endBlockResp := simApp.EndBlock(abci.RequestEndBlock{Height: block.Height}) - commitResp := simApp.Commit() - - // Update state - state.LastBlockHeight = height - state.LastBlockID = blockID - state.LastBlockTime = block.Time - state.LastValidators = state.Validators - state.Validators = state.NextValidators - state.NextValidators = state.NextValidators.CopyIncrementProposerPriority(1) - state.AppHash = commitResp.Data - state.LastResultsHash = sm.ABCIResponsesResultsHash(&smproto.ABCIResponses{ - DeliverTxs: deliverTxResponses, - BeginBlock: &beginBlockResp, - EndBlock: &endBlockResp, + for idx, tx := range block.Data.Txs { + blobTx, isBlobTx := types.UnmarshalBlobTx(tx) + if isBlobTx { + tx = blobTx.Tx + } + deliverTxResponse := simApp.DeliverTx(abci.RequestDeliverTx{ + Tx: tx, }) + if deliverTxResponse.Code != abci.CodeTypeOK { + return fmt.Errorf("failed to deliver tx: %s", deliverTxResponse.Log) + } + deliverTxResponses[idx] = &deliverTxResponse + } - // Persist block and state - ps := &types.Commit{ + endBlockResp := simApp.EndBlock(abci.RequestEndBlock{ + Height: block.Height, + }) + + commitResp := simApp.Commit() + state.LastBlockHeight = height + state.LastBlockID = blockID + state.LastBlockTime = block.Time + state.LastValidators = state.Validators + state.Validators = state.NextValidators + state.NextValidators = state.NextValidators.CopyIncrementProposerPriority(1) + state.AppHash = commitResp.Data + state.LastResultsHash = sm.ABCIResponsesResultsHash(&smproto.ABCIResponses{ + DeliverTxs: deliverTxResponses, + BeginBlock: &beginBlockResp, + EndBlock: &endBlockResp, + }) + currentTime = currentTime.Add(cfg.BlockInterval) + persistCh <- persistData{ + state: state.Copy(), + block: block, + seenCommit: &types.Commit{ Height: commit.Height, Round: commit.Round, BlockID: commit.BlockID, Signatures: []types.CommitSig{commitSig}, - } - blockParts = block.MakePartSet(types.BlockPartSizeBytes) - blockStore.SaveBlock(block, blockParts, ps) - if blockStore.Height()%100 == 0 { - fmt.Println("Reached height", blockStore.Height()) - } - if err := stateStore.Save(state); err != nil { - return fmt.Errorf("failed to save state at height %d: %w", blockStore.Height(), err) - } - - lastBlock = block - currentTime = currentTime.Add(cfg.BlockInterval) - nextIndex++ + }, } } } - // If there was an error in the generation goroutines, retrieve it - close(errCh) - if len(errCh) > 0 { + close(dataCh) + close(persistCh) + + var firstErr error + for i := 0; i < cap(errCh); i++ { err := <-errCh - if err != nil { - return err + if err != nil && firstErr == nil { + firstErr = err } } - fmt.Println("Chain built successfully at height", state.LastBlockHeight) - return nil -} + if err := blockDB.Close(); err != nil { + return fmt.Errorf("failed to close block database: %w", err) + } + if err := stateDB.Close(); err != nil { + return fmt.Errorf("failed to close state database: %w", err) + } + if err := appDB.Close(); err != nil { + return fmt.Errorf("failed to close application database: %w", err) + } + + fmt.Println("Chain built successfully", state.LastBlockHeight) -// generatedBlockData holds the result of a parallel “generate block data” job. -type generatedBlockData struct { - Index int - Data *tmproto.Data - Err error + return firstErr } -// generateBlockData performs the same logic that was originally in -// generateSquareRoutine, but for a single block index. We allow specifying the -// sequence number directly instead of calling IncrementSequence on the signer. -func generateBlockData( - blockIndex int, - seq uint64, +func generateSquareRoutine( + ctx context.Context, + signer *user.Signer, cfg BuilderConfig, - kr keyring.Keyring, - encCfg encoding.Config, - state sm.State, -) (*tmproto.Data, error) { - fmt.Printf("[Worker] generating block data %d\n", blockIndex) + dataCh chan<- *tmproto.Data, +) error { + for i := 0; i < cfg.NumBlocks; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } - signer, err := user.NewSigner( - kr, - encCfg.TxConfig, - state.ChainID, - state.ConsensusParams.Version.AppVersion, - user.NewAccount(testnode.DefaultValidatorAccountName, 0, seq), - ) - if err != nil { - return nil, fmt.Errorf("failed to create new signer: %w", err) - } + account := signer.Accounts()[0] - // We only use a single account: - account := signer.Accounts()[0] // default validator account - accountName := account.Name() + blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) + if err != nil { + return err + } - // Create a random blob - blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) - if err != nil { - return nil, err - } + blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) + fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 + tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) + if err != nil { + return err + } + if err := signer.IncrementSequence(account.Name()); err != nil { + return err + } - // Build the single transaction with PFB - blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) - fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 + dataSquare, txs, err := square.Build( + [][]byte{tx}, + maxSquareSize, + appconsts.SubtreeRootThreshold(1), + ) + if err != nil { + return err + } - err = signer.SetSequence(accountName, seq) - if err != nil { - return nil, err - } - // Instead of signer.IncrementSequence, we directly set the sequence. - tx, _, err := signer.CreatePayForBlobs( - accountName, - []*share.Blob{blob}, - user.SetGasLimit(blobGas), - user.SetFee(uint64(fee)), - ) - if err != nil { - return nil, err - } + eds, err := da.ExtendShares(share.ToBytes(dataSquare)) + if err != nil { + return err + } - // Build data square for the single TX - dataSquare, txs, err := square.Build([][]byte{tx}, maxSquareSize, appconsts.SubtreeRootThreshold(1)) - if err != nil { - return nil, err - } + dah, err := da.NewDataAvailabilityHeader(eds) + if err != nil { + return err + } - eds, err := da.ExtendShares(share.ToBytes(dataSquare)) - if err != nil { - return nil, err + select { + case dataCh <- &tmproto.Data{ + Txs: txs, + Hash: dah.Hash(), + SquareSize: uint64(dataSquare.Size()), + }: + case <-ctx.Done(): + return ctx.Err() + } } + return nil +} - dah, err := da.NewDataAvailabilityHeader(eds) - if err != nil { - return nil, err - } +type persistData struct { + state sm.State + block *types.Block + seenCommit *types.Commit +} + +func persistDataRoutine( + ctx context.Context, + stateStore sm.Store, + blockStore *store.BlockStore, + dataCh <-chan persistData, +) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case data, ok := <-dataCh: + if !ok { + return nil + } + blockParts := data.block.MakePartSet(types.BlockPartSizeBytes) + blockStore.SaveBlock(data.block, blockParts, data.seenCommit) + if blockStore.Height()%100 == 0 { + fmt.Println("Reached height", blockStore.Height()) + } - fmt.Printf("[Worker] generated block data %d with square size %d\n", blockIndex, dah.SquareSize()) - return &tmproto.Data{ - Txs: txs, - Hash: dah.Hash(), - SquareSize: uint64(dataSquare.Size()), - }, nil + if err := stateStore.Save(data.state); err != nil { + return err + } + } + } } From 0f3c800449561443be749f09e4b3d2d82e3f9758 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 13:04:14 +0100 Subject: [PATCH 08/33] chore: use c level db --- tools/chainbuilder/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 8c81998495..0cf696abdb 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -175,14 +175,14 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { validatorKey := privval.LoadFilePV(tmCfg.PrivValidatorKeyFile(), tmCfg.PrivValidatorStateFile()) validatorAddr := validatorKey.Key.Address - blockDB, err := dbm.NewDB("blockstore", dbm.PebbleDBBackend, tmCfg.DBDir()) + blockDB, err := dbm.NewDB("blockstore", dbm.CLevelDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create block database: %w", err) } blockStore := store.NewBlockStore(blockDB) - stateDB, err := dbm.NewDB("state", dbm.PebbleDBBackend, tmCfg.DBDir()) + stateDB, err := dbm.NewDB("state", dbm.CLevelDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create state database: %w", err) } From 884f08b69bd59780afae8bac147b2b33a9beb5b5 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 13:13:16 +0100 Subject: [PATCH 09/33] chore: switch to badger db --- tools/chainbuilder/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 0cf696abdb..763c2ea418 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -175,14 +175,14 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { validatorKey := privval.LoadFilePV(tmCfg.PrivValidatorKeyFile(), tmCfg.PrivValidatorStateFile()) validatorAddr := validatorKey.Key.Address - blockDB, err := dbm.NewDB("blockstore", dbm.CLevelDBBackend, tmCfg.DBDir()) + blockDB, err := dbm.NewDB("blockstore", dbm.BadgerDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create block database: %w", err) } blockStore := store.NewBlockStore(blockDB) - stateDB, err := dbm.NewDB("state", dbm.CLevelDBBackend, tmCfg.DBDir()) + stateDB, err := dbm.NewDB("state", dbm.BadgerDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create state database: %w", err) } @@ -191,7 +191,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { DiscardABCIResponses: true, }) - appDB, err := tmdbm.NewDB("application", tmdbm.GoLevelDBBackend, tmCfg.DBDir()) + appDB, err := tmdbm.NewDB("application", tmdbm.BadgerDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create application database: %w", err) } From 163b1f0bc71c8fcea490e24766205401729aa12a Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 15:57:27 +0100 Subject: [PATCH 10/33] chore: revert to pebble and golevel --- tools/chainbuilder/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 763c2ea418..8c81998495 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -175,14 +175,14 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { validatorKey := privval.LoadFilePV(tmCfg.PrivValidatorKeyFile(), tmCfg.PrivValidatorStateFile()) validatorAddr := validatorKey.Key.Address - blockDB, err := dbm.NewDB("blockstore", dbm.BadgerDBBackend, tmCfg.DBDir()) + blockDB, err := dbm.NewDB("blockstore", dbm.PebbleDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create block database: %w", err) } blockStore := store.NewBlockStore(blockDB) - stateDB, err := dbm.NewDB("state", dbm.BadgerDBBackend, tmCfg.DBDir()) + stateDB, err := dbm.NewDB("state", dbm.PebbleDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create state database: %w", err) } @@ -191,7 +191,7 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { DiscardABCIResponses: true, }) - appDB, err := tmdbm.NewDB("application", tmdbm.BadgerDBBackend, tmCfg.DBDir()) + appDB, err := tmdbm.NewDB("application", tmdbm.GoLevelDBBackend, tmCfg.DBDir()) if err != nil { return fmt.Errorf("failed to create application database: %w", err) } From 11811d07f7a65a4f0a261f1a3332d4a6c0e62b99 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 20:06:38 +0100 Subject: [PATCH 11/33] chore: use pebble db provider --- cmd/celestia-appd/cmd/start.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/celestia-appd/cmd/start.go b/cmd/celestia-appd/cmd/start.go index 057ae4422a..e3034a55f3 100644 --- a/cmd/celestia-appd/cmd/start.go +++ b/cmd/celestia-appd/cmd/start.go @@ -5,6 +5,7 @@ package cmd import ( "fmt" + db "github.com/cometbft/cometbft-db" "io" "net" "net/http" @@ -257,6 +258,14 @@ func startStandAlone(ctx *server.Context, appCreator srvrtypes.AppCreator) error return server.WaitForQuitSignals() } +func dbProvider(ctx *node.DBContext) (db.DB, error) { + blockDB, err := db.NewDB(ctx.ID, db.PebbleDBBackend, ctx.Config.DBDir()) + if err != nil { + return nil, err + } + return blockDB, nil +} + func startInProcess(ctx *server.Context, clientCtx client.Context, appCreator srvrtypes.AppCreator) error { cfg := ctx.Config home := cfg.RootDir @@ -307,7 +316,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, appCreator sr nodeKey, proxy.NewLocalClientCreator(app), genDocProvider, - node.DefaultDBProvider, + dbProvider, node.DefaultMetricsProvider(cfg.Instrumentation), ctx.Logger, ) From 123377b43b44cb91e5aa27caebf05e654a7736e0 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 20:14:29 +0100 Subject: [PATCH 12/33] chore: bump consts --- pkg/appconsts/initial_consts.go | 2 +- pkg/appconsts/v3/app_consts.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/appconsts/initial_consts.go b/pkg/appconsts/initial_consts.go index c9924f5174..705d67d420 100644 --- a/pkg/appconsts/initial_consts.go +++ b/pkg/appconsts/initial_consts.go @@ -11,7 +11,7 @@ import ( const ( // DefaultGovMaxSquareSize is the default value for the governance modifiable // max square size. - DefaultGovMaxSquareSize = 64 + DefaultGovMaxSquareSize = 512 // DefaultMaxBytes is the default value for the governance modifiable // maximum number of bytes allowed in a valid block. diff --git a/pkg/appconsts/v3/app_consts.go b/pkg/appconsts/v3/app_consts.go index e5b660bf2e..a018cfec79 100644 --- a/pkg/appconsts/v3/app_consts.go +++ b/pkg/appconsts/v3/app_consts.go @@ -4,11 +4,11 @@ import "time" const ( Version uint64 = 3 - SquareSizeUpperBound int = 128 + SquareSizeUpperBound int = 512 SubtreeRootThreshold int = 64 TxSizeCostPerByte uint64 = 10 GasPerBlobByte uint32 = 8 - MaxTxSize int = 2097152 // 2 MiB in bytes + MaxTxSize int = 209715200 // 2 MiB in bytes TimeoutPropose = time.Millisecond * 3500 TimeoutCommit = time.Millisecond * 4200 // UpgradeHeightDelay is the number of blocks after a quorum has been From 89e3161fd543e7a0a6eadf637df76e8458814be2 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 20:47:06 +0100 Subject: [PATCH 13/33] chore: add some prints --- app/check_tx.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/app/check_tx.go b/app/check_tx.go index 68ff288caa..07dd8a3bdd 100644 --- a/app/check_tx.go +++ b/app/check_tx.go @@ -19,12 +19,14 @@ import ( func (app *App) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { tx := req.Tx + fmt.Println("received transaction") // all txs must be less than or equal to the max tx size limit maxTxSize := appconsts.MaxTxSize(app.AppVersion()) currentTxSize := len(tx) if currentTxSize > maxTxSize { return sdkerrors.ResponseCheckTxWithEvents(errors.Wrapf(apperr.ErrTxExceedsMaxSize, "tx size %d bytes is larger than the application's configured MaxTxSize of %d bytes for version %d", currentTxSize, maxTxSize, app.AppVersion()), 0, 0, []abci.Event{}, false) } + fmt.Println("not more than max tx size") // check if the transaction contains blobs btx, isBlob, err := blobtx.UnmarshalBlobTx(tx) @@ -32,6 +34,8 @@ func (app *App) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { return sdkerrors.ResponseCheckTxWithEvents(err, 0, 0, []abci.Event{}, false) } + fmt.Println("unmarshalling tx") + if !isBlob { // reject transactions that can't be decoded sdkTx, err := app.txConfig.TxDecoder()(tx) From 716beb27cd1f8f71e8d9e3a260d071be31a4be68 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 21:03:05 +0100 Subject: [PATCH 14/33] chore: fill block with multiple txs --- tools/chainbuilder/main.go | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 8c81998495..b8f89a1d65 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -452,6 +452,8 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { return firstErr } +const blobSize = 2_000_000 + func generateSquareRoutine( ctx context.Context, signer *user.Signer, @@ -467,23 +469,27 @@ func generateSquareRoutine( account := signer.Accounts()[0] - blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) - if err != nil { - return err - } + blobTxs := make([][]byte, 0) + for size := 0; size < cfg.BlockSize; size += blobSize { + blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(blobSize)) + if err != nil { + return err + } - blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) - fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 - tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) - if err != nil { - return err - } - if err := signer.IncrementSequence(account.Name()); err != nil { - return err + blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(blobSize)}) + fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 + tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) + if err != nil { + return err + } + if err := signer.IncrementSequence(account.Name()); err != nil { + return err + } + blobTxs = append(blobTxs, tx) } dataSquare, txs, err := square.Build( - [][]byte{tx}, + blobTxs, maxSquareSize, appconsts.SubtreeRootThreshold(1), ) From 6980eb2d1c9fa4f6bc2b4a70f780c6166b2f4a89 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 21:08:36 +0100 Subject: [PATCH 15/33] chore: decreate max gv square size --- pkg/appconsts/initial_consts.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/appconsts/initial_consts.go b/pkg/appconsts/initial_consts.go index 705d67d420..bfbe37c7cd 100644 --- a/pkg/appconsts/initial_consts.go +++ b/pkg/appconsts/initial_consts.go @@ -11,7 +11,7 @@ import ( const ( // DefaultGovMaxSquareSize is the default value for the governance modifiable // max square size. - DefaultGovMaxSquareSize = 512 + DefaultGovMaxSquareSize = 256 // DefaultMaxBytes is the default value for the governance modifiable // maximum number of bytes allowed in a valid block. From ac82da89eb23776ac4ef6bb3007875ef095914d8 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 27 Jan 2025 21:12:01 +0100 Subject: [PATCH 16/33] Revert "chore: fill block with multiple txs" This reverts commit 716beb27cd1f8f71e8d9e3a260d071be31a4be68. --- tools/chainbuilder/main.go | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index b8f89a1d65..8c81998495 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -452,8 +452,6 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { return firstErr } -const blobSize = 2_000_000 - func generateSquareRoutine( ctx context.Context, signer *user.Signer, @@ -469,27 +467,23 @@ func generateSquareRoutine( account := signer.Accounts()[0] - blobTxs := make([][]byte, 0) - for size := 0; size < cfg.BlockSize; size += blobSize { - blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(blobSize)) - if err != nil { - return err - } + blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) + if err != nil { + return err + } - blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(blobSize)}) - fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 - tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) - if err != nil { - return err - } - if err := signer.IncrementSequence(account.Name()); err != nil { - return err - } - blobTxs = append(blobTxs, tx) + blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) + fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 + tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) + if err != nil { + return err + } + if err := signer.IncrementSequence(account.Name()); err != nil { + return err } dataSquare, txs, err := square.Build( - blobTxs, + [][]byte{tx}, maxSquareSize, appconsts.SubtreeRootThreshold(1), ) From a7e2c66972f4653da661ddfc3a39ed4e4a099c49 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 30 Jan 2025 20:28:05 +0100 Subject: [PATCH 17/33] chore: empty data --- tools/chainbuilder/main.go | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 8c81998495..5e078cf2f0 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + blobtypes "github.com/celestiaorg/celestia-app/v3/x/blob/types" "os" "path/filepath" "time" @@ -34,7 +35,6 @@ import ( "github.com/celestiaorg/celestia-app/v3/test/util" "github.com/celestiaorg/celestia-app/v3/test/util/genesis" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" - blobtypes "github.com/celestiaorg/celestia-app/v3/x/blob/types" ) var defaultNamespace share.Namespace @@ -325,6 +325,14 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { case <-ctx.Done(): return ctx.Err() case dataPB := <-dataCh: + if height == 1 { + fmt.Println("generating empty block") + d, err := emptyData() + if err != nil { + return err + } + dataPB = d + } data, err := types.DataFromProto(dataPB) if err != nil { return fmt.Errorf("failed to convert data from protobuf: %w", err) @@ -514,6 +522,32 @@ func generateSquareRoutine( return nil } +func emptyData() (*tmproto.Data, error) { + dataSquare, txs, err := square.Build( + [][]byte{}, + maxSquareSize, + appconsts.SubtreeRootThreshold(1), + ) + if err != nil { + return nil, err + } + + eds, err := da.ExtendShares(share.ToBytes(dataSquare)) + if err != nil { + return nil, err + } + + dah, err := da.NewDataAvailabilityHeader(eds) + if err != nil { + return nil, err + } + return &tmproto.Data{ + Txs: txs, + Hash: dah.Hash(), + SquareSize: uint64(dataSquare.Size()), + }, nil +} + type persistData struct { state sm.State block *types.Block From a4e9a02639b7000b0add3e93c0bfa685aa58ac86 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 30 Jan 2025 20:30:46 +0100 Subject: [PATCH 18/33] chore: empty data --- tools/chainbuilder/main.go | 194 +++++++++++++++++++------------------ 1 file changed, 99 insertions(+), 95 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 5e078cf2f0..f0b38cd1c2 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -321,116 +321,120 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { break } - select { - case <-ctx.Done(): - return ctx.Err() - case dataPB := <-dataCh: - if height == 1 { - fmt.Println("generating empty block") - d, err := emptyData() - if err != nil { - return err - } - dataPB = d - } - data, err := types.DataFromProto(dataPB) + var dd *tmproto.Data + if height == 1 { + fmt.Println("generating empty block") + d, err := emptyData() if err != nil { - return fmt.Errorf("failed to convert data from protobuf: %w", err) + return err } - block, blockParts := state.MakeBlock(height, data, commit, nil, validatorAddr) - blockID := types.BlockID{ - Hash: block.Hash(), - PartSetHeader: blockParts.Header(), + dd = d + } else { + select { + case <-ctx.Done(): + return ctx.Err() + case dataPB := <-dataCh: + dd = dataPB } + } - precommitVote := &tmproto.Vote{ - Height: height, - Round: 0, - Type: tmproto.PrecommitType, - BlockID: blockID.ToProto(), - ValidatorAddress: validatorAddr, - Timestamp: currentTime, - Signature: nil, - } + data, err := types.DataFromProto(dd) + if err != nil { + return fmt.Errorf("failed to convert data from protobuf: %w", err) + } + block, blockParts := state.MakeBlock(height, data, commit, nil, validatorAddr) + blockID := types.BlockID{ + Hash: block.Hash(), + PartSetHeader: blockParts.Header(), + } - if err := validatorKey.SignVote(state.ChainID, precommitVote); err != nil { - return fmt.Errorf("failed to sign precommit vote (%s): %w", precommitVote.String(), err) - } + precommitVote := &tmproto.Vote{ + Height: height, + Round: 0, + Type: tmproto.PrecommitType, + BlockID: blockID.ToProto(), + ValidatorAddress: validatorAddr, + Timestamp: currentTime, + Signature: nil, + } - commitSig := types.CommitSig{ - BlockIDFlag: types.BlockIDFlagCommit, - ValidatorAddress: validatorAddr, - Timestamp: currentTime, - Signature: precommitVote.Signature, - } - commit = types.NewCommit(height, 0, blockID, []types.CommitSig{commitSig}) - - var lastCommitInfo abci.LastCommitInfo - if height > 1 { - lastCommitInfo = abci.LastCommitInfo{ - Round: 0, - Votes: []abci.VoteInfo{ - { - Validator: abci.Validator{ - Address: validatorAddr, - Power: validatorPower, - }, - SignedLastBlock: true, + if err := validatorKey.SignVote(state.ChainID, precommitVote); err != nil { + return fmt.Errorf("failed to sign precommit vote (%s): %w", precommitVote.String(), err) + } + + commitSig := types.CommitSig{ + BlockIDFlag: types.BlockIDFlagCommit, + ValidatorAddress: validatorAddr, + Timestamp: currentTime, + Signature: precommitVote.Signature, + } + commit = types.NewCommit(height, 0, blockID, []types.CommitSig{commitSig}) + + var lastCommitInfo abci.LastCommitInfo + if height > 1 { + lastCommitInfo = abci.LastCommitInfo{ + Round: 0, + Votes: []abci.VoteInfo{ + { + Validator: abci.Validator{ + Address: validatorAddr, + Power: validatorPower, }, + SignedLastBlock: true, }, - } + }, } + } - beginBlockResp := simApp.BeginBlock(abci.RequestBeginBlock{ - Hash: block.Hash(), - Header: *block.Header.ToProto(), - LastCommitInfo: lastCommitInfo, - }) + beginBlockResp := simApp.BeginBlock(abci.RequestBeginBlock{ + Hash: block.Hash(), + Header: *block.Header.ToProto(), + LastCommitInfo: lastCommitInfo, + }) - deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Data.Txs)) + deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Data.Txs)) - for idx, tx := range block.Data.Txs { - blobTx, isBlobTx := types.UnmarshalBlobTx(tx) - if isBlobTx { - tx = blobTx.Tx - } - deliverTxResponse := simApp.DeliverTx(abci.RequestDeliverTx{ - Tx: tx, - }) - if deliverTxResponse.Code != abci.CodeTypeOK { - return fmt.Errorf("failed to deliver tx: %s", deliverTxResponse.Log) - } - deliverTxResponses[idx] = &deliverTxResponse + for idx, tx := range block.Data.Txs { + blobTx, isBlobTx := types.UnmarshalBlobTx(tx) + if isBlobTx { + tx = blobTx.Tx } - - endBlockResp := simApp.EndBlock(abci.RequestEndBlock{ - Height: block.Height, + deliverTxResponse := simApp.DeliverTx(abci.RequestDeliverTx{ + Tx: tx, }) - - commitResp := simApp.Commit() - state.LastBlockHeight = height - state.LastBlockID = blockID - state.LastBlockTime = block.Time - state.LastValidators = state.Validators - state.Validators = state.NextValidators - state.NextValidators = state.NextValidators.CopyIncrementProposerPriority(1) - state.AppHash = commitResp.Data - state.LastResultsHash = sm.ABCIResponsesResultsHash(&smproto.ABCIResponses{ - DeliverTxs: deliverTxResponses, - BeginBlock: &beginBlockResp, - EndBlock: &endBlockResp, - }) - currentTime = currentTime.Add(cfg.BlockInterval) - persistCh <- persistData{ - state: state.Copy(), - block: block, - seenCommit: &types.Commit{ - Height: commit.Height, - Round: commit.Round, - BlockID: commit.BlockID, - Signatures: []types.CommitSig{commitSig}, - }, + if deliverTxResponse.Code != abci.CodeTypeOK { + return fmt.Errorf("failed to deliver tx: %s", deliverTxResponse.Log) } + deliverTxResponses[idx] = &deliverTxResponse + } + + endBlockResp := simApp.EndBlock(abci.RequestEndBlock{ + Height: block.Height, + }) + + commitResp := simApp.Commit() + state.LastBlockHeight = height + state.LastBlockID = blockID + state.LastBlockTime = block.Time + state.LastValidators = state.Validators + state.Validators = state.NextValidators + state.NextValidators = state.NextValidators.CopyIncrementProposerPriority(1) + state.AppHash = commitResp.Data + state.LastResultsHash = sm.ABCIResponsesResultsHash(&smproto.ABCIResponses{ + DeliverTxs: deliverTxResponses, + BeginBlock: &beginBlockResp, + EndBlock: &endBlockResp, + }) + currentTime = currentTime.Add(cfg.BlockInterval) + persistCh <- persistData{ + state: state.Copy(), + block: block, + seenCommit: &types.Commit{ + Height: commit.Height, + Round: commit.Round, + BlockID: commit.BlockID, + Signatures: []types.CommitSig{commitSig}, + }, } } From c4fce348a838faf4dbdaa5dbd4c586ff21fc9932 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 30 Jan 2025 21:46:05 +0100 Subject: [PATCH 19/33] chore: prioritize checking done context --- tools/chainbuilder/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index f0b38cd1c2..c64ca48cde 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -514,13 +514,13 @@ func generateSquareRoutine( } select { + case <-ctx.Done(): + return ctx.Err() case dataCh <- &tmproto.Data{ Txs: txs, Hash: dah.Hash(), SquareSize: uint64(dataSquare.Size()), }: - case <-ctx.Done(): - return ctx.Err() } } return nil From ee6e446ebafcaa6c0b37bb24f6faf85538e54450 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 30 Jan 2025 21:48:00 +0100 Subject: [PATCH 20/33] chore: prioritize checking done context --- tools/chainbuilder/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index c64ca48cde..d851472b63 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -470,7 +470,7 @@ func generateSquareRoutine( cfg BuilderConfig, dataCh chan<- *tmproto.Data, ) error { - for i := 0; i < cfg.NumBlocks; i++ { + for i := 0; i < cfg.NumBlocks-1; i++ { select { case <-ctx.Done(): return ctx.Err() From 5e356f88689fd8844b20d97bd9aa84d41e737f3b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 10:28:34 +0100 Subject: [PATCH 21/33] chore: revert unnecessary changes + cosmetics --- app/check_tx.go | 4 -- cmd/celestia-appd/cmd/start.go | 11 +----- pkg/appconsts/initial_consts.go | 2 +- pkg/appconsts/v3/app_consts.go | 4 +- tools/chainbuilder/main.go | 66 ++++++++++++++++----------------- 5 files changed, 36 insertions(+), 51 deletions(-) diff --git a/app/check_tx.go b/app/check_tx.go index 07dd8a3bdd..68ff288caa 100644 --- a/app/check_tx.go +++ b/app/check_tx.go @@ -19,14 +19,12 @@ import ( func (app *App) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { tx := req.Tx - fmt.Println("received transaction") // all txs must be less than or equal to the max tx size limit maxTxSize := appconsts.MaxTxSize(app.AppVersion()) currentTxSize := len(tx) if currentTxSize > maxTxSize { return sdkerrors.ResponseCheckTxWithEvents(errors.Wrapf(apperr.ErrTxExceedsMaxSize, "tx size %d bytes is larger than the application's configured MaxTxSize of %d bytes for version %d", currentTxSize, maxTxSize, app.AppVersion()), 0, 0, []abci.Event{}, false) } - fmt.Println("not more than max tx size") // check if the transaction contains blobs btx, isBlob, err := blobtx.UnmarshalBlobTx(tx) @@ -34,8 +32,6 @@ func (app *App) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { return sdkerrors.ResponseCheckTxWithEvents(err, 0, 0, []abci.Event{}, false) } - fmt.Println("unmarshalling tx") - if !isBlob { // reject transactions that can't be decoded sdkTx, err := app.txConfig.TxDecoder()(tx) diff --git a/cmd/celestia-appd/cmd/start.go b/cmd/celestia-appd/cmd/start.go index e3034a55f3..057ae4422a 100644 --- a/cmd/celestia-appd/cmd/start.go +++ b/cmd/celestia-appd/cmd/start.go @@ -5,7 +5,6 @@ package cmd import ( "fmt" - db "github.com/cometbft/cometbft-db" "io" "net" "net/http" @@ -258,14 +257,6 @@ func startStandAlone(ctx *server.Context, appCreator srvrtypes.AppCreator) error return server.WaitForQuitSignals() } -func dbProvider(ctx *node.DBContext) (db.DB, error) { - blockDB, err := db.NewDB(ctx.ID, db.PebbleDBBackend, ctx.Config.DBDir()) - if err != nil { - return nil, err - } - return blockDB, nil -} - func startInProcess(ctx *server.Context, clientCtx client.Context, appCreator srvrtypes.AppCreator) error { cfg := ctx.Config home := cfg.RootDir @@ -316,7 +307,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, appCreator sr nodeKey, proxy.NewLocalClientCreator(app), genDocProvider, - dbProvider, + node.DefaultDBProvider, node.DefaultMetricsProvider(cfg.Instrumentation), ctx.Logger, ) diff --git a/pkg/appconsts/initial_consts.go b/pkg/appconsts/initial_consts.go index bfbe37c7cd..c9924f5174 100644 --- a/pkg/appconsts/initial_consts.go +++ b/pkg/appconsts/initial_consts.go @@ -11,7 +11,7 @@ import ( const ( // DefaultGovMaxSquareSize is the default value for the governance modifiable // max square size. - DefaultGovMaxSquareSize = 256 + DefaultGovMaxSquareSize = 64 // DefaultMaxBytes is the default value for the governance modifiable // maximum number of bytes allowed in a valid block. diff --git a/pkg/appconsts/v3/app_consts.go b/pkg/appconsts/v3/app_consts.go index a018cfec79..e5b660bf2e 100644 --- a/pkg/appconsts/v3/app_consts.go +++ b/pkg/appconsts/v3/app_consts.go @@ -4,11 +4,11 @@ import "time" const ( Version uint64 = 3 - SquareSizeUpperBound int = 512 + SquareSizeUpperBound int = 128 SubtreeRootThreshold int = 64 TxSizeCostPerByte uint64 = 10 GasPerBlobByte uint32 = 8 - MaxTxSize int = 209715200 // 2 MiB in bytes + MaxTxSize int = 2097152 // 2 MiB in bytes TimeoutPropose = time.Millisecond * 3500 TimeoutCommit = time.Millisecond * 4200 // UpgradeHeightDelay is the number of blocks after a quorum has been diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index d851472b63..0204c13bd8 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -39,6 +39,33 @@ import ( var defaultNamespace share.Namespace +// emptyBlockData contains the protobuf block data for a block without transactions. +var emptyBlockData = func() tmproto.Data { + dataSquare, txs, err := square.Build( + [][]byte{}, + maxSquareSize, + appconsts.SubtreeRootThreshold(1), + ) + if err != nil { + panic(err) + } + + eds, err := da.ExtendShares(share.ToBytes(dataSquare)) + if err != nil { + panic(err) + } + + dah, err := da.NewDataAvailabilityHeader(eds) + if err != nil { + panic(err) + } + return tmproto.Data{ + Txs: txs, + Hash: dah.Hash(), + SquareSize: uint64(dataSquare.Size()), + } +}() + const ( defaultNamespaceStr = "test" maxSquareSize = 512 @@ -323,12 +350,8 @@ func Run(ctx context.Context, cfg BuilderConfig, dir string) error { var dd *tmproto.Data if height == 1 { - fmt.Println("generating empty block") - d, err := emptyData() - if err != nil { - return err - } - dd = d + // generating an empty block for height 1 + dd = &emptyBlockData } else { select { case <-ctx.Done(): @@ -470,6 +493,7 @@ func generateSquareRoutine( cfg BuilderConfig, dataCh chan<- *tmproto.Data, ) error { + // cfg.NumBlocks-1 because block 0 is genesis and block 1 shouldn't contain any transaction for i := 0; i < cfg.NumBlocks-1; i++ { select { case <-ctx.Done(): @@ -514,44 +538,18 @@ func generateSquareRoutine( } select { - case <-ctx.Done(): - return ctx.Err() case dataCh <- &tmproto.Data{ Txs: txs, Hash: dah.Hash(), SquareSize: uint64(dataSquare.Size()), }: + case <-ctx.Done(): + return ctx.Err() } } return nil } -func emptyData() (*tmproto.Data, error) { - dataSquare, txs, err := square.Build( - [][]byte{}, - maxSquareSize, - appconsts.SubtreeRootThreshold(1), - ) - if err != nil { - return nil, err - } - - eds, err := da.ExtendShares(share.ToBytes(dataSquare)) - if err != nil { - return nil, err - } - - dah, err := da.NewDataAvailabilityHeader(eds) - if err != nil { - return nil, err - } - return &tmproto.Data{ - Txs: txs, - Hash: dah.Hash(), - SquareSize: uint64(dataSquare.Size()), - }, nil -} - type persistData struct { state sm.State block *types.Block From c6404bbc26dfdc52567597b26d94544982d85198 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 10:46:19 +0100 Subject: [PATCH 22/33] chore: gofjmpt --- tools/chainbuilder/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 0204c13bd8..763e0e0ca6 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -3,11 +3,12 @@ package main import ( "context" "fmt" - blobtypes "github.com/celestiaorg/celestia-app/v3/x/blob/types" "os" "path/filepath" "time" + blobtypes "github.com/celestiaorg/celestia-app/v3/x/blob/types" + "github.com/celestiaorg/go-square/v2" "github.com/celestiaorg/go-square/v2/share" dbm "github.com/cometbft/cometbft-db" From 03906052c96630be92b5939df965c874b0d90c90 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 11:32:27 +0100 Subject: [PATCH 23/33] chore:keep using the pebble store --- cmd/celestia-appd/cmd/start.go | 11 ++++++++++- pkg/appconsts/initial_consts.go | 2 +- pkg/appconsts/v3/app_consts.go | 4 ++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/cmd/celestia-appd/cmd/start.go b/cmd/celestia-appd/cmd/start.go index 057ae4422a..e3034a55f3 100644 --- a/cmd/celestia-appd/cmd/start.go +++ b/cmd/celestia-appd/cmd/start.go @@ -5,6 +5,7 @@ package cmd import ( "fmt" + db "github.com/cometbft/cometbft-db" "io" "net" "net/http" @@ -257,6 +258,14 @@ func startStandAlone(ctx *server.Context, appCreator srvrtypes.AppCreator) error return server.WaitForQuitSignals() } +func dbProvider(ctx *node.DBContext) (db.DB, error) { + blockDB, err := db.NewDB(ctx.ID, db.PebbleDBBackend, ctx.Config.DBDir()) + if err != nil { + return nil, err + } + return blockDB, nil +} + func startInProcess(ctx *server.Context, clientCtx client.Context, appCreator srvrtypes.AppCreator) error { cfg := ctx.Config home := cfg.RootDir @@ -307,7 +316,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, appCreator sr nodeKey, proxy.NewLocalClientCreator(app), genDocProvider, - node.DefaultDBProvider, + dbProvider, node.DefaultMetricsProvider(cfg.Instrumentation), ctx.Logger, ) diff --git a/pkg/appconsts/initial_consts.go b/pkg/appconsts/initial_consts.go index c9924f5174..bfbe37c7cd 100644 --- a/pkg/appconsts/initial_consts.go +++ b/pkg/appconsts/initial_consts.go @@ -11,7 +11,7 @@ import ( const ( // DefaultGovMaxSquareSize is the default value for the governance modifiable // max square size. - DefaultGovMaxSquareSize = 64 + DefaultGovMaxSquareSize = 256 // DefaultMaxBytes is the default value for the governance modifiable // maximum number of bytes allowed in a valid block. diff --git a/pkg/appconsts/v3/app_consts.go b/pkg/appconsts/v3/app_consts.go index e5b660bf2e..a018cfec79 100644 --- a/pkg/appconsts/v3/app_consts.go +++ b/pkg/appconsts/v3/app_consts.go @@ -4,11 +4,11 @@ import "time" const ( Version uint64 = 3 - SquareSizeUpperBound int = 128 + SquareSizeUpperBound int = 512 SubtreeRootThreshold int = 64 TxSizeCostPerByte uint64 = 10 GasPerBlobByte uint32 = 8 - MaxTxSize int = 2097152 // 2 MiB in bytes + MaxTxSize int = 209715200 // 2 MiB in bytes TimeoutPropose = time.Millisecond * 3500 TimeoutCommit = time.Millisecond * 4200 // UpgradeHeightDelay is the number of blocks after a quorum has been From ddc1a5ad2cf7eec44ad9fdd8c14fac0ea88644c4 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 11:40:17 +0100 Subject: [PATCH 24/33] chore: increase max tx size --- pkg/appconsts/v3/app_consts.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/appconsts/v3/app_consts.go b/pkg/appconsts/v3/app_consts.go index a018cfec79..0afaf2c393 100644 --- a/pkg/appconsts/v3/app_consts.go +++ b/pkg/appconsts/v3/app_consts.go @@ -8,7 +8,7 @@ const ( SubtreeRootThreshold int = 64 TxSizeCostPerByte uint64 = 10 GasPerBlobByte uint32 = 8 - MaxTxSize int = 209715200 // 2 MiB in bytes + MaxTxSize int = 409715200 // 2 MiB in bytes TimeoutPropose = time.Millisecond * 3500 TimeoutCommit = time.Millisecond * 4200 // UpgradeHeightDelay is the number of blocks after a quorum has been From 8d60a499a4f264748f54b6a760fdf9e8513a6eeb Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 11:46:24 +0100 Subject: [PATCH 25/33] chore: increase max tx size --- app/default_overrides.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/default_overrides.go b/app/default_overrides.go index f0e7d13cba..1d0ed5a921 100644 --- a/app/default_overrides.go +++ b/app/default_overrides.go @@ -296,6 +296,6 @@ func DefaultAppConfig() *serverconfig.Config { cfg.MinGasPrices = fmt.Sprintf("%v%s", appconsts.DefaultMinGasPrice, BondDenom) const mebibyte = 1048576 - cfg.GRPC.MaxRecvMsgSize = 20 * mebibyte + cfg.GRPC.MaxRecvMsgSize = 40 * mebibyte return cfg } From 7833fddf14175cf668bcf5f5b4d87fb603b09e0b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 11:55:21 +0100 Subject: [PATCH 26/33] chore: increase MaxBodyBytes --- app/default_overrides.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/default_overrides.go b/app/default_overrides.go index 1d0ed5a921..ce82efd11d 100644 --- a/app/default_overrides.go +++ b/app/default_overrides.go @@ -259,7 +259,7 @@ func DefaultConsensusConfig() *tmcfg.Config { cfg := tmcfg.DefaultConfig() // Set broadcast timeout to be 50 seconds in order to avoid timeouts for long block times cfg.RPC.TimeoutBroadcastTxCommit = 50 * time.Second - cfg.RPC.MaxBodyBytes = int64(8388608) // 8 MiB + cfg.RPC.MaxBodyBytes = int64(83886080) // 8 MiB cfg.Mempool.TTLNumBlocks = 12 cfg.Mempool.TTLDuration = 75 * time.Second From fcd7192b5f530aa86792004a32c576da61859a2b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 11:58:32 +0100 Subject: [PATCH 27/33] chore: increase MaxTxBytes --- app/default_overrides.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/default_overrides.go b/app/default_overrides.go index ce82efd11d..accc466eab 100644 --- a/app/default_overrides.go +++ b/app/default_overrides.go @@ -263,8 +263,8 @@ func DefaultConsensusConfig() *tmcfg.Config { cfg.Mempool.TTLNumBlocks = 12 cfg.Mempool.TTLDuration = 75 * time.Second - cfg.Mempool.MaxTxBytes = 7_897_088 - cfg.Mempool.MaxTxsBytes = 39_485_440 + cfg.Mempool.MaxTxBytes = 7_897_088_000 + cfg.Mempool.MaxTxsBytes = 39_485_440_000 cfg.Mempool.Version = "v1" // prioritized mempool cfg.Consensus.TimeoutPropose = appconsts.GetTimeoutPropose(appconsts.LatestVersion) From 784a56dc98159059bc952b4571367b98d0bb5332 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 12:23:50 +0100 Subject: [PATCH 28/33] chore: multiple blobs per square --- tools/chainbuilder/main.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index 763e0e0ca6..f226d3788a 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "math/rand" "os" "path/filepath" "time" @@ -504,23 +505,29 @@ func generateSquareRoutine( account := signer.Accounts()[0] - blob, err := share.NewV0Blob(cfg.Namespace, crypto.CRandBytes(cfg.BlockSize)) - if err != nil { - return err - } + blobTxs := make([][]byte, 0) + numberOfBlobs := rand.Intn(100) + blobSize := cfg.BlockSize / numberOfBlobs + for size := 0; size < cfg.BlockSize; size += blobSize { + blob, err := share.NewV0Blob(share.RandomNamespace(), crypto.CRandBytes(blobSize)) + if err != nil { + return err + } - blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(cfg.BlockSize)}) - fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 - tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) - if err != nil { - return err - } - if err := signer.IncrementSequence(account.Name()); err != nil { - return err + blobGas := blobtypes.DefaultEstimateGas([]uint32{uint32(blobSize)}) + fee := float64(blobGas) * appconsts.DefaultMinGasPrice * 2 + tx, _, err := signer.CreatePayForBlobs(account.Name(), []*share.Blob{blob}, user.SetGasLimit(blobGas), user.SetFee(uint64(fee))) + if err != nil { + return err + } + if err := signer.IncrementSequence(account.Name()); err != nil { + return err + } + blobTxs = append(blobTxs, tx) } dataSquare, txs, err := square.Build( - [][]byte{tx}, + blobTxs, maxSquareSize, appconsts.SubtreeRootThreshold(1), ) From 1d814ca4ce718b30d4137b4ddbb08a728945f05e Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 12:25:07 +0100 Subject: [PATCH 29/33] chore: printing the number of blobs in block --- tools/chainbuilder/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index f226d3788a..a06dc11e32 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -507,6 +507,7 @@ func generateSquareRoutine( blobTxs := make([][]byte, 0) numberOfBlobs := rand.Intn(100) + fmt.Printf("generating block with %d blobs\n", numberOfBlobs) blobSize := cfg.BlockSize / numberOfBlobs for size := 0; size < cfg.BlockSize; size += blobSize { blob, err := share.NewV0Blob(share.RandomNamespace(), crypto.CRandBytes(blobSize)) From 5db7695098b45c36df0e93802d1aefd803c4e3dc Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 12:57:41 +0100 Subject: [PATCH 30/33] chore: increase send/receive rates --- app/default_overrides.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/default_overrides.go b/app/default_overrides.go index accc466eab..969b5604d8 100644 --- a/app/default_overrides.go +++ b/app/default_overrides.go @@ -275,8 +275,8 @@ func DefaultConsensusConfig() *tmcfg.Config { cfg.Storage.DiscardABCIResponses = true const mebibyte = 1048576 - cfg.P2P.SendRate = 10 * mebibyte - cfg.P2P.RecvRate = 10 * mebibyte + cfg.P2P.SendRate = 1000 * mebibyte + cfg.P2P.RecvRate = 1000 * mebibyte return cfg } From e5bb1d6a17b28ebdbabe02b216e18b3952519d95 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 12:57:46 +0100 Subject: [PATCH 31/33] Revert "chore: printing the number of blobs in block" This reverts commit 1d814ca4ce718b30d4137b4ddbb08a728945f05e. --- tools/chainbuilder/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index a06dc11e32..f226d3788a 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -507,7 +507,6 @@ func generateSquareRoutine( blobTxs := make([][]byte, 0) numberOfBlobs := rand.Intn(100) - fmt.Printf("generating block with %d blobs\n", numberOfBlobs) blobSize := cfg.BlockSize / numberOfBlobs for size := 0; size < cfg.BlockSize; size += blobSize { blob, err := share.NewV0Blob(share.RandomNamespace(), crypto.CRandBytes(blobSize)) From 0b8ad9834720396107459a9a01f00e49bf92ef34 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 31 Jan 2025 13:17:10 +0100 Subject: [PATCH 32/33] chore: avoid 0 case --- tools/chainbuilder/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/chainbuilder/main.go b/tools/chainbuilder/main.go index f226d3788a..ec8754d6fa 100644 --- a/tools/chainbuilder/main.go +++ b/tools/chainbuilder/main.go @@ -506,7 +506,7 @@ func generateSquareRoutine( account := signer.Accounts()[0] blobTxs := make([][]byte, 0) - numberOfBlobs := rand.Intn(100) + numberOfBlobs := rand.Intn(100) + 1 blobSize := cfg.BlockSize / numberOfBlobs for size := 0; size < cfg.BlockSize; size += blobSize { blob, err := share.NewV0Blob(share.RandomNamespace(), crypto.CRandBytes(blobSize)) From b630ac326dbed5f755a37e4844eff9194105a695 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 11 Feb 2025 15:14:47 +0100 Subject: [PATCH 33/33] chore: add some services config --- celestia-appd.service | 15 +++++++++++++++ chainbuilder.service | 14 ++++++++++++++ generate_script.sh | 8 ++++++++ txsim.service | 15 +++++++++++++++ 4 files changed, 52 insertions(+) create mode 100644 celestia-appd.service create mode 100644 chainbuilder.service create mode 100644 generate_script.sh create mode 100644 txsim.service diff --git a/celestia-appd.service b/celestia-appd.service new file mode 100644 index 0000000000..616e6fc295 --- /dev/null +++ b/celestia-appd.service @@ -0,0 +1,15 @@ +[Unit] +Description=Generate chain and run +After=network.target + +[Service] +Type=simple +Environment="PATH=/usr/local/go/bin:/root/go/bin:/usr/bin:/bin" +ExecStart=/root/go/bin/celestia-appd start --minimum-gas-prices 0.000001utia --home /root/chainbuilder/generated/celestia-app/testnode-32mb-100k --grpc.enable +User=root +Restart=always +RestartSec=3 +WorkingDirectory=/root/chainbuilder/generated/celestia-app + +[Install] +WantedBy=multi-user.target diff --git a/chainbuilder.service b/chainbuilder.service new file mode 100644 index 0000000000..19ff710c78 --- /dev/null +++ b/chainbuilder.service @@ -0,0 +1,14 @@ +[Unit] +Description=Generate chain and run +After=network.target + +[Service] +Type=simple +Environment="PATH=/usr/local/go/bin:/root/go/bin:/usr/bin:/bin" +ExecStart=/root/chainbuilder/generated/celestia-app/run.sh +User=root +WorkingDirectory=/root/chainbuilder/generated/celestia-app +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git a/generate_script.sh b/generate_script.sh new file mode 100644 index 0000000000..d72b21f4f5 --- /dev/null +++ b/generate_script.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +go run ./tools/chainbuilder --num-blocks 100000 --block-size 32000000 --chain-id 32mb-100k + +systemctl daemon-reload +systemctl start txsim + +celestia-appd start --minimum-gas-prices 0.000001utia --home ~/chainbuilder/generated/celestia-app/testnode-32mb-100k --grpc.enable diff --git a/txsim.service b/txsim.service new file mode 100644 index 0000000000..e551485537 --- /dev/null +++ b/txsim.service @@ -0,0 +1,15 @@ +[Unit] +Description=TxSim Service +After=network.target + +[Service] +Type=simple +Environment="PATH=/usr/local/go/bin:/root/go/bin:/usr/bin:/bin" +ExecStart=/root/go/bin/txsim --blob-sizes 3000000-30000000 --grpc-endpoint localhost:9090 --blob 10 --key-path /root/chainbuilder/generated/celestia-app/testnode-32mb-100k --master validator +Restart=always +RestartSec=3 +User=root +WorkingDirectory=/home/ubuntu + +[Install] +WantedBy=multi-user.target