diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
index bc246a152f6..59f7ec6462d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2025, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2026, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -652,19 +652,28 @@ case class GpuOrcMultiFilePartitionReaderFactory(
 
     metrics.getOrElse(FILTER_TIME, NoopMetric).ns {
       metrics.getOrElse(SCAN_TIME, NoopMetric).ns {
-        files.map { file =>
-          val orcPartitionReaderContext = filterHandler.filterStripes(file, dataSchema,
-            readDataSchema, partitionSchema)
-          compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind,
-            new ArrayBuffer[OrcSingleStripeMeta]) ++=
-            orcPartitionReaderContext.blockIterator.map(block =>
-              OrcSingleStripeMeta(
-                orcPartitionReaderContext.filePath,
-                OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)),
-                file.partitionValues,
-                OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema),
-                readDataSchema,
-                OrcExtraInfo(orcPartitionReaderContext.requestedMapping)))
+        files.foreach { file =>
+          try {
+            // An empty ORC file makes filterStripes return null: with no stripes to contribute
+            // it is skipped here (the single-file path uses an EmptyPartitionReader).
+            Option(filterHandler.filterStripes(file, dataSchema, readDataSchema,
+              partitionSchema)).foreach { ctx =>
+              val stripesForKind = compressionAndStripes.getOrElseUpdate(
+                ctx.compressionKind, new ArrayBuffer[OrcSingleStripeMeta])
+              stripesForKind ++= ctx.blockIterator.map { block =>
+                OrcSingleStripeMeta(
+                  ctx.filePath,
+                  OrcDataStripe(OrcStripeWithMeta(block, ctx)),
+                  file.partitionValues,
+                  OrcSchemaWrapper(ctx.updatedReadSchema),
+                  readDataSchema,
+                  OrcExtraInfo(ctx.requestedMapping))
+              }
+            }
+          } catch {
+            case e: FileNotFoundException if ignoreMissingFiles =>
+              logWarning(s"Skipped missing file: ${file.filePath}", e)
+          }
         }
       }
     }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
index 0eafa74181a..88f8b5082dd 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2025, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2026, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,8 +16,15 @@
 
 package com.nvidia.spark.rapids
 
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.rapids.GpuFileSourceScanExec
 import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession
 import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType}
 
@@ -76,6 +83,98 @@ class OrcScanSuite extends SparkQueryCompareTestSuite {
       StructField("_col2", StringType),
       StructField("_col1", LongType))))) { frame => frame }
 
+  /** Conf selecting the V1 ORC source, the COALESCING ORC reader type and ignoreMissingFiles. */
+  private def coalescingOrcConf(ignoreMissingFiles: Boolean): SparkConf =
+    new SparkConf()
+      .set(SQLConf.USE_V1_SOURCE_LIST.key, "orc")
+      .set(SQLConf.IGNORE_MISSING_FILES.key, ignoreMissingFiles.toString)
+      .set(RapidsConf.ORC_READER_TYPE.key, RapidsReaderType.COALESCING.toString)
+
+  /**
+   * Writes four single-row ORC directories, plans one read over all of them, then deletes the
+   * files under "first" and the whole "third" directory before the read is executed.
+   *
+   * @param spark session used to write the data and plan the read
+   * @param checkGpu when true, additionally asserts that the planned scan is a GPU scan
+   * @param body receives a thunk that executes the planned read and returns the sorted values
+   * @return whatever `body` returns
+   */
+  private def withPlannedOrcFilesDeleted[T](spark: SparkSession, checkGpu: Boolean)(
+      body: (() => Seq[String]) => T): T = {
+    import spark.implicits._
+
+    withTempPath { base =>
+      val basePath = base.getCanonicalPath
+      val firstPath = new Path(basePath, "first")
+      val secondPath = new Path(basePath, "second")
+      val thirdPath = new Path(basePath, "third")
+      val fourthPath = new Path(basePath, "fourth")
+      val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      // "second" and "fourth" are kept; "first" and "third" are deleted after planning.
+      Seq(secondPath -> "0", fourthPath -> "1", firstPath -> "2", thirdPath -> "3").foreach {
+        case (path, value) =>
+          Seq(value).toDF("a").write.mode("overwrite").format("orc").save(path.toString)
+      }
+
+      val filesToDelete = Seq(firstPath, thirdPath).flatMap { path =>
+        fs.listStatus(path).filter(_.isFile).map(_.getPath)
+      }
+      val df = spark.read.format("orc").load(
+        firstPath.toString, secondPath.toString, thirdPath.toString, fourthPath.toString)
+      // NOTE(review): selectedPartitions is referenced only for its side effect, presumably to
+      // force the file listing while every file still exists -- confirm it is lazily computed.
+      val scanIsGpu = df.queryExecution.executedPlan.collect {
+        case scan: GpuFileSourceScanExec =>
+          scan.selectedPartitions
+          true
+        case scan: FileSourceScanExec =>
+          scan.selectedPartitions
+          false
+      }
+      assert(scanIsGpu.nonEmpty, "ORC read does not have a file source scan")
+      if (checkGpu) {
+        assert(scanIsGpu.contains(true), "ORC read is not running on GPU")
+      }
+
+      // Fail here, rather than with a confusing result mismatch later, if a delete is a no-op.
+      filesToDelete.foreach(file => assert(fs.delete(file, false), s"Failed to delete $file"))
+      assert(fs.delete(thirdPath, true), s"Failed to delete $thirdPath")
+
+      body(() => df.collect().map(_.getString(0)).sorted.toSeq)
+    }
+  }
+
+  /** True when `t` or any throwable in its cause chain is a FileNotFoundException. */
+  private def causedByFileNotFound(t: Throwable): Boolean =
+    Iterator.iterate(t)(_.getCause).takeWhile(_ != null)
+      .exists(_.isInstanceOf[FileNotFoundException])
+
+  test("ORC coalescing reader honors ignoreMissingFiles") {
+    val conf = coalescingOrcConf(ignoreMissingFiles = true)
+    def survivingValues(spark: SparkSession, checkGpu: Boolean): Seq[String] =
+      withPlannedOrcFilesDeleted(spark, checkGpu)(runQuery => runQuery())
+
+    val cpuResult = withCpuSparkSession(survivingValues(_, checkGpu = false), conf)
+    val gpuResult = withGpuSparkSession(survivingValues(_, checkGpu = true), conf)
+
+    assertResult(Seq("0", "1"))(cpuResult)
+    assertResult(cpuResult)(gpuResult)
+  }
+
+  test("ORC coalescing reader throws FileNotFoundException when ignoreMissingFiles is false") {
+    val conf = coalescingOrcConf(ignoreMissingFiles = false)
+    def expectFileNotFound(spark: SparkSession, checkGpu: Boolean): Unit =
+      withPlannedOrcFilesDeleted(spark, checkGpu) { runQuery =>
+        val e = intercept[Exception](runQuery())
+        assert(causedByFileNotFound(e),
+          s"Expected a FileNotFoundException when ignoreMissingFiles=false, but got: $e")
+      }
+
+    withCpuSparkSession(expectFileNotFound(_, checkGpu = false), conf)
+    withGpuSparkSession(expectFileNotFound(_, checkGpu = true), conf)
+  }
+
   /**
    *
    * The calendar of hybrid-Julian-calendar.orc file is hybrid Julian Gregorian