Skip to content
62 changes: 41 additions & 21 deletions chains/ethereum/runner/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,32 +123,52 @@
return len(sentTxs), nil
}

func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadSize int, useBaseline bool) ([]*gethtypes.Transaction, error) {

Check failure on line 126 in chains/ethereum/runner/persistent.go

View workflow job for this annotation

GitHub Actions / lint

(*Runner).buildLoadPersistent - result 1 (error) is always nil (unparam)
r.logger.Info("building load", zap.Int("maxLoadSize", maxLoadSize))
var txnLoad []*gethtypes.Transaction
var wg sync.WaitGroup
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Loop Variable Reference: Concurrency Bug

The loop captures the variable i by reference in the closure. When the goroutine executes, i will have the final value from the loop, causing all goroutines to write to sentTxs[len(txs)-1]. Capture i by value in the loop or pass it as a function parameter to ensure each goroutine writes to the correct index.

Fix in Cursor Fix in Web

txChan := make(chan *gethtypes.Transaction, maxLoadSize)
for range maxLoadSize {
sender := r.txFactory.GetNextSender()
wg.Go(func() {
sender := r.txFactory.GetNextSender()

if sender == nil {
r.logger.Info("failed to get sender")
break
}
nonce, ok := r.nonces.Load(sender.Address())
if !ok {
// this really should not happen ever. better safe than sorry.
return nil, fmt.Errorf("nonce for wallet %s not found", sender.Address())
}
tx, err := r.txFactory.BuildTxs(msgSpec, sender, nonce.(uint64), useBaseline)
if err != nil {
return nil, fmt.Errorf("failed to build txs %w", err)
}
lastTx := tx[len(tx)-1]
if lastTx == nil {
return nil, nil
if sender == nil {
return
}
nonce, ok := r.nonces.Load(sender.Address())
if !ok {
// this really should not happen ever. better safe than sorry.
r.logger.Error("nonce for wallet not found", zap.String("wallet", sender.Address().String()))
return
}
tx, err := r.txFactory.BuildTxs(msgSpec, sender, nonce.(uint64), useBaseline)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just confirming it is not possible to generate two txs for the same sender in a single load? i.e. GetNextSender will never return the same sender without returning nil in between? if it does there may be a nonce issue here due to goroutines, but from my quick look it doesnt look like that is a possible scenario.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetNextSender will return nil when it runs out of funded accounts which have not been used already. When we start a new round of load generation we will start over from index 0.

Once all wallets are funded we will reset sender and receiver indices to 0 and len(wallets)/2 respectively and iterate through N messages each time, incrementing modulo length of wallets.

if err != nil {
r.logger.Error("failed to build txs", zap.Error(err))
return
}
lastTx := tx[len(tx)-1]
if lastTx == nil {
return
}
r.nonces.Store(sender.Address(), lastTx.Nonce()+1)
// Only use single txn builders here
for _, txn := range tx {
txChan <- txn
}
})
}
doneChan := make(chan struct{})
go func() {
wg.Wait()
doneChan <- struct{}{}
}()
for {
select {
case txn := <-txChan:
txnLoad = append(txnLoad, txn)
case <-doneChan:
r.logger.Info("Generated load txs", zap.Int("num_txs", len(txnLoad)))
return txnLoad, nil
}
r.nonces.Store(sender.Address(), lastTx.Nonce()+1)
// Only use single txn builders here
txnLoad = append(txnLoad, tx...)
}
return txnLoad, nil
}
15 changes: 12 additions & 3 deletions chains/ethereum/txfactory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
GetNextReceiver() common.Address
GetBaselineWallet() *ethwallet.InteractingWallet
ResetWalletAllocation()
GetAccountBalance(common.Address) *big.Int
SetAccountBalance(common.Address, *big.Int)
}

type TxFactory struct {
Expand Down Expand Up @@ -429,9 +431,9 @@
recipient := f.txDistribution.GetNextReceiver()

// Get balance and transfer half of it
bal, err := fromWallet.GetBalance(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get balance of %s: %w", fromWallet.FormattedAddress(), err)
bal := f.txDistribution.GetAccountBalance(fromWallet.Address())
if bal == nil {
return nil, fmt.Errorf("failed to get balance of %s: %w", fromWallet.FormattedAddress())

Check failure on line 436 in chains/ethereum/txfactory/factory.go

View workflow job for this annotation

GitHub Actions / lint

printf: fmt.Errorf format %w reads arg #2, but call has 1 arg (govet)

Check failure on line 436 in chains/ethereum/txfactory/factory.go

View workflow job for this annotation

GitHub Actions / test

fmt.Errorf format %w reads arg #2, but call has 1 arg
}
transferAmount := new(big.Int).Div(bal, big.NewInt(2))

Expand Down Expand Up @@ -471,6 +473,13 @@
if err != nil {
return nil, fmt.Errorf("failed to sign transaction: %w", err)
}
f.txDistribution.SetAccountBalance(fromWallet.Address(), transferAmount)
recBal := f.txDistribution.GetAccountBalance(recipient)
newRecBal := new(big.Int).Set(transferAmount)
if recBal != nil {
newRecBal = newRecBal.Add(newRecBal, recBal)
}
f.txDistribution.SetAccountBalance(recipient, newRecBal)

return signedTx, nil
}
25 changes: 25 additions & 0 deletions chains/ethereum/txfactory/tx_distribution_bootstrapped.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package txfactory

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -18,6 +20,8 @@ type TxDistributionBootstrapped struct {
senderIndex int // Next wallet to use as a sender
receiverIndex int // Next wallet to use as a receiver
numWallets int // the length of wallets
// Map of address to balance (common.Address => *big.Int)
balanceCache *sync.Map
}

func NewTxDistributionBootstrapped(wallets []*ethwallet.InteractingWallet, fundedWallets int) *TxDistributionBootstrapped {
Expand All @@ -26,15 +30,36 @@ func NewTxDistributionBootstrapped(wallets []*ethwallet.InteractingWallet, funde
if fundedWallets == numWallets {
receiverIndex = numWallets / 2
}
balanceCache := &sync.Map{}
for _, w := range wallets {
bal, err := w.GetBalance(context.Background())
if err != nil {
continue
}
balanceCache.Store(w.Address(), bal)
}
return &TxDistributionBootstrapped{
wallets: wallets,
fundedWallets: fundedWallets,
senderIndex: 0,
receiverIndex: receiverIndex,
numWallets: numWallets,
balanceCache: balanceCache,
}
}

func (d *TxDistributionBootstrapped) GetAccountBalance(addr common.Address) *big.Int {
bal, ok := d.balanceCache.Load(addr)
if !ok {
return nil
}
return bal.(*big.Int)
}

func (d *TxDistributionBootstrapped) SetAccountBalance(addr common.Address, bal *big.Int) {
d.balanceCache.Store(addr, bal)
}

// GetNextSender returns the next sender wallet
// Returns nil if no further senders can be used during this load generation.
func (d *TxDistributionBootstrapped) GetNextSender() *ethwallet.InteractingWallet {
Expand Down
7 changes: 7 additions & 0 deletions chains/ethereum/txfactory/tx_distribution_even.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txfactory

import (
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -29,6 +30,12 @@ func NewTxDistributionEven(wallets []*ethwallet.InteractingWallet) *TxDistributi
}
}

func (d *TxDistributionEven) GetAccountBalance(addr common.Address) *big.Int {
return nil
}

func (d *TxDistributionEven) SetAccountBalance(addr common.Address, bal *big.Int) {}

// GetNextSender returns the next sender wallet using round-robin within the current load
func (d *TxDistributionEven) GetNextSender() *ethwallet.InteractingWallet {
d.mu.Lock()
Expand Down
Loading