feat(connector): introduce azblob file scan (#20046)
wcy-fdu authored Jan 14, 2025
1 parent 8c0cfb4 commit 48a9e4b
Showing 9 changed files with 347 additions and 12 deletions.
17 changes: 17 additions & 0 deletions proto/batch_plan.proto
@@ -90,6 +90,7 @@ message FileScanNode {
STORAGE_TYPE_UNSPECIFIED = 0;
S3 = 1;
GCS = 2;
AZBLOB = 3;
}

repeated plan_common.ColumnDesc columns = 1;
@@ -114,6 +115,21 @@ message GcsFileScanNode {
repeated string file_location = 4;
}

message AzblobFileScanNode {
enum FileFormat {
FILE_FORMAT_UNSPECIFIED = 0;
PARQUET = 1;
}

repeated plan_common.ColumnDesc columns = 1;
FileFormat file_format = 2;
string account_name = 3;
string account_key = 4;
string endpoint = 5;

repeated string file_location = 6;
}

// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
message PostgresQueryNode {
repeated plan_common.ColumnDesc columns = 1;
@@ -421,6 +437,7 @@ message PlanNode {
PostgresQueryNode postgres_query = 40;
MySqlQueryNode mysql_query = 41;
GcsFileScanNode gcs_file_scan = 42;
AzblobFileScanNode azblob_file_scan = 43;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
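
The new AzblobFileScanNode mirrors GcsFileScanNode: it carries the output column descriptors, the file format, the Azure Blob credentials (account name, account key, endpoint), and the list of file locations, and it is wired into PlanNode as variant 43. Below is a minimal, hypothetical sketch of populating the prost-generated Rust type for this message; the module paths follow the usual risingwave_pb conventions, and the endpoint, account, and file values are placeholders rather than anything from this commit.

use risingwave_pb::batch_plan::azblob_file_scan_node::FileFormat;
use risingwave_pb::batch_plan::AzblobFileScanNode;
use risingwave_pb::plan_common::ColumnDesc;

// Hypothetical construction of the new plan-node message with placeholder values.
fn build_azblob_scan_node(columns: Vec<ColumnDesc>) -> AzblobFileScanNode {
    AzblobFileScanNode {
        columns,
        file_format: FileFormat::Parquet as i32,
        account_name: "myaccount".to_owned(),
        account_key: "base64-account-key".to_owned(),
        endpoint: "https://myaccount.blob.core.windows.net".to_owned(),
        file_location: vec!["azblob://my-container/data/part-0.parquet".to_owned()],
    }
}
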
3 changes: 3 additions & 0 deletions src/batch/executors/src/executor.rs
@@ -15,6 +15,7 @@
pub use risingwave_batch::executor::*;

pub mod aggregation;
mod azblob_file_scan;
mod delete;
mod expand;
mod filter;
@@ -50,6 +51,7 @@ mod update;
mod utils;
mod values;

use azblob_file_scan::AzblobFileScanExecutorBuilder;
pub use delete::*;
pub use expand::*;
pub use filter::*;
@@ -115,6 +117,7 @@ register_executor!(SortOverWindow, SortOverWindowExecutor);
register_executor!(MaxOneRow, MaxOneRowExecutor);
register_executor!(FileScan, FileScanExecutorBuilder);
register_executor!(GcsFileScan, GcsFileScanExecutorBuilder);
register_executor!(AzblobFileScan, AzblobFileScanExecutorBuilder);
register_executor!(IcebergScan, IcebergScanExecutorBuilder);
register_executor!(PostgresQuery, PostgresQueryExecutorBuilder);
register_executor!(MysqlQuery, MySqlQueryExecutorBuilder);
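
Registering AzblobFileScan alongside the existing FileScan and GcsFileScan builders is what lets the batch task turn an AzblobFileScanNode plan node into a running executor. The following is only a conceptual sketch of that dispatch, assuming the names already imported in this module; the real register_executor! macro in risingwave_batch generates different code.

use risingwave_pb::batch_plan::plan_node::NodeBody;

// Conceptual sketch: map the plan node's NodeBody variant to its builder.
async fn build_azblob_scan(
    source: &ExecutorBuilder<'_>,
) -> crate::error::Result<BoxedExecutor> {
    match source.plan_node().get_node_body().unwrap() {
        // Added in this commit: Azure Blob Storage file scan.
        NodeBody::AzblobFileScan(_) => {
            AzblobFileScanExecutorBuilder::new_boxed_executor(source, vec![]).await
        }
        _ => unimplemented!("other plan nodes are handled by their own builders"),
    }
}
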
132 changes: 132 additions & 0 deletions src/batch/executors/src/executor/azblob_file_scan.rs
@@ -0,0 +1,132 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, new_azblob_operator, read_parquet_file, FileScanBackend,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

#[derive(PartialEq, Debug)]
pub enum FileFormat {
Parquet,
}

/// Azblob file scan executor. Currently only supports the Parquet file format.
pub struct AzblobFileScanExecutor {
file_format: FileFormat,
file_location: Vec<String>,
account_name: String,
account_key: String,
endpoint: String,
batch_size: usize,
schema: Schema,
identity: String,
}

impl Executor for AzblobFileScanExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
self.do_execute().boxed()
}
}

impl AzblobFileScanExecutor {
pub fn new(
file_format: FileFormat,
file_location: Vec<String>,
account_name: String,
account_key: String,
endpoint: String,
batch_size: usize,
schema: Schema,
identity: String,
) -> Self {
Self {
file_format,
file_location,
account_name,
account_key,
endpoint,
batch_size,
schema,
identity,
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
assert_eq!(self.file_format, FileFormat::Parquet);
for file in self.file_location {
let (bucket, file_name) =
extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
let op = new_azblob_operator(
self.endpoint.clone(),
self.account_name.clone(),
self.account_key.clone(),
bucket.clone(),
)?;
let chunk_stream =
read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
#[for_await]
for stream_chunk in chunk_stream {
let stream_chunk = stream_chunk?;
let (data_chunk, _) = stream_chunk.into_parts();
yield data_chunk;
}
}
}
}

pub struct AzblobFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
_inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
let file_scan_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::AzblobFileScan
)?;

Ok(Box::new(AzblobFileScanExecutor::new(
match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
file_scan_node::FileFormat::Unspecified => unreachable!(),
},
file_scan_node.file_location.clone(),
file_scan_node.account_name.clone(),
file_scan_node.account_key.clone(),
file_scan_node.endpoint.clone(),
source.context().get_config().developer.chunk_size,
Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
source.plan_node().get_identity().clone(),
)))
}
}
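
For reference, a hypothetical usage sketch of this executor follows (it is not a test from this commit): it builds the executor directly and drains its chunk stream, assuming the imports already present in this file plus futures_util::StreamExt. The endpoint, credentials, file location, batch size, and schema are illustrative placeholders.

use futures_util::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;

// Hypothetical driver: scan one parquet file from Azure Blob and count rows per chunk.
async fn scan_one_file() -> Result<(), BatchError> {
    let schema = Schema::new(vec![
        Field::with_name(DataType::Int64, "id"),
        Field::with_name(DataType::Varchar, "payload"),
    ]);
    let executor = Box::new(AzblobFileScanExecutor::new(
        FileFormat::Parquet,
        vec!["azblob://my-container/data/part-0.parquet".to_owned()], // placeholder location
        "myaccount".to_owned(),                                       // placeholder account name
        "base64-account-key".to_owned(),                              // placeholder account key
        "https://myaccount.blob.core.windows.net".to_owned(),         // placeholder endpoint
        1024, // batch size; the builder above takes it from the developer chunk_size config
        schema,
        "AzblobFileScanExecutor".to_owned(),
    ));
    let mut stream = executor.execute();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        println!("read a chunk with {} rows", chunk.cardinality());
    }
    Ok(())
}
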
26 changes: 25 additions & 1 deletion src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -28,7 +28,7 @@ use iceberg::io::{
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Gcs, S3};
use opendal::services::{Azblob, Gcs, S3};
use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -138,10 +138,32 @@ pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
Ok(operator)
}

pub fn new_azblob_operator(
endpoint: String,
account_name: String,
account_key: String,
container_name: String,
) -> ConnectorResult<Operator> {
// Create azblob builder.
let mut builder = Azblob::default();
builder = builder
.container(&container_name)
.endpoint(&endpoint)
.account_name(&account_name)
.account_key(&account_key);

let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
Ok(operator)
}

#[derive(Debug, Clone)]
pub enum FileScanBackend {
S3,
Gcs,
Azblob,
}

pub fn extract_bucket_and_file_name(
@@ -161,6 +183,7 @@ pub fn extract_bucket_and_file_name(
let prefix = match file_scan_backend {
FileScanBackend::S3 => format!("s3://{}/", bucket),
FileScanBackend::Gcs => format!("gcs://{}/", bucket),
FileScanBackend::Azblob => format!("azblob://{}/", bucket),
};
let file_name = location[prefix.len()..].to_string();
Ok((bucket, file_name))
@@ -175,6 +198,7 @@ pub async fn list_data_directory(
let prefix = match file_scan_backend {
FileScanBackend::S3 => format!("s3://{}/", bucket),
FileScanBackend::Gcs => format!("gcs://{}/", bucket),
FileScanBackend::Azblob => format!("azblob://{}/", bucket),
};
if dir.starts_with(&prefix) {
op.list(&file_name)
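
A short, hypothetical illustration of the azblob:// location convention handled above: extract_bucket_and_file_name splits an azblob://container/key URI into the container and object key, and new_azblob_operator builds the OpenDAL operator for that container. The endpoint, account, and paths below are placeholders, and the read at the end is just one way to exercise the operator.

use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::iceberg::{
    extract_bucket_and_file_name, new_azblob_operator, FileScanBackend,
};

// Hypothetical round trip: parse an azblob:// location, then read it via OpenDAL.
async fn probe_azblob_object() -> ConnectorResult<()> {
    let location = "azblob://my-container/events/part-0.parquet";
    let (container, file_name) =
        extract_bucket_and_file_name(location, &FileScanBackend::Azblob)?;
    assert_eq!(container, "my-container");
    assert_eq!(file_name, "events/part-0.parquet");

    let op = new_azblob_operator(
        "https://myaccount.blob.core.windows.net".to_owned(), // placeholder endpoint
        "myaccount".to_owned(),                               // placeholder account name
        "base64-account-key".to_owned(),                      // placeholder account key
        container,
    )?;
    let _bytes = op.read(&file_name).await?; // fetch the whole object
    Ok(())
}
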
48 changes: 39 additions & 9 deletions src/frontend/src/expr/table_function.rs
@@ -21,8 +21,8 @@ use mysql_async::prelude::*;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_gcs_operator,
new_s3_operator, FileScanBackend,
extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_azblob_operator,
new_gcs_operator, new_s3_operator, FileScanBackend,
};
pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
use risingwave_pb::expr::PbTableFunction;
@@ -80,12 +80,10 @@ impl TableFunction {
let return_type = {
// arguments:
// file format e.g. parquet
// storage type e.g. s3, gcs
// For s3: file_scan(file_format, s3, s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
// For gcs: file_scan(file_format, gcs, credential, file_location_or_directory)
if args.len() != 6 && args.len() != 4 {
return Err(BindError("file_scan function only accepts: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location) or file_scan('parquet', 'gcs', credential, service_account, file location)".to_owned()).into());
}
// storage type e.g. s3, gcs, azblob
// For s3: file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
// For gcs: file_scan('parquet', 'gcs', credential, file_location_or_directory)
// For azblob: file_scan('parquet', 'azblob', endpoint, account_name, account_key, file_location)
let mut eval_args: Vec<String> = vec![];
for arg in &args {
if arg.return_type() != DataType::Varchar {
@@ -125,6 +123,22 @@ impl TableFunction {
}
}
}

if (eval_args.len() != 4 && eval_args.len() != 6)
|| (eval_args.len() == 4 && !"gcs".eq_ignore_ascii_case(&eval_args[1]))
|| (eval_args.len() == 6
&& !"s3".eq_ignore_ascii_case(&eval_args[1])
&& !"azblob".eq_ignore_ascii_case(&eval_args[1]))
{
return Err(BindError(
"file_scan function supports three backends: s3, gcs, and azblob. Their formats are as follows: \n
file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) \n
file_scan('parquet', 'gcs', credential, service_account, file_location) \n
file_scan('parquet', 'azblob', endpoint, account_name, account_key, file_location)"
.to_owned(),
)
.into());
}
if !"parquet".eq_ignore_ascii_case(&eval_args[0]) {
return Err(BindError(
"file_scan function only accepts 'parquet' as file format".to_owned(),
@@ -134,9 +148,11 @@

if !"s3".eq_ignore_ascii_case(&eval_args[1])
&& !"gcs".eq_ignore_ascii_case(&eval_args[1])
&& !"azblob".eq_ignore_ascii_case(&eval_args[1])
{
return Err(BindError(
"file_scan function only accepts 's3' or 'gcs' as storage type".to_owned(),
"file_scan function only accepts 's3', 'gcs' or 'azblob' as storage type"
.to_owned(),
)
.into());
}
@@ -154,6 +170,8 @@
(FileScanBackend::S3, eval_args[5].clone())
} else if "gcs".eq_ignore_ascii_case(&eval_args[1]) {
(FileScanBackend::Gcs, eval_args[3].clone())
} else if "azblob".eq_ignore_ascii_case(&eval_args[1]) {
(FileScanBackend::Azblob, eval_args[5].clone())
} else {
unreachable!();
};
@@ -185,6 +203,17 @@

new_gcs_operator(eval_args[2].clone(), bucket.clone())?
}
FileScanBackend::Azblob => {
let (bucket, _) =
extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;

new_azblob_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
)?
}
};
let files = if input_file_location.ends_with('/') {
let files = tokio::task::block_in_place(|| {
@@ -240,6 +269,7 @@ impl TableFunction {
match file_scan_backend {
FileScanBackend::S3 => args.remove(5),
FileScanBackend::Gcs => args.remove(3),
FileScanBackend::Azblob => args.remove(5),
};
for file in files {
args.push(ExprImpl::Literal(Box::new(Literal::new(
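
In summary, the binder now accepts three call shapes for file_scan, with the file location always as the last argument. The snippet below is a standalone, hypothetical restatement of that arity/backend check (the names are illustrative, not code from this commit); it returns the chosen backend together with the index of the file-location argument.

// Hypothetical restatement of the binder's backend/arity validation.
#[derive(Debug, PartialEq)]
enum Backend {
    S3,
    Gcs,
    Azblob,
}

fn classify(args: &[&str]) -> Result<(Backend, usize), String> {
    // args[0] is the file format, args[1] the storage type; the rest are
    // backend-specific credentials followed by the file location.
    let storage = args.get(1).map(|s| s.to_ascii_lowercase());
    match (storage.as_deref(), args.len()) {
        // file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location)
        (Some("s3"), 6) => Ok((Backend::S3, 5)),
        // file_scan('parquet', 'gcs', credential, file_location)
        (Some("gcs"), 4) => Ok((Backend::Gcs, 3)),
        // file_scan('parquet', 'azblob', endpoint, account_name, account_key, file_location)
        (Some("azblob"), 6) => Ok((Backend::Azblob, 5)),
        _ => Err("unsupported storage type or argument count".to_owned()),
    }
}
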
19 changes: 18 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -15,7 +15,7 @@
use pretty_xmlish::XmlNode;
use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{FileScanNode, GcsFileScanNode};
use risingwave_pb::batch_plan::{AzblobFileScanNode, FileScanNode, GcsFileScanNode};

use super::batch::prelude::*;
use super::utils::{childless_record, column_names_pretty, Distill};
@@ -109,6 +109,23 @@ impl ToBatchPb for BatchFileScan {
file_location: gcs_file_scan.file_location.clone(),
})
}

generic::FileScanBackend::AzblobFileScan(azblob_file_scan) => {
NodeBody::AzblobFileScan(AzblobFileScanNode {
columns: azblob_file_scan
.columns()
.into_iter()
.map(|col| col.to_protobuf())
.collect(),
file_format: match azblob_file_scan.file_format {
generic::FileFormat::Parquet => FileFormat::Parquet as i32,
},
account_name: azblob_file_scan.account_name.clone(),
account_key: azblob_file_scan.account_key.clone(),
endpoint: azblob_file_scan.endpoint.clone(),
file_location: azblob_file_scan.file_location.clone(),
})
}
}
}
}
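
The logical-plan side of this conversion is a generic::FileScanBackend::AzblobFileScan variant; the sketch below shows the shape its fields would need for the accessors used above to work. It is inferred from this diff alone and may not match the actual definition in the generic plan-node module.

use risingwave_common::catalog::ColumnDesc;

// Assumed shape of the logical azblob file-scan backend; inferred, not copied
// from the commit.
pub struct AzblobFileScan {
    pub file_format: FileFormat, // currently only Parquet
    pub account_name: String,
    pub account_key: String,
    pub endpoint: String,
    pub file_location: Vec<String>,
    // Output columns, exposed through a columns() accessor in the real code.
    pub columns: Vec<ColumnDesc>,
}
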