Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ env_logger = "0.11.1"
fastrand = "2.2.0"
futures = "0.3"
iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "e7008f39975ee2f09bc81a74d4ec5c9a3089580d" }
itertools = "0.13.0"
log = "0.4"
native-tls = "0.2.11"
object_store = { version = "0.11", features = ["aws"] }
Expand Down
235 changes: 221 additions & 14 deletions src/iceberg_destination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::str;
use itertools::izip;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
Expand Down Expand Up @@ -63,7 +64,7 @@ fn create_empty_metadata(
}

// Clone an arrow schema, assigning sequential field IDs starting from 1
fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
fn assign_field_ids(arrow_schema: &Arc<Schema>) -> Schema {
let mut field_id_counter = 1;
let new_fields: Vec<Field> = arrow_schema
.fields
Expand All @@ -83,6 +84,36 @@ fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone())
}

/// Checks that a new arrow schema is positionally compatible with the
/// existing iceberg table schema, so new data can be appended safely.
///
/// Compatibility is checked field-by-field *by position*, not by name
/// (renames are allowed): the field counts must match, a field that is
/// required in the old schema must stay required in the new one (writing
/// non-nullable data into a nullable field is fine, the reverse is not),
/// and the converted iceberg types must be identical.
///
/// Returns `Ok(())` when aligned, or `DataLoadingError::BadInputError`
/// describing the first mismatch otherwise. Conversion failures from
/// `arrow_schema_to_schema` propagate via `?`.
fn is_schema_aligned(
    new_arrow_schema: &Arc<Schema>,
    existing_iceberg_schema: &Arc<iceberg::spec::Schema>,
) -> Result<(), DataLoadingError> {
    let old_iceberg_struct = existing_iceberg_schema.as_struct();
    let old_iceberg_fields = old_iceberg_struct.fields();

    // Convert the incoming arrow schema to iceberg form so both sides are
    // compared in the same type system. Field IDs are freshly assigned;
    // they are not part of the comparison below.
    let new_arrow_schema_with_field_ids = assign_field_ids(new_arrow_schema);
    let new_iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
        &new_arrow_schema_with_field_ids,
    )?);
    let new_iceberg_struct = new_iceberg_schema.as_struct();
    let new_iceberg_fields = new_iceberg_struct.fields();

    if old_iceberg_fields.len() != new_iceberg_fields.len() {
        return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields", old_iceberg_fields.len(), new_iceberg_fields.len())));
    }
    // std zip + enumerate covers this pairing; no need for itertools::izip!.
    for (i, (old_iceberg_field, new_iceberg_field)) in old_iceberg_fields
        .iter()
        .zip(new_iceberg_fields.iter())
        .enumerate()
    {
        if old_iceberg_field.required && !new_iceberg_field.required {
            return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema", i, old_iceberg_field.name)));
        }
        if old_iceberg_field.field_type != new_iceberg_field.field_type {
            return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema", i, old_iceberg_field.name, old_iceberg_field.field_type, new_iceberg_field.field_type)));
        }
    }
    Ok(())
}

// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata
fn update_metadata_snapshot(
previous_metadata: &TableMetadata,
Expand Down Expand Up @@ -139,10 +170,6 @@ pub async fn record_batches_to_iceberg(
pin_mut!(record_batch_stream);

let file_io = create_file_io(target_url.to_string())?;
let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone());
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);

let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
Expand Down Expand Up @@ -170,7 +197,7 @@ pub async fn record_batches_to_iceberg(
} else {
None
};
let (previous_metadata, previous_metadata_location) = match old_version_hint {
let (previous_metadata, previous_metadata_location, iceberg_schema) = match old_version_hint {
Some(version_hint) => {
let old_metadata_location =
format!("{}/metadata/v{}.metadata.json", target_url, version_hint);
Expand All @@ -188,17 +215,21 @@ pub async fn record_batches_to_iceberg(
"Could not parse old metadata file",
))
})?;
if old_metadata.current_schema() != &iceberg_schema {
return Err(DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Schema changes not supported",
)));
}
(old_metadata, Some(old_metadata_location))
let old_iceberg_schema = old_metadata.current_schema();
is_schema_aligned(&arrow_schema, old_iceberg_schema)?;
(
old_metadata.clone(),
Some(old_metadata_location),
old_iceberg_schema.clone(),
)
}
None => {
let arrow_schema_with_ids = assign_field_ids(&arrow_schema);
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);
let empty_metadata = create_empty_metadata(&iceberg_schema, target_url.to_string())?;
(empty_metadata, None)
(empty_metadata, None, iceberg_schema)
}
};

Expand Down Expand Up @@ -344,3 +375,179 @@ pub async fn record_batches_to_iceberg(

Ok(())
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_schema::{DataType, Field, Schema};
    use iceberg::spec::{NestedField, PrimitiveType, Type};

    use crate::iceberg_destination::is_schema_aligned;

    // Wraps a list of arrow fields into the Arc'd schema shape that
    // is_schema_aligned expects (metadata is irrelevant to alignment).
    fn arrow_schema(fields: Vec<Field>) -> Arc<Schema> {
        Arc::new(Schema::new_with_metadata(fields, HashMap::new()))
    }

    // Builds an Arc'd iceberg schema from the given fields.
    fn iceberg_schema(fields: Vec<Arc<NestedField>>) -> Arc<iceberg::spec::Schema> {
        Arc::new(
            iceberg::spec::Schema::builder()
                .with_fields(fields)
                .build()
                .unwrap(),
        )
    }

    // Reference iceberg schema shared by most tests: three required fields
    // `a: string`, `b: int`, `c: boolean`.
    fn reference_iceberg_schema() -> Arc<iceberg::spec::Schema> {
        iceberg_schema(vec![
            NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
    }

    #[test]
    fn test_is_schema_aligned_positive() {
        // Matching names and types; mixed nullability on the iceberg side is
        // fine because required arrow fields may fill optional iceberg fields.
        let arrow = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        let iceberg = iceberg_schema(vec![
            NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);
        assert!(is_schema_aligned(&arrow, &iceberg).is_ok());
    }

    #[test]
    fn test_is_schema_aligned_positive_renamed() {
        // Alignment is positional, so renamed fields remain compatible.
        let arrow = arrow_schema(vec![
            Field::new("x", DataType::Utf8, false),
            Field::new("y", DataType::Int32, false),
            Field::new("z", DataType::Boolean, false),
        ]);
        assert!(is_schema_aligned(&arrow, &reference_iceberg_schema()).is_ok());
    }

    // OK to insert a non-nullable value into a nullable field
    #[test]
    fn test_is_schema_aligned_positive_nonnullable() {
        let arrow = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        let iceberg = iceberg_schema(vec![
            NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(),
        ]);
        assert!(is_schema_aligned(&arrow, &iceberg).is_ok());
    }

    #[test]
    fn test_is_schema_aligned_negative_added_field() {
        let arrow = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Boolean, false), // Added field
        ]);
        assert!(is_schema_aligned(&arrow, &reference_iceberg_schema()).is_err());
    }

    #[test]
    fn test_is_schema_aligned_negative_different_type() {
        let arrow = arrow_schema(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false), // Mismatched type
        ]);
        assert!(is_schema_aligned(&arrow, &reference_iceberg_schema()).is_err());
    }

    #[test]
    fn test_is_schema_aligned_negative_reordered() {
        // Same fields but in the wrong order: positional comparison must fail.
        let arrow = arrow_schema(vec![
            Field::new("b", DataType::Int32, false),
            Field::new("a", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        assert!(is_schema_aligned(&arrow, &reference_iceberg_schema()).is_err());
    }

    // Not allowed to insert a nullable value into a non-nullable field
    #[test]
    fn test_is_schema_aligned_negative_nullable() {
        let arrow = arrow_schema(vec![
            Field::new("a", DataType::Utf8, true), // Nullable
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Boolean, false),
        ]);
        assert!(is_schema_aligned(&arrow, &reference_iceberg_schema()).is_err());
    }
}
12 changes: 5 additions & 7 deletions tests/basic_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,35 +140,33 @@ async fn test_pg_to_iceberg() {
Ok(_) => panic!("Expected command to fail but it succeeded"),
};

// WHEN we try to write to an existing table with a different schema
// WHEN we try to write to an existing table with an incompatible schema
// THEN the command errors out
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
"select cint4, cint8 cint8_newname, ctext, cbool from t1 order by id",
"select cint4, cint8::text cint8_casted, ctext, cbool from t1 order by id",
target_url,
"--overwrite",
];
match do_main(Cli::parse_from(args.clone())).await {
Err(DataLoadingError::IcebergError(e)) => {
assert!(e.kind() == iceberg::ErrorKind::FeatureUnsupported);
}
Err(DataLoadingError::BadInputError(_)) => {}
Err(e) => {
panic!("Unexpected error type: {:?}", e);
}
Ok(_) => panic!("Expected command to fail but it succeeded"),
};

// WHEN we try to write to an existing table with the same schema
// WHEN we try to write to an existing table with a compatible schema
// THEN the command succeeds
let args = vec![
"lakehouse-loader",
"pg-to-iceberg",
"postgres://test-user:test-password@localhost:5432/test-db",
"-q",
"select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id",
"select cint4, cint8 + 1 cint8_renamed, ctext, cbool from t1 order by id",
target_url,
"--overwrite",
];
Expand Down
Loading