diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 2a290ea77c..125323bcd9 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -144,6 +144,12 @@ impl Transform { match self { Self::Identity => datum.to_string(), Self::Void => "null".to_string(), + // TODO: confirm these string representations are correct for these transforms. + // + // These variants appear to be reached via metrics/statistics handling; without + // covering them here we hit the `todo!()` panic below. + Self::Day | Self::Month | Self::Year | Self::Hour => datum.to_string(), + Self::Bucket(_) => datum.to_string(), _ => { todo!() } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 9a3c970aa4..4ce1a72559 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -67,8 +67,10 @@ impl<'a> FastAppendAction<'a> { pub fn add_data_files( &mut self, data_files: impl IntoIterator, + deleted_data_files: impl IntoIterator, ) -> Result<&mut Self> { - self.snapshot_produce_action.add_data_files(data_files)?; + self.snapshot_produce_action + .add_data_files(data_files, deleted_data_files)?; Ok(self) } @@ -103,13 +105,20 @@ impl<'a> FastAppendAction<'a> { ) .await?; - self.add_data_files(data_files)?; + self.add_data_files(data_files, Vec::new())?; self.apply().await } /// Finished building the action and apply it to the transaction. 
pub async fn apply(self) -> Result> { + if self.snapshot_produce_action.added_data_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "cannot apply no data files to transaction", + )); + } + // Checks duplicate files if self.check_duplicate { let new_files: HashSet<&str> = self @@ -224,7 +233,7 @@ mod tests { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); let mut action = tx.fast_append(None, vec![]).unwrap(); - action.add_data_files(vec![]).unwrap(); + action.add_data_files(vec![], vec![]).unwrap(); assert!(action.apply().await.is_err()); } @@ -245,8 +254,14 @@ mod tests { .partition(Struct::from_iter([Some(Literal::string("test"))])) .build() .unwrap(); - assert!(action.add_data_files(vec![data_file.clone()]).is_err()); + assert!( + action + .add_data_files(vec![data_file.clone()], vec![]) + .is_err() + ); + let tx = Transaction::new(&table); + let mut action = tx.fast_append(None, vec![]).unwrap(); let data_file = DataFileBuilder::default() .content(DataContentType::Data) .file_path("test/3.parquet".to_string()) @@ -257,7 +272,9 @@ mod tests { .partition(Struct::from_iter([Some(Literal::long(300))])) .build() .unwrap(); - action.add_data_files(vec![data_file.clone()]).unwrap(); + action + .add_data_files(vec![data_file.clone()], vec![]) + .unwrap(); let tx = action.apply().await.unwrap(); // check updates and requirements diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 4211b4f138..2db5260047 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; @@ -66,6 +66,7 @@ pub(crate) struct SnapshotProduceAction<'a> { commit_uuid: Uuid, snapshot_properties: HashMap, pub added_data_files: Vec, + pub added_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -86,6 +87,7 @@ impl<'a> SnapshotProduceAction<'a> { commit_uuid, snapshot_properties, added_data_files: vec![], + added_delete_files: vec![], manifest_counter: (0..), key_metadata, }) @@ -126,6 +128,7 @@ impl<'a> SnapshotProduceAction<'a> { pub fn add_data_files( &mut self, data_files: impl IntoIterator, + deleted_data_files: impl IntoIterator, ) -> Result<&mut Self> { let data_files: Vec = data_files.into_iter().collect(); for data_file in &data_files { @@ -135,6 +138,24 @@ impl<'a> SnapshotProduceAction<'a> { "Only data content type is allowed for fast append", )); } + + // Check if the data file partition spec id matches the table default partition spec id. + if self.tx.current_table.metadata().default_partition_spec_id() + != data_file.partition_spec_id + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Data file partition spec id does not match table default partition spec id", + )); + } + Self::validate_partition_value( + data_file.partition(), + self.tx.current_table.metadata().default_partition_type(), + )?; + } + + let deleted_data_files: Vec = deleted_data_files.into_iter().collect(); + for data_file in &deleted_data_files { // Check if the data file partition spec id matches the table default partition spec id. 
if self.tx.current_table.metadata().default_partition_spec_id() != data_file.partition_spec_id @@ -150,6 +171,7 @@ impl<'a> SnapshotProduceAction<'a> { )?; } self.added_data_files.extend(data_files); + self.added_delete_files.extend(deleted_data_files); Ok(self) } @@ -211,6 +233,7 @@ impl<'a> SnapshotProduceAction<'a> { builder.build_v2_data() } }; + for entry in manifest_entries { writer.add_entry(entry)?; } @@ -222,14 +245,79 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; - let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; - // # TODO - // Support process delete entries. - - let mut manifest_files = vec![added_manifest]; - manifest_files.extend(existing_manifests); - let manifest_files = manifest_process.process_manifests(manifest_files); + let mut existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + + if !self.added_data_files.is_empty() { + let added_manifest = self.write_added_manifest().await?; + existing_manifests.push(added_manifest); + } + + if !self.added_delete_files.is_empty() { + let deleted_file_paths: HashSet = self + .added_delete_files + .iter() + .map(|df| df.file_path.clone()) + .collect(); + + // Filter existing manifests to remove entries for deleted files + let mut filtered_manifests = Vec::new(); + + for manifest in existing_manifests { + let manifest_entries = manifest + .load_manifest(self.tx.current_table.file_io()) + .await?; + + // Check if any entries in this manifest are for deleted files + let has_deleted_entries = manifest_entries + .entries() + .iter() + .any(|entry| deleted_file_paths.contains(entry.file_path())); + + if has_deleted_entries { + // When there are deleted files, we should write the new manifests + // of this snapshot without including those entries. 
+ let mut writer = { + let builder = ManifestWriterBuilder::new( + self.new_manifest_output()?, + Some(self.snapshot_id), + self.key_metadata.clone(), + self.tx.current_table.metadata().current_schema().clone(), + self.tx + .current_table + .metadata() + .default_partition_spec() + .as_ref() + .clone(), + ); + if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { + builder.build_v1() + } else { + builder.build_v2_data() + } + }; + + for entry in manifest_entries.entries() { + // Skip the deleted entry so it does not show up in + // the latest snapshot that is generated. + if deleted_file_paths.contains(entry.file_path()) { + continue; + } + writer.add_entry(entry.as_ref().clone())?; + } + + let new_manifest = writer.write_manifest_file().await?; + filtered_manifests.push(new_manifest); + } else { + // No deleted entries in this manifest, keep it as-is + filtered_manifests.push(manifest); + } + } + + // Replace the existing manifests with our modified list + // which omits the known deleted data (parquet) files. 
+ existing_manifests = filtered_manifests; + } + let manifest_files = manifest_process.process_manifests(existing_manifests); Ok(manifest_files) } diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index f3ee17c75c..434be79713 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -113,7 +113,9 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); + append_action + .add_data_files(data_file.clone(), vec![]) + .unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); @@ -133,7 +135,9 @@ async fn test_append_data_file() { // commit result again let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); + append_action + .add_data_files(data_file.clone(), vec![]) + .unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index c5c029a45a..ae1ecbbcac 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -122,7 +122,7 @@ async fn test_append_partition_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action - .add_data_files(data_file_valid.clone()) + .add_data_files(data_file_valid.clone(), vec![]) .unwrap(); let tx = 
append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); @@ -182,7 +182,7 @@ async fn test_schema_incompatible_partition_type( let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action - .add_data_files(data_file_invalid.clone()) + .add_data_files(data_file_invalid.clone(), vec![]) .is_ok() { panic!("diverging partition info should have returned error"); @@ -222,7 +222,7 @@ async fn test_schema_incompatible_partition_fields( let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action - .add_data_files(data_file_invalid.clone()) + .add_data_files(data_file_invalid.clone(), vec![]) .is_ok() { panic!("passing different number of partition fields should have returned error"); diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index d277e12e5a..a5275993ac 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -91,12 +91,16 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); + append_action + .add_data_files(data_file.clone(), vec![]) + .unwrap(); let tx1 = append_action.apply().await.unwrap(); let tx2 = Transaction::new(&table); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); + append_action + .add_data_files(data_file.clone(), vec![]) + .unwrap(); let tx2 = append_action.apply().await.unwrap(); let table = tx2 .commit(&rest_catalog) diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs 
b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 5ff982720b..80ff0e57af 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -310,7 +310,9 @@ async fn test_scan_all_type() { // commit result let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); + append_action + .add_data_files(data_file.clone(), vec![]) + .unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap();