Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -652,19 +652,24 @@ case class GpuOrcMultiFilePartitionReaderFactory(

metrics.getOrElse(FILTER_TIME, NoopMetric).ns {
metrics.getOrElse(SCAN_TIME, NoopMetric).ns {
files.map { file =>
val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema,
readDataSchema, partitionSchema)
compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind,
new ArrayBuffer[OrcSingleStripeMeta]) ++=
orcPartitionReaderContext.blockIterator.map(block =>
OrcSingleStripeMeta(
orcPartitionReaderContext.filePath,
OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)),
file.partitionValues,
OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema),
readDataSchema,
OrcExtraInfo(orcPartitionReaderContext.requestedMapping)))
files.foreach { file =>
try {
val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema,
readDataSchema, partitionSchema)
compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind,

@res-life res-life Jun 25, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you guard the filterStripes result before dereferencing it? filterStripes can return null for an empty ORC file, and this line currently reads orcPartitionReaderContext.compressionKind before any null check, so that case would throw NullPointerException.

Please wrap all uses of orcPartitionReaderContext in the non-null branch, matching the existing ORC reader paths that handle a null context by producing/skipping empty input.

For example:

if (orcPartitionReaderContext != null) {
  compressionAndStripes.getOrElseUpdate(...)
  ...
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch — guarded it. An empty ORC file makes filterStripes return null, so that file is now skipped instead of dereferencing the context, matching the single-file path that uses EmptyPartitionReader.

new ArrayBuffer[OrcSingleStripeMeta]) ++=
orcPartitionReaderContext.blockIterator.map(block =>
OrcSingleStripeMeta(
orcPartitionReaderContext.filePath,
OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)),
file.partitionValues,
OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema),
readDataSchema,
OrcExtraInfo(orcPartitionReaderContext.requestedMapping)))
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT:
Do we have test case to test scenario:
FileNotFoundException and ignoreMissingFiles is false.
Make sure the extenal behavior is FileNotFoundException for this scenario.
I know the behavior is correct, it's better to have a test case.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — added a test that deletes a planned ORC file with ignoreMissingFiles=false and asserts a FileNotFoundException surfaces (in the failure cause chain) on both CPU and GPU.

logWarning(s"Skipped missing file: ${file.filePath}", e)
}
}
}
}
Expand Down
60 changes: 59 additions & 1 deletion tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,12 @@

package com.nvidia.spark.rapids

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType}

Expand Down Expand Up @@ -76,6 +80,60 @@ class OrcScanSuite extends SparkQueryCompareTestSuite {
StructField("_col2", StringType),
StructField("_col1", LongType))))) { frame => frame }

test("ORC coalescing reader honors ignoreMissingFiles") {
def collectAfterDeletingPlannedFiles(spark: SparkSession, checkGpu: Boolean): Seq[String] = {
import spark.implicits._

withTempPath { base =>
val basePath = base.getCanonicalPath

Seq("0").toDF("a").write.mode("overwrite").format("orc")
.save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.mode("overwrite").format("orc")
.save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())

Seq("2").toDF("a").write.mode("overwrite").format("orc").save(firstPath.toString)
Seq("3").toDF("a").write.mode("overwrite").format("orc").save(thirdPath.toString)

val filesToDelete = Seq(firstPath, thirdPath).flatMap { path =>
fs.listStatus(path).filter(_.isFile).map(_.getPath)
}
val df = spark.read.format("orc").load(
firstPath.toString,
new Path(basePath, "second").toString,
thirdPath.toString,
new Path(basePath, "fourth").toString)
Comment on lines +108 to +112

if (checkGpu) {
val gpuScans = df.queryExecution.executedPlan.collect {
case _: GpuFileSourceScanExec => true
}
assert(gpuScans.nonEmpty, "ORC read is not running on GPU")
}

filesToDelete.foreach(file => fs.delete(file, false))
assert(fs.delete(thirdPath, true))

df.collect().map(_.getString(0)).sorted.toSeq
}
}

val conf = new SparkConf()
.set(SQLConf.USE_V1_SOURCE_LIST.key, "orc")
.set(SQLConf.IGNORE_MISSING_FILES.key, "true")
.set(RapidsConf.ORC_READER_TYPE.key, RapidsReaderType.COALESCING.toString)

val cpuResult = withCpuSparkSession(collectAfterDeletingPlannedFiles(_, checkGpu = false), conf)
val gpuResult = withGpuSparkSession(collectAfterDeletingPlannedFiles(_, checkGpu = true), conf)

assertResult(Seq("0", "1"))(cpuResult)
assertResult(cpuResult)(gpuResult)
}

/**
*
* The calendar of hybrid-Julian-calendar.orc file is hybrid Julian Gregorian
Expand Down
Loading