Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 276 additions & 34 deletions src/context/iceberg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::str;
use itertools::izip;
use std::collections::HashMap;
use std::error::Error;
use std::pin::Pin;
Expand Down Expand Up @@ -68,7 +69,7 @@ fn create_empty_metadata(
}

// Clone an arrow schema, assigning sequential field IDs starting from 1
fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
fn assign_field_ids(arrow_schema: &Arc<Schema>) -> Schema {
let mut field_id_counter = 1;
let new_fields: Vec<Field> = arrow_schema
.fields
Expand All @@ -88,6 +89,36 @@ fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone())
}

fn is_schema_aligned(
new_arrow_schema: &Arc<Schema>,
existing_iceberg_schema: &Arc<iceberg::spec::Schema>,
) -> Result<(), DataLoadingError> {
let old_iceberg_struct = existing_iceberg_schema.as_struct();
let old_iceberg_fields = old_iceberg_struct.fields();

let new_arrow_schema_with_field_ids = assign_field_ids(new_arrow_schema);
let new_iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&new_arrow_schema_with_field_ids,
)?);
let new_iceberg_struct = new_iceberg_schema.as_struct();
let new_iceberg_fields = new_iceberg_struct.fields();

if old_iceberg_fields.len() != new_iceberg_fields.len() {
return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields", old_iceberg_fields.len(), new_iceberg_fields.len())));
}
for (i, old_iceberg_field, new_iceberg_field) in
izip!(0.., old_iceberg_fields.iter(), new_iceberg_fields.iter())
{
if old_iceberg_field.required && !new_iceberg_field.required {
return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema", i, old_iceberg_field.name)));
}
if old_iceberg_field.field_type != new_iceberg_field.field_type {
return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema", i, old_iceberg_field.name, old_iceberg_field.field_type, new_iceberg_field.field_type)));
}
}
Ok(())
}

// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata
fn update_metadata_snapshot(
previous_metadata: &TableMetadata,
Expand Down Expand Up @@ -134,10 +165,6 @@ pub async fn record_batches_to_iceberg(
let table_base_url = Url::parse(table_location).unwrap();

let file_io = table.file_io();
let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone());
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);

let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
Expand All @@ -161,42 +188,49 @@ pub async fn record_batches_to_iceberg(
} else {
None
};
let (previous_metadata, previous_metadata_location) = match old_version_hint {
Some(version_hint) => {
let old_metadata_location = format!(
"{}/metadata/v{}.metadata.json",
table_base_url, version_hint
);
let old_metadata_bytes =
file_io.new_input(&old_metadata_location)?.read().await?;
let old_metadata_string =
str::from_utf8(&old_metadata_bytes).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse UTF-8 in old metadata file",
))
})?;
let old_metadata = serde_json::from_str::<TableMetadata>(old_metadata_string)
let (previous_metadata, previous_metadata_location, iceberg_schema) =
match old_version_hint {
Some(version_hint) => {
let old_metadata_location = format!(
"{}/metadata/v{}.metadata.json",
table_base_url, version_hint
);
let old_metadata_bytes =
file_io.new_input(&old_metadata_location)?.read().await?;
let old_metadata_string =
str::from_utf8(&old_metadata_bytes).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse UTF-8 in old metadata file",
))
})?;
let old_metadata = serde_json::from_str::<TableMetadata>(
old_metadata_string,
)
.map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse old metadata file",
))
})?;
if old_metadata.current_schema() != &iceberg_schema {
return Err(DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Schema changes not supported",
)));
let old_iceberg_schema = old_metadata.current_schema();
is_schema_aligned(&arrow_schema, old_iceberg_schema)?;
(
old_metadata.clone(),
Some(old_metadata_location),
old_iceberg_schema.clone(),
)
}
(old_metadata, Some(old_metadata_location))
}
None => {
let empty_metadata =
create_empty_metadata(&iceberg_schema, table_base_url.to_string())?;
(empty_metadata, None)
}
};
None => {
let arrow_schema_with_ids = assign_field_ids(&arrow_schema);
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);
let empty_metadata =
create_empty_metadata(&iceberg_schema, table_base_url.to_string())?;
(empty_metadata, None, iceberg_schema)
}
};

let file_writer_builder = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Expand Down Expand Up @@ -383,3 +417,211 @@ impl SeafowlContext {
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema};
use iceberg::spec::{NestedField, PrimitiveType, Type};

use super::is_schema_aligned;

#[test]
fn test_is_schema_aligned_positive() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()
);
}

#[test]
fn test_is_schema_aligned_positive_renamed() {
let arrow_schema = Schema::new_with_metadata(
vec![
// Fields renamed
Field::new("x", DataType::Utf8, false),
Field::new("y", DataType::Int32, false),
Field::new("z", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()
);
}

// OK to insert a non-nullable value into a nullable field
#[test]
fn test_is_schema_aligned_positive_nonnullable() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()
);
}

#[test]
fn test_is_schema_aligned_negative_added_field() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::Boolean, false), // Added field
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}

#[test]
fn test_is_schema_aligned_negative_different_type() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false), // Mismatched type
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}

#[test]
fn test_is_schema_aligned_negative_reordered() {
let arrow_schema = Schema::new_with_metadata(
vec![
// Same fields but in wrong order
Field::new("b", DataType::Int32, false),
Field::new("a", DataType::Utf8, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}

// Not allowed to insert a nullable value into a non-nullable field
#[test]
fn test_is_schema_aligned_negative_nullable() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, true), // Nullable
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}
}
Loading