Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@
public_keys_file: String,

#[clap(
long,
short = 'a',
required_unless_present = "devnet",
help = "Address of the Proof-of-Authority owner"
)]
poa_owner_address: Option<String>,

#[clap(
long,
short = 'c',

Check failure on line 70 in utils/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

this function has too many arguments (8/7)

error: this function has too many arguments (8/7) --> utils/src/spammer.rs:61:5 | 61 | / pub fn new( 62 | | url: Url, 63 | | signer_index: usize, 64 | | max_num_txs: u64, ... | 69 | | chain_id: u64, 70 | | ) -> Result<Self> { | |_____________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/rust-1.91.0/index.html#too_many_arguments = note: `-D clippy::too-many-arguments` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::too_many_arguments)]`
help = "Chain ID for the genesis file (default: 12345)",
default_value_t = 12345
)]
Expand Down Expand Up @@ -130,6 +130,9 @@
/// Rate of transactions per second
#[clap(short, long, default_value = "1000")]
rate: u64,
/// Interval in ms for sending batches of transactions
#[clap(short, long, default_value = "200")]
interval: u64,
/// Time to run the spammer for in seconds
#[clap(short, long, default_value = "0")]
time: u64,
Expand All @@ -150,6 +153,7 @@
rpc_url,
num_txs,
rate,
interval,
time,
blobs,
signer_index,
Expand All @@ -163,6 +167,7 @@
*num_txs,
*time,
*rate,
*interval,
*blobs,
*chain_id,
)?
Expand Down Expand Up @@ -296,6 +301,9 @@
/// Rate of transactions per second
#[clap(short, long, default_value_t = 1000)]
rate: u64,
/// Interval in ms for sending batches of transactions
#[clap(short, long, default_value = "200")]
interval: u64,
/// Time to run the spammer for in seconds
#[clap(short, long, default_value_t = 0)]
time: u64,
Expand All @@ -315,6 +323,7 @@
rpc_url,
num_txs,
rate,
interval,
time,
signer_index,
chain_id,
Expand All @@ -326,6 +335,7 @@
*num_txs,
*time,
*rate,
*interval,
contract,
function,
args,
Expand Down
235 changes: 173 additions & 62 deletions utils/src/spammer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
use crate::make_signers;
use crate::tx::{make_signed_contract_call_tx, make_signed_eip1559_tx, make_signed_eip4844_tx};

/// Target pool size to maintain (in number of transactions).
const TARGET_POOL_SIZE: u64 = 30_000;

struct ContractPayload {
/// Contract address for contract call spamming.
address: Address,
Expand All @@ -44,6 +47,8 @@
max_time: u64,
/// Maximum number of transactions to send per second.
max_rate: u64,
/// Number of ms between sending batches of txs (default: 200).
batch_interval: u64,
/// Whether to send EIP-4844 blob transactions.
blobs: bool,
/// Chain ID for the transactions.
Expand All @@ -59,6 +64,7 @@
max_num_txs: u64,
max_time: u64,
max_rate: u64,
batch_interval: u64,
blobs: bool,
chain_id: u64,
) -> Result<Self> {
Expand All @@ -70,6 +76,7 @@
max_num_txs,
max_time,
max_rate,
batch_interval,
blobs,
chain_id,
contract_payload: None,
Expand All @@ -83,6 +90,7 @@
max_num_txs: u64,
max_time: u64,
max_rate: u64,
batch_interval: u64,
contract: &Address,
function: &str,
args: &[String],
Expand All @@ -101,6 +109,7 @@
max_num_txs,
max_time,
max_rate,
batch_interval,
blobs: false, // Contract calls don't use blobs
contract_payload: Some(contract_payload),
chain_id,
Expand Down Expand Up @@ -155,6 +164,17 @@
Ok(u64::from_str_radix(hex_str, 16)?)
}

/// Fetch the node's current `txpool_status` via JSON-RPC.
///
/// The returned [`TxpoolStatus`] carries the pending/queued transaction
/// counts used elsewhere to throttle the spammer.
async fn get_txpool_status(&self) -> Result<TxpoolStatus> {
self.client.rpc_request("txpool_status", vec![]).await
}

/// Total number of transactions currently in the node's pool
/// (pending + queued), derived from `txpool_status`.
async fn get_mempool_count(&self) -> Result<u64> {
    self.get_txpool_status()
        .await
        .map(|status| status.pending + status.queued)
}

/// Generate and send transactions to the Ethereum node at a controlled rate.
async fn spammer(
&self,
Expand All @@ -162,94 +182,122 @@
report_sender: Sender<Instant>,
finish_sender: Sender<()>,
) -> Result<()> {
// Fetch latest nonce for the sender address.

Check warning on line 185 in utils/src/spammer.rs

View workflow job for this annotation

GitHub Actions / Formatting

Diff in /home/runner/work/emerald/emerald/utils/src/spammer.rs
let address = self.signer.address();
let latest_nonce = self.get_latest_nonce(address).await?;
debug!("Spamming {address} starting from nonce={latest_nonce}");
let txs_per_batch = self.max_rate
.saturating_mul(self.batch_interval)
.checked_div(1000)
.unwrap_or(0);
debug!(
"Spamming {address} starting from nonce={latest_nonce} at rate {}, sending {txs_per_batch} txs every {}ms",
self.max_rate,
self.batch_interval,
);

// Initialize nonce and counters.
let mut nonce = latest_nonce;
let start_time = Instant::now();
let mut txs_sent_total = 0u64;
let mut interval = time::interval(Duration::from_secs(1));
let mut interval = time::interval(Duration::from_millis(self.batch_interval));

loop {
// Wait for next one-second tick.
let _ = interval.tick().await;
let interval_start = Instant::now();

// Prepare batch of transactions for this interval.
let mut batch_entries = Vec::with_capacity(self.max_rate as usize);
// Verify the nonce for gaps
// TODO: probably this should run as a separate task
let on_chain_nonce = self.get_latest_nonce(address).await?;
// If the span between the on-chain nonce and the one we are about to send
// is too big, then probably there is a gap that doesn't allow the
// on-chain nonce too advance.
let nonce_span = nonce.saturating_sub(on_chain_nonce);
if nonce_span > self.max_rate {
debug!("Current nonce={nonce}, on-chain nonce={on_chain_nonce}. Sending 10 txs");
let batch_entries = self.build_batch_entries(10, on_chain_nonce).await?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 10? Should it not be nonce_span?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a small number of tx to clear the nonce gap in case there is one. Note that it might be that there is no nonce gap and the blocks are committed slower than the injection rate. So we shouldn't inject a lot of transactions here as we might very well send a lot of duplicates.

if let Some(results) = self.send_raw_batch(&batch_entries).await? {
if results.len() != batch_entries.len() {
return Err(eyre::eyre!(
"Batch response count {} does not match request count {}",
results.len(),
batch_entries.len()
));
}

for _ in 0..self.max_rate {
// Check exit conditions before creating each transaction.
if (self.max_num_txs > 0 && txs_sent_total >= self.max_num_txs)
|| (self.max_time > 0 && start_time.elapsed().as_secs() >= self.max_time)
{
break;
// Report individual results.
for ((_, tx_bytes_len), result) in batch_entries.into_iter().zip(results) {
let mapped_result = result.map(|_| tx_bytes_len);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we discarding the Ok value of results?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because it contains the tx hash, which isn't used later anywhere, and we don't care about single tx success because it will slow down the spammer. We should modify this if needed. cc @mpoke

result_sender.send(mapped_result).await?;
}
} else {
debug!("Batch eth_sendRawTransaction timed out; skipping this tick");
let _ = report_sender.send(interval_start).await;
Comment thread
ljoss17 marked this conversation as resolved.
Outdated
continue;
}
}

// Create one transaction and sign it.
let signed_tx = if let Some(ref payload) = self.contract_payload {
// Contract call transaction
make_signed_contract_call_tx(
&self.signer,
nonce,
payload.address,
&payload.function_sig,
payload.args.as_slice(),
self.chain_id,
)
.await?
} else if self.blobs {
// Blob transaction
make_signed_eip4844_tx(&self.signer, nonce, self.chain_id).await?
} else {
// Regular transfer
make_signed_eip1559_tx(&self.signer, nonce, self.chain_id).await?
};
let tx_bytes = signed_tx.encoded_2718();
let tx_bytes_len = tx_bytes.len() as u64;

// Add to batch.
let payload = hex::encode(tx_bytes);
batch_entries.push((vec![json!(payload)], tx_bytes_len));

nonce += 1;
txs_sent_total += 1;
// Get current pool size and calculate dynamic send rate
let current_pool_size = self.get_mempool_count().await.unwrap_or(0);
let space_available = TARGET_POOL_SIZE.saturating_sub(current_pool_size);
let txs_to_send = if space_available < txs_per_batch {
space_available
} else {
txs_per_batch
};

// Continue if there is no space available
if txs_to_send == 0 {
debug!("Mempool already full. Do not send more transactions.");
let _ = report_sender.send(interval_start).await;
continue;
}

// Limit the max number of transactions
let tx_count = if self.max_num_txs > 0 {
txs_to_send.min(self.max_num_txs.saturating_sub(txs_sent_total))
} else {
txs_to_send
};

// Prepare batch of transactions for this interval.
let batch_entries = self.build_batch_entries(tx_count, nonce).await?;
let batch_size = batch_entries.len() as u64;

debug!(
"Pool: {current_pool_size}/{TARGET_POOL_SIZE}, sending {batch_size} txs from nonce {nonce} (rate: {})",
self.max_rate
);

// Send all transactions in a single batch RPC call.
if !batch_entries.is_empty() {
let params: Vec<_> = batch_entries
.iter()
.map(|(params, _)| params.clone())
.collect();

let results = self
.client
.rpc_batch_request("eth_sendRawTransaction", params)
.await?;

if results.len() != batch_entries.len() {
return Err(eyre::eyre!(
"Batch response count {} does not match request count {}",
results.len(),
batch_entries.len()
));
}
if let Some(results) = self.send_raw_batch(&batch_entries).await? {
if results.len() != batch_entries.len() {
return Err(eyre::eyre!(
"Batch response count {} does not match request count {}",
results.len(),
batch_entries.len()
));
}

// Report individual results.
for ((_, tx_bytes_len), result) in batch_entries.into_iter().zip(results) {
let mapped_result = result.map(|_| tx_bytes_len);
result_sender.send(mapped_result).await?;
// Report individual results.
for ((_, tx_bytes_len), result) in batch_entries.into_iter().zip(results) {
let mapped_result = result.map(|_| tx_bytes_len);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here

result_sender.send(mapped_result).await?;
}

txs_sent_total += batch_size;
nonce += batch_size;
} else {
debug!("Batch eth_sendRawTransaction timed out; skipping this tick");
}
}

// Give time to the in-flight results to be received.
sleep(Duration::from_millis(20)).await;

// Signal tracker to report stats after this batch.
report_sender.try_send(interval_start)?;
let _ = report_sender.send(interval_start).await;

// Check exit conditions after each tick.
if (self.max_num_txs > 0 && txs_sent_total >= self.max_num_txs)
Expand All @@ -262,6 +310,68 @@
Ok(())
}

/// Build a batch of `tx_count` signed transactions with consecutive nonces
/// starting at `nonce`.
///
/// Each entry pairs the JSON-RPC params for `eth_sendRawTransaction`
/// (the hex-encoded raw transaction) with the encoded transaction's
/// length in bytes, which the caller reports for statistics.
async fn build_batch_entries(
    &self,
    tx_count: u64,
    nonce: u64,
) -> Result<Vec<(Vec<serde_json::Value>, u64)>> {
    let mut entries = Vec::with_capacity(tx_count as usize);

    for offset in 0..tx_count {
        let tx_nonce = nonce + offset;

        // Pick the transaction kind for this spammer mode: contract call
        // takes precedence, then blob (EIP-4844), then plain EIP-1559.
        let signed_tx = match self.contract_payload {
            Some(ref payload) => {
                make_signed_contract_call_tx(
                    &self.signer,
                    tx_nonce,
                    payload.address,
                    &payload.function_sig,
                    payload.args.as_slice(),
                    self.chain_id,
                )
                .await?
            }
            None if self.blobs => {
                make_signed_eip4844_tx(&self.signer, tx_nonce, self.chain_id).await?
            }
            None => make_signed_eip1559_tx(&self.signer, tx_nonce, self.chain_id).await?,
        };

        let raw = signed_tx.encoded_2718();
        let raw_len = raw.len() as u64;
        entries.push((vec![json!(hex::encode(raw))], raw_len));
    }

    Ok(entries)
}

async fn send_raw_batch(
&self,
batch_entries: &[(Vec<serde_json::Value>, u64)],
) -> Result<Option<Vec<Result<String>>>> {
let params: Vec<_> = batch_entries
.iter()
.map(|(params, _)| params.clone())
.collect();

match self
.client
.rpc_batch_request("eth_sendRawTransaction", params)
.await
{
Ok(responses) => Ok(Some(responses)),
Err(err) => {
if let Some(jsonrpsee_core::client::Error::RequestTimeout) =
err.downcast_ref::<jsonrpsee_core::client::Error>()
{
Ok(None)
} else {
Err(err)
}
}
}
}

// Track and report statistics on sent transactions.
async fn tracker(
&self,
Expand Down Expand Up @@ -290,7 +400,7 @@
sleep(Duration::from_secs(1) - elapsed).await;
}

let pool_status: TxpoolStatus = self.client.rpc_request("txpool_status", vec![]).await?;
let pool_status = self.get_txpool_status().await?;
debug!("{stats_last_second}; {pool_status:?}");

// Update total, then reset last second stats
Expand Down Expand Up @@ -378,14 +488,15 @@
}
}

/// Thin wrapper around a `jsonrpsee` HTTP client used for all JSON-RPC
/// calls made by the spammer (single and batch requests).
#[derive(Clone)]
struct RpcClient {
    // Underlying HTTP JSON-RPC client; cheap to clone for sharing.
    client: HttpClient,
}

impl RpcClient {
/// Create a client for the given RPC endpoint.
///
/// Uses a 1-second request timeout so a stalled node cannot block the
/// spammer's send loop for long.
pub fn new(url: Url) -> Result<Self> {
    let builder = HttpClientBuilder::default().request_timeout(Duration::from_secs(1));
    Ok(Self {
        client: builder.build(url)?,
    })
}
Expand Down
Loading