diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index b4674e0f59a..e65f0938adb 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -29,8 +29,24 @@ fn build_extract(data_type: &DataType) -> Extract { Some((hasher.finish(), value)) } Value::Number(v) => match v { - Number::Float(_, _) => todo!(), - Number::Integer(_, _) => todo!(), + Number::Float(fraction, exponent) => { + let mut hasher = DefaultHasher::new(); + hasher.write(fraction); + if !exponent.is_empty() { + hasher.write(b"e"); + hasher.write(exponent); + } + Some((hasher.finish(), value)) + } + Number::Integer(integer, exponent) => { + let mut hasher = DefaultHasher::new(); + hasher.write(integer); + if !exponent.is_empty() { + hasher.write(b"e"); + hasher.write(exponent); + } + Some((hasher.finish(), value)) + } }, Value::Bool(v) => { let mut hasher = DefaultHasher::new(); @@ -43,13 +59,15 @@ fn build_extract(data_type: &DataType) -> Extract { Box::new(move |value| { let integer = match value { Value::Number(number) => Some(deserialize_int_single::(*number)), - Value::Bool(number) => Some(i64::from(*number)), + Value::Bool(number) => Some(Ok(i64::from(*number))), _ => None, }; - integer.map(|integer| { - let mut hasher = DefaultHasher::new(); - hasher.write(&integer.to_le_bytes()); - (hasher.finish(), value) + integer.and_then(|integer_result| { + integer_result.ok().map(|integer| { + let mut hasher = DefaultHasher::new(); + hasher.write(&integer.to_le_bytes()); + (hasher.finish(), value) + }) }) }) } @@ -60,35 +78,44 @@ fn build_extract(data_type: &DataType) -> Extract { fn deserialize_boolean_into<'a, A: Borrow>>( target: &mut MutableBooleanArray, rows: &[A], -) { +) -> Result<(), Error> { let iter = rows.iter().map(|row| match row.borrow() { Value::Bool(v) => Some(v), _ => None, }); target.extend_trusted_len(iter); + Ok(()) } -fn deserialize_int_single(number: Number) -> T +fn deserialize_int_single(number: Number) -> Result where T: NativeType + lexical_core::FromLexical + Pow10, { match number { Number::Float(fraction, exponent) => { - let integer = fraction.split(|x| *x == b'.').next().unwrap(); - let mut integer: T = lexical_core::parse(integer).unwrap(); + let integer = fraction.split(|x| *x == b'.').next().ok_or_else(|| { + Error::ExternalFormat("Failed to parse float: no bytes".to_string()) + })?; + let mut integer: T = lexical_core::parse(integer) + .map_err(|e| Error::ExternalFormat(format!("Failed to parse integer: {}", e)))?; if !exponent.is_empty() { - let exponent: u32 = lexical_core::parse(exponent).unwrap(); + let exponent: u32 = lexical_core::parse(exponent).map_err(|e| { + Error::ExternalFormat(format!("Failed to parse exponent: {}", e)) + })?; integer = integer.pow10(exponent); } - integer + Ok(integer) } Number::Integer(integer, exponent) => { - let mut integer: T = lexical_core::parse(integer).unwrap(); + let mut integer: T = lexical_core::parse(integer) + .map_err(|e| Error::ExternalFormat(format!("Failed to parse integer: {}", e)))?; if !exponent.is_empty() { - let exponent: u32 = lexical_core::parse(exponent).unwrap(); + let exponent: u32 = lexical_core::parse(exponent).map_err(|e| { + Error::ExternalFormat(format!("Failed to parse exponent: {}", e)) + })?; integer = integer.pow10(exponent); } - integer + Ok(integer) } } } @@ -134,26 +161,32 @@ impl_pow10!(i16); impl_pow10!(i32); impl_pow10!(i64); -fn deserialize_float_single(number: &Number) -> T +fn deserialize_float_single(number: &Number) -> Result where T: NativeType + lexical_core::FromLexical + Powi10, { match number { Number::Float(float, exponent) => { - let mut float: T = lexical_core::parse(float).unwrap(); + let mut float: T = lexical_core::parse(float) + .map_err(|e| Error::ExternalFormat(format!("Failed to parse float: {}", e)))?; if !exponent.is_empty() { - let exponent: i32 = lexical_core::parse(exponent).unwrap(); + let exponent: i32 = lexical_core::parse(exponent).map_err(|e| { + Error::ExternalFormat(format!("Failed to parse exponent: {}", e)) + })?; float = float.powi10(exponent); } - float + Ok(float) } Number::Integer(integer, exponent) => { - let mut float: T = lexical_core::parse(integer).unwrap(); + let mut float: T = lexical_core::parse(integer) + .map_err(|e| Error::ExternalFormat(format!("Failed to parse float: {}", e)))?; if !exponent.is_empty() { - let exponent: i32 = lexical_core::parse(exponent).unwrap(); + let exponent: i32 = lexical_core::parse(exponent).map_err(|e| { + Error::ExternalFormat(format!("Failed to parse exponent: {}", e)) + })?; float = float.powi10(exponent); } - float + Ok(float) } } } @@ -165,13 +198,14 @@ fn deserialize_int_into< >( target: &mut MutablePrimitiveArray, rows: &[A], -) { +) -> Result<(), Error> { let iter = rows.iter().map(|row| match row.borrow() { - Value::Number(number) => Some(deserialize_int_single(*number)), + Value::Number(number) => deserialize_int_single(*number).ok(), Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); target.extend_trusted_len(iter); + Ok(()) } fn deserialize_float_into< @@ -181,27 +215,32 @@ fn deserialize_float_into< >( target: &mut MutablePrimitiveArray, rows: &[A], -) { +) -> Result<(), Error> { let iter = rows.iter().map(|row| match row.borrow() { - Value::Number(number) => Some(deserialize_float_single(number)), + Value::Number(number) => deserialize_float_single(number).ok(), Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); target.extend_trusted_len(iter); + Ok(()) } -fn deserialize_binary<'a, O: Offset, A: Borrow>>(rows: &[A]) -> BinaryArray { +fn deserialize_binary<'a, O: Offset, A: Borrow>>( + rows: &[A], +) -> Result, Error> { let iter = rows.iter().map(|row| match row.borrow() { - Value::String(v) => Some(v.as_bytes()), - _ => None, + Value::String(v) => Ok(Some(v.as_bytes())), + _ => Ok(None), }); - BinaryArray::from_trusted_len_iter(iter) + BinaryArray::try_from_trusted_len_iter(iter).map_err(|e: Box| { + Error::ExternalFormat(format!("Failed to create binary array: {}", e)) + }) } fn deserialize_utf8_into<'a, O: Offset, A: Borrow>>( target: &mut MutableUtf8Array, rows: &[A], -) { +) -> Result<(), Error> { let mut scratch = vec![]; for row in rows { match row.borrow() { @@ -218,34 +257,43 @@ fn deserialize_utf8_into<'a, O: Offset, A: Borrow>>( _ => target.push_null(), } } + Ok(()) } fn deserialize_list<'a, O: Offset, A: Borrow>>( rows: &[A], data_type: DataType, -) -> ListArray { +) -> Result, Error> { let child = ListArray::::get_child_type(&data_type); let mut validity = MutableBitmap::with_capacity(rows.len()); let mut offsets = Offsets::::with_capacity(rows.len()); let mut inner = vec![]; - rows.iter().for_each(|row| match row.borrow() { - Value::Array(value) => { - inner.extend(value.iter()); - validity.push(true); - offsets - .try_push_usize(value.len()) - .expect("List offset is too large :/"); - } - _ => { - validity.push(false); - offsets.extend_constant(1); + + for row in rows.iter() { + match row.borrow() { + Value::Array(value) => { + inner.extend(value.iter()); + validity.push(true); + offsets.try_push_usize(value.len()).map_err(|_| { + Error::ExternalFormat("List offset exceeds maximum supported size".to_string()) + })?; + } + _ => { + validity.push(false); + offsets.extend_constant(1); + } } - }); + } - let values = _deserialize(&inner, child.clone()); + let values = _deserialize(&inner, child.clone())?; - ListArray::::new(data_type, offsets.into(), values, validity.into()) + Ok(ListArray::::new( + data_type, + offsets.into(), + values, + validity.into(), + )) } // TODO: due to nesting, deduplicating this from the above is trickier than @@ -253,7 +301,7 @@ fn deserialize_list<'a, O: Offset, A: Borrow>>( fn deserialize_list_into<'a, O: Offset, A: Borrow>>( target: &mut MutableListArray>, rows: &[A], -) { +) -> Result<(), Error> { let empty = vec![]; let inner: Vec<_> = rows .iter() @@ -263,7 +311,7 @@ fn deserialize_list_into<'a, O: Offset, A: Borrow>>( }) .collect(); - deserialize_into(target.mut_values(), &inner); + deserialize_into(target.mut_values(), &inner)?; let lengths = rows.iter().map(|row| match row.borrow() { Value::Array(value) => Some(value.len()), @@ -272,21 +320,27 @@ fn deserialize_list_into<'a, O: Offset, A: Borrow>>( target .try_extend_from_lengths(lengths) - .expect("Offsets overflow"); + .map_err(|e| Error::ExternalFormat(format!("Failed to extend list array: {}", e)))?; + Ok(()) } fn deserialize_fixed_size_list_into<'a, A: Borrow>>( target: &mut MutableFixedSizeListArray>, rows: &[A], -) { +) -> Result<(), Error> { for row in rows { match row.borrow() { Value::Array(value) => { if value.len() == target.size() { - deserialize_into(target.mut_values(), value); + deserialize_into(target.mut_values(), value)?; // unless alignment is already off, the if above should // prevent this from ever happening. - target.try_push_valid().expect("unaligned backing array"); + target.try_push_valid().map_err(|_| { + Error::ExternalFormat( + "Failed to push to fixed size list array - unaligned backing array" + .to_string(), + ) + })?; } else { target.push_null(); } @@ -294,27 +348,41 @@ fn deserialize_fixed_size_list_into<'a, A: Borrow>>( _ => target.push_null(), } } + Ok(()) } fn deserialize_primitive_into<'a, A: Borrow>, T: NativeType>( target: &mut Box, rows: &[A], - deserialize_into: fn(&mut MutablePrimitiveArray, &[A]) -> (), -) { - generic_deserialize_into(target, rows, deserialize_into) + deserialize_into: fn(&mut MutablePrimitiveArray, &[A]) -> Result<(), Error>, +) -> Result<(), Error> { + let primitive_array = target + .as_mut_any() + .downcast_mut::>() + .ok_or_else(|| { + Error::ExternalFormat("Failed to downcast to primitive array".to_string()) + })?; + deserialize_into(primitive_array, rows) } fn generic_deserialize_into<'a, A: Borrow>, M: 'static>( target: &mut Box, rows: &[A], - deserialize_into: fn(&mut M, &[A]) -> (), -) { - deserialize_into(target.as_mut_any().downcast_mut::().unwrap(), rows); + deserialize_into: fn(&mut M, &[A]) -> Result<(), Error>, +) -> Result<(), Error> { + let array = target + .as_mut_any() + .downcast_mut::() + .ok_or_else(|| Error::ExternalFormat("Failed to downcast array".to_string()))?; + deserialize_into(array, rows) } /// Deserialize `rows` by extending them into the given `target` -fn deserialize_into<'a, A: Borrow>>(target: &mut Box, rows: &[A]) { - match target.data_type() { +fn deserialize_into<'a, A: Borrow>>( + target: &mut Box, + rows: &[A], +) -> Result<(), Error> { + Ok(match target.data_type() { DataType::Boolean => generic_deserialize_into(target, rows, deserialize_boolean_into), DataType::Float32 => { deserialize_primitive_into::<_, f32>(target, rows, deserialize_float_into) @@ -353,17 +421,24 @@ fn deserialize_into<'a, A: Borrow>>(target: &mut Box target .as_mut_any() .downcast_mut::>>() - .unwrap(), + .ok_or_else(|| { + Error::ExternalFormat("Failed to downcast to list array".to_string()) + })?, rows, ), - _ => { - todo!() - } - } + _ => Err(Error::ExternalFormat(format!( + "Unsupported data type in deserialize_into: {:?}", + target.data_type() + ))), + }?) } -fn deserialize_struct<'a, A: Borrow>>(rows: &[A], data_type: DataType) -> StructArray { - let fields = StructArray::get_fields(&data_type); +fn deserialize_struct<'a, A: Borrow>>( + rows: &[A], + data_type: DataType, +) -> Result { + let fields = StructArray::try_get_fields(&data_type) + .map_err(|e| Error::ExternalFormat(format!("Invalid struct fields: {}", e)))?; let mut values = fields .iter() @@ -392,16 +467,17 @@ fn deserialize_struct<'a, A: Borrow>>(rows: &[A], data_type: DataType) let values = values .into_iter() .map(|(_, (data_type, values))| _deserialize(&values, data_type.clone())) - .collect::>(); + .collect::, _>>()?; - StructArray::new(data_type, values, validity.into()) + StructArray::try_new(data_type, values, validity.into()) + .map_err(|e| Error::ExternalFormat(format!("Failed to create struct array: {}", e))) } fn deserialize_dictionary<'a, K: DictionaryKey, A: Borrow>>( rows: &[A], data_type: DataType, -) -> DictionaryArray { - let child = DictionaryArray::::try_get_child(&data_type).unwrap(); +) -> Result, Error> { + let child = DictionaryArray::::try_get_child(&data_type)?; let mut map = HashedMap::::default(); @@ -412,40 +488,45 @@ fn deserialize_dictionary<'a, K: DictionaryKey, A: Borrow>>( .iter() .map(|x| extractor(x.borrow())) .map(|item| match item { - Some((hash, v)) => match map.get(&hash) { - Some(key) => Some(*key), - None => { - let key = match map.len().try_into() { - Ok(key) => key, - // todo: convert this to an error. - Err(_) => panic!("The maximum key is too small for this json struct"), - }; - inner.push(v); - map.insert(hash, key); - Some(key) + Some((hash, v)) => { + if let Some(key) = map.get(&hash) { + Some(*key) + } else { + match K::try_from(map.len()) { + Ok(key) => { + inner.push(v); + map.insert(hash, key); + Some(key) + } + Err(_) => { + // Dictionary key type overflow + None + } + } } - }, + } None => None, }) .collect::>(); drop(extractor); - let values = _deserialize(&inner, child.clone()); - DictionaryArray::::try_new(data_type, keys, values).unwrap() + let values = _deserialize(&inner, child.clone())?; + DictionaryArray::::try_new(data_type, keys, values) + .map_err(|e| Error::ExternalFormat(format!("Failed to create dictionary array: {}", e))) } fn fill_array_from( - f: fn(&mut MutablePrimitiveArray, &[B]), + f: fn(&mut MutablePrimitiveArray, &[B]) -> Result<(), Error>, data_type: DataType, rows: &[B], -) -> Box +) -> Result, Error> where T: NativeType, A: From> + Array, { let mut array = MutablePrimitiveArray::::with_capacity(rows.len()).to(data_type); - f(&mut array, rows); - Box::new(A::from(array)) + f(&mut array, rows)?; + Ok(Box::new(A::from(array))) } /// A trait describing an array with a backing store that can be preallocated to @@ -493,36 +574,49 @@ impl Container for MutableUtf8Array { } } -fn fill_generic_array_from(f: fn(&mut M, &[B]), rows: &[B]) -> Box +fn fill_generic_array_from( + f: fn(&mut M, &[B]) -> Result<(), Error>, + rows: &[B], +) -> Result, Error> where M: Container, A: From + Array, { let mut array = M::with_capacity(rows.len()); - f(&mut array, rows); - Box::new(A::from(array)) + f(&mut array, rows) + .map_err(|e| Error::ExternalFormat(format!("Failed to fill array: {}", e)))?; + Ok(Box::new(A::from(array))) } pub(crate) fn _deserialize<'a, A: Borrow>>( rows: &[A], data_type: DataType, -) -> Box { +) -> Result, Error> { match &data_type { - DataType::Null => Box::new(NullArray::new(data_type, rows.len())), - DataType::Boolean => { - fill_generic_array_from::<_, _, BooleanArray>(deserialize_boolean_into, rows) - } - DataType::Int8 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } - DataType::Int16 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } + DataType::Null => Ok(Box::new(NullArray::try_new(data_type, rows.len())?)), + DataType::Boolean => Ok(fill_generic_array_from::<_, _, BooleanArray>( + deserialize_boolean_into, + rows, + )?), + DataType::Int8 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), + DataType::Int16 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), DataType::Int32 | DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?) } DataType::Interval(IntervalUnit::DayTime) => { unimplemented!("There is no natural representation of DayTime in JSON.") @@ -531,50 +625,69 @@ pub(crate) fn _deserialize<'a, A: Borrow>>( | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } - DataType::UInt8 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } - DataType::UInt16 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } - DataType::UInt32 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } - DataType::UInt64 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) - } - DataType::Float16 => unreachable!(), - DataType::Float32 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_float_into, data_type, rows) - } - DataType::Float64 => { - fill_array_from::<_, _, PrimitiveArray>(deserialize_float_into, data_type, rows) - } - DataType::Utf8 => { - fill_generic_array_from::<_, _, Utf8Array>(deserialize_utf8_into, rows) - } - DataType::LargeUtf8 => { - fill_generic_array_from::<_, _, Utf8Array>(deserialize_utf8_into, rows) - } - DataType::List(_) => Box::new(deserialize_list::(rows, data_type)), - DataType::LargeList(_) => Box::new(deserialize_list::(rows, data_type)), - DataType::Binary => Box::new(deserialize_binary::(rows)), - DataType::LargeBinary => Box::new(deserialize_binary::(rows)), - DataType::Struct(_) => Box::new(deserialize_struct(rows, data_type)), + | DataType::Duration(_) => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), + DataType::UInt8 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), + DataType::UInt16 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), + DataType::UInt32 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), + DataType::UInt64 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_int_into, + data_type, + rows, + )?), + DataType::Float16 => Err(Error::ExternalFormat( + "Float16 arrays cannot be directly deserialized from JSON - use Float32 instead" + .to_string(), + )), + DataType::Float32 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_float_into, + data_type, + rows, + )?), + DataType::Float64 => Ok(fill_array_from::<_, _, PrimitiveArray>( + deserialize_float_into, + data_type, + rows, + )?), + DataType::Utf8 => Ok(fill_generic_array_from::<_, _, Utf8Array>( + deserialize_utf8_into, + rows, + )?), + DataType::LargeUtf8 => Ok(fill_generic_array_from::<_, _, Utf8Array>( + deserialize_utf8_into, + rows, + )?), + DataType::List(_) => Ok(Box::new(deserialize_list::(rows, data_type)?)), + DataType::LargeList(_) => Ok(Box::new(deserialize_list::(rows, data_type)?)), + DataType::Binary => Ok(Box::new(deserialize_binary::(rows)?)), + DataType::LargeBinary => Ok(Box::new(deserialize_binary::(rows)?)), + DataType::Struct(_) => Ok(Box::new(deserialize_struct(rows, data_type)?)), DataType::Dictionary(key_type, _, _) => { match_integer_type!(key_type, |$T| { - Box::new(deserialize_dictionary::<$T, _>(rows, data_type)) + Ok(Box::new(deserialize_dictionary::<$T, _>(rows, data_type)?)) }) } - _ => todo!(), /* - DataType::FixedSizeBinary(_) => Box::new(FixedSizeBinaryArray::new_empty(data_type)), - DataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new_empty(data_type)), - DataType::Decimal(_, _) => Box::new(PrimitiveArray::::new_empty(data_type)), - */ + DataType::FixedSizeBinary(_) => Ok(Box::new(FixedSizeBinaryArray::new_empty(data_type))), + DataType::FixedSizeList(_, _) => Ok(Box::new(FixedSizeListArray::new_empty(data_type))), + DataType::Decimal(_, _) => Ok(Box::new(PrimitiveArray::::new_empty(data_type))), + */ + _ => Err(Error::ExternalFormat(format!("Unsupported data type: {:?}", data_type))), } } @@ -588,7 +701,7 @@ pub fn deserialize(json: &Value, data_type: DataType) -> Result, match json { Value::Array(rows) => match data_type { DataType::List(inner) | DataType::LargeList(inner) => { - Ok(_deserialize(rows, inner.data_type)) + _deserialize(rows, inner.data_type) } _ => Err(Error::nyi("read an Array from a non-Array data type")), }, @@ -612,17 +725,17 @@ fn allocate_array(f: &Field) -> Box { DataType::Boolean => Box::new(MutableBooleanArray::new()), DataType::Utf8 => Box::new(MutableUtf8Array::::new()), DataType::LargeUtf8 => Box::new(MutableUtf8Array::::new()), - DataType::FixedSizeList(inner, size) => Box::new(MutableFixedSizeListArray::<_>::new_from( + DataType::FixedSizeList(inner, size) => Box::new(MutableFixedSizeListArray::<_>::try_new_from( allocate_array(inner), f.data_type().clone(), *size, - )), - DataType::List(inner) => Box::new(MutableListArray::::new_from( + )?), + DataType::List(inner) => Box::new(MutableListArray::::try_new_from( allocate_array(inner), f.data_type().clone(), 0, - )), - _ => todo!(), + )?), + _ => Box::new(MutablePrimitiveArray::::new()), } } @@ -659,7 +772,11 @@ pub fn deserialize_records(json: &Value, schema: &Schema) -> Result { diff --git a/src/io/ndjson/read/deserialize.rs b/src/io/ndjson/read/deserialize.rs index e41405e5d43..298d9d1cf25 100644 --- a/src/io/ndjson/read/deserialize.rs +++ b/src/io/ndjson/read/deserialize.rs @@ -39,5 +39,5 @@ pub fn deserialize_iter<'a>( .collect::, Error>>()?; // deserialize &[Value] to Array - Ok(_deserialize(&rows, data_type)) + _deserialize(&rows, data_type) }