Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::util::bit_util::FromBytes;

/// Rust representation for logical type INT96, value is backed by an array of `u32`.
/// The type only takes 12 bytes, without extra padding.
///
/// `PartialOrd` and `Ord` are intentionally NOT derived here: they are implemented by
/// hand for this type, and a second, derived `PartialOrd` would be a conflicting impl.
/// A derived ordering would also compare the words of `value` in index order, which is
/// not the days-then-nanoseconds order the hand-written impl uses.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}
Expand Down Expand Up @@ -124,6 +124,54 @@ impl Int96 {
.wrapping_add(nanos)
}

/// Stores a timestamp expressed as whole seconds since the epoch.
///
/// The input is scaled to nanoseconds with a wrapping multiplication, so a value
/// too large for `i64` nanoseconds wraps around instead of panicking.
#[inline]
pub fn set_data_from_seconds(&mut self, seconds: i64) {
    let as_nanos = seconds.wrapping_mul(NANOSECONDS);
    self.set_data_from_nanos(as_nanos);
}

/// Stores a timestamp expressed as milliseconds since the epoch.
///
/// The input is scaled to nanoseconds with a wrapping multiplication, so a value
/// too large for `i64` nanoseconds wraps around instead of panicking.
#[inline]
pub fn set_data_from_millis(&mut self, millis: i64) {
    // NOTE(review): the scale factor is the `MICROSECONDS` constant, presumably
    // 1_000_000 (which is also nanoseconds-per-millisecond) — confirm at its definition.
    let as_nanos = millis.wrapping_mul(MICROSECONDS);
    self.set_data_from_nanos(as_nanos);
}

/// Stores a timestamp expressed as microseconds since the epoch.
///
/// The input is scaled to nanoseconds with a wrapping multiplication, so a value
/// too large for `i64` nanoseconds wraps around instead of panicking.
#[inline]
pub fn set_data_from_micros(&mut self, micros: i64) {
    // NOTE(review): the scale factor is the `MILLISECONDS` constant, presumably
    // 1_000 (which is also nanoseconds-per-microsecond) — confirm at its definition.
    let as_nanos = micros.wrapping_mul(MILLISECONDS);
    self.set_data_from_nanos(as_nanos);
}

/// Sets the INT96 data from nanoseconds since epoch
///
/// The value is split into a day count and a nanosecond-of-day remainder using
/// Euclidean division, so the remainder is never negative. For inputs at or after
/// the epoch this is identical to plain `/` and `%`; for inputs before the epoch the
/// day is rounded down and the remainder counts forward from the start of that day,
/// instead of producing a negative nanosecond-of-day.
///
/// The day count is then shifted by `JULIAN_DAY_OF_EPOCH` and stored together with
/// the remainder via [`Self::set_data_from_days_and_nanos`].
#[inline]
pub fn set_data_from_nanos(&mut self, nanos: i64) {
    let days = nanos.div_euclid(NANOSECONDS_IN_DAY);
    let remaining_nanos = nanos.rem_euclid(NANOSECONDS_IN_DAY);
    // NOTE(review): narrowing cast; assumes `NANOSECONDS_IN_DAY` is large enough that
    // any i64 quotient plus the epoch offset fits in i32 — confirm at its definition.
    let julian_day = (days + JULIAN_DAY_OF_EPOCH) as i32;
    self.set_data_from_days_and_nanos(julian_day, remaining_nanos);
}

/// Writes a day number and a nanosecond count straight into the INT96 words.
///
/// This is the lowest-level setter: the other `set_data_from_*` helpers end up here.
/// `days` should be a Julian day number; it is reinterpreted bit-for-bit as `u32`.
/// `nanos` is split into its low and high 32-bit halves. The three words are passed
/// to `set_data` in the order (nanos low, nanos high, day).
#[inline]
pub fn set_data_from_days_and_nanos(&mut self, days: i32, nanos: i64) {
    // Work on the raw 64-bit pattern so the halves are plain truncations.
    let nanos_bits = nanos as u64;
    let low_word = nanos_bits as u32;
    let high_word = (nanos_bits >> 32) as u32;
    self.set_data(low_word, high_word, days as u32);
}

#[inline]
fn data_as_days_and_nanos(&self) -> (i32, i64) {
let day = self.data()[2] as i32;
Expand All @@ -132,6 +180,24 @@ impl Int96 {
}
}

// The ordering on `Int96` is total, so the partial comparison defers to `Ord`
// and always yields `Some`.
impl PartialOrd for Int96 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

// Total order over `Int96`: compare the day component first, and fall back to the
// nanosecond component only when the days are equal.
impl Ord for Int96 {
    fn cmp(&self, other: &Self) -> Ordering {
        let (lhs_days, lhs_nanos) = self.data_as_days_and_nanos();
        let (rhs_days, rhs_nanos) = other.data_as_days_and_nanos();

        lhs_days
            .cmp(&rhs_days)
            .then_with(|| lhs_nanos.cmp(&rhs_nanos))
    }
}

impl From<Vec<u32>> for Int96 {
fn from(buf: Vec<u32>) -> Self {
assert_eq!(buf.len(), 3);
Expand Down Expand Up @@ -1409,4 +1475,73 @@ mod tests {
assert_eq!(ba1, ba11);
assert!(ba5 > ba1);
}

#[test]
fn test_int96_time_conversions() {
    // Part 1: each unit-specific setter must round-trip through its matching getter.
    let test_values = [0, 1, 60, 3600, 86400, 1234567, 31536000];

    for &value in &test_values {
        let mut i96: Int96 = Int96::new();

        i96.set_data_from_seconds(value);
        assert_eq!(i96.to_seconds(), value, "seconds roundtrip failed for {}", value);

        i96.set_data_from_millis(value);
        assert_eq!(i96.to_millis(), value, "millis roundtrip failed for {}", value);

        i96.set_data_from_micros(value);
        assert_eq!(i96.to_micros(), value, "micros roundtrip failed for {}", value);

        i96.set_data_from_nanos(value);
        assert_eq!(i96.to_nanos(), value, "nanos roundtrip failed for {}", value);
    }

    // Part 2: raw (days, nanos) pairs must be stored and read back verbatim. This
    // table does not depend on `test_values`, so it is checked once rather than
    // once per value. The day numbers are passed through as-is, with no epoch
    // offset applied by this test.
    let test_day_nanos = [
        (0, 0),                           // day 0, start of day
        (0, 1),                           // day 0, one nanosecond in
        (0, NANOSECONDS - 1),             // day 0, one nanosecond short of a second
        (0, NANOSECONDS),                 // day 0, one second in
        (1, 0),                           // day 1, start of day
        (1, NANOSECONDS),                 // day 1, one second in
        (365, 0),                         // day 365, start of day
        (365, NANOSECONDS * 3600),        // day 365, one hour in
        (10957, 0),                       // day 10957, start of day
        (18262, 0),                       // day 18262, start of day
        (18262, NANOSECONDS * 3600 * 12), // day 18262, twelve hours in
    ];

    for &(days, nanos) in &test_day_nanos {
        let mut i96 = Int96::new();
        i96.set_data_from_days_and_nanos(days, nanos);
        let (roundtrip_days, roundtrip_nanos) = i96.data_as_days_and_nanos();
        assert_eq!(roundtrip_days, days, "days roundtrip failed for days={}, nanos={}", days, nanos);
        assert_eq!(roundtrip_nanos, nanos, "nanos roundtrip failed for days={}, nanos={}", days, nanos);
    }
}

#[test]
fn test_int96_ord() {
    // Each entry is (smaller, larger), where a value is a (days, nanos) pair.
    let test_pairs = [
        ((99, 5), (100, 4)),
        ((100, 10), (100, 5523)),
        ((0, 0), (100, 0)),
        ((10000, 1_000_000_000), (10000, 2_000_000_000)),
        ((10000, 1_000_000_000), (20000, 1_000_000_000)),
    ];

    // Builds an Int96 from a raw (days, nanos) pair.
    let build = |(days, nanos): (i32, i64)| {
        let mut value = Int96::new();
        value.set_data_from_days_and_nanos(days, nanos);
        value
    };

    for (smaller, larger) in test_pairs {
        let small = build(smaller);
        let large = build(larger);

        assert!(small < large, "Expected {:?} < {:?}", smaller, larger);
        assert!(large > small, "Expected {:?} > {:?}", larger, smaller);
        assert!(small == small, "Expected {:?} == {:?}", smaller, smaller);
        assert!(large == large, "Expected {:?} == {:?}", larger, larger);
    }
}
}
120 changes: 120 additions & 0 deletions parquet/tests/int96_stats_roundtrip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use parquet::basic::Type;
use parquet::data_type::{Int96, Int96Type};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::statistics::Statistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use rand::seq::SliceRandom;
use std::fs::File;
use std::sync::Arc;
use tempfile::Builder;
use chrono::{DateTime, NaiveDateTime, Utc};

/// Parses a `YYYY-MM-DD HH:MM:SS[.fraction]` string, treats it as UTC, and packs the
/// resulting instant into an `Int96` via `set_data_from_nanos`.
///
/// Panics (through `unwrap`) if the string does not match the format or if the
/// nanosecond timestamp is not available for the parsed instant.
fn datetime_to_int96(dt: &str) -> Int96 {
    let parsed = NaiveDateTime::parse_from_str(dt, "%Y-%m-%d %H:%M:%S%.f").unwrap();
    let utc: DateTime<Utc> = DateTime::from_naive_utc_and_offset(parsed, Utc);

    let mut result = Int96::new();
    result.set_data_from_nanos(utc.timestamp_nanos_opt().unwrap());
    result
}

/// Writes `data` in shuffled order to a temporary single-column INT96 Parquet file,
/// reads the column statistics back, and asserts on the recorded min, max and null count.
///
/// `data` must be non-empty and listed in ascending order: its first and last elements
/// are taken as the expected minimum and maximum. Any I/O or Parquet error panics.
fn verify_ordering(data: Vec<Int96>) {
    // Scratch file; the handle is kept alive until the end of the function.
    let temp_file = Builder::new()
        .prefix("test_int96_stats")
        .tempfile()
        .unwrap();
    let path = temp_file.path().to_owned();

    // Schema with a single required INT96 column.
    let schema_text = "
message test {
REQUIRED INT96 timestamp;
}
";
    let schema = parse_message_type(schema_text).unwrap();

    // Statistics are enabled at page level.
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );

    // Captured from the caller's ordering before anything is shuffled.
    let expected_min = data[0];
    let expected_max = data[data.len() - 1];

    // Write phase: one row group, one column, values in random order so the
    // statistics cannot be right merely because of insertion order.
    {
        let out = File::create(&path).unwrap();
        let mut file_writer =
            SerializedFileWriter::new(out, Arc::new(schema), writer_props).unwrap();
        let mut rg_writer = file_writer.next_row_group().unwrap();
        let mut column = rg_writer.next_column().unwrap().unwrap();

        let mut shuffled = data.clone();
        shuffled.shuffle(&mut rand::rng());
        column
            .typed::<Int96Type>()
            .write_batch(&shuffled, None, None)
            .unwrap();

        column.close().unwrap();
        rg_writer.close().unwrap();
        file_writer.close().unwrap();
    }

    // Read phase: inspect the statistics of the first column of the first row group.
    let reader = SerializedFileReader::new(File::open(&path).unwrap()).unwrap();
    let column_meta = reader.metadata().row_group(0).column(0);

    let stats = column_meta.statistics().unwrap();
    assert_eq!(stats.physical_type(), Type::INT96);

    match stats {
        Statistics::Int96(typed) => {
            let min = typed.min_opt().unwrap();
            let max = typed.max_opt().unwrap();

            assert_eq!(*min, expected_min, "Min value should be {} but was {}", expected_min, min);
            assert_eq!(*max, expected_max, "Max value should be {} but was {}", expected_max, max);
            assert_eq!(typed.null_count_opt(), Some(0));
        }
        _ => panic!("Expected Int96 statistics"),
    }
}

/// Timestamps spread over several years, listed in ascending order.
#[test]
fn test_multiple_dates() {
    let timestamps = [
        "2020-01-01 00:00:00.000",
        "2020-02-29 23:59:59.000",
        "2020-12-31 23:59:59.000",
        "2021-01-01 00:00:00.000",
        "2023-06-15 12:30:45.000",
        "2024-02-29 15:45:30.000",
        "2024-12-25 07:00:00.000",
        "2025-01-01 00:00:00.000",
        "2025-07-04 20:00:00.000",
        "2025-12-31 23:59:59.000",
    ];
    let data: Vec<Int96> = timestamps.iter().copied().map(datetime_to_int96).collect();
    verify_ordering(data);
}

/// Timestamps that share a calendar day and differ only in time of day.
#[test]
fn test_same_day_different_time() {
    let timestamps = [
        "2020-01-01 00:01:00.000",
        "2020-01-01 00:02:00.000",
        "2020-01-01 00:03:00.000",
    ];
    let data: Vec<Int96> = timestamps.iter().copied().map(datetime_to_int96).collect();
    verify_ordering(data);
}

/// Later dates paired with earlier times of day, so the date and the time of day
/// move in opposite directions across the list.
#[test]
fn test_increasing_day_decreasing_time() {
    let timestamps = [
        "2020-01-01 12:00:00.000",
        "2020-02-01 11:00:00.000",
        "2020-03-01 10:00:00.000",
    ];
    let data: Vec<Int96> = timestamps.iter().copied().map(datetime_to_int96).collect();
    verify_ordering(data);
}