Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ impl Transform {
match self {
Self::Identity => datum.to_string(),
Self::Void => "null".to_string(),
// TODO: confirm where this string representation is actually consumed.
//
// It appears to be used when rendering partition values for
// metrics/statistics; without these arms we hit the `todo!()` panic below.
Self::Day | Self::Month | Self::Year | Self::Hour => datum.to_string(),
Self::Bucket(_) => datum.to_string(),
_ => {
todo!()
}
Expand Down
27 changes: 22 additions & 5 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ impl<'a> FastAppendAction<'a> {
/// Adds data files to the fast-append action, along with any data files
/// that should be removed from the snapshot being produced.
///
/// Pass an empty iterator for `deleted_data_files` when no files are being
/// removed. Validation of partition specs and partition values is performed
/// by the underlying snapshot-produce action.
///
/// # Errors
///
/// Returns an error if any file fails the snapshot-produce action's
/// validation (e.g. mismatched partition spec id or partition value type).
pub fn add_data_files(
    &mut self,
    data_files: impl IntoIterator<Item = DataFile>,
    deleted_data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
    // Note: the scraped diff contained both the old one-argument call and
    // this replacement; only the two-argument delegation is kept, since the
    // old line would consume `data_files` a second time and not compile.
    self.snapshot_produce_action
        .add_data_files(data_files, deleted_data_files)?;
    Ok(self)
}

Expand Down Expand Up @@ -103,13 +105,20 @@ impl<'a> FastAppendAction<'a> {
)
.await?;

self.add_data_files(data_files)?;
self.add_data_files(data_files, Vec::new())?;

self.apply().await
}

/// Finishes building the action and applies it to the transaction.
pub async fn apply(self) -> Result<Transaction<'a>> {
if self.snapshot_produce_action.added_data_files.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"cannot apply no data files to transaction",
));
}

// Checks duplicate files
if self.check_duplicate {
let new_files: HashSet<&str> = self
Expand Down Expand Up @@ -224,7 +233,7 @@ mod tests {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();
action.add_data_files(vec![]).unwrap();
action.add_data_files(vec![], vec![]).unwrap();
assert!(action.apply().await.is_err());
}

Expand All @@ -245,8 +254,14 @@ mod tests {
.partition(Struct::from_iter([Some(Literal::string("test"))]))
.build()
.unwrap();
assert!(action.add_data_files(vec![data_file.clone()]).is_err());
assert!(
action
.add_data_files(vec![data_file.clone()], vec![])
.is_err()
);

let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();
let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/3.parquet".to_string())
Expand All @@ -257,7 +272,9 @@ mod tests {
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap();
action.add_data_files(vec![data_file.clone()]).unwrap();
action
.add_data_files(vec![data_file.clone()], vec![])
.unwrap();
let tx = action.apply().await.unwrap();

// check updates and requirements
Expand Down
106 changes: 97 additions & 9 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::RangeFrom;

Expand Down Expand Up @@ -66,6 +66,7 @@ pub(crate) struct SnapshotProduceAction<'a> {
commit_uuid: Uuid,
snapshot_properties: HashMap<String, String>,
pub added_data_files: Vec<DataFile>,
pub added_delete_files: Vec<DataFile>,
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
Expand All @@ -86,6 +87,7 @@ impl<'a> SnapshotProduceAction<'a> {
commit_uuid,
snapshot_properties,
added_data_files: vec![],
added_delete_files: vec![],
manifest_counter: (0..),
key_metadata,
})
Expand Down Expand Up @@ -126,6 +128,7 @@ impl<'a> SnapshotProduceAction<'a> {
pub fn add_data_files(
&mut self,
data_files: impl IntoIterator<Item = DataFile>,
deleted_data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
let data_files: Vec<DataFile> = data_files.into_iter().collect();
for data_file in &data_files {
Expand All @@ -135,6 +138,24 @@ impl<'a> SnapshotProduceAction<'a> {
"Only data content type is allowed for fast append",
));
}

// Check if the data file partition spec id matches the table default partition spec id.
if self.tx.current_table.metadata().default_partition_spec_id()
!= data_file.partition_spec_id
{
return Err(Error::new(
ErrorKind::DataInvalid,
"Data file partition spec id does not match table default partition spec id",
));
}
Self::validate_partition_value(
data_file.partition(),
self.tx.current_table.metadata().default_partition_type(),
)?;
}

let deleted_data_files: Vec<DataFile> = deleted_data_files.into_iter().collect();
for data_file in &deleted_data_files {
// Check if the data file partition spec id matches the table default partition spec id.
if self.tx.current_table.metadata().default_partition_spec_id()
!= data_file.partition_spec_id
Expand All @@ -150,6 +171,7 @@ impl<'a> SnapshotProduceAction<'a> {
)?;
}
self.added_data_files.extend(data_files);
self.added_delete_files.extend(deleted_data_files);
Ok(self)
}

Expand Down Expand Up @@ -211,6 +233,7 @@ impl<'a> SnapshotProduceAction<'a> {
builder.build_v2_data()
}
};

for entry in manifest_entries {
writer.add_entry(entry)?;
}
Expand All @@ -222,14 +245,79 @@ impl<'a> SnapshotProduceAction<'a> {
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
let added_manifest = self.write_added_manifest().await?;
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
// TODO: support processing delete entries here as well.

let mut manifest_files = vec![added_manifest];
manifest_files.extend(existing_manifests);
let manifest_files = manifest_process.process_manifests(manifest_files);
let mut existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;

if !self.added_data_files.is_empty() {
let added_manifest = self.write_added_manifest().await?;
existing_manifests.push(added_manifest);
}

if !self.added_delete_files.is_empty() {
let deleted_file_paths: HashSet<String> = self
.added_delete_files
.iter()
.map(|df| df.file_path.clone())
.collect();

// Filter existing manifests to remove entries for deleted files
let mut filtered_manifests = Vec::new();

for manifest in existing_manifests {
let manifest_entries = manifest
.load_manifest(self.tx.current_table.file_io())
.await?;

// Check if any entries in this manifest are for deleted files
let has_deleted_entries = manifest_entries
.entries()
.iter()
.any(|entry| deleted_file_paths.contains(entry.file_path()));

if has_deleted_entries {
// When there are deleted files, we should write the new manifests
// of this snapshot without including those entries.
let mut writer = {
let builder = ManifestWriterBuilder::new(
self.new_manifest_output()?,
Some(self.snapshot_id),
self.key_metadata.clone(),
self.tx.current_table.metadata().current_schema().clone(),
self.tx
.current_table
.metadata()
.default_partition_spec()
.as_ref()
.clone(),
);
if self.tx.current_table.metadata().format_version() == FormatVersion::V1 {
builder.build_v1()
} else {
builder.build_v2_data()
}
};

for entry in manifest_entries.entries() {
// Skip the deleted entry so it does not show up in
// the latest snapshot that is generated.
if deleted_file_paths.contains(entry.file_path()) {
continue;
}
writer.add_entry(entry.as_ref().clone())?;
}

let new_manifest = writer.write_manifest_file().await?;
filtered_manifests.push(new_manifest);
} else {
// No deleted entries in this manifest, keep it as-is
filtered_manifests.push(manifest);
}
}

// Replace the existing manifests with our modified list
// which omits the known deleted data (parquet) files.
existing_manifests = filtered_manifests;
}
let manifest_files = manifest_process.process_manifests(existing_manifests);
Ok(manifest_files)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ async fn test_append_data_file() {
// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
append_action
.add_data_files(data_file.clone(), vec![])
.unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();

Expand All @@ -133,7 +135,9 @@ async fn test_append_data_file() {
// commit result again
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
append_action
.add_data_files(data_file.clone(), vec![])
.unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async fn test_append_partition_data_file() {
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
append_action
.add_data_files(data_file_valid.clone())
.add_data_files(data_file_valid.clone(), vec![])
.unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();
Expand Down Expand Up @@ -182,7 +182,7 @@ async fn test_schema_incompatible_partition_type(
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
if append_action
.add_data_files(data_file_invalid.clone())
.add_data_files(data_file_invalid.clone(), vec![])
.is_ok()
{
panic!("diverging partition info should have returned error");
Expand Down Expand Up @@ -222,7 +222,7 @@ async fn test_schema_incompatible_partition_fields(
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
if append_action
.add_data_files(data_file_invalid.clone())
.add_data_files(data_file_invalid.clone(), vec![])
.is_ok()
{
panic!("passing different number of partition fields should have returned error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ async fn test_append_data_file_conflict() {
// start two transaction and commit one of them
let tx1 = Transaction::new(&table);
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
append_action
.add_data_files(data_file.clone(), vec![])
.unwrap();
let tx1 = append_action.apply().await.unwrap();

let tx2 = Transaction::new(&table);
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
append_action
.add_data_files(data_file.clone(), vec![])
.unwrap();
let tx2 = append_action.apply().await.unwrap();
let table = tx2
.commit(&rest_catalog)
Expand Down
4 changes: 3 additions & 1 deletion crates/integration_tests/tests/shared_tests/scan_all_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ async fn test_scan_all_type() {
// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
append_action
.add_data_files(data_file.clone(), vec![])
.unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();

Expand Down