From 1a09e0707c6a3d9be73d37dafaa032f51bb10b83 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Wed, 17 Jun 2026 16:38:57 +0800 Subject: [PATCH 1/4] Fix ORC coalescing ignore missing files Signed-off-by: Allen Xu --- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 33 +++++----- .../nvidia/spark/rapids/OrcScanSuite.scala | 60 ++++++++++++++++++- 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index bc246a152f6..7d0026d00f0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -652,19 +652,24 @@ case class GpuOrcMultiFilePartitionReaderFactory( metrics.getOrElse(FILTER_TIME, NoopMetric).ns { metrics.getOrElse(SCAN_TIME, NoopMetric).ns { - files.map { file => - val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema, - readDataSchema, partitionSchema) - compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind, - new ArrayBuffer[OrcSingleStripeMeta]) ++= - orcPartitionReaderContext.blockIterator.map(block => - OrcSingleStripeMeta( - orcPartitionReaderContext.filePath, - OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)), - file.partitionValues, - OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema), - readDataSchema, - OrcExtraInfo(orcPartitionReaderContext.requestedMapping))) + files.foreach { file => + try { + val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema, + readDataSchema, partitionSchema) + compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind, + new 
ArrayBuffer[OrcSingleStripeMeta]) ++= + orcPartitionReaderContext.blockIterator.map(block => + OrcSingleStripeMeta( + orcPartitionReaderContext.filePath, + OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)), + file.partitionValues, + OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema), + readDataSchema, + OrcExtraInfo(orcPartitionReaderContext.requestedMapping))) + } catch { + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: ${file.filePath}", e) + } } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala index 0eafa74181a..18c079a15af 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,8 +16,12 @@ package com.nvidia.spark.rapids +import org.apache.hadoop.fs.Path + import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.functions.col +import org.apache.spark.sql.rapids.GpuFileSourceScanExec import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType} @@ -76,6 +80,60 @@ class OrcScanSuite extends SparkQueryCompareTestSuite { StructField("_col2", StringType), StructField("_col1", LongType))))) { frame => frame } + test("ORC coalescing reader honors ignoreMissingFiles") { + def collectAfterDeletingPlannedFiles(spark: SparkSession, checkGpu: Boolean): Seq[String] = { + import spark.implicits._ + + withTempPath { base => + val basePath = base.getCanonicalPath + + Seq("0").toDF("a").write.mode("overwrite").format("orc") + .save(new Path(basePath, "second").toString) + Seq("1").toDF("a").write.mode("overwrite").format("orc") + .save(new Path(basePath, "fourth").toString) + + val firstPath = new Path(basePath, "first") + val thirdPath = new Path(basePath, "third") + val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf()) + + Seq("2").toDF("a").write.mode("overwrite").format("orc").save(firstPath.toString) + Seq("3").toDF("a").write.mode("overwrite").format("orc").save(thirdPath.toString) + + val filesToDelete = Seq(firstPath, thirdPath).flatMap { path => + fs.listStatus(path).filter(_.isFile).map(_.getPath) + } + val df = spark.read.format("orc").load( + firstPath.toString, + new Path(basePath, "second").toString, + thirdPath.toString, + new Path(basePath, "fourth").toString) + + if (checkGpu) { + val gpuScans = df.queryExecution.executedPlan.collect { + case _: GpuFileSourceScanExec => true + } + assert(gpuScans.nonEmpty, "ORC read is not running on GPU") + } + + filesToDelete.foreach(file => fs.delete(file, false)) + assert(fs.delete(thirdPath, true)) + + 
df.collect().map(_.getString(0)).sorted.toSeq + } + } + + val conf = new SparkConf() + .set(SQLConf.USE_V1_SOURCE_LIST.key, "orc") + .set(SQLConf.IGNORE_MISSING_FILES.key, "true") + .set(RapidsConf.ORC_READER_TYPE.key, RapidsReaderType.COALESCING.toString) + + val cpuResult = withCpuSparkSession(collectAfterDeletingPlannedFiles(_, checkGpu = false), conf) + val gpuResult = withGpuSparkSession(collectAfterDeletingPlannedFiles(_, checkGpu = true), conf) + + assertResult(Seq("0", "1"))(cpuResult) + assertResult(cpuResult)(gpuResult) + } + /** * * The calendar of hybrid-Julian-calendar.orc file is hybrid Julian Gregorian From 007cceaf87acef0cdfc6de997388fa782410e803 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Wed, 17 Jun 2026 17:10:25 +0800 Subject: [PATCH 2/4] Address ORC scan review feedback Signed-off-by: Allen Xu --- .../com/nvidia/spark/rapids/OrcScanSuite.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala index 18c079a15af..c8d7769ca9a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala @@ -19,8 +19,9 @@ package com.nvidia.spark.rapids import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuFileSourceScanExec import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType} @@ -107,12 +108,17 @@ class OrcScanSuite extends SparkQueryCompareTestSuite { new Path(basePath, "second").toString, thirdPath.toString, new Path(basePath, "fourth").toString) - + 
val hasGpuScan = df.queryExecution.executedPlan.collect { + case scan: GpuFileSourceScanExec => + scan.selectedPartitions + true + case scan: FileSourceScanExec => + scan.selectedPartitions + false + } + assert(hasGpuScan.nonEmpty, "ORC read does not have a file source scan") if (checkGpu) { - val gpuScans = df.queryExecution.executedPlan.collect { - case _: GpuFileSourceScanExec => true - } - assert(gpuScans.nonEmpty, "ORC read is not running on GPU") + assert(hasGpuScan.contains(true), "ORC read is not running on GPU") } filesToDelete.foreach(file => fs.delete(file, false)) From bc2baf75b6ff0ffd009b52c49e657c016986fc0d Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 25 Jun 2026 14:38:57 +0800 Subject: [PATCH 3/4] Add ORC ignoreMissingFiles=false negative test (FileNotFoundException) Complements the existing "honors ignoreMissingFiles" test with the negative case: when spark.sql.files.ignoreMissingFiles=false and a planned ORC file is deleted before read, the coalescing reader must surface a FileNotFoundException (verified via a cause-chain walk) on both CPU and GPU. Local validation: OrcScanSuite => Tests: succeeded 12, failed 0, canceled 0, ignored 1, pending 0; BUILD SUCCESS. 
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Allen Xu --- .../nvidia/spark/rapids/OrcScanSuite.scala | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala index c8d7769ca9a..88f8b5082dd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids +import java.io.FileNotFoundException + import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf @@ -140,6 +142,68 @@ class OrcScanSuite extends SparkQueryCompareTestSuite { assertResult(cpuResult)(gpuResult) } + private def causedByFileNotFound(t: Throwable): Boolean = + Iterator.iterate(t)(_.getCause).takeWhile(_ != null) + .exists(_.isInstanceOf[FileNotFoundException]) + + test("ORC coalescing reader throws FileNotFoundException when ignoreMissingFiles is false") { + def collectAfterDeletingPlannedFiles(spark: SparkSession, checkGpu: Boolean): Unit = { + import spark.implicits._ + + withTempPath { base => + val basePath = base.getCanonicalPath + + Seq("0").toDF("a").write.mode("overwrite").format("orc") + .save(new Path(basePath, "second").toString) + Seq("1").toDF("a").write.mode("overwrite").format("orc") + .save(new Path(basePath, "fourth").toString) + + val firstPath = new Path(basePath, "first") + val thirdPath = new Path(basePath, "third") + val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf()) + + Seq("2").toDF("a").write.mode("overwrite").format("orc").save(firstPath.toString) + Seq("3").toDF("a").write.mode("overwrite").format("orc").save(thirdPath.toString) + + val filesToDelete = Seq(firstPath, thirdPath).flatMap { path => + fs.listStatus(path).filter(_.isFile).map(_.getPath) + } + val df = spark.read.format("orc").load( + firstPath.toString, + new Path(basePath, "second").toString, + 
thirdPath.toString, + new Path(basePath, "fourth").toString) + val hasGpuScan = df.queryExecution.executedPlan.collect { + case scan: GpuFileSourceScanExec => + scan.selectedPartitions + true + case scan: FileSourceScanExec => + scan.selectedPartitions + false + } + assert(hasGpuScan.nonEmpty, "ORC read does not have a file source scan") + if (checkGpu) { + assert(hasGpuScan.contains(true), "ORC read is not running on GPU") + } + + filesToDelete.foreach(file => fs.delete(file, false)) + assert(fs.delete(thirdPath, true)) + + val e = intercept[Exception](df.collect()) + assert(causedByFileNotFound(e), + s"Expected a FileNotFoundException when ignoreMissingFiles=false, but got: $e") + } + } + + val conf = new SparkConf() + .set(SQLConf.USE_V1_SOURCE_LIST.key, "orc") + .set(SQLConf.IGNORE_MISSING_FILES.key, "false") + .set(RapidsConf.ORC_READER_TYPE.key, RapidsReaderType.COALESCING.toString) + + withCpuSparkSession(collectAfterDeletingPlannedFiles(_, checkGpu = false), conf) + withGpuSparkSession(collectAfterDeletingPlannedFiles(_, checkGpu = true), conf) + } + /** * * The calendar of hybrid-Julian-calendar.orc file is hybrid Julian Gregorian From c25e238a9881c15fd21ec0e7f53f6df9568474cc Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 26 Jun 2026 14:05:02 +0800 Subject: [PATCH 4/4] Guard null filterStripes context in ORC coalescing reader buildBaseColumnarReaderForCoalescing dereferenced orcPartitionReaderContext.compressionKind without a null check. filterStripes returns null for an empty ORC file (the resultedColPruneInfo.isEmpty branch), so an empty file in a coalesced read threw NPE. Wrap the stripe-collection in a non-null branch and skip the file, matching the single-file path that returns EmptyPartitionReader for a null context. Addresses review comment on #15103 (res-life r3472703942). 
Validated: mvn package -pl tests -am -Dbuildver=330 \ -DwildcardSuites=com.nvidia.spark.rapids.OrcScanSuite -> Tests: succeeded 12, failed 0, canceled 0, ignored 1, pending 0; BUILD SUCCESS. ### Review notes - nt-code-review: 0 must-fix. GPU-CPU parity confirmed (null -> skip = zero rows = single-file EmptyPartitionReader); coalescing was the only unguarded ORC path (cloud path already guards); count(*) unaffected. - Informational (not addressed): no test exercises the empty-schema-ORC coalescing path specifically; existing OrcScanSuite cases cover the FileNotFoundException catch path. The guard mirrors the proven single-file null handling. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Allen Xu --- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 7d0026d00f0..59f7ec6462d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -656,16 +656,20 @@ case class GpuOrcMultiFilePartitionReaderFactory( try { val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema, readDataSchema, partitionSchema) - compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind, - new ArrayBuffer[OrcSingleStripeMeta]) ++= - orcPartitionReaderContext.blockIterator.map(block => - OrcSingleStripeMeta( - orcPartitionReaderContext.filePath, - OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)), - file.partitionValues, - OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema), - readDataSchema, - OrcExtraInfo(orcPartitionReaderContext.requestedMapping))) + // filterStripes returns null for an empty ORC file; it has no stripes to + // contribute, so skip it (the single-file path uses an 
EmptyPartitionReader). + if (orcPartitionReaderContext != null) { + compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind, + new ArrayBuffer[OrcSingleStripeMeta]) ++= + orcPartitionReaderContext.blockIterator.map(block => + OrcSingleStripeMeta( + orcPartitionReaderContext.filePath, + OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)), + file.partitionValues, + OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema), + readDataSchema, + OrcExtraInfo(orcPartitionReaderContext.requestedMapping))) + } } catch { case e: FileNotFoundException if ignoreMissingFiles => logWarning(s"Skipped missing file: ${file.filePath}", e)