Skip to content

Commit c0df626

Browse files
authored
feat(l1): track new pooled transaction hashes (lambdaclass#3158)
**Motivation** We weren't keeping track of the received NewPooledTransationHashes messages, we were just propagating the incoming msg without validating them. <!-- Why does this pull request exist? What are its goals? --> The PR fixes the following hivetests: - BlobViolations This test sends some invalid blob tx announcements and expects the node to disconnect. - TestBlobTxWithoutSidecar This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer. - TestBlobTxWithMismatchedSidecar This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer. ### Sequence diagram of TestBlobTxWithoutSidecar & TestBlobTxWithMismatchedSidecar ```mermaid sequenceDiagram participant Node participant BadPeer participant GoodPeer %% Setup: Forkchoice Update Node->>Node: sendForkchoiceUpdated() %% Stage 1: Bad Peer announces transaction BadPeer->>Node: NewPooledTransactionHashesMsg<br>(badTx Hash, BlobTxType, Size) Node->>BadPeer: GetPooledTransactionsPacket<br>(Request badTx) Note right of BadPeer: stage1.Done() %% Stage 2: Good Peer announces transaction Note right of GoodPeer: stage1.Wait() GoodPeer->>Node: NewPooledTransactionHashesMsg<br>(tx Hash, BlobTxType, Size) Note right of GoodPeer: stage2.Done() %% Stage 3: Bad Peer sends invalid transaction and is disconnected Note right of BadPeer: stage2.Wait() BadPeer->>Node: PooledTransactionsMsg<br>(badTx, RequestId) Node->>BadPeer: Disconnect Note right of BadPeer: stage3.Done() %% Good Peer sends correct transaction Note right of GoodPeer: stage3.Wait() Node->>GoodPeer: GetPooledTransactionsPacket<br>(Request tx) GoodPeer->>Node: PooledTransactionsMsg<br>(tx, RequestId) Note right of GoodPeer: No Disconnect<br>close(errc) %% Synchronization Points Note over BadPeer,GoodPeer: stage1: Bad Peer announces<br>stage2: Good Peer announces<br>stage3: Bad Peer disconnected ``` You can check the tests code [here](https://github.com/ethereum/go-ethereum/blob/72d92698a474059f3a73798c6312699c1f210497/cmd/devp2p/internal/ethtest/suite.go#L917) **Description** A new field `requested_pooled_txs` was added to the RLPxConnection struct. In that struct, the request_id and the requested hashed are stored to check later, when the response is received. By doing these, we can assure that the received Pooled Transactions match the expected income. Some new error types were added. The fn `get_p2p_transaction` was moved to RLPxConnection. Can be tested using: ```bash make run-hive SIMULATION=devp2p TEST_PATTERN='eth/blob' HIVE_BRANCH=mecha/update-devp2p-testfiles SIM_LOG_LEVEL=4 ``` The tests can't be activated in the CI yet. We need to update the Hive fork first. [PR](lambdaclass/hive#35) <!-- A clear and concise general description of the changes this PR introduces --> <!-- Link to issues: Resolves lambdaclass#111, Resolves lambdaclass#222 --> Closes: lambdaclass#1781 lambdaclass#3122 lambdaclass#3123 lambdaclass#1416
1 parent 699835f commit c0df626

File tree

5 files changed

+133
-54
lines changed

5 files changed

+133
-54
lines changed

crates/blockchain/blockchain.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ use constants::{MAX_INITCODE_SIZE, MAX_TRANSACTION_DATA_SIZE};
1212
use error::MempoolError;
1313
use error::{ChainError, InvalidBlockError};
1414
use ethrex_common::constants::{GAS_PER_BLOB, MIN_BASE_FEE_PER_BLOB_GAS};
15-
use ethrex_common::types::ELASTICITY_MULTIPLIER;
1615
use ethrex_common::types::MempoolTransaction;
1716
use ethrex_common::types::block_execution_witness::ExecutionWitnessResult;
1817
use ethrex_common::types::requests::{EncodedRequests, Requests, compute_requests_hash};
1918
use ethrex_common::types::{
2019
AccountUpdate, Block, BlockHash, BlockHeader, BlockNumber, ChainConfig, EIP4844Transaction,
21-
Receipt, Transaction, compute_receipts_root, validate_block_header,
20+
Receipt, Transaction, WrappedEIP4844Transaction, compute_receipts_root, validate_block_header,
2221
validate_cancun_header_fields, validate_prague_header_fields,
2322
validate_pre_cancun_header_fields,
2423
};
24+
use ethrex_common::types::{ELASTICITY_MULTIPLIER, P2PTransaction};
2525
use ethrex_common::{Address, H256, TrieLogger};
2626
use ethrex_metrics::metrics;
2727
use ethrex_storage::{Store, UpdateBatch, error::StoreError, hash_address, hash_key};
@@ -737,6 +737,39 @@ impl Blockchain {
737737
pub fn is_synced(&self) -> bool {
738738
self.is_synced.load(Ordering::Relaxed)
739739
}
740+
741+
pub fn get_p2p_transaction_by_hash(&self, hash: &H256) -> Result<P2PTransaction, StoreError> {
742+
let Some(tx) = self.mempool.get_transaction_by_hash(*hash)? else {
743+
return Err(StoreError::Custom(format!(
744+
"Hash {} not found in the mempool",
745+
hash
746+
)));
747+
};
748+
let result = match tx {
749+
Transaction::LegacyTransaction(itx) => P2PTransaction::LegacyTransaction(itx),
750+
Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx),
751+
Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx),
752+
Transaction::EIP4844Transaction(itx) => {
753+
let Some(bundle) = self.mempool.get_blobs_bundle(*hash)? else {
754+
return Err(StoreError::Custom(format!(
755+
"Blob transaction present without its bundle: hash {}",
756+
hash
757+
)));
758+
};
759+
760+
P2PTransaction::EIP4844TransactionWithBlobs(WrappedEIP4844Transaction {
761+
tx: itx,
762+
blobs_bundle: bundle,
763+
})
764+
}
765+
Transaction::EIP7702Transaction(itx) => P2PTransaction::EIP7702Transaction(itx),
766+
Transaction::PrivilegedL2Transaction(itx) => {
767+
P2PTransaction::PrivilegedL2Transaction(itx)
768+
}
769+
};
770+
771+
Ok(result)
772+
}
740773
}
741774

742775
pub fn validate_requests_hash(

crates/blockchain/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ pub enum MempoolError {
8484
NotEnoughBalance,
8585
#[error("Transaction gas fields are invalid")]
8686
InvalidTxGasvalues,
87+
#[error("Invalid pooled TxType, expected: {0}")]
88+
InvalidPooledTxType(u8),
89+
#[error("Invalid pooled transaction size, differs from expected")]
90+
InvalidPooledTxSize,
91+
#[error("Requested pooled transaction was not received")]
92+
RequestedPooledTxNotFound,
8793
}
8894

8995
#[derive(Debug)]

crates/common/types/transaction.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,6 +1504,29 @@ mod canonic_encoding {
15041504
self.encode_canonical(&mut buf);
15051505
buf
15061506
}
1507+
1508+
pub fn compute_hash(&self) -> H256 {
1509+
match self {
1510+
P2PTransaction::LegacyTransaction(t) => {
1511+
Transaction::LegacyTransaction(t.clone()).compute_hash()
1512+
}
1513+
P2PTransaction::EIP2930Transaction(t) => {
1514+
Transaction::EIP2930Transaction(t.clone()).compute_hash()
1515+
}
1516+
P2PTransaction::EIP1559Transaction(t) => {
1517+
Transaction::EIP1559Transaction(t.clone()).compute_hash()
1518+
}
1519+
P2PTransaction::EIP4844TransactionWithBlobs(t) => {
1520+
Transaction::EIP4844Transaction(t.tx.clone()).compute_hash()
1521+
}
1522+
P2PTransaction::EIP7702Transaction(t) => {
1523+
Transaction::EIP7702Transaction(t.clone()).compute_hash()
1524+
}
1525+
P2PTransaction::PrivilegedL2Transaction(t) => {
1526+
Transaction::PrivilegedL2Transaction(t.clone()).compute_hash()
1527+
}
1528+
}
1529+
}
15071530
}
15081531
}
15091532

crates/networking/p2p/rlpx/connection.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use super::{
2+
eth::{transactions::NewPooledTransactionHashes, update::BlockRangeUpdate},
3+
p2p::DisconnectReason,
4+
utils::log_peer_warn,
5+
};
16
use crate::{
27
kademlia::PeerChannels,
38
rlpx::{
@@ -32,7 +37,10 @@ use ethrex_storage::Store;
3237
use futures::SinkExt;
3338
use k256::{PublicKey, SecretKey, ecdsa::SigningKey};
3439
use rand::random;
35-
use std::{collections::HashSet, sync::Arc};
40+
use std::{
41+
collections::{HashMap, HashSet},
42+
sync::Arc,
43+
};
3644
use tokio::{
3745
io::{AsyncRead, AsyncWrite},
3846
sync::{
@@ -47,12 +55,6 @@ use tokio_stream::StreamExt;
4755
use tokio_util::codec::Framed;
4856
use tracing::debug;
4957

50-
use super::{
51-
eth::{transactions::NewPooledTransactionHashes, update::BlockRangeUpdate},
52-
p2p::DisconnectReason,
53-
utils::log_peer_warn,
54-
};
55-
5658
const PERIODIC_PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
5759
const PERIODIC_TX_BROADCAST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
5860
const PERIODIC_TASKS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
@@ -92,6 +94,7 @@ pub(crate) struct RLPxConnection<S> {
9294
next_block_range_update: Instant,
9395
last_block_range_update_block: u64,
9496
broadcasted_txs: HashSet<H256>,
97+
requested_pooled_txs: HashMap<u64, NewPooledTransactionHashes>,
9598
client_version: String,
9699
/// Send end of the channel used to broadcast messages
97100
/// to other connected peers, is ok to have it here,
@@ -130,6 +133,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
130133
next_block_range_update: Instant::now() + PERIODIC_BLOCK_RANGE_UPDATE_INTERVAL,
131134
last_block_range_update_block: 0,
132135
broadcasted_txs: HashSet::new(),
136+
requested_pooled_txs: HashMap::new(),
133137
client_version,
134138
connection_broadcast_send: connection_broadcast,
135139
}
@@ -568,12 +572,14 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
568572
Message::NewPooledTransactionHashes(new_pooled_transaction_hashes)
569573
if peer_supports_eth =>
570574
{
571-
//TODO(#1415): evaluate keeping track of requests to avoid sending the same twice.
572575
let hashes =
573576
new_pooled_transaction_hashes.get_transactions_to_request(&self.blockchain)?;
574577

575-
//TODO(#1416): Evaluate keeping track of the request-id.
576-
let request = GetPooledTransactions::new(random(), hashes);
578+
let request_id = random();
579+
self.requested_pooled_txs
580+
.insert(request_id, new_pooled_transaction_hashes);
581+
582+
let request = GetPooledTransactions::new(request_id, hashes);
577583
self.send(Message::GetPooledTransactions(request)).await?;
578584
}
579585
Message::GetPooledTransactions(msg) => {
@@ -582,6 +588,21 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
582588
}
583589
Message::PooledTransactions(msg) if peer_supports_eth => {
584590
if self.blockchain.is_synced() {
591+
if let Some(requested) = self.requested_pooled_txs.get(&msg.id) {
592+
if let Err(error) = msg.validate_requested(requested).await {
593+
log_peer_warn(
594+
&self.node,
595+
&format!("disconnected from peer. Reason: {}", error),
596+
);
597+
self.send_disconnect_message(Some(DisconnectReason::SubprotocolError))
598+
.await;
599+
return Err(RLPxError::DisconnectSent(
600+
DisconnectReason::SubprotocolError,
601+
));
602+
} else {
603+
self.requested_pooled_txs.remove(&msg.id);
604+
}
605+
}
585606
msg.handle(&self.node, &self.blockchain).await?;
586607
}
587608
}

crates/networking/p2p/rlpx/eth/transactions.rs

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use ethrex_blockchain::Blockchain;
44
use ethrex_blockchain::error::MempoolError;
55
use ethrex_common::types::BlobsBundle;
66
use ethrex_common::types::P2PTransaction;
7-
use ethrex_common::types::WrappedEIP4844Transaction;
87
use ethrex_common::{H256, types::Transaction};
8+
use ethrex_rlp::encode::RLPEncode;
99
use ethrex_rlp::{
1010
error::{RLPDecodeError, RLPEncodeError},
1111
structs::{Decoder, Encoder},
@@ -65,11 +65,11 @@ impl RLPxMessage for Transactions {
6565

6666
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x08
6767
// Broadcast message
68-
#[derive(Debug)]
68+
#[derive(Debug, Eq, PartialEq)]
6969
pub(crate) struct NewPooledTransactionHashes {
7070
transaction_types: Bytes,
7171
transaction_sizes: Vec<usize>,
72-
transaction_hashes: Vec<H256>,
72+
pub(crate) transaction_hashes: Vec<H256>,
7373
}
7474

7575
impl NewPooledTransactionHashes {
@@ -165,8 +165,8 @@ impl RLPxMessage for NewPooledTransactionHashes {
165165
pub(crate) struct GetPooledTransactions {
166166
// id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
167167
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
168-
id: u64,
169-
transaction_hashes: Vec<H256>,
168+
pub(crate) id: u64,
169+
pub(crate) transaction_hashes: Vec<H256>,
170170
}
171171

172172
impl GetPooledTransactions {
@@ -182,54 +182,19 @@ impl GetPooledTransactions {
182182
let txs = self
183183
.transaction_hashes
184184
.iter()
185-
.map(|hash| Self::get_p2p_transaction(hash, blockchain))
185+
.map(|hash| blockchain.get_p2p_transaction_by_hash(hash))
186186
// Return an error in case anything failed.
187187
.collect::<Result<Vec<_>, _>>()?
188188
.into_iter()
189189
// As per the spec, Nones are perfectly acceptable, for example if a transaction was
190190
// taken out of the mempool due to payload building after being advertised.
191-
.flatten()
192191
.collect();
193192

194193
Ok(PooledTransactions {
195194
id: self.id,
196195
pooled_transactions: txs,
197196
})
198197
}
199-
200-
/// Gets a p2p transaction given a hash.
201-
fn get_p2p_transaction(
202-
hash: &H256,
203-
blockchain: &Blockchain,
204-
) -> Result<Option<P2PTransaction>, StoreError> {
205-
let Some(tx) = blockchain.mempool.get_transaction_by_hash(*hash)? else {
206-
return Ok(None);
207-
};
208-
let result = match tx {
209-
Transaction::LegacyTransaction(itx) => P2PTransaction::LegacyTransaction(itx),
210-
Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx),
211-
Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx),
212-
Transaction::EIP4844Transaction(itx) => {
213-
let Some(bundle) = blockchain.mempool.get_blobs_bundle(*hash)? else {
214-
return Err(StoreError::Custom(format!(
215-
"Blob transaction present without its bundle: hash {}",
216-
hash
217-
)));
218-
};
219-
220-
P2PTransaction::EIP4844TransactionWithBlobs(WrappedEIP4844Transaction {
221-
tx: itx,
222-
blobs_bundle: bundle,
223-
})
224-
}
225-
Transaction::EIP7702Transaction(itx) => P2PTransaction::EIP7702Transaction(itx),
226-
Transaction::PrivilegedL2Transaction(itx) => {
227-
P2PTransaction::PrivilegedL2Transaction(itx)
228-
}
229-
};
230-
231-
Ok(Some(result))
232-
}
233198
}
234199

235200
impl RLPxMessage for GetPooledTransactions {
@@ -261,7 +226,7 @@ impl RLPxMessage for GetPooledTransactions {
261226
pub(crate) struct PooledTransactions {
262227
// id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
263228
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
264-
id: u64,
229+
pub(crate) id: u64,
265230
pooled_transactions: Vec<P2PTransaction>,
266231
}
267232

@@ -273,6 +238,37 @@ impl PooledTransactions {
273238
}
274239
}
275240

241+
/// validates if the received TXs match the request
242+
pub async fn validate_requested(
243+
&self,
244+
requested: &NewPooledTransactionHashes,
245+
) -> Result<(), MempoolError> {
246+
for tx in &self.pooled_transactions {
247+
if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx {
248+
itx.blobs_bundle.validate(&itx.tx)?;
249+
}
250+
let tx_hash = tx.compute_hash();
251+
let Some(pos) = requested
252+
.transaction_hashes
253+
.iter()
254+
.position(|&hash| hash == tx_hash)
255+
else {
256+
return Err(MempoolError::RequestedPooledTxNotFound);
257+
};
258+
259+
let expected_type = requested.transaction_types[pos];
260+
let expected_size = requested.transaction_sizes[pos];
261+
if tx.tx_type() as u8 != expected_type {
262+
return Err(MempoolError::InvalidPooledTxType(expected_type));
263+
}
264+
// remove the code from the encoding (-4)
265+
if tx.encode_to_vec().len() - 4 != expected_size {
266+
return Err(MempoolError::InvalidPooledTxSize);
267+
}
268+
}
269+
Ok(())
270+
}
271+
276272
/// Saves every incoming pooled transaction to the mempool.
277273
pub async fn handle(self, node: &Node, blockchain: &Blockchain) -> Result<(), MempoolError> {
278274
for tx in self.pooled_transactions {

0 commit comments

Comments
 (0)