diff --git a/Cargo.lock b/Cargo.lock index 5ff4a459f..67c22adb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3702,7 +3702,7 @@ dependencies = [ "magicblock-program", "magicblock-rpc-client", "magicblock-table-mania", - "rand 0.8.5", + "rand 0.9.1", "rusqlite", "solana-account", "solana-pubkey", @@ -3864,6 +3864,7 @@ dependencies = [ "magicblock-metrics", "magicblock-program", "parking_lot 0.12.4", + "rustc-hash 2.1.1", "solana-account", "solana-address-lookup-table-program", "solana-bpf-loader-program", @@ -3871,6 +3872,7 @@ dependencies = [ "solana-feature-set", "solana-fee", "solana-fee-structure", + "solana-keypair", "solana-loader-v4-program", "solana-program", "solana-program-runtime", @@ -3901,7 +3903,7 @@ dependencies = [ "magicblock-metrics", "num-derive", "num-traits", - "rand 0.8.5", + "rand 0.9.1", "serde", "solana-log-collector", "solana-program-runtime", @@ -3932,7 +3934,7 @@ dependencies = [ "ed25519-dalek", "log", "magicblock-rpc-client", - "rand 0.8.5", + "rand 0.9.1", "sha3", "solana-pubkey", "solana-rpc-client", @@ -3978,6 +3980,7 @@ dependencies = [ "magicblock-api", "magicblock-config", "magicblock-version", + "num_cpus", "solana-sdk", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index b7c9686d8..6fbcb0d45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ jsonrpc-pubsub = "18.0.0" jsonrpc-ws-server = "18.0.0" lazy_static = "1.4.0" libc = "0.2.153" -log = { version = "0.4.20", features = ["release_max_level_info"] } +log = { version = "0.4.20" } lru = "0.16.0" macrotest = "1" magic-domain-program = { git = "https://github.com/magicblock-labs/magic-domain-program.git", rev = "ea04d46", default-features = false } @@ -140,11 +140,12 @@ prost = "0.11.9" json = { package = "sonic-rs", version = "0.5.3" } protobuf-src = "1.1" quote = "1.0" -rand = "0.8.5" +rand = "0.9" rayon = "1.10.0" # bundled sqlite 3.44 rusqlite = { version = "0.37.0", features = ["bundled"] } rustc_version = "0.4" +rustc-hash = "2.1" scc = "2.4" semver = "1.0.22" serde = "1.0.217" diff --git a/magicblock-aperture/Cargo.toml b/magicblock-aperture/Cargo.toml index 50ce6c6db..01a433b95 100644 --- a/magicblock-aperture/Cargo.toml +++ b/magicblock-aperture/Cargo.toml @@ -66,7 +66,7 @@ log = { workspace = true } serde = { workspace = true } [dev-dependencies] -rand = "0.9" +rand = { workspace = true } test-kit = { workspace = true } solana-rpc-client = { workspace = true } solana-pubsub-client = { workspace = true } diff --git a/magicblock-aperture/src/lib.rs b/magicblock-aperture/src/lib.rs index 1aee7c78a..fa55bbeca 100644 --- a/magicblock-aperture/src/lib.rs +++ b/magicblock-aperture/src/lib.rs @@ -32,9 +32,9 @@ impl JsonRpcServer { // Start up an event processor task, which will handle forwarding of any validator // originating event to client subscribers, or use them to update server's caches // - // NOTE: currently we only start 1 instance, but it + // NOTE: currently we only start 2 instances, but it // can be scaled to more if that becomes a bottleneck - EventProcessor::start(&state, dispatch, 1, cancel.clone()); + EventProcessor::start(&state, dispatch, 2, cancel.clone()); // initialize HTTP and Websocket servers let websocket = { diff --git a/magicblock-aperture/src/requests/http/get_blocks_with_limit.rs b/magicblock-aperture/src/requests/http/get_blocks_with_limit.rs index c82db4e19..00dbd2562 100644 --- a/magicblock-aperture/src/requests/http/get_blocks_with_limit.rs +++ b/magicblock-aperture/src/requests/http/get_blocks_with_limit.rs @@ -21,9 +21,8 @@ impl HttpDispatcher { 
.min(MAX_DEFAULT_BLOCKS_LIMIT); let end_slot = start_slot + limit; // Calculate the end slot, ensuring it does not exceed the latest block height. - let end_slot = (end_slot).min(self.blocks.block_height()); + let end_slot = (end_slot).min(self.blocks.block_height() + 1); - // The range is exclusive of the end slot, so `(start..end)` is correct. let slots = (start_slot..end_slot).collect::>(); Ok(ResponsePayload::encode_no_context(&request.id, slots)) diff --git a/magicblock-aperture/src/requests/http/get_token_account_balance.rs b/magicblock-aperture/src/requests/http/get_token_account_balance.rs index 7103b81f0..bedf55e99 100644 --- a/magicblock-aperture/src/requests/http/get_token_account_balance.rs +++ b/magicblock-aperture/src/requests/http/get_token_account_balance.rs @@ -1,7 +1,10 @@ use std::mem::size_of; use solana_account::AccountSharedData; -use solana_account_decoder::parse_token::UiTokenAmount; +use solana_account_decoder::{ + parse_account_data::SplTokenAdditionalDataV2, + parse_token::token_amount_to_ui_amount_v3, +}; use super::{ prelude::*, MINT_DECIMALS_OFFSET, SPL_MINT_RANGE, SPL_TOKEN_AMOUNT_RANGE, @@ -58,13 +61,14 @@ impl HttpDispatcher { u64::from_le_bytes(buffer) }; - let ui_amount = (token_amount as f64) / 10f64.powi(decimals as i32); - let ui_token_amount = UiTokenAmount { - amount: token_amount.to_string(), - ui_amount: Some(ui_amount), - ui_amount_string: ui_amount.to_string(), - decimals, - }; + let ui_token_amount = token_amount_to_ui_amount_v3( + token_amount, + &SplTokenAdditionalDataV2 { + decimals, + interest_bearing_config: None, + scaled_ui_amount_config: None, + }, + ); let slot = self.blocks.block_height(); Ok(ResponsePayload::encode(&request.id, ui_token_amount, slot)) diff --git a/magicblock-aperture/src/requests/http/mocked.rs b/magicblock-aperture/src/requests/http/mocked.rs index 2cb8185cb..1fb9e5ae0 100644 --- a/magicblock-aperture/src/requests/http/mocked.rs +++ b/magicblock-aperture/src/requests/http/mocked.rs @@ -84,7 +84,7 @@ impl HttpDispatcher { Ok(ResponsePayload::encode( &request.id, Vec::<()>::new(), - self.blocks.get_latest().slot, + self.blocks.block_height(), )) } @@ -103,7 +103,7 @@ impl HttpDispatcher { Ok(ResponsePayload::encode( &request.id, supply, - self.blocks.get_latest().slot, + self.blocks.block_height(), )) } @@ -120,7 +120,7 @@ impl HttpDispatcher { Ok(ResponsePayload::encode( &request.id, supply, - self.blocks.get_latest().slot, + self.blocks.block_height(), )) } diff --git a/magicblock-aperture/src/requests/http/request_airdrop.rs b/magicblock-aperture/src/requests/http/request_airdrop.rs index decf2e11f..11c827629 100644 --- a/magicblock-aperture/src/requests/http/request_airdrop.rs +++ b/magicblock-aperture/src/requests/http/request_airdrop.rs @@ -15,13 +15,18 @@ impl HttpDispatcher { // Airdrops are only supported if a faucet keypair is configured. // Which is never the case with *ephemeral* running mode of the validator let Some(ref faucet) = self.context.faucet else { - return Err(RpcError::invalid_request("method is not supported")); + return Err(RpcError::invalid_request( + "free airdrop faucet is disabled", + )); }; let (pubkey, lamports) = parse_params!(request.params()?, Serde32Bytes, u64); let pubkey = some_or_err!(pubkey); let lamports = some_or_err!(lamports); + if lamports == 0 { + return Err(RpcError::invalid_params("lamports must be > 0")); + } // Build and execute the airdrop transfer transaction. 
let txn = solana_system_transaction::transfer( diff --git a/magicblock-aperture/src/tests.rs b/magicblock-aperture/src/tests.rs index 8d49c818c..5c37d21e3 100644 --- a/magicblock-aperture/src/tests.rs +++ b/magicblock-aperture/src/tests.rs @@ -57,7 +57,7 @@ mod event_processor { let env = ExecutionTestEnv::new(); env.advance_slot(); let node_context = NodeContext { - identity: env.payer.pubkey(), + identity: env.get_payer().pubkey, ..Default::default() }; let state = SharedState::new( diff --git a/magicblock-aperture/tests/mocked.rs b/magicblock-aperture/tests/mocked.rs index c9740d0f7..25bb24a04 100644 --- a/magicblock-aperture/tests/mocked.rs +++ b/magicblock-aperture/tests/mocked.rs @@ -1,6 +1,5 @@ use setup::RpcTestEnv; use solana_pubkey::Pubkey; -use test_kit::Signer; mod setup; @@ -17,7 +16,7 @@ async fn test_get_slot_leaders() { assert_eq!(leaders.len(), 1, "should return a single leader"); assert_eq!( leaders[0], - env.execution.payer.pubkey(), + env.execution.get_payer().pubkey, "leader should be the validator's own identity" ); } @@ -193,7 +192,7 @@ async fn test_get_cluster_nodes() { assert_eq!(nodes.len(), 1, "should be exactly one node in the cluster"); assert_eq!( nodes[0].pubkey, - env.execution.payer.pubkey().to_string(), + env.execution.get_payer().pubkey.to_string(), "node pubkey should match validator identity" ); } diff --git a/magicblock-aperture/tests/node.rs b/magicblock-aperture/tests/node.rs index 3c5c5d680..1ab44973d 100644 --- a/magicblock-aperture/tests/node.rs +++ b/magicblock-aperture/tests/node.rs @@ -1,5 +1,4 @@ use setup::RpcTestEnv; -use test_kit::Signer; mod setup; @@ -35,7 +34,7 @@ async fn test_get_identity() { assert_eq!( identity, - env.execution.payer.pubkey(), + env.execution.get_payer().pubkey, "identity should match the validator's public key" ); } diff --git a/magicblock-aperture/tests/setup.rs b/magicblock-aperture/tests/setup.rs index decfacf9d..e3236e96f 100644 --- a/magicblock-aperture/tests/setup.rs +++ b/magicblock-aperture/tests/setup.rs @@ -93,7 +93,7 @@ impl RpcTestEnv { let (server, config) = loop { let port: u16 = rand::random_range(7000..u16::MAX - 1); let node_context = NodeContext { - identity: execution.payer.pubkey(), + identity: execution.get_payer().pubkey, faucet: Some(faucet.insecure_clone()), base_fee: Self::BASE_FEE, featureset: Default::default(), diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 749489458..70d1c2b91 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -302,8 +302,15 @@ impl MagicValidator { base_fee: config.validator.base_fees.unwrap_or_default(), featureset: txn_scheduler_state.environment.feature_set.clone(), }; - let transaction_scheduler = - TransactionScheduler::new(1, txn_scheduler_state); + // We dedicate half of the available resources to the execution + // runtime, -1 is taken up by the transaction scheduler itself + let transaction_executors = + (num_cpus::get() / 2).saturating_sub(1).max(1) as u32; + let transaction_scheduler = TransactionScheduler::new( + transaction_executors, + txn_scheduler_state, + ); + info!("Running execution backend with {transaction_executors} threads"); transaction_scheduler.spawn(); let shared_state = SharedState::new( diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index 779c871a7..1303ed2db 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -51,6 +51,7 @@ pub type 
TxnReplayResultTx = oneshot::Sender; /// Contains the final, committed status of an executed /// transaction, including its result and metadata. /// This is the message type that is communicated to subscribers via event processors. +#[derive(Debug)] pub struct TransactionStatus { pub signature: Signature, pub slot: Slot, @@ -76,6 +77,7 @@ pub enum TransactionProcessingMode { } /// The detailed outcome of a standard transaction execution. +#[derive(Debug)] pub struct TransactionExecutionResult { pub result: TransactionResult, pub accounts: Box<[Pubkey]>, diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 8aa007057..e45e32dc2 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -39,8 +39,11 @@ solana-transaction = { workspace = true } solana-transaction-status = { workspace = true } solana-transaction-error = { workspace = true } +rustc-hash = { workspace = true } + [dev-dependencies] guinea = { workspace = true } +solana-keypair = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } test-kit = { workspace = true } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 08eb55c90..4083e7dea 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -20,7 +20,8 @@ use solana_svm::transaction_processor::{ use tokio::{runtime::Builder, sync::mpsc::Sender}; use crate::{ - builtins::BUILTINS, scheduler::state::TransactionSchedulerState, WorkerId, + builtins::BUILTINS, + scheduler::{locks::ExecutorId, state::TransactionSchedulerState}, }; /// A dedicated, single-threaded worker responsible for processing transactions using @@ -30,7 +31,7 @@ use crate::{ /// executors can be spawned to process transactions in parallel. pub(super) struct TransactionExecutor { /// A unique identifier for this worker instance. - id: WorkerId, + id: ExecutorId, /// A handle to the global accounts database for reading and writing account state. accountsdb: Arc, /// A handle to the global ledger for writing committed transaction history. @@ -50,7 +51,7 @@ pub(super) struct TransactionExecutor { /// A channel to send out account state updates after processing. accounts_tx: AccountUpdateTx, /// A back-channel to notify the `TransactionScheduler` that this worker is ready for more work. - ready_tx: Sender, + ready_tx: Sender, /// A read lock held during a slot's processing to synchronize with critical global /// operations like `AccountsDb` snapshots. sync: StWLock, @@ -63,10 +64,10 @@ impl TransactionExecutor { /// with a globally shared one. This allows updates made by one executor to be immediately /// visible to all others, preventing redundant program loads. pub(super) fn new( - id: WorkerId, + id: ExecutorId, state: &TransactionSchedulerState, rx: TransactionToProcessRx, - ready_tx: Sender, + ready_tx: Sender, programs_cache: Arc>>, ) -> Self { let slot = state.accountsdb.slot(); @@ -173,7 +174,8 @@ impl TransactionExecutor { } } // Notify the scheduler that this worker is ready for another transaction. - let _ = self.ready_tx.send(self.id).await; + // NOTE: the channel is guaranteed to have enough capacity to push into. + let _ = self.ready_tx.try_send(self.id); } // When a new block is produced, transition to the new slot. 
_ = block_updated.recv() => { diff --git a/magicblock-processor/src/lib.rs b/magicblock-processor/src/lib.rs index d01b9a9d9..db6c4fe26 100644 --- a/magicblock-processor/src/lib.rs +++ b/magicblock-processor/src/lib.rs @@ -10,8 +10,6 @@ use solana_program::feature; use solana_rent_collector::RentCollector; use solana_svm::transaction_processor::TransactionProcessingEnvironment; -type WorkerId = u8; - /// Initialize an SVM enviroment for transaction processing pub fn build_svm_env( accountsdb: &AccountsDb, diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs deleted file mode 100644 index 0aee90a73..000000000 --- a/magicblock-processor/src/scheduler.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::sync::{Arc, RwLock}; - -use log::info; -use magicblock_core::link::transactions::{ - ProcessableTransaction, TransactionToProcessRx, -}; -use magicblock_ledger::LatestBlock; -use solana_program_runtime::loaded_programs::ProgramCache; -use state::TransactionSchedulerState; -use tokio::{ - runtime::Builder, - sync::mpsc::{channel, Receiver, Sender}, -}; - -use crate::{ - executor::{SimpleForkGraph, TransactionExecutor}, - WorkerId, -}; - -/// The central transaction scheduler responsible for distributing work to a -/// pool of `TransactionExecutor` workers. -/// -/// This struct acts as the single entry point for all transactions entering the processing -/// pipeline. It receives transactions from a global queue and dispatches them to available -/// worker threads for execution or simulation. -pub struct TransactionScheduler { - /// The receiving end of the global queue for all new transactions. - transactions_rx: TransactionToProcessRx, - /// A channel that receives readiness notifications from workers, - /// indicating they are free to accept new work. - ready_rx: Receiver, - /// A list of sender channels, one for each `TransactionExecutor` worker. - executors: Vec>, - /// A handle to the globally shared cache for loaded BPF programs. - program_cache: Arc>>, - /// A handle to the globally shared state of the latest block. - latest_block: LatestBlock, -} - -impl TransactionScheduler { - /// Creates and initializes a new `TransactionScheduler` and its associated pool of workers. - /// - /// This function performs the initial setup for the entire transaction processing pipeline: - /// 1. Prepares the shared program cache and ensures necessary sysvars are in the `AccountsDb`. - /// 2. Creates a pool of `TransactionExecutor` workers, each with its own dedicated channel. - /// 3. Spawns each worker in its own OS thread for maximum isolation and performance. - pub fn new(workers: u8, state: TransactionSchedulerState) -> Self { - let mut executors = Vec::with_capacity(workers as usize); - - // Create the back-channel for workers to signal their readiness. - let (ready_tx, ready_rx) = channel(workers as usize); - // Perform one-time setup of the shared program cache and sysvars. - let program_cache = state.prepare_programs_cache(); - state.prepare_sysvars(); - - for id in 0..workers { - // Each executor has a channel capacity of 1, as it - // can only process one transaction at a time. 
- let (transactions_tx, transactions_rx) = channel(1); - let executor = TransactionExecutor::new( - id, - &state, - transactions_rx, - ready_tx.clone(), - program_cache.clone(), - ); - executor.populate_builtins(); - executor.spawn(); - executors.push(transactions_tx); - } - Self { - transactions_rx: state.txn_to_process_rx, - ready_rx, - executors, - latest_block: state.ledger.latest_block().clone(), - program_cache, - } - } - - /// Spawns the scheduler's main event loop into a new, dedicated OS thread. - /// - /// Similar to the executors, the scheduler runs in its own thread with a dedicated - /// single-threaded Tokio runtime for performance and to prevent it from interfering - /// with other application tasks. - pub fn spawn(self) { - let task = move || { - let runtime = Builder::new_current_thread() - .thread_name("transaction scheduler") - .build() - .expect( - "building single threaded tokio runtime should succeed", - ); - runtime.block_on(tokio::task::unconstrained(self.run())); - }; - std::thread::spawn(task); - } - - /// The main event loop of the transaction scheduler. - /// - /// This loop multiplexes between three primary events: - /// 1. Receiving a new transaction and dispatching it to an available worker. - /// 2. Receiving a readiness notification from a worker. - /// 3. Receiving a notification of a new block, triggering a slot transition. - async fn run(mut self) { - let mut block_produced = self.latest_block.subscribe(); - let mut ready = true; - loop { - tokio::select! { - biased; - // A worker has finished its task and is ready for more. - Some(_) = self.ready_rx.recv() => { - // TODO(bmuddha): - // This branch will be used by a multi-threaded scheduler - // with account-level locking to manage the pool of ready workers. - ready = true; - } - // Receive new transactions for scheduling. - Some(txn) = self.transactions_rx.recv(), if ready => { - // TODO(bmuddha): - // The current implementation sends to the first worker only. - // A future implementation with account-level locking will enable - // dispatching to any available worker. - let Some(tx) = self.executors.first() else { - continue; - }; - let _ = tx.send(txn).await; - ready = false; - } - // A new block has been produced. - _ = block_produced.recv() => { - self.transition_to_new_slot(); - } - // The main transaction channel has closed, indicating a system shutdown. - else => { - break - } - } - } - info!("transaction scheduler has terminated"); - } - - /// Updates the scheduler's state when a new slot begins. - fn transition_to_new_slot(&self) { - // Re-root the shared program cache to the new slot. - self.program_cache.write().unwrap().latest_root_slot = - self.latest_block.load().slot; - } -} - -pub mod state; diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs new file mode 100644 index 000000000..496c9b881 --- /dev/null +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -0,0 +1,239 @@ +//! Manages the state of transaction processing across multiple executors. +//! +//! This module contains the `ExecutionCoordinator`, which tracks ready executors, +//! queues of blocked transactions, and the locks held by each worker. It acts +//! as the central state machine for the scheduling process. 
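// ---------------------------------------------------------------------------
// Illustrative sketch (editor's note, not part of this diff): the coordinator
// introduced below resolves contention in arrival order by keeping per-account
// contender lists sorted by their monotonically increasing transaction IDs.
// The standalone snippet mirrors the `binary_search` + `insert` pattern used
// in `contend_account` and `queue_transaction`, with bare `u64` values
// standing in for `TransactionId`.
fn contend(contenders: &mut Vec<u64>, txn_id: u64) {
    // Insert only if the ID is not already tracked, keeping the list sorted.
    if let Err(index) = contenders.binary_search(&txn_id) {
        contenders.insert(index, txn_id);
    }
}

fn main() {
    let mut contenders = Vec::new();
    for id in [70u64, 66, 68] {
        contend(&mut contenders, id);
    }
    // Lower IDs arrived earlier, so they are served first: "first in, first served".
    assert_eq!(contenders, vec![66, 68, 70]);
}
// ---------------------------------------------------------------------------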
+
+use std::collections::VecDeque;
+
+use magicblock_core::link::transactions::ProcessableTransaction;
+use solana_pubkey::Pubkey;
+
+use super::locks::{
+    next_transaction_id, AccountContention, ExecutorId, LocksCache, RcLock,
+    TransactionContention, TransactionId, INIT_TRANSACTION_ID,
+};
+
+/// A queue of transactions waiting for a specific executor to release a lock.
+type TransactionQueue = VecDeque;
+/// A list of transaction queues, indexed by `ExecutorId`. Each executor has its own queue.
+type BlockedTransactionQueues = Vec;
+/// A list of all locks acquired by an executor, indexed by `ExecutorId`.
+type AcquiredLocks = Vec>;
+
+/// A transaction bundled with its unique ID for tracking purposes.
+pub(super) struct TransactionWithId {
+    pub(super) id: TransactionId,
+    pub(super) txn: ProcessableTransaction,
+}
+
+impl TransactionWithId {
+    /// Creates a new transaction with a unique ID.
+    pub(super) fn new(txn: ProcessableTransaction) -> Self {
+        Self {
+            id: next_transaction_id(),
+            txn,
+        }
+    }
+}
+
+/// Manages the state for all transaction executors, including their
+/// readiness, blocked transactions, and acquired account locks.
+pub(super) struct ExecutionCoordinator {
+    /// A queue for each executor to hold transactions that are waiting for its locks.
+    blocked_transactions: BlockedTransactionQueues,
+    /// A map tracking which executor is blocking which transaction.
+    transaction_contention: TransactionContention,
+    /// A map tracking which transactions are contending for which account.
+    account_contention: AccountContention,
+    /// A pool of executor IDs that are currently idle and ready for new work.
+    ready_executors: Vec,
+    /// A list of locks currently held by each executor.
+    acquired_locks: AcquiredLocks,
+    /// The global cache of all account locks.
+    locks: LocksCache,
+}
+
+impl ExecutionCoordinator {
+    /// Creates a new `ExecutionCoordinator` for a given number of executors.
+    pub(super) fn new(count: usize) -> Self {
+        Self {
+            blocked_transactions: (0..count).map(|_| VecDeque::new()).collect(),
+            acquired_locks: (0..count).map(|_| Vec::new()).collect(),
+            ready_executors: (0..count as u32).collect(),
+            transaction_contention: TransactionContention::default(),
+            account_contention: AccountContention::default(),
+            locks: LocksCache::default(),
+        }
+    }
+
+    /// Queues a transaction that is blocked by a contended lock.
+    ///
+    /// The `blocker_id` can be either an `ExecutorId` or a `TransactionId`.
+    /// If it's a `TransactionId`, this function resolves it to the underlying
+    /// `ExecutorId` that holds the conflicting lock, and returns that executor.
+    pub(super) fn queue_transaction(
+        &mut self,
+        blocker_id: TransactionId,
+        transaction: TransactionWithId,
+    ) -> ExecutorId {
+        // A `blocker_id` greater than or equal to `INIT_TRANSACTION_ID` is the
+        // `TransactionId` of another waiting transaction. We must resolve it to
+        // the actual executor.
+        let executor = if blocker_id >= INIT_TRANSACTION_ID {
+            // A `TransactionId` is only returned as a blocker if that
+            // transaction is already tracked in the contention map.
+            self.transaction_contention
+                .get(&blocker_id)
+                .copied()
+                // SAFETY:
+                // This invariant is enforced by the transaction scheduling flow.
+                // If the transaction is not found in the map, it indicates a
+                // hard logic error which might lead to deadlocks, so we terminate
+                // the scheduler here. Test coverage should catch this inconsistency.
+ .expect("unknown transaction for blocker resolution") + } else { + blocker_id as ExecutorId + }; + + let queue = &mut self.blocked_transactions[executor as usize]; + self.transaction_contention.insert(transaction.id, executor); + let index = queue.binary_search_by(|tx| tx.id.cmp(&transaction.id)); + if let Err(index) = index { + queue.insert(index, transaction); + } + executor + } + + /// Checks if there are any executors ready to process a transaction. + #[inline] + pub(super) fn is_ready(&self) -> bool { + !self.ready_executors.is_empty() + } + + /// Retrieves the ID of a ready executor, if one is available. + #[inline] + pub(super) fn get_ready_executor(&mut self) -> Option { + self.ready_executors.pop() + } + + /// Returns an executor to the pool of ready executors. + #[inline] + pub(super) fn release_executor(&mut self, executor: ExecutorId) { + self.ready_executors.push(executor) + } + + /// Releases all account locks held by a specific executor. + pub(crate) fn unlock_accounts(&mut self, executor: ExecutorId) { + let locks = &mut self.acquired_locks[executor as usize]; + // Iteratively drain the list of acquired locks. + while let Some(lock) = locks.pop() { + lock.borrow_mut().unlock(executor); + } + } + + /// Retrieves the next blocked transaction waiting for a given executor. + pub(super) fn next_blocked_transaction( + &mut self, + executor: ExecutorId, + ) -> Option { + self.blocked_transactions[executor as usize].pop_front() + } + + /// Attempts to acquire all necessary read and write locks for a transaction. + /// + /// This function iterates through all accounts in the transaction's message and + /// attempts to acquire the appropriate lock for each. If any lock is contended, + /// it fails early and returns the ID of the blocking executor or transaction. + pub(super) fn try_acquire_locks( + &mut self, + executor: ExecutorId, + transaction: &TransactionWithId, + ) -> Result<(), TransactionId> { + let message = transaction.txn.transaction.message(); + let accounts_to_lock = message.account_keys().iter().enumerate(); + let acquired_locks = &mut self.acquired_locks[executor as usize]; + + for (i, &acc) in accounts_to_lock.clone() { + // Get or create the lock for the account. + let lock = self.locks.entry(acc).or_default().clone(); + // See whether there's a contention for the given account + // if there's one, then we need to follow the breadcrumbs + // (txns->executor) to find out where our transaction + // needs to be queued. + let mut result = + if let Some(contenders) = self.account_contention.get(&acc) { + match contenders.binary_search(&transaction.id) { + // If we are the first contender, then we can proceed + // and try to acquire the needed account locks + Ok(index) | Err(index) if index == 0 => Ok(()), + // If we are not, then we need to get queued after + // the transaction contending right in front of us + Ok(index) | Err(index) => Err(contenders[index - 1]), + } + } else { + Ok(()) + }; + + if result.is_ok() { + // Attempt to acquire a write or read lock. + result = if message.is_writable(i) { + lock.borrow_mut().write(executor) + } else { + lock.borrow_mut().read(executor) + } + .map_err(|e| e as TransactionId); + } + + // We couldn't lock all of the accounts, so we are bailing, but + // first we need to set contention, and unlock successful locks + if let Err(e) = result { + for lock in acquired_locks.drain(..) 
{
+                    let mut lock = lock.borrow_mut();
+                    lock.unlock(executor);
+                }
+                for (i, &acc) in accounts_to_lock {
+                    // We only set contention for write locks,
+                    // in order to prevent writer starvation
+                    if message.is_writable(i) {
+                        self.contend_account(acc, transaction.id);
+                    }
+                }
+                return Err(e);
+            }
+
+            acquired_locks.push(lock);
+        }
+
+        // On success, the transaction is no longer blocking anything.
+        self.transaction_contention.remove(&transaction.id);
+        for (_, acc) in accounts_to_lock {
+            self.clear_account_contention(acc, transaction.id);
+        }
+        Ok(())
+    }
+
+    /// Sets the transaction contention for this account. Contenders are ordered
+    /// based on their ID, which honours the "first in, first served" policy.
+    #[inline]
+    fn contend_account(&mut self, acc: Pubkey, txn: TransactionId) {
+        let contenders = self.account_contention.entry(acc).or_default();
+        if let Err(index) = contenders.binary_search(&txn) {
+            contenders.insert(index, txn);
+        }
+    }
+
+    /// Removes the given transaction from the contenders list for the specified account.
+    #[inline]
+    fn clear_account_contention(&mut self, acc: &Pubkey, txn: TransactionId) {
+        let Some(contenders) = self.account_contention.get_mut(acc) else {
+            return;
+        };
+        if let Ok(index) = contenders.binary_search(&txn) {
+            contenders.remove(index);
+        }
+        // Prevent unbounded growth of the tracking map
+        if contenders.is_empty() {
+            self.account_contention.remove(acc);
+        }
+    }
+}
diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs
new file mode 100644
index 000000000..2f727df1a
--- /dev/null
+++ b/magicblock-processor/src/scheduler/locks.rs
@@ -0,0 +1,109 @@
+//! Fast, in-memory account locking primitives for the multi-threaded scheduler.
+//!
+//! This version uses a single `u64` bitmask to represent the entire lock state,
+//! including read locks, write locks, and contention, for maximum efficiency.
+
+use std::{cell::RefCell, collections::VecDeque, rc::Rc};
+
+use rustc_hash::FxHashMap;
+use solana_pubkey::Pubkey;
+
+// A bitmask representing the lock state.
+// - MSB: Write lock flag.
+// - Remaining bits: Read locks for each executor.
+type ReadWriteLock = u64;
+
+/// Unique identifier for a transaction executor worker.
+pub(crate) type ExecutorId = u32;
+
+/// Unique identifier for a transaction to be scheduled.
+///
+/// NOTE: the type is specifically set to u64, so that it
+/// will be statistically impossible to overflow it within
+/// the next few millennia, given the intended use case of the
+/// type as a tagging counter for incoming transactions.
+pub(super) type TransactionId = u64;
+
+/// A shared, mutable reference to an `AccountLock`.
+pub(super) type RcLock = Rc>;
+
+/// In-memory cache of account locks.
+pub(super) type LocksCache = FxHashMap;
+/// A map from a blocked transaction to the executor that holds the conflicting lock.
+pub(super) type TransactionContention = FxHashMap;
+/// A map from an account's pubkey to all transactions that are contending to acquire its lock.
+pub(super) type AccountContention = FxHashMap>;
+
+/// The maximum number of concurrent executors supported by the bitmask.
+/// One bit is reserved for the write flag.
+pub(super) const MAX_SVM_EXECUTORS: u32 = ReadWriteLock::BITS - 1u32;
+
+/// The bit used to indicate a write lock is held. This is the most significant bit.
+const WRITE_BIT_MASK: u64 = 1u64 << (ReadWriteLock::BITS - 1u32); +/// The starting transaction ID, follows the max possible value for ExecutorId +pub(crate) const INIT_TRANSACTION_ID: TransactionId = + MAX_SVM_EXECUTORS as TransactionId; + +/// A read/write lock on a single Solana account, represented by a `u64` bitmask. +#[derive(Default, Debug)] +pub(super) struct AccountLock { + rw: ReadWriteLock, +} + +impl AccountLock { + /// Attempts to acquire a write lock. Fails if any other lock is held. + #[inline] + pub(super) fn write( + &mut self, + executor: ExecutorId, + ) -> Result<(), ExecutorId> { + if self.rw != 0 { + // If the lock is held, `trailing_zeros()` will return the index of the + // least significant bit that is set. This corresponds to the ID of the + // executor that holds the lock. + return Err(self.rw.trailing_zeros()); + } + // Set the write lock bit and the bit for the acquiring executor. + self.rw = WRITE_BIT_MASK | (1u64 << executor); + Ok(()) + } + + /// Attempts to acquire a read lock. Fails if a write lock is held. + #[inline] + pub(super) fn read( + &mut self, + executor: ExecutorId, + ) -> Result<(), ExecutorId> { + // Check if the write lock bit is set. + if self.rw & WRITE_BIT_MASK != 0 { + // If a write lock is held, the conflicting executor is the one whose + // bit is set. We can find it using `trailing_zeros()`. + return Err(self.rw.trailing_zeros()); + } + // Set the bit corresponding to the executor to acquire a read lock. + self.rw |= 1u64 << executor; + Ok(()) + } + + /// Releases a lock held by an executor. + #[inline] + pub(super) fn unlock(&mut self, executor: ExecutorId) { + // To release the lock, we clear both the write bit and the executor's + // read bit. This is done using a bitwise AND with the inverted mask. + self.rw &= !(WRITE_BIT_MASK | (1u64 << executor)); + } +} + +/// Generates a new, unique transaction ID. +pub(super) fn next_transaction_id() -> TransactionId { + static mut COUNTER: TransactionId = INIT_TRANSACTION_ID; + // SAFETY: This is safe because the scheduler, which calls this function, + // operates in a single, dedicated thread. Therefore, there are no concurrent + // access concerns for this static mutable variable. The u64::MAX is large + // enough range to statistically guarantee that no two transactions created + // during the lifetime of the validator have the same ID. + unsafe { + COUNTER = COUNTER.wrapping_add(1).max(INIT_TRANSACTION_ID); + COUNTER + } +} diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs new file mode 100644 index 000000000..1878c94f1 --- /dev/null +++ b/magicblock-processor/src/scheduler/mod.rs @@ -0,0 +1,240 @@ +//! The central transaction scheduler and its event loop. +//! +//! This module is the entry point for all transactions into the processing pipeline. +//! It is responsible for creating and managing a pool of `TransactionExecutor` +//! workers and dispatching transactions to them for execution. 
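// ---------------------------------------------------------------------------
// Illustrative sketch (editor's note, not part of this diff): how the single
// u64 bitmask in `locks.rs` above encodes lock state. The most significant
// bit marks a write lock, every other bit marks a read lock held by the
// executor with that index, and `trailing_zeros()` recovers the id of a lock
// holder. `MiniLock` is a hypothetical, self-contained re-implementation of
// that scheme for demonstration only.
const WRITE_BIT: u64 = 1 << 63;

#[derive(Default)]
struct MiniLock {
    rw: u64,
}

impl MiniLock {
    fn write(&mut self, executor: u32) -> Result<(), u32> {
        if self.rw != 0 {
            // Some lock is already held; the lowest set bit identifies a holder.
            return Err(self.rw.trailing_zeros());
        }
        self.rw = WRITE_BIT | (1 << executor);
        Ok(())
    }
    fn read(&mut self, executor: u32) -> Result<(), u32> {
        if self.rw & WRITE_BIT != 0 {
            return Err(self.rw.trailing_zeros());
        }
        self.rw |= 1 << executor;
        Ok(())
    }
    fn unlock(&mut self, executor: u32) {
        self.rw &= !(WRITE_BIT | (1 << executor));
    }
}

fn main() {
    let mut lock = MiniLock::default();
    assert!(lock.read(0).is_ok()); // executor 0 takes a read lock
    assert!(lock.read(5).is_ok()); // readers share the account
    assert_eq!(lock.write(7), Err(0)); // a writer is blocked by executor 0
    lock.unlock(0);
    lock.unlock(5);
    assert!(lock.write(7).is_ok()); // exclusive write lock acquired
}
// ---------------------------------------------------------------------------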
+ +use std::sync::{Arc, RwLock}; + +use coordinator::{ExecutionCoordinator, TransactionWithId}; +use locks::{ExecutorId, MAX_SVM_EXECUTORS}; +use log::{error, info}; +use magicblock_core::link::transactions::{ + ProcessableTransaction, TransactionToProcessRx, +}; +use magicblock_ledger::LatestBlock; +use solana_program_runtime::loaded_programs::ProgramCache; +use state::TransactionSchedulerState; +use tokio::{ + runtime::Builder, + sync::mpsc::{channel, Receiver, Sender}, +}; + +use crate::executor::{SimpleForkGraph, TransactionExecutor}; + +/// Each executor has a channel capacity of 1, as it +/// can only process one transaction at a time. +const EXECUTOR_QUEUE_CAPACITY: usize = 1; + +/// The central transaction scheduler responsible for distributing work to a +/// pool of `TransactionExecutor` workers. +/// +/// This struct acts as the single entry point for all transactions entering the processing +/// pipeline. It receives transactions from a global queue and dispatches them to available +/// worker threads for execution or simulation. +pub struct TransactionScheduler { + /// Manages the state of all executors, including locks and blocked transactions. + coordinator: ExecutionCoordinator, + /// The receiving end of the global queue for all new transactions. + transactions_rx: TransactionToProcessRx, + /// A channel that receives readiness notifications from workers, + /// indicating they are free to accept new work. + ready_rx: Receiver, + /// A list of sender channels, one for each `TransactionExecutor` worker. + executors: Vec>, + /// A handle to the globally shared cache for loaded BPF programs. + program_cache: Arc>>, + /// A handle to the globally shared state of the latest block. + latest_block: LatestBlock, +} + +impl TransactionScheduler { + /// Creates and initializes a new `TransactionScheduler` and its associated pool of workers. + /// + /// This function performs the initial setup for the entire transaction processing pipeline: + /// 1. Prepares the shared program cache and ensures necessary sysvars are in the `AccountsDb`. + /// 2. Creates a pool of `TransactionExecutor` workers, each with its own dedicated channel. + /// 3. Spawns each worker in its own OS thread for maximum isolation and performance. + pub fn new(executors: u32, state: TransactionSchedulerState) -> Self { + let count = executors.clamp(1, MAX_SVM_EXECUTORS) as usize; + let mut executors = Vec::with_capacity(count); + + // Create the back-channel for workers to signal their readiness. + let (ready_tx, ready_rx) = channel(count); + // Perform one-time setup of the shared program cache and sysvars. + let program_cache = state.prepare_programs_cache(); + state.prepare_sysvars(); + + for id in 0..count { + let (transactions_tx, transactions_rx) = + channel(EXECUTOR_QUEUE_CAPACITY); + let executor = TransactionExecutor::new( + id as u32, + &state, + transactions_rx, + ready_tx.clone(), + program_cache.clone(), + ); + executor.populate_builtins(); + executor.spawn(); + executors.push(transactions_tx); + } + let coordinator = ExecutionCoordinator::new(count); + Self { + coordinator, + transactions_rx: state.txn_to_process_rx, + ready_rx, + executors, + latest_block: state.ledger.latest_block().clone(), + program_cache, + } + } + + /// Spawns the scheduler's main event loop into a new, dedicated OS thread. + /// + /// The scheduler runs in its own thread with a dedicated single-threaded Tokio + /// runtime. 
This design ensures that the scheduling logic, which is a critical + /// path, does not compete for resources with other tasks. + pub fn spawn(self) { + let task = move || { + let runtime = Builder::new_current_thread() + .thread_name("transaction-scheduler") + .build() + .expect("Failed to build single-threaded Tokio runtime"); + runtime.block_on(tokio::task::unconstrained(self.run())); + }; + std::thread::spawn(task); + } + + /// The main event loop of the transaction scheduler. + /// + /// This loop multiplexes between three primary events using `tokio::select!`: + /// 1. **Worker Readiness**: A worker signals it is ready for a new task. + /// 2. **New Transaction**: A new transaction arrives for processing. + /// 3. **New Block**: A new block is produced, triggering a slot transition. + /// + /// The `biased` selection ensures that ready workers are processed before + /// the incoming transactions, which helps to keep the pipeline full and + /// maximize throughput. + async fn run(mut self) { + let mut block_produced = self.latest_block.subscribe(); + loop { + tokio::select! { + biased; + // A new block has been produced. + Ok(()) = block_produced.recv() => { + self.transition_to_new_slot(); + } + // A worker has finished its task and is ready for more. + Some(executor) = self.ready_rx.recv() => { + self.handle_ready_executor(executor); + } + // Receive new transactions for scheduling, but + // only if there is at least one ready worker. + Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { + self.handle_new_transaction(txn); + } + // The main transaction channel has closed, indicating a system shutdown. + else => { + break + } + } + } + info!("Transaction scheduler has terminated"); + } + + /// Handles a notification that a worker has become ready. + fn handle_ready_executor(&mut self, executor: ExecutorId) { + self.coordinator.unlock_accounts(executor); + self.reschedule_blocked_transactions(executor); + } + + /// Handles a new transaction from the global queue. + fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { + // SAFETY: + // This unwrap is safe due to the `if self.coordinator.is_ready()` + // guard in the `select!` macro, which calls this method + let executor = self.coordinator.get_ready_executor().expect( + "unreachable: is_ready() guard ensures an executor is available", + ); + let txn = TransactionWithId::new(txn); + self.schedule_transaction(executor, txn); + } + + /// Updates the scheduler's state when a new slot begins. + fn transition_to_new_slot(&self) { + // Re-root the shared program cache to the new slot. + self.program_cache.write().unwrap().latest_root_slot = + self.latest_block.load().slot; + } + + /// Attempts to reschedule transactions that were blocked by the newly freed executor. + fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { + let mut executor = Some(blocker); + while let Some(exec) = executor.take() { + let txn = self.coordinator.next_blocked_transaction(blocker); + let blocked = if let Some(txn) = txn { + executor = self.coordinator.get_ready_executor(); + self.schedule_transaction(exec, txn) + } else { + self.coordinator.release_executor(exec); + break; + }; + // Here we check whether the transaction was blocked and re-queued: + // 1. If it was blocked by other executor (not the original blocker), + // then we continue with scheduling attempts, so that either the newly + // freed executor has some work to do, or its own queue is exhausted + // 2. 
The transaction is being blocked by the same original (newly freed) + // executor, which means we have re-queued it into the same queue, and + // we just abort all further scheduling attempts until the next cycle + if let Some(exec) = blocked { + if exec == blocker { + break; + } + } + // If the transaction was re-queued to another executor or successfully + // scheduled, then we keep draining the queue of original blocker + } + // If we have broken out of the loop holding some executor, release it + if let Some(executor) = executor { + self.coordinator.release_executor(executor); + } + } + + /// Attempts to schedule a single transaction for execution. + /// + /// If the transaction's required account locks are acquired, it is sent to the + /// specified executor. Otherwise, it is queued and will be retried later. The + /// optional return value indicates a blocking executor, which is used by caller + /// to make further decisions regarding further scheduling attempts. + fn schedule_transaction( + &mut self, + executor: ExecutorId, + txn: TransactionWithId, + ) -> Option { + let blocker = self.coordinator.try_acquire_locks(executor, &txn); + if let Err(blocker) = blocker { + self.coordinator.release_executor(executor); + let blocker = self.coordinator.queue_transaction(blocker, txn); + return Some(blocker); + } + // It's safe to ignore the result of the send operation. If the send fails, + // it means the executor's channel is closed, which only happens on shutdown. + // NOTE: the channel will always have enough capacity, since the executor was + // marked ready, which means that its transaction queue is currently empty. + let _ = self.executors[executor as usize].try_send(txn.txn).inspect_err(|e| { + error!("Executor {executor} has shutdown or crashed, should not be possible: {e}") + }); + None + } +} + +pub mod coordinator; +pub mod locks; +pub mod state; +#[cfg(test)] +mod tests; + +// SAFETY: +// Rc used within the scheduler never escapes to other threads +unsafe impl Send for TransactionScheduler {} diff --git a/magicblock-processor/src/scheduler/tests.rs b/magicblock-processor/src/scheduler/tests.rs new file mode 100644 index 000000000..932cf0aaa --- /dev/null +++ b/magicblock-processor/src/scheduler/tests.rs @@ -0,0 +1,474 @@ +use magicblock_core::link::transactions::{ + ProcessableTransaction, SanitizeableTransaction, TransactionProcessingMode, +}; +use solana_keypair::Keypair; +use solana_program::{ + hash::Hash, + instruction::{AccountMeta, Instruction}, +}; +use solana_pubkey::Pubkey; +use solana_signer::Signer; +use solana_transaction::Transaction; + +use super::coordinator::{ExecutionCoordinator, TransactionWithId}; +use crate::scheduler::ExecutorId; + +// --- Test Setup --- + +/// Creates a mock transaction with the specified accounts for testing. 
+fn create_mock_transaction( + accounts: &[(Pubkey, bool)], // A tuple of (PublicKey, is_writable) +) -> TransactionWithId { + let payer = Keypair::new(); + let instructions: Vec = accounts + .iter() + .map(|(pubkey, is_writable)| { + let meta = if *is_writable { + AccountMeta::new(*pubkey, false) + } else { + AccountMeta::new_readonly(*pubkey, false) + }; + Instruction::new_with_bincode(Pubkey::new_unique(), &(), vec![meta]) + }) + .collect(); + + let transaction = Transaction::new_signed_with_payer( + &instructions, + Some(&payer.pubkey()), + &[payer], + Hash::new_unique(), + ); + + let processable_txn = ProcessableTransaction { + transaction: transaction.sanitize(false).unwrap(), + mode: TransactionProcessingMode::Execution(None), + }; + TransactionWithId::new(processable_txn) +} + +// --- Basic Tests --- + +#[test] +/// Tests that two transactions with no overlapping accounts can be scheduled concurrently. +fn test_non_conflicting_transactions() { + let mut coordinator = ExecutionCoordinator::new(2); + + // Two transactions writing to different accounts + let txn1 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + let txn2 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + + // Both transactions should acquire locks without any issues. + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire lock without conflict" + ); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Txn2 should acquire lock without conflict" + ); +} + +#[test] +/// Tests that multiple transactions can take read locks on the same account concurrently. +fn test_read_read_no_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + + // Both transactions should be able to acquire read locks on the same account. + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire read lock" + ); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Txn2 should also acquire read lock" + ); +} + +// --- Contention Tests --- + +#[test] +/// Tests that a write lock blocks another write lock on the same account. +fn test_write_write_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire write lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = + coordinator.try_acquire_locks(exec2, &txn2).unwrap_err() as ExecutorId; + + // Txn2 should be blocked by the executor holding the lock (exec1). + assert_eq!(blocker, exec1, "Txn2 should be blocked by executor 1"); +} + +#[test] +/// Tests that a write lock blocks a read lock on the same account. 
+fn test_write_read_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire write lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = + coordinator.try_acquire_locks(exec2, &txn2).unwrap_err() as ExecutorId; + + // Txn2 should be blocked by exec1. + assert_eq!( + blocker, exec1, + "Read lock (Txn2) should be blocked by write lock (Txn1)" + ); +} + +#[test] +/// Tests that a read lock blocks a write lock on the same account. +fn test_read_write_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire read lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = + coordinator.try_acquire_locks(exec2, &txn2).unwrap_err() as ExecutorId; + + // Txn2 should be blocked by exec1. + assert_eq!( + blocker, exec1, + "Write lock (Txn2) should be blocked by read lock (Txn1)" + ); +} + +// --- Advanced Scenarios --- + +#[test] +/// Tests contention with a mix of read and write locks across multiple accounts. +fn test_multiple_mixed_locks_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let acc_c = Pubkey::new_unique(); + + // Txn 1: Writes A, Reads B + let txn1 = create_mock_transaction(&[(acc_a, true), (acc_b, false)]); + // Txn 2: Reads A, Writes C + let txn2 = create_mock_transaction(&[(acc_a, false), (acc_c, true)]); + // Txn 3: Writes B, Writes C + let txn3 = create_mock_transaction(&[(acc_b, true), (acc_c, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should lock A (write) and B (read)" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + // Txn2 should be blocked by Txn1's write lock on A. + assert_eq!( + coordinator.try_acquire_locks(exec2, &txn2).unwrap_err() as ExecutorId, + exec1, + "Txn2 should be blocked by Txn1 on account A" + ); + + // Txn3 should be blocked by Txn1's read lock on B. + assert_eq!( + coordinator.try_acquire_locks(exec2, &txn3).unwrap_err() as ExecutorId, + exec1, + "Txn3 should be blocked by Txn1 on account B" + ); +} + +#[test] +/// Tests a chain of dependencies: Txn3 waits for Txn2, which waits for Txn1. +fn test_transaction_dependency_chain() { + let mut coordinator = ExecutionCoordinator::new(3); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true), (acc_a, false)]); + let txn3 = create_mock_transaction(&[(acc_b, false)]); + + // Schedule Txn1, which locks A for writing. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // Txn2 needs to read A, so it's blocked by Txn1. 
+ let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + assert_eq!( + blocker1 as ExecutorId, exec1, + "Txn2 should be blocked by exec1" + ); + coordinator.queue_transaction(blocker1, txn2); + + // Txn3 needs to read B, but Txn2 (which writes to B) is already queued. + // So, Txn3 should be blocked by Txn2's transaction ID. + let exec3 = coordinator.get_ready_executor().unwrap(); + let blocker2 = coordinator.try_acquire_locks(exec3, &txn3).unwrap_err(); + let blocked_txn = coordinator.next_blocked_transaction(exec1).unwrap(); + assert_eq!( + blocker2, blocked_txn.id, + "Txn3 should be blocked by the transaction ID of Txn2" + ); +} + +#[test] +/// Simulates a scenario where all executors are busy, and a new transaction gets queued and then rescheduled. +fn test_full_executor_pool_and_reschedule() { + let mut coordinator = ExecutionCoordinator::new(2); + + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let acc_c = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + let txn3 = create_mock_transaction(&[(acc_a, true), (acc_c, true)]); + + // Occupy both available executors. + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + + // No more ready executors should be available. + assert!( + coordinator.get_ready_executor().is_none(), + "Executor pool should be empty" + ); + + // Txn3 arrives and contends with Txn1 on account A. + let blocker = coordinator.try_acquire_locks(exec1, &txn3).unwrap_err(); + assert_eq!(blocker as ExecutorId, exec1); + coordinator.queue_transaction(blocker, txn3); + + // Executor 1 finishes its work and releases its locks. + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + // Now that an executor is free, we should be able to reschedule the blocked transaction. + let ready_exec = coordinator.get_ready_executor().unwrap(); + let blocked_txn = coordinator.next_blocked_transaction(exec1).unwrap(); + assert!( + coordinator + .try_acquire_locks(ready_exec, &blocked_txn) + .is_ok(), + "Should be able to reschedule the blocked transaction" + ); +} + +// --- Edge Cases --- + +#[test] +/// Tests that a transaction with no accounts can be processed without issues. +fn test_transaction_with_no_accounts() { + let mut coordinator = ExecutionCoordinator::new(1); + let txn = create_mock_transaction(&[]); + let exec = coordinator.get_ready_executor().unwrap(); + + assert!( + coordinator.try_acquire_locks(exec, &txn).is_ok(), + "Transaction with no accounts should not fail" + ); +} + +#[test] +/// Tests that many read locks can be acquired on the same account concurrently. +fn test_multiple_read_locks_on_same_account() { + let mut coordinator = ExecutionCoordinator::new(3); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + let txn3 = create_mock_transaction(&[(shared_account, false)]); + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + let exec3 = coordinator.get_ready_executor().unwrap(); + + // All three should acquire read locks without contention. 
+ assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + assert!(coordinator.try_acquire_locks(exec3, &txn3).is_ok()); +} + +#[test] +/// Tests a rapid lock-unlock-lock cycle to ensure state is managed correctly. +fn test_rapid_lock_unlock_cycle() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + // Lock, unlock, and then lock again with a different transaction. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Should be able to lock the account again after it was released" + ); +} + +#[test] +/// Tests rescheduling multiple transactions that were all blocked by the same executor. +fn test_reschedule_multiple_blocked_on_same_executor() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + let txn3 = create_mock_transaction(&[(shared_account, true)]); + + // Txn1 takes the lock. Txn2 and Txn3 are queued as blocked. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + coordinator.queue_transaction(blocker1, txn2); + let blocker2 = coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(); + coordinator.queue_transaction(blocker2, txn3); + + // Txn1 finishes. + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + // The first blocked transaction (Txn2) should now be schedulable. + let ready_exec = coordinator.get_ready_executor().unwrap(); + let blocked_txn1 = coordinator.next_blocked_transaction(exec1).unwrap(); + let result = coordinator.try_acquire_locks(ready_exec, &blocked_txn1); + assert!( + result.is_ok(), + "First blocked transaction should be reschedulable" + ); + + // The second blocked transaction (Txn3) should still be in the queue. + assert!( + coordinator.next_blocked_transaction(exec1).is_some(), + "Second blocked transaction should still be queued" + ); +} + +#[test] +/// Tests a transaction that contends on multiple accounts held by different executors. +fn test_contention_on_multiple_accounts() { + let mut coordinator = ExecutionCoordinator::new(3); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + // This transaction will contend with both Txn1 and Txn2. + let txn3 = create_mock_transaction(&[(acc_a, true), (acc_b, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + + let exec3 = coordinator.get_ready_executor().unwrap(); + // The coordinator should report the first detected contention. 
+ let blocker = coordinator.try_acquire_locks(exec3, &txn3).unwrap_err(); + assert_eq!( + blocker as ExecutorId, exec1, + "Should be blocked by the first contended account (A)" + ); +} + +#[test] +/// Tests that no ready executors are available when the pool is fully utilized. +fn test_no_ready_executors() { + let mut coordinator = ExecutionCoordinator::new(1); + let txn1 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // The only executor is now busy. + assert!( + coordinator.get_ready_executor().is_none(), + "There should be no ready executors" + ); +} + +#[test] +/// Tests that an executor can release locks and immediately reacquire new ones. +fn test_release_and_reacquire_lock() { + let mut coordinator = ExecutionCoordinator::new(1); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + coordinator.unlock_accounts(exec1); + + // The executor should be able to immediately acquire a lock on a different account. + assert!( + coordinator.try_acquire_locks(exec1, &txn2).is_ok(), + "Executor should be able to reacquire a lock after releasing" + ); +} + +#[test] +/// Tests a scenario where a transaction is blocked by another transaction that is itself already queued. +fn test_transaction_blocked_by_queued_transaction() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + let txn3 = create_mock_transaction(&[(shared_account, true)]); + + // Txn1 acquires the lock. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // Txn2 is blocked by Txn1. + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + assert_eq!(blocker1 as ExecutorId, exec1); + coordinator.queue_transaction(blocker1, txn2); + + // Txn3 is blocked by the already queued Txn2. The error should be the transaction ID. + let blocker2 = coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(); + let blocked_txn = coordinator.next_blocked_transaction(exec1).unwrap(); + assert_eq!( + blocker2, blocked_txn.id, + "Txn3 should be blocked by the ID of the queued Txn2" + ); +} diff --git a/magicblock-processor/tests/fees.rs b/magicblock-processor/tests/fees.rs index ca559dfd1..70ae7ab8f 100644 --- a/magicblock-processor/tests/fees.rs +++ b/magicblock-processor/tests/fees.rs @@ -171,7 +171,7 @@ async fn test_escrowed_payer_success() { "escrow account update should have been sent" ); assert!( - !updated_accounts.contains(&env.payer.pubkey()), + !updated_accounts.contains(&env.get_payer().pubkey), "orginal payer account update should not have been sent" ); assert_eq!( @@ -267,7 +267,7 @@ async fn test_escrow_charged_for_failed_transaction() { #[tokio::test] async fn test_transaction_gasless_mode() { // Initialize the environment with a base fee of 0. 
-    let env = ExecutionTestEnv::new_with_fee(0);
+    let env = ExecutionTestEnv::new_with_config(0, 1, false);
     let mut payer = env.get_payer();
     payer.set_lamports(1); // Not enough to cover standard fee
     payer.set_delegated(false); // Explicitly set the payer as NON-delegated.
diff --git a/magicblock-processor/tests/scheduling.rs b/magicblock-processor/tests/scheduling.rs
new file mode 100644
index 000000000..d8c931d2f
--- /dev/null
+++ b/magicblock-processor/tests/scheduling.rs
@@ -0,0 +1,788 @@
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};
+
+use guinea::GuineaInstruction;
+use solana_account::ReadableAccount;
+use solana_program::{
+    instruction::{AccountMeta, Instruction},
+    native_token::LAMPORTS_PER_SOL,
+};
+use solana_pubkey::Pubkey;
+use solana_signature::Signature;
+use solana_transaction::Transaction;
+use test_kit::{ExecutionTestEnv, Signer};
+use tokio::time;
+
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
+const STRESS_TEST_TIMEOUT: Duration = Duration::from_secs(10);
+const DEFAULT_LAMPORTS: u64 = LAMPORTS_PER_SOL * 10;
+const TRANSFER_AMOUNT: u64 = 1000;
+const FEE: u64 = ExecutionTestEnv::BASE_FEE;
+
+// #################################################################
+// ## Helpers
+// #################################################################
+
+/// Creates an `ExecutionTestEnv` with the specified executor count
+/// and `defer_startup` set to `true`.
+fn setup_env(executors: u32) -> ExecutionTestEnv {
+    ExecutionTestEnv::new_with_config(FEE, executors, true)
+}
+
+/// Creates N accounts owned by the `guinea` program.
+/// These are used for `WriteByteToData` and `PrintSizes` instructions.
+fn create_accounts(env: &ExecutionTestEnv, count: usize) -> Vec<Pubkey> {
+    (0..count)
+        .map(|_| {
+            env.create_account_with_config(DEFAULT_LAMPORTS, 128, guinea::ID)
+                .pubkey()
+        })
+        .collect()
+}
+
+/// Builds a `Transfer` transaction from `from` to `to`.
+/// The `env.payer` pays the fee.
+fn build_transfer_tx(
+    env: &ExecutionTestEnv,
+    from: &Pubkey,
+    to: &Pubkey,
+    lamports: u64,
+) -> (Transaction, Signature) {
+    let ix = Instruction::new_with_bincode(
+        guinea::ID,
+        &GuineaInstruction::Transfer(lamports),
+        vec![AccountMeta::new(*from, false), AccountMeta::new(*to, false)],
+    );
+
+    // `env.build_transaction` signs with `env.payer`
+    let tx = env.build_transaction(&[ix]);
+    let sig = tx.signatures[0];
+    (tx, sig)
+}
+
+/// Builds a `WriteByteToData` transaction.
+/// The `account_pubkey` must be owned by `guinea::ID`.
+fn build_write_tx(
+    env: &ExecutionTestEnv,
+    account_pubkey: &Pubkey,
+    value: u8,
+) -> (Transaction, Signature) {
+    let ix = Instruction::new_with_bincode(
+        guinea::ID,
+        &GuineaInstruction::WriteByteToData(value),
+        vec![AccountMeta::new(*account_pubkey, false)],
+    );
+    let tx = env.build_transaction(&[ix]);
+    let sig = tx.signatures[0];
+    (tx, sig)
+}
+
+/// Builds a `PrintSizes` (read-only) transaction.
+fn build_readonly_tx(
+    env: &ExecutionTestEnv,
+    accounts: &[Pubkey],
+) -> (Transaction, Signature) {
+    let metas = accounts
+        .iter()
+        .map(|pk| AccountMeta::new_readonly(*pk, false))
+        .collect();
+    let ix = Instruction::new_with_bincode(
+        guinea::ID,
+        &GuineaInstruction::PrintSizes,
+        metas,
+    );
+    let tx = env.build_transaction(&[ix]);
+    let sig = tx.signatures[0];
+    (tx, sig)
+}
+
+/// Drains the status channel and asserts all expected transactions were successful.
+async fn assert_statuses(
+    env: &ExecutionTestEnv,
+    mut expected_sigs: HashSet<Signature>,
+    timeout: Duration,
+) {
+    let start = std::time::Instant::now();
+
+    let count = expected_sigs.len();
+    while !expected_sigs.is_empty() {
+        // Check for overall test timeout
+        if start.elapsed() >= timeout {
+            panic!(
+                "Timeout waiting for transaction statuses. Expected: {count}, Missing: {}",
+                expected_sigs.len()
+            );
+        }
+
+        let recv = env.dispatch.transaction_status.recv_async();
+        match time::timeout(Duration::from_millis(100), recv).await {
+            Ok(Ok(status)) => {
+                // Received a status
+                assert!(
+                    status.result.result.is_ok(),
+                    "Transaction {} failed: {:?}",
+                    status.signature,
+                    status.result.result
+                );
+                // Check that we expected this signature
+                assert!(
+                    expected_sigs.remove(&status.signature),
+                    "Received unexpected signature: {}",
+                    status.signature
+                );
+            }
+            Ok(Err(e)) => {
+                // Channel disconnected
+                panic!("Transaction status channel disconnected: {:?}", e);
+            }
+            Err(_) => {
+                // `recv_async` timed out (poll), this is fine, loop again
+                continue;
+            }
+        }
+    }
+}
+
+/// Drains the status channel and asserts all transactions
+/// were successful *and* executed in the exact order specified.
+async fn assert_statuses_in_order(
+    env: &ExecutionTestEnv,
+    expected_sigs_order: Vec<Signature>,
+    timeout: Duration,
+) {
+    let start = std::time::Instant::now();
+    let mut received_sigs_order = Vec::with_capacity(expected_sigs_order.len());
+    let expected_len = expected_sigs_order.len();
+
+    while received_sigs_order.len() < expected_len {
+        // Check for overall test timeout
+        if start.elapsed() >= timeout {
+            panic!(
+                "Timeout waiting for transaction statuses. Expected {} statuses, but only got {}. Missing: {:?}",
+                expected_len,
+                received_sigs_order.len(),
+                &expected_sigs_order[received_sigs_order.len()..]
+            );
+        }
+
+        let recv = env.dispatch.transaction_status.recv_async();
+        match time::timeout(Duration::from_millis(100), recv).await {
+            Ok(Ok(status)) => {
+                // Received a status
+                assert!(
+                    status.result.result.is_ok(),
+                    "Transaction {} failed: {:?}",
+                    status.signature,
+                    status.result.result
+                );
+                received_sigs_order.push(status.signature);
+            }
+            Ok(Err(e)) => {
+                // Channel disconnected
+                panic!("Transaction status channel disconnected: {:?}", e);
+            }
+            Err(_) => {
+                // `recv_async` timed out (poll), this is fine, loop again
+                continue;
+            }
+        }
+    }
+
+    // Verify the execution order matches the scheduling order.
+    assert_eq!(
+        received_sigs_order, expected_sigs_order,
+        "Transactions were not executed in the expected order."
+    );
+}
+
+/// Schedules all transactions sequentially and returns their signatures.
+async fn schedule_all(
+    env: &ExecutionTestEnv,
+    txs: Vec<Transaction>,
+) -> HashSet<Signature> {
+    let sigs = txs.iter().map(|tx| tx.signatures[0]).collect();
+    for s in txs.into_iter().map(|tx| env.schedule_transaction(tx)) {
+        s.await;
+    }
+    sigs
+}
+
+/// Schedules all transactions sequentially and returns
+/// their signatures in an ordered Vec.
+async fn schedule_all_and_get_order(
+    env: &ExecutionTestEnv,
+    txs: Vec<Transaction>,
+) -> Vec<Signature> {
+    let mut sigs = Vec::with_capacity(txs.len());
+    for tx in txs {
+        sigs.push(tx.signatures[0]);
+        env.schedule_transaction(tx).await;
+    }
+    sigs
+}
+
+// #################################################################
+// ## Test Scenarios
+// #################################################################
+
+/// **Scenario 1: Parallel Transfers (No Conflicts)**
+///
+/// Schedules N transactions that do not conflict (e.g., A->B, C->D, E->F).
+/// With multiple executors, these should all be processed in parallel. +async fn scenario_parallel_transfers(executors: u32) { + let mut env = setup_env(executors); + let num_pairs = 20; + let accounts = create_accounts(&env, num_pairs * 2); + let mut initial_balances = HashMap::new(); + let mut txs = vec![]; + + for i in 0..num_pairs { + let from = &accounts[i * 2]; + let to = &accounts[i * 2 + 1]; + initial_balances.insert(from, env.get_account(*from).lamports()); + initial_balances.insert(to, env.get_account(*to).lamports()); + + let (tx, _) = build_transfer_tx(&env, from, to, TRANSFER_AMOUNT); + txs.push(tx); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + let sigs = schedule_all(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + assert_statuses(&env, sigs.clone(), DEFAULT_TIMEOUT).await; + + // Verify final state + // Ensure all writes are committed + for i in 0..num_pairs { + let from_pk = accounts[i * 2]; + let to_pk = accounts[i * 2 + 1]; + let final_from = env.get_account(from_pk).lamports(); + let final_to = env.get_account(to_pk).lamports(); + let initial_from = initial_balances.get(&from_pk).unwrap(); + let initial_to = initial_balances.get(&to_pk).unwrap(); + + // `from` account doesn't pay the fee, `env.payer` does. + assert_eq!(final_from, initial_from - TRANSFER_AMOUNT); + assert_eq!(final_to, initial_to + TRANSFER_AMOUNT); + } + + // Verify ledger + for sig in sigs { + assert!( + env.get_transaction(sig).is_some(), + "Transaction {sig} not in ledger", + ); + } +} + +/// **Scenario 2: Conflicting Transfers (Write Lock on 1 Account)** +/// +/// Schedules N transactions that all write to the *same account* (e.g., B->A, C->A, D->A). +/// The scheduler must serialize these, regardless of executor count. +async fn scenario_conflicting_transfers(executors: u32) { + let mut env = setup_env(executors); + let num_senders = 10; + let mut accounts = create_accounts(&env, num_senders + 1); + let recipient_pk = accounts.pop().unwrap(); // Account A + let senders = accounts; // B, C, D... + + let initial_recipient_balance = env.get_account(recipient_pk).lamports(); + let mut initial_sender_balances = HashMap::new(); + let mut txs = vec![]; + + for sender in &senders { + initial_sender_balances + .insert(sender, env.get_account(*sender).lamports()); + let (tx, _) = + build_transfer_tx(&env, sender, &recipient_pk, TRANSFER_AMOUNT); + txs.push(tx); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + let sigs = schedule_all(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + assert_statuses(&env, sigs, DEFAULT_TIMEOUT).await; + + // Verify final state + let final_recipient_balance = env.get_account(recipient_pk).lamports(); + let total_received = TRANSFER_AMOUNT * num_senders as u64; + assert_eq!( + final_recipient_balance, + initial_recipient_balance + total_received + ); + + for sender in &senders { + let final_sender = env.get_account(*sender).lamports(); + let initial_sender = initial_sender_balances.get(sender).unwrap(); + // `sender` account doesn't pay the fee, `env.payer` does. + assert_eq!(final_sender, initial_sender - TRANSFER_AMOUNT); + } +} + +/// **Scenario 3: Parallel ReadOnly Transactions** +/// +/// Schedules N transactions that are all read-only. +/// These should all execute in parallel. 
+async fn scenario_readonly_parallel(executors: u32) { + let mut env = setup_env(executors); + let num_txs = 50; + let accounts = create_accounts(&env, 10); + let mut txs = vec![]; + + for i in 0..num_txs { + let acc_idx1 = i % 10; + let acc_idx2 = (i + 1) % 10; + let (tx, _) = + build_readonly_tx(&env, &[accounts[acc_idx1], accounts[acc_idx2]]); + txs.push(tx); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + let sigs = schedule_all(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + assert_statuses(&env, sigs, DEFAULT_TIMEOUT).await +} + +/// **Scenario 4: Mixed Workload (Conflicts + Parallel)** +/// +/// Schedules a mix of conflicting and non-conflicting transactions. +/// T1: A -> B (Conflicts with T3) +/// T2: C -> D (Parallel) +/// T3: B -> A (Conflicts with T1) +/// T4: E -> F (Parallel) +async fn scenario_mixed_workload(executors: u32) { + let mut env = setup_env(executors); + let num_sets = 10; + let accounts = create_accounts(&env, 6); + let (acc_a, acc_b, acc_c, acc_d, acc_e, acc_f) = ( + &accounts[0], + &accounts[1], + &accounts[2], + &accounts[3], + &accounts[4], + &accounts[5], + ); + + let mut initial_balances = HashMap::new(); + for acc in &accounts { + initial_balances.insert(acc, env.get_account(*acc).lamports()); + } + + let mut txs = vec![]; + for _ in 0..num_sets { + // T1: A -> B + let (tx1, _) = build_transfer_tx(&env, acc_a, acc_b, TRANSFER_AMOUNT); + txs.push(tx1); + // T2: C -> D + let (tx2, _) = build_transfer_tx(&env, acc_c, acc_d, TRANSFER_AMOUNT); + txs.push(tx2); + // T3: B -> A + let (tx3, _) = build_transfer_tx(&env, acc_b, acc_a, TRANSFER_AMOUNT); + txs.push(tx3); + // T4: E -> F + let (tx4, _) = build_transfer_tx(&env, acc_e, acc_f, TRANSFER_AMOUNT); + txs.push(tx4); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + let sigs = schedule_all(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + assert_statuses(&env, sigs, DEFAULT_TIMEOUT).await; + + // Verify final state + env.advance_slot(); + let n = num_sets as u64; + + // A and B transferred back and forth `n` times. + // Their net change is 0, as they are not paying fees. + assert_eq!( + env.get_account(*acc_a).lamports(), + *initial_balances.get(acc_a).unwrap() + ); + assert_eq!( + env.get_account(*acc_b).lamports(), + *initial_balances.get(acc_b).unwrap() + ); + // C, D, E, F just did one-way transfers. No fees paid by them. + assert_eq!( + env.get_account(*acc_c).lamports(), + initial_balances.get(acc_c).unwrap() - (TRANSFER_AMOUNT * n) + ); + assert_eq!( + env.get_account(*acc_d).lamports(), + initial_balances.get(acc_d).unwrap() + (TRANSFER_AMOUNT * n) + ); + assert_eq!( + env.get_account(*acc_e).lamports(), + initial_balances.get(acc_e).unwrap() - (TRANSFER_AMOUNT * n) + ); + assert_eq!( + env.get_account(*acc_f).lamports(), + initial_balances.get(acc_f).unwrap() + (TRANSFER_AMOUNT * n) + ); +} + +/// **Scenario 5: Conflicting Data Writes** +/// +/// Schedules N transactions that all write to the *same account's data* +/// using `WriteByteToData`. This creates a write-lock conflict. +async fn scenario_conflicting_writes(executors: u32) { + let mut env = setup_env(executors); + let num_txs = 20; + let guinea_accounts = create_accounts(&env, 1); + let acc_a = &guinea_accounts[0]; + let mut txs = vec![]; + let mut written_values = HashSet::new(); + + for i in 0..num_txs { + let value = (i + 1) as u8; // Write 1, 2, 3... 
+ let (tx, _) = build_write_tx(&env, acc_a, value); + txs.push(tx); + written_values.insert(value); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + let sigs = schedule_all(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + assert_statuses(&env, sigs, DEFAULT_TIMEOUT).await; + + // Verify final state + env.advance_slot(); + let account_data = env.get_account(*acc_a).data().to_vec(); + let final_value = account_data[0]; + + // We can't guarantee *which* write was last, but it must be one of them. + assert!( + written_values.contains(&final_value), + "Final value {} was not in the set of written values", + final_value + ); +} + +/// **Scenario 6: Serial Conflicting Writes (Asserts Order)** +/// +/// Schedules N transactions that all write to the *same account*. +/// Asserts that they are executed sequentially in the *exact order* +/// they were scheduled, regardless of executor count, due to the write lock. +async fn scenario_serial_conflicting_writes(executors: u32) { + let mut env = setup_env(executors); + let num_txs = 20; + let guinea_accounts = create_accounts(&env, 1); + let acc_a = &guinea_accounts[0]; + let mut txs = vec![]; + + for i in 0..num_txs { + let value = i as u8; // Write 0, 1, 2... 19 + let (tx, _) = build_write_tx(&env, acc_a, value); + txs.push(tx); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + // Schedule sequentially and get the ordered signatures + let sigs_order = schedule_all_and_get_order(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + // Assert that the statuses were received in the *exact* order of scheduling + assert_statuses_in_order(&env, sigs_order, DEFAULT_TIMEOUT).await; + + // Verify final state + env.advance_slot(); + let account_data = env.get_account(*acc_a).data().to_vec(); + let final_value = account_data[0]; + + // The final value *must* be the last value we wrote (19) + assert_eq!( + final_value, + (num_txs - 1) as u8, + "Final account data should be the last value written" + ); +} + +/// **Scenario 7: Serial Transfer Chain (Asserts Order)** +/// +/// Schedules N transactions in a dependency chain (A->B, B->C, C->D). +/// Asserts that they are executed sequentially in the *exact order* +/// they were scheduled, as each tx depends on the lock from the previous. 
+async fn scenario_serial_transfer_chain(executors: u32) { + let mut env = setup_env(executors); + let num_txs = 20; + let num_accounts = num_txs + 1; + let accounts = create_accounts(&env, num_accounts); + let mut initial_balances = HashMap::new(); + let mut txs = vec![]; + + for acc in &accounts { + initial_balances.insert(acc, env.get_account(*acc).lamports()); + } + + for i in 0..num_txs { + let from = &accounts[i]; + let to = &accounts[i + 1]; + let (tx, _) = build_transfer_tx(&env, from, to, TRANSFER_AMOUNT); + txs.push(tx); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + // Schedule sequentially and get the ordered signatures + let sigs_order = schedule_all_and_get_order(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + // Assert that the statuses were received in the *exact* order of scheduling + assert_statuses_in_order(&env, sigs_order, DEFAULT_TIMEOUT).await; + + // Verify final state + env.advance_slot(); + // First account (A) should have lost lamports + let first_acc = &accounts[0]; + assert_eq!( + env.get_account(*first_acc).lamports(), + initial_balances.get(first_acc).unwrap() - TRANSFER_AMOUNT + ); + + // Intermediary accounts (B, C, ...) should have net-zero change + for int_acc in accounts.iter().take(num_txs).skip(1) { + assert_eq!( + env.get_account(*int_acc).lamports(), + *initial_balances.get(int_acc).unwrap(), + "Intermediary account {} lamports changed", + int_acc + ); + } + + // Last account should have gained lamports + let last_acc = &accounts[num_accounts - 1]; + assert_eq!( + env.get_account(*last_acc).lamports(), + initial_balances.get(last_acc).unwrap() + TRANSFER_AMOUNT + ); +} + +// ################################################################# +// ## Test Matrix +// ################################################################# + +/// Macro to generate tests for different executor counts +macro_rules! 
test_scenario { + ($scenario_fn:ident, $test_name:ident, $executors:expr) => { + #[tokio::test] + #[allow(non_snake_case)] + async fn $test_name() { + $scenario_fn($executors).await; + } + }; +} + +// --- Test Scenario 1: Parallel Transfers (No Conflicts) --- +test_scenario!( + scenario_parallel_transfers, + test_parallel_transfers_1_executor, + 1 +); +test_scenario!( + scenario_parallel_transfers, + test_parallel_transfers_2_executors, + 2 +); +test_scenario!( + scenario_parallel_transfers, + test_parallel_transfers_4_executors, + 4 +); +test_scenario!( + scenario_parallel_transfers, + test_parallel_transfers_8_executors, + 8 +); + +// --- Test Scenario 2: Conflicting Transfers (Write Lock on 1 Account) --- +test_scenario!( + scenario_conflicting_transfers, + test_conflicting_transfers_1_executor, + 1 +); +test_scenario!( + scenario_conflicting_transfers, + test_conflicting_transfers_2_executors, + 2 +); +test_scenario!( + scenario_conflicting_transfers, + test_conflicting_transfers_4_executors, + 4 +); +test_scenario!( + scenario_conflicting_transfers, + test_conflicting_transfers_8_executors, + 8 +); + +// --- Test Scenario 3: Parallel ReadOnly Txs --- +test_scenario!( + scenario_readonly_parallel, + test_readonly_parallel_1_executor, + 1 +); +test_scenario!( + scenario_readonly_parallel, + test_readonly_parallel_2_executors, + 2 +); +test_scenario!( + scenario_readonly_parallel, + test_readonly_parallel_4_executors, + 4 +); +test_scenario!( + scenario_readonly_parallel, + test_readonly_parallel_8_executors, + 8 +); + +// --- Test Scenario 4: Mixed Workload (Conflicts + Parallel) --- +test_scenario!(scenario_mixed_workload, test_mixed_workload_1_executor, 1); +test_scenario!(scenario_mixed_workload, test_mixed_workload_2_executors, 2); +test_scenario!(scenario_mixed_workload, test_mixed_workload_4_executors, 4); +test_scenario!(scenario_mixed_workload, test_mixed_workload_8_executors, 8); + +// --- Test Scenario 5: Conflicting Data Writes --- +test_scenario!( + scenario_conflicting_writes, + test_conflicting_writes_1_executor, + 1 +); +test_scenario!( + scenario_conflicting_writes, + test_conflicting_writes_2_executors, + 2 +); +test_scenario!( + scenario_conflicting_writes, + test_conflicting_writes_4_executors, + 4 +); +test_scenario!( + scenario_conflicting_writes, + test_conflicting_writes_8_executors, + 8 +); + +// --- Test Scenario 6: Serial Conflicting Writes (Asserts Order) --- +test_scenario!( + scenario_serial_conflicting_writes, + test_serial_conflicting_writes_1_executor, + 1 +); +test_scenario!( + scenario_serial_conflicting_writes, + test_serial_conflicting_writes_2_executors, + 2 +); +test_scenario!( + scenario_serial_conflicting_writes, + test_serial_conflicting_writes_4_executors, + 4 +); +test_scenario!( + scenario_serial_conflicting_writes, + test_serial_conflicting_writes_8_executors, + 8 +); + +// --- Test Scenario 7: Serial Transfer Chain (Asserts Order) --- +test_scenario!( + scenario_serial_transfer_chain, + test_serial_transfer_chain_1_executor, + 1 +); +test_scenario!( + scenario_serial_transfer_chain, + test_serial_transfer_chain_2_executors, + 2 +); +test_scenario!( + scenario_serial_transfer_chain, + test_serial_transfer_chain_4_executors, + 4 +); +test_scenario!( + scenario_serial_transfer_chain, + test_serial_transfer_chain_8_executors, + 8 +); + +// --- Test Scenario 8: Large Queue Stress Test --- +/// **Scenario 8: Large Queue Stress Test** +/// +/// Schedules 1000 transactions of mixed types to check for deadlocks or +/// race conditions under heavy 
load. +#[tokio::test] +async fn test_large_queue_mixed_8_executors() { + let mut env = setup_env(8); + let num_txs = 1000; + let num_accounts = 100; + + let transfer_accounts = create_accounts(&env, num_accounts); + let guinea_accounts = create_accounts(&env, num_accounts); + let mut txs = vec![]; + + for i in 0..num_txs { + let r = i % 4; + let idx = i % num_accounts; + + let tx = match r { + // 0: Non-conflicting transfer (A->B, B->C, ...) + 0 => { + let from = &transfer_accounts[idx]; + let to_idx = (idx + 1) % num_accounts; + let to = &transfer_accounts[to_idx]; + build_transfer_tx(&env, from, to, 10).0 + } + // 1: Conflicting write (all write to guinea_accounts[0]) + 1 => build_write_tx(&env, &guinea_accounts[0], i as u8).0, + // 2: Non-conflicting write (A writes A, B writes B, ...) + 2 => build_write_tx(&env, &guinea_accounts[idx], i as u8).0, + // 3: Readonly + _ => { + build_readonly_tx( + &env, + &[ + guinea_accounts[idx], + guinea_accounts[(idx + 1) % num_accounts], + ], + ) + .0 + } + }; + txs.push(tx); + // we force a different slot to generate a new signature + env.advance_slot(); + } + + let sigs = schedule_all(&env, txs).await; + env.run_scheduler(); + env.advance_slot(); + + // Use a longer timeout for the large queue + assert_statuses(&env, sigs, STRESS_TEST_TIMEOUT).await; +} diff --git a/magicblock-table-mania/src/manager.rs b/magicblock-table-mania/src/manager.rs index 4901ccd7a..792ad5f1e 100644 --- a/magicblock-table-mania/src/manager.rs +++ b/magicblock-table-mania/src/manager.rs @@ -377,8 +377,8 @@ impl TableMania { if self.randomize_lookup_table_slot { use rand::Rng; - let mut rng = rand::thread_rng(); - let random_slot = rng.gen_range(0..=u64::MAX); + let mut rng = rand::rng(); + let random_slot = rng.random_range(0..=u64::MAX); SUB_SLOT.store(random_slot, Ordering::Relaxed); } else { static LAST_SLOT: AtomicU64 = AtomicU64::new(0); diff --git a/magicblock-validator/Cargo.toml b/magicblock-validator/Cargo.toml index 4820ca4b4..9ac23d13a 100644 --- a/magicblock-validator/Cargo.toml +++ b/magicblock-validator/Cargo.toml @@ -16,6 +16,7 @@ log = { workspace = true } magicblock-api = { workspace = true } magicblock-config = { workspace = true } magicblock-version = { workspace = true } +num_cpus = { workspace = true } solana-sdk = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/magicblock-validator/src/main.rs b/magicblock-validator/src/main.rs index 3991d4c17..208fc4e7e 100644 --- a/magicblock-validator/src/main.rs +++ b/magicblock-validator/src/main.rs @@ -7,6 +7,7 @@ use magicblock_api::{ }; use magicblock_config::MagicBlockConfig; use solana_sdk::signature::Signer; +use tokio::runtime::Builder; use crate::shutdown::Shutdown; @@ -49,24 +50,21 @@ fn init_logger() { }); } -/// Print informational startup messages. -/// - If `RUST_LOG` is not set or is set to "quiet", prints to stdout using `println!()`. -/// - Otherwise, emits an `info!` log so operators can control visibility -/// (e.g., by setting `RUST_LOG=warn` to hide it). 
-fn print_info(msg: S) { - let rust_log = std::env::var("RUST_LOG").unwrap_or_default(); - let rust_log_trimmed = rust_log.trim().to_ascii_lowercase(); - let use_plain_print = - rust_log_trimmed.is_empty() || rust_log_trimmed == "quiet"; - if use_plain_print { - println!("{}", msg); - } else { - info!("{}", msg); - } +fn main() { + // We dedicate half of the threads to async runtime (where RPC and other + // io/timer bound services are running), and the other half is allocated + // for the execution runtime (transaction scheduler/executor threads) + let workers = (num_cpus::get() / 2).max(1); + let runtime = Builder::new_multi_thread() + .worker_threads(workers) + .enable_all() + .thread_name("async-runtime") + .build() + .expect("failed to build async runtime"); + runtime.block_on(run()); } -#[tokio::main] -async fn main() { +async fn run() { init_logger(); #[cfg(feature = "tokio-console")] console_subscriber::init(); @@ -137,3 +135,19 @@ async fn main() { } api.stop().await; } + +/// Print informational startup messages. +/// - If `RUST_LOG` is not set or is set to "quiet", prints to stdout using `println!()`. +/// - Otherwise, emits an `info!` log so operators can control visibility +/// (e.g., by setting `RUST_LOG=warn` to hide it). +fn print_info(msg: S) { + let rust_log = std::env::var("RUST_LOG").unwrap_or_default(); + let rust_log_trimmed = rust_log.trim().to_ascii_lowercase(); + let use_plain_print = + rust_log_trimmed.is_empty() || rust_log_trimmed == "quiet"; + if use_plain_print { + println!("{}", msg); + } else { + info!("{}", msg); + } +} diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index a69b204d8..2314d3f66 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -1,6 +1,9 @@ use std::{ ops::{Deref, DerefMut}, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, thread, }; @@ -44,8 +47,10 @@ use tempfile::TempDir; /// worker pool. It provides a high-level API for tests to manipulate the blockchain /// state and process transactions. pub struct ExecutionTestEnv { + /// Atomic counter to index the payers array + payer_index: AtomicUsize, /// The default keypair used for paying transaction fees and signing. - pub payer: Keypair, + pub payers: Vec, /// A handle to the accounts database, storing all account states. pub accountsdb: Arc, /// A handle to the ledger, storing all blocks and transactions. @@ -58,6 +63,8 @@ pub struct ExecutionTestEnv { pub dispatch: DispatchEndpoints, /// The "server-side" channel endpoint for broadcasting new block updates. pub blocks_tx: BlockUpdateTx, + /// Transaction execution scheduler/backend for deferred launch + pub scheduler: Option, } impl Default for ExecutionTestEnv { @@ -78,7 +85,7 @@ impl ExecutionTestEnv { /// 4. Pre-loads a test program (`guinea`) for use in tests. /// 5. Funds a default `payer` keypair with 1 SOL. pub fn new() -> Self { - Self::new_with_fee(Self::BASE_FEE) + Self::new_with_config(Self::BASE_FEE, 1, false) } /// Creates a new, fully initialized validator test environment with given base fee @@ -89,7 +96,11 @@ impl ExecutionTestEnv { /// 3. Spawns a `TransactionScheduler` with one worker thread. /// 4. Pre-loads a test program (`guinea`) for use in tests. /// 5. Funds a default `payer` keypair with 1 SOL. 
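+    /// The `executors` argument sets how many scheduler worker threads are
+    /// created, and `defer_startup` keeps the scheduler from being spawned
+    /// until `run_scheduler` is called.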
- pub fn new_with_fee(fee: u64) -> Self { + pub fn new_with_config( + fee: u64, + executors: u32, + defer_startup: bool, + ) -> Self { init_logger!(); let dir = tempfile::tempdir().expect("creating temp dir for validator state"); @@ -102,16 +113,18 @@ impl ExecutionTestEnv { let (dispatch, validator_channels) = link(); let blockhash = ledger.latest_block().load().blockhash; let environment = build_svm_env(&accountsdb, blockhash, fee); - let payer = Keypair::new(); + let payers = (0..executors).map(|_| Keypair::new()).collect(); - let this = Self { - payer, + let mut this = Self { + payer_index: AtomicUsize::new(0), + payers, accountsdb: accountsdb.clone(), ledger: ledger.clone(), transaction_scheduler: dispatch.transaction_scheduler.clone(), dir, dispatch, blocks_tx: validator_channels.block_update, + scheduler: None, }; this.advance_slot(); // Move to slot 1 to ensure a non-genesis state. @@ -132,13 +145,26 @@ impl ExecutionTestEnv { )]) .expect("failed to load test programs into test env"); - // Start the transaction processing backend. - TransactionScheduler::new(1, scheduler_state).spawn(); + // Start/Defer the transaction processing backend. + let scheduler = TransactionScheduler::new(executors, scheduler_state); + if defer_startup { + this.scheduler.replace(scheduler); + } else { + scheduler.spawn(); + } - this.fund_account(this.payer.pubkey(), LAMPORTS_PER_SOL); + for payer in this.payers.iter() { + this.fund_account(payer.pubkey(), LAMPORTS_PER_SOL); + } this } + pub fn run_scheduler(&mut self) { + if let Some(scheduler) = self.scheduler.take() { + scheduler.spawn(); + } + } + /// Creates a new account with the specified properties. /// Note: This helper automatically marks the account as `delegated`. pub fn create_account_with_config( @@ -222,10 +248,14 @@ impl ExecutionTestEnv { /// Builds a transaction with the given instructions, signed by the default payer. pub fn build_transaction(&self, ixs: &[Instruction]) -> Transaction { + let payer = { + let index = self.payer_index.fetch_add(1, Ordering::Relaxed); + &self.payers[index % self.payers.len()] + }; Transaction::new_signed_with_payer( ixs, - Some(&self.payer.pubkey()), - &[&self.payer], + Some(&payer.pubkey()), + &[payer], self.ledger.latest_blockhash(), ) } @@ -241,6 +271,14 @@ impl ExecutionTestEnv { .inspect_err(|err| error!("failed to execute transaction: {err}")) } + /// Submits a transaction for scheduling and returns + pub async fn schedule_transaction( + &self, + txn: impl SanitizeableTransaction, + ) { + self.transaction_scheduler.schedule(txn).await.unwrap(); + } + /// Submits a transaction for simulation and waits for the detailed result. pub async fn simulate_transaction( &self, @@ -281,7 +319,11 @@ impl ExecutionTestEnv { } pub fn get_payer(&self) -> CommitableAccount { - self.get_account(self.payer.pubkey()) + let payer = { + let index = self.payer_index.load(Ordering::Relaxed); + &self.payers[index % self.payers.len()] + }; + self.get_account(payer.pubkey()) } }
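For reference, here is a condensed sketch of the deferred-startup pattern the new scheduling tests are built on: transactions are queued against a scheduler that has not been spawned yet, the executors are then started, and conflicting writes are expected to complete in scheduling order. This is illustrative only and not part of the diff; it reuses the helpers introduced in `magicblock-processor/tests/scheduling.rs` above (`setup_env`, `create_accounts`, `build_write_tx`, `schedule_all_and_get_order`, `assert_statuses_in_order`, `DEFAULT_TIMEOUT`), and the test name is hypothetical.

```rust
// Sketch only (not part of this PR): minimal version of the serialized-write
// scenario, assuming the helpers from tests/scheduling.rs are in scope.
#[tokio::test]
async fn sketch_serialized_writes_two_executors() {
    // Two executors, scheduler startup deferred until run_scheduler().
    let mut env = setup_env(2);
    let target = create_accounts(&env, 1)[0];

    // Both transactions write to the same account, so the write lock
    // forces them to execute one after the other.
    let (tx1, _) = build_write_tx(&env, &target, 1);
    env.advance_slot(); // new slot => new blockhash => distinct signature
    let (tx2, _) = build_write_tx(&env, &target, 2);

    // Queue both transactions first, then start the executors.
    let order = schedule_all_and_get_order(&env, vec![tx1, tx2]).await;
    env.run_scheduler();
    env.advance_slot();

    // Despite two executors, statuses must arrive in scheduling order.
    assert_statuses_in_order(&env, order, DEFAULT_TIMEOUT).await;
}
```

Queueing everything before `run_scheduler()` mirrors how the scenarios above build their workload up front and only then let the executors race over it.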