Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Fixed `TransactionHeader` serialization for row insertion on database & fixed transaction cursor on retrievals ([#1701](https://github.com/0xMiden/node/issues/1701)).
- Added KMS signing support in validator ([#1677](https://github.com/0xMiden/node/pull/1677)).
- Added per-IP gRPC rate limiting across services as well as global concurrent connection limit ([#1746](https://github.com/0xMiden/node/issues/1746)).
- Users can now submit atomic transaction batches via `SubmitBatch` gRPC endpoint ([#1846](https://github.com/0xMiden/node/pull/1846)).

### Changes

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ impl BatchJob {
batch: SelectedBatch,
) -> Result<(SelectedBatch, BatchInputs), BuildBatchError> {
let block_references = batch
.txs()
.transactions()
.iter()
.map(Deref::deref)
.map(AuthenticatedTransaction::reference_block);
let unauthenticated_notes = batch
.txs()
.transactions()
.iter()
.map(Deref::deref)
.flat_map(AuthenticatedTransaction::unauthenticated_note_commitments);
Expand Down Expand Up @@ -325,10 +325,10 @@ impl BatchProver {
impl TelemetryInjectorExt for SelectedBatch {
fn inject_telemetry(&self) {
Span::current().set_attribute("batch.id", self.id());
Span::current().set_attribute("transactions.count", self.txs().len());
Span::current().set_attribute("transactions.count", self.transactions().len());
// Accumulate all telemetry based on transactions.
let (tx_ids, input_notes_count, output_notes_count, unauth_notes_count) =
self.txs().iter().fold(
self.transactions().iter().fold(
(vec![], 0, 0, 0),
|(
mut tx_ids,
Expand Down
4 changes: 0 additions & 4 deletions crates/block-producer/src/domain/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ impl SelectedBatch {
self.id
}

pub(crate) fn txs(&self) -> &[Arc<AuthenticatedTransaction>] {
&self.txs
}

pub(crate) fn into_transactions(self) -> Vec<Arc<AuthenticatedTransaction>> {
self.txs
}
Expand Down
14 changes: 7 additions & 7 deletions crates/block-producer/src/domain/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl AuthenticatedTransaction {
///
/// Returns an error if any of the transaction's nullifiers are marked as spent by the inputs.
pub fn new_unchecked(
tx: ProvenTransaction,
tx: Arc<ProvenTransaction>,
inputs: TransactionInputs,
) -> Result<AuthenticatedTransaction, StateConflict> {
let nullifiers_already_spent = tx
Expand All @@ -58,7 +58,7 @@ impl AuthenticatedTransaction {
}

Ok(AuthenticatedTransaction {
inner: Arc::new(tx),
inner: tx,
notes_authenticated_by_store: inputs.found_unauthenticated_notes,
authentication_height: inputs.current_block_height,
store_account_state: inputs.account_commitment,
Expand Down Expand Up @@ -128,6 +128,10 @@ impl AuthenticatedTransaction {
pub fn expires_at(&self) -> BlockNumber {
self.inner.expiration_block_num()
}

pub fn raw_proven_transaction(&self) -> &ProvenTransaction {
&self.inner
}
}

#[cfg(test)]
Expand All @@ -151,7 +155,7 @@ impl AuthenticatedTransaction {
current_block_height: 0.into(),
};
// SAFETY: nullifiers were set to None aka are definitely unspent.
Self::new_unchecked(inner, inputs).unwrap()
Self::new_unchecked(Arc::new(inner), inputs).unwrap()
}

/// Overrides the authentication height with the given value.
Expand All @@ -171,8 +175,4 @@ impl AuthenticatedTransaction {
self.store_account_state = None;
self
}

pub fn raw_proven_transaction(&self) -> &ProvenTransaction {
&self.inner
}
}
18 changes: 4 additions & 14 deletions crates/block-producer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ pub enum BlockProducerError {
},
}

// Transaction adding errors
// Add transaction and add user batch errors
// =================================================================================================

#[derive(Debug, Error, GrpcError)]
pub enum AddTransactionError {
#[error("failed to retrieve transaction inputs from the store")]
pub enum MempoolSubmissionError {
#[error("failed to retrieve inputs from the store")]
#[grpc(internal)]
StoreConnectionFailed(#[source] StoreError),

Expand All @@ -55,7 +55,7 @@ pub enum AddTransactionError {
stale_limit: BlockNumber,
},

#[error("transaction deserialization failed")]
#[error("request deserialization failed")]
TransactionDeserializationFailed(#[source] miden_protocol::utils::DeserializationError),

#[error(
Expand Down Expand Up @@ -93,16 +93,6 @@ pub enum StateConflict {
},
}

// Submit proven batch by user errors
// =================================================================================================

#[derive(Debug, Error, GrpcError)]
#[grpc(internal)]
pub enum SubmitProvenBatchError {
#[error("batch deserialization failed")]
Deserialization(#[source] miden_protocol::utils::DeserializationError),
}

// Batch building errors
// =================================================================================================

Expand Down
10 changes: 5 additions & 5 deletions crates/block-producer/src/mempool/graph/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ impl GraphNode for SelectedBatch {
type Id = BatchId;

fn nullifiers(&self) -> Box<dyn Iterator<Item = Nullifier> + '_> {
Box::new(self.txs().iter().flat_map(|tx| tx.nullifiers()))
Box::new(self.transactions().iter().flat_map(|tx| tx.nullifiers()))
}

fn output_notes(&self) -> Box<dyn Iterator<Item = Word> + '_> {
Box::new(self.txs().iter().flat_map(|tx| tx.output_note_commitments()))
Box::new(self.transactions().iter().flat_map(|tx| tx.output_note_commitments()))
}

fn unauthenticated_notes(&self) -> Box<dyn Iterator<Item = Word> + '_> {
Box::new(self.txs().iter().flat_map(|tx| tx.unauthenticated_note_commitments()))
Box::new(self.transactions().iter().flat_map(|tx| tx.unauthenticated_note_commitments()))
}

fn account_updates(
Expand Down Expand Up @@ -134,9 +134,9 @@ impl BatchGraph {
///
/// Panics if the batch does not exist, or has existing ancestors in the batch
/// graph.
pub fn prune(&mut self, batch: BatchId) {
self.inner.prune(batch);
pub fn prune(&mut self, batch: BatchId) -> SelectedBatch {
self.proven.remove(&batch);
self.inner.prune(batch)
}

pub fn proven_count(&self) -> usize {
Expand Down
4 changes: 2 additions & 2 deletions crates/block-producer/src/mempool/graph/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ where
/// # Panics
///
/// Panics if this node has any ancestor nodes, or if this node was not selected.
pub fn prune(&mut self, id: N::Id) {
pub fn prune(&mut self, id: N::Id) -> N {
assert!(
self.edges.parents_of(&id).is_empty(),
"Cannot prune node {id} as it still has ancestors",
);
assert!(self.selected.contains(&id), "Cannot prune node {id} as it was not selected");

self.remove(id);
self.remove(id)
}

/// Unconditionally removes the given node from the graph, deleting its edges and state.
Expand Down
156 changes: 133 additions & 23 deletions crates/block-producer/src/mempool/graph/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use miden_protocol::Word;
use miden_protocol::account::AccountId;
use miden_protocol::batch::BatchId;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::Nullifier;
use miden_protocol::transaction::TransactionId;
Expand Down Expand Up @@ -75,6 +76,9 @@ pub struct TransactionGraph {
/// These are batch or block proving errors in which the transaction was a part of. This is
/// used to identify potentially buggy transactions that should be evicted.
failures: HashMap<TransactionId, u32>,

user_batch_txs: HashMap<BatchId, Vec<TransactionId>>,
txs_user_batch: HashMap<TransactionId, BatchId>,
}

impl TransactionGraph {
Expand All @@ -85,16 +89,100 @@ impl TransactionGraph {
self.inner.append(tx)
}

pub fn select_batch(&mut self, mut budget: BatchBudget) -> Option<SelectedBatch> {
pub fn append_user_batch(
&mut self,
batch: &[Arc<AuthenticatedTransaction>],
) -> Result<(), StateConflict> {
let batch_id =
BatchId::from_transactions(batch.iter().map(|tx| tx.raw_proven_transaction()));

// Append each transaction, but revert atomically on error.
for (idx, tx) in batch.iter().enumerate() {
if let Err(err) = self.append(Arc::clone(tx)) {
// We revert in reverse order because inner.revert panics if the node doesn't exist.
for tx in batch.iter().take(idx).rev() {
let reverted = self.inner.revert_node_and_descendants(tx.id());
assert_eq!(reverted.len(), 1);
assert_eq!(&reverted[0], tx);
}

return Err(err);
}
}

let txs = batch.iter().map(GraphNode::id).collect::<Vec<_>>();
for tx in &txs {
self.txs_user_batch.insert(*tx, batch_id);
}
self.user_batch_txs.insert(batch_id, txs);

Ok(())
}

pub fn select_batch(&mut self, budget: BatchBudget) -> Option<SelectedBatch> {
self.select_user_batch().or_else(|| self.select_conventional_batch(budget))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want some doc comments to make it clear that budget is intended to only relevant for conventional batches.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also are we OK with user batches always taking priority over conventional here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also wondering if we need to prevent user batches of size 1 (or some other limit). Unsure if that is relevant to this PR just a general thought.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also are we OK with user batches always taking priority over conventional here?

I'm unsure, but at the moment it doesn't matter much. If its a concern we can make it random -- I was thinking maybe that's best.

Also wondering if we need to prevent user batches of size 1 (or some other limit). Unsure if that is relevant to this PR just a general thought.

Good question. I'm unsure 😬 I wonder if that makes some user loop more difficult i.e. they always submit user batches, but sometimes they don't have many transactions to bundle..

Probably we would want some limit even in the future? cc @bobbinth

}

fn select_user_batch(&mut self) -> Option<SelectedBatch> {
// Comb through all user batch candidates.
let candidate_batches = self
.inner
.selection_candidates()
.values()
.filter_map(|tx| self.txs_user_batch.get(&tx.id()))
.copied()
.collect::<HashSet<_>>();

'outer: for candidate in candidate_batches {
let mut selected = SelectedBatch::builder();

let txs = self
.user_batch_txs
.get(&candidate)
.cloned()
.expect("bi-directional mapping should be coherent");

for tx in txs {
let Some(tx) = self.inner.selection_candidates().get(&tx).copied() else {
// Rollback this batch selection since it cannot complete.
for tx in selected.txs.into_iter().rev() {
self.inner.deselect(tx.id());
}

continue 'outer;
};
let tx = Arc::clone(tx);

self.inner.select_candidate(tx.id());
selected.push(tx);
}

assert!(!selected.is_empty(), "User batch should not be empty");
return Some(selected.build());
}

None
}

fn select_conventional_batch(&mut self, mut budget: BatchBudget) -> Option<SelectedBatch> {
let mut selected = SelectedBatch::builder();

while let Some((id, tx)) = self.inner.selection_candidates().pop_first() {
if budget.check_then_subtract(tx) == BudgetStatus::Exceeded {
loop {
// Select arbitrary candidate which is _not_ part of a user batch.
let candidates = self.inner.selection_candidates();
let Some(candidate) =
candidates.values().find(|tx| !self.txs_user_batch.contains_key(&tx.id()))
else {
break;
};

if budget.check_then_subtract(candidate) == BudgetStatus::Exceeded {
break;
}

selected.push(Arc::clone(tx));
self.inner.select_candidate(*id);
let candidate = Arc::clone(candidate);
self.inner.select_candidate(candidate.id());
selected.push(candidate);
}

if selected.is_empty() {
Expand Down Expand Up @@ -127,22 +215,40 @@ impl TransactionGraph {
///
/// This includes batches that have been marked as proven.
///
/// Returns the reverted batches in the _reverse_ chronological order they were appended in.
/// Returns the reverted transactions in the _reverse_ chronological order they were appended
/// in.
pub fn revert_tx_and_descendants(&mut self, transaction: TransactionId) -> Vec<TransactionId> {
// We need this check because `inner.revert..` panics if the node is unknown.
if !self.inner.contains(&transaction) {
return Vec::default();
}
// This is a bit more involved because we also need to atomically revert user batches.
let mut to_revert = vec![transaction];
let mut reverted = Vec::new();

while let Some(revert) = to_revert.pop() {
// We need this check because `inner.revert..` panics if the node is unknown.
//
// And this transaction might already have been reverted as part of descendents in a
// prior loop.
if !self.inner.contains(&revert) {
continue;
}

let reverted = self
.inner
.revert_node_and_descendants(transaction)
.into_iter()
.map(|tx| tx.id())
.collect();
let x = self.inner.revert_node_and_descendants(transaction);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let x = self.inner.revert_node_and_descendants(transaction);
let x = self.inner.revert_node_and_descendants(revert);

Should this be revert instead of transaction?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thank you!


// Clean up book keeping and also revert transactions from the same user batch, if any.
for tx in &x {
self.failures.remove(&tx.id());

// Note that this is a pretty rough shod approach. We just dump the entire batch of
// transactions in, which will result in at least the current
// transaction being duplicated in `to_revert`. This isn't a concern
// though since we skip already processed transactions at the top of the loop.
if let Some(batch) = self.txs_user_batch.remove(&tx.id()) {
if let Some(batch) = self.user_batch_txs.remove(&batch) {
to_revert.extend(batch);
}
}
}

for tx in &reverted {
self.failures.remove(tx);
reverted.extend(x.into_iter().map(|tx| tx.id()));
}

reverted
Expand Down Expand Up @@ -187,15 +293,19 @@ impl TransactionGraph {
reverted
}

/// Prunes the given transaction.
/// Prunes the given given batch's transactions.
///
/// # Panics
///
/// Panics if the transaction does not exist, or has existing ancestors in the transaction
/// Panics if the transactions do not exist, or has existing ancestors in the transaction
/// graph.
pub fn prune(&mut self, transaction: TransactionId) {
self.inner.prune(transaction);
self.failures.remove(&transaction);
pub fn prune(&mut self, batch: &SelectedBatch) {
for tx in batch.transactions() {
self.inner.prune(tx.id());
self.failures.remove(&tx.id());
self.txs_user_batch.remove(&tx.id());
}
self.user_batch_txs.remove(&batch.id());
}

/// Number of transactions which have not been selected for inclusion in a batch.
Expand Down
Loading
Loading