diff --git a/arrow-avro/src/errors.rs b/arrow-avro/src/errors.rs new file mode 100644 index 000000000000..3883c421d9fe --- /dev/null +++ b/arrow-avro/src/errors.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Common Avro errors and macros. + +use arrow_schema::ArrowError; +use core::num::TryFromIntError; +use std::error::Error; +use std::string::FromUtf8Error; +use std::{io, result, str}; + +/// Avro error enumeration + +#[derive(Debug)] +#[non_exhaustive] +pub enum AvroError { + /// General Avro error. + /// Returned when code violates normal workflow of working with Avro data. + General(String), + /// "Not yet implemented" Avro error. + /// Returned when functionality is not yet available. + NYI(String), + /// "End of file" Avro error. + /// Returned when IO related failures occur, e.g. when there are not enough bytes to + /// decode. + EOF(String), + /// Arrow error. + /// Returned when reading into arrow or writing from arrow. + ArrowError(Box), + /// Error when the requested index is more than the + /// number of items expected + IndexOutOfBound(usize, usize), + /// Error indicating that an unexpected or bad argument was passed to a function. 
+ InvalidArgument(String), + /// Error indicating that a value could not be parsed. + ParseError(String), + /// Error indicating that a schema is invalid. + SchemaError(String), + /// An external error variant + External(Box), + /// Error during IO operations + IoError(String, io::Error), + /// Returned when a function needs more data to complete properly. The `usize` field indicates + /// the total number of bytes required, not the number of additional bytes. + NeedMoreData(usize), + /// Returned when a function needs more data to complete properly. + /// The `Range` indicates the range of bytes that are needed. + NeedMoreDataRange(std::ops::Range), +} + +impl std::fmt::Display for AvroError { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + match &self { + AvroError::General(message) => { + write!(fmt, "Avro error: {message}") + } + AvroError::NYI(message) => write!(fmt, "NYI: {message}"), + AvroError::EOF(message) => write!(fmt, "EOF: {message}"), + AvroError::ArrowError(message) => write!(fmt, "Arrow: {message}"), + AvroError::IndexOutOfBound(index, bound) => { + write!(fmt, "Index {index} out of bound: {bound}") + } + AvroError::InvalidArgument(message) => { + write!(fmt, "Invalid argument: {message}") + } + AvroError::ParseError(message) => write!(fmt, "Parse error: {message}"), + AvroError::SchemaError(message) => write!(fmt, "Schema error: {message}"), + AvroError::External(e) => write!(fmt, "External: {e}"), + AvroError::IoError(message, e) => write!(fmt, "I/O Error: {message}: {e}"), + AvroError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"), + AvroError::NeedMoreDataRange(range) => { + write!(fmt, "NeedMoreDataRange: {}..{}", range.start, range.end) + } + } + } +} + +impl Error for AvroError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + AvroError::External(e) => Some(e.as_ref()), + AvroError::ArrowError(e) => Some(e.as_ref()), + AvroError::IoError(_, e) => Some(e), + _ => None, + } + } +} 
+ +impl From for AvroError { + fn from(e: TryFromIntError) -> AvroError { + AvroError::General(format!("Integer overflow: {e}")) + } +} + +impl From for AvroError { + fn from(e: io::Error) -> AvroError { + AvroError::External(Box::new(e)) + } +} + +impl From for AvroError { + fn from(e: str::Utf8Error) -> AvroError { + AvroError::External(Box::new(e)) + } +} + +impl From for AvroError { + fn from(e: FromUtf8Error) -> AvroError { + AvroError::External(Box::new(e)) + } +} + +impl From for AvroError { + fn from(e: ArrowError) -> Self { + AvroError::ArrowError(Box::new(e)) + } +} + +/// A specialized `Result` for Avro errors. +pub type Result = result::Result; + +impl From for io::Error { + fn from(e: AvroError) -> Self { + io::Error::other(e) + } +} + +impl From for ArrowError { + fn from(e: AvroError) -> Self { + match e { + AvroError::External(inner) => ArrowError::from_external_error(inner), + AvroError::IoError(msg, err) => ArrowError::IoError(msg, err), + AvroError::ArrowError(inner) => *inner, + other => ArrowError::AvroError(other.to_string()), + } + } +} diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs index 032ad683ff77..eb04ee8fa6fc 100644 --- a/arrow-avro/src/lib.rs +++ b/arrow-avro/src/lib.rs @@ -195,6 +195,9 @@ pub mod compression; /// Avro data types and Arrow data types. pub mod codec; +/// AvroError variants +pub mod errors; + /// Extension trait for AvroField to add Utf8View support /// /// This trait adds methods for working with Utf8View support to the AvroField struct. diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs index 479f0ef90909..c4f4992d05e5 100644 --- a/arrow-avro/src/reader/block.rs +++ b/arrow-avro/src/reader/block.rs @@ -17,8 +17,8 @@ //! 
Decoder for [`Block`] +use crate::errors::{AvroError, Result}; use crate::reader::vlq::VLQDecoder; -use arrow_schema::ArrowError; /// A file data block /// @@ -75,14 +75,14 @@ impl BlockDecoder { /// can then be used again to read the next block, if any /// /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf - pub fn decode(&mut self, mut buf: &[u8]) -> Result { + pub fn decode(&mut self, mut buf: &[u8]) -> Result { let max_read = buf.len(); while !buf.is_empty() { match self.state { BlockDecoderState::Count => { if let Some(c) = self.vlq_decoder.long(&mut buf) { self.in_progress.count = c.try_into().map_err(|_| { - ArrowError::ParseError(format!( + AvroError::ParseError(format!( "Block count cannot be negative, got {c}" )) })?; @@ -93,9 +93,7 @@ impl BlockDecoder { BlockDecoderState::Size => { if let Some(c) = self.vlq_decoder.long(&mut buf) { self.bytes_remaining = c.try_into().map_err(|_| { - ArrowError::ParseError(format!( - "Block size cannot be negative, got {c}" - )) + AvroError::ParseError(format!("Block size cannot be negative, got {c}")) })?; self.in_progress.data.reserve(self.bytes_remaining); diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs index 23d9e503339d..25d196c47892 100644 --- a/arrow-avro/src/reader/cursor.rs +++ b/arrow-avro/src/reader/cursor.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+use crate::errors::{AvroError, Result}; use crate::reader::vlq::read_varint; -use arrow_schema::ArrowError; /// A wrapper around a byte slice, providing low-level decoding for Avro /// @@ -43,52 +43,51 @@ impl<'a> AvroCursor<'a> { /// Read a single `u8` #[inline] - pub(crate) fn get_u8(&mut self) -> Result { + pub(crate) fn get_u8(&mut self) -> Result { match self.buf.first().copied() { Some(x) => { self.buf = &self.buf[1..]; Ok(x) } - None => Err(ArrowError::ParseError("Unexpected EOF".to_string())), + None => Err(AvroError::EOF("Unexpected EOF".to_string())), } } #[inline] - pub(crate) fn get_bool(&mut self) -> Result { + pub(crate) fn get_bool(&mut self) -> Result { Ok(self.get_u8()? != 0) } - pub(crate) fn read_vlq(&mut self) -> Result { - let (val, offset) = read_varint(self.buf) - .ok_or_else(|| ArrowError::ParseError("bad varint".to_string()))?; + pub(crate) fn read_vlq(&mut self) -> Result { + let (val, offset) = + read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?; self.buf = &self.buf[offset..]; Ok(val) } #[inline] - pub(crate) fn get_int(&mut self) -> Result { + pub(crate) fn get_int(&mut self) -> Result { let varint = self.read_vlq()?; let val: u32 = varint .try_into() - .map_err(|_| ArrowError::ParseError("varint overflow".to_string()))?; + .map_err(|_| AvroError::ParseError("varint overflow".to_string()))?; Ok((val >> 1) as i32 ^ -((val & 1) as i32)) } #[inline] - pub(crate) fn get_long(&mut self) -> Result { + pub(crate) fn get_long(&mut self) -> Result { let val = self.read_vlq()?; Ok((val >> 1) as i64 ^ -((val & 1) as i64)) } - pub(crate) fn get_bytes(&mut self) -> Result<&'a [u8], ArrowError> { - let len: usize = self.get_long()?.try_into().map_err(|_| { - ArrowError::ParseError("offset overflow reading avro bytes".to_string()) - })?; + pub(crate) fn get_bytes(&mut self) -> Result<&'a [u8]> { + let len: usize = self + .get_long()? 
+ .try_into() + .map_err(|_| AvroError::ParseError("offset overflow reading avro bytes".to_string()))?; if self.buf.len() < len { - return Err(ArrowError::ParseError( - "Unexpected EOF reading bytes".to_string(), - )); + return Err(AvroError::EOF("Unexpected EOF reading bytes".to_string())); } let ret = &self.buf[..len]; self.buf = &self.buf[len..]; @@ -96,11 +95,9 @@ impl<'a> AvroCursor<'a> { } #[inline] - pub(crate) fn get_float(&mut self) -> Result { + pub(crate) fn get_float(&mut self) -> Result { if self.buf.len() < 4 { - return Err(ArrowError::ParseError( - "Unexpected EOF reading float".to_string(), - )); + return Err(AvroError::EOF("Unexpected EOF reading float".to_string())); } let ret = f32::from_le_bytes(self.buf[..4].try_into().unwrap()); self.buf = &self.buf[4..]; @@ -108,11 +105,9 @@ impl<'a> AvroCursor<'a> { } #[inline] - pub(crate) fn get_double(&mut self) -> Result { + pub(crate) fn get_double(&mut self) -> Result { if self.buf.len() < 8 { - return Err(ArrowError::ParseError( - "Unexpected EOF reading float".to_string(), - )); + return Err(AvroError::EOF("Unexpected EOF reading float".to_string())); } let ret = f64::from_le_bytes(self.buf[..8].try_into().unwrap()); self.buf = &self.buf[8..]; @@ -120,11 +115,9 @@ impl<'a> AvroCursor<'a> { } /// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`). - pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> { + pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8]> { if self.buf.len() < n { - return Err(ArrowError::ParseError( - "Unexpected EOF reading fixed".to_string(), - )); + return Err(AvroError::EOF("Unexpected EOF reading fixed".to_string())); } let ret = &self.buf[..n]; self.buf = &self.buf[n..]; diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs index aac267f50e9e..c9ab0c2ea185 100644 --- a/arrow-avro/src/reader/header.rs +++ b/arrow-avro/src/reader/header.rs @@ -18,13 +18,13 @@ //! 
Decoder for [`Header`] use crate::compression::{CODEC_METADATA_KEY, CompressionCodec}; +use crate::errors::{AvroError, Result}; use crate::reader::vlq::VLQDecoder; use crate::schema::{SCHEMA_METADATA_KEY, Schema}; -use arrow_schema::ArrowError; use std::io::BufRead; /// Read the Avro file header (magic, metadata, sync marker) from `reader`. -pub(crate) fn read_header(mut reader: R) -> Result { +pub(crate) fn read_header(mut reader: R) -> Result
{ let mut decoder = HeaderDecoder::default(); loop { let buf = reader.fill_buf()?; @@ -38,9 +38,9 @@ pub(crate) fn read_header(mut reader: R) -> Result Result, ArrowError> { + pub fn compression(&self) -> Result> { let v = self.get(CODEC_METADATA_KEY); match v { None | Some(b"null") => Ok(None), @@ -105,7 +105,7 @@ impl Header { Some(b"zstandard") => Ok(Some(CompressionCodec::ZStandard)), Some(b"bzip2") => Ok(Some(CompressionCodec::Bzip2)), Some(b"xz") => Ok(Some(CompressionCodec::Xz)), - Some(v) => Err(ArrowError::ParseError(format!( + Some(v) => Err(AvroError::ParseError(format!( "Unrecognized compression codec \'{}\'", String::from_utf8_lossy(v) ))), @@ -113,11 +113,11 @@ impl Header { } /// Returns the `Schema` if any - pub(crate) fn schema(&self) -> Result>, ArrowError> { + pub(crate) fn schema(&self) -> Result>> { self.get(SCHEMA_METADATA_KEY) .map(|x| { serde_json::from_slice(x).map_err(|e| { - ArrowError::ParseError(format!("Failed to parse Avro schema JSON: {e}")) + AvroError::ParseError(format!("Failed to parse Avro schema JSON: {e}")) }) }) .transpose() @@ -175,7 +175,7 @@ impl HeaderDecoder { /// input bytes, and the header can be obtained with [`Self::flush`] /// /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf - pub fn decode(&mut self, mut buf: &[u8]) -> Result { + pub fn decode(&mut self, mut buf: &[u8]) -> Result { let max_read = buf.len(); while !buf.is_empty() { match self.state { @@ -183,7 +183,7 @@ impl HeaderDecoder { let remaining = &MAGIC[MAGIC.len() - self.bytes_remaining..]; let to_decode = buf.len().min(remaining.len()); if !buf.starts_with(&remaining[..to_decode]) { - return Err(ArrowError::ParseError("Incorrect avro magic".to_string())); + return Err(AvroError::ParseError("Incorrect avro magic".to_string())); } self.bytes_remaining -= to_decode; buf = &buf[to_decode..]; @@ -310,7 +310,7 @@ mod test { let mut decoder = HeaderDecoder::default(); decoder.decode(b"Ob").unwrap(); let err = decoder.decode(b"s").unwrap_err().to_string(); 
- assert_eq!(err, "Parser error: Incorrect avro magic"); + assert_eq!(err, "Parse error: Incorrect avro magic"); } fn decode_file(file: &str) -> Header { diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 426845676a88..3f1b26bc7fcb 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -480,6 +480,7 @@ //! //! --- use crate::codec::AvroFieldBuilder; +use crate::errors::{AvroError, Result}; use crate::reader::header::read_header; use crate::schema::{ AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, Schema, @@ -499,11 +500,10 @@ mod header; mod record; mod vlq; -fn is_incomplete_data(err: &ArrowError) -> bool { +fn is_incomplete_data(err: &AvroError) -> bool { matches!( err, - ArrowError::ParseError(msg) - if msg.contains("Unexpected EOF") + AvroError::EOF(_) | AvroError::NeedMoreData(_) | AvroError::NeedMoreDataRange(_) ) } @@ -687,7 +687,7 @@ impl Decoder { continue; } Err(ref e) if is_incomplete_data(e) => break, - err => return err, + Err(e) => return Err(e.into()), }; } match self.handle_prefix(&data[total_consumed..])? { @@ -711,7 +711,7 @@ impl Decoder { // * Ok(None) – buffer does not start with the prefix. // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes. // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint). - fn handle_prefix(&mut self, buf: &[u8]) -> Result, ArrowError> { + fn handle_prefix(&mut self, buf: &[u8]) -> Result> { match self.fingerprint_algorithm { FingerprintAlgorithm::Rabin => { self.handle_prefix_common(buf, &SINGLE_OBJECT_MAGIC, |bytes| { @@ -749,7 +749,7 @@ impl Decoder { buf: &[u8], magic: &[u8; MAGIC_LEN], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, - ) -> Result, ArrowError> { + ) -> Result> { // Need at least the magic bytes to decide // 2 bytes for Avro Spec and 1 byte for Confluent Wire Protocol. 
if buf.len() < MAGIC_LEN { @@ -774,7 +774,7 @@ impl Decoder { &mut self, buf: &[u8], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, - ) -> Result, ArrowError> { + ) -> Result> { // Need enough bytes to get fingerprint (next N bytes) let Some(fingerprint_bytes) = buf.get(..N) else { return Ok(None); // insufficient bytes @@ -784,7 +784,7 @@ impl Decoder { // If the fingerprint indicates a schema change, prepare to switch decoders. if self.active_fingerprint != Some(new_fingerprint) { let Some(new_decoder) = self.cache.shift_remove(&new_fingerprint) else { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Unknown fingerprint: {new_fingerprint:?}" ))); }; @@ -816,7 +816,7 @@ impl Decoder { } } - fn flush_and_reset(&mut self) -> Result, ArrowError> { + fn flush_and_reset(&mut self) -> Result> { if self.batch_is_empty() { return Ok(None); } @@ -835,7 +835,7 @@ impl Decoder { // We must flush the active decoder before switching to the pending one. let batch = self.flush_and_reset(); self.apply_pending_schema(); - batch + batch.map_err(ArrowError::from) } /// Returns the number of rows that can be added to this decoder before it is full. @@ -856,7 +856,7 @@ impl Decoder { // Decode either the block count or remaining capacity from `data` (an OCF block payload). // // Returns the number of bytes consumed from `data` along with the number of records decoded. - fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, usize), ArrowError> { + fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, usize)> { // OCF decoding never interleaves records across blocks, so no chunking. let to_decode = std::cmp::min(count, self.remaining_capacity); if to_decode == 0 { @@ -869,7 +869,7 @@ impl Decoder { // Produce a `RecordBatch` if at least one row is fully decoded, returning // `Ok(None)` if no new rows are available. 
- fn flush_block(&mut self) -> Result, ArrowError> { + fn flush_block(&mut self) -> Result> { self.flush_and_reset() } } @@ -965,7 +965,7 @@ impl ReaderBuilder { &self, writer_schema: &Schema, reader_schema: Option<&Schema>, - ) -> Result { + ) -> Result { let mut builder = AvroFieldBuilder::new(writer_schema); if let Some(reader_schema) = reader_schema { builder = builder.with_reader_schema(reader_schema); @@ -981,7 +981,7 @@ impl ReaderBuilder { &self, writer_schema: &Schema, reader_schema: Option<&AvroSchema>, - ) -> Result { + ) -> Result { let reader_schema_raw = reader_schema.map(|s| s.schema()).transpose()?; self.make_record_decoder(writer_schema, reader_schema_raw.as_ref()) } @@ -1009,14 +1009,11 @@ impl ReaderBuilder { &self, header: Option<&Header>, reader_schema: Option<&AvroSchema>, - ) -> Result { + ) -> Result { if let Some(hdr) = header { - let writer_schema = hdr - .schema() - .map_err(|e| ArrowError::ExternalError(Box::new(e)))? - .ok_or_else(|| { - ArrowError::ParseError("No Avro schema present in file header".into()) - })?; + let writer_schema = hdr.schema()?.ok_or_else(|| { + AvroError::ParseError("No Avro schema present in file header".into()) + })?; let record_decoder = self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?; return Ok(self.make_decoder_with_parts( @@ -1027,11 +1024,11 @@ impl ReaderBuilder { )); } let store = self.writer_schema_store.as_ref().ok_or_else(|| { - ArrowError::ParseError("Writer schema store required for raw Avro".into()) + AvroError::ParseError("Writer schema store required for raw Avro".into()) })?; let fingerprints = store.fingerprints(); if fingerprints.is_empty() { - return Err(ArrowError::ParseError( + return Err(AvroError::ParseError( "Writer schema store must contain at least one schema".into(), )); } @@ -1039,7 +1036,7 @@ impl ReaderBuilder { .active_fingerprint .or_else(|| fingerprints.first().copied()) .ok_or_else(|| { - ArrowError::ParseError("Could not determine initial schema 
fingerprint".into()) + AvroError::ParseError("Could not determine initial schema fingerprint".into()) })?; let mut cache = IndexMap::with_capacity(fingerprints.len().saturating_sub(1)); let mut active_decoder: Option = None; @@ -1047,7 +1044,7 @@ impl ReaderBuilder { let avro_schema = match store.lookup(&fingerprint) { Some(schema) => schema, None => { - return Err(ArrowError::ComputeError(format!( + return Err(AvroError::General(format!( "Fingerprint {fingerprint:?} not found in schema store", ))); } @@ -1062,7 +1059,7 @@ impl ReaderBuilder { } } let active_decoder = active_decoder.ok_or_else(|| { - ArrowError::ComputeError(format!( + AvroError::General(format!( "Initial fingerprint {start_fingerprint:?} not found in schema store" )) })?; @@ -1175,6 +1172,7 @@ impl ReaderBuilder { )); } self.make_decoder(None, self.reader_schema.as_ref()) + .map_err(ArrowError::from) } } @@ -1251,7 +1249,7 @@ impl Reader { self.block_count -= records_decoded; } } - self.decoder.flush_block() + self.decoder.flush_block().map_err(ArrowError::from) } } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 0412a3e754aa..bbedbda6a7f9 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -21,6 +21,7 @@ use crate::codec::{ AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord, ResolvedUnion, }; +use crate::errors::{AvroError, Result}; use crate::reader::cursor::AvroCursor; use crate::schema::Nullability; #[cfg(feature = "small_decimals")] @@ -29,12 +30,12 @@ use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDa use arrow_array::types::*; use arrow_array::*; use arrow_buffer::*; -use arrow_schema::{ - ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, - FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode, -}; #[cfg(feature = "small_decimals")] use arrow_schema::{DECIMAL32_MAX_PRECISION, 
DECIMAL64_MAX_PRECISION}; +use arrow_schema::{ + DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef, + Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode, +}; #[cfg(feature = "avro_custom_types")] use arrow_select::take::{TakeOptions, take}; use std::cmp::Ordering; @@ -67,8 +68,7 @@ macro_rules! flush_decimal { ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{ let (_, vals, _) = $builder.finish().into_parts(); let dec = <$ArrayTy>::try_new(vals, $nulls)? - .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; + .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?; Arc::new(dec) as ArrayRef }}; } @@ -84,7 +84,7 @@ macro_rules! append_decimal_default { $builder.append_value(val); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( concat!( "Default for ", $name, @@ -115,7 +115,7 @@ impl RecordDecoder { /// /// # Errors /// This function will return an error if the provided `data_type` is not a `Record`. 
- pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result { + pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result { match data_type.codec() { Codec::Struct(reader_fields) => { // Build Arrow schema fields and per-child decoders @@ -137,7 +137,7 @@ impl RecordDecoder { projector, }) } - other => Err(ArrowError::ParseError(format!( + other => Err(AvroError::ParseError(format!( "Expected record got {other:?}" ))), } @@ -149,7 +149,7 @@ impl RecordDecoder { } /// Decode `count` records from `buf` - pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result { + pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result { let mut cursor = AvroCursor::new(buf); match self.projector.as_mut() { Some(proj) => { @@ -169,13 +169,13 @@ impl RecordDecoder { } /// Flush the decoded records into a [`RecordBatch`] - pub(crate) fn flush(&mut self) -> Result { + pub(crate) fn flush(&mut self) -> Result { let arrays = self .fields .iter_mut() .map(|x| x.flush(None)) .collect::, _>>()?; - RecordBatch::try_new(self.schema.clone(), arrays) + RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into) } } @@ -246,7 +246,7 @@ enum Decoder { } impl Decoder { - fn try_new(data_type: &AvroDataType) -> Result { + fn try_new(data_type: &AvroDataType) -> Result { if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() { if info.writer_is_union && !info.reader_is_union { let mut clone = data_type.clone(); @@ -264,7 +264,7 @@ impl Decoder { Self::try_new_internal(data_type) } - fn try_new_internal(data_type: &AvroDataType) -> Result { + fn try_new_internal(data_type: &AvroDataType) -> Result { // Extract just the Promotion (if any) to simplify pattern matching let promotion = match data_type.resolution.as_ref() { Some(ResolutionInfo::Promotion(p)) => Some(p), @@ -369,7 +369,7 @@ impl Decoder { .with_precision_and_scale(prec, scl)?; Self::Decimal256(p, s, *size, builder) } else { - return 
Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Decimal precision {p} exceeds maximum supported" ))); } @@ -385,7 +385,7 @@ impl Decoder { .with_precision_and_scale(prec, scl)?; Self::Decimal256(p, s, *size, builder) } else { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Decimal precision {p} exceeds maximum supported" ))); } @@ -452,7 +452,7 @@ impl Decoder { .map(Self::try_new_internal) .collect::, _>>()?; if fields.len() != decoders.len() { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Union has {} fields but {} decoders", fields.len(), decoders.len() @@ -463,7 +463,7 @@ impl Decoder { let branch_count = decoders.len(); let max_addr = (i32::MAX as usize) + 1; if branch_count > max_addr { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Union has {branch_count} branches, which exceeds the maximum addressable \ branches by an Avro int tag ({} + 1).", i32::MAX @@ -480,7 +480,7 @@ impl Decoder { Self::Union(builder.build()?) 
} (Codec::Union(_, _, _), _) => { - return Err(ArrowError::NotYetImplemented( + return Err(AvroError::NYI( "Sparse Arrow unions are not yet supported".to_string(), )); } @@ -493,7 +493,7 @@ impl Decoder { 32 => 4, 64 => 8, other => { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Unsupported run-end width {other} for RunEndEncoded; \ expected 16/32/64 bits or 2/4/8 bytes" ))); @@ -525,7 +525,7 @@ impl Decoder { } /// Append a null record - fn append_null(&mut self) -> Result<(), ArrowError> { + fn append_null(&mut self) -> Result<()> { match self { Self::Null(count) => *count += 1, Self::Boolean(b) => b.append(false), @@ -593,7 +593,7 @@ impl Decoder { } /// Append a single default literal into the decoder's buffers - fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> { + fn append_default(&mut self, lit: &AvroLiteral) -> Result<()> { match self { Self::Nullable(_, nb, inner, _) => { if matches!(lit, AvroLiteral::Null) { @@ -609,7 +609,7 @@ impl Decoder { *count += 1; Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Non-null default for null type".to_string(), )), }, @@ -618,7 +618,7 @@ impl Decoder { b.append(*v); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for boolean must be boolean".to_string(), )), }, @@ -627,7 +627,7 @@ impl Decoder { v.push(*i); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for int32/date32/time-millis must be int".to_string(), )), }, @@ -640,7 +640,7 @@ impl Decoder { v.push(*i); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for duration long must be long".to_string(), )), }, @@ -658,7 +658,7 @@ impl Decoder { v.push(*i as i64); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for 
long/time-micros/timestamp must be long or int".to_string(), )), }, @@ -667,7 +667,7 @@ impl Decoder { v.push(*f); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for float must be float".to_string(), )), }, @@ -679,7 +679,7 @@ impl Decoder { v.push(*f); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for double must be double".to_string(), )), }, @@ -689,7 +689,7 @@ impl Decoder { values.extend_from_slice(b); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for bytes must be bytes".to_string(), )), }, @@ -702,26 +702,26 @@ impl Decoder { values.extend_from_slice(b); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for string must be string".to_string(), )), }, Self::Uuid(values) => match lit { AvroLiteral::String(s) => { let uuid = Uuid::try_parse(s).map_err(|e| { - ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})")) + AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})")) })?; values.extend_from_slice(uuid.as_bytes()); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for uuid must be string".to_string(), )), }, Self::Fixed(sz, accum) => match lit { AvroLiteral::Bytes(b) => { if b.len() != *sz as usize { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Fixed default length {} does not match size {sz}", b.len(), ))); @@ -729,7 +729,7 @@ impl Decoder { accum.extend_from_slice(b); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for fixed must be bytes".to_string(), )), }, @@ -750,7 +750,7 @@ impl Decoder { Self::Duration(builder) => match lit { AvroLiteral::Bytes(b) => { if b.len() != 12 { - return Err(ArrowError::InvalidArgumentError(format!( + return 
Err(AvroError::InvalidArgument(format!( "Duration default must be exactly 12 bytes, got {}", b.len() ))); @@ -766,7 +766,7 @@ impl Decoder { )); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for duration must be 12-byte little-endian months/days/millis" .to_string(), )), @@ -779,7 +779,7 @@ impl Decoder { } Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for array must be an array literal".to_string(), )), }, @@ -794,21 +794,21 @@ impl Decoder { } Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for map must be a map/object literal".to_string(), )), }, Self::Enum(indices, symbols, _) => match lit { AvroLiteral::Enum(sym) => { let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| { - ArrowError::InvalidArgumentError(format!( + AvroError::InvalidArgument(format!( "Enum default symbol {sym:?} not in reader symbols" )) })?; indices.push(pos as i32); Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for enum must be a symbol".to_string(), )), }, @@ -842,7 +842,7 @@ impl Decoder { } Ok(()) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Default for record must be a map/object or null".to_string(), )), }, @@ -850,7 +850,7 @@ impl Decoder { } /// Decode a single record from `buf` - fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> { + fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<()> { match self { Self::Null(x) => *x += 1, Self::Boolean(values) => values.append(buf.get_bool()?), @@ -887,10 +887,10 @@ impl Decoder { Self::Uuid(values) => { let s_bytes = buf.get_bytes()?; let s = std::str::from_utf8(s_bytes).map_err(|e| { - ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}")) + AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}")) })?; let uuid = 
Uuid::try_parse(s) - .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?; + .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?; values.extend_from_slice(uuid.as_bytes()); } Self::Array(_, off, encoding) => { @@ -945,7 +945,7 @@ impl Decoder { if resolved >= 0 { indices.push(resolved); } else { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Enum symbol index {raw} not resolvable and no default provided", ))); } @@ -994,7 +994,7 @@ impl Decoder { &mut self, buf: &mut AvroCursor<'_>, promotion: Promotion, - ) -> Result<(), ArrowError> { + ) -> Result<()> { #[cfg(feature = "avro_custom_types")] if let Self::RunEndEncoded(_, len, inner) = self { *len += 1; @@ -1009,7 +1009,7 @@ impl Decoder { v.push(x as $to); Ok(()) } - other => Err(ArrowError::ParseError(format!( + other => Err(AvroError::ParseError(format!( "Promotion {promotion} target mismatch: expected {}, got {}", stringify!($variant), >::as_ref(other) @@ -1032,7 +1032,7 @@ impl Decoder { values.extend_from_slice(data); Ok(()) } - other => Err(ArrowError::ParseError(format!( + other => Err(AvroError::ParseError(format!( "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}", >::as_ref(other) ))), @@ -1046,7 +1046,7 @@ impl Decoder { values.extend_from_slice(data); Ok(()) } - other => Err(ArrowError::ParseError(format!( + other => Err(AvroError::ParseError(format!( "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}", >::as_ref(other) ))), @@ -1055,7 +1055,7 @@ impl Decoder { } /// Flush decoded records to an [`ArrayRef`] - fn flush(&mut self, nulls: Option) -> Result { + fn flush(&mut self, nulls: Option) -> Result { Ok(match self { Self::Nullable(_, n, e, _) => e.flush(n.finish())?, Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))), @@ -1152,7 +1152,7 @@ impl Decoder { let val_arr = valdec.flush(None)?; let key_arr = 
StringArray::try_new(koff, kd, None)?; if key_arr.len() != val_arr.len() { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Map keys length ({}) != map values length ({})", key_arr.len(), val_arr.len() @@ -1161,7 +1161,7 @@ impl Decoder { let final_len = moff.len() - 1; if let Some(n) = &nulls { if n.len() != final_len { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Map array null buffer length {} != final map length {final_len}", n.len() ))); @@ -1170,7 +1170,7 @@ impl Decoder { let entries_fields = match map_field.data_type() { DataType::Struct(fields) => fields.clone(), other => { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Map entries field must be a Struct, got {other:?}" ))); } @@ -1184,12 +1184,12 @@ impl Decoder { Self::Fixed(sz, accum) => { let b: Buffer = flush_values(accum).into(); let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; + .map_err(|e| AvroError::ParseError(e.to_string()))?; Arc::new(arr) } Self::Uuid(values) => { let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; + .map_err(|e| AvroError::ParseError(e.to_string()))?; Arc::new(arr) } #[cfg(feature = "small_decimals")] @@ -1210,7 +1210,7 @@ impl Decoder { Self::Duration(builder) => { let (_, vals, _) = builder.finish().into_parts(); let vals = IntervalMonthDayNanoArray::try_new(vals, nulls) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; + .map_err(|e| AvroError::ParseError(e.to_string()))?; Arc::new(vals) } #[cfg(feature = "avro_custom_types")] @@ -1228,7 +1228,7 @@ impl Decoder { } } if n > (u32::MAX as usize) { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "RunEndEncoded length {n} exceeds maximum supported by 
UInt32 indices for take", ))); } @@ -1239,7 +1239,7 @@ impl Decoder { values.slice(0, 0) } else { take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| { - ArrowError::ParseError(format!("take() for REE values failed: {e}")) + AvroError::ParseError(format!("take() for REE values failed: {e}")) })? }; @@ -1256,14 +1256,14 @@ impl Decoder { } let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect(); let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref()) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; + .map_err(|e| AvroError::ParseError(e.to_string()))?; Arc::new(run_arr) as ArrayRef }}; } match *width { 2 => { if n > i16::MAX as usize { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "RunEndEncoded length {n} exceeds i16::MAX for run end width 2" ))); } @@ -1272,7 +1272,7 @@ impl Decoder { 4 => build_run_array!(i32, Int32Type), 8 => build_run_array!(i64, Int64Type), other => { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Unsupported run-end width {other} for RunEndEncoded" ))); } @@ -1314,16 +1314,14 @@ struct DispatchLookupTable { const NO_SOURCE: i8 = -1; impl DispatchLookupTable { - fn from_writer_to_reader( - promotion_map: &[Option<(usize, Promotion)>], - ) -> Result { + fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> Result { let mut to_reader = Vec::with_capacity(promotion_map.len()); let mut promotion = Vec::with_capacity(promotion_map.len()); for map in promotion_map { match *map { Some((idx, promo)) => { let idx_i8 = i8::try_from(idx).map_err(|_| { - ArrowError::SchemaError(format!( + AvroError::SchemaError(format!( "Reader branch index {idx} exceeds i8 range (max {})", i8::MAX )) @@ -1401,7 +1399,7 @@ impl UnionDecoder { fields: UnionFields, branches: Vec, resolved: Option, - ) -> Result { + ) -> Result { let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::>(); let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_))); let default_emit_idx = 0; @@ -1410,7 +1408,7 @@ impl UnionDecoder { // Guard against impractically large unions that cannot be indexed by an Avro int let max_addr = (i32::MAX as usize) + 1; if branches.len() > max_addr { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Reader union has {} branches, which exceeds the maximum addressable \ branches by an Avro int tag ({} + 1).", branches.len(), @@ -1430,10 +1428,7 @@ impl UnionDecoder { }) } - fn try_new_from_writer_union( - info: ResolvedUnion, - target: Box, - ) -> Result { + fn try_new_from_writer_union(info: ResolvedUnion, target: Box) -> Result { // This constructor is only for writer-union to single-type resolution debug_assert!(info.writer_is_union && !info.reader_is_union); let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?; @@ -1446,7 +1441,7 @@ impl UnionDecoder { }) } - fn plan_from_resolved(resolved: Option) -> Result { + fn plan_from_resolved(resolved: Option) -> Result { let Some(info) = resolved else { return Ok(UnionReadPlan::Passthrough); }; @@ -1460,7 +1455,7 @@ impl UnionDecoder { let Some(&(reader_idx, promotion)) = info.writer_to_reader.first().and_then(Option::as_ref) else { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Writer type does not match any reader union branch".to_string(), )); }; @@ -1469,12 +1464,12 @@ impl UnionDecoder { promotion, }) } - (true, false) => Err(ArrowError::InvalidArgumentError( + (true, false) => Err(AvroError::InvalidArgument( "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target" .to_string(), )), // (false, false) is invalid and should never be constructed by the resolver. 
- _ => Err(ArrowError::SchemaError( + _ => Err(AvroError::SchemaError( "ResolvedUnion constructed for non-union sides; resolver should return None" .to_string(), )), @@ -1482,19 +1477,19 @@ impl UnionDecoder { } #[inline] - fn read_tag(buf: &mut AvroCursor<'_>) -> Result { + fn read_tag(buf: &mut AvroCursor<'_>) -> Result { // Avro unions are encoded by first writing the zero-based branch index. // In Avro 1.11.1 this is specified as an *int*; older specs said *long*, // but both use zig-zag varint encoding, so decoding as long is compatible // with either form and widely used in practice. let raw = buf.get_long()?; if raw < 0 { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Negative union branch index {raw}" ))); } usize::try_from(raw).map_err(|_| { - ArrowError::ParseError(format!( + AvroError::ParseError(format!( "Union branch index {raw} does not fit into usize on this platform ({}-bit)", (usize::BITS as usize) )) @@ -1502,10 +1497,10 @@ impl UnionDecoder { } #[inline] - fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> { + fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder> { let branches_len = self.branches.len(); let Some(reader_branch) = self.branches.get_mut(reader_idx) else { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Union branch index {reader_idx} out of range ({branches_len} branches)" ))); }; @@ -1516,9 +1511,9 @@ impl UnionDecoder { } #[inline] - fn on_decoder(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError> + fn on_decoder(&mut self, fallback_idx: usize, action: F) -> Result<()> where - F: FnOnce(&mut Decoder) -> Result<(), ArrowError>, + F: FnOnce(&mut Decoder) -> Result<()>, { if let UnionReadPlan::ToSingle { target, .. 
} = &mut self.plan { return action(target); @@ -1530,21 +1525,21 @@ impl UnionDecoder { self.emit_to(reader_idx).and_then(action) } - fn append_null(&mut self) -> Result<(), ArrowError> { + fn append_null(&mut self) -> Result<()> { self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null()) } - fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> { + fn append_default(&mut self, lit: &AvroLiteral) -> Result<()> { self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit)) } - fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> { + fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<()> { let (reader_idx, promotion) = match &mut self.plan { UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct), UnionReadPlan::ReaderUnion { lookup_table } => { let idx = Self::read_tag(buf)?; lookup_table.resolve(idx).ok_or_else(|| { - ArrowError::ParseError(format!( + AvroError::ParseError(format!( "Union branch index {idx} not resolvable by reader schema" )) })? @@ -1560,7 +1555,7 @@ impl UnionDecoder { let idx = Self::read_tag(buf)?; return match lookup_table.resolve(idx) { Some((_, promotion)) => target.decode_with_promotion(buf, promotion), - None => Err(ArrowError::ParseError(format!( + None => Err(AvroError::ParseError(format!( "Writer union branch {idx} does not resolve to reader type" ))), }; @@ -1570,7 +1565,7 @@ impl UnionDecoder { decoder.decode_with_promotion(buf, promotion) } - fn flush(&mut self, nulls: Option) -> Result { + fn flush(&mut self, nulls: Option) -> Result { if let UnionReadPlan::ToSingle { target, .. 
} = &mut self.plan { return target.flush(nulls); } @@ -1590,7 +1585,7 @@ impl UnionDecoder { Some(flush_values(&mut self.offsets).into_iter().collect()), children, ) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; + .map_err(|e| AvroError::ParseError(e.to_string()))?; Ok(Arc::new(arr)) } } @@ -1628,7 +1623,7 @@ impl UnionDecoderBuilder { self } - fn build(self) -> Result { + fn build(self) -> Result { match (self.resolved, self.fields, self.branches, self.target) { (resolved, Some(fields), Some(branches), None) => { UnionDecoder::try_new(fields, branches, resolved) @@ -1638,7 +1633,7 @@ impl UnionDecoderBuilder { { UnionDecoder::try_new_from_writer_union(info, target) } - _ => Err(ArrowError::InvalidArgumentError( + _ => Err(AvroError::InvalidArgument( "Invalid UnionDecoderBuilder configuration: expected either \ (fields + branches + resolved) with no target for reader-unions, or \ (resolved + target) with no fields/branches for writer-union to single." @@ -1657,8 +1652,8 @@ enum NegativeBlockBehavior { #[inline] fn skip_blocks( buf: &mut AvroCursor, - mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, -) -> Result { + mut skip_item: impl FnMut(&mut AvroCursor) -> Result<()>, +) -> Result { process_blockwise( buf, move |c| skip_item(c), @@ -1671,30 +1666,30 @@ fn flush_dict( indices: &mut Vec, symbols: &[String], nulls: Option, -) -> Result { +) -> Result { let keys = flush_primitive::(indices, nulls); let values = Arc::new(StringArray::from_iter_values( symbols.iter().map(|s| s.as_str()), )); DictionaryArray::try_new(keys, values) - .map_err(|e| ArrowError::ParseError(e.to_string())) + .map_err(Into::into) .map(|arr| Arc::new(arr) as ArrayRef) } #[inline] fn read_blocks( buf: &mut AvroCursor, - decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, -) -> Result { + decode_entry: impl FnMut(&mut AvroCursor) -> Result<()>, +) -> Result { process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems) } #[inline] fn 
process_blockwise( buf: &mut AvroCursor, - mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, + mut on_item: impl FnMut(&mut AvroCursor) -> Result<()>, negative_behavior: NegativeBlockBehavior, -) -> Result { +) -> Result { let mut total = 0usize; loop { // Read the block count @@ -1756,7 +1751,7 @@ fn flush_primitive( fn read_decimal_bytes_be( buf: &mut AvroCursor<'_>, size: &Option, -) -> Result<[u8; N], ArrowError> { +) -> Result<[u8; N]> { match size { Some(n) if *n == N => { let raw = buf.get_fixed(N)?; @@ -1784,7 +1779,7 @@ fn read_decimal_bytes_be( /// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte /// and the MSB of the first kept byte must match the sign (to avoid silent overflow). #[inline] -fn sign_cast_to(raw: &[u8]) -> Result<[u8; N], ArrowError> { +fn sign_cast_to(raw: &[u8]) -> Result<[u8; N]> { let len = raw.len(); // Fast path: exact width, just copy if len == N { @@ -1803,7 +1798,7 @@ fn sign_cast_to(raw: &[u8]) -> Result<[u8; N], ArrowError> { let extra = len - N; // Any non-sign byte in the truncated prefix indicates overflow if raw[..extra].iter().any(|&b| b != sign_byte) { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Decimal value with {} bytes cannot be represented in {} bytes without overflow", len, N ))); @@ -1812,7 +1807,7 @@ fn sign_cast_to(raw: &[u8]) -> Result<[u8; N], ArrowError> { let first_kept = raw[extra]; let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0; if sign_bit_mismatch { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Decimal value with {} bytes cannot be represented in {} bytes without overflow", len, N ))); @@ -1863,7 +1858,7 @@ impl<'a> ProjectorBuilder<'a> { } #[inline] - fn build(self) -> Result { + fn build(self) -> Result { let reader_fields = self.reader_fields; let mut field_defaults: Vec> = Vec::with_capacity(reader_fields.len()); for avro_field in 
reader_fields.as_ref() { @@ -1904,7 +1899,7 @@ impl<'a> ProjectorBuilder<'a> { impl Projector { #[inline] - fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> { + fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<()> { // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in // `ProjectorBuilder::build` to have exactly one element per reader field. @@ -1923,7 +1918,7 @@ impl Projector { &mut self, buf: &mut AvroCursor<'_>, encodings: &mut [Decoder], - ) -> Result<(), ArrowError> { + ) -> Result<()> { debug_assert_eq!( self.writer_to_reader.len(), self.skip_decoders.len(), @@ -1939,7 +1934,7 @@ impl Projector { (Some(reader_index), _) => encodings[*reader_index].decode(buf)?, (None, Some(skipper)) => skipper.skip(buf)?, (None, None) => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "No skipper available for writer-only field at index {i}", ))); } @@ -1986,7 +1981,7 @@ enum Skipper { } impl Skipper { - fn from_avro(dt: &AvroDataType) -> Result { + fn from_avro(dt: &AvroDataType) -> Result { let mut base = match dt.codec() { Codec::Null => Self::Null, Codec::Boolean => Self::Boolean, @@ -2021,7 +2016,7 @@ impl Skipper { Codec::Union(encodings, _, _) => { let max_addr = (i32::MAX as usize) + 1; if encodings.len() > max_addr { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Writer union has {} branches, which exceeds the maximum addressable \ branches by an Avro int tag ({} + 1).", encodings.len(), @@ -2046,7 +2041,7 @@ impl Skipper { Ok(base) } - fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> { + fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<()> { match self { Self::Null => Ok(()), Self::Boolean => { @@ -2118,18 +2113,18 @@ impl Skipper { // Union tag must be ZigZag-decoded let raw = 
buf.get_long()?; if raw < 0 { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Negative union branch index {raw}" ))); } let idx: usize = usize::try_from(raw).map_err(|_| { - ArrowError::ParseError(format!( + AvroError::ParseError(format!( "Union branch index {raw} does not fit into usize on this platform ({}-bit)", (usize::BITS as usize) )) })?; let Some(encoding) = encodings.get_mut(idx) else { - return Err(ArrowError::ParseError(format!( + return Err(AvroError::ParseError(format!( "Union branch index {idx} out of range for skipper ({} branches)", encodings.len() ))); @@ -4325,7 +4320,7 @@ mod tests { #[cfg(feature = "avro_custom_types")] #[test] - fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> { + fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<()> { for codec in [ Codec::DurationNanos, Codec::DurationMicros, @@ -4344,7 +4339,7 @@ mod tests { #[cfg(feature = "avro_custom_types")] #[test] - fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> { + fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<()> { let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3]; for codec in [ Codec::DurationNanos, @@ -4372,7 +4367,7 @@ mod tests { #[cfg(feature = "avro_custom_types")] #[test] - fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> { + fn skipper_nullable_custom_duration_respects_null_first() -> Result<()> { let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst)); let mut s = Skipper::from_avro(&dt)?; match &s { @@ -4401,7 +4396,7 @@ mod tests { #[cfg(feature = "avro_custom_types")] #[test] - fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> { + fn skipper_nullable_custom_duration_respects_null_second() -> Result<()> { let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond)); let mut s = 
Skipper::from_avro(&dt)?; match &s { @@ -4432,7 +4427,7 @@ mod tests { } #[test] - fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> { + fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<()> { let dt = make_avro_dt(Codec::Interval, None); let mut s = Skipper::from_avro(&dt)?; match s { diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index 79aee4fae0c7..b09555cf585d 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -18,6 +18,7 @@ //! Avro Encoder for Arrow types. use crate::codec::{AvroDataType, AvroField, Codec}; +use crate::errors::{AvroError, Result}; use crate::schema::{Fingerprint, Nullability, Prefix}; use arrow_array::cast::AsArray; use arrow_array::types::{ @@ -40,9 +41,7 @@ use arrow_array::{ #[cfg(feature = "small_decimals")] use arrow_array::{Decimal32Array, Decimal64Array}; use arrow_buffer::{ArrowNativeType, NullBuffer}; -use arrow_schema::{ - ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode, -}; +use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode}; use std::io::Write; use std::sync::Arc; use uuid::Uuid; @@ -51,7 +50,7 @@ use uuid::Uuid; /// /// Spec: #[inline] -pub(crate) fn write_long(out: &mut W, value: i64) -> Result<(), ArrowError> { +pub(crate) fn write_long(out: &mut W, value: i64) -> Result<()> { let mut zz = ((value << 1) ^ (value >> 63)) as u64; // At most 10 bytes for 64-bit varint let mut buf = [0u8; 10]; @@ -63,26 +62,26 @@ pub(crate) fn write_long(out: &mut W, value: i64) -> Result<( } buf[i] = (zz & 0x7F) as u8; i += 1; - out.write_all(&buf[..i]) - .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e)) + out.write_all(&buf[..i])?; + Ok(()) } #[inline] -fn write_int(out: &mut W, value: i32) -> Result<(), ArrowError> { +fn write_int(out: &mut W, value: i32) -> Result<()> { write_long(out, value as i64) } #[inline] -fn write_len_prefixed(out: 
&mut W, bytes: &[u8]) -> Result<(), ArrowError> { +fn write_len_prefixed(out: &mut W, bytes: &[u8]) -> Result<()> { write_long(out, bytes.len() as i64)?; - out.write_all(bytes) - .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e)) + out.write_all(bytes)?; + Ok(()) } #[inline] -fn write_bool(out: &mut W, v: bool) -> Result<(), ArrowError> { - out.write_all(&[if v { 1 } else { 0 }]) - .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e)) +fn write_bool(out: &mut W, v: bool) -> Result<()> { + out.write_all(&[if v { 1 } else { 0 }])?; + Ok(()) } /// Minimal two's-complement big-endian representation helper for Avro decimal (bytes). @@ -129,16 +128,11 @@ fn minimal_twos_complement(be: &[u8]) -> &[u8] { /// /// Used for encoding Avro decimal values into `fixed(N)` fields. #[inline] -fn write_sign_extended( - out: &mut W, - src_be: &[u8], - n: usize, -) -> Result<(), ArrowError> { +fn write_sign_extended(out: &mut W, src_be: &[u8], n: usize) -> Result<()> { let len = src_be.len(); if len == n { - return out - .write_all(src_be) - .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e)); + out.write_all(src_be)?; + return Ok(()); } let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 { 0xFF @@ -155,13 +149,13 @@ fn write_sign_extended( if src_be[..extra].iter().any(|&b| b != sign_byte) || ((src_be[extra] ^ sign_byte) & 0x80) != 0 { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow", ))); } return out .write_all(&src_be[extra..]) - .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e)); + .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e)); } // len < n: prepend sign bytes (sign extension) then the payload let pad_len = n - len; @@ -178,15 +172,15 @@ fn write_sign_extended( let mut rem = pad_len; while rem >= pad.len() { out.write_all(pad) - .map_err(|e| 
ArrowError::IoError(format!("write decimal fixed: {e}"), e))?; + .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?; rem -= pad.len(); } if rem > 0 { out.write_all(&pad[..rem]) - .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?; + .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?; } out.write_all(src_be) - .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e)) + .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e)) } /// Write the union branch index for an optional field. @@ -198,10 +192,10 @@ fn write_optional_index( out: &mut W, is_null: bool, null_order: Nullability, -) -> Result<(), ArrowError> { +) -> Result<()> { let byte = union_value_branch_byte(null_order, is_null); out.write_all(&[byte]) - .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e)) + .map_err(|e| AvroError::IoError(format!("write union branch: {e}"), e)) } #[derive(Debug, Clone)] @@ -230,7 +224,7 @@ impl<'a> FieldEncoder<'a> { field: &Field, plan: &FieldPlan, nullability: Option, - ) -> Result { + ) -> Result { let encoder = match plan { FieldPlan::Scalar => match array.data_type() { DataType::Null => Encoder::Null, @@ -245,25 +239,21 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| { - ArrowError::SchemaError("Expected StringViewArray".into()) - })?; + .ok_or_else(|| AvroError::SchemaError("Expected StringViewArray".into()))?; Encoder::Utf8View(Utf8ViewEncoder(arr)) } DataType::BinaryView => { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| { - ArrowError::SchemaError("Expected BinaryViewArray".into()) - })?; + .ok_or_else(|| AvroError::SchemaError("Expected BinaryViewArray".into()))?; Encoder::BinaryView(BinaryViewEncoder(arr)) } DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::())), DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::())), DataType::Date32 => 
Encoder::Date32(IntEncoder(array.as_primitive::())), DataType::Date64 => { - return Err(ArrowError::NotYetImplemented( + return Err(AvroError::NYI( "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp." .into(), )); @@ -275,13 +265,13 @@ impl<'a> FieldEncoder<'a> { Encoder::Time32Millis(IntEncoder(array.as_primitive::())) } DataType::Time32(TimeUnit::Microsecond) => { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds." .into(), )); } DataType::Time32(TimeUnit::Nanosecond) => { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds." .into(), )); @@ -290,19 +280,19 @@ impl<'a> FieldEncoder<'a> { array.as_primitive::(), )), DataType::Time64(TimeUnit::Nanosecond) => { - return Err(ArrowError::NotYetImplemented( + return Err(AvroError::NYI( "Avro writer does not support time-nanos; cast to Time64(Microsecond)." .into(), )); } DataType::Time64(TimeUnit::Millisecond) => { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)." .into(), )); } DataType::Time64(TimeUnit::Second) => { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)." 
.into(), )); @@ -322,7 +312,7 @@ impl<'a> FieldEncoder<'a> { .as_any() .downcast_ref::() .ok_or_else(|| { - ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()) + AvroError::SchemaError("Expected FixedSizeBinaryArray".into()) })?; Encoder::Fixed(FixedEncoder(arr)) } @@ -368,7 +358,7 @@ impl<'a> FieldEncoder<'a> { )), }, other => { - return Err(ArrowError::NotYetImplemented(format!( + return Err(AvroError::NYI(format!( "Avro scalar type not yet supported: {other:?}" ))); } @@ -377,7 +367,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected StructArray".into()))?; Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?)) } FieldPlan::List { @@ -388,7 +378,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected ListArray".into()))?; Encoder::List(Box::new(ListEncoder32::try_new( arr, *items_nullability, @@ -399,7 +389,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected LargeListArray".into()))?; Encoder::LargeList(Box::new(ListEncoder64::try_new( arr, *items_nullability, @@ -410,7 +400,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected ListViewArray".into()))?; Encoder::ListView(Box::new(ListViewEncoder32::try_new( arr, *items_nullability, @@ -422,7 +412,7 @@ impl<'a> FieldEncoder<'a> { .as_any() .downcast_ref::() .ok_or_else(|| { - ArrowError::SchemaError("Expected LargeListViewArray".into()) + AvroError::SchemaError("Expected LargeListViewArray".into()) })?; 
Encoder::LargeListView(Box::new(ListViewEncoder64::try_new( arr, @@ -435,7 +425,7 @@ impl<'a> FieldEncoder<'a> { .as_any() .downcast_ref::() .ok_or_else(|| { - ArrowError::SchemaError("Expected FixedSizeListArray".into()) + AvroError::SchemaError("Expected FixedSizeListArray".into()) })?; Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new( arr, @@ -444,7 +434,7 @@ impl<'a> FieldEncoder<'a> { )?)) } other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}" ))); } @@ -455,7 +445,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected Decimal32Array".into()))?; Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size)) } #[cfg(feature = "small_decimals")] @@ -463,29 +453,25 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected Decimal64Array".into()))?; Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size)) } DataType::Decimal128(_, _) => { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| { - ArrowError::SchemaError("Expected Decimal128Array".into()) - })?; + .ok_or_else(|| AvroError::SchemaError("Expected Decimal128Array".into()))?; Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size)) } DataType::Decimal256(_, _) => { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| { - ArrowError::SchemaError("Expected Decimal256Array".into()) - })?; + .ok_or_else(|| AvroError::SchemaError("Expected Decimal256Array".into()))?; Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size)) } other => { - return 
Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}" ))); } @@ -495,7 +481,7 @@ impl<'a> FieldEncoder<'a> { .as_any() .downcast_ref::() .ok_or_else(|| { - ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()) + AvroError::SchemaError("Expected FixedSizeBinaryArray".into()) })?; Encoder::Uuid(UuidEncoder(arr)) } @@ -506,7 +492,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected MapArray".into()))?; Encoder::Map(Box::new(MapEncoder::try_new( arr, *values_nullability, @@ -516,7 +502,7 @@ impl<'a> FieldEncoder<'a> { FieldPlan::Enum { symbols } => match array.data_type() { DataType::Dictionary(key_dt, value_dt) => { if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Avro enum requires Dictionary".into(), )); } @@ -524,17 +510,17 @@ impl<'a> FieldEncoder<'a> { .as_any() .downcast_ref::>() .ok_or_else(|| { - ArrowError::SchemaError("Expected DictionaryArray".into()) + AvroError::SchemaError("Expected DictionaryArray".into()) })?; let values = dict .values() .as_any() .downcast_ref::() .ok_or_else(|| { - ArrowError::SchemaError("Dictionary values must be Utf8".into()) + AvroError::SchemaError("Dictionary values must be Utf8".into()) })?; if values.len() != symbols.len() { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Enum symbol length {} != dictionary size {}", symbols.len(), values.len() @@ -542,7 +528,7 @@ impl<'a> FieldEncoder<'a> { } for i in 0..values.len() { if values.value(i) != symbols[i].as_str() { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Enum symbol mismatch at {i}: schema='{}' dict='{}'", symbols[i], values.value(i) 
@@ -553,7 +539,7 @@ impl<'a> FieldEncoder<'a> { Encoder::Enum(EnumEncoder { keys }) } other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro enum site requires DataType::Dictionary, found: {other:?}" ))); } @@ -562,7 +548,7 @@ impl<'a> FieldEncoder<'a> { let arr = array .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?; + .ok_or_else(|| AvroError::SchemaError("Expected UnionArray".into()))?; Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?)) } @@ -574,13 +560,13 @@ impl<'a> FieldEncoder<'a> { let values_field = match dt { DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(), other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro RunEndEncoded site requires Arrow DataType::RunEndEncoded, found: {other:?}" ))); } }; // Helper closure to build a typed RunEncodedEncoder - let build = |run_arr_any: &'a dyn Array| -> Result, ArrowError> { + let build = |run_arr_any: &'a dyn Array| -> Result> { if let Some(arr) = run_arr_any.as_any().downcast_ref::>() { let values_enc = prepare_value_site_encoder( arr.values().as_ref(), @@ -620,7 +606,7 @@ impl<'a> FieldEncoder<'a> { arr, values_enc )))); } - Err(ArrowError::SchemaError( + Err(AvroError::SchemaError( "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64" .into(), )) @@ -632,7 +618,7 @@ impl<'a> FieldEncoder<'a> { let null_state = match (nullability, array.null_count() > 0) { (None, false) => NullState::NonNullable, (None, true) => { - return Err(ArrowError::InvalidArgumentError(format!( + return Err(AvroError::InvalidArgument(format!( "Avro site '{}' is non-nullable, but array contains nulls", field.name() ))); @@ -645,7 +631,7 @@ impl<'a> FieldEncoder<'a> { } (Some(null_order), true) => { let Some(nulls) = array.nulls().cloned() else { - return Err(ArrowError::InvalidArgumentError(format!( + return 
Err(AvroError::InvalidArgument(format!( "Array for Avro site '{}' reports nulls but has no null buffer", field.name() ))); @@ -659,12 +645,12 @@ impl<'a> FieldEncoder<'a> { }) } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { match &self.null_state { NullState::NonNullable => {} - NullState::NullableNoNulls { union_value_byte } => out - .write_all(&[*union_value_byte]) - .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?, + NullState::NullableNoNulls { union_value_byte } => { + out.write_all(&[*union_value_byte])? + } NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => { return write_optional_index(out, true, *null_order); // no value to write } @@ -751,10 +737,10 @@ impl<'a> RecordEncoderBuilder<'a> { /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order, /// resolving each field to an Arrow index by name. - pub(crate) fn build(self) -> Result { + pub(crate) fn build(self) -> Result { let avro_root_dt = self.avro_root.data_type(); let Codec::Struct(root_fields) = avro_root_dt.codec() else { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Top-level Avro schema must be a record/struct".into(), )); }; @@ -762,7 +748,7 @@ impl<'a> RecordEncoderBuilder<'a> { for root_field in root_fields.as_ref() { let name = root_field.name(); let arrow_index = self.arrow_schema.index_of(name).map_err(|e| { - ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}")) + AvroError::SchemaError(format!("Schema mismatch for field '{name}': {e}")) })?; columns.push(FieldBinding { arrow_index, @@ -793,10 +779,7 @@ pub(crate) struct RecordEncoder { } impl RecordEncoder { - fn prepare_for_batch<'a>( - &'a self, - batch: &'a RecordBatch, - ) -> Result>, ArrowError> { + fn prepare_for_batch<'a>(&'a self, batch: &'a RecordBatch) -> Result>> { let schema_binding = batch.schema(); let fields = 
schema_binding.fields(); let arrays = batch.columns(); @@ -804,7 +787,7 @@ impl RecordEncoder { for col_plan in self.columns.iter() { let arrow_index = col_plan.arrow_index; let array = arrays.get(arrow_index).ok_or_else(|| { - ArrowError::SchemaError(format!("Column index {arrow_index} out of range")) + AvroError::SchemaError(format!("Column index {arrow_index} out of range")) })?; let field = fields[arrow_index].as_ref(); #[cfg(not(feature = "avro_custom_types"))] @@ -828,18 +811,13 @@ impl RecordEncoder { /// Encode a `RecordBatch` using this encoder plan. /// /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes. - pub(crate) fn encode( - &self, - out: &mut W, - batch: &RecordBatch, - ) -> Result<(), ArrowError> { + pub(crate) fn encode(&self, out: &mut W, batch: &RecordBatch) -> Result<()> { let mut column_encoders = self.prepare_for_batch(batch)?; let n = batch.num_rows(); match self.prefix { Some(prefix) => { for row in 0..n { - out.write_all(prefix.as_slice()) - .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?; + out.write_all(prefix.as_slice())?; for enc in column_encoders.iter_mut() { enc.encode(out, row)?; } @@ -869,7 +847,7 @@ fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option { } impl FieldPlan { - fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result { + fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result { #[cfg(not(feature = "avro_custom_types"))] if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() { let values_nullability = avro_dt.nullability(); @@ -878,7 +856,7 @@ impl FieldPlan { .iter() .find(|b| !matches!(b.codec(), Codec::Null)) .ok_or_else(|| { - ArrowError::SchemaError( + AvroError::SchemaError( "Avro union at RunEndEncoded site has no non-null branch".into(), ) })?, @@ -911,7 +889,7 @@ impl FieldPlan { == Some("uuid"); if ext_is_uuid || md_is_uuid { if *len != 16 { - return Err(ArrowError::InvalidArgumentError( + 
return Err(AvroError::InvalidArgument( "logicalType=uuid requires FixedSizeBinary(16)".into(), )); } @@ -923,7 +901,7 @@ impl FieldPlan { let fields = match arrow_field.data_type() { DataType::Struct(struct_fields) => struct_fields, other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro struct maps to Arrow Struct, found: {other:?}" ))); } @@ -932,7 +910,7 @@ impl FieldPlan { for avro_field in avro_fields.iter() { let name = avro_field.name().to_string(); let idx = find_struct_child_index(fields, &name).ok_or_else(|| { - ArrowError::SchemaError(format!( + AvroError::SchemaError(format!( "Struct field '{name}' not present in Arrow field '{}'", arrow_field.name() )) @@ -957,7 +935,7 @@ impl FieldPlan { items_nullability: items_dt.nullability(), item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?), }), - other => Err(ArrowError::SchemaError(format!( + other => Err(AvroError::SchemaError(format!( "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}" ))), }, @@ -965,7 +943,7 @@ impl FieldPlan { let entries_field = match arrow_field.data_type() { DataType::Map(entries, _sorted) => entries.as_ref(), other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro map maps to Arrow DataType::Map, found: {other:?}" ))); } @@ -973,14 +951,14 @@ impl FieldPlan { let entries_struct_fields = match entries_field.data_type() { DataType::Struct(fs) => fs, other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Arrow Map entries must be Struct, found: {other:?}" ))); } }; let value_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| { - ArrowError::SchemaError("Map entries struct missing value field".into()) + AvroError::SchemaError("Map entries struct missing value field".into()) })?; let value_field = entries_struct_fields[value_idx].as_ref(); let 
value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?; @@ -992,12 +970,12 @@ impl FieldPlan { Codec::Enum(symbols) => match arrow_field.data_type() { DataType::Dictionary(key_dt, value_dt) => { if **key_dt != DataType::Int32 { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Avro enum requires Dictionary".into(), )); } if **value_dt != DataType::Utf8 { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Avro enum requires Dictionary".into(), )); } @@ -1005,7 +983,7 @@ impl FieldPlan { symbols: symbols.clone(), }) } - other => Err(ArrowError::SchemaError(format!( + other => Err(AvroError::SchemaError(format!( "Avro enum maps to Arrow Dictionary, found: {other:?}" ))), }, @@ -1019,7 +997,7 @@ impl FieldPlan { DataType::Decimal128(p, s) => (*p as usize, *s as i32), DataType::Decimal256(p, s) => (*p as usize, *s as i32), other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro decimal requires Arrow decimal, got {other:?} for field '{}'", arrow_field.name() ))); @@ -1027,7 +1005,7 @@ impl FieldPlan { }; let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent if ap != *precision || as_ != sc { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})", arrow_field.name() ))); @@ -1040,7 +1018,7 @@ impl FieldPlan { DataType::Interval( IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime, ) => Ok(FieldPlan::Scalar), - other => Err(ArrowError::SchemaError(format!( + other => Err(AvroError::SchemaError(format!( "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}" ))), }, @@ -1048,18 +1026,18 @@ impl FieldPlan { let arrow_union_fields = match arrow_field.data_type() { DataType::Union(fields, UnionMode::Dense) => fields, DataType::Union(_, UnionMode::Sparse) => { - 
return Err(ArrowError::NotYetImplemented( + return Err(AvroError::NYI( "Sparse Arrow unions are not yet supported".to_string(), )); } other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro union maps to Arrow Union, found: {other:?}" ))); } }; if avro_branches.len() != arrow_union_fields.len() { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'", avro_branches.len(), arrow_union_fields.len(), @@ -1077,10 +1055,10 @@ impl FieldPlan { plan: FieldPlan::build(avro_branch, arrow_child_field)?, }) }) - .collect::, ArrowError>>()?; + .collect::>>()?; Ok(FieldPlan::Union { bindings }) } - Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented( + Codec::Union(_, _, UnionMode::Sparse) => Err(AvroError::NYI( "Sparse Arrow unions are not yet supported".to_string(), )), #[cfg(feature = "avro_custom_types")] @@ -1088,7 +1066,7 @@ impl FieldPlan { let values_field = match arrow_field.data_type() { DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(), other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}" ))); } @@ -1162,7 +1140,7 @@ enum Encoder<'a> { impl<'a> Encoder<'a> { /// Encode the value at `idx`. 
- fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { match self { Encoder::Boolean(e) => e.encode(out, idx), Encoder::Int(e) => e.encode(out, idx), @@ -1217,7 +1195,7 @@ impl<'a> Encoder<'a> { struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray); impl BooleanEncoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_bool(out, self.0.value(idx)) } } @@ -1225,7 +1203,7 @@ impl BooleanEncoder<'_> { /// Generic Avro `int` encoder for primitive arrays with `i32` native values. struct IntEncoder<'a, P: ArrowPrimitiveType>(&'a PrimitiveArray

); impl<'a, P: ArrowPrimitiveType> IntEncoder<'a, P> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_int(out, self.0.value(idx)) } } @@ -1233,7 +1211,7 @@ impl<'a, P: ArrowPrimitiveType> IntEncoder<'a, P> { /// Generic Avro `long` encoder for primitive arrays with `i64` native values. struct LongEncoder<'a, P: ArrowPrimitiveType>(&'a PrimitiveArray

); impl<'a, P: ArrowPrimitiveType> LongEncoder<'a, P> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_long(out, self.0.value(idx)) } } @@ -1242,11 +1220,11 @@ impl<'a, P: ArrowPrimitiveType> LongEncoder<'a, P> { struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray); impl<'a> Time32SecondsToMillisEncoder<'a> { #[inline] - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let secs = self.0.value(idx); - let millis = secs.checked_mul(1000).ok_or_else(|| { - ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into()) - })?; + let millis = secs + .checked_mul(1000) + .ok_or_else(|| AvroError::InvalidArgument("time32(secs) * 1000 overflowed".into()))?; write_int(out, millis) } } @@ -1255,10 +1233,10 @@ impl<'a> Time32SecondsToMillisEncoder<'a> { struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray); impl<'a> TimestampSecondsToMillisEncoder<'a> { #[inline] - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let secs = self.0.value(idx); let millis = secs.checked_mul(1000).ok_or_else(|| { - ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into()) + AvroError::InvalidArgument("timestamp(secs) * 1000 overflowed".into()) })?; write_long(out, millis) } @@ -1267,7 +1245,7 @@ impl<'a> TimestampSecondsToMillisEncoder<'a> { /// Unified binary encoder generic over offset size (i32/i64). 
struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray); impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_len_prefixed(out, self.0.value(idx)) } } @@ -1275,7 +1253,7 @@ impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> { /// BinaryView (byte view) encoder. struct BinaryViewEncoder<'a>(&'a BinaryViewArray); impl BinaryViewEncoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_len_prefixed(out, self.0.value(idx)) } } @@ -1283,35 +1261,34 @@ impl BinaryViewEncoder<'_> { /// StringView encoder. struct Utf8ViewEncoder<'a>(&'a StringViewArray); impl Utf8ViewEncoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_len_prefixed(out, self.0.value(idx).as_bytes()) } } struct F32Encoder<'a>(&'a arrow_array::Float32Array); impl F32Encoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { // Avro float: 4 bytes, IEEE-754 little-endian let bits = self.0.value(idx).to_bits(); - out.write_all(&bits.to_le_bytes()) - .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e)) + out.write_all(&bits.to_le_bytes())?; + Ok(()) } } struct F64Encoder<'a>(&'a arrow_array::Float64Array); impl F64Encoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { // Avro double: 8 bytes, IEEE-754 little-endian let bits = self.0.value(idx).to_bits(); - out.write_all(&bits.to_le_bytes()) - .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e)) + out.write_all(&bits.to_le_bytes()).map_err(Into::into) } } struct Utf8GenericEncoder<'a, O: 
OffsetSizeTrait>(&'a GenericStringArray); impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { write_len_prefixed(out, self.0.value(idx).as_bytes()) } } @@ -1337,13 +1314,13 @@ impl<'a> MapEncoder<'a> { map: &'a MapArray, values_nullability: Option, value_plan: &FieldPlan, - ) -> Result { + ) -> Result { let keys_arr = map.keys(); let keys_kind = match keys_arr.data_type() { DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::()), DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::()), other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}" ))); } @@ -1353,20 +1330,20 @@ impl<'a> MapEncoder<'a> { DataType::Map(entries, _) => match entries.data_type() { DataType::Struct(fs) => fs, other => { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Arrow Map entries must be Struct, found: {other:?}" ))); } }, _ => { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Expected MapArray with DataType::Map".into(), )); } }; let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| { - ArrowError::SchemaError("Map entries struct missing value field".into()) + AvroError::SchemaError("Map entries struct missing value field".into()) })?; let value_field = entries_struct_fields[v_idx].as_ref(); @@ -1392,8 +1369,8 @@ impl<'a> MapEncoder<'a> { keys_offset: usize, start: usize, end: usize, - mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>, - ) -> Result<(), ArrowError> + mut write_item: impl FnMut(&mut W, usize) -> Result<()>, + ) -> Result<()> where W: Write + ?Sized, O: OffsetSizeTrait, @@ -1405,7 +1382,7 @@ impl<'a> MapEncoder<'a> { }) } - fn encode(&mut self, out: &mut W, idx: usize) -> 
Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let offsets = self.map.offsets(); let start = offsets[idx] as usize; let end = offsets[idx + 1] as usize; @@ -1444,7 +1421,7 @@ struct EnumEncoder<'a> { keys: &'a PrimitiveArray, } impl EnumEncoder<'_> { - fn encode(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, row: usize) -> Result<()> { write_int(out, self.keys.value(row)) } } @@ -1455,13 +1432,13 @@ struct UnionEncoder<'a> { } impl<'a> UnionEncoder<'a> { - fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result { + fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result { let DataType::Union(fields, UnionMode::Dense) = array.data_type() else { - return Err(ArrowError::SchemaError("Expected Dense UnionArray".into())); + return Err(AvroError::SchemaError("Expected Dense UnionArray".into())); }; if fields.len() != field_bindings.len() { - return Err(ArrowError::SchemaError(format!( + return Err(AvroError::SchemaError(format!( "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})", fields.len(), field_bindings.len() @@ -1471,7 +1448,7 @@ impl<'a> UnionEncoder<'a> { for (type_id, field_ref) in fields.iter() { let binding = field_bindings .get(type_id as usize) - .ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?; + .ok_or_else(|| AvroError::SchemaError("Binding and field mismatch".to_string()))?; let child = array.child(type_id).as_ref(); @@ -1486,7 +1463,7 @@ impl<'a> UnionEncoder<'a> { Ok(Self { encoders, array }) } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let type_id = self.array.type_ids()[idx]; let branch_index = type_id as usize; write_int(out, type_id as i32)?; @@ -1495,7 +1472,7 @@ impl<'a> UnionEncoder<'a> { let encoder = self .encoders .get_mut(branch_index) - 
.ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?; + .ok_or_else(|| AvroError::SchemaError(format!("Invalid type_id {type_id}")))?; encoder.encode(out, child_row) } @@ -1506,21 +1483,18 @@ struct StructEncoder<'a> { } impl<'a> StructEncoder<'a> { - fn try_new( - array: &'a StructArray, - field_bindings: &[FieldBinding], - ) -> Result { + fn try_new(array: &'a StructArray, field_bindings: &[FieldBinding]) -> Result { let DataType::Struct(fields) = array.data_type() else { - return Err(ArrowError::SchemaError("Expected Struct".into())); + return Err(AvroError::SchemaError("Expected Struct".into())); }; let mut encoders = Vec::with_capacity(field_bindings.len()); for field_binding in field_bindings { let idx = field_binding.arrow_index; let column = array.columns().get(idx).ok_or_else(|| { - ArrowError::SchemaError(format!("Struct child index {idx} out of range")) + AvroError::SchemaError(format!("Struct child index {idx} out of range")) })?; let field = fields.get(idx).ok_or_else(|| { - ArrowError::SchemaError(format!("Struct child index {idx} out of range")) + AvroError::SchemaError(format!("Struct child index {idx} out of range")) })?; let encoder = prepare_value_site_encoder( column.as_ref(), @@ -1533,7 +1507,7 @@ impl<'a> StructEncoder<'a> { Ok(Self { encoders }) } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { for encoder in self.encoders.iter_mut() { encoder.encode(out, idx)?; } @@ -1549,9 +1523,9 @@ fn encode_blocked_range( start: usize, end: usize, mut write_item: F, -) -> Result<(), ArrowError> +) -> Result<()> where - F: FnMut(&mut W, usize) -> Result<(), ArrowError>, + F: FnMut(&mut W, usize) -> Result<()>, { let len = end.saturating_sub(start); if len == 0 { @@ -1582,12 +1556,12 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { list: &'a GenericListArray, items_nullability: Option, item_plan: &FieldPlan, - ) -> Result { + ) -> 
Result { let child_field = match list.data_type() { DataType::List(field) => field.as_ref(), DataType::LargeList(field) => field.as_ref(), _ => { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Expected List or LargeList for ListEncoder".into(), )); } @@ -1610,23 +1584,20 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { out: &mut W, start: usize, end: usize, - ) -> Result<(), ArrowError> { + ) -> Result<()> { encode_blocked_range(out, start, end, |out, row| { self.values .encode(out, row.saturating_sub(self.values_offset)) }) } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let offsets = self.list.offsets(); let start = offsets[idx].to_usize().ok_or_else(|| { - ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize")) + AvroError::InvalidArgument(format!("Error converting offset[{idx}] to usize")) })?; let end = offsets[idx + 1].to_usize().ok_or_else(|| { - ArrowError::InvalidArgumentError(format!( - "Error converting offset[{}] to usize", - idx + 1 - )) + AvroError::InvalidArgument(format!("Error converting offset[{}] to usize", idx + 1)) })?; self.encode_list_range(out, start, end) } @@ -1646,12 +1617,12 @@ impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> { list: &'a GenericListViewArray, items_nullability: Option, item_plan: &FieldPlan, - ) -> Result { + ) -> Result { let child_field = match list.data_type() { DataType::ListView(field) => field.as_ref(), DataType::LargeListView(field) => field.as_ref(), _ => { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Expected ListView or LargeListView for ListViewEncoder".into(), )); } @@ -1669,14 +1640,12 @@ impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> { }) } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let start = 
self.list.value_offset(idx).to_usize().ok_or_else(|| { - ArrowError::InvalidArgumentError(format!( - "Error converting value_offset[{idx}] to usize" - )) + AvroError::InvalidArgument(format!("Error converting value_offset[{idx}] to usize")) })?; let len = self.list.value_size(idx).to_usize().ok_or_else(|| { - ArrowError::InvalidArgumentError(format!("Error converting value_size[{idx}] to usize")) + AvroError::InvalidArgument(format!("Error converting value_size[{idx}] to usize")) })?; let start = start + self.values_offset; let end = start + len; @@ -1700,11 +1669,11 @@ impl<'a> FixedSizeListEncoder<'a> { list: &'a FixedSizeListArray, items_nullability: Option, item_plan: &FieldPlan, - ) -> Result { + ) -> Result { let child_field = match list.data_type() { DataType::FixedSizeList(field, _len) => field.as_ref(), _ => { - return Err(ArrowError::SchemaError( + return Err(AvroError::SchemaError( "Expected FixedSizeList for FixedSizeListEncoder".into(), )); } @@ -1723,7 +1692,7 @@ impl<'a> FixedSizeListEncoder<'a> { }) } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { // Starting index is relative to values() start let rel = self.list.value_offset(idx) as usize; let start = self.values_offset + rel; @@ -1740,7 +1709,7 @@ fn prepare_value_site_encoder<'a>( value_field: &Field, nullability: Option, plan: &FieldPlan, -) -> Result, ArrowError> { +) -> Result> { // Effective nullability is computed here from the writer-declared site nullability and data. FieldEncoder::make_encoder(values_array, value_field, plan, nullability) } @@ -1749,10 +1718,10 @@ fn prepare_value_site_encoder<'a>( /// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix. 
struct FixedEncoder<'a>(&'a FixedSizeBinaryArray); impl FixedEncoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let v = self.0.value(idx); // &[u8] of fixed width - out.write_all(v) - .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e)) + out.write_all(v)?; + Ok(()) } } @@ -1760,15 +1729,15 @@ impl FixedEncoder<'_> { /// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form. struct UuidEncoder<'a>(&'a FixedSizeBinaryArray); impl UuidEncoder<'_> { - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH]; buf[0] = 0x48; let v = self.0.value(idx); let u = Uuid::from_slice(v) - .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?; + .map_err(|e| AvroError::InvalidArgument(format!("Invalid UUID bytes: {e}")))?; let _ = u.hyphenated().encode_lower(&mut buf[1..]); - out.write_all(&buf) - .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e)) + out.write_all(&buf)?; + Ok(()) } } @@ -1780,25 +1749,25 @@ struct DurationParts { } /// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`. 
trait IntervalToDurationParts: ArrowPrimitiveType { - fn duration_parts(native: Self::Native) -> Result; + fn duration_parts(native: Self::Native) -> Result; } impl IntervalToDurationParts for IntervalMonthDayNanoType { - fn duration_parts(native: Self::Native) -> Result { + fn duration_parts(native: Self::Native) -> Result { let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native); if months < 0 || days < 0 || nanos < 0 { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Avro 'duration' cannot encode negative months/days/nanoseconds".into(), )); } if nanos % 1_000_000 != 0 { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000" .into(), )); } let millis = nanos / 1_000_000; if millis > u32::MAX as i64 { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Avro 'duration' milliseconds exceed u32::MAX".into(), )); } @@ -1810,9 +1779,9 @@ impl IntervalToDurationParts for IntervalMonthDayNanoType { } } impl IntervalToDurationParts for IntervalYearMonthType { - fn duration_parts(native: Self::Native) -> Result { + fn duration_parts(native: Self::Native) -> Result { if native < 0 { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Avro 'duration' cannot encode negative months".into(), )); } @@ -1824,10 +1793,10 @@ impl IntervalToDurationParts for IntervalYearMonthType { } } impl IntervalToDurationParts for IntervalDayTimeType { - fn duration_parts(native: Self::Native) -> Result { + fn duration_parts(native: Self::Native) -> Result { let (days, millis) = IntervalDayTimeType::to_parts(native); if days < 0 || millis < 0 { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Avro 'duration' cannot encode negative days or milliseconds".into(), )); } @@ -1844,7 +1813,7 @@ impl 
IntervalToDurationParts for IntervalDayTimeType { struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray

); impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> { #[inline(always)] - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let parts = P::duration_parts(self.0.value(idx))?; let months = parts.months.to_le_bytes(); let days = parts.days.to_le_bytes(); @@ -1860,16 +1829,16 @@ impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> // indices. [std docs; Rust Performance Book on bounds-check elimination] // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no // aliasing and no uninitialized memory. There is no `unsafe`. - // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated and mapped - // into `ArrowError`, so I/O errors are reported, not panicked. + // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated as an AvroError, + // so I/O errors are reported, not panicked. // Consequently, constructing `buf` with the constant indices below is safe and // panic-free under these validated preconditions. let buf = [ months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0], ms[1], ms[2], ms[3], ]; - out.write_all(&buf) - .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e)) + out.write_all(&buf)?; + Ok(()) } } @@ -1917,7 +1886,7 @@ impl<'a, const N: usize, A: DecimalBeBytes> DecimalEncoder<'a, N, A> { Self { arr, fixed_size } } - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { let be = self.arr.value_be_bytes(idx); match self.fixed_size { Some(n) => write_sign_extended(out, &be, n), @@ -1971,7 +1940,7 @@ impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> { /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`. /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based. 
#[inline(always)] - fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> { + fn advance_to_row(&mut self, idx: usize) -> Result<()> { if idx < self.cur_end { return Ok(()); } @@ -1983,7 +1952,7 @@ impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> { if idx < self.cur_end { Ok(()) } else { - Err(ArrowError::InvalidArgumentError(format!( + Err(AvroError::InvalidArgument(format!( "row index {idx} out of bounds for run-ends ({} runs)", self.len ))) @@ -1991,7 +1960,7 @@ impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> { } #[inline(always)] - fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<()> { self.advance_to_row(idx)?; // For REE values, the value for any logical row within a run is at // the physical index of that run. @@ -2387,10 +2356,10 @@ mod tests { let mut out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - ArrowError::InvalidArgumentError(msg) => { + AvroError::InvalidArgument(msg) => { assert!(msg.contains("Invalid UUID bytes")) } - other => panic!("expected InvalidArgumentError, got {other:?}"), + other => panic!("expected InvalidArgument, got {other:?}"), } } @@ -2754,10 +2723,10 @@ mod tests { let mut out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - ArrowError::InvalidArgumentError(msg) => { + AvroError::InvalidArgument(msg) => { assert!(msg.contains("cannot encode negative months")) } - other => panic!("expected InvalidArgumentError, got {other:?}"), + other => panic!("expected InvalidArgument, got {other:?}"), } } @@ -2782,10 +2751,10 @@ mod tests { let mut out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - ArrowError::InvalidArgumentError(msg) => { + AvroError::InvalidArgument(msg) => { assert!(msg.contains("cannot encode negative days")) } - other => panic!("expected InvalidArgumentError, got {other:?}"), + other => panic!("expected InvalidArgument, got 
{other:?}"), } } @@ -2810,10 +2779,10 @@ mod tests { let mut out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - ArrowError::InvalidArgumentError(msg) => { + AvroError::InvalidArgument(msg) => { assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible")) } - other => panic!("expected InvalidArgumentError, got {other:?}"), + other => panic!("expected InvalidArgument, got {other:?}"), } } @@ -2843,8 +2812,8 @@ mod tests { // truncation overflow let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err(); match err { - ArrowError::InvalidArgumentError(_) => {} - _ => panic!("expected InvalidArgumentError"), + AvroError::InvalidArgument(_) => {} + _ => panic!("expected InvalidArgument"), } } @@ -2859,8 +2828,8 @@ mod tests { let mut out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("exceed u32::MAX")), - _ => panic!("expected InvalidArgumentError"), + AvroError::InvalidArgument(msg) => assert!(msg.contains("exceed u32::MAX")), + _ => panic!("expected InvalidArgument"), } } @@ -2873,7 +2842,7 @@ mod tests { let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None); let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err(); match err { - ArrowError::SchemaError(msg) => { + AvroError::SchemaError(msg) => { assert!(msg.contains("Decimal precision/scale mismatch")) } _ => panic!("expected SchemaError"), @@ -3035,13 +3004,13 @@ mod tests { let mut out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - arrow_schema::ArrowError::InvalidArgumentError(msg) => { + AvroError::InvalidArgument(msg) => { assert!( msg.contains("overflowed") || msg.contains("overflow"), "unexpected message: {msg}" ) } - other => panic!("expected InvalidArgumentError, got {other:?}"), + other => panic!("expected InvalidArgument, got {other:?}"), } } @@ -3078,13 +3047,13 @@ mod tests { let mut 
out = Vec::new(); let err = enc.encode(&mut out, 0).unwrap_err(); match err { - arrow_schema::ArrowError::InvalidArgumentError(msg) => { + AvroError::InvalidArgument(msg) => { assert!( msg.contains("overflowed") || msg.contains("overflow"), "unexpected message: {msg}" ) } - other => panic!("expected InvalidArgumentError, got {other:?}"), + other => panic!("expected InvalidArgument, got {other:?}"), } } diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs index ba2a0b8564b2..43901ad384ee 100644 --- a/arrow-avro/src/writer/format.rs +++ b/arrow-avro/src/writer/format.rs @@ -18,9 +18,10 @@ //! Avro Writer Formats for Arrow. use crate::compression::{CODEC_METADATA_KEY, CompressionCodec}; +use crate::errors::{AvroError, Result}; use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY}; use crate::writer::encoder::write_long; -use arrow_schema::{ArrowError, Schema}; +use arrow_schema::Schema; use rand::RngCore; use std::fmt::Debug; use std::io::Write; @@ -40,7 +41,7 @@ pub trait AvroFormat: Debug + Default { writer: &mut W, schema: &Schema, compression: Option, - ) -> Result<(), ArrowError>; + ) -> Result<()>; /// Return the 16‑byte sync marker (OCF) or `None` (binary stream). fn sync_marker(&self) -> Option<&[u8; 16]>; @@ -59,7 +60,7 @@ impl AvroFormat for AvroOcfFormat { writer: &mut W, schema: &Schema, compression: Option, - ) -> Result<(), ArrowError> { + ) -> Result<()> { let mut rng = rand::rng(); rng.fill_bytes(&mut self.sync_marker); // Choose the Avro schema JSON that the file will advertise. 
@@ -73,9 +74,7 @@ impl AvroFormat for AvroOcfFormat { }), )?; // Magic - writer - .write_all(b"Obj\x01") - .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?; + writer.write_all(b"Obj\x01")?; // File metadata map: { "avro.schema": , "avro.codec": } let codec_str = match compression { Some(CompressionCodec::Deflate) => "deflate", @@ -93,9 +92,7 @@ impl AvroFormat for AvroOcfFormat { write_bytes(writer, codec_str.as_bytes())?; write_long(writer, 0)?; // Sync marker (16 bytes) - writer - .write_all(&self.sync_marker) - .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?; + writer.write_all(&self.sync_marker)?; Ok(()) } @@ -121,9 +118,9 @@ impl AvroFormat for AvroSoeFormat { _writer: &mut W, _schema: &Schema, compression: Option, - ) -> Result<(), ArrowError> { + ) -> Result<()> { if compression.is_some() { - return Err(ArrowError::InvalidArgumentError( + return Err(AvroError::InvalidArgument( "Compression not supported for Avro SOE streaming".to_string(), )); } @@ -136,14 +133,13 @@ impl AvroFormat for AvroSoeFormat { } #[inline] -fn write_string(writer: &mut W, s: &str) -> Result<(), ArrowError> { +fn write_string(writer: &mut W, s: &str) -> Result<()> { write_bytes(writer, s.as_bytes()) } #[inline] -fn write_bytes(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> { +fn write_bytes(writer: &mut W, bytes: &[u8]) -> Result<()> { write_long(writer, bytes.len() as i64)?; - writer - .write_all(bytes) - .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e)) + writer.write_all(bytes)?; + Ok(()) }