Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanner: parallelize session creation #333

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions spanner/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use thiserror;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, timeout};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -234,12 +234,23 @@ impl SessionPool {
) -> Result<VecDeque<SessionHandle>, Status> {
let channel_num = conn_pool.num();
let creation_count_per_channel = min_opened / channel_num;
let remainder = min_opened % channel_num;

let mut sessions = Vec::<SessionHandle>::new();
let mut tasks = JoinSet::new();
for _ in 0..channel_num {
// Ensure that we create the exact number of requested sessions by adding the remainder to the first channel.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also took the opportunity to implement the same exact session creation as the golang client: https://github.com/googleapis/google-cloud-go/blob/7cbffad749a0f8917658406dae73c7fb7151b55d/spanner/sessionclient.go#L221

Ran into this during testing when creating a client where channel_num > min_opened.

let creation_count = if channel_num == 0 {
creation_count_per_channel + remainder
} else {
creation_count_per_channel
};
let next_client = conn_pool.conn().with_metadata(client_metadata(&database));
let new_sessions =
batch_create_sessions(next_client, database.as_str(), creation_count_per_channel).await?;
let database = database.clone();
tasks.spawn(async move { batch_create_sessions(next_client, &database, creation_count).await });
}
while let Some(r) = tasks.join_next().await {
let new_sessions = r.map_err(|e| Status::from_error(e.into()))??;
sessions.extend(new_sessions);
}
tracing::debug!("initial session created count = {}", sessions.len());
Expand Down Expand Up @@ -493,17 +504,23 @@ impl SessionManager {
cancel: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut tasks = JoinSet::default();
loop {
let session_count: usize = select! {
select! {
biased;
_ = cancel.cancelled() => break,
Some(Ok((session_count, result))) = tasks.join_next(), if !tasks.is_empty() => {
session_pool.inner.write().replenish(session_count, result);
}
session_count = rx.recv() => match session_count {
Some(session_count) => session_count,
Some(session_count) => {
let client = conn_pool.conn().with_metadata(client_metadata(&database));
let database = database.clone();
tasks.spawn(async move { (session_count, batch_create_sessions(client, &database, session_count).await) });
},
None => continue
},
_ = cancel.cancelled() => break
};
let client = conn_pool.conn().with_metadata(client_metadata(&database));
let result = batch_create_sessions(client, database.as_str(), session_count).await;
session_pool.inner.write().replenish(session_count, result);
}
}
tracing::trace!("shutdown session creation task.");
})
Expand Down
Loading