Skip to content

Commit

Permalink
refactor(iceberg): remove unnecessary JSON serde (#20212)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Jan 20, 2025
1 parent d00c6e3 commit a19acf5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 114 deletions.
69 changes: 12 additions & 57 deletions src/batch/executors/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::sync::Arc;

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use iceberg::spec::TableMetadataRef;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
Expand All @@ -26,9 +25,7 @@ use risingwave_common::catalog::{
};
use risingwave_common::types::DataType;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::source::iceberg::{
IcebergFileScanTaskJsonStrEnum, IcebergProperties, IcebergSplit,
};
use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand All @@ -38,54 +35,12 @@ use crate::error::BatchError;
use crate::executor::Executor;
use crate::monitor::BatchMetrics;

/// File scan tasks grouped by the kind of file they target.
///
/// Every variant owns its own list of `FileScanTask`s; a value holds tasks of exactly one
/// kind at a time.
pub enum IcebergFileScanTaskEnum {
// The scan task of the data file and the position delete file
Data(Vec<FileScanTask>),
// The scan task of the equality delete file
EqualityDelete(Vec<FileScanTask>),
// The scan task of the position delete file
PositionDelete(Vec<FileScanTask>),
}

impl IcebergFileScanTaskEnum {
/// Converts the JSON-string form of the scan tasks into the in-memory form.
///
/// The variant is preserved (`Data` -> `Data`, `EqualityDelete` -> `EqualityDelete`,
/// `PositionDelete` -> `PositionDelete`); every element of the input list is passed through
/// its `deserialize()` method and the results are collected in the same order.
///
/// NOTE(review): the failure behavior of `deserialize()` is defined elsewhere and is not
/// visible here -- confirm whether it can panic on malformed input.
fn from_iceberg_file_scan_task_json_str_enum(
iceberg_file_scan_task_json_str_enum: IcebergFileScanTaskJsonStrEnum,
) -> Self {
match iceberg_file_scan_task_json_str_enum {
IcebergFileScanTaskJsonStrEnum::Data(data_file_scan_tasks) => {
IcebergFileScanTaskEnum::Data(
data_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
)
}
IcebergFileScanTaskJsonStrEnum::EqualityDelete(equality_delete_file_scan_tasks) => {
IcebergFileScanTaskEnum::EqualityDelete(
equality_delete_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
)
}
IcebergFileScanTaskJsonStrEnum::PositionDelete(position_delete_file_scan_tasks) => {
IcebergFileScanTaskEnum::PositionDelete(
position_delete_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
)
}
}
}
}

pub struct IcebergScanExecutor {
iceberg_config: IcebergProperties,
#[allow(dead_code)]
snapshot_id: Option<i64>,
table_meta: TableMetadata,
file_scan_tasks: Option<IcebergFileScanTaskEnum>,
table_meta: TableMetadataRef,
file_scan_tasks: Option<IcebergFileScanTask>,
batch_size: usize,
schema: Schema,
identity: String,
Expand All @@ -112,8 +67,8 @@ impl IcebergScanExecutor {
pub fn new(
iceberg_config: IcebergProperties,
snapshot_id: Option<i64>,
table_meta: TableMetadata,
file_scan_tasks: IcebergFileScanTaskEnum,
table_meta: TableMetadataRef,
file_scan_tasks: IcebergFileScanTask,
batch_size: usize,
schema: Schema,
identity: String,
Expand Down Expand Up @@ -145,11 +100,11 @@ impl IcebergScanExecutor {
let table_name = table.identifier().name().to_owned();

let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
Some(IcebergFileScanTaskEnum::Data(data_file_scan_tasks)) => data_file_scan_tasks,
Some(IcebergFileScanTaskEnum::EqualityDelete(equality_delete_file_scan_tasks)) => {
Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
equality_delete_file_scan_tasks
}
Some(IcebergFileScanTaskEnum::PositionDelete(position_delete_file_scan_tasks)) => {
Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
position_delete_file_scan_tasks
}
None => {
Expand Down Expand Up @@ -271,13 +226,13 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
.fields()
.iter()
.any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
&& matches!(split.files, IcebergFileScanTaskJsonStrEnum::Data(_));
&& matches!(split.task, IcebergFileScanTask::Data(_));

Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties,
Some(split.snapshot_id),
split.table_meta.deserialize(),
IcebergFileScanTaskEnum::from_iceberg_file_scan_task_json_str_enum(split.files),
split.table_meta.clone(),
split.task,
source.context().get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use std::collections::HashMap;
use std::sync::Arc;

use ::iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use ::iceberg::spec::TableMetadata;
use ::iceberg::table::Table;
use ::iceberg::{Catalog, TableIdent};
use anyhow::{anyhow, Context};
use iceberg::io::{GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD};
use iceberg::spec::TableMetadataRef;
use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
use risingwave_common::bail;
use serde_derive::Deserialize;
Expand Down Expand Up @@ -479,7 +479,7 @@ impl IcebergCommon {

pub async fn load_table_with_metadata(
&self,
metadata: TableMetadata,
metadata: TableMetadataRef,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<Table> {
match self.catalog_type() {
Expand Down
91 changes: 36 additions & 55 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_trait::async_trait;
use futures_async_stream::for_await;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use iceberg::spec::TableMetadataRef;
use iceberg::table::Table;
use iceberg::Catalog;
use itertools::Itertools;
Expand Down Expand Up @@ -88,7 +88,7 @@ impl IcebergProperties {

pub async fn load_table_with_metadata(
&self,
table_meta: TableMetadata,
table_meta: TableMetadataRef,
) -> ConnectorResult<Table> {
let mut java_catalog_props = HashMap::new();
if let Some(jdbc_user) = self.jdbc_user.clone() {
Expand Down Expand Up @@ -131,70 +131,57 @@ impl IcebergFileScanTaskJsonStr {
}
}

/// Newtype around a `String` holding a `TableMetadata` value encoded as JSON.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TableMetadataJsonStr(String);

impl TableMetadataJsonStr {
/// Parses the wrapped JSON string back into a `TableMetadata`.
///
/// # Panics
///
/// Panics if `serde_json::from_str` fails on the wrapped string (the result is unwrapped).
pub fn deserialize(&self) -> TableMetadata {
serde_json::from_str(&self.0).unwrap()
}

/// Encodes `metadata` as a JSON string and wraps it.
///
/// # Panics
///
/// Panics if `serde_json::to_string` fails (the result is unwrapped).
pub fn serialize(metadata: &TableMetadata) -> Self {
Self(serde_json::to_string(metadata).unwrap())
}
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergFileScanTaskJsonStrEnum {
Data(Vec<IcebergFileScanTaskJsonStr>),
EqualityDelete(Vec<IcebergFileScanTaskJsonStr>),
PositionDelete(Vec<IcebergFileScanTaskJsonStr>),
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
Data(Vec<FileScanTask>),
EqualityDelete(Vec<FileScanTask>),
PositionDelete(Vec<FileScanTask>),
}
impl IcebergFileScanTaskJsonStrEnum {
impl IcebergFileScanTask {
pub fn new_with_scan_type(
iceberg_scan_type: IcebergScanType,
data_files: Vec<IcebergFileScanTaskJsonStr>,
equality_delete_files: Vec<IcebergFileScanTaskJsonStr>,
position_delete_files: Vec<IcebergFileScanTaskJsonStr>,
data_files: Vec<FileScanTask>,
equality_delete_files: Vec<FileScanTask>,
position_delete_files: Vec<FileScanTask>,
) -> Self {
match iceberg_scan_type {
IcebergScanType::EqualityDeleteScan => {
IcebergFileScanTaskJsonStrEnum::EqualityDelete(equality_delete_files)
IcebergFileScanTask::EqualityDelete(equality_delete_files)
}
IcebergScanType::DataScan => IcebergFileScanTaskJsonStrEnum::Data(data_files),
IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
IcebergScanType::PositionDeleteScan => {
IcebergFileScanTaskJsonStrEnum::PositionDelete(position_delete_files)
IcebergFileScanTask::PositionDelete(position_delete_files)
}
IcebergScanType::Unspecified => unreachable!("Unspecified iceberg scan type"),
}
}

pub fn add_files(
&mut self,
data_file: IcebergFileScanTaskJsonStr,
equality_delete_file: IcebergFileScanTaskJsonStr,
position_delete_file: IcebergFileScanTaskJsonStr,
data_file: FileScanTask,
equality_delete_file: FileScanTask,
position_delete_file: FileScanTask,
) {
match self {
IcebergFileScanTaskJsonStrEnum::Data(data_files) => {
IcebergFileScanTask::Data(data_files) => {
data_files.push(data_file);
}
IcebergFileScanTaskJsonStrEnum::EqualityDelete(equality_delete_files) => {
IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
equality_delete_files.push(equality_delete_file);
}
IcebergFileScanTaskJsonStrEnum::PositionDelete(position_delete_files) => {
IcebergFileScanTask::PositionDelete(position_delete_files) => {
position_delete_files.push(position_delete_file);
}
}
}

pub fn is_empty(&self) -> bool {
match self {
IcebergFileScanTaskJsonStrEnum::Data(data_files) => data_files.is_empty(),
IcebergFileScanTaskJsonStrEnum::EqualityDelete(equality_delete_files) => {
IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
equality_delete_files.is_empty()
}
IcebergFileScanTaskJsonStrEnum::PositionDelete(position_delete_files) => {
IcebergFileScanTask::PositionDelete(position_delete_files) => {
position_delete_files.is_empty()
}
}
Expand All @@ -205,17 +192,17 @@ impl IcebergFileScanTaskJsonStrEnum {
pub struct IcebergSplit {
pub split_id: i64,
pub snapshot_id: i64,
pub table_meta: TableMetadataJsonStr,
pub files: IcebergFileScanTaskJsonStrEnum,
pub table_meta: TableMetadataRef,
pub task: IcebergFileScanTask,
}

impl IcebergSplit {
pub fn empty(table_meta: TableMetadataJsonStr, iceberg_scan_type: IcebergScanType) -> Self {
pub fn empty(table_meta: TableMetadataRef, iceberg_scan_type: IcebergScanType) -> Self {
Self {
split_id: 0,
snapshot_id: 0,
table_meta,
files: IcebergFileScanTaskJsonStrEnum::new_with_scan_type(
task: IcebergFileScanTask::new_with_scan_type(
iceberg_scan_type,
vec![],
vec![],
Expand Down Expand Up @@ -333,13 +320,10 @@ impl IcebergSplitEnumerator {
}
let table = self.config.load_table().await?;
let snapshot_id = Self::get_snapshot_id(&table, time_traval_info)?;
let table_meta = TableMetadataJsonStr::serialize(table.metadata());
let table_meta = table.metadata_ref();
if snapshot_id.is_none() {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit::empty(
TableMetadataJsonStr::serialize(table.metadata()),
iceberg_scan_type,
)]);
return Ok(vec![IcebergSplit::empty(table_meta, iceberg_scan_type)]);
}
let snapshot_id = snapshot_id.unwrap();

Expand Down Expand Up @@ -375,14 +359,14 @@ impl IcebergSplitEnumerator {
let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
match task.data_file_content {
iceberg::spec::DataContentType::Data => {
data_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
data_files.push(task);
}
iceberg::spec::DataContentType::EqualityDeletes => {
equality_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
equality_delete_files.push(task);
}
iceberg::spec::DataContentType::PositionDeletes => {
task.project_field_ids = Vec::default();
position_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
position_delete_files.push(task);
}
}
}
Expand All @@ -401,20 +385,20 @@ impl IcebergSplitEnumerator {
split_id: index as i64,
snapshot_id,
table_meta: table_meta.clone(),
files: IcebergFileScanTaskJsonStrEnum::new_with_scan_type(
task: IcebergFileScanTask::new_with_scan_type(
iceberg_scan_type,
data_file,
equality_delete_file,
position_delete_file,
),
},
)
.filter(|split| !split.files.is_empty())
.filter(|split| !split.task.is_empty())
.collect_vec();

if splits.is_empty() {
return Ok(vec![IcebergSplit::empty(
TableMetadataJsonStr::serialize(table.metadata()),
table.metadata_ref(),
iceberg_scan_type,
)]);
}
Expand Down Expand Up @@ -472,10 +456,7 @@ impl IcebergSplitEnumerator {
Self::all_delete_parameters(&table, snapshot_id).await
}

fn split_n_vecs(
vecs: Vec<IcebergFileScanTaskJsonStr>,
split_num: usize,
) -> Vec<Vec<IcebergFileScanTaskJsonStr>> {
fn split_n_vecs(vecs: Vec<FileScanTask>, split_num: usize) -> Vec<Vec<FileScanTask>> {
let split_size = vecs.len() / split_num;
let remaining = vecs.len() % split_num;
let mut result_vecs = (0..split_num)
Expand Down

0 comments on commit a19acf5

Please sign in to comment.