Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/ntx-builder/src/actor/account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl NetworkAccountState {
account_delta,
} => {
// Filter network notes relevant to this account.
let network_notes = filter_by_prefix_and_map_to_single_target(
let network_notes = filter_by_account_id_and_map_to_single_target(
self.account_id,
network_notes.clone(),
);
Expand Down Expand Up @@ -249,8 +249,8 @@ impl NetworkAccountState {
return;
};

if let Some(prefix) = impact.account_delta {
if prefix == self.account_id {
if let Some(delta_account_id) = impact.account_delta {
if delta_account_id == self.account_id {
self.account.commit_delta();
}
}
Expand Down Expand Up @@ -330,8 +330,8 @@ impl TransactionImpact {
}
}

/// Filters network notes by prefix and maps them to single target network notes.
fn filter_by_prefix_and_map_to_single_target(
/// Filters network notes by account ID and maps them to single target network notes.
fn filter_by_account_id_and_map_to_single_target(
account_id: NetworkAccountId,
notes: Vec<NetworkNote>,
) -> Vec<SingleTargetNetworkNote> {
Expand Down
4 changes: 2 additions & 2 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ impl AccountActor {
// Load the account state from the store and set up the account actor state.
let account = {
match self.origin {
AccountOrigin::Store(account_prefix) => self
AccountOrigin::Store(account_id) => self
.store
.get_network_account(account_prefix)
.get_network_account(account_id)
.await
.expect("actor should be able to load account")
.expect("actor account should exist"),
Expand Down
74 changes: 39 additions & 35 deletions crates/ntx-builder/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ActorHandle {
///
/// The `Coordinator` is the central orchestrator of the network transaction builder system.
/// It manages the lifecycle of account actors. Each actor is responsible for handling transactions
/// for a specific network account prefix. The coordinator provides the following core
/// for a specific network account. The coordinator provides the following core
/// functionality:
///
/// ## Actor Management
Expand All @@ -62,7 +62,7 @@ impl ActorHandle {
/// 3. Actor completion/failure events are monitored and handled.
/// 4. Failed or completed actors are cleaned up from the registry.
pub struct Coordinator {
/// Mapping of network account prefixes to their respective message channels and cancellation
/// Mapping of network account IDs to their respective message channels and cancellation
/// tokens.
///
/// This registry serves as the primary directory for communicating with active account actors.
Expand All @@ -88,7 +88,7 @@ pub struct Coordinator {
semaphore: Arc<Semaphore>,

/// Cache of events received from the mempool that predate corresponding network accounts.
/// Grouped by account prefix to allow targeted event delivery to actors upon creation.
/// Grouped by network account ID to allow targeted event delivery to actors upon creation.
predating_events: HashMap<NetworkAccountId, IndexMap<TransactionId, Arc<MempoolEvent>>>,
}

Expand All @@ -111,18 +111,18 @@ impl Coordinator {
///
/// This method creates a new [`AccountActor`] instance for the specified account origin
/// and adds it to the coordinator's management system. The actor will be responsible for
/// processing transactions and managing state for accounts matching the network prefix.
/// processing transactions and managing state for the network account.
#[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, origin, actor_context))]
pub async fn spawn_actor(
&mut self,
origin: AccountOrigin,
actor_context: &AccountActorContext,
) -> Result<(), SendError<Arc<MempoolEvent>>> {
let account_prefix = origin.id();
let account_id = origin.id();

// If an actor already exists for this account prefix, something has gone wrong.
if let Some(handle) = self.actor_registry.remove(&account_prefix) {
tracing::error!("account actor already exists for prefix: {}", account_prefix);
// If an actor already exists for this account ID, something has gone wrong.
if let Some(handle) = self.actor_registry.remove(&account_id) {
tracing::error!("account actor already exists for account: {}", account_id);
handle.cancel_token.cancel();
}

Expand All @@ -136,14 +136,14 @@ impl Coordinator {
self.actor_join_set.spawn(Box::pin(actor.run(semaphore)));

// Send the new actor any events that contain notes that predate account creation.
if let Some(prefix_events) = self.predating_events.remove(&account_prefix) {
for event in prefix_events.values() {
if let Some(predating_events) = self.predating_events.remove(&account_id) {
for event in predating_events.values() {
Self::send(&handle, event.clone()).await?;
}
}

self.actor_registry.insert(account_prefix, handle);
tracing::info!("created actor for account prefix: {}", account_prefix);
self.actor_registry.insert(account_id, handle);
tracing::info!("created actor for account: {}", account_id);
Ok(())
}

Expand All @@ -163,16 +163,16 @@ impl Coordinator {
let mut failed_actors = Vec::new();

// Send event to all actors.
for (account_prefix, handle) in &self.actor_registry {
for (account_id, handle) in &self.actor_registry {
if let Err(err) = Self::send(handle, event.clone()).await {
tracing::error!("failed to send event to actor {}: {}", account_prefix, err);
failed_actors.push(*account_prefix);
tracing::error!("failed to send event to actor {}: {}", account_id, err);
failed_actors.push(*account_id);
}
}
// Remove failed actors from registry and cancel them.
for prefix in failed_actors {
for account_id in failed_actors {
let handle =
self.actor_registry.remove(&prefix).expect("actor found in send loop above");
self.actor_registry.remove(&account_id).expect("actor found in send loop above");
handle.cancel_token.cancel();
}
}
Expand All @@ -189,15 +189,15 @@ impl Coordinator {
let actor_result = self.actor_join_set.join_next().await;
match actor_result {
Some(Ok(shutdown_reason)) => match shutdown_reason {
ActorShutdownReason::Cancelled(account_prefix) => {
ActorShutdownReason::Cancelled(account_id) => {
// Do not remove the actor from the registry, as it may be re-spawned.
// The coordinator should always remove actors immediately after cancellation.
tracing::info!("account actor cancelled: {}", account_prefix);
tracing::info!("account actor cancelled: {}", account_id);
Ok(())
},
ActorShutdownReason::AccountReverted(account_prefix) => {
tracing::info!("account reverted: {}", account_prefix);
self.actor_registry.remove(&account_prefix);
ActorShutdownReason::AccountReverted(account_id) => {
tracing::info!("account reverted: {}", account_id);
self.actor_registry.remove(&account_id);
Ok(())
},
ActorShutdownReason::EventChannelClosed => {
Expand Down Expand Up @@ -239,23 +239,27 @@ impl Coordinator {
if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
let account_id = delta.id();
if account_id.is_network() {
let prefix = account_id.try_into().expect("account is network account");
if let Some(actor) = self.actor_registry.get(&prefix) {
target_actors.insert(prefix, actor);
let network_account_id =
account_id.try_into().expect("account is network account");
if let Some(actor) = self.actor_registry.get(&network_account_id) {
target_actors.insert(network_account_id, actor);
}
}
}

// Determine target actors for each note.
for note in network_notes {
let NetworkNote::SingleTarget(note) = note;
let prefix = note.account_id();
if let Some(actor) = self.actor_registry.get(&prefix) {
let network_account_id = note.account_id();
if let Some(actor) = self.actor_registry.get(&network_account_id) {
// Register actor as target.
target_actors.insert(prefix, actor);
target_actors.insert(network_account_id, actor);
} else {
// Cache event for every note that doesn't have a corresponding actor.
self.predating_events.entry(prefix).or_default().insert(*id, event.clone());
self.predating_events
.entry(network_account_id)
.or_default()
.insert(*id, event.clone());
}
}
}
Expand All @@ -266,15 +270,15 @@ impl Coordinator {
Ok(())
}

/// Removes any cached events for a given transaction ID from all account prefix caches.
/// Removes any cached events for a given transaction ID from all account caches.
pub fn drain_predating_events(&mut self, tx_id: &TransactionId) {
// Remove the transaction from all prefix caches.
// Remove the transaction from all account caches.
// This iterates over all predating events which is fine because the count is expected to be
// low.
self.predating_events.retain(|_, prefix_event| {
prefix_event.shift_remove(tx_id);
// Remove entries for account prefixes with no more cached events.
!prefix_event.is_empty()
self.predating_events.retain(|_, account_events| {
account_events.shift_remove(tx_id);
// Remove entries for accounts with no more cached events.
!account_events.is_empty()
});
}

Expand Down
6 changes: 3 additions & 3 deletions crates/proto/src/domain/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use miden_protocol::crypto::merkle::SparseMerklePath;
use miden_protocol::crypto::merkle::smt::SmtProof;
use miden_protocol::note::NoteAttachment;
use miden_protocol::utils::{Deserializable, DeserializationError, Serializable};
use miden_standards::note::NetworkAccountTarget;
use miden_standards::note::{NetworkAccountTarget, NetworkAccountTargetError};
use thiserror::Error;

use super::try_convert;
Expand Down Expand Up @@ -1065,7 +1065,7 @@ impl TryFrom<&NoteAttachment> for NetworkAccountId {

fn try_from(attachment: &NoteAttachment) -> Result<Self, Self::Error> {
let target = NetworkAccountTarget::try_from(attachment)
.map_err(|e| NetworkAccountError::InvalidAttachment(e.to_string()))?;
.map_err(NetworkAccountError::InvalidAttachment)?;
Ok(NetworkAccountId(target.target_id()))
}
}
Expand Down Expand Up @@ -1097,7 +1097,7 @@ pub enum NetworkAccountError {
#[error("account ID {0} is not a valid network account ID")]
NotNetworkAccount(AccountId),
#[error("invalid network account attachment: {0}")]
InvalidAttachment(String),
InvalidAttachment(#[source] NetworkAccountTargetError),
#[error("invalid network account prefix: {0}")]
InvalidPrefix(u32),
}
Expand Down
6 changes: 3 additions & 3 deletions crates/proto/src/domain/note.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use miden_protocol::note::{
Nullifier,
};
use miden_protocol::utils::{Deserializable, Serializable};
use miden_standards::note::NetworkAccountTarget;
use miden_standards::note::{NetworkAccountTarget, NetworkAccountTargetError};
use thiserror::Error;

use super::account::NetworkAccountId;
Expand Down Expand Up @@ -282,7 +282,7 @@ impl TryFrom<Note> for SingleTargetNetworkNote {
// Single-target network notes are identified by having a NetworkAccountTarget attachment
let attachment = note.metadata().attachment();
let account_target = NetworkAccountTarget::try_from(attachment)
.map_err(|e| NetworkNoteError::InvalidAttachment(e.to_string()))?;
.map_err(NetworkNoteError::InvalidAttachment)?;
Ok(Self { note, account_target })
}
}
Expand Down Expand Up @@ -315,7 +315,7 @@ where
#[derive(Debug, Error)]
pub enum NetworkNoteError {
#[error("note does not have a valid NetworkAccountTarget attachment: {0}")]
InvalidAttachment(String),
InvalidAttachment(#[source] NetworkAccountTargetError),
}

// NOTE SCRIPT
Expand Down
6 changes: 3 additions & 3 deletions crates/store/src/db/migrations/2025062000000_setup/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ CREATE TABLE notes (
note_type INTEGER NOT NULL, -- 1-Public (0b01), 2-Private (0b10), 3-Encrypted (0b11)
sender BLOB NOT NULL,
tag INTEGER NOT NULL,
is_single_target_network_note INTEGER NOT NULL, -- 1 if note has NetworkAccountTarget attachment, 0 otherwise
network_note_type INTEGER NOT NULL, -- 0-not a network note, 1-single account target network note
attachment BLOB NOT NULL, -- Serialized note attachment data
inclusion_path BLOB NOT NULL, -- Serialized sparse Merkle path of the note in the block's note tree
consumed_at INTEGER, -- Block number when the note was consumed
Expand All @@ -63,7 +63,7 @@ CREATE TABLE notes (

PRIMARY KEY (committed_at, batch_index, note_index),
CONSTRAINT notes_type_in_enum CHECK (note_type BETWEEN 1 AND 3),
CONSTRAINT notes_is_single_target_network_note_is_bool CHECK (is_single_target_network_note BETWEEN 0 AND 1),
CONSTRAINT notes_network_note_type_in_enum CHECK (network_note_type BETWEEN 0 AND 1),
CONSTRAINT notes_consumed_at_is_u32 CHECK (consumed_at BETWEEN 0 AND 0xFFFFFFFF),
CONSTRAINT notes_batch_index_is_u32 CHECK (batch_index BETWEEN 0 AND 0xFFFFFFFF),
CONSTRAINT notes_note_index_is_u32 CHECK (note_index BETWEEN 0 AND 0xFFFFFFFF)
Expand All @@ -74,7 +74,7 @@ CREATE INDEX idx_notes_note_commitment ON notes(note_commitment);
CREATE INDEX idx_notes_sender ON notes(sender, committed_at);
CREATE INDEX idx_notes_tag ON notes(tag, committed_at);
CREATE INDEX idx_notes_nullifier ON notes(nullifier);
CREATE INDEX idx_unconsumed_network_notes ON notes(is_single_target_network_note, consumed_at);
CREATE INDEX idx_unconsumed_network_notes ON notes(network_note_type, consumed_at);
-- Index for joining with block_headers on committed_at
CREATE INDEX idx_notes_committed_at ON notes(committed_at);
-- Index for joining with note_scripts
Expand Down
6 changes: 3 additions & 3 deletions crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,9 @@ impl Db {
block_num: BlockNumber,
page: Page,
) -> Result<(Vec<NoteRecord>, Page)> {
// Network notes sent to a specific account have their tags set to the prefix of the target
// account ID. So we can convert the ID prefix into a note tag to query the notes for a
// given account.
// Single-target network notes have their tags derived from the target account ID.
// The 30-bit account ID prefix is used as the note tag, allowing us to query notes
// for a given network account.
self.transact("unconsumed network notes for account", move |conn| {
models::queries::select_unconsumed_network_notes_by_tag(
conn,
Expand Down
Loading
Loading