Skip to content
26 changes: 9 additions & 17 deletions integration_tests/src/main/python/orc_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
# Copyright (c) 2020-2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -158,8 +158,6 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl,
MapGen(StructGen([['child0', byte_gen], ['child1', long_gen]], nullable=False),
StructGen([['child0', byte_gen], ['child1', long_gen]]))]

non_utc_allow_orc_scan=['ColumnarToRowExec', 'FileSourceScanExec', 'BatchScanExec'] if is_not_utc() else []

orc_gens_list = [orc_basic_gens,
orc_array_gens_sample,
orc_struct_gens_sample,
Expand Down Expand Up @@ -195,7 +193,6 @@ def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ['', 'orc'])
@tz_sensitive_test
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
data_path = spark_tmp_path + '/ORC_DATA'
Expand Down Expand Up @@ -224,7 +221,6 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/ORC_DATA'
# Append two struct columns to verify nested predicate pushdown.
Expand Down Expand Up @@ -281,7 +277,6 @@ def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, rea

@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/131 is fixed
# we should go with a more standard set of generators
Expand Down Expand Up @@ -351,7 +346,6 @@ def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reade

@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_merge_schema_read(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/131 is fixed
# we should go with a more standard set of generators
Expand Down Expand Up @@ -633,7 +627,6 @@ def test_read_struct_without_stream(spark_tmp_path):
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
@pytest.mark.parametrize('case_sensitive', ["false", "true"])
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_read_with_more_columns(spark_tmp_path, orc_gen, reader_confs, v1_enabled_list, case_sensitive):
struct_gen = StructGen([('nested_col', orc_gen)])
# Map is not supported yet.
Expand Down Expand Up @@ -822,7 +815,6 @@ def test_orc_read_varchar_as_string(std_input_path):
@pytest.mark.parametrize('gens', orc_gens_list, ids=idfn)
@pytest.mark.parametrize('keep_order', [True, pytest.param(False, marks=pytest.mark.ignore_order(local=True))])
@tz_sensitive_test
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_read_round_trip_for_multithreaded_combining(spark_tmp_path, gens, keep_order):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
data_path = spark_tmp_path + '/ORC_DATA'
Expand All @@ -837,7 +829,6 @@ def test_read_round_trip_for_multithreaded_combining(spark_tmp_path, gens, keep_


@pytest.mark.parametrize('keep_order', [True, pytest.param(False, marks=pytest.mark.ignore_order(local=True))])
@allow_non_gpu(*non_utc_allow_orc_scan)
def test_simple_partitioned_read_for_multithreaded_combining(spark_tmp_path, keep_order):
# Use every type except boolean, see https://github.com/NVIDIA/spark-rapids/issues/11762 and
# https://github.com/rapidsai/cudf/issues/6763 .
Expand Down Expand Up @@ -1071,17 +1062,19 @@ def test_orc_not_support_timestamp_ltz(std_input_path):
conf={},
error_message="ParseException")

# Test the ORC reader and writer with the same timezone.
# The `tz_sensitive_test` mark guarantees that the write and the read happen in the same timezone.
# The `spark.sql.session.timeZone` set here does not impact the reader and writer timezones, but we test it anyway.
# For tests where the reader and writer timezones differ, refer to `OrcTimezoneSuite`.
@pytest.mark.parametrize("reader_confs", reader_opt_confs, ids=idfn)
# Setting the end timestamp to None almost always generates timestamps in the year 2200 or later.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete the comments that explain why we are setting the start and end timestamp to what they are? Have we tested outside of this range recently? Do we have a follow on issue to fix the range limitations?

# Setting the end timestamp before the year 2200 tests the columnar execution path on the GPU.
@pytest.mark.parametrize('end_timestamp', [None, datetime(2199, 1, 1, tzinfo=timezone.utc)], ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
@pytest.mark.parametrize("timezone_pair", [("UTC", "Asia/Shanghai"), ("Asia/Shanghai", "UTC"), ("Asia/Shanghai", "America/Los_Angeles")], ids=idfn)
@tz_sensitive_test
def test_orc_non_utc_timezone(reader_confs, end_timestamp, spark_tmp_path, v1_enabled_list, timezone_pair):
def test_orc_reader_writer_the_same_timezone(reader_confs, end_timestamp, spark_tmp_path, v1_enabled_list, timezone_pair):
d_gen = DateGen(start=date(1590, 1, 1))
# Update start year to 1590 when https://github.com/NVIDIA/spark-rapids/issues/13272 is fixed.
ts_gen = TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc), end=end_timestamp, nullable=True)
ts_gen = TimestampGen(start=datetime(1970, 1, 2, tzinfo=timezone.utc), end=end_timestamp, nullable=True)
date_timestamp_gens = [('c1', d_gen), ('c2', ts_gen)]

(write_timezone, read_timezone) = timezone_pair
Expand All @@ -1096,14 +1089,13 @@ def test_orc_non_utc_timezone(reader_confs, end_timestamp, spark_tmp_path, v1_en
'spark.rapids.sql.format.orc.enabled': True,
'spark.rapids.sql.format.orc.read.enabled': True,
'spark.sql.session.timeZone': read_timezone,
# ignore write timezone when reading, this is for test purpose only
# The `tz_sensitive_test` mark guarantees the write and read are in the same timezone
'spark.rapids.sql.orc.read.ignore.write.timezone': True
})

# write on CPU
cpu_write_path = spark_tmp_path + "/ORC_DATA_CPU"
with_cpu_session(lambda spark: gen_df(spark, date_timestamp_gens).write.orc(cpu_write_path), conf=write_confs)

# read on GPU and CPU
assert_gpu_and_cpu_are_equal_collect(read_orc_df(cpu_write_path), conf=read_confs)

@pytest.mark.skip(reason='https://github.com/NVIDIA/spark-rapids/issues/13272: CPU can not read ORC file generated by GPU when timestamp is less than 1970 year')
Expand Down
Loading
Loading