diff --git a/README.md b/README.md index 06cf1b4..a167517 100644 --- a/README.md +++ b/README.md @@ -12,13 +12,17 @@ import "github.com/skip-mev/catalyst/loadtest" // Create a load test specification spec := types.LoadTestSpec{ ChainID: "my-chain-1", - NumOfBlocks: 100, - NumOfTxs: 100, + NumBatches: 50, // Send 50 batches + SendInterval: 5 * time.Second, // Send a batch every 5 seconds NodesAddresses: []types.NodeAddress{...}, BaseMnemonic: "word1 word2 word3", // BIP39 mnemonic - NumWallets: 1500 // number of wallets to derive from the base mnemonic. + NumWallets: 1500, // number of wallets to derive from the base mnemonic. GasDenom: "stake", Bech32Prefix: "cosmos", + Msgs: []types.LoadTestMsg{ + {NumTxs: 70, Type: "MsgSend"}, // 70 transactions per batch + {NumTxs: 30, Type: "MsgMultiSend"}, // 30 transactions per batch + }, } // Create and run the load test diff --git a/chains/cosmos/metrics/collector.go b/chains/cosmos/metrics/collector.go index 53085e6..faa9c8d 100644 --- a/chains/cosmos/metrics/collector.go +++ b/chains/cosmos/metrics/collector.go @@ -231,9 +231,7 @@ func (m *Collector) processNodeStats(result *loadtesttypes.LoadTestResult) { } // processBlockStats processes statistics for each block -func (m *Collector) processBlockStats(result *loadtesttypes.LoadTestResult, gasLimit int64, - numberOfBlocksRequested int, -) { +func (m *Collector) processBlockStats(result *loadtesttypes.LoadTestResult, gasLimit int64) { var results []loadtesttypes.BlockStat result.ByBlock = results @@ -247,14 +245,6 @@ func (m *Collector) processBlockStats(result *loadtesttypes.LoadTestResult, gasL sort.Slice(blockHeights, func(i, j int) bool { return blockHeights[i] < blockHeights[j] }) - // ignore any extra blocks where txs landed in block - if len(blockHeights) > numberOfBlocksRequested { - m.logger.Info("found extra blocks, excluding from gas utilization stats", - zap.Int("number_of_blocks_requested", numberOfBlocksRequested), - zap.Int("number_of_blocks_found", len(blockHeights)), - zap.Int("number_of_blocks_excluded", len(blockHeights)-numberOfBlocksRequested)) - blockHeights = blockHeights[:numberOfBlocksRequested] - } for _, height := range blockHeights { txs := m.txsByBlock[height] @@ -280,7 +270,10 @@ func (m *Collector) processBlockStats(result *loadtesttypes.LoadTestResult, gasL msgStats[tx.MsgType] = stats } - gasUtilization := float64(blockGasUsed) / float64(gasLimit) + var gasUtilization float64 + if gasLimit > 0 { + gasUtilization = float64(blockGasUsed) / float64(gasLimit) + } blockStats := loadtesttypes.BlockStat{ BlockHeight: height, @@ -310,7 +303,7 @@ func (m *Collector) processBlockStats(result *loadtesttypes.LoadTestResult, gasL } // ProcessResults returns the final load test results -func (m *Collector) ProcessResults(gasLimit int64, numOfBlocksRequested int) loadtesttypes.LoadTestResult { +func (m *Collector) ProcessResults(gasLimit int64) loadtesttypes.LoadTestResult { result := loadtesttypes.LoadTestResult{ Overall: loadtesttypes.OverallStats{ StartTime: m.startTime, @@ -347,7 +340,7 @@ func (m *Collector) ProcessResults(gasLimit int64, numOfBlocksRequested int) loa go func() { defer wg.Done() - m.processBlockStats(&result, gasLimit, numOfBlocksRequested) + m.processBlockStats(&result, gasLimit) }() wg.Wait() diff --git a/chains/cosmos/runner/runner.go b/chains/cosmos/runner/runner.go index 9fb5184..e1368a6 100644 --- a/chains/cosmos/runner/runner.go +++ b/chains/cosmos/runner/runner.go @@ -4,9 +4,7 @@ import ( "context" "fmt" "math/rand" - "regexp" "strconv" - "strings" "sync" "time" @@ -25,35 +23,29 @@ import ( "go.uber.org/zap" ) -// MsgGasEstimation stores gas estimation for a specific message type -type MsgGasEstimation struct { - gasUsed int64 - weight float64 - numTxs int -} - // Runner represents a load test runner that executes a single LoadTestSpec type Runner struct { - spec loadtesttypes.LoadTestSpec - clients []*client.Chain - wallets []*wallet.InteractingWallet - blockGasLimit int64 - gasEstimations map[loadtesttypes.LoadTestMsg]MsgGasEstimation - totalTxsPerBlock int - mu sync.Mutex - numBlocksProcessed int - collector metrics.Collector - logger *zap.Logger - sentTxs []inttypes.SentTx - sentTxsMu sync.RWMutex - txFactory *txfactory.TxFactory - accountNumbers map[string]uint64 - walletNonces map[string]uint64 - walletNoncesMu sync.Mutex + spec loadtesttypes.LoadTestSpec + clients []*client.Chain + wallets []*wallet.InteractingWallet + gasEstimations map[loadtesttypes.LoadTestMsg]uint64 + collector metrics.Collector + logger *zap.Logger + sentTxs []inttypes.SentTx + txFactory *txfactory.TxFactory + accountNumbers map[string]uint64 + walletNonces map[string]uint64 + walletNoncesMu sync.Mutex chainCfg inttypes.ChainConfig } +// PreparedTx holds a transaction and its message type +type PreparedTx struct { + TxBytes []byte + MsgType loadtesttypes.MsgType +} + // NewRunner creates a new load test runner for a given spec func NewRunner(ctx context.Context, spec loadtesttypes.LoadTestSpec) (*Runner, error) { logger, _ := zap.NewDevelopment() @@ -124,26 +116,17 @@ func NewRunner(ctx context.Context, spec loadtesttypes.LoadTestSpec) (*Runner, e return runner, nil } -// initGasEstimation performs initial gas estimation to determine how many transactions -// to send to chain +// initGasEstimation performs initial gas estimation func (r *Runner) initGasEstimation(ctx context.Context) error { client := r.clients[0] - blockGasLimit, err := client.GetGasLimit(ctx) - if err != nil { - return fmt.Errorf("failed to get block gas limit: %w", err) - } - r.blockGasLimit = blockGasLimit - - r.gasEstimations = make(map[loadtesttypes.LoadTestMsg]MsgGasEstimation) - r.totalTxsPerBlock = 0 - gasEstimations, err := r.calculateMsgGasEstimations(ctx, client) if err != nil { return err } + r.gasEstimations = gasEstimations - return r.initNumOfTxsWorkflow(gasEstimations) + return nil } // calculateMsgGasEstimations calculates gas estimations for all message types @@ -197,32 +180,72 @@ func (r *Runner) calculateMsgGasEstimations(ctx context.Context, client *client. return gasEstimations, nil } -// initNumOfTxsWorkflow initializes the gas estimations based on number of transactions -func (r *Runner) initNumOfTxsWorkflow(gasEstimations map[loadtesttypes.LoadTestMsg]uint64) error { - for _, msgSpec := range r.spec.Msgs { - gasUsed := gasEstimations[msgSpec] +// buildLoad creates transactions for a single message specification +func (r *Runner) buildLoad(ctx context.Context, msgSpec loadtesttypes.LoadTestMsg) ([]PreparedTx, error) { + fromWallet := r.wallets[rand.Intn(len(r.wallets))] + client := fromWallet.GetClient() + walletAddress := fromWallet.FormattedAddress() + + msgs, err := r.createMessagesForType(msgSpec, fromWallet) + if err != nil { + return nil, fmt.Errorf("failed to create message: %w", err) + } - numTxs := int(float64(r.spec.NumOfTxs) * msgSpec.Weight) + gasBufferFactor := 1.5 + gasUsed := r.gasEstimations[msgSpec] + gasWithBuffer := int64(float64(gasUsed) * gasBufferFactor) + fees := sdk.NewCoins(sdk.NewCoin(r.chainCfg.GasDenom, sdkmath.NewInt(gasWithBuffer))) + accountNumber := r.accountNumbers[walletAddress] - r.gasEstimations[msgSpec] = MsgGasEstimation{ - gasUsed: int64(gasUsed), //nolint:gosec // G115: overflow unlikely in practice - weight: msgSpec.Weight, - numTxs: numTxs, - } - r.totalTxsPerBlock += numTxs + r.walletNoncesMu.Lock() + nonce := r.walletNonces[walletAddress] + r.walletNonces[walletAddress]++ + r.walletNoncesMu.Unlock() - r.logger.Info("transaction allocation based on NumOfTxs workflow", - zap.String("msgType", msgSpec.Type.String()), - zap.Int("totalTxs", r.spec.NumOfTxs), - zap.Float64("weight", msgSpec.Weight), - zap.Int("numTxs", numTxs)) + memo := RandomString(16) // Avoid ErrTxInMempoolCache + + tx, err := fromWallet.CreateSignedTx(ctx, client, uint64(gasWithBuffer), fees, nonce, accountNumber, //nolint:gosec // G115: overflow unlikely in practice + memo, r.chainCfg.UnorderedTxs, r.spec.TxTimeout, msgs...) + if err != nil { + return nil, fmt.Errorf("failed to create signed tx: %w", err) } - if r.totalTxsPerBlock <= 0 { - return fmt.Errorf("calculated total number of transactions per block is zero or negative: %d", r.totalTxsPerBlock) + txBytes, err := client.GetEncodingConfig().TxConfig.TxEncoder()(tx) + if err != nil { + return nil, fmt.Errorf("failed to encode tx: %w", err) } - return nil + return []PreparedTx{{TxBytes: txBytes, MsgType: msgSpec.Type}}, nil +} + +// buildFullLoad builds the full transaction load for interval-based sending +func (r *Runner) buildFullLoad(ctx context.Context) ([][]PreparedTx, error) { + r.logger.Info("Building load...", zap.Int("num_batches", r.spec.NumBatches)) + batchLoads := make([][]PreparedTx, 0, r.spec.NumBatches) + total := 0 + + for i := range r.spec.NumBatches { + batch := make([]PreparedTx, 0) + for _, msgSpec := range r.spec.Msgs { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("ctx cancelled during load building: %w", ctx.Err()) + default: + } + for range msgSpec.NumTxs { + txs, err := r.buildLoad(ctx, msgSpec) + if err != nil { + return nil, fmt.Errorf("failed to build load for %s: %w", msgSpec.Type, err) + } + batch = append(batch, txs...) + total += len(txs) + } + } + r.logger.Info(fmt.Sprintf("built batch %d/%d", i+1, r.spec.NumBatches)) + batchLoads = append(batchLoads, batch) + } + r.logger.Info("Load built, starting loadtest", zap.Int("total_txs", total)) + return batchLoads, nil } // Run executes the load test @@ -231,182 +254,118 @@ func (r *Runner) Run(ctx context.Context) (loadtesttypes.LoadTestResult, error) return loadtesttypes.LoadTestResult{}, err } - startTime := time.Now() - done := make(chan struct{}) + r.logger.Info("Running loadtest on interval", zap.Duration("interval", r.spec.SendInterval), zap.Int("num_batches", r.spec.NumBatches)) - subCtx, cancelSub := context.WithCancel(ctx) - defer cancelSub() + // build the full load upfront + batchLoads, err := r.buildFullLoad(ctx) + if err != nil { + return loadtesttypes.LoadTestResult{}, err + } + amountPerBatch := len(batchLoads[0]) + total := len(batchLoads) * amountPerBatch - subscriptionErr := make(chan error, 1) - blockCh := make(chan inttypes.Block, 1) + crank := time.NewTicker(r.spec.SendInterval) + defer crank.Stop() - go func() { - err := r.clients[0].SubscribeToBlocks(subCtx, r.blockGasLimit, func(block inttypes.Block) { - select { - case blockCh <- block: - case <-subCtx.Done(): - return - } - }) - subscriptionErr <- err - }() + startTime := time.Now() + + // load index is the index into the batchLoads slice + loadIndex := 0 + + // go routines will send transactions and then push results to collectionChannel + mu := &sync.Mutex{} + sentTxs := make([]inttypes.SentTx, 0, total) + collectionChannel := make(chan inttypes.SentTx, amountPerBatch) + collectionDone := make(chan struct{}) go func() { + defer close(collectionDone) for { select { - case <-subCtx.Done(): + case <-ctx.Done(): return - case block := <-blockCh: - r.mu.Lock() - - r.numBlocksProcessed++ - r.logger.Debug("processing block", zap.Int64("height", block.Height), - zap.Time("timestamp", block.Timestamp), zap.Int64("gas_limit", block.GasLimit)) - - _, err := r.sendBlockTransactions(ctx) - if err != nil { - r.logger.Error("error sending block transactions", zap.Error(err)) - } - - r.logger.Info("processed block", zap.Int64("height", block.Height)) - - if r.numBlocksProcessed >= r.spec.NumOfBlocks { - r.logger.Info("load test completed- number of blocks desired reached", - zap.Int("blocks", r.numBlocksProcessed)) - r.mu.Unlock() - cancelSub() - done <- struct{}{} + case tx, ok := <-collectionChannel: + if !ok { // channel closed return } - - r.mu.Unlock() + mu.Lock() + sentTxs = append(sentTxs, tx) + mu.Unlock() } } }() - select { - case <-ctx.Done(): - r.logger.Info("load test interrupted") - return loadtesttypes.LoadTestResult{}, ctx.Err() - case <-done: - if err := <-subscriptionErr; err != nil && err != context.Canceled { - return loadtesttypes.LoadTestResult{}, fmt.Errorf("subscription error: %w", err) - } - - // Make sure all txs are processed - time.Sleep(30 * time.Second) - - collectorStartTime := time.Now() - r.collector.GroupSentTxs(ctx, r.sentTxs, r.clients, startTime) - collectorResults := r.collector.ProcessResults(r.blockGasLimit, r.spec.NumOfBlocks) - collectorEndTime := time.Now() - r.logger.Debug("collector running time", - zap.Float64("duration_seconds", collectorEndTime.Sub(collectorStartTime).Seconds())) - - return collectorResults, nil - case err := <-subscriptionErr: - if err != context.Canceled { - return loadtesttypes.LoadTestResult{}, fmt.Errorf("failed to subscribe to blocks: %w", err) - } - return loadtesttypes.LoadTestResult{}, fmt.Errorf("subscription ended unexpectedly. error: %w", err) - } -} - -// sendBlockTransactions sends transactions for a single block -func (r *Runner) sendBlockTransactions(ctx context.Context) (int, error) { //nolint:unparam // error may be returned in future versions - txsSent := 0 - var sentTxs []inttypes.SentTx - var sentTxsMu sync.Mutex - - r.logger.Info("starting to send transactions for block", zap.Int("block_number", r.numBlocksProcessed), zap.Int("expected_txs", r.totalTxsPerBlock)) - - getLatestNonce := func(walletAddr string, client *client.Chain) uint64 { //nolint:unparam // client may be used in future versions - r.walletNoncesMu.Lock() - defer r.walletNoncesMu.Unlock() - return r.walletNonces[walletAddr] - } - - updateNonce := func(walletAddr string) { - r.walletNoncesMu.Lock() - defer r.walletNoncesMu.Unlock() - r.walletNonces[walletAddr]++ - } - - var wg sync.WaitGroup - var txsSentMu sync.Mutex - - for mspSpec, estimation := range r.gasEstimations { - for i := 0; i < estimation.numTxs; i++ { - wg.Add(1) - go func(msgSpec loadtesttypes.LoadTestMsg, txIndex int) { //nolint:unparam // txIndex may be used in future versions - defer wg.Done() + // waitgroup for every tx + wg := sync.WaitGroup{} + +loop: + for { + select { + case <-crank.C: + // get the load, initialize result slice + load := batchLoads[loadIndex] + r.logger.Info("Sending txs", zap.Int("num_txs", len(load))) + + // send each tx in a go routine + for i, preparedTx := range load { + wg.Add(1) + go func(preparedTx PreparedTx, index int) { + defer wg.Done() + sentTx := inttypes.SentTx{MsgType: preparedTx.MsgType} + // select random client for sending + client := r.clients[rand.Intn(len(r.clients))] + res, err := client.BroadcastTx(ctx, preparedTx.TxBytes) + if err != nil { + r.logger.Error("failed to send tx", zap.Error(err), zap.Int("index", index), zap.Int("load_index", loadIndex)) + sentTx.Err = err + sentTx.NodeAddress = client.GetNodeAddress().RPC + } else { + sentTx.TxHash = res.TxHash + sentTx.NodeAddress = client.GetNodeAddress().RPC + } + collectionChannel <- sentTx + }(preparedTx, i) + } - if sentTx, _ := r.processSingleTransaction(ctx, msgSpec, getLatestNonce, updateNonce, &txsSentMu, &txsSent); sentTx != (inttypes.SentTx{}) { - sentTxsMu.Lock() - sentTxs = append(sentTxs, sentTx) - sentTxsMu.Unlock() - } - }(mspSpec, i) + loadIndex++ + if loadIndex >= len(batchLoads) { + // exit the loadtest loop. we have finished. + break loop + } + case <-ctx.Done(): // A channel to signal stopping the ticker + return loadtesttypes.LoadTestResult{}, fmt.Errorf("ctx cancelled during load firing: %w", ctx.Err()) } } + r.logger.Info("All transactions sent. Waiting for go routines to finish") wg.Wait() + close(collectionChannel) + <-collectionDone // wait for collection to finish - r.sentTxsMu.Lock() - r.sentTxs = append(r.sentTxs, sentTxs...) - r.sentTxsMu.Unlock() + r.logger.Info("go routines have completed", zap.Int("total_txs", len(sentTxs))) + r.sentTxs = sentTxs - r.logger.Info("completed sending transactions for block", - zap.Int("block_number", r.numBlocksProcessed), zap.Int("txs_sent", txsSent), - zap.Int("expected_txs", r.totalTxsPerBlock)) + r.logger.Info("Loadtest complete. Waiting for mempool to clear") + r.waitForEmptyMempool(ctx, 1*time.Minute) + // sleep here for a sec because, even though the mempool may be empty, we could still be in process of executing those txs + time.Sleep(5 * time.Second) - return txsSent, nil -} - -// processSingleTransaction handles creating, signing, and broadcasting a single transaction -func (r *Runner) processSingleTransaction( - ctx context.Context, - msgSpec loadtesttypes.LoadTestMsg, - getLatestNonce func(string, *client.Chain) uint64, - updateNonce func(string), - txsSentMu *sync.Mutex, - txsSent *int, -) (inttypes.SentTx, bool) { - // Select a random wallet for this transaction - fromWallet := r.wallets[rand.Intn(len(r.wallets))] - walletAddress := fromWallet.FormattedAddress() - client := fromWallet.GetClient() + r.logger.Info("Collecting metrics", zap.Int("num_txs", len(r.sentTxs))) + collectorStartTime := time.Now() + r.collector.GroupSentTxs(ctx, r.sentTxs, r.clients, startTime) - msgs, err := r.createMessagesForType(msgSpec, fromWallet) + gasLimit, err := r.clients[0].GetGasLimit(ctx) if err != nil { - r.logger.Error("failed to create message", - zap.Error(err), - zap.String("node", client.GetNodeAddress().RPC)) - return inttypes.SentTx{}, false + r.logger.Warn("failed to get gas limit from chain, using 0", zap.Error(err)) + gasLimit = 0 } - maxRetries := 3 - var sentTx inttypes.SentTx - success := false - - for attempt := 0; attempt < maxRetries; attempt++ { - if attempt > 0 { - time.Sleep(100 * time.Millisecond) - } - - nonce := getLatestNonce(walletAddress, client) - - sentTx, success = r.createAndSendTransaction( - ctx, msgSpec, fromWallet, client, msgs, nonce, - updateNonce, txsSentMu, txsSent, - ) + collectorResults := r.collector.ProcessResults(gasLimit) + collectorEndTime := time.Now() + r.logger.Debug("collector running time", + zap.Float64("duration_seconds", collectorEndTime.Sub(collectorStartTime).Seconds())) - if success { - break - } - } - - return sentTx, success + return collectorResults, nil } // createMessagesForType creates the appropriate messages based on message type @@ -430,110 +389,6 @@ func (r *Runner) createMessagesForType(msgSpec loadtesttypes.LoadTestMsg, fromWa return msgs, err } -// createAndSendTransaction creates and sends a transaction, handling the response -func (r *Runner) createAndSendTransaction( - ctx context.Context, - mspSpec loadtesttypes.LoadTestMsg, - fromWallet *wallet.InteractingWallet, - client *client.Chain, - msgs []sdk.Msg, - nonce uint64, - updateNonce func(string), - txsSentMu *sync.Mutex, - txsSent *int, -) (inttypes.SentTx, bool) { - walletAddress := fromWallet.FormattedAddress() - - gasBufferFactor := 1.1 - estimation := r.gasEstimations[mspSpec] - gasWithBuffer := int64(float64(estimation.gasUsed) * gasBufferFactor) - fees := sdk.NewCoins(sdk.NewCoin(r.chainCfg.GasDenom, sdkmath.NewInt(gasWithBuffer))) - accountNumber := r.accountNumbers[walletAddress] - memo := RandomString(16) // Avoid ErrTxInMempoolCache - - tx, err := fromWallet.CreateSignedTx(ctx, client, uint64(gasWithBuffer), fees, nonce, accountNumber, //nolint:gosec // G115: overflow unlikely in practice - memo, r.chainCfg.UnorderedTxs, r.spec.TxTimeout, msgs...) - if err != nil { - r.logger.Error("failed to create signed tx", - zap.Error(err), - zap.String("node", client.GetNodeAddress().RPC)) - return inttypes.SentTx{}, false - } - - txBytes, err := client.GetEncodingConfig().TxConfig.TxEncoder()(tx) - if err != nil { - r.logger.Error("failed to encode tx", - zap.Error(err), - zap.String("node", client.GetNodeAddress().RPC)) - return inttypes.SentTx{}, false - } - - return r.broadcastAndHandleResponse(ctx, client, txBytes, mspSpec.Type, walletAddress, nonce, updateNonce, txsSentMu, txsSent, msgs) -} - -// broadcastAndHandleResponse broadcasts a transaction and handles the response -func (r *Runner) broadcastAndHandleResponse( - ctx context.Context, - client *client.Chain, - txBytes []byte, - msgType loadtesttypes.MsgType, - walletAddress string, - nonce uint64, - updateNonce func(string), - txsSentMu *sync.Mutex, - txsSent *int, - _ []sdk.Msg, -) (inttypes.SentTx, bool) { - res, err := client.BroadcastTx(ctx, txBytes) - if err != nil { - if res != nil && res.Code == 32 && strings.Contains(res.RawLog, "account sequence mismatch") { - r.handleNonceMismatch(walletAddress, nonce, res.RawLog) - return inttypes.SentTx{}, false - } - - sentTx := inttypes.SentTx{ - Err: err, - NodeAddress: client.GetNodeAddress().RPC, - MsgType: msgType, - } - if res != nil { - sentTx.TxHash = res.TxHash - } - r.logger.Error("failed to broadcast tx", - zap.Error(err), - zap.Any("tx", sentTx)) - - return sentTx, false - } - - sentTx := inttypes.SentTx{ - TxHash: res.TxHash, - NodeAddress: client.GetNodeAddress().RPC, - MsgType: msgType, - Err: nil, - } - - updateNonce(walletAddress) - - txsSentMu.Lock() - *txsSent++ - txsSentMu.Unlock() - - return sentTx, true -} - -// handleNonceMismatch extracts the expected nonce from the error message and updates the wallet nonce -func (r *Runner) handleNonceMismatch(walletAddress string, nonce uint64, rawLog string) { //nolint:unparam // nonce may be used in future versions - expectedNonceStr := regexp.MustCompile(`expected (\d+)`).FindStringSubmatch(rawLog) - if len(expectedNonceStr) > 1 { - if expectedNonce, err := strconv.ParseUint(expectedNonceStr[1], 10, 64); err == nil { - r.walletNoncesMu.Lock() - r.walletNonces[walletAddress] = expectedNonce - r.walletNoncesMu.Unlock() - } - } -} - func (r *Runner) GetCollector() *metrics.Collector { return &r.collector } @@ -542,16 +397,6 @@ func (r *Runner) PrintResults(result loadtesttypes.LoadTestResult) { r.collector.PrintResults(result) } -func RandomString(n int) string { - letterRunes := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - - b := make([]rune, n) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return string(b) -} - func (r *Runner) initAccountNumbers(ctx context.Context) error { for _, wallet := range r.wallets { walletAddress := wallet.FormattedAddress() @@ -571,3 +416,51 @@ func (r *Runner) initAccountNumbers(ctx context.Context) error { r.logger.Info("Account numbers and nonces initialized successfully for all wallets") return nil } + +func RandomString(n int) string { + letterRunes := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +} + +func (r *Runner) waitForEmptyMempool(ctx context.Context, timeout time.Duration) { + wg := sync.WaitGroup{} + for _, c := range r.clients { + wg.Add(1) + go func(client *client.Chain) { + defer wg.Done() + cometClient := client.GetCometClient() + + started := time.Now() + timer := time.NewTicker(500 * time.Millisecond) + timout := time.NewTimer(timeout) + defer timer.Stop() + defer timout.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + res, err := cometClient.NumUnconfirmedTxs(ctx) + if err == nil { + if res.Count == 0 { + r.logger.Debug("mempool clear. done waiting for mempool", zap.Duration("waited", time.Since(started))) + return + } + } else { + r.logger.Debug("error calling mempool status", zap.Error(err)) + } + case <-timout.C: + r.logger.Debug("timed out waiting for mempool to clear", zap.Duration("waited", timeout)) + return + } + } + }(c) + } + wg.Wait() +} diff --git a/chains/cosmos/types/types.go b/chains/cosmos/types/types.go index a90480b..eab8804 100644 --- a/chains/cosmos/types/types.go +++ b/chains/cosmos/types/types.go @@ -91,12 +91,10 @@ func (s ChainConfig) Validate(mainCfg loadtesttypes.LoadTestSpec) error { seenMsgTypes := make(map[loadtesttypes.MsgType]bool) seenMsgArrTypes := make(map[loadtesttypes.MsgType]bool) - var totalWeight float64 for _, msg := range mainCfg.Msgs { if err := validateMsgType(msg); err != nil { return err } - totalWeight += msg.Weight switch msg.Type { case MsgArr: @@ -124,10 +122,6 @@ func (s ChainConfig) Validate(mainCfg loadtesttypes.LoadTestSpec) error { } } - if totalWeight != 1 { - return fmt.Errorf("total message weights must add up to 1.0, got %f", totalWeight) - } - if s.GasDenom == "" { return fmt.Errorf("gas denomination must be specified") } diff --git a/chains/ethereum/runner/runner.go b/chains/ethereum/runner/runner.go index 9b2d52a..7d734cd 100644 --- a/chains/ethereum/runner/runner.go +++ b/chains/ethereum/runner/runner.go @@ -282,22 +282,14 @@ func (r *Runner) buildFullLoad(ctx context.Context) ([][]*gethtypes.Transaction, } func (r *Runner) Run(ctx context.Context) (loadtesttypes.LoadTestResult, error) { - // when batches and interval are specified, user wants to run on a timed interval - if r.spec.NumBatches > 0 && r.spec.SendInterval > 0 { - r.logger.Info("Running loadtest on interval", zap.Duration("interval", r.spec.SendInterval), zap.Int("num_batches", r.spec.NumBatches)) - return r.runOnInterval(ctx) - } - // otherwise we run on blocks - return r.runOnBlocks(ctx) -} + r.logger.Info("Running loadtest on interval", zap.Duration("interval", r.spec.SendInterval), zap.Int("num_batches", r.spec.NumBatches)) -// runOnInterval starts the runner configured for interval load sending. -func (r *Runner) runOnInterval(ctx context.Context) (loadtesttypes.LoadTestResult, error) { - // deploy the initial contracts needed by the runner. + // deploy the initial contracts needed by the runner if err := r.deployInitialContracts(ctx); err != nil { return loadtesttypes.LoadTestResult{}, err } - // we build the full load upfront. that is, num_batches * [msg * msg spec amount]. + + // we build the full load upfront. that is, num_batches * [msg * msg spec amount] batchLoads, err := r.buildFullLoad(ctx) if err != nil { return loadtesttypes.LoadTestResult{}, err @@ -308,7 +300,7 @@ func (r *Runner) runOnInterval(ctx context.Context) (loadtesttypes.LoadTestResul crank := time.NewTicker(r.spec.SendInterval) defer crank.Stop() - // sleeping once before we start, as the initial contracts were showing up in results. + // sleeping once before we start, as the initial contracts were showing up in results time.Sleep(2 * time.Second) blockNum, err := r.wallets[0].GetClient().BlockNumber(ctx) if err != nil { @@ -316,10 +308,10 @@ func (r *Runner) runOnInterval(ctx context.Context) (loadtesttypes.LoadTestResul } startingBlock := blockNum - // load index is the index into the batchLoads slice. + // load index is the index into the batchLoads slice loadIndex := 0 - // go routines will send transactions and then push results to collectionChannel. + // go routines will send transactions and then push results to collectionChannel mu := &sync.Mutex{} sentTxs := make([]*inttypes.SentTx, 0, total) collectionChannel := make(chan *inttypes.SentTx, amountPerBatch) @@ -348,11 +340,11 @@ loop: for { select { case <-crank.C: - // get the load, initialize result slice. + // get the load, initialize result slice load := batchLoads[loadIndex] r.logger.Info("Sending txs", zap.Int("num_txs", len(load))) - // send each tx in a go routine. + // send each tx in a go routine for i, tx := range load { wg.Add(1) go func() { @@ -390,7 +382,7 @@ loop: r.logger.Info("Loadtest complete. Waiting for mempool to clear") r.waitForEmptyMempool(ctx, 1*time.Minute) - // sleep here for a sec because, even though the mempool may be empty, we could still be in process of executing those txs. + // sleep here for a sec because, even though the mempool may be empty, we could still be in process of executing those txs time.Sleep(5 * time.Second) blockNum, err = r.wallets[0].GetClient().BlockNumber(ctx) if err != nil { @@ -398,10 +390,8 @@ loop: } endingBlock := blockNum - // collect metrics. + // collect metrics r.logger.Info("Collecting metrics", zap.Int("num_txs", len(r.sentTxs))) - // we pass in 0 for the numOfBlockRequested, because we are not running a block based loadtest. - // The collector understands that 0 means we are on a time interval loadtest. collectorStartTime := time.Now() collectorResults, err := metrics.ProcessResults(ctx, r.logger, r.sentTxs, startingBlock, endingBlock, r.clients) if err != nil { @@ -419,154 +409,6 @@ func getTxType(tx *gethtypes.Transaction) loadtesttypes.MsgType { return inttypes.ContractCall } -// runOnBlocks runs the loadtest via block signal. -// It sets up a subscription to block headers, then builds and deploys the load when it receives a header. -func (r *Runner) runOnBlocks(ctx context.Context) (loadtesttypes.LoadTestResult, error) { - if err := r.deployInitialContracts(ctx); err != nil { - return loadtesttypes.LoadTestResult{}, err - } - ctx, cancel := context.WithCancel(ctx) - defer cancel() // Ensure cancel is always called - - blockCh := make(chan *gethtypes.Header, 1) - subscription, err := r.wsClients[0].SubscribeNewHead(ctx, blockCh) - if err != nil { - return loadtesttypes.LoadTestResult{}, err - } - - defer subscription.Unsubscribe() - done := make(chan struct{}, 1) - defer close(done) - gotStartingBlock := false - startingBlock := uint64(0) - endingBlock := uint64(0) - go func() { - for { - select { - case <-ctx.Done(): - r.logger.Debug("ctx cancelled") - return - case err := <-subscription.Err(): - if err != nil { - r.logger.Error("subscription error", zap.Error(err)) - } - cancel() - return - case block, ok := <-blockCh: - if !ok { - r.logger.Error("block header channel closed") - cancel() - return - } - if !gotStartingBlock { - startingBlock = block.Number.Uint64() - gotStartingBlock = true - } - r.blocksProcessed++ - r.logger.Debug( - "processing block", - zap.Uint64("height", block.Number.Uint64()), - zap.Uint64("time", block.Time), - zap.Uint64("gas_used", block.GasUsed), - zap.Uint64("gas_limit", block.GasLimit), - ) - numTxsSubmitted, err := r.submitLoad(ctx) - if err != nil { - r.logger.Error("error during tx submission", zap.Error(err), zap.Uint64("height", block.Number.Uint64())) - } - - r.logger.Debug("submitted transactions", zap.Uint64("height", block.Number.Uint64()), zap.Int("num_submitted", numTxsSubmitted)) - - r.logger.Info("processed block", zap.Uint64("height", block.Number.Uint64()), zap.Uint64("num_blocks_processed", r.blocksProcessed)) - if r.blocksProcessed >= uint64(r.spec.NumOfBlocks) { //nolint:gosec // G115: overflow unlikely in practice - endingBlock = block.Number.Uint64() - r.logger.Info("load test completed - number of blocks desired reached", - zap.Uint64("blocks", r.blocksProcessed)) - done <- struct{}{} - return - } - } - } - }() - - select { - case <-ctx.Done(): - r.logger.Info("ctx cancelled") - return loadtesttypes.LoadTestResult{}, ctx.Err() - case <-done: - r.logger.Info("load test completed. sleeping 30s for final txs to complete") - - r.waitForEmptyMempool(ctx, 1*time.Minute) - - collectorStartTime := time.Now() - collectorResults, err := metrics.ProcessResults(ctx, r.logger, r.sentTxs, startingBlock, endingBlock, r.clients) - if err != nil { - return loadtesttypes.LoadTestResult{Error: err.Error()}, fmt.Errorf("failed to collect metrics: %w", err) - } - collectorEndTime := time.Now() - r.logger.Debug("collector running time", - zap.Float64("duration_seconds", collectorEndTime.Sub(collectorStartTime).Seconds())) - - return *collectorResults, nil - } -} - -func (r *Runner) submitLoad(ctx context.Context) (int, error) { - // Reset wallet allocation for each block/load to enable role rotation - r.txFactory.ResetWalletAllocation() - - // first we build the tx load. this constructs all the ethereum txs based in the spec. - r.logger.Debug("building loads", zap.Int("num_msg_specs", len(r.spec.Msgs))) - txs := make([]*gethtypes.Transaction, 0, len(r.spec.Msgs)) - for _, msgSpec := range r.spec.Msgs { - for i := 0; i < msgSpec.NumMsgs; i++ { - load, err := r.buildLoad(msgSpec, false) - if err != nil { - return 0, fmt.Errorf("failed to build load: %w", err) - } - if len(load) == 0 { - continue - } - txs = append(txs, load...) - } - } - - // submit each tx in a go routine - wg := sync.WaitGroup{} - sentTxs := make([]*inttypes.SentTx, len(txs)) - for i, tx := range txs { - wg.Add(1) - go func() { - defer wg.Done() - // send the tx from the wallet assigned to this transaction's sender - fromWallet := r.getWalletForTx(tx) - err := fromWallet.SendTransaction(ctx, tx) - if err != nil { - r.logger.Debug("failed to send transaction", zap.String("tx_hash", tx.Hash().String()), zap.Error(err)) - } - - // TODO: for now its just easier to differ based on contract creation. ethereum txs dont really have - // obvious "msgtypes" inside the tx object itself. we would have to map txhash to the spec that built the tx to get anything more specific. - txType := inttypes.ContractCall - if tx.To() == nil { - txType = inttypes.ContractCreate - } - sentTxs[i] = &inttypes.SentTx{ - TxHash: tx.Hash(), - NodeAddress: "", // TODO: figure out what to do here. - MsgType: txType, - Err: err, - Tx: tx, - } - }() - } - - wg.Wait() - - r.sentTxs = append(r.sentTxs, sentTxs...) - return len(sentTxs), nil -} - func (r *Runner) buildLoad(msgSpec loadtesttypes.LoadTestMsg, useBaseline bool) ([]*gethtypes.Transaction, error) { // For ERC20 transactions, use optimal sender selection from factory var fromWallet *wallet.InteractingWallet diff --git a/chains/types/results.go b/chains/types/results.go index 38b2eba..2abb7fc 100644 --- a/chains/types/results.go +++ b/chains/types/results.go @@ -91,7 +91,7 @@ type BroadcastError struct { } type LoadTestMsg struct { - Weight float64 `yaml:"weight"` + NumTxs int `yaml:"num_txs"` Type MsgType `yaml:"type"` NumMsgs int `yaml:"num_msgs,omitempty" json:"NumMsgs,omitempty"` // Number of messages to include in MsgArr ContainedType MsgType `yaml:"contained_type,omitempty" json:"ContainedType,omitempty"` // Type of messages to include in MsgArr diff --git a/chains/types/spec.go b/chains/types/spec.go index ae74ccb..ca2709b 100644 --- a/chains/types/spec.go +++ b/chains/types/spec.go @@ -12,8 +12,6 @@ type LoadTestSpec struct { Description string `yaml:"description" json:"description"` Kind string `yaml:"kind" json:"kind"` // "cosmos" | "evm" (discriminator) ChainID string `yaml:"chain_id" json:"chain_id"` - NumOfTxs int `yaml:"num_of_txs,omitempty" json:"num_of_txs,omitempty"` - NumOfBlocks int `yaml:"num_of_blocks" json:"num_of_blocks"` SendInterval time.Duration `yaml:"send_interval" json:"send_interval"` NumBatches int `yaml:"num_batches" json:"num_batches"` BaseMnemonic string `yaml:"base_mnemonic" json:"base_mnemonic"` diff --git a/chains/types/spec_test.go b/chains/types/spec_test.go index 99950e1..d4e9349 100644 --- a/chains/types/spec_test.go +++ b/chains/types/spec_test.go @@ -18,17 +18,16 @@ func TestLoadTestSpec_Marshal_Unmarshal_Eth(t *testing.T) { spec.Description = "eth load test" spec.Kind = "eth" spec.ChainID = "262144" - spec.NumOfBlocks = 200 spec.BaseMnemonic = "seed phrase goes here" spec.NumWallets = 4 spec.ChainCfg = ðtypes.ChainConfig{NodesAddresses: []ethtypes.NodeAddress{ {RPC: "https://foobar:8545", Websocket: "ws://foobar:8546"}, }} spec.Msgs = []loadtesttypes.LoadTestMsg{ - {Weight: 0, NumMsgs: 20, Type: ethtypes.MsgCreateContract}, - {Weight: 0, NumMsgs: 20, Type: ethtypes.MsgWriteTo}, - {Weight: 0, NumMsgs: 20, Type: ethtypes.MsgCrossContractCall}, - {Weight: 0, NumMsgs: 20, Type: ethtypes.MsgCallDataBlast}, + {NumMsgs: 20, Type: ethtypes.MsgCreateContract}, + {NumMsgs: 20, Type: ethtypes.MsgWriteTo}, + {NumMsgs: 20, Type: ethtypes.MsgCrossContractCall}, + {NumMsgs: 20, Type: ethtypes.MsgCallDataBlast}, } msgBytes, err := yaml.Marshal(&spec) @@ -96,7 +95,6 @@ chain_config: Description: "cosmos load test", Kind: "cosmos", ChainID: "cosmoshub-4", - NumOfBlocks: 200, BaseMnemonic: "seed phrase goes here", NumWallets: 4, TxTimeout: 30 * time.Second, diff --git a/example/loadtest.yml b/example/loadtest.yml index 75dc148..520e1dc 100644 --- a/example/loadtest.yml +++ b/example/loadtest.yml @@ -1,7 +1,8 @@ # Example load test configuration chain_id: "ib-cosmos-0ea659" -num_of_txs: 100 # transactions per block -num_of_blocks: 100 # Process 100 blocks +num_batches: 50 # Send 50 batches +send_interval: "5s" # Send a batch every 5 seconds + nodes_addresses: - grpc: "100.122.238.40:9090" rpc: "http://100.122.238.40:26657" @@ -12,7 +13,7 @@ num_wallets: 15000 gas_denom: "stake" bech32_prefix: "cosmos" msgs: - - type: "MsgSend" # MsgSend - weight: 0.7 # 70% of transactions - - type: "MsgMultiSend" # MsgMultiSend - weight: 0.3 # 30% of transactions \ No newline at end of file + - type: "MsgSend" + num_txs: 70 # 70 transactions per batch + - type: "MsgMultiSend" + num_txs: 30 # 30 transactions per batch \ No newline at end of file diff --git a/example/petri_integration/load_test.go b/example/petri_integration/load_test.go index 8bdb61a..5ad9ddd 100644 --- a/example/petri_integration/load_test.go +++ b/example/petri_integration/load_test.go @@ -116,13 +116,13 @@ func TestPetriDockerIntegration(t *testing.T) { } msgs := []loadtesttypes.LoadTestMsg{ - {Weight: 1, Type: cosmoslttypes.MsgMultiSend}, - // {Weight: 1, Type: cosmoslttypes.MsgSend}, + {NumTxs: 10, Type: cosmoslttypes.MsgMultiSend}, } spec := loadtesttypes.LoadTestSpec{ ChainID: defaultChainConfig.ChainId, - NumOfBlocks: 5, + NumBatches: 5, + SendInterval: 10 * time.Second, BaseMnemonic: defaultChainOptions.BaseMnemonic, NumWallets: defaultChainOptions.AdditionalAccounts, Msgs: msgs, @@ -133,8 +133,7 @@ func TestPetriDockerIntegration(t *testing.T) { UnorderedTxs: false, NodesAddresses: nodeAddresses, }, - NumOfTxs: 10, - Kind: chains.CosmosKind, + Kind: chains.CosmosKind, } time.Sleep(10 * time.Second) @@ -205,15 +204,15 @@ func TestPetriDockerfileIntegration(t *testing.T) { } msgs := []loadtesttypes.LoadTestMsg{ - {Weight: 1, Type: cosmoslttypes.MsgMultiSend}, + {NumTxs: 10, Type: cosmoslttypes.MsgMultiSend}, } spec := loadtesttypes.LoadTestSpec{ ChainID: defaultChainConfig.ChainId, - NumOfBlocks: 20, + NumBatches: 10, + SendInterval: 5 * time.Second, BaseMnemonic: defaultChainOptions.BaseMnemonic, NumWallets: defaultChainOptions.AdditionalAccounts, Msgs: msgs, - NumOfTxs: 10, Kind: chains.CosmosKind, ChainCfg: &cosmoslttypes.ChainConfig{ GasDenom: defaultChainConfig.Denom,