diff --git a/Cargo.lock b/Cargo.lock index c9a3f1d..1ad038e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1963,6 +1963,7 @@ dependencies = [ "fastrand", "futures", "iceberg", + "itertools", "log", "native-tls", "object_store", diff --git a/Cargo.toml b/Cargo.toml index a6eea86..52a407d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ env_logger = "0.11.1" fastrand = "2.2.0" futures = "0.3" iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "e7008f39975ee2f09bc81a74d4ec5c9a3089580d" } +itertools = "0.13.0" log = "0.4" native-tls = "0.2.11" object_store = { version = "0.11", features = ["aws"] } diff --git a/src/iceberg_destination.rs b/src/iceberg_destination.rs index 84c0998..ca6233c 100644 --- a/src/iceberg_destination.rs +++ b/src/iceberg_destination.rs @@ -1,4 +1,5 @@ use core::str; +use itertools::izip; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; @@ -63,7 +64,7 @@ } // Clone an arrow schema, assigning sequential field IDs starting from 1 -fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema { +fn assign_field_ids(arrow_schema: &Arc<Schema>) -> Schema { let mut field_id_counter = 1; let new_fields: Vec<FieldRef> = arrow_schema .fields @@ -83,6 +84,36 @@ Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone()) } +fn is_schema_aligned( + new_arrow_schema: &Arc<Schema>, + existing_iceberg_schema: &Arc<iceberg::spec::Schema>, +) -> Result<(), DataLoadingError> { + let old_iceberg_struct = existing_iceberg_schema.as_struct(); + let old_iceberg_fields = old_iceberg_struct.fields(); + + let new_arrow_schema_with_field_ids = assign_field_ids(new_arrow_schema); + let new_iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( + &new_arrow_schema_with_field_ids, + )?); + let new_iceberg_struct = new_iceberg_schema.as_struct(); + let new_iceberg_fields = new_iceberg_struct.fields(); + + if old_iceberg_fields.len() != new_iceberg_fields.len() { + return
Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields", old_iceberg_fields.len(), new_iceberg_fields.len()))); + } + for (i, old_iceberg_field, new_iceberg_field) in + izip!(0.., old_iceberg_fields.iter(), new_iceberg_fields.iter()) + { + if old_iceberg_field.required && !new_iceberg_field.required { + return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema", i, old_iceberg_field.name))); + } + if old_iceberg_field.field_type != new_iceberg_field.field_type { + return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema", i, old_iceberg_field.name, old_iceberg_field.field_type, new_iceberg_field.field_type))); + } + } + Ok(()) +} + // Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata fn update_metadata_snapshot( previous_metadata: &TableMetadata, @@ -139,10 +170,6 @@ pub async fn record_batches_to_iceberg( pin_mut!(record_batch_stream); let file_io = create_file_io(target_url.to_string())?; - let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone()); - let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( - &arrow_schema_with_ids, - )?); let version_hint_location = format!("{}/metadata/version-hint.text", target_url); let version_hint_input = file_io.new_input(&version_hint_location)?; @@ -170,7 +197,7 @@ pub async fn record_batches_to_iceberg( } else { None }; - let (previous_metadata, previous_metadata_location) = match old_version_hint { + let (previous_metadata, previous_metadata_location, iceberg_schema) = match old_version_hint { Some(version_hint) => { let old_metadata_location = format!("{}/metadata/v{}.metadata.json", target_url, version_hint); @@ -188,17 +215,21 @@ pub async 
fn record_batches_to_iceberg( "Could not parse old metadata file", )) })?; - if old_metadata.current_schema() != &iceberg_schema { - return Err(DataLoadingError::IcebergError(iceberg::Error::new( - iceberg::ErrorKind::FeatureUnsupported, - "Schema changes not supported", - ))); - } - (old_metadata, Some(old_metadata_location)) + let old_iceberg_schema = old_metadata.current_schema(); + is_schema_aligned(&arrow_schema, old_iceberg_schema)?; + ( + old_metadata.clone(), + Some(old_metadata_location), + old_iceberg_schema.clone(), + ) } None => { + let arrow_schema_with_ids = assign_field_ids(&arrow_schema); + let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( + &arrow_schema_with_ids, + )?); let empty_metadata = create_empty_metadata(&iceberg_schema, target_url.to_string())?; - (empty_metadata, None) + (empty_metadata, None, iceberg_schema) } }; @@ -344,3 +375,179 @@ pub async fn record_batches_to_iceberg( Ok(()) } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow_schema::{DataType, Field, Schema}; + use iceberg::spec::{NestedField, PrimitiveType, Type}; + + use crate::iceberg_destination::is_schema_aligned; + + #[test] + fn test_is_schema_aligned_positive() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()); + } + + #[test] + fn test_is_schema_aligned_positive_renamed() { + let arrow_schema = Schema::new_with_metadata( 
+ vec![ + // Fields renamed + Field::new("x", DataType::Utf8, false), + Field::new("y", DataType::Int32, false), + Field::new("z", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()); + } + + // OK to insert a non-nullable value into a nullable field + #[test] + fn test_is_schema_aligned_positive_nonnullable() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()); + } + + #[test] + fn test_is_schema_aligned_negative_added_field() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + Field::new("d", DataType::Boolean, false), // Added field + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + 
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_err()); + } + + #[test] + fn test_is_schema_aligned_negative_different_type() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), // Mismatched type + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_err()); + } + + #[test] + fn test_is_schema_aligned_negative_reordered() { + let arrow_schema = Schema::new_with_metadata( + vec![ + // Same fields but in wrong order + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Utf8, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_err()); + } + + // Not allowed to insert a nullable value into a non-nullable field + #[test] + fn test_is_schema_aligned_negative_nullable() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, true), // Nullable + Field::new("b", DataType::Int32, false), + 
Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + assert!(is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_err()); + } +} diff --git a/tests/basic_integration.rs b/tests/basic_integration.rs index 8f8ac34..50e13af 100644 --- a/tests/basic_integration.rs +++ b/tests/basic_integration.rs @@ -140,35 +140,33 @@ async fn test_pg_to_iceberg() { Ok(_) => panic!("Expected command to fail but it succeeded"), }; - // WHEN we try to write to an existing table with a different schema + // WHEN we try to write to an existing table with an incompatible schema // THEN the command errors out let args = vec![ "lakehouse-loader", "pg-to-iceberg", "postgres://test-user:test-password@localhost:5432/test-db", "-q", - "select cint4, cint8 cint8_newname, ctext, cbool from t1 order by id", + "select cint4, cint8::text cint8_casted, ctext, cbool from t1 order by id", target_url, "--overwrite", ]; match do_main(Cli::parse_from(args.clone())).await { - Err(DataLoadingError::IcebergError(e)) => { - assert!(e.kind() == iceberg::ErrorKind::FeatureUnsupported); - } + Err(DataLoadingError::BadInputError(_)) => {} Err(e) => { panic!("Unexpected error type: {:?}", e); } Ok(_) => panic!("Expected command to fail but it succeeded"), }; - // WHEN we try to write to an existing table with the same schema + // WHEN we try to write to an existing table with a compatible schema // THEN the command succeeds let args = vec![ "lakehouse-loader", "pg-to-iceberg", "postgres://test-user:test-password@localhost:5432/test-db", "-q", - "select cint4, cint8 + 1 cint8, ctext, cbool from t1 order by id", + "select cint4, cint8 + 
1 cint8_renamed, ctext, cbool from t1 order by id", target_url, "--overwrite", ];