Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ pub enum DataLoadingError {
JoinError(#[from] tokio::task::JoinError),
#[error("optimistic concurrency error")]
OptimisticConcurrencyError(),
#[error("bad input error")]
BadInputError(String),
}
52 changes: 48 additions & 4 deletions src/iceberg_destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ use uuid::Uuid;

use crate::error::DataLoadingError;

/// Defines how to behave when the target Iceberg table already exists.
///
/// Fieldless and cheap, so it derives `Copy`; `Eq` accompanies `PartialEq`
/// since variant equality is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteMode {
    /// Error out if the table already exists.
    CreateExclusive,
    /// Overwrite existing table data.
    Overwrite,
    /// Append to existing table data.
    Append,
}

fn create_file_io(target_url: String) -> Result<FileIO, DataLoadingError> {
let mut file_io_props: Vec<(String, String)> = vec![];
if let Ok(aws_endpoint) = std::env::var("AWS_ENDPOINT") {
Expand Down Expand Up @@ -96,14 +104,38 @@ fn update_metadata_snapshot(
Ok(new_metadata)
}

/// Loads the manifest files referenced by the table's current snapshot.
///
/// Returns `Ok(None)` when the table has no current snapshot (e.g. a freshly
/// created table with no committed data); otherwise resolves the snapshot's
/// manifest list via `file_io` and returns its entries as an owned `Vec`.
///
/// # Errors
/// Propagates any I/O or decoding failure from reading the manifest list.
async fn get_manifest_files(
    file_io: &FileIO,
    table_metadata: &TableMetadata,
) -> Result<Option<Vec<ManifestFile>>, DataLoadingError> {
    // Guard clause: no snapshot means there is nothing to load.
    let Some(current_snapshot) = table_metadata.current_snapshot() else {
        return Ok(None);
    };
    let entries = current_snapshot
        .load_manifest_list(file_io, table_metadata)
        .await?
        .consume_entries()
        .into_iter()
        .collect::<Vec<ManifestFile>>();
    Ok(Some(entries))
}

const DEFAULT_SCHEMA_ID: i32 = 0;

pub async fn record_batches_to_iceberg(
record_batch_stream: impl TryStream<Item = Result<RecordBatch, DataLoadingError>>,
arrow_schema: SchemaRef,
target_url: Url,
overwrite: bool,
append: bool,
) -> Result<(), DataLoadingError> {
let write_mode = match (overwrite, append) {
(false, false) => WriteMode::CreateExclusive,
(true, false) => WriteMode::Overwrite,
(false, true) => WriteMode::Append,
(true, true) => {
return Err(DataLoadingError::BadInputError(
"Cannot use overwrite flag with append flag".to_string(),
));
}
};

pin_mut!(record_batch_stream);

let file_io = create_file_io(target_url.to_string())?;
Expand All @@ -115,7 +147,7 @@ pub async fn record_batches_to_iceberg(
let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
if !overwrite {
if write_mode == WriteMode::CreateExclusive {
return Err(DataLoadingError::IoError(std::io::Error::other(
"Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
)));
Expand Down Expand Up @@ -233,8 +265,20 @@ pub async fn record_batches_to_iceberg(
})
.collect(),
);
let manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
info!("Wrote manifest file: {:?}", manifest_file.manifest_path);
let new_manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
info!("Wrote manifest file: {:?}", new_manifest_file.manifest_path);

let new_manifest_files_vec: Vec<ManifestFile> = match write_mode {
WriteMode::CreateExclusive | WriteMode::Overwrite => vec![new_manifest_file], // Only include new manifest
WriteMode::Append => match get_manifest_files(&file_io, &previous_metadata).await? {
Some(mut manifest_files) => {
// Include new manifest and all manifests from previous snapshot
manifest_files.push(new_manifest_file);
manifest_files
}
None => vec![new_manifest_file], // Only include new manifest
},
};

let manifest_list_path = format!(
"{}/metadata/manifest-list-{}.avro",
Expand All @@ -244,7 +288,7 @@ pub async fn record_batches_to_iceberg(
let manifest_file_output = file_io.new_output(manifest_list_path.clone())?;
let mut manifest_list_writer: ManifestListWriter =
ManifestListWriter::v2(manifest_file_output, snapshot_id, None, sequence_number);
manifest_list_writer.add_manifests(vec![manifest_file].into_iter())?;
manifest_list_writer.add_manifests(new_manifest_files_vec.into_iter())?;
manifest_list_writer.close().await?;
info!("Wrote manifest list: {:?}", manifest_list_path);

Expand Down
12 changes: 11 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ enum Commands {
target_url: Url,
#[clap(long, short, action)]
overwrite: bool,
#[clap(long, action)]
append: bool,
},
#[command(arg_required_else_help = true)]
PgToIceberg {
Expand All @@ -64,6 +66,8 @@ enum Commands {
query: String,
#[clap(long, short, action)]
overwrite: bool,
#[clap(long, action)]
append: bool,
#[clap(
long,
short,
Expand Down Expand Up @@ -118,6 +122,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
source_file,
target_url,
overwrite,
append,
} => {
for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
let file = tokio::fs::File::open(&source_file).await?;
Expand All @@ -126,12 +131,15 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
.build()
.unwrap();
let arrow_schema = record_batch_reader.schema().clone();
let record_batch_stream =
record_batch_reader.map_err(DataLoadingError::ParquetError);
info!("File schema: {}", arrow_schema);
match record_batches_to_iceberg(
record_batch_reader.map_err(DataLoadingError::ParquetError),
record_batch_stream,
arrow_schema,
target_url.clone(),
overwrite,
append,
)
.await
{
Expand All @@ -154,6 +162,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
target_url,
query,
overwrite,
append,
batch_size,
} => {
for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
Expand All @@ -168,6 +177,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
arrow_schema,
target_url.clone(),
overwrite,
append,
)
.await
{
Expand Down
Loading