Skip to content

Commit 1f0109d

Browse files
authored
feat: account hydration on background task (#407)
- **deps: add flume properly**
- **feat: put account hydration on a background task**

<!-- greptile_comment -->

## Greptile Summary

Refactors account hydration to run as a background task, improving validator startup performance by making the process asynchronous.

- Switches from `tokio::sync::mpsc` to `flume` channels for better multi-producer/consumer performance in `magicblock-account-cloner`
- Reduces concurrent account hydration from 30 to 10 tasks in `remote_account_cloner_worker.rs` for better resource management
- Improves async task handling in test suites with proper await patterns and timing adjustments
- Wraps `RemoteAccountClonerWorker` in `Arc` for safe concurrent access in `magic_validator.rs`
- Adds 5-second sleep in integration tests to accommodate background hydration timing

<!-- /greptile_comment -->
1 parent c5769f7 commit 1f0109d

File tree

18 files changed

+132
-84
lines changed

18 files changed

+132
-84
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ isocountry = "0.3.2"
6565
crossbeam-channel = "0.5.11"
6666
enum-iterator = "1.5.0"
6767
env_logger = "0.11.2"
68-
magic-domain-program = { git = "https://github.com/magicblock-labs/magic-domain-program.git", rev = "ea04d46", default-features = false}
68+
magic-domain-program = { git = "https://github.com/magicblock-labs/magic-domain-program.git", rev = "ea04d46", default-features = false }
6969
magicblock-delegation-program = { git = "https://github.com/magicblock-labs/delegation-program.git", rev = "4af7f1c" }
7070
fd-lock = "4.0.2"
71+
flume = "0.11"
7172
fs_extra = "1.3.0"
7273
futures-util = "0.3.30"
7374
geyser-grpc-proto = { path = "./geyser-grpc-proto" }
@@ -150,7 +151,7 @@ solana-rpc = "2.2"
150151
solana-rpc-client = { version = "2.2" }
151152
solana-rpc-client-api = { version = "2.2" }
152153
solana-sdk = { version = "2.2" }
153-
solana-svm = { version = "2.2", features = [ "dev-context-only-utils" ] }
154+
solana-svm = { version = "2.2", features = ["dev-context-only-utils"] }
154155
solana-svm-transaction = { version = "2.2" }
155156
solana-storage-proto = { path = "storage-proto" }
156157
solana-system-program = { version = "2.2" }

magicblock-account-cloner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ edition.workspace = true
99

1010
[dependencies]
1111
conjunto-transwise = { workspace = true }
12+
flume = { workspace = true }
1213
futures-util = { workspace = true }
1314
log = { workspace = true }
1415
magicblock-account-fetcher = { workspace = true }

magicblock-account-cloner/src/account_cloner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::sync::oneshot::Sender;
1616
#[derive(Debug, Clone, Error)]
1717
pub enum AccountClonerError {
1818
#[error(transparent)]
19-
SendError(#[from] tokio::sync::mpsc::error::SendError<Pubkey>),
19+
SendError(#[from] flume::SendError<Pubkey>),
2020

2121
#[error(transparent)]
2222
RecvError(#[from] tokio::sync::oneshot::error::RecvError),

magicblock-account-cloner/src/remote_account_cloner_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ use magicblock_account_fetcher::AccountFetcher;
1212
use magicblock_account_updates::AccountUpdates;
1313
use magicblock_accounts_api::InternalAccountProvider;
1414
use solana_sdk::pubkey::Pubkey;
15-
use tokio::sync::{mpsc::UnboundedSender, oneshot::channel};
15+
use tokio::sync::oneshot::channel;
1616

1717
use crate::{
1818
AccountCloner, AccountClonerError, AccountClonerListeners,
1919
AccountClonerOutput, AccountClonerResult, RemoteAccountClonerWorker,
2020
};
2121

2222
pub struct RemoteAccountClonerClient {
23-
clone_request_sender: UnboundedSender<Pubkey>,
23+
clone_request_sender: flume::Sender<Pubkey>,
2424
clone_listeners: Arc<RwLock<HashMap<Pubkey, AccountClonerListeners>>>,
2525
}
2626

magicblock-account-cloner/src/remote_account_cloner_worker.rs

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,13 @@ use std::{
33
collections::{hash_map::Entry, HashMap, HashSet},
44
sync::{Arc, RwLock},
55
time::Duration,
6-
vec,
76
};
87

98
use conjunto_transwise::{
109
AccountChainSnapshot, AccountChainSnapshotShared, AccountChainState,
1110
DelegationRecord,
1211
};
13-
use futures_util::{
14-
future::join_all,
15-
stream::{self, StreamExt, TryStreamExt},
16-
};
12+
use futures_util::stream::{self, StreamExt, TryStreamExt};
1713
use log::*;
1814
use lru::LruCache;
1915
use magicblock_account_dumper::AccountDumper;
@@ -29,10 +25,7 @@ use solana_sdk::{
2925
pubkey::Pubkey,
3026
signature::Signature,
3127
};
32-
use tokio::{
33-
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
34-
time::sleep,
35-
};
28+
use tokio::time::sleep;
3629
use tokio_util::sync::CancellationToken;
3730

3831
use crate::{
@@ -107,8 +100,8 @@ pub struct RemoteAccountClonerWorker<IAP, AFE, AUP, ADU> {
107100
validator_charges_fees: ValidatorCollectionMode,
108101
permissions: AccountClonerPermissions,
109102
fetch_retries: u64,
110-
clone_request_receiver: UnboundedReceiver<Pubkey>,
111-
clone_request_sender: UnboundedSender<Pubkey>,
103+
clone_request_sender: flume::Sender<Pubkey>,
104+
clone_request_receiver: flume::Receiver<Pubkey>,
112105
clone_listeners: Arc<RwLock<HashMap<Pubkey, AccountClonerListeners>>>,
113106
last_clone_output: CloneOutputMap,
114107
validator_identity: Pubkey,
@@ -151,8 +144,7 @@ where
151144
validator_authority: Pubkey,
152145
max_monitored_accounts: usize,
153146
) -> Self {
154-
let (clone_request_sender, clone_request_receiver) =
155-
unbounded_channel();
147+
let (clone_request_sender, clone_request_receiver) = flume::unbounded();
156148
let fetch_retries = 50;
157149
let max_monitored_accounts = max_monitored_accounts
158150
.try_into()
@@ -177,7 +169,7 @@ where
177169
}
178170
}
179171

180-
pub fn get_clone_request_sender(&self) -> UnboundedSender<Pubkey> {
172+
pub fn get_clone_request_sender(&self) -> flume::Sender<Pubkey> {
181173
self.clone_request_sender.clone()
182174
}
183175

@@ -192,18 +184,18 @@ where
192184
}
193185

194186
pub async fn start_clone_request_processing(
195-
mut self,
187+
&self,
196188
cancellation_token: CancellationToken,
197189
) {
198-
let mut requests = vec![];
199190
loop {
200191
tokio::select! {
201-
_ = self.clone_request_receiver.recv_many(&mut requests, 100) => {
202-
join_all(
203-
requests
204-
.drain(..)
205-
.map(|request| self.process_clone_request(request))
206-
).await;
192+
res = self.clone_request_receiver.recv_async() => {
193+
match res {
194+
Ok(req) => self.process_clone_request(req).await,
195+
Err(err) => {
196+
error!("Failed to receive clone request: {:?}", err);
197+
}
198+
}
207199
}
208200
_ = cancellation_token.cancelled() => {
209201
return;
@@ -299,7 +291,7 @@ where
299291
// TODO(GabrielePicco): Make the concurrency configurable
300292
let result = stream
301293
.map(Ok::<_, AccountClonerError>)
302-
.try_for_each_concurrent(30, |(pubkey, owner)| async move {
294+
.try_for_each_concurrent(10, |(pubkey, owner)| async move {
303295
trace!("Hydrating '{}'", pubkey);
304296
let res = self
305297
.do_clone_and_update_cache(

magicblock-account-cloner/tests/remote_account_cloner.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@ fn setup_custom(
5555
let cancellation_token = CancellationToken::new();
5656
let cloner_worker_handle = {
5757
let cloner_cancellation_token = cancellation_token.clone();
58-
tokio::spawn(
58+
tokio::spawn(async move {
5959
cloner_worker
60-
.start_clone_request_processing(cloner_cancellation_token),
61-
)
60+
.start_clone_request_processing(cloner_cancellation_token)
61+
.await
62+
})
6263
};
6364
// Ready to run
6465
(cloner_client, cancellation_token, cloner_worker_handle)

magicblock-accounts/tests/ensure_accounts.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@ fn setup_with_lifecycle(
6262
RemoteAccountClonerClient::new(&remote_account_cloner_worker);
6363
let remote_account_cloner_worker_handle = {
6464
let cloner_cancellation_token = cancellation_token.clone();
65-
tokio::spawn(
65+
tokio::spawn(async move {
6666
remote_account_cloner_worker
67-
.start_clone_request_processing(cloner_cancellation_token),
68-
)
67+
.start_clone_request_processing(cloner_cancellation_token)
68+
.await
69+
})
6970
};
7071

7172
let external_account_manager = ExternalAccountsManager {

magicblock-api/src/magic_validator.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,13 @@ pub struct MagicValidator {
138138
remote_account_updates_worker: Option<RemoteAccountUpdatesWorker>,
139139
remote_account_updates_handle: Option<tokio::task::JoinHandle<()>>,
140140
remote_account_cloner_worker: Option<
141-
RemoteAccountClonerWorker<
142-
BankAccountProvider,
143-
RemoteAccountFetcherClient,
144-
RemoteAccountUpdatesClient,
145-
AccountDumperBank,
141+
Arc<
142+
RemoteAccountClonerWorker<
143+
BankAccountProvider,
144+
RemoteAccountFetcherClient,
145+
RemoteAccountUpdatesClient,
146+
AccountDumperBank,
147+
>,
146148
>,
147149
>,
148150
remote_account_cloner_handle: Option<tokio::task::JoinHandle<()>>,
@@ -354,7 +356,9 @@ impl MagicValidator {
354356
remote_account_fetcher_handle: None,
355357
remote_account_updates_worker: Some(remote_account_updates_worker),
356358
remote_account_updates_handle: None,
357-
remote_account_cloner_worker: Some(remote_account_cloner_worker),
359+
remote_account_cloner_worker: Some(Arc::new(
360+
remote_account_cloner_worker,
361+
)),
358362
remote_account_cloner_handle: None,
359363
pubsub_handle: Default::default(),
360364
pubsub_close_handle: Default::default(),
@@ -720,8 +724,20 @@ impl MagicValidator {
720724
self.remote_account_cloner_worker.take()
721725
{
722726
if !self.config.ledger.reset {
723-
remote_account_cloner_worker.hydrate().await?;
724-
info!("Validator hydration complete (bank hydrate, replay, account clone)");
727+
let remote_account_cloner_worker =
728+
remote_account_cloner_worker.clone();
729+
tokio::spawn(async move {
730+
let _ = remote_account_cloner_worker
731+
.hydrate()
732+
.await
733+
.inspect_err(|err| {
734+
error!(
735+
"Failed to hydrate validator accounts: {:?}",
736+
err
737+
);
738+
});
739+
info!("Validator hydration complete (bank hydrate, replay, account clone)");
740+
});
725741
}
726742

727743
let cancellation_token = self.token.clone();

magicblock-geyser-plugin/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ bs58 = { workspace = true }
1414
expiring-hashmap = { workspace = true }
1515
geyser-grpc-proto = { workspace = true }
1616
hostname = { workspace = true }
17-
flume = "0.11"
17+
flume = { workspace = true }
1818
log = { workspace = true }
1919
serde = { workspace = true }
2020
serde_json = { workspace = true }
2121
magicblock-transaction-status = { workspace = true }
22-
scc = "2.3"
22+
scc = "2.3"
2323
solana-geyser-plugin-interface = { workspace = true }
2424
solana-sdk = { workspace = true }
2525
spl-token-2022 = { workspace = true, features = ["no-entrypoint"] }

0 commit comments

Comments
 (0)