diff --git a/src/error.rs b/src/error.rs
index 3ec261c..ccad26c 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -20,4 +20,6 @@ pub enum DataLoadingError {
     JoinError(#[from] tokio::task::JoinError),
     #[error("optimistic concurrency error")]
     OptimisticConcurrencyError(),
+    #[error("bad input error")]
+    BadInputError(String),
 }
diff --git a/src/iceberg_destination.rs b/src/iceberg_destination.rs
index 9d59b48..84c0998 100644
--- a/src/iceberg_destination.rs
+++ b/src/iceberg_destination.rs
@@ -27,6 +27,14 @@ use uuid::Uuid;
 
 use crate::error::DataLoadingError;
 
+// Defines how to behave with existing tables
+#[derive(Debug, Clone, PartialEq)]
+enum WriteMode {
+    CreateExclusive, // Error out if the table already exists
+    Overwrite,       // Overwrite existing table data
+    Append,          // Append to existing table data
+}
+
 fn create_file_io(target_url: String) -> Result<FileIO, DataLoadingError> {
     let mut file_io_props: Vec<(String, String)> = vec![];
     if let Ok(aws_endpoint) = std::env::var("AWS_ENDPOINT") {
@@ -96,6 +104,18 @@ fn update_metadata_snapshot(
     Ok(new_metadata)
 }
 
+async fn get_manifest_files(
+    file_io: &FileIO,
+    table_metadata: &TableMetadata,
+) -> Result<Option<Vec<ManifestFile>>, DataLoadingError> {
+    let snapshot = match table_metadata.current_snapshot() {
+        None => return Ok(None),
+        Some(s) => s,
+    };
+    let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?;
+    Ok(Some(manifest_list.consume_entries().into_iter().collect()))
+}
+
 const DEFAULT_SCHEMA_ID: i32 = 0;
 
 pub async fn record_batches_to_iceberg(
@@ -103,7 +123,19 @@
     arrow_schema: SchemaRef,
     target_url: Url,
     overwrite: bool,
+    append: bool,
 ) -> Result<(), DataLoadingError> {
+    let write_mode = match (overwrite, append) {
+        (false, false) => WriteMode::CreateExclusive,
+        (true, false) => WriteMode::Overwrite,
+        (false, true) => WriteMode::Append,
+        (true, true) => {
+            return Err(DataLoadingError::BadInputError(
+                "Cannot use overwrite flag with append flag".to_string(),
+            ));
+        }
+    };
+
     pin_mut!(record_batch_stream);
 
     let file_io = create_file_io(target_url.to_string())?;
@@ -115,7 +147,7 @@ pub async fn record_batches_to_iceberg(
     let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
     let version_hint_input = file_io.new_input(&version_hint_location)?;
     let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
-        if !overwrite {
+        if write_mode == WriteMode::CreateExclusive {
             return Err(DataLoadingError::IoError(std::io::Error::other(
                 "Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
             )));
@@ -233,8 +265,20 @@ pub async fn record_batches_to_iceberg(
             })
             .collect(),
     );
-    let manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
-    info!("Wrote manifest file: {:?}", manifest_file.manifest_path);
+    let new_manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
+    info!("Wrote manifest file: {:?}", new_manifest_file.manifest_path);
+
+    let new_manifest_files_vec: Vec<ManifestFile> = match write_mode {
+        WriteMode::CreateExclusive | WriteMode::Overwrite => vec![new_manifest_file], // Only include new manifest
+        WriteMode::Append => match get_manifest_files(&file_io, &previous_metadata).await? {
+            Some(mut manifest_files) => {
+                // Include new manifest and all manifests from previous snapshot
+                manifest_files.push(new_manifest_file);
+                manifest_files
+            }
+            None => vec![new_manifest_file], // Only include new manifest
+        },
+    };
 
     let manifest_list_path = format!(
         "{}/metadata/manifest-list-{}.avro",
@@ -244,7 +288,7 @@ pub async fn record_batches_to_iceberg(
     let manifest_file_output = file_io.new_output(manifest_list_path.clone())?;
     let mut manifest_list_writer: ManifestListWriter =
         ManifestListWriter::v2(manifest_file_output, snapshot_id, None, sequence_number);
-    manifest_list_writer.add_manifests(vec![manifest_file].into_iter())?;
+    manifest_list_writer.add_manifests(new_manifest_files_vec.into_iter())?;
     manifest_list_writer.close().await?;
     info!("Wrote manifest list: {:?}", manifest_list_path);
diff --git a/src/lib.rs b/src/lib.rs
index 5b9cee0..c204d11 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -55,6 +55,8 @@ enum Commands {
         target_url: Url,
         #[clap(long, short, action)]
         overwrite: bool,
+        #[clap(long, action)]
+        append: bool,
     },
     #[command(arg_required_else_help = true)]
     PgToIceberg {
@@ -64,6 +66,8 @@ enum Commands {
         query: String,
         #[clap(long, short, action)]
         overwrite: bool,
+        #[clap(long, action)]
+        append: bool,
         #[clap(
             long,
             short,
@@ -118,6 +122,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             source_file,
             target_url,
             overwrite,
+            append,
         } => {
             for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
                 let file = tokio::fs::File::open(&source_file).await?;
@@ -126,12 +131,15 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
                     .build()
                     .unwrap();
                 let arrow_schema = record_batch_reader.schema().clone();
+                let record_batch_stream =
+                    record_batch_reader.map_err(DataLoadingError::ParquetError);
                 info!("File schema: {}", arrow_schema);
                 match record_batches_to_iceberg(
-                    record_batch_reader.map_err(DataLoadingError::ParquetError),
+                    record_batch_stream,
                     arrow_schema,
                     target_url.clone(),
                     overwrite,
+                    append,
                 )
                 .await
                 {
@@ -154,6 +162,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             target_url,
             query,
             overwrite,
+            append,
             batch_size,
         } => {
            for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
@@ -168,6 +177,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
                     arrow_schema,
                     target_url.clone(),
                     overwrite,
+                    append,
                 )
                 .await
                 {
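The `(overwrite, append)` to `WriteMode` mapping at the top of `record_batches_to_iceberg` is a pure function, so it can be unit-tested without touching any I/O. A minimal sketch, assuming the `WriteMode` enum and `DataLoadingError::BadInputError` variant from this diff; the free function `resolve_write_mode` is hypothetical, factored out here only for illustration:

```rust
// Hypothetical helper mirroring the match at the top of
// record_batches_to_iceberg in this diff.
fn resolve_write_mode(overwrite: bool, append: bool) -> Result<WriteMode, DataLoadingError> {
    match (overwrite, append) {
        (false, false) => Ok(WriteMode::CreateExclusive),
        (true, false) => Ok(WriteMode::Overwrite),
        (false, true) => Ok(WriteMode::Append),
        (true, true) => Err(DataLoadingError::BadInputError(
            "Cannot use overwrite flag with append flag".to_string(),
        )),
    }
}

#[cfg(test)]
mod write_mode_tests {
    use super::*;

    #[test]
    fn flag_combinations_resolve_as_documented() {
        // The three valid combinations map to distinct modes...
        assert_eq!(resolve_write_mode(false, false).unwrap(), WriteMode::CreateExclusive);
        assert_eq!(resolve_write_mode(true, false).unwrap(), WriteMode::Overwrite);
        assert_eq!(resolve_write_mode(false, true).unwrap(), WriteMode::Append);
        // ...and passing both flags is rejected up front.
        assert!(resolve_write_mode(true, true).is_err());
    }
}
```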
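The behavioral core of the append mode is which manifests end up in the new snapshot's manifest list: `CreateExclusive` and `Overwrite` keep only the newly written manifest, while `Append` must carry forward every manifest from the previous snapshot, or the old data files would silently drop out of the table. A minimal sketch of that selection, with a placeholder `Manifest` type standing in for `iceberg::spec::ManifestFile` so the snippet is self-contained; the `None` arm corresponds to a table with no current snapshot, matching `get_manifest_files` above:

```rust
// Placeholder standing in for iceberg::spec::ManifestFile.
#[derive(Debug, Clone, PartialEq)]
struct Manifest(String);

// Mirrors the new_manifest_files_vec match in this diff, assuming the
// WriteMode enum defined above.
fn manifests_for_new_snapshot(
    write_mode: WriteMode,
    previous: Option<Vec<Manifest>>, // None when the table has no current snapshot
    new_manifest: Manifest,
) -> Vec<Manifest> {
    match write_mode {
        // Fresh or overwritten tables: the new manifest is the whole list.
        WriteMode::CreateExclusive | WriteMode::Overwrite => vec![new_manifest],
        // Append: previous manifests stay reachable, plus the new one.
        WriteMode::Append => {
            let mut manifests = previous.unwrap_or_default();
            manifests.push(new_manifest);
            manifests
        }
    }
}
```

Returning `Option<Vec<ManifestFile>>` from `get_manifest_files`, rather than an empty `Vec`, keeps "table has no snapshot yet" distinct from "snapshot with zero manifests", and both collapse to the same single-manifest list here.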
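At the CLI level `--append` is long-only (unlike `-o`/`--overwrite`), and clap itself will accept both flags together; the conflict is only caught later, inside `record_batches_to_iceberg`. A hedged parsing sketch, assuming `Cli` derives `clap::Parser` and that the first subcommand parses as `parquet-to-iceberg` under clap's default kebab-casing (neither detail is visible in this diff, so treat both as assumptions):

```rust
#[cfg(test)]
mod cli_parse_tests {
    use super::*;
    use clap::Parser;

    #[test]
    fn append_and_overwrite_both_parse_at_clap_level() {
        // Hypothetical invocation; subcommand name and positional order
        // assume clap's defaults for the Commands variants in this diff.
        let parsed = Cli::try_parse_from([
            "lakehouse-loader",
            "parquet-to-iceberg",
            "data.parquet",      // source_file
            "s3://bucket/table", // target_url
            "--overwrite",
            "--append",
        ]);
        // No parse error here: the two flags are independent `action` flags,
        // so the BadInputError surfaces later, in record_batches_to_iceberg.
        assert!(parsed.is_ok());
    }
}
```

An alternative worth considering is declaring the exclusion at parse time with clap's `conflicts_with` attribute, which would fail fast with a usage error instead of a `DataLoadingError` from deep inside the write path.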