diff --git a/bigquery/Cargo.toml b/bigquery/Cargo.toml
index 5b6b2eb5..c15ea0f6 100644
--- a/bigquery/Cargo.toml
+++ b/bigquery/Cargo.toml
@@ -29,6 +29,8 @@ num-bigint = "0.4"
 backon = { version = "1.2", default-features = false, features = ["tokio-sleep"] }
 reqwest-middleware = { version = "0.4", features = ["json", "multipart"] }
 anyhow = "1.0"
+async-stream = "0.3"
+prost-types = "0.13"
 google-cloud-auth = { optional = true, version = "0.17", path="../foundation/auth", default-features=false }
@@ -40,6 +42,8 @@ ctor = "0.1.26"
 tokio-util = {version ="0.7", features = ["codec"] }
 google-cloud-auth = { path = "../foundation/auth", default-features=false }
 base64-serde = "0.7"
+prost = "0.13"
+futures-util = "0.3"
 
 [features]
 default = ["default-tls", "auth"]
diff --git a/bigquery/src/client.rs b/bigquery/src/client.rs
index a1cec4cf..2f5d7959 100644
--- a/bigquery/src/client.rs
+++ b/bigquery/src/client.rs
@@ -12,7 +12,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::{
 };
 use google_cloud_token::TokenSourceProvider;
 
-use crate::grpc::apiv1::conn_pool::{ReadConnectionManager, DOMAIN};
+use crate::grpc::apiv1::conn_pool::ConnectionManager;
 use crate::http::bigquery_client::BigqueryClient;
 use crate::http::bigquery_dataset_client::BigqueryDatasetClient;
 use crate::http::bigquery_job_client::BigqueryJobClient;
@@ -38,15 +38,63 @@ pub struct ClientConfig {
     token_source_provider: Box<dyn TokenSourceProvider>,
     environment: Environment,
     streaming_read_config: ChannelConfig,
+    streaming_write_config: StreamingWriteConfig,
     debug: bool,
 }
 
+#[derive(Clone, Debug, Default)]
+pub struct StreamingWriteConfig {
+    channel_config: ChannelConfig,
+    max_insert_count: usize,
+}
+
+impl StreamingWriteConfig {
+    pub fn with_channel_config(mut self, value: ChannelConfig) -> Self {
+        self.channel_config = value;
+        self
+    }
+    pub fn with_max_insert_count(mut self, value: usize) -> Self {
+        self.max_insert_count = value;
+        self
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct ChannelConfig {
     /// num_channels is the number of gRPC channels.
-    pub num_channels: usize,
-    pub connect_timeout: Option<Duration>,
-    pub timeout: Option<Duration>,
+    num_channels: usize,
+    connect_timeout: Option<Duration>,
+    timeout: Option<Duration>,
+}
+
+impl ChannelConfig {
+    pub fn with_num_channels(mut self, value: usize) -> Self {
+        self.num_channels = value;
+        self
+    }
+    pub fn with_connect_timeout(mut self, value: Duration) -> Self {
+        self.connect_timeout = Some(value);
+        self
+    }
+    pub fn with_timeout(mut self, value: Duration) -> Self {
+        self.timeout = Some(value);
+        self
+    }
+
+    async fn into_connection_manager(
+        self,
+        environment: &Environment,
+    ) -> Result<ConnectionManager, Error> {
+        ConnectionManager::new(
+            self.num_channels,
+            environment,
+            &ConnectionOptions {
+                timeout: self.timeout,
+                connect_timeout: self.connect_timeout,
+            },
+        )
+        .await
+    }
 }
 
 impl Default for ChannelConfig {
@@ -70,6 +118,7 @@ impl ClientConfig {
             token_source_provider: http_token_source_provider,
             environment: Environment::GoogleCloud(grpc_token_source_provider),
             streaming_read_config: ChannelConfig::default(),
+            streaming_write_config: StreamingWriteConfig::default(),
             debug: false,
         }
     }
@@ -81,6 +130,10 @@ impl ClientConfig {
         self.streaming_read_config = value;
         self
     }
+    pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
+        self.streaming_write_config = value;
+        self
+    }
     pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
         self.http = value;
         self
@@ -94,8 +147,11 @@ impl ClientConfig {
 use crate::http::job::get::GetJobRequest;
 use crate::http::job::list::ListJobsRequest;
+use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
+use crate::storage_write::stream::{buffered, committed, default, pending};
 #[cfg(feature = "auth")]
 pub use google_cloud_auth;
+use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
 
 #[cfg(feature = "auth")]
 impl ClientConfig {
@@ -163,7 +219,9 @@ pub struct Client {
     routine_client: BigqueryRoutineClient,
     row_access_policy_client: BigqueryRowAccessPolicyClient,
     model_client: BigqueryModelClient,
-    streaming_read_client_conn_pool: Arc<ReadConnectionManager>,
+    streaming_read_conn_pool: Arc<ConnectionManager>,
+    streaming_write_conn_pool: Arc<ConnectionManager>,
+    streaming_write_max_insert_count: usize,
 }
 
 impl Client {
@@ -177,14 +235,6 @@ impl Client {
             config.debug,
         ));
 
-        let read_config = config.streaming_read_config;
-        let conn_options = ConnectionOptions {
-            timeout: read_config.timeout,
-            connect_timeout: read_config.connect_timeout,
-        };
-
-        let streaming_read_client_conn_pool =
-            ReadConnectionManager::new(read_config.num_channels, &config.environment, DOMAIN, &conn_options).await?;
         Ok(Self {
             dataset_client: BigqueryDatasetClient::new(client.clone()),
             table_client: BigqueryTableClient::new(client.clone()),
@@ -193,7 +243,20 @@ impl Client {
             routine_client: BigqueryRoutineClient::new(client.clone()),
             row_access_policy_client: BigqueryRowAccessPolicyClient::new(client.clone()),
             model_client: BigqueryModelClient::new(client.clone()),
-            streaming_read_client_conn_pool: Arc::new(streaming_read_client_conn_pool),
+            streaming_read_conn_pool: Arc::new(
+                config
+                    .streaming_read_config
+                    .into_connection_manager(&config.environment)
+                    .await?,
+            ),
+            streaming_write_conn_pool: Arc::new(
+                config
+                    .streaming_write_config
+                    .channel_config
+                    .into_connection_manager(&config.environment)
+                    .await?,
+            ),
+            streaming_write_max_insert_count: config.streaming_write_config.max_insert_count,
         })
     }
 
@@ -239,6 +302,145 @@ impl Client {
         &self.model_client
     }
 
+    /// Creates a new pending type storage writer for the specified table.
+    /// https://cloud.google.com/bigquery/docs/write-api#pending_type
+    /// ```
+    /// use prost_types::DescriptorProto;
+    /// use google_cloud_bigquery::client::Client;
+    /// use google_cloud_gax::grpc::Status;
+    /// use prost::Message;
+    /// use tokio::sync::futures;
+    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
+    /// use futures_util::stream::StreamExt;
+    ///
+    /// pub async fn run(client: &Client, table: &str, rows: Vec<impl Message>, schema: DescriptorProto)
+    ///     -> Result<(), Status> {
+    ///     let mut writer = client.pending_storage_writer(table);
+    ///     let stream = writer.create_write_stream().await?;
+    ///
+    ///     let mut data = vec![];
+    ///     for row in rows {
+    ///         let mut buf = Vec::new();
+    ///         row.encode(&mut buf).unwrap();
+    ///         data.push(buf);
+    ///     }
+    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
+    ///     while let Some(Ok(res)) = result.next().await {
+    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
+    ///     }
+    ///
+    ///     let _ = stream.finalize().await?;
+    ///     let _ = writer.commit().await?;
+    ///     Ok(())
+    /// }
+    /// ```
+    pub fn pending_storage_writer(&self, table: &str) -> pending::Writer {
+        pending::Writer::new(1, self.streaming_write_conn_pool.clone(), table.to_string())
+    }
+
+    /// Creates a new default type storage writer.
+    /// https://cloud.google.com/bigquery/docs/write-api#default_stream
+    /// ```
+    /// use prost_types::DescriptorProto;
+    /// use google_cloud_bigquery::client::Client;
+    /// use google_cloud_gax::grpc::Status;
+    /// use prost::Message;
+    /// use tokio::sync::futures;
+    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
+    /// use futures_util::stream::StreamExt;
+    ///
+    /// pub async fn run(client: &Client, table: &str, rows: Vec<impl Message>, schema: DescriptorProto)
+    ///     -> Result<(), Status> {
+    ///     let writer = client.default_storage_writer();
+    ///     let stream = writer.create_write_stream(table).await?;
+    ///
+    ///     let mut data = vec![];
+    ///     for row in rows {
+    ///         let mut buf = Vec::new();
+    ///         row.encode(&mut buf).unwrap();
+    ///         data.push(buf);
+    ///     }
+    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
+    ///     while let Some(Ok(res)) = result.next().await {
+    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
+    ///     }
+    ///     Ok(())
+    /// }
+    /// ```
+    pub fn default_storage_writer(&self) -> default::Writer {
+        default::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
+    }
+
+    /// Creates a new committed type storage writer.
+    /// https://cloud.google.com/bigquery/docs/write-api#committed_type
+    /// ```
+    /// use prost_types::DescriptorProto;
+    /// use google_cloud_bigquery::client::Client;
+    /// use google_cloud_gax::grpc::Status;
+    /// use prost::Message;
+    /// use tokio::sync::futures;
+    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
+    /// use futures_util::stream::StreamExt;
+    ///
+    /// pub async fn run(client: &Client, table: &str, rows: Vec<impl Message>, schema: DescriptorProto)
+    ///     -> Result<(), Status> {
+    ///     let writer = client.committed_storage_writer();
+    ///     let stream = writer.create_write_stream(table).await?;
+    ///
+    ///     let mut data = vec![];
+    ///     for row in rows {
+    ///         let mut buf = Vec::new();
+    ///         row.encode(&mut buf).unwrap();
+    ///         data.push(buf);
+    ///     }
+    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
+    ///     while let Some(Ok(res)) = result.next().await {
+    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
+    ///     }
+    ///
+    ///     let _ = stream.finalize().await?;
+    ///     Ok(())
+    /// }
+    /// ```
+    pub fn committed_storage_writer(&self) -> committed::Writer {
+        committed::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
+    }
+
+    /// Creates a new buffered type storage writer.
+    /// https://cloud.google.com/bigquery/docs/write-api#buffered_type
+    /// ```
+    /// use prost_types::DescriptorProto;
+    /// use google_cloud_bigquery::client::Client;
+    /// use prost::Message;
+    /// use tokio::sync::futures;
+    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
+    /// use futures_util::stream::StreamExt;
+    /// use google_cloud_gax::grpc::Status;
+    ///
+    /// pub async fn run(client: &Client, table: &str, rows: Vec<impl Message>, schema: DescriptorProto)
+    ///     -> Result<(), Status> {
+    ///     let writer = client.buffered_storage_writer();
+    ///     let stream = writer.create_write_stream(table).await?;
+    ///
+    ///     let mut data = vec![];
+    ///     for row in rows {
+    ///         let mut buf = Vec::new();
+    ///         row.encode(&mut buf).unwrap();
+    ///         data.push(buf);
+    ///     }
+    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
+    ///     while let Some(Ok(res)) = result.next().await {
+    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
+    ///     }
+    ///     let _ = stream.flush_rows(Some(0)).await?;
+    ///     let _ = stream.finalize().await?;
+    ///     Ok(())
+    /// }
+    /// ```
+    pub fn buffered_storage_writer(&self) -> buffered::Writer {
+        buffered::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
+    }
+
     /// Run query job and get result.
     /// ```rust
     /// use google_cloud_bigquery::http::job::query::QueryRequest;
@@ -473,7 +675,7 @@ impl Client {
     {
         let option = option.unwrap_or_default();
 
-        let mut client = self.streaming_read_client_conn_pool.conn();
+        let mut client = StreamingReadClient::new(BigQueryReadClient::new(self.streaming_read_conn_pool.conn()));
         let read_session = client
             .create_read_session(
                 CreateReadSessionRequest {
@@ -562,7 +764,9 @@ mod tests {
 
     #[ctor::ctor]
     fn init() {
-        let _ = tracing_subscriber::fmt::try_init();
+        let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
+            .add_directive("google_cloud_bigquery=trace".parse().unwrap());
+        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
     }
 
     async fn create_client() -> (Client, String) {
diff --git a/bigquery/src/grpc/apiv1/bigquery_client.rs b/bigquery/src/grpc/apiv1/bigquery_client.rs
index f99eeb00..0fce003a 100644
--- a/bigquery/src/grpc/apiv1/bigquery_client.rs
+++ b/bigquery/src/grpc/apiv1/bigquery_client.rs
@@ -6,6 +6,7 @@ use google_cloud_gax::grpc::{Code, IntoStreamingRequest, Response, Status, Streaming};
 use google_cloud_gax::retry::{invoke_fn, RetrySetting};
 use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
 use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_write_client::BigQueryWriteClient;
+use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type;
 use google_cloud_googleapis::cloud::bigquery::storage::v1::{
     AppendRowsRequest, AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
     CreateReadSessionRequest, CreateWriteStreamRequest, FinalizeWriteStreamRequest, FinalizeWriteStreamResponse,
@@ -212,3 +213,18 @@ impl StreamingWriteClient {
             .await
     }
 }
+
+pub(crate) fn create_write_stream_request(table: &str, write_type: Type) -> CreateWriteStreamRequest {
+    CreateWriteStreamRequest {
+        parent: table.to_string(),
+        write_stream: Some(WriteStream {
+            name: "".to_string(),
+            r#type: write_type as i32,
+            create_time: None,
+            commit_time: None,
+            table_schema: None,
+            write_mode: 0,
+            location: "".to_string(),
+        }),
+    }
+}
diff --git a/bigquery/src/grpc/apiv1/conn_pool.rs b/bigquery/src/grpc/apiv1/conn_pool.rs
index fb45ecaf..6faeb8e4 100644
--- a/bigquery/src/grpc/apiv1/conn_pool.rs
+++ b/bigquery/src/grpc/apiv1/conn_pool.rs
@@ -1,9 +1,9 @@
-use google_cloud_gax::conn::{ConnectionManager as GRPCConnectionManager, ConnectionOptions, Environment, Error};
-use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
+use crate::grpc::apiv1::bigquery_client::StreamingWriteClient;
+use google_cloud_gax::conn::{
+    Channel, ConnectionManager as GRPCConnectionManager, ConnectionOptions, Environment, Error,
+};
 use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_write_client::BigQueryWriteClient;
 
-use crate::grpc::apiv1::bigquery_client::{StreamingReadClient, StreamingWriteClient};
-
 pub const AUDIENCE: &str = "https://bigquerystorage.googleapis.com/";
 pub const DOMAIN: &str = "bigquerystorage.googleapis.com";
 pub const SCOPES: [&str; 3] = [
@@ -12,19 +12,19 @@ pub const SCOPES: [&str; 3] = [
     "https://www.googleapis.com/auth/cloud-platform",
 ];
 
-pub struct ReadConnectionManager {
+#[derive(Debug)]
+pub struct ConnectionManager {
     inner: GRPCConnectionManager,
 }
 
-impl ReadConnectionManager {
+impl ConnectionManager {
     pub async fn new(
         pool_size: usize,
         environment: &Environment,
-        domain: &str,
         conn_options: &ConnectionOptions,
     ) -> Result<Self, Error> {
-        Ok(ReadConnectionManager {
-            inner: GRPCConnectionManager::new(pool_size, domain, AUDIENCE, environment, conn_options).await?,
+        Ok(ConnectionManager {
+            inner: GRPCConnectionManager::new(pool_size, DOMAIN, AUDIENCE, environment, conn_options).await?,
         })
     }
 
@@ -32,34 +32,11 @@ impl ReadConnectionManager {
         self.inner.num()
     }
 
-    pub fn conn(&self) -> StreamingReadClient {
-        let conn = self.inner.conn();
-        StreamingReadClient::new(BigQueryReadClient::new(conn))
-    }
-}
-
-pub struct WriteConnectionManager {
-    inner: GRPCConnectionManager,
-}
-
-impl WriteConnectionManager {
-    pub async fn new(
-        pool_size: usize,
-        environment: &Environment,
-        domain: &str,
-        conn_options: &ConnectionOptions,
-    ) -> Result<Self, Error> {
-        Ok(WriteConnectionManager {
-            inner: GRPCConnectionManager::new(pool_size, domain, AUDIENCE, environment, conn_options).await?,
-        })
-    }
-
-    pub fn num(&self) -> usize {
-        self.inner.num()
+    pub fn conn(&self) -> Channel {
+        self.inner.conn()
     }
 
-    pub fn conn(&self) -> StreamingWriteClient {
-        let conn = self.inner.conn();
-        StreamingWriteClient::new(BigQueryWriteClient::new(conn))
+    pub fn writer(&self) -> StreamingWriteClient {
+        StreamingWriteClient::new(BigQueryWriteClient::new(self.conn()))
     }
 }
diff --git a/bigquery/src/http/bigquery_dataset_client.rs b/bigquery/src/http/bigquery_dataset_client.rs
index 3c26db91..0dd0950f 100644
--- a/bigquery/src/http/bigquery_dataset_client.rs
+++ b/bigquery/src/http/bigquery_dataset_client.rs
@@ -121,7 +121,7 @@ mod test {
         // minimum dataset
         let mut ds1 = Dataset::default();
         ds1.dataset_reference.dataset_id = dataset_name("crud_empty");
-        ds1.dataset_reference.project_id = project.clone();
+        ds1.dataset_reference.project_id.clone_from(&project);
         ds1 = client.create(&ds1).await.unwrap();
 
         // test get
diff --git a/bigquery/src/http/bigquery_job_client.rs b/bigquery/src/http/bigquery_job_client.rs
index 961dde8c..359c015d 100644
--- a/bigquery/src/http/bigquery_job_client.rs
+++ b/bigquery/src/http/bigquery_job_client.rs
@@ -335,7 +335,7 @@ mod test {
 
         // insert test data
         let mut table1 = Table::default();
-        table1.table_reference.dataset_id = dataset.clone();
+        table1.table_reference.dataset_id.clone_from(&dataset);
         table1.table_reference.project_id = project.to_string();
         table1.table_reference.table_id = format!("table_data_{}", OffsetDateTime::now_utc().unix_timestamp());
         table1.schema = Some(create_table_schema());
diff --git a/bigquery/src/lib.rs b/bigquery/src/lib.rs
index fc72b03e..9b9c1a2a 100644
--- a/bigquery/src/lib.rs
+++ b/bigquery/src/lib.rs
@@ -190,3 +190,4 @@ pub mod grpc;
 pub mod http;
 pub mod query;
 pub mod storage;
+pub mod storage_write;
diff --git a/bigquery/src/storage_write/flow.rs b/bigquery/src/storage_write/flow.rs
new file mode 100644
index 00000000..a9b39bd5
--- /dev/null
+++ b/bigquery/src/storage_write/flow.rs
@@ -0,0 +1,17 @@
+use tokio::sync::{Semaphore, SemaphorePermit};
+
+#[derive(Debug)]
+pub struct FlowController {
+    sem_insert_count: Semaphore, //TODO support sem_insert_bytes
+}
+
+impl FlowController {
+    pub fn new(max_insert_count: usize) -> Self {
+        FlowController {
+            sem_insert_count: Semaphore::new(max_insert_count),
+        }
+    }
+    pub async fn acquire(&self) -> SemaphorePermit {
+        self.sem_insert_count.acquire().await.unwrap()
+    }
+}
diff --git a/bigquery/src/storage_write/mod.rs b/bigquery/src/storage_write/mod.rs
new file mode 100644
index 00000000..1f218f32
--- /dev/null
+++ b/bigquery/src/storage_write/mod.rs
@@ -0,0 +1,76 @@
+use google_cloud_gax::grpc::codegen::tokio_stream::Stream;
+use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{ProtoData, Rows};
+use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsRequest, ProtoRows, ProtoSchema};
+use prost_types::DescriptorProto;
+use std::collections::HashMap;
+
+mod flow;
+pub mod stream;
+
+pub struct AppendRowsRequestBuilder {
+    offset: Option<i64>,
+    trace_id: Option<String>,
+    missing_value_interpretations: Option<HashMap<String, i32>>,
+    default_missing_value_interpretation: Option<i32>,
+    data: Vec<Vec<u8>>,
+    schema: DescriptorProto,
+}
+
+impl AppendRowsRequestBuilder {
+    pub fn new(schema: DescriptorProto, data: Vec<Vec<u8>>) -> Self {
+        Self {
+            offset: None,
+            trace_id: None,
+            missing_value_interpretations: None,
+            default_missing_value_interpretation: None,
+            data,
+            schema,
+        }
+    }
+
+    pub fn with_offset(mut self, offset: i64) -> Self {
+        self.offset = Some(offset);
+        self
+    }
+
+    pub fn with_trace_id(mut self, trace_id: String) -> Self {
+        self.trace_id = Some(trace_id);
+        self
+    }
+
+    pub fn with_missing_value_interpretations(mut self, missing_value_interpretations: HashMap<String, i32>) -> Self {
+        self.missing_value_interpretations = Some(missing_value_interpretations);
+        self
+    }
+
+    pub fn with_default_missing_value_interpretation(mut self, default_missing_value_interpretation: i32) -> Self {
+        self.default_missing_value_interpretation = Some(default_missing_value_interpretation);
+        self
+    }
+
+    pub(crate) fn build(self, stream: &str) -> AppendRowsRequest {
+        AppendRowsRequest {
+            write_stream: stream.to_string(),
+            offset: self.offset,
+            trace_id: self.trace_id.unwrap_or_default(),
+            missing_value_interpretations: self.missing_value_interpretations.unwrap_or_default(),
+            default_missing_value_interpretation: self.default_missing_value_interpretation.unwrap_or(0),
+            rows: Some(Rows::ProtoRows(ProtoData {
+                writer_schema: Some(ProtoSchema {
+                    proto_descriptor: Some(self.schema),
+                }),
+                rows: Some(ProtoRows {
+                    serialized_rows: self.data,
+                }),
+            })),
+        }
+    }
+}
+
+pub fn into_streaming_request(rows: Vec<AppendRowsRequest>) -> impl Stream<Item = AppendRowsRequest> {
+    async_stream::stream! {
+        for row in rows {
+            yield row;
+        }
+    }
+}
diff --git a/bigquery/src/storage_write/stream/buffered.rs b/bigquery/src/storage_write/stream/buffered.rs
new file mode 100644
index 00000000..fe9d34a1
--- /dev/null
+++ b/bigquery/src/storage_write/stream/buffered.rs
@@ -0,0 +1,189 @@
+use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
+use crate::grpc::apiv1::conn_pool::ConnectionManager;
+use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
+use crate::storage_write::AppendRowsRequestBuilder;
+use google_cloud_gax::grpc::{Status, Streaming};
+use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Buffered;
+use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsResponse, FlushRowsRequest};
+use std::sync::Arc;
+
+pub struct Writer {
+    max_insert_count: usize,
+    cm: Arc<ConnectionManager>,
+}
+
+impl Writer {
+    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
+        Self { max_insert_count, cm }
+    }
+
+    pub async fn create_write_stream(&self, table: &str) -> Result<BufferedStream, Status> {
+        let req = create_write_stream_request(table, Buffered);
+        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
+        Ok(BufferedStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
+    }
+}
+pub struct BufferedStream {
+    inner: Stream,
+}
+
+impl BufferedStream {
+    pub(crate) fn new(inner: Stream) -> Self {
+        Self { inner }
+    }
+
+    pub async fn append_rows(
+        &self,
+        rows: Vec<AppendRowsRequestBuilder>,
+    ) -> Result<Streaming<AppendRowsResponse>, Status> {
+        ManagedStreamDelegate::append_rows(&self.inner, rows).await
+    }
+
+    pub async fn finalize(&self) -> Result<i64, Status> {
+        DisposableStreamDelegate::finalize(&self.inner).await
+    }
+
+    pub async fn flush_rows(&self, offset: Option<i64>) -> Result<i64, Status> {
+        let stream = &self.inner;
+        let res = stream
+            .cons
+            .writer()
+            .flush_rows(
+                FlushRowsRequest {
+                    write_stream: stream.inner.name.to_string(),
+                    offset,
+                },
+                None,
+            )
+            .await?
+            .into_inner();
+        Ok(res.offset)
+    }
+}
+
+impl AsStream for BufferedStream {
+    fn as_ref(&self) -> &Stream {
+        &self.inner
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures_util::StreamExt;
+    use std::sync::Arc;
+    use tokio::task::JoinHandle;
+
+    use crate::client::{Client, ClientConfig};
+    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
+    use google_cloud_gax::grpc::Status;
+    use prost::Message;
+
+    #[ctor::ctor]
+    fn init() {
+        crate::storage_write::stream::tests::init();
+    }
+
+    #[tokio::test]
+    async fn test_storage_write() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+        let tables = ["write_test", "write_test_1"];
+        let writer = client.buffered_storage_writer();
+
+        // Create Streams
+        let mut streams = vec![];
+        for i in 0..2 {
+            let table = format!(
+                "projects/{}/datasets/gcrbq_storage/tables/{}",
+                &project_id,
+                tables[i % tables.len()]
+            )
+            .to_string();
+            let stream = writer.create_write_stream(&table).await.unwrap();
+            streams.push(stream);
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("buffered_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                let result = stream.flush_rows(Some(0)).await.unwrap();
+                tracing::info!("flush rows count = {:?}", result);
+
+                let result = stream.finalize().await.unwrap();
+                tracing::info!("finalized row count = {:?}", result);
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+    }
+
+    #[serial_test::serial]
+    #[tokio::test]
+    async fn test_storage_write_single_stream() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+        let writer = client.buffered_storage_writer();
+
+        // Create Streams
+        let mut streams = vec![];
+        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id).to_string();
+        let stream = Arc::new(writer.create_write_stream(&table).await.unwrap());
+        for _i in 0..2 {
+            streams.push(stream.clone());
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("buffered_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+
+        let result = stream.flush_rows(Some(0)).await.unwrap();
+        tracing::info!("flush rows count = {:?}", result);
+
+        let result = stream.finalize().await.unwrap();
+        tracing::info!("finalized row count = {:?}", result);
+    }
+}
diff --git a/bigquery/src/storage_write/stream/committed.rs b/bigquery/src/storage_write/stream/committed.rs
new file mode 100644
index 00000000..40c88667
--- /dev/null
+++ b/bigquery/src/storage_write/stream/committed.rs
@@ -0,0 +1,170 @@
+use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
+use crate::grpc::apiv1::conn_pool::ConnectionManager;
+use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
+use crate::storage_write::AppendRowsRequestBuilder;
+use google_cloud_gax::grpc::{Status, Streaming};
+use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Committed;
+use google_cloud_googleapis::cloud::bigquery::storage::v1::AppendRowsResponse;
+use std::sync::Arc;
+
+pub struct Writer {
+    max_insert_count: usize,
+    cm: Arc<ConnectionManager>,
+}
+
+impl Writer {
+    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
+        Self { max_insert_count, cm }
+    }
+
+    pub async fn create_write_stream(&self, table: &str) -> Result<CommittedStream, Status> {
+        let req = create_write_stream_request(table, Committed);
+        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
+        Ok(CommittedStream::new(Stream::new(
+            stream,
+            self.cm.clone(),
+            self.max_insert_count,
+        )))
+    }
+}
+
+pub struct CommittedStream {
+    inner: Stream,
+}
+
+impl CommittedStream {
+    pub(crate) fn new(inner: Stream) -> Self {
+        Self { inner }
+    }
+
+    pub async fn append_rows(
+        &self,
+        rows: Vec<AppendRowsRequestBuilder>,
+    ) -> Result<Streaming<AppendRowsResponse>, Status> {
+        ManagedStreamDelegate::append_rows(&self.inner, rows).await
+    }
+
+    pub async fn finalize(&self) -> Result<i64, Status> {
+        DisposableStreamDelegate::finalize(&self.inner).await
+    }
+}
+
+impl AsStream for CommittedStream {
+    fn as_ref(&self) -> &Stream {
+        &self.inner
+    }
+}
+#[cfg(test)]
+mod tests {
+    use crate::client::{Client, ClientConfig};
+    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
+    use futures_util::StreamExt;
+    use google_cloud_gax::grpc::Status;
+    use prost::Message;
+    use std::sync::Arc;
+    use tokio::task::JoinHandle;
+
+    #[ctor::ctor]
+    fn init() {
+        crate::storage_write::stream::tests::init();
+    }
+
+    #[serial_test::serial]
+    #[tokio::test]
+    async fn test_storage_write() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+        let tables = ["write_test", "write_test_1"];
+        let writer = client.committed_storage_writer();
+
+        // Create Streams
+        let mut streams = vec![];
+        for i in 0..2 {
+            let table = format!(
+                "projects/{}/datasets/gcrbq_storage/tables/{}",
+                &project_id,
+                tables[i % tables.len()]
+            )
+            .to_string();
+            let stream = writer.create_write_stream(&table).await.unwrap();
+            streams.push(stream);
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("committed_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                let result = stream.finalize().await.unwrap();
+                tracing::info!("finalized row count = {:?}", result);
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+    }
+
+    #[serial_test::serial]
+    #[tokio::test]
+    async fn test_storage_write_single_stream() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+        let writer = client.committed_storage_writer();
+
+        // Create Streams
+        let mut streams = vec![];
+        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id).to_string();
+        let stream = Arc::new(writer.create_write_stream(&table).await.unwrap());
+        for _i in 0..2 {
+            streams.push(stream.clone());
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("committed_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+
+        let result = stream.finalize().await.unwrap();
+        tracing::info!("finalized row count = {:?}", result);
+    }
+}
diff --git a/bigquery/src/storage_write/stream/default.rs b/bigquery/src/storage_write/stream/default.rs
new file mode 100644
index 00000000..5e45b41a
--- /dev/null
+++ b/bigquery/src/storage_write/stream/default.rs
@@ -0,0 +1,120 @@
+use crate::grpc::apiv1::conn_pool::ConnectionManager;
+use crate::storage_write::stream::{AsStream, ManagedStreamDelegate, Stream};
+use crate::storage_write::AppendRowsRequestBuilder;
+use google_cloud_gax::grpc::{Status, Streaming};
+use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsResponse, GetWriteStreamRequest};
+use std::sync::Arc;
+
+pub struct Writer {
+    max_insert_count: usize,
+    cm: Arc<ConnectionManager>,
+}
+
+impl Writer {
+    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
+        Self { max_insert_count, cm }
+    }
+
+    pub async fn create_write_stream(&self, table: &str) -> Result<DefaultStream, Status> {
+        let stream = self
+            .cm
+            .writer()
+            .get_write_stream(
+                GetWriteStreamRequest {
+                    name: format!("{table}/streams/_default"),
+                    ..Default::default()
+                },
+                None,
+            )
+            .await?
+            .into_inner();
+        Ok(DefaultStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
+    }
+}
+
+pub struct DefaultStream {
+    inner: Stream,
+}
+
+impl DefaultStream {
+    pub(crate) fn new(inner: Stream) -> Self {
+        Self { inner }
+    }
+    pub async fn append_rows(
+        &self,
+        rows: Vec<AppendRowsRequestBuilder>,
+    ) -> Result<Streaming<AppendRowsResponse>, Status> {
+        ManagedStreamDelegate::append_rows(&self.inner, rows).await
+    }
+}
+
+impl AsStream for DefaultStream {
+    fn as_ref(&self) -> &Stream {
+        &self.inner
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::client::{Client, ClientConfig};
+    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
+    use futures_util::StreamExt;
+    use google_cloud_gax::grpc::Status;
+    use prost::Message;
+    use tokio::task::JoinHandle;
+
+    #[ctor::ctor]
+    fn init() {
+        crate::storage_write::stream::tests::init();
+    }
+
+    #[serial_test::serial]
+    #[tokio::test]
+    async fn test_storage_write() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+        let tables = ["write_test", "write_test_1"];
+        let writer = client.default_storage_writer();
+
+        // Create Streams
+        let mut streams = vec![];
+        for i in 0..2 {
+            let table = format!(
+                "projects/{}/datasets/gcrbq_storage/tables/{}",
+                &project_id,
+                tables[i % tables.len()]
+            )
+            .to_string();
+            let stream = writer.create_write_stream(&table).await.unwrap();
+            streams.push(stream);
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("default_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+    }
+}
diff --git a/bigquery/src/storage_write/stream/mod.rs b/bigquery/src/storage_write/stream/mod.rs
new file mode 100644
index 00000000..d7e10367
--- /dev/null
+++ b/bigquery/src/storage_write/stream/mod.rs
@@ -0,0 +1,153 @@
+use crate::grpc::apiv1::conn_pool::ConnectionManager;
+use crate::storage_write::flow::FlowController;
+use crate::storage_write::AppendRowsRequestBuilder;
+use google_cloud_gax::grpc::{IntoStreamingRequest, Status, Streaming};
+use google_cloud_googleapis::cloud::bigquery::storage::v1::{
+    AppendRowsRequest, AppendRowsResponse, FinalizeWriteStreamRequest, WriteStream,
+};
+use std::sync::Arc;
+
+pub mod buffered;
+pub mod committed;
+pub mod default;
+pub mod pending;
+
+pub struct Stream {
+    inner: WriteStream,
+    cons: Arc<ConnectionManager>,
+    fc: Option<FlowController>,
+}
+
+impl Stream {
+    pub(crate) fn new(inner: WriteStream, cons: Arc<ConnectionManager>, max_insert_count: usize) -> Self {
+        Self {
+            inner,
+            cons,
+            fc: if max_insert_count > 0 {
+                Some(FlowController::new(max_insert_count))
+            } else {
+                None
+            },
+        }
+    }
+}
+
+pub trait AsStream: Sized {
+    fn as_ref(&self) -> &Stream;
+
+    fn name(&self) -> &str {
+        &self.as_ref().inner.name
+    }
+
+    fn create_streaming_request(
+        &self,
+        rows: Vec<AppendRowsRequestBuilder>,
+    ) -> impl google_cloud_gax::grpc::codegen::tokio_stream::Stream<Item = AppendRowsRequest> {
+        let name = self.name().to_string();
+        async_stream::stream! {
+            for row in rows {
+                yield row.build(&name);
+            }
+        }
+    }
+}
+
+pub(crate) struct ManagedStreamDelegate {}
+
+impl ManagedStreamDelegate {
+    async fn append_rows(
+        stream: &Stream,
+        rows: Vec<AppendRowsRequestBuilder>,
+    ) -> Result<Streaming<AppendRowsResponse>, Status> {
+        let name = stream.inner.name.to_string();
+        let req = async_stream::stream! {
+            for row in rows {
+                yield row.build(&name);
+            }
+        };
+        Self::append_streaming_request(stream, req).await
+    }
+
+    async fn append_streaming_request(
+        stream: &Stream,
+        req: impl IntoStreamingRequest<Message = AppendRowsRequest>,
+    ) -> Result<Streaming<AppendRowsResponse>, Status> {
+        match &stream.fc {
+            None => {
+                let mut client = stream.cons.writer();
+                Ok(client.append_rows(req).await?.into_inner())
+            }
+            Some(fc) => {
+                let permit = fc.acquire().await;
+                let mut client = stream.cons.writer();
+                let result = client.append_rows(req).await?.into_inner();
+                drop(permit);
+                Ok(result)
+            }
+        }
+    }
+}
+
+pub(crate) struct DisposableStreamDelegate {}
+impl DisposableStreamDelegate {
+    async fn finalize(stream: &Stream) -> Result<i64, Status> {
+        let res = stream
+            .cons
+            .writer()
+            .finalize_write_stream(
+                FinalizeWriteStreamRequest {
+                    name: stream.inner.name.to_string(),
+                },
+                None,
+            )
+            .await?
+            .into_inner();
+        Ok(res.row_count)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::storage_write::AppendRowsRequestBuilder;
+    use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto};
+
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(crate) struct TestData {
+        #[prost(string, tag = "1")]
+        pub col_string: String,
+    }
+
+    pub(crate) fn init() {
+        let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
+            .add_directive("google_cloud_bigquery=trace".parse().unwrap());
+        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
+    }
+
+    pub(crate) fn create_append_rows_request(buf: Vec<Vec<u8>>) -> AppendRowsRequestBuilder {
+        let proto = DescriptorProto {
+            name: Some("TestData".to_string()),
+            field: vec![FieldDescriptorProto {
+                name: Some("col_string".to_string()),
+                number: Some(1),
+                label: None,
+                r#type: Some(field_descriptor_proto::Type::String.into()),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: None,
+                options: None,
+                proto3_optional: None,
+            }],
+            extension: vec![],
+            nested_type: vec![],
+            enum_type: vec![],
+            extension_range: vec![],
+            oneof_decl: vec![],
+            options: None,
+            reserved_range: vec![],
+            reserved_name: vec![],
+        };
+        AppendRowsRequestBuilder::new(proto, buf)
+    }
+}
diff --git a/bigquery/src/storage_write/stream/pending.rs b/bigquery/src/storage_write/stream/pending.rs
new file mode 100644
index 00000000..31a6efb8
--- /dev/null
+++ b/bigquery/src/storage_write/stream/pending.rs
@@ -0,0 +1,204 @@
+use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
+use crate::grpc::apiv1::conn_pool::ConnectionManager;
+use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
+use crate::storage_write::AppendRowsRequestBuilder;
+use google_cloud_gax::grpc::{Status, Streaming};
+use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Pending;
+use google_cloud_googleapis::cloud::bigquery::storage::v1::{
+    AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
+};
+use std::sync::Arc;
+
+pub struct Writer {
+    max_insert_count: usize,
+    cm: Arc<ConnectionManager>,
+    table: String,
+    streams: Vec<String>,
+}
+
+impl Writer {
+    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>, table: String) -> Self {
+        Self {
+            max_insert_count,
+            cm,
+            table,
+            streams: Vec::new(),
+        }
+    }
+
+    pub async fn create_write_stream(&mut self) -> Result<PendingStream, Status> {
+        let req = create_write_stream_request(&self.table, Pending);
+        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
+        self.streams.push(stream.name.to_string());
+        Ok(PendingStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
+    }
+
+    pub async fn commit(&self) -> Result<BatchCommitWriteStreamsResponse, Status> {
+        let result = self
+            .cm
+            .writer()
+            .batch_commit_write_streams(
+                BatchCommitWriteStreamsRequest {
+                    parent: self.table.to_string(),
+                    write_streams: self.streams.clone(),
+                },
+                None,
+            )
+            .await?
+            .into_inner();
+        Ok(result)
+    }
+}
+pub struct PendingStream {
+    inner: Stream,
+}
+
+impl PendingStream {
+    pub(crate) fn new(inner: Stream) -> Self {
+        Self { inner }
+    }
+
+    pub async fn append_rows(
+        &self,
+        rows: Vec<AppendRowsRequestBuilder>,
+    ) -> Result<Streaming<AppendRowsResponse>, Status> {
+        ManagedStreamDelegate::append_rows(&self.inner, rows).await
+    }
+
+    pub async fn finalize(&self) -> Result<i64, Status> {
+        DisposableStreamDelegate::finalize(&self.inner).await
+    }
+}
+
+impl AsStream for PendingStream {
+    fn as_ref(&self) -> &Stream {
+        &self.inner
+    }
+}
+#[cfg(test)]
+mod tests {
+    use crate::client::{Client, ClientConfig};
+    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
+    use futures_util::StreamExt;
+    use google_cloud_gax::grpc::Status;
+    use prost::Message;
+    use std::sync::Arc;
+    use tokio::task::JoinHandle;
+
+    #[ctor::ctor]
+    fn init() {
+        crate::storage_write::stream::tests::init();
+    }
+
+    #[serial_test::serial]
+    #[tokio::test]
+    async fn test_storage_write() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+        let tables = ["write_test", "write_test_1"];
+
+        // Create Writers
+        let mut writers = vec![];
+        for i in 0..2 {
+            let table = format!(
+                "projects/{}/datasets/gcrbq_storage/tables/{}",
+                &project_id,
+                tables[i % tables.len()]
+            );
+            let writer = client.pending_storage_writer(&table);
+            writers.push(writer);
+        }
+
+        // Create Streams
+        let mut streams = vec![];
+        for writer in writers.iter_mut() {
+            let stream = writer.create_write_stream().await.unwrap();
+            streams.push(stream);
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("pending_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                let result = stream.finalize().await.unwrap();
+                tracing::info!("finalized row count = {:?}", result);
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+
+        for writer in writers.iter_mut() {
+            let result = writer.commit().await.unwrap();
+            tracing::info!("committed error count = {:?}", result.stream_errors.len());
+        }
+    }
+
+    #[serial_test::serial]
+    #[tokio::test]
+    async fn test_storage_write_single_stream() {
+        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
+        let project_id = project_id.unwrap();
+        let client = Client::new(config).await.unwrap();
+
+        // Create Streams
+        let mut streams = vec![];
+        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id);
+        let mut writer = client.pending_storage_writer(&table);
+        let stream = Arc::new(writer.create_write_stream().await.unwrap());
+        for _i in 0..2 {
+            streams.push(stream.clone());
+        }
+
+        // Append Rows
+        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
+        for (i, stream) in streams.into_iter().enumerate() {
+            tasks.push(tokio::spawn(async move {
+                let mut rows = vec![];
+                for j in 0..5 {
+                    let data = TestData {
+                        col_string: format!("pending_{i}_{j}"),
+                    };
+                    let mut buf = Vec::new();
+                    data.encode(&mut buf).unwrap();
+                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
+                }
+                let mut result = stream.append_rows(rows).await.unwrap();
+                while let Some(res) = result.next().await {
+                    let res = res?;
+                    tracing::info!("append row errors = {:?}", res.row_errors.len());
+                }
+                Ok(())
+            }));
+        }
+
+        // Wait for append rows
+        for task in tasks {
+            task.await.unwrap().unwrap();
+        }
+
+        let result = stream.finalize().await.unwrap();
+        tracing::info!("finalized row count = {:?}", result);
+
+        let result = writer.commit().await.unwrap();
+        tracing::info!("commit error count = {:?}", result.stream_errors.len());
+    }
+}
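Reviewer note: for anyone trying the new API end to end, below is a minimal sketch of driving the default stream writer added in this diff. The table path and the `TestData` message are illustrative assumptions modelled on the integration tests above; the client, writer, and request-builder calls are the ones introduced in `bigquery/src/client.rs` and `bigquery/src/storage_write/mod.rs`, and error handling is reduced to `unwrap` for brevity.

```rust
use futures_util::StreamExt;
use google_cloud_bigquery::client::{Client, ClientConfig};
use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
use prost::Message;
use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto};

// Illustrative row type; a real application would use a message matching its table schema.
#[derive(Clone, PartialEq, ::prost::Message)]
struct TestData {
    #[prost(string, tag = "1")]
    col_string: String,
}

#[tokio::main]
async fn main() {
    let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
    let client = Client::new(config).await.unwrap();

    // Hypothetical table path; substitute your own project, dataset, and table.
    let table = format!(
        "projects/{}/datasets/my_dataset/tables/my_table",
        project_id.unwrap()
    );

    // Encode one row with prost.
    let mut buf = Vec::new();
    TestData { col_string: "hello".to_string() }.encode(&mut buf).unwrap();

    // Hand-written descriptor for TestData, mirroring create_append_rows_request in the tests.
    let schema = DescriptorProto {
        name: Some("TestData".to_string()),
        field: vec![FieldDescriptorProto {
            name: Some("col_string".to_string()),
            number: Some(1),
            r#type: Some(field_descriptor_proto::Type::String.into()),
            ..Default::default()
        }],
        ..Default::default()
    };

    // Default stream: rows are visible as soon as the server acknowledges them.
    let writer = client.default_storage_writer();
    let stream = writer.create_write_stream(&table).await.unwrap();
    let mut responses = stream
        .append_rows(vec![AppendRowsRequestBuilder::new(schema, vec![buf])])
        .await
        .unwrap();
    while let Some(res) = responses.next().await {
        println!("append row errors = {}", res.unwrap().row_errors.len());
    }
}
```

The pending, committed, and buffered writers follow the same shape, adding `finalize`, `commit`, and `flush_rows` as shown in the doc comments in `client.rs` above.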