Skip to content

Commit

Permalink
Plumb unified scheduler for BP minimally (#4533)
Browse files Browse the repository at this point in the history
* Plumb unified scheduler for BP minimally

* Clean up `use`s a bit

* Simplify closure interaction

* Rework banking stage plumbing by extending handler

* Add docs

* Use RootBankCache

* Fix ci

* Add mut to decision_maker
  • Loading branch information
ryoqun authored Feb 6, 2025
1 parent b61839b commit 5a7852a
Show file tree
Hide file tree
Showing 14 changed files with 644 additions and 27 deletions.
62 changes: 60 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,13 @@ curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core"] }
dashmap = "5.5.3"
derivation-path = { version = "0.2.0", default-features = false }
derive-where = "1.2.7"
derive_more = { version = "1.0.0", features = ["full"] }
dialoguer = "0.10.4"
digest = "0.10.7"
dir-diff = "0.3.3"
dirs-next = "2.0.0"
dlopen2 = "0.5.0"
dyn-clone = "1.0.17"
eager = "0.1.0"
ed25519-dalek = "=1.0.1"
ed25519-dalek-bip32 = "0.2.0"
Expand Down Expand Up @@ -591,6 +593,7 @@ toml = "0.8.12"
tonic = "0.9.2"
tonic-build = "0.9.2"
tower = "0.5.2"
trait-set = "0.3.0"
trees = "0.4.2"
tungstenite = "0.20.1"
uriparse = "0.6.4"
Expand Down
7 changes: 7 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ mod read_write_account_set;
mod scheduler_messages;
mod transaction_scheduler;

// proc_macro_hygiene needs to be stabilized to use qualifier_attr...
// error[E0658]: non-inline modules in proc macro input are unstable
#[cfg(not(feature = "dev-context-only-utils"))]
pub(crate) mod unified_scheduler;
#[cfg(feature = "dev-context-only-utils")]
pub mod unified_scheduler;

// Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 6;

Expand Down
13 changes: 13 additions & 0 deletions core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ impl PacketDeserializer {
}
})
}

#[allow(dead_code)]
pub(crate) fn deserialize_packets_with_indexes(
packet_batch: &PacketBatch,
) -> impl Iterator<Item = (ImmutableDeserializedPacket, usize)> + '_ {
let packet_indexes = PacketDeserializer::generate_packet_indexes(packet_batch);
packet_indexes.into_iter().filter_map(move |packet_index| {
let packet = packet_batch[packet_index].clone();
ImmutableDeserializedPacket::new(packet)
.ok()
.map(|packet| (packet, packet_index))
})
}
}

#[cfg(test)]
Expand Down
94 changes: 94 additions & 0 deletions core/src/banking_stage/unified_scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! This module contains any integration code to put the unified scheduler subsystem into the
//! banking stage, starting from `BankingPacketBatch` ingestion from the sig verify stage and to
//! `Task` submission to the unified scheduler.
//!
This preprocessing for task creation can be trivially multi-threaded. At the same time, the
maximum cpu core utilization needs to be constrained between this processing and the actual task
handling of the unified scheduler. Thus, it's desirable to share a single thread pool for the two
kinds of work. Towards that end, the integration is implemented in a callback style, which is
invoked (via `select!` on `banking_packet_receiver`) at each of the unified scheduler handler
threads.
//!
Aside from some limited abstraction leakage needed to make `select!` work in the
solana-unified-scheduler-pool crate, almost all of this preprocessing is intentionally
encapsulated in this module, at the cost of one dynamic dispatch per BankingPacketBatch, to
keep the unified scheduler agnostic to the scheduling mode (block verification vs
block production) as much as possible.
//!
//! Lastly, what the callback closure in this module does is roughly as follows:
//! 1. Translate the raw packet bytes into some structs
//! 2. Do various sanitization on them
//! 3. Calculate priorities
//! 4. Convert them to tasks with the help of provided BankingStageHelper (remember that pubkey
//! lookup for UsageQueue is also performed here; thus multi-threaded and off-loaded from the
//! scheduler thread)
//! 5. Submit the tasks.
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
super::{
decision_maker::{BufferedPacketsDecision, DecisionMaker},
packet_deserializer::PacketDeserializer,
LikeClusterInfo,
},
crate::banking_trace::Channels,
agave_banking_stage_ingress_types::BankingPacketBatch,
solana_poh::poh_recorder::PohRecorder,
solana_runtime::{bank_forks::BankForks, root_bank_cache::RootBankCache},
solana_unified_scheduler_pool::{BankingStageHelper, DefaultSchedulerPool},
std::sync::{Arc, RwLock},
};

#[allow(dead_code)]
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn ensure_banking_stage_setup(
    pool: &DefaultSchedulerPool,
    bank_forks: &Arc<RwLock<BankForks>>,
    channels: &Channels,
    cluster_info: &impl LikeClusterInfo,
    poh_recorder: &Arc<RwLock<PohRecorder>>,
) {
    // State moved into the banking packet handler closure below.
    let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
    let mut root_bank_cache = RootBankCache::new(bank_forks.clone());
    let transaction_recorder = poh_recorder.read().unwrap().new_recorder();
    let unified_receiver = channels.unified_receiver().clone();

    let banking_packet_handler = Box::new(
        move |helper: &BankingStageHelper, batches: BankingPacketBatch| {
            // Drop the batches entirely while this node should be forwarding
            // packets rather than consuming them.
            if matches!(
                decision_maker.make_consume_or_forward_decision(),
                BufferedPacketsDecision::Forward
            ) {
                return;
            }
            let bank = root_bank_cache.root_bank();
            for batch in batches.iter() {
                // Reserve one task id per packet up front; ids belonging to
                // packets that fail sanitization below simply go unused
                // (intentional over-provisioning).
                let task_id_base = helper.generate_task_ids(batch.len());

                for (packet, packet_index) in
                    PacketDeserializer::deserialize_packets_with_indexes(batch)
                {
                    let maybe_transaction = packet.build_sanitized_transaction(
                        bank.vote_only_bank(),
                        &bank,
                        bank.get_reserved_account_keys(),
                    );
                    let Some((transaction, _deactivation_slot)) = maybe_transaction else {
                        continue;
                    };

                    helper.send_new_task(
                        helper.create_new_task(transaction, task_id_base + packet_index),
                    );
                }
            }
        },
    );

    pool.register_banking_stage(
        unified_receiver,
        banking_packet_handler,
        transaction_recorder,
    );
}
22 changes: 22 additions & 0 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,28 @@ pub struct Channels {
pub gossip_vote_receiver: BankingPacketReceiver,
}

#[allow(dead_code)]
impl Channels {
    /// Returns the sender shared by all three packet lanes, asserting that
    /// they are all backed by the same underlying channel (i.e. the tracer
    /// was set up in unified mode).
    #[cfg(feature = "dev-context-only-utils")]
    pub fn unified_sender(&self) -> &BankingPacketSender {
        let sender = &self.non_vote_sender;
        assert!(sender.sender.same_channel(&self.tpu_vote_sender.sender));
        assert!(sender.sender.same_channel(&self.gossip_vote_sender.sender));
        sender
    }

    /// Returns the receiver shared by all three packet lanes, asserting that
    /// they all drain the same underlying channel.
    pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver {
        let receiver = &self.non_vote_receiver;
        assert!(receiver.same_channel(&self.tpu_vote_receiver));
        assert!(receiver.same_channel(&self.gossip_vote_receiver));
        receiver
    }
}

impl BankingTracer {
pub fn new(
maybe_config: Option<(&PathBuf, Arc<AtomicBool>, DirByteLimit)>,
Expand Down
Loading

0 comments on commit 5a7852a

Please sign in to comment.