Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ tokio-util.workspace = true

ethrex-metrics = { path = "./metrics", default-features = false }

lru = "0.16.2"

[dev-dependencies]
serde_json.workspace = true
hex = "0.4.3"
Expand Down
44 changes: 40 additions & 4 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use ethrex_storage::{
use ethrex_trie::{Nibbles, Node, NodeRef, Trie};
use ethrex_vm::backends::levm::db::DatabaseLogger;
use ethrex_vm::{BlockExecutionResult, DynVmDatabase, Evm, EvmError};
use lru::LruCache;
use mempool::Mempool;
use payload::PayloadOrTask;
use rustc_hash::FxHashMap;
use rustc_hash::{FxBuildHasher, FxHashMap};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::sync::{
Expand All @@ -61,6 +62,8 @@ use ethrex_common::types::BlobsBundle;
const MAX_PAYLOADS: usize = 10;
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;

const TX_SENDER_CACHE_MAX_SIZE: usize = 50_000;

type StoreUpdatesMap = FxHashMap<H256, (Result<Trie, StoreError>, FxHashMap<Nibbles, Vec<u8>>)>;
//TODO: Implement a struct Chain or BlockChain to encapsulate
//functionality and canonical chain state and config
Expand Down Expand Up @@ -90,6 +93,8 @@ pub struct Blockchain {
/// Mapping from a payload id to either a complete payload or a payload build task
/// We need to keep completed payloads around in case consensus requests them twice
pub payloads: Arc<TokioMutex<Vec<(u64, PayloadOrTask)>>>,
/// Cache for transaction senders, mapping from transaction hash to sender address
tx_sender_cache: Arc<Mutex<LruCache<H256, Address, FxBuildHasher>>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -142,6 +147,10 @@ impl Blockchain {
is_synced: AtomicBool::new(false),
payloads: Arc::new(TokioMutex::new(Vec::new())),
options: blockchain_opts,
tx_sender_cache: Arc::new(Mutex::new(LruCache::with_hasher(
TX_SENDER_CACHE_MAX_SIZE.try_into().unwrap(),
FxBuildHasher,
))),
}
}

Expand All @@ -152,6 +161,10 @@ impl Blockchain {
is_synced: AtomicBool::new(false),
payloads: Arc::new(TokioMutex::new(Vec::new())),
options: BlockchainOptions::default(),
tx_sender_cache: Arc::new(Mutex::new(LruCache::with_hasher(
TX_SENDER_CACHE_MAX_SIZE.try_into().unwrap(),
FxBuildHasher,
))),
}
}

Expand Down Expand Up @@ -227,14 +240,35 @@ impl Blockchain {
let queue_length = AtomicUsize::new(0);
let queue_length_ref = &queue_length;
let mut max_queue_length = 0;
let tx_sender_cache = self.tx_sender_cache.clone();

let (execution_result, account_updates_list) = std::thread::scope(|s| {
let max_queue_length_ref = &mut max_queue_length;
let (tx, rx) = channel();
let execution_handle = std::thread::Builder::new()
.name("block_executor_execution".to_string())
.spawn_scoped(s, move || -> Result<_, ChainError> {
let block_senders = {
let mut tx_sender_cache = tx_sender_cache.lock().unwrap();
block.body.precompute_tx_hashes();
block
.body
.transactions
.iter()
.map(|tx| {
tx_sender_cache
.get(&tx.hash())
.copied()
.unwrap_or(H160::zero())
})
.collect()
};
let block_senders = block
.body
.recover_with_cached_senders(block_senders)
.unwrap();
let execution_result =
vm.execute_block_pipeline(block, tx, queue_length_ref)?;
vm.execute_block_pipeline(block, tx, queue_length_ref, &block_senders)?;

// Validate execution went alright
validate_gas_used(&execution_result.receipts, &block.header)?;
Expand Down Expand Up @@ -1400,8 +1434,9 @@ impl Blockchain {

// Add transaction and blobs bundle to storage
self.mempool
.add_transaction(hash, MempoolTransaction::new(transaction, sender))?;
.add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
self.mempool.add_blobs_bundle(hash, blobs_bundle)?;
self.tx_sender_cache.lock().unwrap().put(hash, sender);
Ok(hash)
}

Expand All @@ -1426,7 +1461,8 @@ impl Blockchain {

// Add transaction to storage
self.mempool
.add_transaction(hash, MempoolTransaction::new(transaction, sender))?;
.add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
self.tx_sender_cache.lock().unwrap().put(hash, sender);

Ok(hash)
}
Expand Down
9 changes: 6 additions & 3 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl Mempool {
pub fn add_transaction(
&self,
hash: H256,
tx_sender: Address,
transaction: MempoolTransaction,
) -> Result<(), StoreError> {
let mut inner = self.write()?;
Expand All @@ -119,7 +120,7 @@ impl Mempool {
inner.txs_order.push_back(hash);
inner
.txs_by_sender_nonce
.insert((transaction.sender(), transaction.nonce()), hash);
.insert((tx_sender, transaction.nonce()), hash);
inner.transaction_pool.insert(hash, transaction);
inner.broadcast_pool.insert(hash);

Expand Down Expand Up @@ -852,9 +853,11 @@ mod tests {
let filter =
|tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) };
mempool
.add_transaction(blob_tx_hash, blob_tx.clone())
.add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone())
.unwrap();
mempool
.add_transaction(plain_tx_hash, plain_tx_sender, plain_tx)
.unwrap();
mempool.add_transaction(plain_tx_hash, plain_tx).unwrap();
let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap();
assert_eq!(txs, HashMap::from([(blob_tx.sender(), vec![blob_tx])]));
}
Expand Down
27 changes: 26 additions & 1 deletion crates/common/types/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ethrex_rlp::{
structs::{Decoder, Encoder},
};
use ethrex_trie::Trie;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use rkyv::{Archive, Deserialize as RDeserialize, Serialize as RSerialize};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -260,6 +260,31 @@ impl BlockBody {
.map(|tx| Ok((tx, tx.sender()?)))
.collect::<Result<Vec<(&Transaction, Address)>, secp256k1::Error>>()
}

pub fn precompute_tx_hashes(&self) {
self.transactions.par_iter().for_each(|tx| {
let _ = tx.hash();
});
}

pub fn recover_with_cached_senders(
&self,
cached_senders: Vec<Address>,
) -> Result<Vec<Address>, secp256k1::Error> {
// Recovering addresses is computationally expensive.
// Computing them in parallel greatly reduces execution time.
cached_senders
.par_iter()
.zip(&self.transactions)
.map(|(sender, tx)| {
if sender.is_zero() {
tx.sender()
} else {
Ok(*sender)
}
})
.collect::<Result<Vec<Address>, secp256k1::Error>>()
}
}

pub fn compute_transactions_root(transactions: &[Transaction]) -> H256 {
Expand Down
7 changes: 3 additions & 4 deletions crates/vm/backends/levm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl LEVM {
vm_type: VMType,
merkleizer: Sender<Vec<AccountUpdate>>,
queue_length: &AtomicUsize,
block_senders: &[Address],
) -> Result<BlockExecutionResult, EvmError> {
Self::prepare_block(block, db, vm_type)?;

Expand All @@ -113,9 +114,7 @@ impl LEVM {
// The value itself can be safely changed.
let mut tx_since_last_flush = 2;

for (tx, tx_sender) in block.body.get_transactions_with_sender().map_err(|error| {
EvmError::Transaction(format!("Couldn't recover addresses with error: {error}"))
})? {
for (tx, tx_sender) in block.body.transactions.iter().zip(block_senders) {
if cumulative_gas_used + tx.gas_limit() > block.header.gas_limit {
return Err(EvmError::Transaction(format!(
"Gas allowance exceeded. Block gas limit {} can be surpassed by executing transaction with gas limit {}",
Expand All @@ -126,7 +125,7 @@ impl LEVM {

let report = Self::execute_tx_in_block(
tx,
tx_sender,
*tx_sender,
&block.header,
db,
vm_type,
Expand Down
10 changes: 9 additions & 1 deletion crates/vm/backends/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,16 @@ impl Evm {
block: &Block,
merkleizer: Sender<Vec<AccountUpdate>>,
queue_length: &AtomicUsize,
block_senders: &[Address],
) -> Result<BlockExecutionResult, EvmError> {
LEVM::execute_block_pipeline(block, &mut self.db, self.vm_type, merkleizer, queue_length)
LEVM::execute_block_pipeline(
block,
&mut self.db,
self.vm_type,
merkleizer,
queue_length,
block_senders,
)
}

/// Wraps [LEVM::execute_tx].
Expand Down
Loading