Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -652,19 +652,28 @@ case class GpuOrcMultiFilePartitionReaderFactory(

metrics.getOrElse(FILTER_TIME, NoopMetric).ns {
metrics.getOrElse(SCAN_TIME, NoopMetric).ns {
files.map { file =>
val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema,
readDataSchema, partitionSchema)
compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind,
new ArrayBuffer[OrcSingleStripeMeta]) ++=
orcPartitionReaderContext.blockIterator.map(block =>
OrcSingleStripeMeta(
orcPartitionReaderContext.filePath,
OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)),
file.partitionValues,
OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema),
readDataSchema,
OrcExtraInfo(orcPartitionReaderContext.requestedMapping)))
files.foreach { file =>
try {
val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema,
readDataSchema, partitionSchema)
// filterStripes returns null for an empty ORC file; it has no stripes to
// contribute, so skip it (the single-file path uses an EmptyPartitionReader).
if (orcPartitionReaderContext != null) {
compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind,
new ArrayBuffer[OrcSingleStripeMeta]) ++=
orcPartitionReaderContext.blockIterator.map(block =>
OrcSingleStripeMeta(
orcPartitionReaderContext.filePath,
OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)),
file.partitionValues,
OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema),
readDataSchema,
OrcExtraInfo(orcPartitionReaderContext.requestedMapping)))
}
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT:
Do we have a test case for this scenario:
a FileNotFoundException is thrown and ignoreMissingFiles is false?
We should make sure the external behavior in this scenario is a FileNotFoundException.
I know the behavior is correct, but it would be better to have a test case.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — added a test that deletes a planned ORC file with ignoreMissingFiles=false and asserts a FileNotFoundException surfaces (in the failure cause chain) on both CPU and GPU.

logWarning(s"Skipped missing file: ${file.filePath}", e)
}
}
}
}
Expand Down
130 changes: 129 additions & 1 deletion tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,15 @@

package com.nvidia.spark.rapids

import java.io.FileNotFoundException

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType}

Expand Down Expand Up @@ -76,6 +83,127 @@ class OrcScanSuite extends SparkQueryCompareTestSuite {
StructField("_col2", StringType),
StructField("_col1", LongType))))) { frame => frame }

/**
 * Builds the Spark configuration shared by the ignoreMissingFiles tests: ORC is read through
 * the V1 source, the RAPIDS ORC reader type is COALESCING, and `ignoreMissingFiles` is set to
 * the requested value.
 *
 * @param ignoreMissingFiles value written to `SQLConf.IGNORE_MISSING_FILES`
 * @return a fresh `SparkConf` carrying the three settings
 */
private def coalescingOrcConf(ignoreMissingFiles: Boolean): SparkConf =
  new SparkConf()
    .set(SQLConf.USE_V1_SOURCE_LIST.key, "orc")
    .set(SQLConf.IGNORE_MISSING_FILES.key, ignoreMissingFiles.toString)
    .set(RapidsConf.ORC_READER_TYPE.key, RapidsReaderType.COALESCING.toString)

/**
 * Shared fixture for the ignoreMissingFiles tests.
 *
 * Writes four single-row ORC directories under a temp path ("second" -> "0", "fourth" -> "1",
 * "first" -> "2", "third" -> "3"), builds a DataFrame over all four, touches
 * `selectedPartitions` on every file source scan in the executed plan, then deletes the files
 * under "first" and "third" followed by the "third" directory itself. Only after that is
 * `body` invoked.
 *
 * @param spark    session used to write and read the ORC data
 * @param checkGpu when true, additionally asserts that a `GpuFileSourceScanExec` is in the plan
 * @param body     receives a thunk that collects column 0 of the DataFrame as sorted strings;
 *                 the thunk is what actually triggers the read of the (partly deleted) input
 * @return whatever `body` returns
 */
private def withPlannedOrcFilesDeleted[T](spark: SparkSession, checkGpu: Boolean)(
    body: (() => Seq[String]) => T): T = {
  import spark.implicits._

  withTempPath { base =>
    val basePath = base.getCanonicalPath
    val firstPath = new Path(basePath, "first")
    val secondPath = new Path(basePath, "second")
    val thirdPath = new Path(basePath, "third")
    val fourthPath = new Path(basePath, "fourth")

    // "second" and "fourth" survive the deletion below, so "0" and "1" are the only rows
    // that can still be read afterwards.
    Seq(secondPath -> "0", fourthPath -> "1", firstPath -> "2", thirdPath -> "3").foreach {
      case (dir, value) =>
        Seq(value).toDF("a").write.mode("overwrite").format("orc").save(dir.toString)
    }

    val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
    val plannedFiles = Seq(firstPath, thirdPath).flatMap { dir =>
      fs.listStatus(dir).filter(_.isFile).map(_.getPath)
    }

    val df = spark.read.format("orc").load(
      firstPath.toString,
      secondPath.toString,
      thirdPath.toString,
      fourthPath.toString)

    // One entry per file source scan in the plan: true for a GPU scan, false for a CPU scan.
    // `selectedPartitions` is evaluated only for its side effect.
    // NOTE(review): presumably this forces the scan to resolve its file listing before the
    // files are deleted, so that the deleted files are still part of the plan - confirm.
    val scanRunsOnGpu = df.queryExecution.executedPlan.collect {
      case scan: GpuFileSourceScanExec =>
        scan.selectedPartitions
        true
      case scan: FileSourceScanExec =>
        scan.selectedPartitions
        false
    }
    assert(scanRunsOnGpu.nonEmpty, "ORC read does not have a file source scan")
    if (checkGpu) {
      assert(scanRunsOnGpu.contains(true), "ORC read is not running on GPU")
    }

    // Fail loudly if a deletion does not happen; otherwise the test would silently exercise
    // a different scenario than the one it claims to cover.
    plannedFiles.foreach { file =>
      assert(fs.delete(file, false), s"Failed to delete planned ORC file $file")
    }
    assert(fs.delete(thirdPath, true), s"Failed to delete ORC directory $thirdPath")

    body(() => df.collect().map(_.getString(0)).sorted.toSeq)
  }
}

/**
 * Returns true when `t`, or any throwable reachable by following `getCause` from it, is a
 * `FileNotFoundException`. A null `t` yields false.
 */
private def causedByFileNotFound(t: Throwable): Boolean =
  Iterator.iterate(t)(_.getCause).takeWhile(_ != null)
    .exists(_.isInstanceOf[FileNotFoundException])

test("ORC coalescing reader honors ignoreMissingFiles") {
  // With ignoreMissingFiles=true the read must succeed and return only the surviving rows.
  def survivingRows(spark: SparkSession, checkGpu: Boolean): Seq[String] =
    withPlannedOrcFilesDeleted(spark, checkGpu)(collectRows => collectRows())

  val conf = coalescingOrcConf(ignoreMissingFiles = true)

  val cpuResult = withCpuSparkSession(survivingRows(_, checkGpu = false), conf)
  val gpuResult = withGpuSparkSession(survivingRows(_, checkGpu = true), conf)

  assertResult(Seq("0", "1"))(cpuResult)
  assertResult(cpuResult)(gpuResult)
}

test("ORC coalescing reader throws FileNotFoundException when ignoreMissingFiles is false") {
  // With ignoreMissingFiles=false the read must fail, and a FileNotFoundException must be
  // somewhere in the failure's cause chain. Only the collect itself is intercepted, so a
  // failure during fixture setup cannot be mistaken for the expected error.
  def assertCollectFails(spark: SparkSession, checkGpu: Boolean): Unit =
    withPlannedOrcFilesDeleted(spark, checkGpu) { collectRows =>
      val e = intercept[Exception](collectRows())
      assert(causedByFileNotFound(e),
        s"Expected a FileNotFoundException when ignoreMissingFiles=false, but got: $e")
    }

  val conf = coalescingOrcConf(ignoreMissingFiles = false)

  withCpuSparkSession(assertCollectFails(_, checkGpu = false), conf)
  withGpuSparkSession(assertCollectFails(_, checkGpu = true), conf)
}

/**
*
* The calendar of hybrid-Julian-calendar.orc file is hybrid Julian Gregorian
Expand Down
Loading