From c04c7b192232c4c32e5b07df14d55dbb621daae4 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 10 Jun 2026 08:29:33 -0700 Subject: [PATCH 1/3] Move Kudo dump helpers to Java-friendly APIs Signed-off-by: Gera Shegalov --- .../com/nvidia/spark/rapids/FileUtils.java | 65 ++++++++++++++++++ .../spark/rapids/SpillableKudoTable.java | 67 +++++++++++++++++++ .../com/nvidia/spark/rapids/DumpUtils.scala | 31 ++++++--- .../com/nvidia/spark/rapids/FileUtils.scala | 46 ------------- .../spark/rapids/GpuShuffleCoalesceExec.scala | 35 +++++----- .../spark/rapids/SpillableKudoTable.scala | 59 ---------------- .../nvidia/spark/rapids/DumpUtilsSuite.scala | 4 +- 7 files changed, 177 insertions(+), 130 deletions(-) create mode 100644 sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/FileUtils.java create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/SpillableKudoTable.java delete mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/FileUtils.scala delete mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableKudoTable.scala diff --git a/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/FileUtils.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/FileUtils.java new file mode 100644 index 00000000000..06059b11e50 --- /dev/null +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/FileUtils.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019-2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public final class FileUtils { + private FileUtils() {} + + public static final class TempFile { + private final FSDataOutputStream outputStream; + private final Path path; + + TempFile(FSDataOutputStream outputStream, Path path) { + this.outputStream = outputStream; + this.path = path; + } + + public FSDataOutputStream getOutputStream() { + return outputStream; + } + + public Path getPath() { + return path; + } + } + + public static TempFile createTempFile( + Configuration conf, String pathPrefix, String pathSuffix) throws IOException { + FileSystem fs = new Path(pathPrefix).getFileSystem(conf); + Random rnd = new Random(); + String suffix = pathSuffix != null ? pathSuffix : ""; + while (true) { + Path path = new Path(pathPrefix + rnd.nextInt(Integer.MAX_VALUE) + suffix); + if (!fs.exists(path)) { + try { + return new TempFile(fs.create(path, false), path); + } catch (FileAlreadyExistsException e) { + // Retry if another writer won the race between exists and create. + } + } + } + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/SpillableKudoTable.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/SpillableKudoTable.java new file mode 100644 index 00000000000..a2585829858 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/SpillableKudoTable.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2025-2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.HostMemoryBuffer; +import com.nvidia.spark.rapids.jni.kudo.KudoTable; +import com.nvidia.spark.rapids.jni.kudo.KudoTableHeader; + +public class SpillableKudoTable implements AutoCloseable { + public final KudoTableHeader header; + public final long length; + private final SpillableHostBuffer shb; + + public SpillableKudoTable(KudoTableHeader header, long length, SpillableHostBuffer shb) { + this.header = header; + this.length = length; + this.shb = shb; + } + + public static SpillableKudoTable from(KudoTableHeader header, HostMemoryBuffer buffer) { + if (buffer == null) { + return new SpillableKudoTable(header, 0, null); + } else { + return new SpillableKudoTable( + header, + buffer.getLength(), + SpillableHostBuffer.apply( + buffer, + buffer.getLength(), + SpillPriorities.ACTIVE_BATCHING_PRIORITY)); + } + } + + public KudoTable makeKudoTable() { + if (shb == null) { + return new KudoTable(header, null); + } else { + return new KudoTable(header, shb.getHostBuffer()); + } + } + + @Override + public String toString() { + return "SpillableKudoTable{header=" + header + ", shb=" + shb + '}'; + } + + @Override + public void close() { + if (shb != null) { + shb.close(); + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index c59977cdab6..c5a1dad551a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,10 +28,15 @@ import com.nvidia.spark.rapids.jni.kudo.KudoSerializer import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.spark.internal.Logging import org.apache.spark.sql.vectorized.ColumnarBatch -object DumpUtils extends Logging { +object DumpUtils { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + /** * Debug utility to dump a host memory buffer to a file. * @@ -51,7 +56,8 @@ object DumpUtils extends Logging { prefix: String, suffix: String): String = { try { - val (out, path) = FileUtils.createTempFile(conf, prefix, suffix) + val tempFile = FileUtils.createTempFile(conf, prefix, suffix) + val out = tempFile.getOutputStream withResource(out) { _ => withResource(data.slice(offset, len)) { hmb => withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in => @@ -59,7 +65,7 @@ object DumpUtils extends Logging { } } } - path.toString + tempFile.getPath.toString } catch { case e: Exception => log.error(s"Error attempting to dump data", e) @@ -73,7 +79,8 @@ object DumpUtils extends Logging { prefix: String, suffix: String): String = { try { - val (out, path) = FileUtils.createTempFile(conf, prefix, suffix) + val tempFile = FileUtils.createTempFile(conf, prefix, suffix) + val out = tempFile.getOutputStream withResource(out) { _ => data.foreach { hmb => withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in => @@ -81,7 +88,7 @@ object DumpUtils extends Logging { } } } - path.toString + tempFile.getPath.toString } catch { case e: Exception => log.error(s"Error attempting to dump data", e) @@ -324,7 +331,15 @@ private class ColumnIndex() { } } -object ParquetDumper extends Logging { +object ParquetDumper { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + val COMPRESS_TYPE = CompressionType.SNAPPY def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ColumnWriterOptions]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FileUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FileUtils.scala deleted file mode 100644 index 5fe0f53d2d6..00000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FileUtils.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2019, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids - -import scala.util.Random - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path} - -object FileUtils { - def createTempFile( - conf: Configuration, - pathPrefix: String, - pathSuffix: String): (FSDataOutputStream, Path) = { - val fs = new Path(pathPrefix).getFileSystem(conf) - val rnd = new Random - var out: FSDataOutputStream = null - var path: Path = null - var succeeded = false - val suffix = if (pathSuffix != null) pathSuffix else "" - while (!succeeded) { - path = new Path(pathPrefix + rnd.nextInt(Integer.MAX_VALUE) + suffix) - if (!fs.exists(path)) { - scala.util.control.Exception.ignoring(classOf[FileAlreadyExistsException]) { - out = fs.create(path, false) - succeeded = true - } - } - } - (out, path) - } -} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index ac12baea358..851345b3dfe 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -38,7 +38,6 @@ import com.nvidia.spark.rapids.shims.ShimUnaryExecNode import org.apache.hadoop.conf.Configuration import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -113,7 +112,13 @@ case class CoalesceReadOption private( kudoEnabled: Boolean, kudoMode: RapidsConf.ShuffleKudoMode.Value, kudoDebugMode: DumpOption, kudoDebugDumpPrefix: Option[String], useAsync: Boolean) -object CoalesceReadOption extends Logging { +object CoalesceReadOption { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + private def resolveUseAsync(kudoMode: RapidsConf.ShuffleKudoMode.Value, useAsync: Boolean): Boolean = { @@ -190,10 +195,10 @@ object GpuShuffleCoalesceUtils { val secondHalfSize = newTargetSize.dataSize - firstHalfSize Seq( - CloseableTableSeqWithTargetSize(firstHalfTables, - AutoCloseableTargetSize(targetByteSize, newTargetSize.minSize, firstHalfSize)), - CloseableTableSeqWithTargetSize(secondHalfTables, - AutoCloseableTargetSize(targetByteSize, newTargetSize.minSize, secondHalfSize)) + new CloseableTableSeqWithTargetSize(firstHalfTables, + new AutoCloseableTargetSize(targetByteSize, newTargetSize.minSize, firstHalfSize)), + new CloseableTableSeqWithTargetSize(secondHalfTables, + new AutoCloseableTargetSize(targetByteSize, newTargetSize.minSize, secondHalfSize)) ) } } @@ -339,7 +344,7 @@ class JCudfTableOperator } } -case class RowCountOnlyMergeResult(rowCount: Int) extends CoalescedHostResult { +class RowCountOnlyMergeResult(val rowCount: Int) extends CoalescedHostResult { override def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch = { new ColumnarBatch(Array.empty, rowCount) } @@ -367,8 +372,8 @@ class KudoTableOperator(kudo: Option[KudoSerializer], readOption: CoalesceReadOp val dumpPrefix = readOption.kudoDebugDumpPrefix if (dumpOption != DumpOption.Never && dumpPrefix.isDefined) { val updatedPrefix = s"${dumpPrefix.get}_${taskIdentifier}" - lazy val (out, path) = createTempFile(new Configuration(), updatedPrefix, ".bin") - new MergeOptions(dumpOption, () => out, path.toString) + lazy val tempFile = createTempFile(new Configuration(), updatedPrefix, ".bin") + new MergeOptions(dumpOption, () => tempFile.getOutputStream, tempFile.getPath.toString) } else { new MergeOptions(dumpOption, null, null) } @@ -379,7 +384,7 @@ class KudoTableOperator(kudo: Option[KudoSerializer], readOption: CoalesceReadOp val numCols = columns.head.spillableKudoTable.header.getNumColumns if (numCols == 0) { val totalRowsNum = columns.map(getNumRows).sum - RowCountOnlyMergeResult(totalRowsNum) + new RowCountOnlyMergeResult(totalRowsNum) } else { // "lock" all input tables in memory before merge withResource(columns.safeMap(_.spillableKudoTable.makeKudoTable)) { kudoTables => @@ -455,9 +460,9 @@ class KudoGpuTableOperator(dataTypes: Array[DataType]) * splitting based on byte size when OOM occurs. Extends Seq[T] so it can be * used directly as a sequence. */ -case class CloseableTableSeqWithTargetSize[T <: AutoCloseable]( - tables: Seq[T], - targetSize: AutoCloseableTargetSize) extends Seq[T] with AutoCloseable { +class CloseableTableSeqWithTargetSize[T <: AutoCloseable]( + val tables: Seq[T], + val targetSize: AutoCloseableTargetSize) extends Seq[T] with AutoCloseable with Serializable { override def close(): Unit = { tables.foreach(_.safeClose()) targetSize.close() @@ -548,9 +553,9 @@ abstract class CoalesceIteratorBase[T <: AutoCloseable : ClassTag, R <: AutoClos splitPolicy match { case Some(policy) => val dataSize = tablesSeq.map(tableOperator.getDataLen).sum - val targetSizeWrapper = AutoCloseableTargetSize(targetBatchByteSize, + val targetSizeWrapper = new AutoCloseableTargetSize(targetBatchByteSize, minSplitSizeForRetry, dataSize) - val wrapper = CloseableTableSeqWithTargetSize(tablesSeq, targetSizeWrapper) + val wrapper = new CloseableTableSeqWithTargetSize(tablesSeq, targetSizeWrapper) val wrapperIter = Iterator(wrapper) inputIter = Some(wrapperIter) val resultIter = withRetry(wrapperIter, policy) { wrappedSeq => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableKudoTable.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableKudoTable.scala deleted file mode 100644 index 5bba5169de2..00000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableKudoTable.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.nvidia.spark.rapids - -import ai.rapids.cudf.HostMemoryBuffer -import com.nvidia.spark.rapids.jni.kudo.KudoTable -import com.nvidia.spark.rapids.jni.kudo.KudoTableHeader - - -class SpillableKudoTable(val header: KudoTableHeader, - val length: Long, - shb: SpillableHostBuffer) - extends AutoCloseable { - - def makeKudoTable: KudoTable = { - if (shb == null) { - new KudoTable(header, null) - } else { - new KudoTable(header, shb.getHostBuffer()) - } - } - - override def toString: String = - "SpillableKudoTable{header=" + this.header + ", shb=" + this.shb + '}' - - override def close(): Unit = { - if (shb != null) shb.close() - } -} - -object SpillableKudoTable { - def apply(header: KudoTableHeader, buffer: HostMemoryBuffer): SpillableKudoTable = { - if (buffer == null) { - new SpillableKudoTable(header, 0, null) - } else { - new SpillableKudoTable( - header, - buffer.getLength, - SpillableHostBuffer.apply( - buffer, - buffer.getLength, - SpillPriorities.ACTIVE_BATCHING_PRIORITY) - ) - } - } -} \ No newline at end of file diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/DumpUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/DumpUtilsSuite.scala index 5c6d0fbe634..8bd42e8153c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/DumpUtilsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/DumpUtilsSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -120,7 +120,7 @@ class DumpUtilsSuite extends AnyFunSuite with BeforeAndAfterAll { val header = headerOptional.get() val buffer = HostMemoryBuffer.allocate(header.getTotalDataLen()) buffer.copyFromStream(0, din, header.getTotalDataLen()) - val spillableKudoTable = SpillableKudoTable(header, buffer) + val spillableKudoTable = SpillableKudoTable.from(header, buffer) withResource(new KudoSerializedTableColumn(spillableKudoTable)) { column => val batch = new ColumnarBatch(Array(column.asInstanceOf[GpuColumnVectorBase]), spillableKudoTable.header.getNumRows) From 7832bff32f100868431d16d6434faa2bffe9dc35 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 10 Jun 2026 08:30:03 -0700 Subject: [PATCH 2/3] Adapt file scan callers to columnar helper modules Signed-off-by: Gera Shegalov --- .../GpuCoalescingIcebergParquetReader.scala | 6 +- .../spark/rapids/AvroDataFileReader.scala | 34 +--- .../com/nvidia/spark/rapids/GpuCSVScan.scala | 18 +- .../rapids/GpuColumnarBatchSerializer.scala | 2 +- .../com/nvidia/spark/rapids/GpuExec.scala | 37 ++++ .../spark/rapids/GpuMultiFileReader.scala | 92 ++++----- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 192 ++++++++++-------- .../spark/rapids/GpuParquetFileFormat.scala | 17 +- .../spark/rapids/GpuReadCSVFileFormat.scala | 6 +- .../spark/rapids/GpuReadOrcFileFormat.scala | 6 +- .../nvidia/spark/rapids/dataSourceUtil.scala | 14 +- .../spark/rapids/parquet/GpuParquetScan.scala | 80 ++++---- .../shims/GpuOrcDataReader320Plus.scala | 8 +- .../rapids/shims/GpuOrcDataReaderBase.scala | 9 +- .../sql/hive/rapids/GpuHiveFileFormat.scala | 7 +- .../hive/rapids/GpuHiveTableScanExec.scala | 49 ++--- .../spark/sql/rapids/AvroProviderImpl.scala | 4 +- .../apache/spark/sql/rapids/GpuAvroScan.scala | 96 ++++----- .../spark/sql/rapids/GpuDataSourceBase.scala | 7 +- .../spark/sql/rapids/GpuOrcFileFormat.scala | 11 +- .../sql/rapids/GpuReadAvroFileFormat.scala | 6 +- .../sql/rapids/execution/TrampolineUtil.scala | 3 + 22 files changed, 384 insertions(+), 320 deletions(-) diff --git a/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/GpuCoalescingIcebergParquetReader.scala b/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/GpuCoalescingIcebergParquetReader.scala index 56ab66a20ee..2df18a7d7fd 100644 --- a/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/GpuCoalescingIcebergParquetReader.scala +++ b/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/GpuCoalescingIcebergParquetReader.scala @@ -68,11 +68,11 @@ class GpuCoalescingIcebergParquetReader( conf.metrics) info.blocks.map { block => - ParquetSingleDataBlockMeta( + new ParquetSingleDataBlockMeta( info.filePath, - ParquetDataBlock(block, CpuCompressionConfig.disabled()), + new ParquetDataBlock(block, CpuCompressionConfig.disabled()), InternalRow.empty, - ParquetSchemaWrapper(info.schema), + new ParquetSchemaWrapper(info.schema), info.readSchema, IcebergParquetExtraInfo( info.dateRebaseMode, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala index aaecf016871..bbc8f2a5dfd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala @@ -33,7 +33,7 @@ import org.apache.commons.io.output.CountingOutputStream import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.sql.rapids.shims.TrampolineConnectShims +import org.apache.spark.sql.rapids.execution.TrampolineUtil private[rapids] class AvroSeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput { @@ -82,7 +82,7 @@ case class Header( @transient lazy val schema: Schema = { getMetaString(SCHEMA) - .map(s => TrampolineConnectShims.createSchemaParser().parse(s)) + .map(s => TrampolineUtil.createSchemaParser().parse(s)) .orNull } @@ -127,26 +127,6 @@ object Header { } } -/** - * The each Avro block information - * - * @param blockStart the start of block - * @param blockSize the whole block size = the size between two sync buffers + sync buffer - * @param dataSize the block data size - * @param count how many entries in this block - */ -case class BlockInfo(blockStart: Long, blockSize: Long, dataSize: Long, count: Long) - -/** - * The mutable version of the BlockInfo without block start. - * This is for reusing an existing instance when accessing data in the iterator pattern. - * - * @param blockSize the whole block size (the size between two sync buffers + sync buffer size) - * @param dataSize the data size in this block - * @param count how many entries in this block - */ -case class MutableBlockInfo(var blockSize: Long, var dataSize: Long, var count: Long) - /** The parent of the Rapids Avro file readers */ abstract class AvroFileReader(si: SeekableInput) extends AutoCloseable { // Children should update this pointer accordingly. @@ -328,7 +308,7 @@ class AvroMetaFileReader(si: SeekableInput) extends AvroFileReader(si) { val dataSizeLongLen = BinaryData.encodeLong(blockDataSize, buf, 0) // (len of entries) + (len of block size) + (block size) + (sync size) val blockLength = countLongLen + dataSizeLongLen + blockDataSize + SYNC_SIZE - blocks += BlockInfo(curBlockStart, blockLength, blockDataSize, blockCount) + blocks += new BlockInfo(curBlockStart, blockLength, blockDataSize, blockCount) // Do we need to check the SYNC BUFFER, or just let cudf do it? curBlockStart += blockLength @@ -405,11 +385,11 @@ class AvroDataFileReader(si: SeekableInput) extends AvroFileReader(si) { throw new NoSuchElementException } if (reuse == null) { - MutableBlockInfo(curBlockSize, curDataSize, curCount) + new MutableBlockInfo(curBlockSize, curDataSize, curCount) } else { - reuse.blockSize = curBlockSize - reuse.dataSize = curDataSize - reuse.count = curCount + reuse.setBlockSize(curBlockSize) + reuse.setDataSize(curDataSize) + reuse.setCount(curCount) reuse } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala index 75bd875743f..52c720650d3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv.{CSVOptions, GpuCsvUtils} @@ -54,7 +53,15 @@ trait ScanWithMetrics { var metrics : Map[String, GpuMetric] = Map.empty } -object GpuCSVScan extends Logging { +object GpuCSVScan { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } + private def tryLoadCharset(name: String): Option[Charset] = { try { Some(Charset.forName(name)) @@ -320,7 +327,7 @@ case class GpuCSVScan( val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) - GpuCSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + new GpuCSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap) } @@ -344,7 +351,7 @@ case class GpuCSVScan( override def hashCode(): Int = super.hashCode() } -case class GpuCSVPartitionReaderFactory( +class GpuCSVPartitionReaderFactory( sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], dataSchema: StructType, @@ -356,7 +363,8 @@ case class GpuCSVPartitionReaderFactory( maxReaderBatchSizeBytes: Long, maxGpuColumnSizeBytes: Long, metrics: Map[String, GpuMetric], - @transient params: Map[String, String]) extends ShimFilePartitionReaderFactory(params) { + @transient params: Map[String, String]) + extends ShimFilePartitionReaderFactory(params) with Serializable { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { throw new IllegalStateException("ROW BASED PARSING IS NOT SUPPORTED ON THE GPU...") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index cfeebcdcede..6fb68704339 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -694,7 +694,7 @@ object KudoSerializedTableColumn { * @return columnar batch to be passed to [[GpuShuffleCoalesceExec]] */ def from(header: KudoTableHeader, hostBuffer: HostMemoryBuffer): ColumnarBatch = { - val kudoTable = SpillableKudoTable(header, hostBuffer) + val kudoTable = SpillableKudoTable.from(header, hostBuffer) val column = new KudoSerializedTableColumn(kudoTable) new ColumnarBatch(Array(column), kudoTable.header.getNumRows) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 609b17d44ea..921e0ab8e86 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -95,6 +95,43 @@ class GpuOpTimeTrackingRDD[T: scala.reflect.ClassTag]( firstParent[T].preferredLocations(split) } +trait RapidsLocalLog { + @transient private lazy val rapidsLocalLog = org.slf4j.LoggerFactory.getLogger( + getClass.getName.stripSuffix("$")) + + protected def logTrace(msg: => String): Unit = { + if (rapidsLocalLog.isTraceEnabled) rapidsLocalLog.trace(msg) + } + + protected def logDebug(msg: => String): Unit = { + if (rapidsLocalLog.isDebugEnabled) rapidsLocalLog.debug(msg) + } + + protected def logDebug(msg: => String, throwable: Throwable): Unit = { + if (rapidsLocalLog.isDebugEnabled) rapidsLocalLog.debug(msg, throwable) + } + + protected def logInfo(msg: => String): Unit = { + if (rapidsLocalLog.isInfoEnabled) rapidsLocalLog.info(msg) + } + + protected def logWarning(msg: => String): Unit = { + if (rapidsLocalLog.isWarnEnabled) rapidsLocalLog.warn(msg) + } + + protected def logWarning(msg: => String, throwable: Throwable): Unit = { + if (rapidsLocalLog.isWarnEnabled) rapidsLocalLog.warn(msg, throwable) + } + + protected def logError(msg: => String): Unit = { + if (rapidsLocalLog.isErrorEnabled) rapidsLocalLog.error(msg) + } + + protected def logError(msg: => String, throwable: Throwable): Unit = { + if (rapidsLocalLog.isErrorEnabled) rapidsLocalLog.error(msg, throwable) + } +} + object GpuExec { def outputBatching(sp: SparkPlan): CoalesceGoal = sp match { case gpu: GpuExec => gpu.outputBatching diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index 892e151594e..cf73e368989 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException @@ -185,7 +184,19 @@ trait MultiFileReaderFunctions { // Singleton thread pool used across all tasks for multifile reading. // Please note that the TaskContext is not set in these threads and should not be used. -object MultiFileReaderThreadPool extends Logging { +object MultiFileReaderThreadPool { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + @volatile private var threadPool: Option[ThreadPoolExecutor] = None @@ -296,7 +307,7 @@ abstract class MultiFilePartitionReaderFactoryBase( @transient sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], @transient rapidsConf: RapidsConf) - extends PartitionReaderFactory with Logging { + extends PartitionReaderFactory with RapidsLocalLog { protected val maxReadBatchSizeRows: Int = rapidsConf.maxReadBatchSizeRows protected val maxReadBatchSizeBytes: Long = rapidsConf.maxReadBatchSizeBytes @@ -388,7 +399,7 @@ abstract class MultiFilePartitionReaderFactoryBase( * @param execMetrics metrics */ abstract class FilePartitionReaderBase(conf: Configuration, execMetrics: Map[String, GpuMetric]) - extends PartitionReader[ColumnarBatch] with Logging with ScanWithMetrics { + extends PartitionReader[ColumnarBatch] with RapidsLocalLog with ScanWithMetrics { metrics = execMetrics @@ -405,42 +416,25 @@ abstract class FilePartitionReaderBase(conf: Configuration, execMetrics: Map[Str } } -case class CombineConf( - combineThresholdSize: Long, // The size to combine to when combining small files - combineWaitTime: Int) // The amount of time to wait for other files ready for combination. - -// TODO: Refactor thread pool components into a common utility, since it is not specific to -// multi-file reading. -trait ThreadPoolConf { - /** - * The maximum number of threads used by the thread pool, not necessarily the final number - */ - def maxThreadNumber: Int - - /** - * Whether to create pools for each Spark stage, only for testing for now - */ - def stageLevelPool: Boolean -} - -case class DefaultThreadPoolConf( - maxThreadNumber: Int, - stageLevelPool: Boolean) extends ThreadPoolConf - -case class MemoryBoundedPoolConf( - maxThreadNumber: Int, - stageLevelPool: Boolean, - memoryCapacity: Long, // The maximum host memory being used in bytes, must be > 0 - waitMemTimeoutMs: Long // The timeout for acquiring host memory in milliseconds -) extends ThreadPoolConf - class ThreadPoolConfBuilder( private val maxThreadNumber: Int, private val isMemoryBounded: Boolean, private val memoryCapacityFromDriver: Long, private val timeoutMs: Long, private val stageLevelPool: Boolean -) extends Logging with Serializable { +) extends Serializable { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + // Finalize the ThreadPoolConf, which mainly determines the memory capacity of the // ResourceBoundedThreadExecutor if isMemoryBounded is true. @@ -452,7 +446,7 @@ class ThreadPoolConfBuilder( // 3. if still not set, use the default value `DEFAULT_MEMORY_CAPACITY`. def build(): ThreadPoolConf = { if (!isMemoryBounded) { - DefaultThreadPoolConf(maxThreadNumber, stageLevelPool) + new DefaultThreadPoolConf(maxThreadNumber, stageLevelPool) } else { val memCap: Long = if (memoryCapacityFromDriver > 0) { memoryCapacityFromDriver @@ -468,11 +462,7 @@ class ThreadPoolConfBuilder( } } logDebug(s"Setting memory capacity for ResourcePoolConf to ${memCap >> 20}MB") - MemoryBoundedPoolConf( - maxThreadNumber = maxThreadNumber, - stageLevelPool = stageLevelPool, - memoryCapacity = memCap, - waitMemTimeoutMs = timeoutMs) + new MemoryBoundedPoolConf(maxThreadNumber, stageLevelPool, memCap, timeoutMs) } } } @@ -523,7 +513,7 @@ abstract class MultiFileCloudPartitionReaderBase( maxReadBatchSizeBytes: Long, ignoreCorruptFiles: Boolean = false, keepReadsInOrder: Boolean = true, - combineConf: CombineConf = CombineConf(-1, -1)) + combineConf: CombineConf = new CombineConf(-1, -1)) extends FilePartitionReaderBase(conf, execMetrics) { protected type BufferInfo = HostMemoryBuffersWithMetaDataBase @@ -1142,13 +1132,21 @@ abstract class MultiFileCoalescingPartitionReaderBase( } } - protected case class CurrentChunkMeta( - clippedSchema: SchemaBase, - readSchema: StructType, - currentChunk: FileMajorBlockChunk, - extraInfo: ExtraInfo) { + protected class CurrentChunkMeta( + val clippedSchema: SchemaBase, + val readSchema: StructType, + val currentChunk: FileMajorBlockChunk, + val extraInfo: ExtraInfo) { def rowsPerPartition: Array[Long] = currentChunk.rowsPerPartition def allPartValues: Array[InternalRow] = currentChunk.allPartValues + + def copy( + clippedSchema: SchemaBase = this.clippedSchema, + readSchema: StructType = this.readSchema, + currentChunk: FileMajorBlockChunk = this.currentChunk, + extraInfo: ExtraInfo = this.extraInfo): CurrentChunkMeta = { + new CurrentChunkMeta(clippedSchema, readSchema, currentChunk, extraInfo) + } } /** @@ -1609,6 +1607,6 @@ abstract class MultiFileCoalescingPartitionReaderBase( logDebug(s"Loaded $numRows rows from ${getFileFormatShortName}. " + s"${getFileFormatShortName} bytes read: $numChunkBytes. Estimated GPU bytes: $numBytes. " + s"Number of partition entries: ${fileMajorChunk.allPartValues.length}") - CurrentChunkMeta(currentClippedSchema, currentReadSchema, fileMajorChunk, extraInfo) + new CurrentChunkMeta(currentClippedSchema, currentReadSchema, fileMajorChunk, extraInfo) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index bc246a152f6..52c909f1ad9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import java.io.{ByteArrayInputStream, FileNotFoundException, IOException, OutputStream} import java.net.URI -import java.nio.ByteBuffer +import java.nio.{Buffer, ByteBuffer} import java.nio.channels.Channels import java.nio.charset.StandardCharsets import java.time.ZoneId @@ -39,6 +39,7 @@ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.SchemaUtils._ import com.nvidia.spark.rapids.filecache.FileCache import com.nvidia.spark.rapids.fileio.hadoop.HadoopFileIO +import com.nvidia.spark.rapids.fileio.hadoop.PerfIOHadoopInputFileFactory import com.nvidia.spark.rapids.io.async._ import com.nvidia.spark.rapids.jni.{CastStrings, RmmSpark} import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuOrcDataReader, NullOutputStreamShim, OrcCastingShims, OrcReadingShims, OrcShims, ShimFilePartitionReaderFactory} @@ -55,7 +56,6 @@ import org.apache.orc.mapred.OrcInputFormat import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression @@ -88,7 +88,7 @@ case class GpuOrcScan( dataFilters: Seq[Expression], rapidsConf: RapidsConf, queryUsesInputFile: Boolean = false) - extends FileScan with GpuScan with Logging { + extends FileScan with GpuScan { override def isSplitable(path: Path): Boolean = true @@ -101,12 +101,12 @@ case class GpuOrcScan( if (rapidsConf.isOrcPerFileReadEnabled) { logInfo("Using the original per file orc reader") - GpuOrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + new GpuOrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics, options.asScala.toMap) } else { val poolConfBuilder = ThreadPoolConfBuilder(rapidsConf) - GpuOrcMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + new GpuOrcMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, poolConfBuilder, metrics, queryUsesInputFile) @@ -578,7 +578,7 @@ object GpuOrcScan { * off in GpuTransitionOverrides if InputFileName, * InputFileBlockStart, or InputFileBlockLength are used */ -case class GpuOrcMultiFilePartitionReaderFactory( +class GpuOrcMultiFilePartitionReaderFactory( @transient sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], dataSchema: StructType, @@ -589,7 +589,8 @@ case class GpuOrcMultiFilePartitionReaderFactory( poolConfBuilder: ThreadPoolConfBuilder, metrics: Map[String, GpuMetric], queryUsesInputFile: Boolean) - extends MultiFilePartitionReaderFactoryBase(sqlConf, broadcastedConf, rapidsConf) { + extends MultiFilePartitionReaderFactoryBase(sqlConf, broadcastedConf, rapidsConf) + with Serializable { private val debugDumpPrefix = rapidsConf.orcDebugDumpPrefix private val debugDumpAlways = rapidsConf.orcDebugDumpAlways @@ -619,7 +620,7 @@ case class GpuOrcMultiFilePartitionReaderFactory( */ override def buildBaseColumnarReaderForCloud(files: Array[PartitionedFile], conf: Configuration): PartitionReader[ColumnarBatch] = { - val combineConf = CombineConf(combineThresholdSize, combineWaitTime) + val combineConf = new CombineConf(combineThresholdSize, combineWaitTime) val poolConf = poolConfBuilder.build() val reader = new MultiFileCloudOrcPartitionReader( conf, files, dataSchema, readDataSchema, partitionSchema, @@ -658,13 +659,13 @@ case class GpuOrcMultiFilePartitionReaderFactory( compressionAndStripes.getOrElseUpdate(orcPartitionReaderContext.compressionKind, new ArrayBuffer[OrcSingleStripeMeta]) ++= orcPartitionReaderContext.blockIterator.map(block => - OrcSingleStripeMeta( + new OrcSingleStripeMeta( orcPartitionReaderContext.filePath, - OrcDataStripe(OrcStripeWithMeta(block, orcPartitionReaderContext)), + new OrcDataStripe(new OrcStripeWithMeta(block, orcPartitionReaderContext)), file.partitionValues, - OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema), + new OrcSchemaWrapper(orcPartitionReaderContext.updatedReadSchema), readDataSchema, - OrcExtraInfo(orcPartitionReaderContext.requestedMapping))) + new OrcExtraInfo(orcPartitionReaderContext.requestedMapping))) } } } @@ -687,7 +688,7 @@ case class GpuOrcMultiFilePartitionReaderFactory( override final def getFileFormatShortName: String = "ORC" } -case class GpuOrcPartitionReaderFactory( +class GpuOrcPartitionReaderFactory( @transient sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], dataSchema: StructType, @@ -697,7 +698,7 @@ case class GpuOrcPartitionReaderFactory( @transient rapidsConf: RapidsConf, metrics : Map[String, GpuMetric], @transient params: Map[String, String]) - extends ShimFilePartitionReaderFactory(params) { + extends ShimFilePartitionReaderFactory(params) with Serializable { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val debugDumpPrefix = rapidsConf.orcDebugDumpPrefix @@ -754,10 +755,10 @@ case class GpuOrcPartitionReaderFactory( * @param footer stripe footer * @param inputDataRanges input file ranges (based at file offset 0) of stripe data */ -case class OrcOutputStripe( - infoBuilder: OrcProto.StripeInformation.Builder, - footer: OrcProto.StripeFooter, - inputDataRanges: DiskRangeList) +class OrcOutputStripe( + val infoBuilder: OrcProto.StripeInformation.Builder, + val footer: OrcProto.StripeFooter, + val inputDataRanges: DiskRangeList) extends Serializable /** * This class holds fields needed to read and iterate over the OrcFile @@ -775,18 +776,18 @@ case class OrcOutputStripe( * @param blockIterator an iterator over the ORC output stripes * @param requestedMapping the optional requested column ids */ -case class OrcPartitionReaderContext( - filePath: Path, - conf: Configuration, - fileSchema: TypeDescription, - updatedReadSchema: TypeDescription, - evolution: SchemaEvolution, - fileTail: OrcProto.FileTail, - compressionSize: Int, - compressionKind: CompressionKind, - readerOpts: Reader.Options, - blockIterator: BufferedIterator[OrcOutputStripe], - requestedMapping: Option[Array[Int]]) +class OrcPartitionReaderContext( + val filePath: Path, + val conf: Configuration, + val fileSchema: TypeDescription, + val updatedReadSchema: TypeDescription, + val evolution: SchemaEvolution, + val fileTail: OrcProto.FileTail, + val compressionSize: Int, + val compressionKind: CompressionKind, + val readerOpts: Reader.Options, + val blockIterator: BufferedIterator[OrcOutputStripe], + val requestedMapping: Option[Array[Int]]) extends Serializable case class OrcBlockMetaForSplitCheck( filePath: Path, @@ -1033,8 +1034,8 @@ trait OrcCommonFunctions extends OrcCodecWritingHelper { self: FilePartitionRead /** * A base ORC partition reader which compose of some common methods */ -trait OrcPartitionReaderBase extends OrcCommonFunctions with Logging - with ScanWithMetrics { self: FilePartitionReaderBase => +trait OrcPartitionReaderBase extends OrcCommonFunctions + with RapidsLocalLog with ScanWithMetrics { self: FilePartitionReaderBase => def populateCurrentBlockChunk( blockIterator: BufferedIterator[OrcOutputStripe], @@ -1467,7 +1468,8 @@ private case class GpuOrcFileFilterHandler( sargApp, sargColumns, OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(conf), orcReader.getWriterVersion, updatedReadSchema, resolveMemFileIncluded(fileIncluded, requestedMapping)) - OrcPartitionReaderContext(filePath, conf, orcReader.getSchema, updatedReadSchema, evolution, + new OrcPartitionReaderContext( + filePath, conf, orcReader.getSchema, updatedReadSchema, evolution, orcReader.getFileTail, orcReader.getCompressionSize, orcReader.getCompressionKind, readerOpts, stripes.iterator.buffered, requestedMapping) } @@ -1611,7 +1613,7 @@ private case class GpuOrcFileFilterHandler( .setDataLength(outputStripeDataLength) .setNumberOfRows(inputStripe.getNumberOfRows) - OrcOutputStripe(infoBuilder, outputStripeFooter, rangeCreator.get) + new OrcOutputStripe(infoBuilder, outputStripeFooter, rangeCreator.get) } /** @@ -1790,7 +1792,9 @@ private object GpuOrcFileFilterHandler { fs: FileSystem, conf: Configuration, metrics: Map[String, GpuMetric]): OrcTail = { - val fileIO = new HadoopFileIO(conf) + val fileIO = new HadoopFileIO( + conf, + PerfIOHadoopInputFileFactory.INSTANCE) val inputFile = fileIO.newInputFile(filePath) val cachedFooter = FileCache.get.getFooter(inputFile) val bb = cachedFooter.map { hmb => @@ -1832,7 +1836,7 @@ private object GpuOrcFileFilterHandler { val fileSize = bb.getLong val modificationTime = bb.getLong val serializedTail = bb.slice() - bb.position(0) + bb.asInstanceOf[Buffer].position(0) // last byte is the size of the postscript section val psSize = bb.get(bb.limit() - 1) & 0xff val ps = loadPostScript(bb, psSize) @@ -1866,8 +1870,8 @@ private object GpuOrcFileFilterHandler { val bb = ByteBuffer.allocate(footerSizeGuess) val readSize = fileSize.min(footerSizeGuess).toInt in.readFully(fileSize - readSize, bb.array(), bb.arrayOffset(), readSize) - bb.position(0) - bb.limit(readSize) + bb.asInstanceOf[Buffer].position(0) + bb.asInstanceOf[Buffer].limit(readSize) val psLen = bb.get(readSize - 1) & 0xff ensureOrcFooter(in, filePath, psLen, bb) val psOffset = readSize - 1 - psLen @@ -1878,18 +1882,18 @@ private object GpuOrcFileFilterHandler { // calculate the amount of tail data that was missed in the speculative initial read val unreadRemaining = Math.max(0, tailSize - readSize) // copy tail bytes from original buffer - bb.position(Math.max(0, readSize - tailSize)) - tailBuffer.position(TAIL_PREFIX_SIZE + unreadRemaining) + bb.asInstanceOf[Buffer].position(Math.max(0, readSize - tailSize)) + tailBuffer.asInstanceOf[Buffer].position(TAIL_PREFIX_SIZE + unreadRemaining) tailBuffer.put(bb) if (unreadRemaining > 0) { // first read did not grab the entire tail, need to read more - tailBuffer.position(TAIL_PREFIX_SIZE) + tailBuffer.asInstanceOf[Buffer].position(TAIL_PREFIX_SIZE) in.readFully(fileSize - readSize - unreadRemaining, tailBuffer.array(), tailBuffer.arrayOffset() + tailBuffer.position(), unreadRemaining) } tailBuffer.putLong(0, fileSize) tailBuffer.putLong(java.lang.Long.BYTES, modificationTime) - tailBuffer.position(0) + tailBuffer.asInstanceOf[Buffer].position(0) tailBuffer } } @@ -2044,26 +2048,26 @@ class MultiFileCloudOrcPartitionReader( keepReadsInOrder = keepReadsInOrder, combineConf = combineConf) with MultiFileReaderFunctions with OrcPartitionReaderBase { - private case class HostMemoryEmptyMetaData( + private class HostMemoryEmptyMetaData( override val partitionedFile: PartitionedFile, - numRows: Long, + val numRows: Long, override val bytesRead: Long, - readSchema: StructType, - override val allPartValues: Option[Array[(Long, InternalRow)]] = None) + val readSchema: StructType, + override val allPartValues: Option[Array[(Long, InternalRow)]]) extends HostMemoryBuffersWithMetaDataBase { override def memBuffersAndSizes: Array[SingleHMBAndMeta] = Array(SingleHMBAndMeta.empty(numRows)) } - private case class HostMemoryBuffersWithMetaData( + private class HostMemoryBuffersWithMetaData( override val partitionedFile: PartitionedFile, override val memBuffersAndSizes: Array[SingleHMBAndMeta], override val bytesRead: Long, - updatedReadSchema: TypeDescription, - compressionKind: CompressionKind, - requestedMapping: Option[Array[Int]], - override val allPartValues: Option[Array[(Long, InternalRow)]] = None) + val updatedReadSchema: TypeDescription, + val compressionKind: CompressionKind, + val requestedMapping: Option[Array[Int]], + override val allPartValues: Option[Array[(Long, InternalRow)]]) extends HostMemoryBuffersWithMetaDataBase private class ReadBatchRunner( @@ -2083,13 +2087,13 @@ class MultiFileCloudOrcPartitionReader( } catch { case e: FileNotFoundException if ignoreMissingFiles => logWarning(s"Skipped missing file: ${partFile.filePath}", e) - HostMemoryEmptyMetaData(partFile, 0, 0, null) + new HostMemoryEmptyMetaData(partFile, 0, 0, null, None) // Throw FileNotFoundException even if `ignoreCorruptFiles` is true case e: FileNotFoundException if !ignoreMissingFiles => throw e case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: ${partFile.filePath}", e) - HostMemoryEmptyMetaData(partFile, 0, 0, null) + new HostMemoryEmptyMetaData(partFile, 0, 0, null, None) } finally { RmmSpark.poolThreadFinishedForTask(taskContext.taskAttemptId()) TrampolineUtil.unsetTaskContext() @@ -2109,7 +2113,7 @@ class MultiFileCloudOrcPartitionReader( if (ctx == null || ctx.blockIterator.isEmpty) { val bytesRead = fileSystemBytesRead() - startingBytesRead logDebug(s"Read no blocks from file: ${partFile.filePath.toString}") - HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema) + new HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema, None) } else { blockChunkIter = ctx.blockIterator if (isDone) { @@ -2117,21 +2121,21 @@ class MultiFileCloudOrcPartitionReader( // got close before finishing logDebug("Reader is closed, return empty buffer for the current read for " + s"file: ${partFile.filePath.toString}") - HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema) + new HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema, None) } else { if (ctx.updatedReadSchema.isEmpty) { val bytesRead = fileSystemBytesRead() - startingBytesRead val numRows = ctx.blockIterator.map(_.infoBuilder.getNumberOfRows).sum logDebug(s"Return empty buffer but with row number: $numRows for " + s"file: ${partFile.filePath.toString}") - HostMemoryEmptyMetaData(partFile, numRows, bytesRead, readDataSchema) + new HostMemoryEmptyMetaData(partFile, numRows, bytesRead, readDataSchema, None) } else { while (blockChunkIter.hasNext) { val blocksToRead = populateCurrentBlockChunk(blockChunkIter, maxReadBatchSizeRows, maxReadBatchSizeBytes) val (hostBuf, bufSize) = readPartFile(ctx, blocksToRead) val numRows = blocksToRead.map(_.infoBuilder.getNumberOfRows).sum - val metas = blocksToRead.map(b => OrcDataStripe(OrcStripeWithMeta(b, ctx))) + val metas = blocksToRead.map(b => new OrcDataStripe(new OrcStripeWithMeta(b, ctx))) hostBuffers += SingleHMBAndMeta(Array(hostBuf), bufSize, numRows, metas) } val bytesRead = fileSystemBytesRead() - startingBytesRead @@ -2140,10 +2144,10 @@ class MultiFileCloudOrcPartitionReader( hostBuffers.safeClose() logDebug("Reader is closed, return empty buffer for the current read for " + s"file: ${partFile.filePath.toString}") - HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema) + new HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema, None) } else { - HostMemoryBuffersWithMetaData(partFile, hostBuffers.toArray, bytesRead, - ctx.updatedReadSchema, ctx.compressionKind, ctx.requestedMapping) + new HostMemoryBuffersWithMetaData(partFile, hostBuffers.toArray, bytesRead, + ctx.updatedReadSchema, ctx.compressionKind, ctx.requestedMapping, None) } } } @@ -2159,10 +2163,10 @@ class MultiFileCloudOrcPartitionReader( } } - private case class CombinedMeta( - combinedEmptyMeta: Option[HostMemoryEmptyMetaData], - allPartValues: Array[(Long, InternalRow)], - toCombine: Array[HostMemoryBuffersWithMetaDataBase]) + private class CombinedMeta( + val combinedEmptyMeta: Option[HostMemoryEmptyMetaData], + val allPartValues: Array[(Long, InternalRow)], + val toCombine: Array[HostMemoryBuffersWithMetaDataBase]) /** * The sub-class must implement the real file reading logic in a Callable @@ -2255,7 +2259,9 @@ class MultiFileCloudOrcPartitionReader( buffer.partitionedFile, buffer.allPartValues) if (memBuffersAndSize.length > 1) { val updatedBuffers = memBuffersAndSize.drop(1) - currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers)) + currentFileHostBuffers = Some(new HostMemoryBuffersWithMetaData( + buffer.partitionedFile, updatedBuffers, buffer.bytesRead, buffer.updatedReadSchema, + buffer.compressionKind, buffer.requestedMapping, buffer.allPartValues)) } else { currentFileHostBuffers = None } @@ -2399,9 +2405,10 @@ class MultiFileCloudOrcPartitionReader( SpillPriorities.ACTIVE_BATCHING_PRIORITY) val combinedRet = SingleHMBAndMeta(Array(finalBuf), outStream.getPos, numRows, blockMetas) - val newHmbWithMeta = metaToUse.copy( - memBuffersAndSizes = Array(combinedRet), - allPartValues = Some(combinedMeta.allPartValues)) + val newHmbWithMeta = new HostMemoryBuffersWithMetaData( + metaToUse.partitionedFile, Array(combinedRet), metaToUse.bytesRead, + metaToUse.updatedReadSchema, metaToUse.compressionKind, metaToUse.requestedMapping, + Some(combinedMeta.allPartValues)) val filterTime = combinedMeta.toCombine.map(_.getFilterTime).sum val bufferTime = combinedMeta.toCombine.map(_.getBufferTime).sum newHmbWithMeta.setExecutionTime(filterTime, bufferTime) @@ -2488,7 +2495,7 @@ class MultiFileCloudOrcPartitionReader( val combinedEmptyMeta = if (allEmpty) { // metaForEmpty should not be null here - Some(HostMemoryEmptyMetaData( + Some(new HostMemoryEmptyMetaData( metaForEmpty.partitionedFile, // not used, so pick one emptyNumRows, emptyTotalBytesRead, metaForEmpty.readSchema, @@ -2496,7 +2503,7 @@ class MultiFileCloudOrcPartitionReader( } else { None } - CombinedMeta(combinedEmptyMeta, allPartValues.toArray, toCombine.toArray) + new CombinedMeta(combinedEmptyMeta, allPartValues.toArray, toCombine.toArray) } } @@ -2536,13 +2543,13 @@ trait OrcCodecWritingHelper { } // Orc schema wrapper -private case class OrcSchemaWrapper(schema: TypeDescription) extends SchemaBase { +private class OrcSchemaWrapper(val schema: TypeDescription) extends SchemaBase with Serializable { override def isEmpty: Boolean = schema.getFieldNames.isEmpty } -case class OrcStripeWithMeta(stripe: OrcOutputStripe, ctx: OrcPartitionReaderContext) - extends OrcCodecWritingHelper { +class OrcStripeWithMeta(val stripe: OrcOutputStripe, val ctx: OrcPartitionReaderContext) + extends OrcCodecWritingHelper with Serializable { lazy val stripeLength: Long = { // calculate the true stripe footer size @@ -2557,7 +2564,8 @@ case class OrcStripeWithMeta(stripe: OrcOutputStripe, ctx: OrcPartitionReaderCon } // OrcOutputStripe wrapper -private[rapids] case class OrcDataStripe(stripeMeta: OrcStripeWithMeta) extends DataBlockBase { +private[rapids] class OrcDataStripe(val stripeMeta: OrcStripeWithMeta) + extends DataBlockBase with Serializable { override def getRowCount: Long = stripeMeta.stripe.infoBuilder.getNumberOfRows @@ -2568,17 +2576,17 @@ private[rapids] case class OrcDataStripe(stripeMeta: OrcStripeWithMeta) extends } /** Orc extra information containing the requested column ids for the current coalescing stripes */ -case class OrcExtraInfo(requestedMapping: Option[Array[Int]]) extends ExtraInfo +class OrcExtraInfo(val requestedMapping: Option[Array[Int]]) extends ExtraInfo with Serializable // Contains meta about a single stripe of an ORC file -private case class OrcSingleStripeMeta( - filePath: Path, // Orc file path - dataBlock: OrcDataStripe, // Orc stripe information with the OrcPartitionReaderContext - partitionValues: InternalRow, // partitioned values - schema: OrcSchemaWrapper, // Orc schema - readSchema: StructType, // Orc read schema - extraInfo: OrcExtraInfo // Orc ExtraInfo containing the requested column ids -) extends SingleDataBlockInfo +private class OrcSingleStripeMeta( + val filePath: Path, // Orc file path + val dataBlock: OrcDataStripe, // Orc stripe information with the OrcPartitionReaderContext + val partitionValues: InternalRow, // partitioned values + val schema: OrcSchemaWrapper, // Orc schema + val readSchema: StructType, // Orc read schema + val extraInfo: OrcExtraInfo // Orc ExtraInfo containing the requested column ids +) extends SingleDataBlockInfo with Serializable /** * @@ -2840,7 +2848,15 @@ class MultiFileOrcPartitionReader( } } -object MakeOrcTableProducer extends Logging { +object MakeOrcTableProducer { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } + def apply( useChunkedReader: Boolean, maxChunkedReaderMemoryUsageSizeBytes: Long, @@ -2865,7 +2881,7 @@ object MakeOrcTableProducer extends Logging { } } if (useChunkedReader) { - OrcTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, + new OrcTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, parseOpts, buffer, offset, bufferSize, metrics, isSchemaCaseSensitive, readDataSchema, tableSchema, splits, debugDumpPrefix, debugDumpAlways) } else { @@ -2905,7 +2921,7 @@ object MakeOrcTableProducer extends Logging { } } -case class OrcTableReader( +class OrcTableReader( conf: Configuration, chunkSizeByteLimit: Long, maxChunkedReaderMemoryUsageSizeBytes: Long, @@ -2919,7 +2935,7 @@ case class OrcTableReader( tableSchema: TypeDescription, splits: Array[PartitionedFile], debugDumpPrefix: Option[String], - debugDumpAlways: Boolean) extends GpuDataProducer[Table] with Logging { + debugDumpAlways: Boolean) extends GpuDataProducer[Table] with RapidsLocalLog with Serializable { private[this] val reader = new ORCChunkedReader(chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, parseOpts, buffer, offset, bufferSize) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 19f8709d497..0dcf69ba7a2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -34,7 +34,6 @@ import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetWriteSupport} @@ -192,7 +191,21 @@ object GpuParquetFileFormat { } } -class GpuParquetFileFormat extends ColumnarFileFormat with Logging { +class GpuParquetFileFormat extends ColumnarFileFormat { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } + /** * Prepares a write job and returns an [[ColumnarOutputWriterFactory]]. Client side job * preparation can be put here. For example, user defined output committer can be configured diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadCSVFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadCSVFileFormat.scala index 3ddcec05061..8d9dd910ec7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadCSVFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadCSVFileFormat.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ class GpuReadCSVFileFormat extends CSVFileFormat with GpuReadFileFormatWithMetri sqlConf.sessionLocalTimeZone, sqlConf.columnNameOfCorruptRecord) val rapidsConf = new RapidsConf(sqlConf) - val factory = GpuCSVPartitionReaderFactory( + val factory = new GpuCSVPartitionReaderFactory( sqlConf, broadcastedHadoopConf, dataSchema, @@ -80,7 +80,7 @@ class GpuReadCSVFileFormat extends CSVFileFormat with GpuReadFileFormatWithMetri } } -object GpuReadCSVFileFormat { +object GpuReadCSVFileFormat extends Serializable { def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = { val fsse = meta.wrapped GpuCSVScan.tagSupport( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadOrcFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadOrcFileFormat.scala index 62a62a1400c..92574f7ac72 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadOrcFileFormat.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ class GpuReadOrcFileFormat extends OrcFileFormat with GpuReadFileFormatWithMetri val sqlConf = sparkSession.sessionState.conf val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val factory = GpuOrcPartitionReaderFactory( + val factory = new GpuOrcPartitionReaderFactory( sqlConf, broadcastedHadoopConf, dataSchema, @@ -68,7 +68,7 @@ class GpuReadOrcFileFormat extends OrcFileFormat with GpuReadFileFormatWithMetri pushedFilters: Array[Filter], fileScan: GpuFileSourceScanExec): PartitionReaderFactory = { val poolConfBuilder = ThreadPoolConfBuilder(fileScan.rapidsConf) - GpuOrcMultiFilePartitionReaderFactory( + new GpuOrcMultiFilePartitionReaderFactory( fileScan.conf, broadcastedConf, fileScan.relation.dataSchema, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala index aee11b3c84d..436a6b90f10 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,18 +40,6 @@ class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[T] { } } -class MetricsBatchIterator(iter: Iterator[ColumnarBatch]) extends Iterator[ColumnarBatch] { - private[this] val inputMetrics = TaskContext.get().taskMetrics().inputMetrics - - override def hasNext: Boolean = iter.hasNext - - override def next(): ColumnarBatch = { - val batch = iter.next() - TrampolineUtil.incInputRecordsRows(inputMetrics, batch.numRows()) - batch - } -} - /** Wraps a columnar PartitionReader to update bytes read metric based on filesystem statistics. */ class PartitionReaderWithBytesRead(reader: PartitionReader[ColumnarBatch]) extends PartitionReader[ColumnarBatch] { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala index d0df36eebfc..95111d3944d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.parquet import java.io.{Closeable, EOFException, FileNotFoundException, InputStream, IOException, OutputStream} import java.net.URI -import java.nio.{ByteBuffer, ByteOrder} +import java.nio.{Buffer, ByteBuffer, ByteOrder} import java.nio.channels.SeekableByteChannel import java.nio.charset.StandardCharsets import java.util.{Collections, Locale} @@ -39,6 +39,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.filecache.FileCache import com.nvidia.spark.rapids.fileio.hadoop.HadoopFileIO +import com.nvidia.spark.rapids.fileio.hadoop.PerfIOHadoopInputFileFactory import com.nvidia.spark.rapids.io.async._ import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter, RmmSpark} import com.nvidia.spark.rapids.jni.fileio.{RapidsFileIO, RapidsInputFile} @@ -66,7 +67,6 @@ import org.xerial.snappy.Snappy import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression @@ -117,7 +117,7 @@ case class GpuParquetScan( dataFilters: Seq[Expression], rapidsConf: RapidsConf, queryUsesInputFile: Boolean = false) - extends FileScan with GpuScan with Logging { + extends FileScan with GpuScan { override def isSplitable(path: Path): Boolean = true @@ -428,14 +428,14 @@ class HMBSeekableInputStream( if (bytesRead < 0) { bytesRead } else { - buf.position(buf.position() + bytesRead) + buf.asInstanceOf[Buffer].position(buf.position() + bytesRead) bytesRead } } private def readFullyHeapBuffer(buf: ByteBuffer): Unit = { readFully(buf.array, buf.arrayOffset + buf.position(), buf.remaining) - buf.position(buf.limit) + buf.asInstanceOf[Buffer].position(buf.limit) } private def readDirectBuffer(buf: ByteBuffer): Int = { @@ -492,7 +492,7 @@ class HMBInputFile(buffer: HostMemoryBuffer) extends InputFile { protected case class GpuParquetFileFilterHandler( @transient sqlConf: SQLConf, - metrics: Map[String, GpuMetric]) extends Logging { + metrics: Map[String, GpuMetric]) extends RapidsLocalLog { private val FOOTER_LENGTH_SIZE = 4 private val isCaseSensitive = sqlConf.caseSensitiveAnalysis @@ -1142,7 +1142,9 @@ abstract class AbstractGpuParquetMultiFilePartitionReaderFactory( // from a task when we need to create the fileIO instance. This stops a regression // when we materialize the hadoop conf eagerly, see: // https://github.com/NVIDIA/spark-rapids/issues/13353 - @transient protected lazy val fileIO = new HadoopFileIO(broadcastedConf.value.value) + @transient protected lazy val fileIO = new HadoopFileIO( + broadcastedConf.value.value, + PerfIOHadoopInputFileFactory.INSTANCE) protected val isCaseSensitive = sqlConf.caseSensitiveAnalysis protected val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix protected val debugDumpAlways = rapidsConf.parquetDebugDumpAlways @@ -1231,7 +1233,7 @@ abstract class AbstractGpuParquetMultiFilePartitionReaderFactory( filterHandler.filterBlocks(fileIO, footerReadType, file, new Configuration(conf), filters, readDataSchema) } - val combineConf = CombineConf(combineThresholdSize, combineWaitTime) + val combineConf = new CombineConf(combineThresholdSize, combineWaitTime) val poolConf = poolConfBuilder.build() val reader = createBaseMultiFileCloudReader(fileIO, conf, files, filterFunc, isCaseSensitive, @@ -1292,7 +1294,7 @@ abstract class AbstractGpuParquetMultiFilePartitionReaderFactory( conf: Configuration, filters: Array[Filter], readDataSchema: StructType) extends UnboundedAsyncRunner[Array[BlockMetaWithPartFile]] - with Logging { + with RapidsLocalLog { override def callImpl(): Array[BlockMetaWithPartFile] = { TrampolineUtil.setTaskContext(taskContext) @@ -1359,11 +1361,11 @@ abstract class AbstractGpuParquetMultiFilePartitionReaderFactory( metaAndFilesArr.foreach { metaAndFile => val singleFileInfo = metaAndFile.meta clippedBlocks ++= singleFileInfo.blocks.map(block => - ParquetSingleDataBlockMeta( + new ParquetSingleDataBlockMeta( singleFileInfo.filePath, - ParquetDataBlock(block, compressCfg), + new ParquetDataBlock(block, compressCfg), metaAndFile.file.partitionValues, - ParquetSchemaWrapper(singleFileInfo.schema), + new ParquetSchemaWrapper(singleFileInfo.schema), singleFileInfo.readSchema, new ParquetExtraInfo(singleFileInfo.dateRebaseMode, singleFileInfo.timestampRebaseMode, @@ -1474,7 +1476,9 @@ abstract class GpuParquetPartitionReaderFactoryBase( // from a task when we need to create the fileIO instance. This stops a regression // when we materialize the hadoop conf eagerly, see: // https://github.com/NVIDIA/spark-rapids/issues/13353 - @transient protected lazy val fileIO = new HadoopFileIO(broadcastedConf.value.value) + @transient protected lazy val fileIO = new HadoopFileIO( + broadcastedConf.value.value, + PerfIOHadoopInputFileFactory.INSTANCE) protected val isCaseSensitive = sqlConf.caseSensitiveAnalysis protected val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix protected val debugDumpAlways = rapidsConf.parquetDebugDumpAlways @@ -1524,7 +1528,7 @@ case class GpuParquetPartitionReaderFactory( @transient params: Map[String, String]) extends GpuParquetPartitionReaderFactoryBase( sqlConf, broadcastedConf, dataSchema, readDataSchema, partitionSchema, - rapidsConf, metrics, params) with Logging { + rapidsConf, metrics, params) with RapidsLocalLog { override protected def buildBaseColumnarParquetReader( file: PartitionedFile): PartitionReader[ColumnarBatch] = { @@ -1563,7 +1567,7 @@ object CpuCompressionConfig { def disabled(): CpuCompressionConfig = CpuCompressionConfig(false, false) } -trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics +trait ParquetPartitionReaderBase extends RapidsLocalLog with ScanWithMetrics with MultiFileReaderFunctions { // the size of Parquet magic (at start+end) and footer length values val PARQUET_META_SIZE: Long = 4 + 4 + 4 @@ -2254,21 +2258,21 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics block.asInstanceOf[ParquetDataBlock].dataBlock implicit def toDataBlockBase(blocks: Seq[BlockMetaData]): Seq[DataBlockBase] = - blocks.map(b => ParquetDataBlock(b, compressCfg)) + blocks.map(b => new ParquetDataBlock(b, compressCfg)) implicit def toBlockMetaDataSeq(blocks: Seq[DataBlockBase]): Seq[BlockMetaData] = blocks.map(_.asInstanceOf[ParquetDataBlock].dataBlock) } // Parquet schema wrapper -case class ParquetSchemaWrapper(schema: MessageType) extends SchemaBase { +class ParquetSchemaWrapper(val schema: MessageType) extends SchemaBase with Serializable { override def isEmpty: Boolean = schema.getFields.isEmpty } // Parquet BlockMetaData wrapper -case class ParquetDataBlock( - dataBlock: BlockMetaData, - compressCfg: CpuCompressionConfig) extends DataBlockBase { +class ParquetDataBlock( + val dataBlock: BlockMetaData, + val compressCfg: CpuCompressionConfig) extends DataBlockBase with Serializable { override def getRowCount: Long = dataBlock.getRowCount override def getReadDataSize: Long = dataBlock.getTotalByteSize override def getBlockSize: Long = { @@ -2282,13 +2286,13 @@ class ParquetExtraInfo(val dateRebaseMode: DateTimeRebaseMode, val hasInt96Timestamps: Boolean) extends ExtraInfo // contains meta about a single block in a file -case class ParquetSingleDataBlockMeta( - filePath: Path, - dataBlock: ParquetDataBlock, - partitionValues: InternalRow, - schema: ParquetSchemaWrapper, - readSchema: StructType, - extraInfo: ParquetExtraInfo) extends SingleDataBlockInfo +class ParquetSingleDataBlockMeta( + val filePath: Path, + val dataBlock: ParquetDataBlock, + val partitionValues: InternalRow, + val schema: ParquetSchemaWrapper, + val readSchema: StructType, + val extraInfo: ParquetExtraInfo) extends SingleDataBlockInfo with Serializable /** * Abstract base class for coalescing Parquet partition readers. @@ -2676,8 +2680,8 @@ abstract class AbstractMultiFileCloudParquetPartitionReader( next.dateRebaseMode, current.timestampRebaseMode, next.timestampRebaseMode, - ParquetSchemaWrapper(current.clippedSchema), - ParquetSchemaWrapper(next.clippedSchema), + new ParquetSchemaWrapper(current.clippedSchema), + new ParquetSchemaWrapper(next.clippedSchema), current.partitionedFile.filePath.toString(), next.partitionedFile.filePath.toString() ) @@ -2916,7 +2920,7 @@ abstract class AbstractMultiFileCloudParquetPartitionReader( private class ReadBatchRunner( file: PartitionedFile, filterFunc: PartitionedFile => ParquetFileInfoWithBlockMeta, - taskContext: TaskContext) extends MemoryBoundedAsyncRunner[BufferInfo] with Logging { + taskContext: TaskContext) extends MemoryBoundedAsyncRunner[BufferInfo] with RapidsLocalLog { // Set TaskContext in terms of an AsyncRunner override def sparkTaskContext: Option[TaskContext] = Some(taskContext) @@ -3325,7 +3329,15 @@ class MultiFileCloudParquetPartitionReader( } } -object MakeParquetTableProducer extends Logging { +object MakeParquetTableProducer { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } + def apply( useChunkedReader: Boolean, maxChunkedReaderMemoryUsageSizeBytes: Long, @@ -3406,7 +3418,7 @@ trait ChunkedReader extends AutoCloseable { /** * A simple wrapper to adapt the JniParquetChunkedReader to the ChunkedReader interface. */ -case class ParquetChunkedReader(delegate: JniParquetChunkedReader) extends ChunkedReader { +class ParquetChunkedReader(val delegate: JniParquetChunkedReader) extends ChunkedReader { override def hasNext: Boolean = delegate.hasNext override def next: Table = delegate.readChunk() override def close(): Unit = delegate.close() @@ -3427,7 +3439,7 @@ abstract class AbstractParquetTableReader( clippedParquetSchema: MessageType, splits: Array[PartitionedFile], debugDumpPrefix: Option[String], - debugDumpAlways: Boolean) extends GpuDataProducer[Table] with Logging { + debugDumpAlways: Boolean) extends GpuDataProducer[Table] with RapidsLocalLog { protected val reader: ChunkedReader @@ -3500,7 +3512,7 @@ case class ParquetTableReader( opts, buffers, metrics, dateRebaseMode, timestampRebaseMode, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedParquetSchema, splits, debugDumpPrefix, debugDumpAlways) { - override protected val reader: ChunkedReader = ParquetChunkedReader( + override protected val reader: ChunkedReader = new ParquetChunkedReader( new JniParquetChunkedReader(chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, opts, buffers:_*) ) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReader320Plus.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReader320Plus.scala index fb4678f6cc6..eb162cace62 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReader320Plus.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReader320Plus.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.shims import java.io.EOFException -import java.nio.ByteBuffer +import java.nio.{Buffer, ByteBuffer} import java.nio.channels.SeekableByteChannel import ai.rapids.cudf.HostMemoryBuffer @@ -43,8 +43,8 @@ abstract class GpuOrcDataReader320Plus( val offset = current.getOffset while (current ne last.next) { val buffer = if (current eq last) data else data.duplicate() - buffer.position((current.getOffset - offset).toInt) - buffer.limit((current.getEnd - offset).toInt) + buffer.asInstanceOf[Buffer].position((current.getOffset - offset).toInt) + buffer.asInstanceOf[Buffer].limit((current.getEnd - offset).toInt) current.asInstanceOf[BufferChunk].setChunk(buffer) // see if the filecache wants any of this data val cacheToken = FileCache.get.startDataRangeCache(inputFile, @@ -75,7 +75,7 @@ abstract class GpuOrcDataReader320Plus( throw new EOFException(s"Unexpected EOF while reading cached block for $filePathString") } } - buffer.flip() + buffer.asInstanceOf[Buffer].flip() chunk.asInstanceOf[BufferChunk].setChunk(buffer) chunk } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReaderBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReaderBase.scala index df62a38b241..32a25b7d775 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReaderBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/GpuOrcDataReaderBase.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.shims import java.io.{EOFException, IOException} -import java.nio.ByteBuffer +import java.nio.{Buffer, ByteBuffer} import java.nio.channels.SeekableByteChannel import ai.rapids.cudf.HostMemoryBuffer @@ -24,6 +24,7 @@ import com.nvidia.spark.rapids.{GpuMetric, HostMemoryOutputStream, NoopMetric} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.filecache.FileCache import com.nvidia.spark.rapids.fileio.hadoop.HadoopFileIO +import com.nvidia.spark.rapids.fileio.hadoop.PerfIOHadoopInputFileFactory import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FSDataInputStream @@ -37,7 +38,9 @@ abstract class GpuOrcDataReaderBase( metrics: Map[String, GpuMetric]) extends DataReader { protected val filePathString = props.getPath.toString protected var file: Option[FSDataInputStream] = None - protected lazy val fileIO = new HadoopFileIO(conf) + protected lazy val fileIO = new HadoopFileIO( + conf, + PerfIOHadoopInputFileFactory.INSTANCE) protected lazy val inputFile: RapidsInputFile = fileIO.newInputFile(filePathString) protected val compression = props.getCompression private val hitMetric = getMetric(GpuMetric.FILECACHE_DATA_RANGE_HITS) @@ -116,7 +119,7 @@ abstract class GpuOrcDataReaderBase( throw new EOFException("Unexpected EOF while reading stripe footer") } } - tailBuf.flip() + tailBuf.asInstanceOf[Buffer].flip() } } } else { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index 0861f36b064..7a99acb6770 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -27,7 +27,6 @@ import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO import com.nvidia.spark.rapids.shims.BucketingUtilsShim import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.hive.rapids.shims.GpuInsertIntoHiveTableMeta @@ -36,7 +35,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch -object GpuHiveFileFormat extends Logging { +object GpuHiveFileFormat { private val parquetOutputFormatClass = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" private val parquetSerdeClass = @@ -199,7 +198,7 @@ object GpuHiveFileFormat extends Logging { } class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat - with Logging with Serializable { + with Serializable with RapidsLocalLog { override def prepareWrite(sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): ColumnarOutputWriterFactory = { @@ -264,7 +263,7 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType, } -class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging with Serializable { +class GpuHiveTextFileFormat extends ColumnarFileFormat with Serializable { override def supportDataType(dataType: DataType): Boolean = GpuHiveTextFileUtils.isSupportedType(dataType) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTableScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTableScanExec.scala index e806790cdb2..359b111db6f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTableScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTableScanExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025, NVIDIA CORPORATION. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,6 +90,8 @@ case class GpuHiveTableScanExec(requestedAttributes: Seq[Attribute], val partitionAttributes: Seq[AttributeReference] = hiveTableRelation.partitionCols + private def shimSparkSession: SparkSession = sparkSession.asInstanceOf[SparkSession] + // CPU expression to prune Hive partitions, based on [[partitionPruningPredicate]]. // Bind all partition key attribute references in the partition pruning predicate for later // evaluation. @@ -140,7 +142,7 @@ case class GpuHiveTableScanExec(requestedAttributes: Seq[Attribute], prunePartitions(hivePartitions) } } else { - if (sparkSession.sessionState.conf.metastorePartitionPruning && + if (shimSparkSession.sessionState.conf.metastorePartitionPruning && partitionPruningPredicate.nonEmpty) { rawPartitions } else { @@ -152,16 +154,16 @@ case class GpuHiveTableScanExec(requestedAttributes: Seq[Attribute], // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { val prunedPartitions = - if (sparkSession.sessionState.conf.metastorePartitionPruning && + if (shimSparkSession.sessionState.conf.metastorePartitionPruning && partitionPruningPredicate.nonEmpty) { // Retrieve the original attributes based on expression ID so that capitalization matches. val normalizedFilters = partitionPruningPredicate.map(_.transform { case a: AttributeReference => originalAttributes(a) }) - sparkSession.sessionState.catalog + shimSparkSession.sessionState.catalog .listPartitionsByFilter(hiveTableRelation.tableMeta.identifier, normalizedFilters) } else { - sparkSession.sessionState.catalog.listPartitions(hiveTableRelation.tableMeta.identifier) + shimSparkSession.sessionState.catalog.listPartitions(hiveTableRelation.tableMeta.identifier) } prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) } @@ -202,7 +204,7 @@ case class GpuHiveTableScanExec(requestedAttributes: Seq[Attribute], readSchema: StructType, options: Map[String, String] ): PartitionedFile => Iterator[InternalRow] = { - val readerFactory = GpuHiveTextPartitionReaderFactory( + val readerFactory = new GpuHiveTextPartitionReaderFactory( sqlConf = sqlConf, broadcastConf = broadcastConf, inputFileSchema = dataSchema, @@ -329,13 +331,13 @@ case class GpuHiveTableScanExec(requestedAttributes: Seq[Attribute], // Assume Delimited text. val options = hiveTableRelation.tableMeta.properties ++ hiveTableRelation.tableMeta.storage.properties - val hadoopConf = sparkSession.sessionState.newHadoopConf() + val hadoopConf = shimSparkSession.sessionState.newHadoopConf() // In the CPU HiveTableScanExec the config will have a bunch of confs set for S3 keys // and predicate push down/etc. We don't need this because we are getting that information // directly. - val broadcastHadoopConf = sparkSession.sparkContext.broadcast( + val broadcastHadoopConf = shimSparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) - val sqlConf = sparkSession.sessionState.conf + val sqlConf = shimSparkSession.sessionState.conf val rapidsConf = new RapidsConf(sqlConf) val requestedOutputDataSchema = getRequestedOutputDataSchema(hiveTableRelation.tableMeta.schema, partitionAttributes, @@ -349,10 +351,10 @@ case class GpuHiveTableScanExec(requestedAttributes: Seq[Attribute], options) val rdd = if (hiveTableRelation.isPartitioned) { createReadRDDForPartitions(reader, hiveTableRelation, requestedOutputDataSchema, - sparkSession, hadoopConf) + shimSparkSession, hadoopConf) } else { createReadRDDForTable(reader, hiveTableRelation, requestedOutputDataSchema, - sparkSession, hadoopConf) + shimSparkSession, hadoopConf) } sendDriverMetrics() rdd @@ -439,18 +441,19 @@ class AlphabeticallyReorderingColumnPartitionReader(fileReader: PartitionReader[ } // Factory to build the columnar reader. -case class GpuHiveTextPartitionReaderFactory(sqlConf: SQLConf, - broadcastConf: Broadcast[SerializableConfiguration], - inputFileSchema: StructType, - partitionSchema: StructType, - requestedOutputDataSchema: StructType, - requestedAttributes: Seq[Attribute], - maxReaderBatchSizeRows: Integer, - maxReaderBatchSizeBytes: Long, - maxGpuColumnSizeBytes: Long, - metrics: Map[String, GpuMetric], - params: Map[String, String]) - extends ShimFilePartitionReaderFactory(params) { +class GpuHiveTextPartitionReaderFactory( + val sqlConf: SQLConf, + val broadcastConf: Broadcast[SerializableConfiguration], + val inputFileSchema: StructType, + val partitionSchema: StructType, + val requestedOutputDataSchema: StructType, + val requestedAttributes: Seq[Attribute], + val maxReaderBatchSizeRows: Integer, + val maxReaderBatchSizeBytes: Long, + val maxGpuColumnSizeBytes: Long, + val metrics: Map[String, GpuMetric], + val params: Map[String, String]) + extends ShimFilePartitionReaderFactory(params) with Serializable { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { throw new IllegalStateException("Row-based text parsing is not supported on GPU.") diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AvroProviderImpl.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AvroProviderImpl.scala index ff9ebaaff68..ac235f9310d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AvroProviderImpl.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AvroProviderImpl.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025, NVIDIA CORPORATION. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,7 @@ class AvroProviderImpl extends AvroProvider { pushedFilters: Array[Filter], fileScan: GpuFileSourceScanExec): PartitionReaderFactory = { val poolConfBuilder = ThreadPoolConfBuilder(fileScan.rapidsConf) - GpuAvroMultiFilePartitionReaderFactory( + new GpuAvroMultiFilePartitionReaderFactory( fileScan.relation.sparkSession.sessionState.conf, fileScan.rapidsConf, broadcastedConf, diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index a86120a7fad..ccf8c14242b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025, NVIDIA CORPORATION. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path} import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.{AvroOptions, SchemaConverters} import org.apache.spark.sql.catalyst.InternalRow @@ -119,12 +118,12 @@ case class GpuAvroScan( // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. if (rapidsConf.isAvroPerFileReadEnabled) { - GpuAvroPartitionReaderFactory(sparkSession.sessionState.conf, rapidsConf, broadcastedConf, + new GpuAvroPartitionReaderFactory(sparkSession.sessionState.conf, rapidsConf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, parsedOptions, metrics, options.asScala.toMap) } else { val poolConfBuilder = ThreadPoolConfBuilder(rapidsConf) - GpuAvroMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, + new GpuAvroMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, rapidsConf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, parsedOptions, metrics, pushedFilters, poolConfBuilder, queryUsesInputFile) } @@ -152,7 +151,7 @@ case class GpuAvroScan( } /** Avro partition reader factory to build columnar reader */ -case class GpuAvroPartitionReaderFactory( +class GpuAvroPartitionReaderFactory( @transient sqlConf: SQLConf, @transient rapidsConf: RapidsConf, broadcastedConf: Broadcast[SerializableConfiguration], @@ -162,7 +161,7 @@ case class GpuAvroPartitionReaderFactory( avroOptions: AvroOptions, metrics: Map[String, GpuMetric], @transient params: Map[String, String]) - extends ShimFilePartitionReaderFactory(params) with Logging { + extends ShimFilePartitionReaderFactory(params) with RapidsLocalLog with Serializable { private val debugDumpPrefix = rapidsConf.avroDebugDumpPrefix private val debugDumpAlways = rapidsConf.avroDebugDumpAlways @@ -179,7 +178,7 @@ case class GpuAvroPartitionReaderFactory( override def buildColumnarReader(partFile: PartitionedFile): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value val startTime = System.nanoTime() - val blockMeta = AvroFileFilterHandler(conf, avroOptions).filterBlocks(partFile) + val blockMeta = new AvroFileFilterHandler(conf, avroOptions).filterBlocks(partFile) metrics.get(FILTER_TIME).foreach { _ += (System.nanoTime() - startTime) } @@ -194,7 +193,7 @@ case class GpuAvroPartitionReaderFactory( /** * The multi-file partition reader factory for cloud or coalescing reading of avro file format. */ -case class GpuAvroMultiFilePartitionReaderFactory( +class GpuAvroMultiFilePartitionReaderFactory( @transient sqlConf: SQLConf, @transient rapidsConf: RapidsConf, broadcastedConf: Broadcast[SerializableConfiguration], @@ -206,7 +205,8 @@ case class GpuAvroMultiFilePartitionReaderFactory( filters: Array[Filter], poolConfBuilder: ThreadPoolConfBuilder, queryUsesInputFile: Boolean) - extends MultiFilePartitionReaderFactoryBase(sqlConf, broadcastedConf, rapidsConf) { + extends MultiFilePartitionReaderFactoryBase(sqlConf, broadcastedConf, rapidsConf) + with Serializable { private val debugDumpPrefix = rapidsConf.avroDebugDumpPrefix private val debugDumpAlways = rapidsConf.avroDebugDumpAlways @@ -269,7 +269,7 @@ case class GpuAvroMultiFilePartitionReaderFactory( conf: Configuration): PartitionReader[ColumnarBatch] = { val clippedBlocks = ArrayBuffer[AvroSingleDataBlockInfo]() val mapPathHeader = LinkedHashMap[Path, Header]() - val filterHandler = AvroFileFilterHandler(conf, options) + val filterHandler = new AvroFileFilterHandler(conf, options) metrics.getOrElse(FILTER_TIME, NoopMetric).ns { metrics.getOrElse(SCAN_TIME, NoopMetric).ns { @@ -279,23 +279,23 @@ case class GpuAvroMultiFilePartitionReaderFactory( } catch { case e: FileNotFoundException if ignoreMissingFiles => logWarning(s"Skipped missing file: ${file.filePath}", e) - AvroBlockMeta(null, 0L, Seq.empty) + new AvroBlockMeta(null, 0L, Seq.empty) // Throw FileNotFoundException even if `ignoreCorruptFiles` is true case e: FileNotFoundException if !ignoreMissingFiles => throw e case e@(_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e) - AvroBlockMeta(null, 0L, Seq.empty) + new AvroBlockMeta(null, 0L, Seq.empty) } val fPath = new Path(new URI(file.filePath.toString())) clippedBlocks ++= singleFileInfo.blocks.map(block => - AvroSingleDataBlockInfo( + new AvroSingleDataBlockInfo( fPath, - AvroDataBlock(block), + new AvroDataBlock(block), file.partitionValues, - AvroSchemaWrapper(SchemaConverters.toAvroType(readDataSchema)), + new AvroSchemaWrapper(SchemaConverters.toAvroType(readDataSchema)), readDataSchema, - AvroExtraInfo())) + new AvroExtraInfo())) if (singleFileInfo.blocks.nonEmpty) { // No need to check the header since it can not be null when blocks is not empty here. mapPathHeader.put(fPath, singleFileInfo.header) @@ -312,7 +312,7 @@ case class GpuAvroMultiFilePartitionReaderFactory( } /** A trait collecting common methods across the 3 kinds of avro readers */ -trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase => +trait GpuAvroReaderBase extends RapidsLocalLog { self: FilePartitionReaderBase => def debugDumpPrefix: Option[String] def debugDumpAlways: Boolean @@ -452,7 +452,7 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase => withResource(partFilePath.getFileSystem(conf).open(partFilePath)) { in => closeOnExcept(HostMemoryBuffer.allocate(estOutSize)) { hmb => withResource(new HostMemoryOutputStream(hmb)) { out => - val headerAndBlocks = BlockInfo(0, headerSize, 0, 0) +: blocks + val headerAndBlocks = new BlockInfo(0, headerSize, 0, 0) +: blocks copyBlocksData(headerAndBlocks, in, out) // check we didn't go over memory if (out.getPos > estOutSize) { @@ -487,7 +487,7 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase => // Copy every block without the tailing sync marker if a sync is given. This // is for coalescing reader who requires to append this given sync marker // to each block. Then we can not merge sequential blocks. - blocks.map(b => CopyRange(b.blockStart, b.blockSize - SYNC_SIZE)) + blocks.map(b => new CopyRange(b.blockStart, b.blockSize - SYNC_SIZE)) }.getOrElse(computeCopyRanges(blocks)) val copySyncFunc: OutputStream => Unit = if (sync.isEmpty) { @@ -535,7 +535,7 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase => blocks.foreach { block => if (currentCopyEnd != block.blockStart) { if (currentCopyEnd != 0) { - copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) + copyRanges.append(new CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) } currentCopyStart = block.blockStart currentCopyEnd = currentCopyStart @@ -544,7 +544,7 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase => } if (currentCopyEnd != currentCopyStart) { - copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) + copyRanges.append(new CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) } copyRanges.toSeq } @@ -696,7 +696,8 @@ class GpuMultiFileCloudAvroPartitionReader( closeOnExcept(batchIter) { _ => if (bufsAndSizes.length > 1) { val updatedBuffers = bufsAndSizes.drop(1) - currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers)) + currentFileHostBuffers = Some(new AvroHostBuffersWithMeta( + buffer.partitionedFile, updatedBuffers, buffer.bytesRead)) } else { currentFileHostBuffers = None } @@ -717,7 +718,7 @@ class GpuMultiFileCloudAvroPartitionReader( new ReadBatchRunner(tc, file, config, filters) /** Two utils classes */ - private case class AvroHostBuffersWithMeta( + private class AvroHostBuffersWithMeta( override val partitionedFile: PartitionedFile, override val memBuffersAndSizes: Array[SingleHMBAndMeta], override val bytesRead: Long) extends HostMemoryBuffersWithMetaDataBase @@ -726,7 +727,7 @@ class GpuMultiFileCloudAvroPartitionReader( taskContext: TaskContext, partFile: PartitionedFile, config: Configuration, - filters: Array[Filter]) extends UnboundedAsyncRunner[BufferInfo] with Logging { + filters: Array[Filter]) extends UnboundedAsyncRunner[BufferInfo] with RapidsLocalLog { override def callImpl(): BufferInfo = { TrampolineUtil.setTaskContext(taskContext) @@ -737,13 +738,13 @@ class GpuMultiFileCloudAvroPartitionReader( } catch { case e: FileNotFoundException if ignoreMissingFiles => logWarning(s"Skipped missing file: ${partFile.filePath}", e) - AvroHostBuffersWithMeta(partFile, Array(SingleHMBAndMeta.empty()), 0) + new AvroHostBuffersWithMeta(partFile, Array(SingleHMBAndMeta.empty()), 0) // Throw FileNotFoundException even if `ignoreCorruptFiles` is true case e: FileNotFoundException if !ignoreMissingFiles => throw e case e @(_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: ${partFile.filePath}", e) - AvroHostBuffersWithMeta(partFile, Array(SingleHMBAndMeta.empty()), 0) + new AvroHostBuffersWithMeta(partFile, Array(SingleHMBAndMeta.empty()), 0) } finally { RmmSpark.poolThreadFinishedForTask(taskContext.taskAttemptId()) TrampolineUtil.unsetTaskContext() @@ -754,7 +755,7 @@ class GpuMultiFileCloudAvroPartitionReader( arrayBufSize: Array[SingleHMBAndMeta], startingBytesRead: Long): HostMemoryBuffersWithMetaDataBase = { val bytesRead = fileSystemBytesRead() - startingBytesRead - AvroHostBuffersWithMeta(partFile, arrayBufSize, bytesRead) + new AvroHostBuffersWithMeta(partFile, arrayBufSize, bytesRead) } private val stopPosition = partFile.start + partFile.length @@ -944,7 +945,7 @@ class GpuMultiFileAvroPartitionReader( // in 'checkIfNeedToSplitDataBlock' val mergedHeader = Header.mergeMetadata(headers.toSeq) assert(mergedHeader.nonEmpty, "No header exists") - AvroBatchContext(chunkedBlocks, clippedSchema, mergedHeader.get) + new AvroBatchContext(chunkedBlocks, clippedSchema, mergedHeader.get) } override def calculateEstimatedBlocksOutputSize(batchContext: BatchContext): Long = { @@ -1044,7 +1045,7 @@ class GpuMultiFileAvroPartitionReader( blocks.map(toBlockInfo(_)) implicit def toBlockBases(blocks: Seq[BlockInfo]): Seq[DataBlockBase] = - blocks.map(AvroDataBlock(_)) + blocks.map(new AvroDataBlock(_)) implicit def toAvroExtraInfo(in: ExtraInfo): AvroExtraInfo = in.asInstanceOf[AvroExtraInfo] @@ -1055,9 +1056,9 @@ class GpuMultiFileAvroPartitionReader( } /** A tool to filter Avro blocks */ -case class AvroFileFilterHandler( +class AvroFileFilterHandler( hadoopConf: Configuration, - @transient options: AvroOptions) extends Logging { + options: AvroOptions) extends RapidsLocalLog with Serializable { @scala.annotation.nowarn( "msg=value ignoreExtension in class AvroOptions is deprecated*" @@ -1071,10 +1072,10 @@ case class AvroFileFilterHandler( // Get blocks only belong to this split reader.sync(partFile.start) val partBlocks = reader.getPartialBlocks(partFile.start + partFile.length) - AvroBlockMeta(reader.header, reader.headerSize, partBlocks) + new AvroBlockMeta(reader.header, reader.headerSize, partBlocks) } } else { - AvroBlockMeta(null, 0L, Seq.empty) + new AvroBlockMeta(null, 0L, Seq.empty) } } } @@ -1085,7 +1086,8 @@ case class AvroFileFilterHandler( * @param header the header of avro file * @param blocks the total block info of avro file */ -case class AvroBlockMeta(header: Header, headerSize: Long, blocks: Seq[BlockInfo]) +class AvroBlockMeta(val header: Header, val headerSize: Long, val blocks: Seq[BlockInfo]) + extends Serializable /** * CopyRange to indicate from where to copy. @@ -1093,32 +1095,32 @@ case class AvroBlockMeta(header: Header, headerSize: Long, blocks: Seq[BlockInfo * @param offset from where to copy * @param length how many bytes to copy */ -private case class CopyRange(offset: Long, length: Long) +private class CopyRange(val offset: Long, val length: Long) /** Extra information */ -case class AvroExtraInfo() extends ExtraInfo +class AvroExtraInfo extends ExtraInfo with Serializable /** avro schema wrapper */ -case class AvroSchemaWrapper(schema: Schema) extends SchemaBase { +class AvroSchemaWrapper(val schema: Schema) extends SchemaBase with Serializable { override def isEmpty: Boolean = schema.getFields.isEmpty } /** avro BlockInfo wrapper */ -case class AvroDataBlock(blockInfo: BlockInfo) extends DataBlockBase { +class AvroDataBlock(val blockInfo: BlockInfo) extends DataBlockBase with Serializable { override def getRowCount: Long = blockInfo.count override def getReadDataSize: Long = blockInfo.dataSize override def getBlockSize: Long = blockInfo.blockSize } -case class AvroSingleDataBlockInfo( - filePath: Path, - dataBlock: AvroDataBlock, - partitionValues: InternalRow, - schema: AvroSchemaWrapper, - readSchema: StructType, - extraInfo: AvroExtraInfo) extends SingleDataBlockInfo +class AvroSingleDataBlockInfo( + val filePath: Path, + val dataBlock: AvroDataBlock, + val partitionValues: InternalRow, + val schema: AvroSchemaWrapper, + val readSchema: StructType, + val extraInfo: AvroExtraInfo) extends SingleDataBlockInfo with Serializable -case class AvroBatchContext( +class AvroBatchContext( override val origChunkedBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]], override val schema: SchemaBase, - mergedHeader: Header) extends BatchContext(origChunkedBlocks, schema) + val mergedHeader: Header) extends BatchContext(origChunkedBlocks, schema) with Serializable diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala index 4c90da533c9..4c8b502ddc4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala @@ -21,7 +21,7 @@ import java.util.{Locale, ServiceConfigurationError, ServiceLoader} import scala.util.{Failure, Success, Try} -import com.nvidia.spark.rapids.GpuParquetFileFormat +import com.nvidia.spark.rapids.{GpuParquetFileFormat, RapidsLocalLog} import com.nvidia.spark.rapids.shims.SparkShimImpl import org.apache.commons.lang3.reflect.ConstructorUtils import org.apache.hadoop.conf.Configuration @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -62,7 +61,7 @@ abstract class GpuDataSourceBase( bucketSpec: Option[BucketSpec] = None, options: Map[String, String] = Map.empty, catalogTable: Option[CatalogTable] = None, - origProvider: Class[_]) extends Logging { + origProvider: Class[_]) extends RapidsLocalLog { protected def originalProvidingInstance() = origProvider.getConstructor().newInstance() @@ -311,7 +310,7 @@ abstract class GpuDataSourceBase( } } -object GpuDataSourceBase extends Logging { +object GpuDataSourceBase extends RapidsLocalLog { /** A map to maintain backward compatibility in case we move data sources around. */ private val backwardCompatibilityMap: Map[String, String] = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala index 62e59a530dc..e98d9f1c072 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import org.apache.orc.OrcConf import org.apache.orc.OrcConf._ import org.apache.orc.mapred.OrcStruct -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.FileFormat @@ -37,7 +36,11 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types._ -object GpuOrcFileFormat extends Logging { +object GpuOrcFileFormat { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg) + // The classname used when Spark is configured to use the Hive implementation for ORC. // Spark is not always compiled with Hive support so we cannot import from Spark jars directly. private val HIVE_IMPL_CLASS = "org.apache.spark.sql.hive.orc.OrcFileFormat" @@ -162,7 +165,7 @@ object GpuOrcFileFormat extends Logging { } } -class GpuOrcFileFormat extends ColumnarFileFormat with Logging { +class GpuOrcFileFormat extends ColumnarFileFormat { /** * Prepares a write job and returns an `ColumnarOutputWriterFactory`. Client side job * preparation can be put here. For example, user defined output committer can be configured here diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala index a3cc1e9dab6..70da695ef8b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025, NVIDIA CORPORATION. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,7 @@ class GpuReadAvroFileFormat extends AvroFileFormat with GpuReadFileFormatWithMet val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val factory = GpuAvroPartitionReaderFactory( + val factory = new GpuAvroPartitionReaderFactory( sqlConf, new RapidsConf(sqlConf), broadcastedHadoopConf, @@ -73,7 +73,7 @@ class GpuReadAvroFileFormat extends AvroFileFormat with GpuReadFileFormatWithMet pushedFilters: Array[Filter], fileScan: GpuFileSourceScanExec): PartitionReaderFactory = { val poolConfBuilder = ThreadPoolConfBuilder(fileScan.rapidsConf) - GpuAvroMultiFilePartitionReaderFactory( + new GpuAvroMultiFilePartitionReaderFactory( fileScan.relation.sparkSession.sessionState.conf, fileScan.rapidsConf, broadcastedConf, diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala index 538df0c9d61..3969aa9024a 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.rapids.execution import java.util.concurrent.{ExecutorService, ScheduledExecutorService, ThreadPoolExecutor} +import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.json4s.JsonAST @@ -61,6 +62,8 @@ object TrampolineUtil { def jsonValue(dataType: DataType): JsonAST.JValue = dataType.jsonValue + def createSchemaParser(): Schema.Parser = TrampolineConnectShims.createSchemaParser() + /** Get a human-readable string, e.g.: "4.0 MiB", for a value in bytes. */ def bytesToString(size: Long): String = Utils.bytesToString(size) From df09b9ce13e7c44673b9b0fa343c2b9672b39f61 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 10 Jun 2026 16:56:47 -0700 Subject: [PATCH 3/3] Fix Delta coalescing reader helper construction --- .../delta/common/GpuDeltaParquetFileFormatBase2.scala | 8 ++++---- .../rapids/delta/GpuDeltaParquetFileFormatNativeDV.scala | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/delta-lake/common/src/main/delta-33x-41x/scala/com/nvidia/spark/rapids/delta/common/GpuDeltaParquetFileFormatBase2.scala b/delta-lake/common/src/main/delta-33x-41x/scala/com/nvidia/spark/rapids/delta/common/GpuDeltaParquetFileFormatBase2.scala index 48587ce3bca..5d922a596a9 100644 --- a/delta-lake/common/src/main/delta-33x-41x/scala/com/nvidia/spark/rapids/delta/common/GpuDeltaParquetFileFormatBase2.scala +++ b/delta-lake/common/src/main/delta-33x-41x/scala/com/nvidia/spark/rapids/delta/common/GpuDeltaParquetFileFormatBase2.scala @@ -502,7 +502,7 @@ class GpuDeltaParquetFileFormatBase2( queryUsesInputFile: Boolean) extends AbstractGpuParquetMultiFilePartitionReaderFactory(sqlConf, broadcastedConf, dataSchema, readDataSchema, partitionSchema, filters, rapidsConf, poolConfBuilder, - metrics, queryUsesInputFile) with Logging { + metrics, queryUsesInputFile) { logDebug("Using GpuDeltaParquetMultiFilePartitionReaderFactory for multi-threaded Parquet " + "reading with deletion vectors") @@ -584,11 +584,11 @@ class GpuDeltaParquetFileFormatBase2( val (rowGroupOffsets, rowGroupNumRows) = RapidsDeletionVectors.getRowGroupMetadata(singleFileInfo.blocks) clippedBlocks ++= singleFileInfo.blocks.zipWithIndex.map { case (block, i) => - ParquetSingleDataBlockMeta( + new ParquetSingleDataBlockMeta( singleFileInfo.filePath, - ParquetDataBlock(block, compressCfg), + new ParquetDataBlock(block, compressCfg), metaAndFile.file.partitionValues, - ParquetSchemaWrapper(singleFileInfo.schema), + new ParquetSchemaWrapper(singleFileInfo.schema), singleFileInfo.readSchema, new DeltaParquetExtraInfo( singleFileInfo.dateRebaseMode, diff --git a/delta-lake/delta-spark400db173/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatNativeDV.scala b/delta-lake/delta-spark400db173/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatNativeDV.scala index 1e39cbc79ef..8747612a6a4 100644 --- a/delta-lake/delta-spark400db173/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatNativeDV.scala +++ b/delta-lake/delta-spark400db173/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatNativeDV.scala @@ -546,7 +546,7 @@ case class GpuDeltaParquetFileFormatNativeDV( tablePathOpt: Option[String]) extends AbstractGpuParquetMultiFilePartitionReaderFactory(sqlConf, broadcastedConf, dataSchema, readDataSchema, partitionSchema, filters, rapidsConf, poolConfBuilder, - metrics, queryUsesInputFile) with Logging { + metrics, queryUsesInputFile) { logDebug("Using GpuDeltaParquetMultiFilePartitionReaderFactory for multi-threaded Parquet " + "reading with deletion vectors") @@ -636,11 +636,11 @@ case class GpuDeltaParquetFileFormatNativeDV( val (rowGroupOffsets, rowGroupNumRows) = RapidsDeletionVectors.getRowGroupMetadata(singleFileInfo.blocks) clippedBlocks ++= singleFileInfo.blocks.zipWithIndex.map { case (block, i) => - ParquetSingleDataBlockMeta( + new ParquetSingleDataBlockMeta( singleFileInfo.filePath, - ParquetDataBlock(block, compressCfg), + new ParquetDataBlock(block, compressCfg), metaAndFile.file.partitionValues, - ParquetSchemaWrapper(singleFileInfo.schema), + new ParquetSchemaWrapper(singleFileInfo.schema), singleFileInfo.readSchema, new DeltaParquetExtraInfo( singleFileInfo.dateRebaseMode,