diff --git a/CHANGELOG.md b/CHANGELOG.md index 496312b0cd..0cd5327eb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - Fixed `TransactionHeader` serialization for row insertion on database & fixed transaction cursor on retrievals ([#1701](https://github.com/0xMiden/node/issues/1701)). - Added KMS signing support in validator ([#1677](https://github.com/0xMiden/node/pull/1677)). - Added per-IP gRPC rate limiting across services as well as global concurrent connection limit ([#1746](https://github.com/0xMiden/node/issues/1746)). +- Users can now submit atomic transaction batches via `SubmitBatch` gRPC endpoint ([#1846](https://github.com/0xMiden/node/pull/1846)). ### Changes diff --git a/Cargo.lock b/Cargo.lock index 16ea17404b..e600d10ed1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3128,6 +3128,7 @@ dependencies = [ "miden-protocol", "miden-standards", "miden-tx", + "miden-tx-batch-prover", "reqwest", "rstest", "semver 1.0.27", diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index 34dab83a3f..549d76261f 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -200,12 +200,12 @@ impl BatchJob { batch: SelectedBatch, ) -> Result<(SelectedBatch, BatchInputs), BuildBatchError> { let block_references = batch - .txs() + .transactions() .iter() .map(Deref::deref) .map(AuthenticatedTransaction::reference_block); let unauthenticated_notes = batch - .txs() + .transactions() .iter() .map(Deref::deref) .flat_map(AuthenticatedTransaction::unauthenticated_note_commitments); @@ -325,10 +325,10 @@ impl BatchProver { impl TelemetryInjectorExt for SelectedBatch { fn inject_telemetry(&self) { Span::current().set_attribute("batch.id", self.id()); - Span::current().set_attribute("transactions.count", self.txs().len()); + Span::current().set_attribute("transactions.count", self.transactions().len()); // Accumulate all telemetry based on transactions. 
let (tx_ids, input_notes_count, output_notes_count, unauth_notes_count) = - self.txs().iter().fold( + self.transactions().iter().fold( (vec![], 0, 0, 0), |( mut tx_ids, diff --git a/crates/block-producer/src/domain/batch.rs b/crates/block-producer/src/domain/batch.rs index 0fd2029bfc..9c3cbb7e17 100644 --- a/crates/block-producer/src/domain/batch.rs +++ b/crates/block-producer/src/domain/batch.rs @@ -34,10 +34,6 @@ impl SelectedBatch { self.id } - pub(crate) fn txs(&self) -> &[Arc] { - &self.txs - } - pub(crate) fn into_transactions(self) -> Vec> { self.txs } diff --git a/crates/block-producer/src/domain/transaction.rs b/crates/block-producer/src/domain/transaction.rs index 6b59ea3be2..06e2bb4bde 100644 --- a/crates/block-producer/src/domain/transaction.rs +++ b/crates/block-producer/src/domain/transaction.rs @@ -46,7 +46,7 @@ impl AuthenticatedTransaction { /// /// Returns an error if any of the transaction's nullifiers are marked as spent by the inputs. pub fn new_unchecked( - tx: ProvenTransaction, + tx: Arc, inputs: TransactionInputs, ) -> Result { let nullifiers_already_spent = tx @@ -58,7 +58,7 @@ impl AuthenticatedTransaction { } Ok(AuthenticatedTransaction { - inner: Arc::new(tx), + inner: tx, notes_authenticated_by_store: inputs.found_unauthenticated_notes, authentication_height: inputs.current_block_height, store_account_state: inputs.account_commitment, @@ -128,6 +128,10 @@ impl AuthenticatedTransaction { pub fn expires_at(&self) -> BlockNumber { self.inner.expiration_block_num() } + + pub fn raw_proven_transaction(&self) -> &ProvenTransaction { + &self.inner + } } #[cfg(test)] @@ -151,7 +155,7 @@ impl AuthenticatedTransaction { current_block_height: 0.into(), }; // SAFETY: nullifiers were set to None aka are definitely unspent. - Self::new_unchecked(inner, inputs).unwrap() + Self::new_unchecked(Arc::new(inner), inputs).unwrap() } /// Overrides the authentication height with the given value. 
@@ -171,8 +175,4 @@ impl AuthenticatedTransaction { self.store_account_state = None; self } - - pub fn raw_proven_transaction(&self) -> &ProvenTransaction { - &self.inner - } } diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index 9e344e8313..7b0cca0246 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -34,12 +34,12 @@ pub enum BlockProducerError { }, } -// Transaction adding errors +// Add transaction and add user batch errors // ================================================================================================= #[derive(Debug, Error, GrpcError)] -pub enum AddTransactionError { - #[error("failed to retrieve transaction inputs from the store")] +pub enum MempoolSubmissionError { + #[error("failed to retrieve inputs from the store")] #[grpc(internal)] StoreConnectionFailed(#[source] StoreError), @@ -55,7 +55,7 @@ pub enum AddTransactionError { stale_limit: BlockNumber, }, - #[error("transaction deserialization failed")] + #[error("request deserialization failed")] TransactionDeserializationFailed(#[source] miden_protocol::utils::DeserializationError), #[error( @@ -93,16 +93,6 @@ pub enum StateConflict { }, } -// Submit proven batch by user errors -// ================================================================================================= - -#[derive(Debug, Error, GrpcError)] -#[grpc(internal)] -pub enum SubmitProvenBatchError { - #[error("batch deserialization failed")] - Deserialization(#[source] miden_protocol::utils::DeserializationError), -} - // Batch building errors // ================================================================================================= diff --git a/crates/block-producer/src/mempool/graph/batch.rs b/crates/block-producer/src/mempool/graph/batch.rs index 929f824381..be4b264531 100644 --- a/crates/block-producer/src/mempool/graph/batch.rs +++ b/crates/block-producer/src/mempool/graph/batch.rs @@ -18,15 +18,15 @@ impl GraphNode 
for SelectedBatch { type Id = BatchId; fn nullifiers(&self) -> Box + '_> { - Box::new(self.txs().iter().flat_map(|tx| tx.nullifiers())) + Box::new(self.transactions().iter().flat_map(|tx| tx.nullifiers())) } fn output_notes(&self) -> Box + '_> { - Box::new(self.txs().iter().flat_map(|tx| tx.output_note_commitments())) + Box::new(self.transactions().iter().flat_map(|tx| tx.output_note_commitments())) } fn unauthenticated_notes(&self) -> Box + '_> { - Box::new(self.txs().iter().flat_map(|tx| tx.unauthenticated_note_commitments())) + Box::new(self.transactions().iter().flat_map(|tx| tx.unauthenticated_note_commitments())) } fn account_updates( @@ -134,9 +134,9 @@ impl BatchGraph { /// /// Panics if the batch does not exist, or has existing ancestors in the batch /// graph. - pub fn prune(&mut self, batch: BatchId) { - self.inner.prune(batch); + pub fn prune(&mut self, batch: BatchId) -> SelectedBatch { self.proven.remove(&batch); + self.inner.prune(batch) } pub fn proven_count(&self) -> usize { diff --git a/crates/block-producer/src/mempool/graph/dag.rs b/crates/block-producer/src/mempool/graph/dag.rs index 8952f6db53..f17c4f6778 100644 --- a/crates/block-producer/src/mempool/graph/dag.rs +++ b/crates/block-producer/src/mempool/graph/dag.rs @@ -219,14 +219,14 @@ where /// # Panics /// /// Panics if this node has any ancestor nodes, or if this node was not selected. - pub fn prune(&mut self, id: N::Id) { + pub fn prune(&mut self, id: N::Id) -> N { assert!( self.edges.parents_of(&id).is_empty(), "Cannot prune node {id} as it still has ancestors", ); assert!(self.selected.contains(&id), "Cannot prune node {id} as it was not selected"); - self.remove(id); + self.remove(id) } /// Unconditionally removes the given node from the graph, deleting its edges and state. 
diff --git a/crates/block-producer/src/mempool/graph/transaction.rs b/crates/block-producer/src/mempool/graph/transaction.rs index c1ecfd93f4..0672ecc40c 100644 --- a/crates/block-producer/src/mempool/graph/transaction.rs +++ b/crates/block-producer/src/mempool/graph/transaction.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use miden_protocol::Word; use miden_protocol::account::AccountId; +use miden_protocol::batch::BatchId; use miden_protocol::block::BlockNumber; use miden_protocol::note::Nullifier; use miden_protocol::transaction::TransactionId; @@ -75,6 +76,9 @@ pub struct TransactionGraph { /// These are batch or block proving errors in which the transaction was a part of. This is /// used to identify potentially buggy transactions that should be evicted. failures: HashMap, + + user_batch_txs: HashMap>, + txs_user_batch: HashMap, } impl TransactionGraph { @@ -85,16 +89,100 @@ impl TransactionGraph { self.inner.append(tx) } - pub fn select_batch(&mut self, mut budget: BatchBudget) -> Option { + pub fn append_user_batch( + &mut self, + batch: &[Arc], + ) -> Result<(), StateConflict> { + let batch_id = + BatchId::from_transactions(batch.iter().map(|tx| tx.raw_proven_transaction())); + + // Append each transaction, but revert atomically on error. + for (idx, tx) in batch.iter().enumerate() { + if let Err(err) = self.append(Arc::clone(tx)) { + // We revert in reverse order because inner.revert panics if the node doesn't exist. 
+ for tx in batch.iter().take(idx).rev() { + let reverted = self.inner.revert_node_and_descendants(tx.id()); + assert_eq!(reverted.len(), 1); + assert_eq!(&reverted[0], tx); + } + + return Err(err); + } + } + + let txs = batch.iter().map(GraphNode::id).collect::>(); + for tx in &txs { + self.txs_user_batch.insert(*tx, batch_id); + } + self.user_batch_txs.insert(batch_id, txs); + + Ok(()) + } + + pub fn select_batch(&mut self, budget: BatchBudget) -> Option { + self.select_user_batch().or_else(|| self.select_conventional_batch(budget)) + } + + fn select_user_batch(&mut self) -> Option { + // Comb through all user batch candidates. + let candidate_batches = self + .inner + .selection_candidates() + .values() + .filter_map(|tx| self.txs_user_batch.get(&tx.id())) + .copied() + .collect::>(); + + 'outer: for candidate in candidate_batches { + let mut selected = SelectedBatch::builder(); + + let txs = self + .user_batch_txs + .get(&candidate) + .cloned() + .expect("bi-directional mapping should be coherent"); + + for tx in txs { + let Some(tx) = self.inner.selection_candidates().get(&tx).copied() else { + // Rollback this batch selection since it cannot complete. + for tx in selected.txs.into_iter().rev() { + self.inner.deselect(tx.id()); + } + + continue 'outer; + }; + let tx = Arc::clone(tx); + + self.inner.select_candidate(tx.id()); + selected.push(tx); + } + + assert!(!selected.is_empty(), "User batch should not be empty"); + return Some(selected.build()); + } + + None + } + + fn select_conventional_batch(&mut self, mut budget: BatchBudget) -> Option { let mut selected = SelectedBatch::builder(); - while let Some((id, tx)) = self.inner.selection_candidates().pop_first() { - if budget.check_then_subtract(tx) == BudgetStatus::Exceeded { + loop { + // Select arbitrary candidate which is _not_ part of a user batch. 
+ let candidates = self.inner.selection_candidates(); + let Some(candidate) = + candidates.values().find(|tx| !self.txs_user_batch.contains_key(&tx.id())) + else { + break; + }; + + if budget.check_then_subtract(candidate) == BudgetStatus::Exceeded { break; } - selected.push(Arc::clone(tx)); - self.inner.select_candidate(*id); + let candidate = Arc::clone(candidate); + self.inner.select_candidate(candidate.id()); + selected.push(candidate); } if selected.is_empty() { @@ -127,22 +215,40 @@ impl TransactionGraph { /// /// This includes batches that have been marked as proven. /// - /// Returns the reverted batches in the _reverse_ chronological order they were appended in. + /// Returns the reverted transactions in the _reverse_ chronological order they were appended + /// in. pub fn revert_tx_and_descendants(&mut self, transaction: TransactionId) -> Vec { - // We need this check because `inner.revert..` panics if the node is unknown. - if !self.inner.contains(&transaction) { - return Vec::default(); - } + // This is a bit more involved because we also need to atomically revert user batches. + let mut to_revert = vec![transaction]; + let mut reverted = Vec::new(); + + while let Some(revert) = to_revert.pop() { + // We need this check because `inner.revert..` panics if the node is unknown. + // + // And this transaction might already have been reverted as part of descendents in a + // prior loop. + if !self.inner.contains(&revert) { + continue; + } - let reverted = self - .inner - .revert_node_and_descendants(transaction) - .into_iter() - .map(|tx| tx.id()) - .collect(); + let reverted_now = self.inner.revert_node_and_descendants(revert); + + // Clean up book keeping and also revert transactions from the same user batch, if any. + for tx in &reverted_now { + self.failures.remove(&tx.id()); + + // Note that this is a pretty rough shod approach. 
We just dump the entire batch of + // transactions in, which will result in at least the current + // transaction being duplicated in `to_revert`. This isn't a concern + // though since we skip already processed transactions at the top of the loop. + if let Some(batch) = self.txs_user_batch.remove(&tx.id()) { + if let Some(batch) = self.user_batch_txs.remove(&batch) { + to_revert.extend(batch); + } + } + } - for tx in &reverted { - self.failures.remove(tx); + reverted.extend(reverted_now.into_iter().map(|tx| tx.id())); } reverted @@ -187,15 +293,19 @@ impl TransactionGraph { reverted } - /// Prunes the given transaction. + /// Prunes the given given batch's transactions. /// /// # Panics /// - /// Panics if the transaction does not exist, or has existing ancestors in the transaction + /// Panics if the transactions do not exist, or has existing ancestors in the transaction /// graph. - pub fn prune(&mut self, transaction: TransactionId) { - self.inner.prune(transaction); - self.failures.remove(&transaction); + pub fn prune(&mut self, batch: &SelectedBatch) { + for tx in batch.transactions() { + self.inner.prune(tx.id()); + self.failures.remove(&tx.id()); + self.txs_user_batch.remove(&tx.id()); + } + self.user_batch_txs.remove(&batch.id()); } /// Number of transactions which have not been selected for inclusion in a batch. 
diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 3ac5bb3291..dbdc64ae4a 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -65,7 +65,8 @@ use tracing::instrument; use crate::domain::batch::SelectedBatch; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::{AddTransactionError, StateConflict}; +use crate::errors::{MempoolSubmissionError, StateConflict}; +use crate::mempool::budget::BudgetStatus; use crate::{ COMPONENT, DEFAULT_MEMPOOL_TX_CAPACITY, @@ -229,22 +230,59 @@ impl Mempool { pub fn add_transaction( &mut self, tx: Arc, - ) -> Result { + ) -> Result { if self.unbatched_transactions_count() >= self.config.tx_capacity.get() { - return Err(AddTransactionError::CapacityExceeded); + return Err(MempoolSubmissionError::CapacityExceeded); } self.authentication_staleness_check(tx.authentication_height())?; self.expiration_check(tx.expires_at())?; self.transactions .append(Arc::clone(&tx)) - .map_err(AddTransactionError::StateConflict)?; + .map_err(MempoolSubmissionError::StateConflict)?; self.subscription.transaction_added(&tx); self.inject_telemetry(); Ok(self.chain_tip) } + #[instrument(target = COMPONENT, name = "mempool.add_user_batch", skip_all)] + pub fn add_user_batch( + &mut self, + txs: &[Arc], + ) -> Result { + assert!(!txs.is_empty(), "Cannot have a batch with no transactions"); + + if self.unbatched_transactions_count() + txs.len() > self.config.tx_capacity.get() { + return Err(MempoolSubmissionError::CapacityExceeded); + } + + // Ensure the batch doesn't exceed the mempool budget for batches. + let mut budget = self.config.batch_budget; + for tx in txs { + if budget.check_then_subtract(tx) == BudgetStatus::Exceeded { + // TODO: better error plox. 
+ return Err(MempoolSubmissionError::CapacityExceeded); + } + } + + for tx in txs { + self.authentication_staleness_check(tx.authentication_height())?; + self.expiration_check(tx.expires_at())?; + } + + self.transactions + .append_user_batch(txs) + .map_err(MempoolSubmissionError::StateConflict)?; + + for tx in txs { + self.subscription.transaction_added(tx); + } + self.inject_telemetry(); + + Ok(self.chain_tip) + } + /// Returns a set of transactions for the next batch. /// /// Transactions are returned in a valid execution ordering. @@ -478,15 +516,8 @@ impl Mempool { // // The same logic follows for transactions. for batch in block.iter().map(|batch| batch.id()) { - self.batches.prune(batch); - } - - for tx in block - .iter() - .flat_map(|batch| batch.transactions().as_slice()) - .map(TransactionHeader::id) - { - self.transactions.prune(tx); + let batch = self.batches.prune(batch); + self.transactions.prune(&batch); } } @@ -518,14 +549,14 @@ impl Mempool { fn authentication_staleness_check( &self, authentication_height: BlockNumber, - ) -> Result<(), AddTransactionError> { + ) -> Result<(), MempoolSubmissionError> { let limit = self .chain_tip .checked_sub(self.committed_blocks.len() as u32) .expect("amount of committed blocks cannot exceed the chain tip"); if authentication_height < limit { - return Err(AddTransactionError::StaleInputs { + return Err(MempoolSubmissionError::StaleInputs { input_block: authentication_height, stale_limit: limit, }); @@ -540,10 +571,10 @@ impl Mempool { Ok(()) } - fn expiration_check(&self, expired_at: BlockNumber) -> Result<(), AddTransactionError> { + fn expiration_check(&self, expired_at: BlockNumber) -> Result<(), MempoolSubmissionError> { let limit = self.chain_tip + self.config.expiration_slack; if expired_at <= limit { - return Err(AddTransactionError::Expired { expired_at, limit }); + return Err(MempoolSubmissionError::Expired { expired_at, limit }); } Ok(()) diff --git a/crates/block-producer/src/mempool/tests.rs 
b/crates/block-producer/src/mempool/tests.rs index 4c7059acf1..b83af0cc8e 100644 --- a/crates/block-producer/src/mempool/tests.rs +++ b/crates/block-producer/src/mempool/tests.rs @@ -12,6 +12,7 @@ use crate::test_utils::MockProvenTxBuilder; use crate::test_utils::batch::TransactionBatchConstructor; mod add_transaction; +mod add_user_batch; impl Mempool { /// Returns an empty [`Mempool`] and a perfect clone intended for use as the Unit Under Test and @@ -71,15 +72,15 @@ fn children_of_failed_batches_are_ignored() { let (mut uut, _) = Mempool::for_tests(); uut.add_transaction(txs[0].clone()).unwrap(); let parent_batch = uut.select_batch().unwrap(); - assert_eq!(parent_batch.txs(), vec![txs[0].clone()]); + assert_eq!(parent_batch.transactions(), vec![txs[0].clone()]); uut.add_transaction(txs[1].clone()).unwrap(); let child_batch_a = uut.select_batch().unwrap(); - assert_eq!(child_batch_a.txs(), vec![txs[1].clone()]); + assert_eq!(child_batch_a.transactions(), vec![txs[1].clone()]); uut.add_transaction(txs[2].clone()).unwrap(); let next_batch = uut.select_batch().unwrap(); - assert_eq!(next_batch.txs(), vec![txs[2].clone()]); + assert_eq!(next_batch.transactions(), vec![txs[2].clone()]); // Child batch jobs are now dangling. uut.rollback_batch(parent_batch.id()); @@ -118,7 +119,7 @@ fn failed_batch_transactions_are_requeued() { reference.add_transaction(txs[2].clone()).unwrap(); reference .transactions - .increment_failure_count(failed_batch.txs().iter().map(|tx| tx.id())); + .increment_failure_count(failed_batch.transactions().iter().map(|tx| tx.id())); assert_eq!(uut, reference); } @@ -326,9 +327,9 @@ fn pass_through_txs_on_an_empty_account() { // Ensure the batch contains a,b and final. Final should also be the last tx since its order // is required. 
- assert!(batch.txs().contains(&tx_pass_through_a)); - assert!(batch.txs().contains(&tx_pass_through_b)); - assert_eq!(batch.txs().last().unwrap(), &tx_final); + assert!(batch.transactions().contains(&tx_pass_through_a)); + assert!(batch.transactions().contains(&tx_pass_through_b)); + assert_eq!(batch.transactions().last().unwrap(), &tx_final); } /// Tests that pass through transactions retain parent-child relations based on notes, even though @@ -366,11 +367,11 @@ fn pass_through_txs_with_note_dependencies() { // relationship was correctly inferred by the mempool. uut.add_transaction(tx_pass_through_a.clone()).unwrap(); let batch_a = uut.select_batch().unwrap(); - assert_eq!(batch_a.txs(), std::slice::from_ref(&tx_pass_through_a)); + assert_eq!(batch_a.transactions(), std::slice::from_ref(&tx_pass_through_a)); uut.add_transaction(tx_pass_through_b.clone()).unwrap(); let batch_b = uut.select_batch().unwrap(); - assert_eq!(batch_b.txs(), std::slice::from_ref(&tx_pass_through_b)); + assert_eq!(batch_b.transactions(), std::slice::from_ref(&tx_pass_through_b)); // Rollback (a) and check that (b) also reverted by comparing to the reference. 
uut.rollback_batch(batch_a.id()); @@ -378,7 +379,7 @@ fn pass_through_txs_with_note_dependencies() { reference.add_transaction(tx_pass_through_b).unwrap(); reference .transactions - .increment_failure_count(batch_a.txs().iter().map(|tx| tx.id())); + .increment_failure_count(batch_a.transactions().iter().map(|tx| tx.id())); assert_eq!(uut, reference); } diff --git a/crates/block-producer/src/mempool/tests/add_transaction.rs b/crates/block-producer/src/mempool/tests/add_transaction.rs index 4747f179b3..559c63936b 100644 --- a/crates/block-producer/src/mempool/tests/add_transaction.rs +++ b/crates/block-producer/src/mempool/tests/add_transaction.rs @@ -5,7 +5,7 @@ use miden_protocol::Word; use miden_protocol::block::BlockHeader; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::{AddTransactionError, StateConflict}; +use crate::errors::{MempoolSubmissionError, StateConflict}; use crate::mempool::Mempool; use crate::test_utils::{MockProvenTxBuilder, mock_account_id}; @@ -105,7 +105,7 @@ mod tx_expiration { assert_matches!( result, - Err(AddTransactionError::Expired { .. }), + Err(MempoolSubmissionError::Expired { .. }), "Failed run with expiration {i} and limit {limit}" ); } @@ -121,7 +121,7 @@ mod tx_expiration { let tx = Arc::new(tx); let result = uut.add_transaction(tx); - assert_matches!(result, Err(AddTransactionError::Expired { .. })); + assert_matches!(result, Err(MempoolSubmissionError::Expired { .. })); } } @@ -235,7 +235,7 @@ fn duplicate_nullifiers_are_rejected() { // We overlap with one nullifier. 
assert_matches!( result, - Err(AddTransactionError::StateConflict(StateConflict::NullifiersAlreadyExist(..))) + Err(MempoolSubmissionError::StateConflict(StateConflict::NullifiersAlreadyExist(..))) ); } @@ -268,7 +268,9 @@ fn duplicate_output_notes_are_rejected() { assert_matches!( result, - Err(AddTransactionError::StateConflict(StateConflict::OutputNotesAlreadyExist(..))) + Err(MempoolSubmissionError::StateConflict(StateConflict::OutputNotesAlreadyExist( + .. + ))) ); } @@ -301,9 +303,9 @@ fn unknown_unauthenticated_notes_are_rejected() { assert_matches!( result, - Err(AddTransactionError::StateConflict(StateConflict::UnauthenticatedNotesMissing( - .. - ))) + Err(MempoolSubmissionError::StateConflict( + StateConflict::UnauthenticatedNotesMissing(..) + )) ); } @@ -403,7 +405,7 @@ mod account_state { assert_matches!( result, - Err(AddTransactionError::StateConflict( + Err(MempoolSubmissionError::StateConflict( StateConflict::AccountCommitmentMismatch { .. } )) ); @@ -433,7 +435,7 @@ mod account_state { let result = uut.add_transaction(tx); assert_matches!( result, - Err(AddTransactionError::StateConflict( + Err(MempoolSubmissionError::StateConflict( StateConflict::AccountCommitmentMismatch { .. } )) ); @@ -483,7 +485,7 @@ mod new_account { let result = uut.add_transaction(tx); assert_matches!( result, - Err(AddTransactionError::StateConflict( + Err(MempoolSubmissionError::StateConflict( StateConflict::AccountCommitmentMismatch { .. } )) ); @@ -509,7 +511,7 @@ mod new_account { let result = uut.add_transaction(tx); assert_matches!( result, - Err(AddTransactionError::StateConflict( + Err(MempoolSubmissionError::StateConflict( StateConflict::AccountCommitmentMismatch { .. 
} )) ); diff --git a/crates/block-producer/src/mempool/tests/add_user_batch.rs b/crates/block-producer/src/mempool/tests/add_user_batch.rs new file mode 100644 index 0000000000..6d4aa3aeec --- /dev/null +++ b/crates/block-producer/src/mempool/tests/add_user_batch.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; + +use assert_matches::assert_matches; +use miden_protocol::batch::BatchId; +use pretty_assertions::assert_eq; + +use crate::domain::transaction::AuthenticatedTransaction; +use crate::errors::{MempoolSubmissionError, StateConflict}; +use crate::mempool::Mempool; +use crate::test_utils::MockProvenTxBuilder; + +/// This checks that transactions from a user batch remain as the same batch upon selection. +/// +/// Since the selection process is random, its difficult to test this directly, but this at +/// least acts as a smoke test. We select two batches and check that one of them is the user +/// batch. +#[test] +fn user_batch_is_isolated_from_other_transactions() { + let (mut uut, _) = Mempool::for_tests(); + + let conventional_a = build_tx(MockProvenTxBuilder::with_account_index(200)); + let conventional_b = build_tx(MockProvenTxBuilder::with_account_index(201)); + + uut.add_transaction(conventional_a.clone()).unwrap(); + uut.add_transaction(conventional_b.clone()).unwrap(); + + let user_batch_txs = MockProvenTxBuilder::sequential(); + let user_batch_id = + BatchId::from_transactions(user_batch_txs.iter().map(|tx| tx.raw_proven_transaction())); + uut.add_user_batch(&user_batch_txs).unwrap(); + + let batch_a = uut.select_batch().unwrap(); + let batch_b = uut.select_batch().unwrap(); + + let (user, conventional) = if batch_a.id() == user_batch_id { + (batch_a, batch_b) + } else { + (batch_b, batch_a) + }; + + assert_eq!(user.id(), user_batch_id); + assert_eq!(user.transactions(), user_batch_txs.as_slice()); + + assert_eq!(conventional.transactions().len(), 2); + assert!(conventional.transactions().contains(&conventional_a)); + 
assert!(conventional.transactions().contains(&conventional_b)); +} + +#[test] +fn user_batch_respects_batch_budget() { + let (mut uut, _) = Mempool::for_tests(); + uut.config.batch_budget.transactions = 1; + + let user_batch_txs = MockProvenTxBuilder::sequential(); + let result = uut.add_user_batch(&user_batch_txs[..2]); + + assert_matches!(result, Err(MempoolSubmissionError::CapacityExceeded)); +} + +#[test] +fn user_batch_with_internal_state_conflicts_are_rejected() { + let (mut uut, reference) = Mempool::for_tests(); + + let conflicting_a = tx_with_nullifiers(10, 0..1); + let conflicting_b = tx_with_nullifiers(11, 0..1); + + let result = uut.add_user_batch(&[conflicting_a.clone(), conflicting_b.clone()]); + + assert_matches!( + result, + Err(MempoolSubmissionError::StateConflict(StateConflict::NullifiersAlreadyExist(..))) + ); + + assert_eq!(uut, reference); +} + +#[test] +fn user_batch_conflicts_with_existing_state_are_rejected() { + let (mut uut, mut reference) = Mempool::for_tests(); + + let existing = tx_with_nullifiers(20, 5..6); + uut.add_transaction(existing.clone()).unwrap(); + reference.add_transaction(existing.clone()).unwrap(); + + let conflicting = tx_with_nullifiers(21, 5..6); + let companion = tx_with_nullifiers(22, 6..7); + + let result = uut.add_user_batch(&[conflicting.clone(), companion.clone()]); + + assert_matches!( + result, + Err(MempoolSubmissionError::StateConflict(StateConflict::NullifiersAlreadyExist(..))) + ); + + assert_eq!(uut, reference); +} + +fn build_tx(builder: MockProvenTxBuilder) -> Arc { + Arc::new(AuthenticatedTransaction::from_inner(builder.build())) +} + +fn tx_with_nullifiers( + account_index: u32, + range: std::ops::Range, +) -> Arc { + build_tx(MockProvenTxBuilder::with_account_index(account_index).nullifiers_range(range)) +} diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index aad5335c5d..dd4c63da7b 100644 --- a/crates/block-producer/src/server/mod.rs +++ 
b/crates/block-producer/src/server/mod.rs @@ -14,7 +14,7 @@ use miden_node_utils::clap::GrpcOptionsInternal; use miden_node_utils::formatting::{format_input_notes, format_output_notes}; use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; use miden_node_utils::tracing::grpc::grpc_trace_fn; -use miden_protocol::batch::ProvenBatch; +use miden_protocol::batch::ProposedBatch; use miden_protocol::block::BlockNumber; use miden_protocol::transaction::ProvenTransaction; use miden_protocol::utils::serde::Deserializable; @@ -29,7 +29,7 @@ use url::Url; use crate::batch_builder::BatchBuilder; use crate::block_builder::BlockBuilder; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::{AddTransactionError, BlockProducerError, StoreError, SubmitProvenBatchError}; +use crate::errors::{BlockProducerError, MempoolSubmissionError, StoreError}; use crate::mempool::{BatchBudget, BlockBudget, Mempool, MempoolConfig, SharedMempool}; use crate::store::StoreClient; use crate::validator::BlockProducerValidatorClient; @@ -314,11 +314,11 @@ impl BlockProducerRpcServer { async fn submit_proven_transaction( &self, request: proto::transaction::ProvenTransaction, - ) -> Result { + ) -> Result { debug!(target: COMPONENT, ?request); let tx = ProvenTransaction::read_from_bytes(&request.transaction) - .map_err(AddTransactionError::TransactionDeserializationFailed)?; + .map_err(MempoolSubmissionError::TransactionDeserializationFailed)?; let tx_id = tx.id(); @@ -339,20 +339,14 @@ impl BlockProducerRpcServer { .store .get_tx_inputs(&tx) .await - .map_err(AddTransactionError::StoreConnectionFailed)?; + .map_err(MempoolSubmissionError::StoreConnectionFailed)?; // SAFETY: we assume that the rpc component has verified the transaction proof already. 
- let tx = AuthenticatedTransaction::new_unchecked(tx, inputs) + let tx = AuthenticatedTransaction::new_unchecked(Arc::new(tx), inputs) .map(Arc::new) - .map_err(AddTransactionError::StateConflict)?; + .map_err(MempoolSubmissionError::StateConflict)?; - self.mempool - .lock() - .await - .lock() - .await - .add_transaction(tx) - .map(|block_height| proto::blockchain::BlockNumber { block_num: block_height.as_u32() }) + self.mempool.lock().await.lock().await.add_transaction(tx).map(Into::into) } #[instrument( @@ -361,14 +355,33 @@ impl BlockProducerRpcServer { skip_all, err )] - async fn submit_proven_batch( + async fn submit_batch( &self, - request: proto::transaction::ProvenTransactionBatch, - ) -> Result { - let _batch = ProvenBatch::read_from_bytes(&request.encoded) - .map_err(SubmitProvenBatchError::Deserialization)?; + request: proto::transaction::TransactionBatch, + ) -> Result { + let batch = ProposedBatch::read_from_bytes(&request.proposed_batch) + .map_err(MempoolSubmissionError::TransactionDeserializationFailed)?; + + // We assume that the rpc component has verified everything, including the transaction + // proofs. + + let mut txs = Vec::with_capacity(batch.transactions().len()); + for tx in batch.transactions() { + let inputs = self + .store + .get_tx_inputs(tx) + .await + .map_err(MempoolSubmissionError::StoreConnectionFailed)?; + + // SAFETY: We assume that the rpc component has verified the transaction proofs, as well + // as the batch integrity itself. 
+ let tx = AuthenticatedTransaction::new_unchecked(Arc::clone(tx), inputs) + .map(Arc::new) + .map_err(MempoolSubmissionError::StateConflict)?; + txs.push(tx); + } - todo!(); + self.mempool.lock().await.lock().await.add_user_batch(&txs).map(Into::into) } } @@ -387,11 +400,11 @@ impl api_server::Api for BlockProducerRpcServer { .map_err(Into::into) } - async fn submit_proven_batch( + async fn submit_batch( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, Status> { - self.submit_proven_batch(request.into_inner()) + self.submit_batch(request.into_inner()) .await .map(tonic::Response::new) // This Status::from mapping takes care of hiding internal errors. diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 537173e67d..4892f315ff 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -24,6 +24,7 @@ miden-node-proto-build = { workspace = true } miden-node-utils = { workspace = true } miden-protocol = { default-features = true, workspace = true } miden-tx = { default-features = true, workspace = true } +miden-tx-batch-prover = { workspace = true } semver = { version = "1.0" } thiserror = { workspace = true } tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true } diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index a0ec88859a..2e3bd3bdae 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -17,13 +17,14 @@ use miden_node_utils::limiter::{ QueryParamNullifierLimit, QueryParamStorageMapKeyTotalLimit, }; -use miden_protocol::batch::ProvenBatch; +use miden_protocol::batch::ProposedBatch; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::{Note, NoteRecipient, NoteScript}; use miden_protocol::transaction::{OutputNote, ProvenTransaction, ProvenTransactionBuilder}; use miden_protocol::utils::serde::{Deserializable, Serializable}; use miden_protocol::{MIN_PROOF_SECURITY_LEVEL, Word}; use miden_tx::TransactionVerifier; +use 
miden_tx_batch_prover::LocalBatchProver; use tonic::{IntoRequest, Request, Response, Status}; use tracing::{debug, info}; use url::Url; @@ -387,47 +388,81 @@ impl api_server::Api for RpcService { /// Deserializes the batch, strips MAST decorators from full output note scripts, rebuilds /// the batch, then forwards it to the block producer. - async fn submit_proven_batch( + async fn submit_batch( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, Status> { let Some(block_producer) = &self.block_producer else { return Err(Status::unavailable("Batch submission not available in read-only mode")); }; - let mut request = request.into_inner(); - - let batch = ProvenBatch::read_from_bytes(&request.encoded) - .map_err(|err| Status::invalid_argument(err.as_report_context("invalid batch")))?; - - // Build a new batch with output notes' decorators removed - let stripped_outputs: Vec = - strip_output_note_decorators(batch.output_notes().iter()).collect(); + let request = request.into_inner(); - let rebuilt_batch = ProvenBatch::new( - batch.id(), - batch.reference_block_commitment(), - batch.reference_block_num(), - batch.account_updates().clone(), - batch.input_notes().clone(), - stripped_outputs, - batch.batch_expiration_block_num(), - batch.transactions().clone(), - ) - .map_err(|e| Status::invalid_argument(e.to_string()))?; + let batch = ProposedBatch::read_from_bytes(&request.proposed_batch).map_err(|err| { + Status::invalid_argument(err.as_report_context("invalid proposed_batch")) + })?; - request.encoded = rebuilt_batch.to_bytes(); + // Perform this check here since its cheap. If this passes we can safely zip inputs and + // transactions. 
+ if request.transaction_inputs.len() != batch.transactions().len() {
+ return Err(Status::invalid_argument(format!(
+ "Number of inputs {} does not match number of transactions {} in batch",
+ request.transaction_inputs.len(),
+ batch.transactions().len()
+ )));
+ }

- // Only allow deployment transactions for new network accounts
- for tx in batch.transactions().as_slice() {
- if tx.account_id().is_network() && !tx.initial_state_commitment().is_empty() {
+ // Only allow deployment transactions for new network accounts.
+ for tx in batch.transactions() {
+ if tx.account_id().is_network()
+ && !tx.account_update().initial_state_commitment().is_empty()
+ {
 return Err(Status::invalid_argument(
 "Network transactions may not be submitted by users yet",
 ));
 }
 }

- block_producer.clone().submit_proven_batch(request).await
+ // Verify batch transaction proofs.
+ let proof = LocalBatchProver::new(MIN_PROOF_SECURITY_LEVEL).prove(batch.clone()).map_err(
+ |err| Status::invalid_argument(err.as_report_context("proposed batch proof failed")),
+ )?;
+ // Verify the reference header matches the canonical chain.
+ let reference_header = self
+ .get_block_header_by_number(Request::new(proto::rpc::BlockHeaderByNumberRequest {
+ block_num: proof.reference_block_num().as_u32().into(),
+ include_mmr_proof: false.into(),
+ }))
+ .await?
+ .into_inner()
+ .block_header
+ .expect("store should always send block header");
+ let reference_commitment: Word = reference_header
+ .chain_commitment
+ .expect("store should always fill block header")
+ .try_into()
+ .expect("store Word should be okay");
+ if reference_commitment != proof.reference_block_commitment() {
+ return Err(Status::invalid_argument(format!(
+ "batch reference commitment {} at block {} does not match canonical chain's commitment of {}",
+ proof.reference_block_commitment(),
+ proof.reference_block_num(),
+ reference_commitment
+ )));
+ }
+
+ // Submit each transaction to the validator.
+ // + // SAFETY: We checked earlier that the two iterators are the same length. + for (tx, inputs) in batch.transactions().iter().zip(&request.transaction_inputs) { + let request = proto::transaction::ProvenTransaction { + transaction: tx.to_bytes(), + transaction_inputs: inputs.clone().into(), + }; + self.validator.clone().submit_proven_transaction(request).await?; + } + + block_producer.clone().submit_batch(request).await } // -- Status & utility endpoints ---------------------------------------------------------- diff --git a/proto/proto/internal/block_producer.proto b/proto/proto/internal/block_producer.proto index e81f5c2452..9ab2be140a 100644 --- a/proto/proto/internal/block_producer.proto +++ b/proto/proto/internal/block_producer.proto @@ -2,12 +2,12 @@ syntax = "proto3"; package block_producer; +import "google/protobuf/empty.proto"; import "rpc.proto"; -import "types/note.proto"; import "types/blockchain.proto"; +import "types/note.proto"; import "types/primitives.proto"; import "types/transaction.proto"; -import "google/protobuf/empty.proto"; // BLOCK PRODUCER SERVICE // ================================================================================================ @@ -19,19 +19,12 @@ service Api { // Submits proven transaction to the Miden network. Returns the node's current block height. rpc SubmitProvenTransaction(transaction.ProvenTransaction) returns (blockchain.BlockNumber) {} - // Submits a proven batch to the Miden network. + // Submits a batch of transactions to the Miden network. // - // The batch may include transactions which were are: - // - // - already in the mempool i.e. previously successfully submitted - // - will be submitted to the mempool in the future - // - won't be submitted to the mempool at all - // - // All transactions in the batch but not in the mempool must build on the current mempool - // state following normal transaction submission rules. 
+ // All transactions in this batch will be considered atomic, and be committed together or not at all.
 //
 // Returns the node's current block height.
- rpc SubmitProvenBatch(transaction.ProvenTransactionBatch) returns (blockchain.BlockNumber) {}
+ rpc SubmitBatch(transaction.TransactionBatch) returns (blockchain.BlockNumber) {}

 // Subscribe to mempool events.
 //
@@ -87,5 +80,5 @@ message MempoolEvent {
 TransactionAdded transaction_added = 1;
 BlockCommitted block_committed = 2;
 TransactionsReverted transactions_reverted = 3;
- };
+ }
 }
diff --git a/proto/proto/rpc.proto b/proto/proto/rpc.proto
index 1a218539ee..d98267b9cd 100644
--- a/proto/proto/rpc.proto
+++ b/proto/proto/rpc.proto
@@ -2,12 +2,12 @@ syntax = "proto3";

 package rpc;

+import "google/protobuf/empty.proto";
 import "types/account.proto";
 import "types/blockchain.proto";
 import "types/note.proto";
 import "types/primitives.proto";
 import "types/transaction.proto";
-import "google/protobuf/empty.proto";

 // RPC API
 // ================================================================================================
@@ -59,19 +59,12 @@ service Api {

 // Submits proven transaction to the Miden network. Returns the node's current block height.
 rpc SubmitProvenTransaction(transaction.ProvenTransaction) returns (blockchain.BlockNumber) {}

- // Submits a proven batch of transactions to the Miden network.
- //
- // The batch may include transactions which were are:
- //
- // - already in the mempool i.e. previously successfully submitted
- // - will be submitted to the mempool in the future
- // - won't be submitted to the mempool at all
+ // Submits a batch of transactions to the Miden network.
 //
- // All transactions in the batch but not in the mempool must build on the current mempool
- // state following normal transaction submission rules.
+ // All transactions in this batch will be considered atomic, and be committed together or not at all.
 //
 // Returns the node's current block height.
- rpc SubmitProvenBatch(transaction.ProvenTransactionBatch) returns (blockchain.BlockNumber) {} + rpc SubmitBatch(transaction.TransactionBatch) returns (blockchain.BlockNumber) {} // STATE SYNCHRONIZATION ENDPOINTS // -------------------------------------------------------------------------------------------- @@ -125,11 +118,9 @@ message RpcStatus { BlockProducerStatus block_producer = 4; } - // BLOCK PRODUCER STATUS // ================================================================================================ - // Represents the status of the block producer. message BlockProducerStatus { // The block producer's running version. @@ -268,7 +259,6 @@ message AccountRequest { // Represents the result of getting account proof. message AccountResponse { - message AccountDetails { // Account header. account.AccountHeader header = 1; diff --git a/proto/proto/types/transaction.proto b/proto/proto/types/transaction.proto index 8be04946d7..0895565934 100644 --- a/proto/proto/types/transaction.proto +++ b/proto/proto/types/transaction.proto @@ -18,10 +18,15 @@ message ProvenTransaction { optional bytes transaction_inputs = 2; } -message ProvenTransactionBatch { - // Encoded using [winter_utils::Serializable] implementation for - // [miden_protocol::transaction::proven_tx::ProvenTransaction]. - bytes encoded = 1; +message TransactionBatch { + // The proposed batch of transaction encoded using [winter_utils::Serializable] implementation + // for [miden_protocol::batch::ProposedBatch]. + bytes proposed_batch = 1; + // Each transaction's inputs encoded using [winter_utils::Serializable] implementation for + // [miden_protocol::transaction::TransactionInputs]. + // + // Order of inputs should match the transaction order in the batch. + repeated bytes transaction_inputs = 2; } // Represents a transaction ID.