Iceberg support #8
Conversation
src/iceberg_destination.rs (Outdated)

    use crate::error::DataLoadingError;
    ...
    pub async fn record_batches_to_iceberg(
This function is almost 200 lines long and handles multiple tasks. You could break it into something like:

    let file_io = setup_file_io(...)
    let iceberg_schema = assign_field_ids(...)
    let metadata_v0 = prepare_metadata(...)
    let data_files = write_record_batches(...)
    let manifest_file = write_manifest(...)
    write_snapshot(manifest_file, metadata_v0, target_url, file_io)

Also, some values are repeated throughout, so you could store them in constants:

    const DEFAULT_SCHEMA_ID: i32 = 0;
    const METADATA_PATH: &str = "metadata";
    const PARTITION_FILENAME: &str = "part";
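For illustration, the top level could then read roughly like the following. This is only a sketch of the suggested shape, not lakehouse-loader's actual code: the helper signatures are assumptions and their bodies are elided.

    // Sketch of the suggested decomposition; all helpers below are hypothetical.
    pub async fn record_batches_to_iceberg(
        record_batch_stream: impl Stream<Item = Result<RecordBatch, DataLoadingError>> + Unpin,
        arrow_schema: SchemaRef,
        target_url: &Url,
    ) -> Result<(), DataLoadingError> {
        // Build a FileIO handle for the target object store.
        let file_io = setup_file_io(target_url)?;
        // Convert the Arrow schema to an Iceberg schema with assigned field IDs.
        let iceberg_schema = assign_field_ids(&arrow_schema)?;
        // Construct the initial (v0) table metadata.
        let metadata_v0 = prepare_metadata(&iceberg_schema, target_url)?;
        // Stream batches into Parquet data files.
        let data_files = write_record_batches(record_batch_stream, &iceberg_schema, &file_io).await?;
        // Record the data files in a manifest, then commit a snapshot.
        let manifest_file = write_manifest(&data_files, &metadata_v0, &file_io).await?;
        write_snapshot(manifest_file, metadata_v0, target_url, &file_io).await
    }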
Ok, I've moved the more self-contained logic out of record_batches_to_iceberg now. The remaining logic could be split up further, but right now I have low confidence that we know the right way to do it. It will become clearer in the future as we learn more about Iceberg and which Iceberg-related features we want to build.
Co-authored-by: Luís Lizardo <[email protected]>
src/iceberg_destination.rs (Outdated)

    let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&cloned_arrow_schema)?;
    ...
    let table_creation = TableCreation::builder()
        .name("dummy_name".to_string()) // Required by TableCreationBuilder. Doesn't affect output
Presumably this name is stored in the table metadata? If so, how about using something that conveys at least some information (e.g. signifying that the table was created by lakehouse-loader, a timestamp, or the source), or adding a new name param to the CLI?
No, it doesn't get persisted in table metadata or anywhere else.
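For reference, a minimal sketch of the builder call under discussion, assuming the iceberg crate's TableCreation typed builder where only name and schema need to be set here:

    use iceberg::TableCreation;

    // "dummy_name" satisfies the builder's required `name` field; per the
    // discussion above, it is not persisted in the written table metadata.
    let table_creation = TableCreation::builder()
        .name("dummy_name".to_string())
        .schema(iceberg_schema)
        .build();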
    while let Some(maybe_batch) = record_batch_stream.next().await {
        let batch = maybe_batch?;
        file_writer.write(&batch).await?;
I think this results in one or more row groups being buffered in memory prior to flushing, which may (or may not) pose memory usage issues. If it turns out it does, we'd need to customize it, like we do with Delta (temp files + multipart upload), but for a start I think it's fine.
> I think this results in one or more row groups being buffered in memory

You're right, it could be one or more row groups, but it should be bounded, if I'm interpreting this comment correctly: https://github.com/apache/arrow-rs/blob/ccd18f203116e87675ea4ddaf5ddc1b4f986ad14/parquet/src/arrow/async_writer/mod.rs#L211-L214

Therefore I don't expect memory usage issues.
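If that bound ever proved too loose, the amount buffered could be reduced by shrinking the row-group size. A sketch, assuming a recent arrow-rs where AsyncArrowWriter::try_new takes (writer, schema, props), and where async_sink, arrow_schema, and record_batch_stream are already in scope:

    use parquet::arrow::AsyncArrowWriter;
    use parquet::file::properties::WriterProperties;

    // Smaller row groups mean less data is buffered before each flush, at the
    // cost of a less compact Parquet file. 64k rows is an illustrative value,
    // not what lakehouse-loader actually uses.
    let props = WriterProperties::builder()
        .set_max_row_group_size(64 * 1024)
        .build();
    let mut file_writer =
        AsyncArrowWriter::try_new(async_sink, arrow_schema.clone(), Some(props))?;

    while let Some(maybe_batch) = record_batch_stream.next().await {
        let batch = maybe_batch?;
        file_writer.write(&batch).await?;
    }
    file_writer.close().await?;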
This PR adds Iceberg support with the pg-to-iceberg and parquet-to-iceberg subcommands.

Testing