diff --git a/chains/ethereum/runner/persistent.go b/chains/ethereum/runner/persistent.go index de7ad87..f3ca451 100644 --- a/chains/ethereum/runner/persistent.go +++ b/chains/ethereum/runner/persistent.go @@ -3,6 +3,7 @@ package runner import ( "context" "fmt" + "math" "sync" gethtypes "github.com/ethereum/go-ethereum/core/types" @@ -18,6 +19,13 @@ func (r *Runner) runPersistent(ctx context.Context) (loadtesttypes.LoadTestResul return loadtesttypes.LoadTestResult{}, err } + // We fund InitialWallets * 2^N wallets every block where N == the number of bootstrap loads sent. + // We therefore require (log(num_wallets) - log(initial_wallets))/log(2) bootstrap loads to full fund. + requiredBootstrapLoads := uint64((math.Log10(float64(r.spec.NumWallets))-math.Log10(float64(r.spec.InitialWallets)))/math.Log10(2)) + 1 + var blocksProcessed uint64 + // boostrapBackoff controls how many blocks are between load publication while we're still bootstrapping (funding wallets). + bootstrapBackoff := uint64(5) + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -53,6 +61,7 @@ func (r *Runner) runPersistent(ctx context.Context) (loadtesttypes.LoadTestResul cancel() return case block, ok := <-blockCh: + blocksProcessed++ if !ok { r.logger.Error("block header channel closed") cancel() @@ -65,6 +74,13 @@ func (r *Runner) runPersistent(ctx context.Context) (loadtesttypes.LoadTestResul zap.Uint64("gas_used", block.GasUsed), zap.Uint64("gas_limit", block.GasLimit), ) + + sentBootstrapLoads := blocksProcessed / bootstrapBackoff + // Only throttle load creation if we're still bootstrapping. + // In that case we publish load every bootstrapBackoff blocks. + if (sentBootstrapLoads <= requiredBootstrapLoads) && (blocksProcessed%bootstrapBackoff != 0) { + continue + } numTxsSubmitted, err := r.submitLoadPersistent(ctx, maxLoadSize) if err != nil { r.logger.Error("error during tx submission", zap.Error(err), zap.Uint64("height", block.Number.Uint64())) @@ -126,29 +142,49 @@ func (r *Runner) submitLoadPersistent(ctx context.Context, maxLoadSize int) (int func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadSize int, useBaseline bool) ([]*gethtypes.Transaction, error) { r.logger.Info("building load", zap.Int("maxLoadSize", maxLoadSize)) var txnLoad []*gethtypes.Transaction + var wg sync.WaitGroup + txChan := make(chan *gethtypes.Transaction, maxLoadSize) for range maxLoadSize { - sender := r.txFactory.GetNextSender() + wg.Go(func() { + sender := r.txFactory.GetNextSender() - if sender == nil { - r.logger.Info("failed to get sender") - break - } - nonce, ok := r.nonces.Load(sender.Address()) - if !ok { - // this really should not happen ever. better safe than sorry. - return nil, fmt.Errorf("nonce for wallet %s not found", sender.Address()) - } - tx, err := r.txFactory.BuildTxs(msgSpec, sender, nonce.(uint64), useBaseline) - if err != nil { - return nil, fmt.Errorf("failed to build txs %w", err) - } - lastTx := tx[len(tx)-1] - if lastTx == nil { - return nil, nil + if sender == nil { + return + } + nonce, ok := r.nonces.Load(sender.Address()) + if !ok { + // this really should not happen ever. better safe than sorry. + r.logger.Error("nonce for wallet not found", zap.String("wallet", sender.Address().String())) + return + } + tx, err := r.txFactory.BuildTxs(msgSpec, sender, nonce.(uint64), useBaseline) + if err != nil { + r.logger.Error("failed to build txs", zap.Error(err)) + return + } + lastTx := tx[len(tx)-1] + if lastTx == nil { + return + } + r.nonces.Store(sender.Address(), lastTx.Nonce()+1) + // Only use single txn builders here + for _, txn := range tx { + txChan <- txn + } + }) + } + doneChan := make(chan struct{}) + go func() { + wg.Wait() + doneChan <- struct{}{} + }() + for { + select { + case txn := <-txChan: + txnLoad = append(txnLoad, txn) + case <-doneChan: + r.logger.Info("Generated load txs", zap.Int("num_txs", len(txnLoad))) + return txnLoad, nil } - r.nonces.Store(sender.Address(), lastTx.Nonce()+1) - // Only use single txn builders here - txnLoad = append(txnLoad, tx...) } - return txnLoad, nil } diff --git a/chains/ethereum/runner/runner.go b/chains/ethereum/runner/runner.go index 48b04ae..f363008 100644 --- a/chains/ethereum/runner/runner.go +++ b/chains/ethereum/runner/runner.go @@ -86,17 +86,23 @@ func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadT var distribution txfactory.TxDistribution if spec.InitialWallets > 0 && spec.InitialWallets < spec.NumWallets { - distribution = txfactory.NewTxDistributionBootstrapped(wallets, spec.InitialWallets) + logger.Info("Using TxDistributionBootstrapped", zap.Int("initial_wallets", spec.InitialWallets), zap.Int("num_wallets", spec.NumWallets)) + distribution = txfactory.NewTxDistributionBootstrapped(logger, wallets, spec.InitialWallets) } else { + logger.Info("Using TxDistributionEven") distribution = txfactory.NewTxDistributionEven(wallets) } txf := txfactory.NewTxFactory(logger, chainCfg.TxOpts, distribution) nonces := sync.Map{} - for _, wallet := range wallets { - nonce, err := wallet.GetClient().PendingNonceAt(ctx, wallet.Address()) + for i, wallet := range wallets { + if i%10000 == 0 { + logger.Info("Initializing nonces for accounts", zap.Int("progress", i)) + } + nonce, err := wallet.GetNonce(ctx) if err != nil { - logger.Warn("Failed getting nonce for wallet setting to 0", zap.String("address", wallet.Address().String())) + logger.Warn("Failed getting nonce for wallet setting to 0", zap.String("address", + wallet.Address().String()), zap.Error(err)) } nonces.Store(wallet.Address(), nonce) } diff --git a/chains/ethereum/txfactory/factory.go b/chains/ethereum/txfactory/factory.go index 25f99ab..3957e9d 100644 --- a/chains/ethereum/txfactory/factory.go +++ b/chains/ethereum/txfactory/factory.go @@ -423,17 +423,21 @@ func (f *TxFactory) createMsgNativeTransferERC20(ctx context.Context, fromWallet } func (f *TxFactory) createMsgNativeGasTransfer(ctx context.Context, fromWallet *ethwallet.InteractingWallet, - nonce uint64, useBaseline bool, + _ uint64, useBaseline bool, ) (*types.Transaction, error) { // Use optimal recipient selection to minimize reuse and prevent self-transfers recipient := f.txDistribution.GetNextReceiver() // Get balance and transfer half of it - bal, err := fromWallet.GetBalance(ctx) + bal, err := fromWallet.GetClient().PendingBalanceAt(ctx, fromWallet.Address()) if err != nil { return nil, fmt.Errorf("failed to get balance of %s: %w", fromWallet.FormattedAddress(), err) } transferAmount := new(big.Int).Div(bal, big.NewInt(2)) + nonce, err := fromWallet.GetNonce(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get nonce of %s: %w", fromWallet.FormattedAddress(), err) + } // Create a simple native gas transfer transaction var gasLimit uint64 diff --git a/chains/ethereum/txfactory/tx_distribution_bootstrapped.go b/chains/ethereum/txfactory/tx_distribution_bootstrapped.go index 14b5f6f..5d38916 100644 --- a/chains/ethereum/txfactory/tx_distribution_bootstrapped.go +++ b/chains/ethereum/txfactory/tx_distribution_bootstrapped.go @@ -3,6 +3,8 @@ package txfactory import ( "sync" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/common" ethwallet "github.com/skip-mev/catalyst/chains/ethereum/wallet" ) @@ -11,6 +13,7 @@ var _ TxDistribution = &TxDistributionBootstrapped{} type TxDistributionBootstrapped struct { mu sync.Mutex + logger *zap.Logger wallets []*ethwallet.InteractingWallet // Wallet allocation tracking for minimizing reuse with role rotation @@ -20,13 +23,16 @@ type TxDistributionBootstrapped struct { numWallets int // the length of wallets } -func NewTxDistributionBootstrapped(wallets []*ethwallet.InteractingWallet, fundedWallets int) *TxDistributionBootstrapped { +func NewTxDistributionBootstrapped(logger *zap.Logger, wallets []*ethwallet.InteractingWallet, + fundedWallets int, +) *TxDistributionBootstrapped { numWallets := len(wallets) receiverIndex := fundedWallets if fundedWallets == numWallets { receiverIndex = numWallets / 2 } return &TxDistributionBootstrapped{ + logger: logger, wallets: wallets, fundedWallets: fundedWallets, senderIndex: 0, diff --git a/chains/ethereum/txfactory/tx_distribution_bootstrapped_test.go b/chains/ethereum/txfactory/tx_distribution_bootstrapped_test.go index 76875d7..e53fe8f 100644 --- a/chains/ethereum/txfactory/tx_distribution_bootstrapped_test.go +++ b/chains/ethereum/txfactory/tx_distribution_bootstrapped_test.go @@ -21,7 +21,7 @@ func TestBootstrapping(t *testing.T) { // Note: In real implementation, we would need to mock the Address() method // For this test, we'll focus on the logic } - distr := NewTxDistributionBootstrapped(mockWallets, initialWallets) + distr := NewTxDistributionBootstrapped(zap.NewNop(), mockWallets, initialWallets) for range numBlocks { for range numMsgs { if sender := distr.GetNextSender(); sender == nil { @@ -45,7 +45,7 @@ func TestSenderWalletAllocationBootstrapped(t *testing.T) { logger := zap.NewNop() txOpts := ethtypes.TxOpts{} - distr := NewTxDistributionBootstrapped(mockWallets, 1) + distr := NewTxDistributionBootstrapped(zap.NewNop(), mockWallets, 1) factory := NewTxFactory(logger, txOpts, distr) // 1 starting wallet and 10 total wallets should take 3 full loads and 1 partial