diff --git a/Cargo.lock b/Cargo.lock
index 622a5d8..c9a3f1d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1966,6 +1966,7 @@ dependencies = [
  "log",
  "native-tls",
  "object_store",
+ "opendal",
  "parquet",
  "postgres",
  "postgres-native-tls",
diff --git a/Cargo.toml b/Cargo.toml
index abc5437..a6eea86 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "e7008f399
 log = "0.4"
 native-tls = "0.2.11"
 object_store = { version = "0.11", features = ["aws"] }
+opendal = { version = "0.50" }
 parquet = { version = "53" }
 postgres = { version = "0.19.7", git = "https://github.com/splitgraph/rust-postgres", rev = "88c2c7714a4558aed6a63e2e2b140a8359568858" }
 postgres-native-tls = { version = "0.5.0", git = "https://github.com/splitgraph/rust-postgres", rev = "88c2c7714a4558aed6a63e2e2b140a8359568858" }
diff --git a/src/error.rs b/src/error.rs
index 6a1465a..3ec261c 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -18,4 +18,6 @@ pub enum DataLoadingError {
     ObjectStoreError(#[from] object_store::Error),
     #[error("join error")]
     JoinError(#[from] tokio::task::JoinError),
+    #[error("optimistic concurrency error")]
+    OptimisticConcurrencyError(),
 }
diff --git a/src/iceberg_destination.rs b/src/iceberg_destination.rs
index acfcb9a..75a74df 100644
--- a/src/iceberg_destination.rs
+++ b/src/iceberg_destination.rs
@@ -1,5 +1,6 @@
 use core::str;
 use std::collections::HashMap;
+use std::error::Error;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
@@ -119,24 +120,24 @@ pub async fn record_batches_to_iceberg(
                 "Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
             )));
         }
-        let x = version_hint_input.read().await?;
-        let y: String = String::from_utf8(x.to_vec()).map_err(|_| {
-            DataLoadingError::IcebergError(iceberg::Error::new(
-                iceberg::ErrorKind::DataInvalid,
-                "Could not parse UTF-8 in version-hint.text",
-            ))
-        })?;
-        let z = y.trim().parse::<u64>().map_err(|_| {
+        let version_hint_bytes = version_hint_input.read().await?;
+        let version_hint_string: String =
+            String::from_utf8(version_hint_bytes.to_vec()).map_err(|_| {
+                DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::DataInvalid,
+                    "Could not parse UTF-8 in version-hint.text",
+                ))
+            })?;
+        let version_hint_u64 = version_hint_string.trim().parse::<u64>().map_err(|_| {
             DataLoadingError::IcebergError(iceberg::Error::new(
                 iceberg::ErrorKind::DataInvalid,
                 "Could not parse integer version in version-hint.text",
             ))
         })?;
-        Some(z)
+        Some(version_hint_u64)
     } else {
         None
     };
-
     let (previous_metadata, previous_metadata_location) = match old_version_hint {
         Some(version_hint) => {
             let old_metadata_location =
@@ -275,10 +276,20 @@ pub async fn record_batches_to_iceberg(
         target_url, new_version_hint
     );
-    file_io
+    if let Err(iceberg_error) = file_io
         .new_output(&new_metadata_location)?
         .write_exclusive(serde_json::to_vec(&new_metadata).unwrap().into())
-        .await?;
+        .await
+    {
+        if let Some(iceberg_error_source) = iceberg_error.source() {
+            if let Some(opendal_error) = iceberg_error_source.downcast_ref::<opendal::Error>() {
+                if opendal_error.kind() == opendal::ErrorKind::ConditionNotMatch {
+                    return Err(DataLoadingError::OptimisticConcurrencyError());
+                }
+            }
+        }
+        return Err(iceberg_error.into());
+    };
     info!("Wrote new metadata: {:?}", new_metadata_location);
     file_io
diff --git a/src/lib.rs b/src/lib.rs
index 6b64ff8..5b9cee0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -75,6 +75,8 @@ enum Commands {
     },
 }
 
+const OPTIMISTIC_CONCURRENCY_RETRIES: u32 = 3;
+
 pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
     match args.command {
         Commands::ParquetToDelta {
@@ -117,20 +119,35 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             target_url,
             overwrite,
         } => {
-            let file = tokio::fs::File::open(source_file).await?;
-            let record_batch_reader = ParquetRecordBatchStreamBuilder::new(file)
-                .await?
-                .build()
-                .unwrap();
-            let schema = record_batch_reader.schema().clone();
-            info!("File schema: {}", schema);
-            record_batches_to_iceberg(
-                record_batch_reader.map_err(DataLoadingError::ParquetError),
-                schema,
-                target_url,
-                overwrite,
-            )
-            .await
+            for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
+                let file = tokio::fs::File::open(&source_file).await?;
+                let record_batch_reader = ParquetRecordBatchStreamBuilder::new(file)
+                    .await?
+                    .build()
+                    .unwrap();
+                let arrow_schema = record_batch_reader.schema().clone();
+                info!("File schema: {}", arrow_schema);
+                match record_batches_to_iceberg(
+                    record_batch_reader.map_err(DataLoadingError::ParquetError),
+                    arrow_schema,
+                    target_url.clone(),
+                    overwrite,
+                )
+                .await
+                {
+                    Err(DataLoadingError::OptimisticConcurrencyError()) => {
+                        info!("Optimistic concurrency error. Retrying");
+                        continue;
+                    }
+                    Err(e) => {
+                        return Err(e);
+                    }
+                    Ok(_) => {
+                        break;
+                    }
+                }
+            }
+            Ok(())
         }
         Commands::PgToIceberg {
             connection_string,
@@ -139,14 +156,34 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             overwrite,
             batch_size,
         } => {
-            let mut source = PgArrowSource::new(connection_string.as_ref(), &query, batch_size)
-                .await
-                .map_err(DataLoadingError::PostgresError)?;
-            let arrow_schema = source.get_arrow_schema();
-            let record_batch_stream = source.get_record_batch_stream();
-            info!("Rowset schema: {}", arrow_schema);
-            record_batches_to_iceberg(record_batch_stream, arrow_schema, target_url, overwrite)
+            for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
+                let mut source = PgArrowSource::new(connection_string.as_ref(), &query, batch_size)
+                    .await
+                    .map_err(DataLoadingError::PostgresError)?;
+                let arrow_schema = source.get_arrow_schema();
+                let record_batch_stream = source.get_record_batch_stream();
+                info!("Rowset schema: {}", arrow_schema);
+                match record_batches_to_iceberg(
+                    record_batch_stream,
+                    arrow_schema,
+                    target_url.clone(),
+                    overwrite,
+                )
                 .await
+                {
+                    Err(DataLoadingError::OptimisticConcurrencyError()) => {
+                        info!("Optimistic concurrency error. Retrying");
+                        continue;
+                    }
+                    Err(e) => {
+                        return Err(e);
+                    }
+                    Ok(_) => {
+                        break;
+                    }
+                }
+            }
+            Ok(())
         }
     }
     // TODO
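
A note on the error detection in src/iceberg_destination.rs: the `iceberg::Error` returned by `write_exclusive` does not expose the underlying opendal error kind directly, so the change walks the `std::error::Error` source chain and downcasts to the concrete backend error. The sketch below demonstrates that pattern in isolation; `WrapperError` and `BackendError` are hypothetical stand-ins for `iceberg::Error` and `opendal::Error`, not types from either crate.

```rust
use std::error::Error;
use std::fmt;

// Hypothetical stand-in for opendal::Error.
#[derive(Debug)]
struct BackendError(&'static str);

impl fmt::Display for BackendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "backend error: {}", self.0)
    }
}

impl Error for BackendError {}

// Hypothetical stand-in for iceberg::Error, wrapping the backend error.
#[derive(Debug)]
struct WrapperError(BackendError);

impl fmt::Display for WrapperError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "wrapper error")
    }
}

impl Error for WrapperError {
    // Exposing the inner error here is what makes the downcast below possible.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.0)
    }
}

fn main() {
    let err = WrapperError(BackendError("condition not match"));

    // The pattern from the diff: step one level down the source chain,
    // then downcast the `dyn Error` to the concrete backend type.
    if let Some(source) = err.source() {
        if let Some(backend) = source.downcast_ref::<BackendError>() {
            println!("detected: {backend}");
        }
    }
}
```

In the actual change, the same two steps identify `opendal::ErrorKind::ConditionNotMatch`, i.e. an exclusive write that lost the race because the metadata file already existed, and map it to the new `DataLoadingError::OptimisticConcurrencyError` variant.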
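The retry handling in src/lib.rs is the same loop written out twice, once per command. If it grows a third caller it could be factored into a generic helper along these lines; a minimal sketch, where `with_occ_retries` and the `LoadError` enum are illustrative stand-ins for the codebase's `DataLoadingError`, not existing items:

```rust
use std::future::Future;

// Hypothetical stand-in for DataLoadingError.
#[derive(Debug)]
enum LoadError {
    OptimisticConcurrency,
}

// Run `attempt` up to `retries` times, retrying only on the optimistic
// concurrency variant and passing any other outcome straight through.
async fn with_occ_retries<F, Fut, T>(retries: u32, mut attempt: F) -> Result<T, LoadError>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, LoadError>>,
{
    for _ in 0..retries {
        match attempt().await {
            Err(LoadError::OptimisticConcurrency) => continue,
            other => return other,
        }
    }
    // Unlike the loops in the diff, surface a failure once retries run out.
    Err(LoadError::OptimisticConcurrency)
}

#[tokio::main]
async fn main() {
    let mut calls = 0;
    let result = with_occ_retries(3, || {
        calls += 1;
        let n = calls;
        // Simulate losing the race twice before succeeding.
        async move {
            if n < 3 {
                Err(LoadError::OptimisticConcurrency)
            } else {
                Ok(n)
            }
        }
    })
    .await;
    println!("{result:?}"); // Ok(3)
}
```

One behavioral difference worth noting: the loops in the diff fall through to `Ok(())` when all `OPTIMISTIC_CONCURRENCY_RETRIES` attempts fail with the concurrency error, so an exhausted retry budget currently reports success; the sketch above returns the error instead.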