From 6769f0e463b335cf7db9e02854a85316946cc351 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 18 Dec 2024 17:51:22 +0800 Subject: [PATCH] wip --- crates/iceberg/src/spec/manifest.rs | 79 +++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 086c63080..b283303df 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -24,6 +24,8 @@ use std::sync::Arc; use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; use bytes::Bytes; use itertools::Itertools; +use serde::ser::SerializeStruct; +use serde::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize}; use serde_json::to_vec; use serde_with::{DeserializeFromStr, SerializeDisplay}; @@ -311,16 +313,14 @@ impl ManifestWriter { self.partitions.push(entry.data_file.partition.clone()); let value = match manifest.metadata.format_version { - FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from( - (*entry).clone(), - &partition_type, - )?)? - .resolve(&avro_schema)?, - FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from( - (*entry).clone(), - &partition_type, - )?)? - .resolve(&avro_schema)?, + FormatVersion::V1 => { + to_value(_serde::ManifestEntryV1::try_from((*entry).clone())?)? + .resolve(&avro_schema)? + } + FormatVersion::V2 => { + to_value(_serde::ManifestEntryV2::try_from((*entry).clone())?)? + .resolve(&avro_schema)? + } }; avro_writer.append(value)?; @@ -1104,6 +1104,47 @@ pub struct DataFile { /// delete files. #[builder(default, setter(strip_option))] pub(crate) sort_order_id: Option, + + /// # TODO + /// fullfiil + #[builder(setter(strip_option))] + partition_type: StructType, + /// # TODO + /// fullfiil + #[builder(setter(strip_option))] + schema: Schema, +} + +impl Serialize for DataFile { + fn serialize(&self, serializer: S) -> std::result::Result + where S: serde::Serializer { + let mut state = serializer.serialize_struct("DataFile", 18)?; + let raw_data_file = _serde::DataFile::try_from(self.clone(), false).unwrap(); + state.serialize_field("schema", &self.schema)?; + state.serialize_field("partition_type", &self.partition_type)?; + state.serialize_field("raw", &raw_data_file)?; + state.end() + } +} + +impl<'de> Deserialize<'de> for DataFile { + fn deserialize(deserializer: D) -> std::result::Result + where D: serde::Deserializer<'de> { + #[derive(Deserialize)] + struct RawDataFile { + schema: Schema, + partition_type: StructType, + raw: crate::spec::manifest::_serde::DataFile, + } + let raw_data_file = RawDataFile::deserialize(deserializer)?; + let data_file = crate::spec::manifest::_serde::DataFile::try_into( + raw_data_file.raw, + &raw_data_file.partition_type, + &raw_data_file.schema, + ) + .unwrap(); + Ok(data_file) + } } impl DataFile { @@ -1274,13 +1315,13 @@ mod _serde { } impl ManifestEntryV2 { - pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result { + pub fn try_from(value: ManifestEntry) -> Result { Ok(Self { status: value.status as i32, snapshot_id: value.snapshot_id, sequence_number: value.sequence_number, file_sequence_number: value.file_sequence_number, - data_file: DataFile::try_from(value.data_file, partition_type, false)?, + data_file: DataFile::try_from(value.data_file, false)?, }) } @@ -1307,11 +1348,11 @@ mod _serde { } impl ManifestEntryV1 { - pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result { + pub fn try_from(value: ManifestEntry) -> Result { Ok(Self { status: value.status as i32, snapshot_id: value.snapshot_id.unwrap_or_default(), - data_file: DataFile::try_from(value.data_file, partition_type, true)?, + data_file: DataFile::try_from(value.data_file, true)?, }) } @@ -1356,11 +1397,7 @@ mod _serde { } impl DataFile { - pub fn try_from( - value: super::DataFile, - partition_type: &StructType, - is_version_1: bool, - ) -> Result { + pub fn try_from(value: super::DataFile, is_version_1: bool) -> Result { let block_size_in_bytes = if is_version_1 { Some(0) } else { None }; Ok(Self { content: value.content as i32, @@ -1368,7 +1405,7 @@ mod _serde { file_format: value.file_format.to_string().to_ascii_uppercase(), partition: RawLiteral::try_from( Literal::Struct(value.partition), - &Type::Struct(partition_type.clone()), + &Type::Struct(value.partition_type.clone()), )?, record_count: value.record_count.try_into()?, file_size_in_bytes: value.file_size_in_bytes.try_into()?, @@ -1447,6 +1484,8 @@ mod _serde { split_offsets: self.split_offsets.unwrap_or_default(), equality_ids: self.equality_ids.unwrap_or_default(), sort_order_id: self.sort_order_id, + partition_type: partition_type.clone(), + schema: schema.clone(), }) } }