Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/ntx-builder/src/actor/account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl NetworkAccountState {
account_delta,
} => {
// Filter network notes relevant to this account.
let network_notes = filter_by_prefix_and_map_to_single_target(
let network_notes = filter_by_account_id_and_map_to_single_target(
self.account_id,
network_notes.clone(),
);
Expand Down Expand Up @@ -249,8 +249,8 @@ impl NetworkAccountState {
return;
};

if let Some(prefix) = impact.account_delta {
if prefix == self.account_id {
if let Some(delta_account_id) = impact.account_delta {
if delta_account_id == self.account_id {
self.account.commit_delta();
}
}
Expand Down Expand Up @@ -330,8 +330,8 @@ impl TransactionImpact {
}
}

/// Filters network notes by prefix and maps them to single target network notes.
fn filter_by_prefix_and_map_to_single_target(
/// Filters network notes by account ID and maps them to single target network notes.
fn filter_by_account_id_and_map_to_single_target(
account_id: NetworkAccountId,
notes: Vec<NetworkNote>,
) -> Vec<SingleTargetNetworkNote> {
Expand Down
4 changes: 2 additions & 2 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ impl AccountActor {
// Load the account state from the store and set up the account actor state.
let account = {
match self.origin {
AccountOrigin::Store(account_prefix) => self
AccountOrigin::Store(account_id) => self
.store
.get_network_account(account_prefix)
.get_network_account(account_id)
.await
.expect("actor should be able to load account")
.expect("actor account should exist"),
Expand Down
74 changes: 39 additions & 35 deletions crates/ntx-builder/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ActorHandle {
///
/// The `Coordinator` is the central orchestrator of the network transaction builder system.
/// It manages the lifecycle of account actors. Each actor is responsible for handling transactions
/// for a specific network account prefix. The coordinator provides the following core
/// for a specific network account. The coordinator provides the following core
/// functionality:
///
/// ## Actor Management
Expand All @@ -62,7 +62,7 @@ impl ActorHandle {
/// 3. Actor completion/failure events are monitored and handled.
/// 4. Failed or completed actors are cleaned up from the registry.
pub struct Coordinator {
/// Mapping of network account prefixes to their respective message channels and cancellation
/// Mapping of network account IDs to their respective message channels and cancellation
/// tokens.
///
/// This registry serves as the primary directory for communicating with active account actors.
Expand All @@ -88,7 +88,7 @@ pub struct Coordinator {
semaphore: Arc<Semaphore>,

/// Cache of events received from the mempool that predate corresponding network accounts.
/// Grouped by account prefix to allow targeted event delivery to actors upon creation.
/// Grouped by network account ID to allow targeted event delivery to actors upon creation.
predating_events: HashMap<NetworkAccountId, IndexMap<TransactionId, Arc<MempoolEvent>>>,
}

Expand All @@ -111,18 +111,18 @@ impl Coordinator {
///
/// This method creates a new [`AccountActor`] instance for the specified account origin
/// and adds it to the coordinator's management system. The actor will be responsible for
/// processing transactions and managing state for accounts matching the network prefix.
/// processing transactions and managing state for the network account.
#[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, origin, actor_context))]
pub async fn spawn_actor(
&mut self,
origin: AccountOrigin,
actor_context: &AccountActorContext,
) -> Result<(), SendError<Arc<MempoolEvent>>> {
let account_prefix = origin.id();
let account_id = origin.id();

// If an actor already exists for this account prefix, something has gone wrong.
if let Some(handle) = self.actor_registry.remove(&account_prefix) {
tracing::error!("account actor already exists for prefix: {}", account_prefix);
// If an actor already exists for this account ID, something has gone wrong.
if let Some(handle) = self.actor_registry.remove(&account_id) {
tracing::error!("account actor already exists for account: {}", account_id);
handle.cancel_token.cancel();
}

Expand All @@ -136,14 +136,14 @@ impl Coordinator {
self.actor_join_set.spawn(Box::pin(actor.run(semaphore)));

// Send the new actor any events that contain notes that predate account creation.
if let Some(prefix_events) = self.predating_events.remove(&account_prefix) {
for event in prefix_events.values() {
if let Some(predating_events) = self.predating_events.remove(&account_id) {
for event in predating_events.values() {
Self::send(&handle, event.clone()).await?;
}
}

self.actor_registry.insert(account_prefix, handle);
tracing::info!("created actor for account prefix: {}", account_prefix);
self.actor_registry.insert(account_id, handle);
tracing::info!("created actor for account: {}", account_id);
Ok(())
}

Expand All @@ -163,16 +163,16 @@ impl Coordinator {
let mut failed_actors = Vec::new();

// Send event to all actors.
for (account_prefix, handle) in &self.actor_registry {
for (account_id, handle) in &self.actor_registry {
if let Err(err) = Self::send(handle, event.clone()).await {
tracing::error!("failed to send event to actor {}: {}", account_prefix, err);
failed_actors.push(*account_prefix);
tracing::error!("failed to send event to actor {}: {}", account_id, err);
failed_actors.push(*account_id);
}
}
// Remove failed actors from registry and cancel them.
for prefix in failed_actors {
for account_id in failed_actors {
let handle =
self.actor_registry.remove(&prefix).expect("actor found in send loop above");
self.actor_registry.remove(&account_id).expect("actor found in send loop above");
handle.cancel_token.cancel();
}
}
Expand All @@ -189,15 +189,15 @@ impl Coordinator {
let actor_result = self.actor_join_set.join_next().await;
match actor_result {
Some(Ok(shutdown_reason)) => match shutdown_reason {
ActorShutdownReason::Cancelled(account_prefix) => {
ActorShutdownReason::Cancelled(account_id) => {
// Do not remove the actor from the registry, as it may be re-spawned.
// The coordinator should always remove actors immediately after cancellation.
tracing::info!("account actor cancelled: {}", account_prefix);
tracing::info!("account actor cancelled: {}", account_id);
Ok(())
},
ActorShutdownReason::AccountReverted(account_prefix) => {
tracing::info!("account reverted: {}", account_prefix);
self.actor_registry.remove(&account_prefix);
ActorShutdownReason::AccountReverted(account_id) => {
tracing::info!("account reverted: {}", account_id);
self.actor_registry.remove(&account_id);
Ok(())
},
ActorShutdownReason::EventChannelClosed => {
Expand Down Expand Up @@ -239,23 +239,27 @@ impl Coordinator {
if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
let account_id = delta.id();
if account_id.is_network() {
let prefix = account_id.try_into().expect("account is network account");
if let Some(actor) = self.actor_registry.get(&prefix) {
target_actors.insert(prefix, actor);
let network_account_id =
account_id.try_into().expect("account is network account");
if let Some(actor) = self.actor_registry.get(&network_account_id) {
target_actors.insert(network_account_id, actor);
}
}
}

// Determine target actors for each note.
for note in network_notes {
let NetworkNote::SingleTarget(note) = note;
let prefix = note.account_id();
if let Some(actor) = self.actor_registry.get(&prefix) {
let network_account_id = note.account_id();
if let Some(actor) = self.actor_registry.get(&network_account_id) {
// Register actor as target.
target_actors.insert(prefix, actor);
target_actors.insert(network_account_id, actor);
} else {
// Cache event for every note that doesn't have a corresponding actor.
self.predating_events.entry(prefix).or_default().insert(*id, event.clone());
self.predating_events
.entry(network_account_id)
.or_default()
.insert(*id, event.clone());
}
}
}
Expand All @@ -266,15 +270,15 @@ impl Coordinator {
Ok(())
}

/// Removes any cached events for a given transaction ID from all account prefix caches.
/// Removes any cached events for a given transaction ID from all account caches.
pub fn drain_predating_events(&mut self, tx_id: &TransactionId) {
// Remove the transaction from all prefix caches.
// Remove the transaction from all account caches.
// This iterates over all predating events which is fine because the count is expected to be
// low.
self.predating_events.retain(|_, prefix_event| {
prefix_event.shift_remove(tx_id);
// Remove entries for account prefixes with no more cached events.
!prefix_event.is_empty()
self.predating_events.retain(|_, account_events| {
account_events.shift_remove(tx_id);
// Remove entries for accounts with no more cached events.
!account_events.is_empty()
});
}

Expand Down
6 changes: 3 additions & 3 deletions crates/proto/src/domain/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use miden_protocol::crypto::merkle::SparseMerklePath;
use miden_protocol::crypto::merkle::smt::SmtProof;
use miden_protocol::note::NoteAttachment;
use miden_protocol::utils::{Deserializable, DeserializationError, Serializable};
use miden_standards::note::NetworkAccountTarget;
use miden_standards::note::{NetworkAccountTarget, NetworkAccountTargetError};
use thiserror::Error;

use super::try_convert;
Expand Down Expand Up @@ -1065,7 +1065,7 @@ impl TryFrom<&NoteAttachment> for NetworkAccountId {

fn try_from(attachment: &NoteAttachment) -> Result<Self, Self::Error> {
let target = NetworkAccountTarget::try_from(attachment)
.map_err(|e| NetworkAccountError::InvalidAttachment(e.to_string()))?;
.map_err(NetworkAccountError::InvalidAttachment)?;
Ok(NetworkAccountId(target.target_id()))
}
}
Expand Down Expand Up @@ -1097,7 +1097,7 @@ pub enum NetworkAccountError {
#[error("account ID {0} is not a valid network account ID")]
NotNetworkAccount(AccountId),
#[error("invalid network account attachment: {0}")]
InvalidAttachment(String),
InvalidAttachment(#[source] NetworkAccountTargetError),
#[error("invalid network account prefix: {0}")]
InvalidPrefix(u32),
}
Expand Down
6 changes: 3 additions & 3 deletions crates/proto/src/domain/note.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use miden_protocol::note::{
Nullifier,
};
use miden_protocol::utils::{Deserializable, Serializable};
use miden_standards::note::NetworkAccountTarget;
use miden_standards::note::{NetworkAccountTarget, NetworkAccountTargetError};
use thiserror::Error;

use super::account::NetworkAccountId;
Expand Down Expand Up @@ -282,7 +282,7 @@ impl TryFrom<Note> for SingleTargetNetworkNote {
// Single-target network notes are identified by having a NetworkAccountTarget attachment
let attachment = note.metadata().attachment();
let account_target = NetworkAccountTarget::try_from(attachment)
.map_err(|e| NetworkNoteError::InvalidAttachment(e.to_string()))?;
.map_err(NetworkNoteError::InvalidAttachment)?;
Ok(Self { note, account_target })
}
}
Expand Down Expand Up @@ -315,7 +315,7 @@ where
#[derive(Debug, Error)]
pub enum NetworkNoteError {
#[error("note does not have a valid NetworkAccountTarget attachment: {0}")]
InvalidAttachment(String),
InvalidAttachment(#[source] NetworkAccountTargetError),
}

// NOTE SCRIPT
Expand Down
Loading