[bigquery] Add storage write API #325

Merged · 25 commits · Nov 28, 2024
4 changes: 4 additions & 0 deletions bigquery/Cargo.toml
@@ -29,6 +29,8 @@ num-bigint = "0.4"
backon = { version = "1.2", default-features = false, features = ["tokio-sleep"] }
reqwest-middleware = { version = "0.4", features = ["json", "multipart"] }
anyhow = "1.0"
async-stream = "0.3"
prost-types = "0.13"

google-cloud-auth = { optional = true, version = "0.17", path="../foundation/auth", default-features=false }

@@ -40,6 +42,8 @@ ctor = "0.1.26"
tokio-util = {version ="0.7", features = ["codec"] }
google-cloud-auth = { path = "../foundation/auth", default-features=false }
base64-serde = "0.7"
prost = "0.13"
futures-util = "0.3"

[features]
default = ["default-tls", "auth"]
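The new `prost` / `prost-types` (dev-)dependencies back the storage-write doc examples below: rows are encoded as protobuf messages, and each writer needs a `DescriptorProto` describing the row schema. A minimal sketch of such a row type (the `Row` struct and its single field are illustrative, not part of this PR):

```rust
use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto};

// Hypothetical row type; any `prost::Message` works.
#[derive(Clone, PartialEq, prost::Message)]
struct Row {
    #[prost(string, tag = "1")]
    name: String,
}

// Hand-built descriptor matching `Row`; descriptors generated by
// prost-build work just as well.
fn row_descriptor() -> DescriptorProto {
    DescriptorProto {
        name: Some("Row".to_string()),
        field: vec![FieldDescriptorProto {
            name: Some("name".to_string()),
            number: Some(1),
            r#type: Some(field_descriptor_proto::Type::String as i32),
            ..Default::default()
        }],
        ..Default::default()
    }
}
```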
236 changes: 220 additions & 16 deletions bigquery/src/client.rs
@@ -12,7 +12,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::{
};
use google_cloud_token::TokenSourceProvider;

use crate::grpc::apiv1::conn_pool::{ReadConnectionManager, DOMAIN};
use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::http::bigquery_client::BigqueryClient;
use crate::http::bigquery_dataset_client::BigqueryDatasetClient;
use crate::http::bigquery_job_client::BigqueryJobClient;
@@ -38,15 +38,63 @@ pub struct ClientConfig {
token_source_provider: Box<dyn TokenSourceProvider>,
environment: Environment,
streaming_read_config: ChannelConfig,
streaming_write_config: StreamingWriteConfig,
debug: bool,
}

#[derive(Clone, Debug, Default)]
pub struct StreamingWriteConfig {
channel_config: ChannelConfig,
max_insert_count: usize,
}

impl StreamingWriteConfig {
pub fn with_channel_config(mut self, value: ChannelConfig) -> Self {
self.channel_config = value;
self
}
pub fn with_max_insert_count(mut self, value: usize) -> Self {
self.max_insert_count = value;
self
}
}

#[derive(Clone, Debug)]
pub struct ChannelConfig {
/// num_channels is the number of gRPC channels.
pub num_channels: usize,
pub connect_timeout: Option<Duration>,
pub timeout: Option<Duration>,
num_channels: usize,
connect_timeout: Option<Duration>,
timeout: Option<Duration>,
}

impl ChannelConfig {
pub fn with_num_channels(mut self, value: usize) -> Self {
self.num_channels = value;
self
}
pub fn with_connect_timeout(mut self, value: Duration) -> Self {
self.connect_timeout = Some(value);
self
}
pub fn with_timeout(mut self, value: Duration) -> Self {
self.timeout = Some(value);
self
}

async fn into_connection_manager(
self,
environment: &Environment,
) -> Result<ConnectionManager, google_cloud_gax::conn::Error> {
ConnectionManager::new(
self.num_channels,
environment,
&ConnectionOptions {
timeout: self.timeout,
connect_timeout: self.connect_timeout,
},
)
.await
}
}

impl Default for ChannelConfig {
@@ -70,6 +118,7 @@ impl ClientConfig {
token_source_provider: http_token_source_provider,
environment: Environment::GoogleCloud(grpc_token_source_provider),
streaming_read_config: ChannelConfig::default(),
streaming_write_config: StreamingWriteConfig::default(),
debug: false,
}
}
@@ -81,6 +130,10 @@ impl ClientConfig {
self.streaming_read_config = value;
self
}
pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
self.streaming_write_config = value;
self
}
pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
self.http = value;
self
@@ -94,8 +147,11 @@ impl ClientConfig {
use crate::http::job::get::GetJobRequest;
use crate::http::job::list::ListJobsRequest;

use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
use crate::storage_write::stream::{buffered, committed, default, pending};
#[cfg(feature = "auth")]
pub use google_cloud_auth;
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;

#[cfg(feature = "auth")]
impl ClientConfig {
@@ -163,7 +219,9 @@ pub struct Client {
routine_client: BigqueryRoutineClient,
row_access_policy_client: BigqueryRowAccessPolicyClient,
model_client: BigqueryModelClient,
streaming_read_client_conn_pool: Arc<ReadConnectionManager>,
streaming_read_conn_pool: Arc<ConnectionManager>,
streaming_write_conn_pool: Arc<ConnectionManager>,
streaming_write_max_insert_count: usize,
}

impl Client {
@@ -177,14 +235,6 @@ impl Client {
config.debug,
));

let read_config = config.streaming_read_config;
let conn_options = ConnectionOptions {
timeout: read_config.timeout,
connect_timeout: read_config.connect_timeout,
};

let streaming_read_client_conn_pool =
ReadConnectionManager::new(read_config.num_channels, &config.environment, DOMAIN, &conn_options).await?;
Ok(Self {
dataset_client: BigqueryDatasetClient::new(client.clone()),
table_client: BigqueryTableClient::new(client.clone()),
@@ -193,7 +243,20 @@
routine_client: BigqueryRoutineClient::new(client.clone()),
row_access_policy_client: BigqueryRowAccessPolicyClient::new(client.clone()),
model_client: BigqueryModelClient::new(client.clone()),
streaming_read_client_conn_pool: Arc::new(streaming_read_client_conn_pool),
streaming_read_conn_pool: Arc::new(
config
.streaming_read_config
.into_connection_manager(&config.environment)
.await?,
),
streaming_write_conn_pool: Arc::new(
config
.streaming_write_config
.channel_config
.into_connection_manager(&config.environment)
.await?,
),
streaming_write_max_insert_count: config.streaming_write_config.max_insert_count,
})
}

@@ -239,6 +302,145 @@ impl Client {
&self.model_client
}

/// Creates a new pending type storage writer for the specified table.
/// https://cloud.google.com/bigquery/docs/write-api#pending_type
/// ```
/// use prost_types::DescriptorProto;
/// use google_cloud_bigquery::client::Client;
/// use google_cloud_gax::grpc::Status;
/// use prost::Message;
/// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
/// use futures_util::stream::StreamExt;
///
/// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
/// -> Result<(), Status> {
/// let mut writer = client.pending_storage_writer(table);
/// let stream = writer.create_write_stream().await?;
///
/// let mut data = vec![];
/// for row in rows {
/// let mut buf = Vec::new();
/// row.encode(&mut buf).unwrap();
/// data.push(buf);
/// }
/// let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
/// while let Some(Ok(res)) = result.next().await {
/// tracing::info!("append row errors = {:?}", res.row_errors.len());
/// }
///
/// let _ = stream.finalize().await?;
/// let _ = writer.commit().await?;
/// Ok(())
/// }
/// ```
pub fn pending_storage_writer(&self, table: &str) -> pending::Writer {
pending::Writer::new(1, self.streaming_write_conn_pool.clone(), table.to_string())
}

/// Creates a new default type storage writer.
/// https://cloud.google.com/bigquery/docs/write-api#default_stream
/// ```
/// use prost_types::DescriptorProto;
/// use google_cloud_bigquery::client::Client;
/// use google_cloud_gax::grpc::Status;
/// use prost::Message;
/// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
/// use futures_util::stream::StreamExt;
///
/// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
/// -> Result<(), Status> {
/// let writer = client.default_storage_writer();
/// let stream = writer.create_write_stream(table).await?;
///
/// let mut data = vec![];
/// for row in rows {
/// let mut buf = Vec::new();
/// row.encode(&mut buf).unwrap();
/// data.push(buf);
/// }
/// let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
/// while let Some(Ok(res)) = result.next().await {
/// tracing::info!("append row errors = {:?}", res.row_errors.len());
/// }
/// Ok(())
/// }
/// ```
pub fn default_storage_writer(&self) -> default::Writer {
default::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
}

/// Creates a new committed type storage writer.
/// https://cloud.google.com/bigquery/docs/write-api#committed_type
/// ```
/// use prost_types::DescriptorProto;
/// use google_cloud_bigquery::client::Client;
/// use google_cloud_gax::grpc::Status;
/// use prost::Message;
/// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
/// use futures_util::stream::StreamExt;
///
/// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
/// -> Result<(), Status> {
/// let writer = client.committed_storage_writer();
/// let stream = writer.create_write_stream(table).await?;
///
/// let mut data = vec![];
/// for row in rows {
/// let mut buf = Vec::new();
/// row.encode(&mut buf).unwrap();
/// data.push(buf);
/// }
/// let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
/// while let Some(Ok(res)) = result.next().await {
/// tracing::info!("append row errors = {:?}", res.row_errors.len());
/// }
///
/// let _ = stream.finalize().await?;
/// Ok(())
/// }
/// ```
pub fn committed_storage_writer(&self) -> committed::Writer {
committed::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
}

/// Creates a new buffered type storage writer.
/// https://cloud.google.com/bigquery/docs/write-api#buffered_type
/// ```
/// use prost_types::DescriptorProto;
/// use google_cloud_bigquery::client::Client;
/// use prost::Message;
/// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
/// use futures_util::stream::StreamExt;
/// use google_cloud_gax::grpc::Status;
///
/// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
/// -> Result<(), Status> {
/// let writer = client.buffered_storage_writer();
/// let stream = writer.create_write_stream(table).await?;
///
/// let mut data = vec![];
/// for row in rows {
/// let mut buf = Vec::new();
/// row.encode(&mut buf).unwrap();
/// data.push(buf);
/// }
/// let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
/// while let Some(Ok(res)) = result.next().await {
/// tracing::info!("append row errors = {:?}", res.row_errors.len());
/// }
/// let _ = stream.flush_rows(Some(0)).await?;
/// let _ = stream.finalize().await?;
/// Ok(())
/// }
/// ```
pub fn buffered_storage_writer(&self) -> buffered::Writer {
buffered::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
}

/// Run query job and get result.
/// ```rust
/// use google_cloud_bigquery::http::job::query::QueryRequest;
@@ -473,7 +675,7 @@ impl Client {
{
let option = option.unwrap_or_default();

let mut client = self.streaming_read_client_conn_pool.conn();
let mut client = StreamingReadClient::new(BigQueryReadClient::new(self.streaming_read_conn_pool.conn()));
let read_session = client
.create_read_session(
CreateReadSessionRequest {
@@ -562,7 +764,9 @@ mod tests {

#[ctor::ctor]
fn init() {
let _ = tracing_subscriber::fmt::try_init();
let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
.add_directive("google_cloud_bigquery=trace".parse().unwrap());
let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}

async fn create_client() -> (Client, String) {
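Putting the new configuration surface together: a minimal sketch of wiring `StreamingWriteConfig` into a client, assuming `ClientConfig::new_with_auth()` from the crate's `auth` feature. The builder methods and defaults are the ones added in this diff; the pool size and timeouts are arbitrary:

```rust
use std::time::Duration;

use google_cloud_bigquery::client::{ChannelConfig, Client, ClientConfig, StreamingWriteConfig};

async fn new_client() -> Result<Client, Box<dyn std::error::Error>> {
    // Assumes the `auth` feature; any other way of building a
    // `ClientConfig` works the same.
    let (config, _project_id) = ClientConfig::new_with_auth().await?;
    let config = config.with_streaming_write_config(
        StreamingWriteConfig::default()
            .with_channel_config(
                ChannelConfig::default()
                    // Size of the gRPC channel pool for streaming writes.
                    .with_num_channels(4)
                    .with_connect_timeout(Duration::from_secs(30))
                    .with_timeout(Duration::from_secs(60)),
            )
            // Upper bound on rows batched into one append (assumed semantics).
            .with_max_insert_count(50),
    );
    Ok(Client::new(config).await?)
}
```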
16 changes: 16 additions & 0 deletions bigquery/src/grpc/apiv1/bigquery_client.rs
@@ -6,6 +6,7 @@ use google_cloud_gax::grpc::{Code, IntoStreamingRequest, Response, Status, Strea
use google_cloud_gax::retry::{invoke_fn, RetrySetting};
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_write_client::BigQueryWriteClient;
use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type;
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
AppendRowsRequest, AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
CreateReadSessionRequest, CreateWriteStreamRequest, FinalizeWriteStreamRequest, FinalizeWriteStreamResponse,
@@ -212,3 +213,18 @@ impl StreamingWriteClient {
.await
}
}

pub(crate) fn create_write_stream_request(table: &str, write_type: Type) -> CreateWriteStreamRequest {
CreateWriteStreamRequest {
parent: table.to_string(),
write_stream: Some(WriteStream {
name: "".to_string(),
r#type: write_type as i32,
create_time: None,
commit_time: None,
table_schema: None,
write_mode: 0,
location: "".to_string(),
}),
}
}
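For reference, the helper only populates the two fields the service reads on stream creation: the parent table path and the stream type. A hedged sketch of a call site (crate-internal, since the helper is `pub(crate)`; the table path follows the standard `projects/{project}/datasets/{dataset}/tables/{table}` form):

```rust
use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type;

#[test]
fn builds_committed_stream_request() {
    let table = "projects/my-project/datasets/my_dataset/tables/my_table";
    let req = create_write_stream_request(table, Type::Committed);
    assert_eq!(req.parent, table);
    // The unnamed stream lets the service assign the stream name.
    assert_eq!(req.write_stream.unwrap().r#type, Type::Committed as i32);
}
```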