From 5a7852ac66730b516079f90e88f32c8f27a56afd Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 6 Feb 2025 12:59:50 +0900 Subject: [PATCH] Plumb unified scheduler for BP minimally (#4533) * Plumb unified scheduler for BP minimally * Clean up `use`s a bit * Simplify closure interaction * Rework banking stage plumbing by extending handler * Add docs * Use RootBankCache * Fix ci * Add mut to decision_maker --- Cargo.lock | 62 +++++- Cargo.toml | 3 + core/src/banking_stage.rs | 7 + core/src/banking_stage/packet_deserializer.rs | 13 ++ core/src/banking_stage/unified_scheduler.rs | 94 ++++++++ core/src/banking_trace.rs | 22 ++ core/tests/unified_scheduler.rs | 122 ++++++++++- programs/sbf/Cargo.lock | 62 +++++- runtime/src/bank_forks.rs | 14 +- runtime/src/installed_scheduler_pool.rs | 4 + runtime/src/root_bank_cache.rs | 1 + svm/examples/Cargo.lock | 62 +++++- unified-scheduler-pool/Cargo.toml | 4 + unified-scheduler-pool/src/lib.rs | 201 ++++++++++++++++-- 14 files changed, 644 insertions(+), 27 deletions(-) create mode 100644 core/src/banking_stage/unified_scheduler.rs diff --git a/Cargo.lock b/Cargo.lock index e75d3225706aef..4ac7d5964019e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1685,6 +1685,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -2033,13 +2042,35 @@ version = "0.99.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40eebddd2156ce1bb37b20bbe5151340a31828b1f2d22ba4141f3531710e38df" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version 0.3.3", "syn 1.0.109", ] 
+[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "convert_case 0.6.0", + "proc-macro2", + "quote", + "syn 2.0.98", + "unicode-xid", +] + [[package]] name = "dialoguer" version = "0.10.4" @@ -2163,6 +2194,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "eager" version = "0.1.0" @@ -3514,7 +3551,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a" dependencies = [ - "derive_more", + "derive_more 0.99.16", "futures 0.3.31", "jsonrpc-core", "jsonrpc-pubsub", @@ -10376,11 +10413,14 @@ dependencies = [ name = "solana-unified-scheduler-pool" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", "aquamarine", "assert_matches", "crossbeam-channel", "dashmap", "derive-where", + "derive_more 1.0.0", + "dyn-clone", "lazy_static", "log", "qualifier_attr", @@ -10402,6 +10442,7 @@ dependencies = [ "solana-unified-scheduler-logic", "static_assertions", "test-case", + "trait-set", "vec_extract_if_polyfill", ] @@ -11957,6 +11998,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "trait-set" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b79e2e9c9ab44c6d7c20d5976961b47e8f49ac199154daa514b77cd1ab536625" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "trees" version = "0.4.2" @@ -12038,6 +12090,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-width" version = "0.1.9" diff --git a/Cargo.toml b/Cargo.toml index 24d3e39925c406..440a183b4b8828 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -228,11 +228,13 @@ curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core"] } dashmap = "5.5.3" derivation-path = { version = "0.2.0", default-features = false } derive-where = "1.2.7" +derive_more = { version = "1.0.0", features = ["full"] } dialoguer = "0.10.4" digest = "0.10.7" dir-diff = "0.3.3" dirs-next = "2.0.0" dlopen2 = "0.5.0" +dyn-clone = "1.0.17" eager = "0.1.0" ed25519-dalek = "=1.0.1" ed25519-dalek-bip32 = "0.2.0" @@ -591,6 +593,7 @@ toml = "0.8.12" tonic = "0.9.2" tonic-build = "0.9.2" tower = "0.5.2" +trait-set = "0.3.0" trees = "0.4.2" tungstenite = "0.20.1" uriparse = "0.6.4" diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48dbb205a2b801..54d912286282e7 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -83,6 +83,13 @@ mod read_write_account_set; mod scheduler_messages; mod transaction_scheduler; +// proc_macro_hygiene needs to be stabilzied to use qualifier_attr... 
+// error[E0658]: non-inline modules in proc macro input are unstable +#[cfg(not(feature = "dev-context-only-utils"))] +pub(crate) mod unified_scheduler; +#[cfg(feature = "dev-context-only-utils")] +pub mod unified_scheduler; + // Fixed thread size seems to be fastest on GCP setup pub const NUM_THREADS: u32 = 6; diff --git a/core/src/banking_stage/packet_deserializer.rs b/core/src/banking_stage/packet_deserializer.rs index 3c1a56b43b01de..32e2dd591ec589 100644 --- a/core/src/banking_stage/packet_deserializer.rs +++ b/core/src/banking_stage/packet_deserializer.rs @@ -198,6 +198,19 @@ impl PacketDeserializer { } }) } + + #[allow(dead_code)] + pub(crate) fn deserialize_packets_with_indexes( + packet_batch: &PacketBatch, + ) -> impl Iterator + '_ { + let packet_indexes = PacketDeserializer::generate_packet_indexes(packet_batch); + packet_indexes.into_iter().filter_map(move |packet_index| { + let packet = packet_batch[packet_index].clone(); + ImmutableDeserializedPacket::new(packet) + .ok() + .map(|packet| (packet, packet_index)) + }) + } } #[cfg(test)] diff --git a/core/src/banking_stage/unified_scheduler.rs b/core/src/banking_stage/unified_scheduler.rs new file mode 100644 index 00000000000000..c05b2b7ce47a6c --- /dev/null +++ b/core/src/banking_stage/unified_scheduler.rs @@ -0,0 +1,94 @@ +//! This module contains any integration code to put the unified scheduler subsystem into the +//! banking stage, starting from `BankingPacketBatch` ingestion from the sig verify stage and to +//! `Task` submission to the unified scheduler. +//! +//! These preprocessing for task creation can be multi-threaded trivially. At the same time, the +//! maximum cpu core utilization needs to be constrained among this processing and the actual task +//! handling of unified scheduler. Thus, it's desired to share a single thread pool for the two +//! kinds of work. Towards that end, the integration was implemented as a callback-style, which is +//! 
invoked (via `select!` on `banking_packet_receiver`) at each of unified scheduler handler +//! threads. +//! +//! Aside from some limited abstraction leakage to make `select!` work at the +//! solana-unified-scheduler-pool crate, almost all of these preprocessing are intentionally +//! encapsulated into this module, at the cost of dynamic dispatch per each BankingPacketBatch to +//! retain the unified scheduler agnostic scheduler over scheduling mode (block verification vs +//! block production) as much as possible. +//! +//! Lastly, what the callback closure in this module does is roughly as follows: +//! 1. Translate the raw packet bytes into some structs +//! 2. Do various sanitization on them +//! 3. Calculate priorities +//! 4. Convert them to tasks with the help of provided BankingStageHelper (remember that pubkey +//! lookup for UsageQueue is also performed here; thus multi-threaded and off-loaded from the +//! scheduler thread) +//! 5. Submit the tasks. + +#[cfg(feature = "dev-context-only-utils")] +use qualifier_attr::qualifiers; +use { + super::{ + decision_maker::{BufferedPacketsDecision, DecisionMaker}, + packet_deserializer::PacketDeserializer, + LikeClusterInfo, + }, + crate::banking_trace::Channels, + agave_banking_stage_ingress_types::BankingPacketBatch, + solana_poh::poh_recorder::PohRecorder, + solana_runtime::{bank_forks::BankForks, root_bank_cache::RootBankCache}, + solana_unified_scheduler_pool::{BankingStageHelper, DefaultSchedulerPool}, + std::sync::{Arc, RwLock}, +}; + +#[allow(dead_code)] +#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] +pub(crate) fn ensure_banking_stage_setup( + pool: &DefaultSchedulerPool, + bank_forks: &Arc>, + channels: &Channels, + cluster_info: &impl LikeClusterInfo, + poh_recorder: &Arc>, +) { + let mut root_bank_cache = RootBankCache::new(bank_forks.clone()); + let unified_receiver = channels.unified_receiver().clone(); + let mut decision_maker = DecisionMaker::new(cluster_info.id(), 
poh_recorder.clone()); + let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); + + let banking_packet_handler = Box::new( + move |helper: &BankingStageHelper, batches: BankingPacketBatch| { + let decision = decision_maker.make_consume_or_forward_decision(); + if matches!(decision, BufferedPacketsDecision::Forward) { + return; + } + let bank = root_bank_cache.root_bank(); + for batch in batches.iter() { + // over-provision nevertheless some of packets could be invalid. + let task_id_base = helper.generate_task_ids(batch.len()); + let packets = PacketDeserializer::deserialize_packets_with_indexes(batch); + + for (packet, packet_index) in packets { + let Some((transaction, _deactivation_slot)) = packet + .build_sanitized_transaction( + bank.vote_only_bank(), + &bank, + bank.get_reserved_account_keys(), + ) + else { + continue; + }; + + let index = task_id_base + packet_index; + + let task = helper.create_new_task(transaction, index); + helper.send_new_task(task); + } + } + }, + ); + + pool.register_banking_stage( + unified_receiver, + banking_packet_handler, + transaction_recorder, + ); +} diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index b48ddeb7387e58..7fc8b85cd35b8f 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -184,6 +184,28 @@ pub struct Channels { pub gossip_vote_receiver: BankingPacketReceiver, } +#[allow(dead_code)] +impl Channels { + #[cfg(feature = "dev-context-only-utils")] + pub fn unified_sender(&self) -> &BankingPacketSender { + let unified_sender = &self.non_vote_sender; + assert!(unified_sender + .sender + .same_channel(&self.tpu_vote_sender.sender)); + assert!(unified_sender + .sender + .same_channel(&self.gossip_vote_sender.sender)); + unified_sender + } + + pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver { + let unified_receiver = &self.non_vote_receiver; + assert!(unified_receiver.same_channel(&self.tpu_vote_receiver)); + 
assert!(unified_receiver.same_channel(&self.gossip_vote_receiver)); + unified_receiver + } +} + impl BankingTracer { pub fn new( maybe_config: Option<(&PathBuf, Arc, DirByteLimit)>, diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index c2f24923aba23e..5565110f9a3b7c 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -1,8 +1,12 @@ use { + agave_banking_stage_ingress_types::BankingPacketBatch, + assert_matches::assert_matches, crossbeam_channel::unbounded, itertools::Itertools, log::*, solana_core::{ + banking_stage::unified_scheduler::ensure_banking_stage_setup, + banking_trace::BankingTracer, consensus::{ heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, progress_map::{ForkProgress, ProgressMap}, @@ -14,22 +18,36 @@ use { replay_stage::ReplayStage, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, }, - solana_ledger::genesis_utils::create_genesis_config, + solana_entry::entry::Entry, + solana_gossip::cluster_info::{ClusterInfo, Node}, + solana_ledger::{ + blockstore::Blockstore, create_new_tmp_ledger_auto_delete, + genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache, + }, + solana_perf::packet::to_packet_batches, + solana_poh::poh_recorder::create_test_recorder, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks, genesis_utils::GenesisConfigInfo, installed_scheduler_pool::SchedulingContext, prioritization_fee_cache::PrioritizationFeeCache, }, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, - solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction::Result}, + solana_sdk::{ + hash::Hash, pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, + system_transaction, transaction::Result, + }, + solana_streamer::socket::SocketAddrSpace, solana_timings::ExecuteTimings, - solana_unified_scheduler_logic::Task, + solana_unified_scheduler_logic::{SchedulingMode, 
Task}, solana_unified_scheduler_pool::{ - DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, TaskHandler, + DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, + TaskHandler, }, std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{atomic::Ordering, Arc, Mutex}, + thread::sleep, + time::Duration, }, }; @@ -185,3 +203,97 @@ fn test_scheduler_waited_by_drop_bank_service() { // the scheduler used by the pruned_bank have been returned now. assert_eq!(pool_raw.pooled_scheduler_count(), 1); } + +#[test] +fn test_scheduler_producing_blocks() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + + // Setup bank_forks with block-producing unified scheduler enabled + let genesis_bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(genesis_bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank)); + let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder( + genesis_bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let channels = { + let banking_tracer = BankingTracer::new_disabled(); + banking_tracer.create_channels(true) + }; + let cluster_info = { + let keypair = Arc::new(Keypair::new()); + let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); + Arc::new(ClusterInfo::new( + node.info, + keypair, + 
+ SocketAddrSpace::Unspecified, + )) + }; + ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder); + bank_forks.write().unwrap().install_scheduler_pool(pool); + + // Wait until genesis_bank reaches its tick height... + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + + // Create test tx + let tx = system_transaction::transfer( + &mint_keypair, + &solana_pubkey::new_rand(), + 1, + genesis_config.hash(), + ); + let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1)); + let tx = RuntimeTransaction::from_transaction_for_tests(tx); + + // Create tpu_bank + let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2); + let tpu_bank = bank_forks + .write() + .unwrap() + .insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank); + poh_recorder + .write() + .unwrap() + .set_bank(tpu_bank.clone_with_scheduler(), false); + let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + assert_eq!(tpu_bank.transaction_count(), 0); + + // Now, send transaction + channels + .unified_sender() + .send(banking_packet_batch) + .unwrap(); + + // Wait until tpu_bank reaches its tick height... + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + + // Verify transactions are committed and poh-recorded + assert_eq!(tpu_bank.transaction_count(), 1); + assert_matches!( + signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()), + Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()] + ); + + // Stop things. 
+ exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); +} diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 380db64ac14fa6..909cd381c40095 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -1168,6 +1168,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1446,13 +1455,35 @@ version = "0.99.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version", "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "convert_case 0.6.0", + "proc-macro2", + "quote", + "syn 2.0.87", + "unicode-xid", +] + [[package]] name = "dialoguer" version = "0.10.4" @@ -1561,6 +1592,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "eager" version = "0.1.0" @@ -2736,7 +2773,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a" dependencies = [ - "derive_more", + "derive_more 0.99.17", "futures 0.3.31", "jsonrpc-core", "jsonrpc-pubsub", @@ -8663,11 +8700,14 @@ dependencies = [ name = "solana-unified-scheduler-pool" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", "aquamarine", "assert_matches", "crossbeam-channel", "dashmap", "derive-where", + "derive_more 1.0.0", + "dyn-clone", "log", "qualifier_attr", "scopeguard", @@ -8681,6 +8721,7 @@ dependencies = [ "solana-transaction-error", "solana-unified-scheduler-logic", "static_assertions", + "trait-set", "vec_extract_if_polyfill", ] @@ -10013,6 +10054,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "trait-set" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b79e2e9c9ab44c6d7c20d5976961b47e8f49ac199154daa514b77cd1ab536625" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "trees" version = "0.4.2" @@ -10094,6 +10146,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-width" version = "0.1.8" diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 1627f8113021db..c46c0216871f5b 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -18,6 +18,7 @@ use { clock::{BankId, Slot}, hash::Hash, }, + solana_unified_scheduler_logic::SchedulingMode, std::{ collections::{hash_map::Entry, HashMap, HashSet}, ops::Index, @@ -32,6 +33,7 @@ use { pub const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: 
Slot = 400; pub type AtomicSlot = AtomicU64; +#[derive(Clone)] pub struct ReadOnlyAtomicSlot { slot: Arc, } @@ -226,14 +228,22 @@ impl BankForks { ); } - pub fn insert(&mut self, mut bank: Bank) -> BankWithScheduler { + pub fn insert(&mut self, bank: Bank) -> BankWithScheduler { + self.insert_with_scheduling_mode(SchedulingMode::BlockVerification, bank) + } + + pub fn insert_with_scheduling_mode( + &mut self, + mode: SchedulingMode, + mut bank: Bank, + ) -> BankWithScheduler { if self.root.load(Ordering::Relaxed) < self.highest_slot_at_startup { bank.set_check_program_modification_slot(true); } let bank = Arc::new(bank); let bank = if let Some(scheduler_pool) = &self.scheduler_pool { - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::new_with_mode(mode, bank.clone()); let scheduler = scheduler_pool.take_scheduler(context); let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler)); scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener()); diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 6be6b147d2a495..8012f77af8d0a4 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -250,6 +250,10 @@ impl SchedulingContext { } } + pub fn new_with_mode(mode: SchedulingMode, bank: Arc) -> Self { + Self { mode, bank } + } + #[cfg(feature = "dev-context-only-utils")] pub fn for_production(bank: Arc) -> Self { Self { diff --git a/runtime/src/root_bank_cache.rs b/runtime/src/root_bank_cache.rs index 73e72ad085b6c3..59dc0a029086b4 100644 --- a/runtime/src/root_bank_cache.rs +++ b/runtime/src/root_bank_cache.rs @@ -11,6 +11,7 @@ use { }; /// Cached root bank that only loads from bank forks if the root has been updated. 
+#[derive(Clone)] pub struct RootBankCache { bank_forks: Arc>, cached_root_bank: Weak, diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index c71b3241bf8ed5..3d63c415224ebd 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -1082,6 +1082,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1352,13 +1361,35 @@ version = "0.99.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version", "syn 2.0.96", ] +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "convert_case 0.6.0", + "proc-macro2", + "quote", + "syn 2.0.96", + "unicode-xid", +] + [[package]] name = "dialoguer" version = "0.10.4" @@ -1467,6 +1498,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "eager" version = "0.1.0" @@ -2695,7 +2732,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a" dependencies = [ - "derive_more", + "derive_more 0.99.18", "futures 0.3.31", "jsonrpc-core", "jsonrpc-pubsub", @@ -8002,11 +8039,14 @@ dependencies = [ name = "solana-unified-scheduler-pool" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", "aquamarine", "assert_matches", "crossbeam-channel", "dashmap", "derive-where", + "derive_more 1.0.0", + "dyn-clone", "log", "qualifier_attr", "scopeguard", @@ -8020,6 +8060,7 @@ dependencies = [ "solana-transaction-error", "solana-unified-scheduler-logic", "static_assertions", + "trait-set", "vec_extract_if_polyfill", ] @@ -9313,6 +9354,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "trait-set" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b79e2e9c9ab44c6d7c20d5976961b47e8f49ac199154daa514b77cd1ab536625" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "trees" version = "0.4.2" @@ -9391,6 +9443,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-width" version = "0.1.14" diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 44c9a93598ea1f..091191229e2333 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -10,11 +10,14 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +agave-banking-stage-ingress-types = { workspace = true } aquamarine = { workspace = true } assert_matches = { workspace = 
true } crossbeam-channel = { workspace = true } dashmap = { workspace = true } derive-where = { workspace = true } +derive_more = { workspace = true } +dyn-clone = { workspace = true } log = { workspace = true } qualifier_attr = { workspace = true } scopeguard = { workspace = true } @@ -28,6 +31,7 @@ solana-transaction = { workspace = true } solana-transaction-error = { workspace = true } solana-unified-scheduler-logic = { workspace = true } static_assertions = { workspace = true } +trait-set = { workspace = true } vec_extract_if_polyfill = { workspace = true } [dev-dependencies] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 44056110521c26..61ce28afbfd724 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -15,10 +15,12 @@ #[cfg(feature = "dev-context-only-utils")] use qualifier_attr::qualifiers; use { + agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, assert_matches::assert_matches, crossbeam_channel::{self, never, select_biased, Receiver, RecvError, SendError, Sender}, dashmap::DashMap, derive_where::derive_where, + dyn_clone::{clone_trait_object, DynClone}, log::*, scopeguard::defer, solana_ledger::blockstore_processor::{ @@ -41,7 +43,7 @@ use { solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::{TransactionError, TransactionResult as Result}, solana_unified_scheduler_logic::{ - SchedulingMode::{BlockProduction, BlockVerification}, + SchedulingMode::{self, BlockProduction, BlockVerification}, SchedulingStateMachine, Task, UsageQueue, }, static_assertions::const_assert_eq, @@ -50,12 +52,13 @@ use { marker::PhantomData, mem, sync::{ - atomic::{AtomicU64, Ordering::Relaxed}, + atomic::{AtomicU64, AtomicUsize, Ordering::Relaxed}, Arc, Mutex, OnceLock, Weak, }, thread::{self, sleep, JoinHandle}, time::{Duration, Instant}, }, + trait_set::trait_set, vec_extract_if_polyfill::MakeExtractIf, }; @@ -96,7 +99,8 @@ pub struct 
SchedulerPool, TH: TaskHandler> { trashed_scheduler_inners: Mutex>, timeout_listeners: Mutex>, handler_count: usize, - handler_context: HandlerContext, + common_handler_context: CommonHandlerContext, + banking_stage_handler_context: Mutex>, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to // Arc from &Self, because SchedulerPool is used as in the form of Arc // almost always. But, this would cause wasted and noisy Arc::clone()'s at every call sites. @@ -113,15 +117,107 @@ pub struct SchedulerPool, TH: TaskHandler> { _phantom: PhantomData, } -#[derive(Debug)] +#[derive(derive_more::Debug, Clone)] pub struct HandlerContext { log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, + banking_packet_receiver: BankingPacketReceiver, + #[debug("{banking_packet_handler:p}")] + banking_packet_handler: Box, + banking_stage_helper: Option>, transaction_recorder: Option, } +#[derive(Debug, Clone)] +struct CommonHandlerContext { + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, +} + +impl CommonHandlerContext { + fn into_handler_context( + self, + banking_packet_receiver: BankingPacketReceiver, + banking_packet_handler: Box, + banking_stage_helper: Option>, + transaction_recorder: Option, + ) -> HandlerContext { + let Self { + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + } = self; + + HandlerContext { + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + banking_packet_receiver, + banking_packet_handler, + banking_stage_helper, + transaction_recorder, + } + } +} + +#[derive(derive_more::Debug)] +struct BankingStageHandlerContext { + banking_packet_receiver: BankingPacketReceiver, + #[debug("{banking_packet_handler:p}")] + banking_packet_handler: Box, + 
+ transaction_recorder: TransactionRecorder, +} + +trait_set! { + pub trait BankingPacketHandler = + DynClone + FnMut(&BankingStageHelper, BankingPacketBatch) + Send + 'static; +} +// Make this `Clone`-able so that it can easily be propagated to all the handler threads. +clone_trait_object!(BankingPacketHandler); + +#[derive(Debug)] +pub struct BankingStageHelper { + usage_queue_loader: UsageQueueLoader, + next_task_id: AtomicUsize, + new_task_sender: Sender, +} + +impl BankingStageHelper { + fn new(new_task_sender: Sender) -> Self { + Self { + usage_queue_loader: UsageQueueLoader::default(), + next_task_id: AtomicUsize::default(), + new_task_sender, + } + } + + pub fn generate_task_ids(&self, count: usize) -> usize { + self.next_task_id.fetch_add(count, Relaxed) + } + + pub fn create_new_task( + &self, + transaction: RuntimeTransaction, + index: usize, + ) -> Task { + SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| { + self.usage_queue_loader.load(pubkey) + }) + } + + pub fn send_new_task(&self, task: Task) { + self.new_task_sender + .send(NewTaskPayload::Payload(task)) + .unwrap(); + } +} + pub type DefaultSchedulerPool = SchedulerPool, DefaultTaskHandler>; @@ -191,14 +287,13 @@ where trashed_scheduler_inners: Mutex::default(), timeout_listeners: Mutex::default(), handler_count, - handler_context: HandlerContext { + common_handler_context: CommonHandlerContext { log_messages_bytes_limit, transaction_status_sender, replay_vote_sender, prioritization_fee_cache, - // will be configurable later - transaction_recorder: None, }, + banking_stage_handler_context: Mutex::default(), weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::default(), max_usage_queue_count, @@ -380,6 +475,59 @@ where self.scheduler_inners.lock().expect("not poisoned").len() } + pub fn register_banking_stage( + &self, + banking_packet_receiver: BankingPacketReceiver, + banking_packet_handler: Box, + transaction_recorder: TransactionRecorder, + ) { + 
*self.banking_stage_handler_context.lock().unwrap() = Some(BankingStageHandlerContext { + banking_packet_receiver, + banking_packet_handler, + transaction_recorder, + }); + } + + fn create_handler_context( + &self, + mode: SchedulingMode, + new_task_sender: &Sender, + ) -> HandlerContext { + let ( + banking_packet_receiver, + banking_packet_handler, + banking_stage_helper, + transaction_recorder, + ): ( + _, + Box, /* to aid type inference */ + _, + _, + ) = match mode { + BlockVerification => { + // Return various type-specific no-op values. + (never(), Box::new(|_, _| {}), None, None) + } + BlockProduction => { + let handler_context = self.banking_stage_handler_context.lock().unwrap(); + let handler_context = handler_context.as_ref().unwrap(); + + ( + handler_context.banking_packet_receiver.clone(), + handler_context.banking_packet_handler.clone(), + Some(Arc::new(BankingStageHelper::new(new_task_sender.clone()))), + Some(handler_context.transaction_recorder.clone()), + ) + } + }; + self.common_handler_context.clone().into_handler_context( + banking_packet_receiver, + banking_packet_handler, + banking_stage_helper, + transaction_recorder, + ) + } + pub fn default_handler_count() -> usize { Self::calculate_default_handler_count( thread::available_parallelism() @@ -965,6 +1113,7 @@ impl, TH: TaskHandler> ThreadManager { &mut self, context: SchedulingContext, mut result_with_timings: ResultWithTimings, + handler_context: HandlerContext, ) { // Firstly, setup bi-directional messaging between the scheduler and handlers to pass // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to @@ -1266,7 +1415,7 @@ impl, TH: TaskHandler> ThreadManager { }; let handler_main_loop = || { - let pool = self.pool.clone(); + let mut handler_context = handler_context.clone(); let mut runnable_task_receiver = runnable_task_receiver.clone(); let finished_blocked_task_sender = finished_blocked_task_sender.clone(); let finished_idle_task_sender = 
finished_idle_task_sender.clone(); @@ -1300,6 +1449,19 @@ impl, TH: TaskHandler> ThreadManager { continue; } }, + recv(handler_context.banking_packet_receiver) -> banking_packet => { + // See solana_core::banking_stage::unified_scheduler module doc as to + // justification of this additional work in the handler thread. + let Ok(banking_packet) = banking_packet else { + info!("disconnected banking_packet_receiver"); + break; + }; + (handler_context.banking_packet_handler)( + handler_context.banking_stage_helper.as_ref().unwrap(), + banking_packet + ); + continue; + }, }; defer! { if !thread::panicking() { @@ -1322,7 +1484,7 @@ impl, TH: TaskHandler> ThreadManager { Self::execute_task_with_handler( runnable_task_receiver.context(), &mut task, - &pool.handler_context, + &handler_context, ); if sender.send(Ok(task)).is_err() { warn!("handler_thread: scheduler thread aborted..."); @@ -1524,12 +1686,14 @@ impl SpawnableScheduler for PooledScheduler { result_with_timings: ResultWithTimings, ) -> Self { let mut inner = Self::Inner { - thread_manager: ThreadManager::new(pool), + thread_manager: ThreadManager::new(pool.clone()), usage_queue_loader: UsageQueueLoader::default(), }; - inner - .thread_manager - .start_threads(context.clone(), result_with_timings); + inner.thread_manager.start_threads( + context.clone(), + result_with_timings, + pool.create_handler_context(context.mode(), &inner.thread_manager.new_task_sender), + ); Self { inner, context } } } @@ -2872,7 +3036,10 @@ mod tests { &mut timings, &context, &task, - &pool.handler_context, + &pool.create_handler_context( + BlockVerification, + &crossbeam_channel::unbounded().0, + ), ); (result, timings) })); @@ -3077,6 +3244,9 @@ mod tests { transaction_status_sender: None, replay_vote_sender: None, prioritization_fee_cache, + banking_packet_receiver: never(), + banking_packet_handler: Box::new(|_, _| {}), + banking_stage_helper: None, transaction_recorder: None, }; @@ -3158,6 +3328,9 @@ mod tests { 
transaction_status_sender: Some(TransactionStatusSender { sender }), replay_vote_sender: None, prioritization_fee_cache, + banking_packet_receiver: never(), + banking_packet_handler: Box::new(|_, _| {}), + banking_stage_helper: None, transaction_recorder: Some(poh_recorder.read().unwrap().new_recorder()), };