Skip to content

Commit

Permalink
spanner: parallelize session creation
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnorberg committed Dec 22, 2024
1 parent 2f9219b commit 04c86d6
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions spanner/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use thiserror;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, timeout};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -233,12 +233,22 @@ impl SessionPool {
) -> Result<VecDeque<SessionHandle>, Status> {
let channel_num = conn_pool.num();
let creation_count_per_channel = min_opened / channel_num;
let remainder = min_opened % channel_num;

let mut sessions = Vec::<SessionHandle>::new();
let mut tasks = JoinSet::new();
for _ in 0..channel_num {
let creation_count = if channel_num == 0 {
creation_count_per_channel + remainder
} else {
creation_count_per_channel
};
let next_client = conn_pool.conn();
let new_sessions =
batch_create_sessions(next_client, database.as_str(), creation_count_per_channel).await?;
let database = database.clone();
tasks.spawn(async move { batch_create_sessions(next_client, &database, creation_count).await });
}
while let Some(r) = tasks.join_next().await {
let new_sessions = r.map_err(|e| Status::from_error(e.into()))??;
sessions.extend(new_sessions);
}
tracing::debug!("initial session created count = {}", sessions.len());
Expand Down Expand Up @@ -492,16 +502,25 @@ impl SessionManager {
cancel: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut tasks = JoinSet::default();
loop {
let session_count: usize = select! {
select! {
biased;
_ = cancel.cancelled() => break,
session_result = tasks.join_next() => {
if let Some(Ok((session_count, result))) = session_result {
session_pool.inner.write().replenish(session_count, result)
}
}
session_count = rx.recv() => match session_count {
Some(session_count) => session_count,
Some(session_count) => {
let conn = conn_pool.conn();
let database = database.clone();
tasks.spawn(async move { (session_count, batch_create_sessions(conn, &database, session_count).await) });
},
None => continue
},
_ = cancel.cancelled() => break
};
let result = batch_create_sessions(conn_pool.conn(), database.as_str(), session_count).await;
session_pool.inner.write().replenish(session_count, result);
}
}
tracing::trace!("shutdown session creation task.");
})
Expand Down

0 comments on commit 04c86d6

Please sign in to comment.