Skip to content

Commit

Permalink
v2.1: GreedyScheduler (backport of #4688) (#4831)
Browse files Browse the repository at this point in the history
* Scheduler trait

* impl Scheduler for PrioGraphScheduler

* SchedulerController generic over scheduler implementation

* GreedyScheduler implementation

* GreedyScheduler: tests

* BankingStage spawn greedy-scheduler

* CLI hookup for central-scheduler-greedy

* Add entry to CHANGELOG

---------

Co-authored-by: Andrew Fitzgerald <[email protected]>
  • Loading branch information
mergify[bot] and apfitzge authored Feb 12, 2025
1 parent f798546 commit ff32b19
Show file tree
Hide file tree
Showing 8 changed files with 920 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ solana_pubkey::declare_id!("MyProgram1111111111111111111111111111111111");
* `solana-genesis`: the `--cluster-type` parameter now clones the feature set from the target cluster (#2587)
* `unified-scheduler` as default option for `--block-verification-method` (#2653)
* warn that `thread-local-multi-iterator` option for `--block-production-method` is deprecated (#3113)
* Add new variant to `--block-production-method` for `central-scheduler-greedy`. This is a simplified scheduler that has much better performance than the more strict `central-scheduler` variant.

## [2.0.0]
* Breaking
Expand Down
117 changes: 81 additions & 36 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use {
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
transaction_scheduler::greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig},
};

// Below modules are pub to allow use by banking_stage bench
Expand Down Expand Up @@ -413,21 +414,29 @@ impl BankingStage {
prioritization_fee_cache,
)
}
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
cluster_info,
poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
enable_forwarding,
),
BlockProductionMethod::CentralScheduler
| BlockProductionMethod::CentralSchedulerGreedy => {
let use_greedy_scheduler = matches!(
block_production_method,
BlockProductionMethod::CentralSchedulerGreedy
);
Self::new_central_scheduler(
use_greedy_scheduler,
cluster_info,
poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
enable_forwarding,
)
}
}
}

Expand Down Expand Up @@ -519,6 +528,7 @@ impl BankingStage {

#[allow(clippy::too_many_arguments)]
pub fn new_central_scheduler(
use_greedy_scheduler: bool,
cluster_info: &impl LikeClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
Expand Down Expand Up @@ -626,28 +636,63 @@ impl BankingStage {
});

// Spawn the central scheduler thread
bank_thread_hdls.push({
let packet_deserializer = PacketDeserializer::new(non_vote_receiver);
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
packet_deserializer,
bank_forks,
scheduler,
worker_metrics,
forwarder,
let packet_deserializer = PacketDeserializer::new(non_vote_receiver);
if use_greedy_scheduler {
bank_thread_hdls.push(
Builder::new()
.name("solBnkTxSched".to_string())
.spawn(move || {
let scheduler = GreedyScheduler::new(
work_senders,
finished_work_receiver,
GreedySchedulerConfig::default(),
);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
packet_deserializer,
bank_forks,
scheduler,
worker_metrics,
forwarder,
);

match scheduler_controller.run() {
Ok(_) => {}
Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
Err(SchedulerError::DisconnectedSendChannel(_)) => {
warn!("Unexpected worker disconnect from scheduler")
}
}
})
.unwrap(),
);
Builder::new()
.name("solBnkTxSched".to_string())
.spawn(move || match scheduler_controller.run() {
Ok(_) => {}
Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
Err(SchedulerError::DisconnectedSendChannel(_)) => {
warn!("Unexpected worker disconnect from scheduler")
}
})
.unwrap()
});
} else {
bank_thread_hdls.push(
Builder::new()
.name("solBnkTxSched".to_string())
.spawn(move || {
let scheduler =
PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
packet_deserializer,
bank_forks,
scheduler,
worker_metrics,
forwarder,
);

match scheduler_controller.run() {
Ok(_) => {}
Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
Err(SchedulerError::DisconnectedSendChannel(_)) => {
warn!("Unexpected worker disconnect from scheduler")
}
}
})
.unwrap(),
);
}

Self { bank_thread_hdls }
}
Expand Down
Loading

0 comments on commit ff32b19

Please sign in to comment.