Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ class GpuDeltaParquetFileFormatBase2(
queryUsesInputFile: Boolean)
extends AbstractGpuParquetMultiFilePartitionReaderFactory(sqlConf, broadcastedConf,
dataSchema, readDataSchema, partitionSchema, filters, rapidsConf, poolConfBuilder,
metrics, queryUsesInputFile) with Logging {
metrics, queryUsesInputFile) {

logDebug("Using GpuDeltaParquetMultiFilePartitionReaderFactory for multi-threaded Parquet " +
"reading with deletion vectors")
Expand Down Expand Up @@ -584,11 +584,11 @@ class GpuDeltaParquetFileFormatBase2(
val (rowGroupOffsets, rowGroupNumRows) =
RapidsDeletionVectors.getRowGroupMetadata(singleFileInfo.blocks)
clippedBlocks ++= singleFileInfo.blocks.zipWithIndex.map { case (block, i) =>
ParquetSingleDataBlockMeta(
new ParquetSingleDataBlockMeta(
singleFileInfo.filePath,
ParquetDataBlock(block, compressCfg),
new ParquetDataBlock(block, compressCfg),
metaAndFile.file.partitionValues,
ParquetSchemaWrapper(singleFileInfo.schema),
new ParquetSchemaWrapper(singleFileInfo.schema),
singleFileInfo.readSchema,
new DeltaParquetExtraInfo(
singleFileInfo.dateRebaseMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ case class GpuDeltaParquetFileFormatNativeDV(
tablePathOpt: Option[String])
extends AbstractGpuParquetMultiFilePartitionReaderFactory(sqlConf, broadcastedConf,
dataSchema, readDataSchema, partitionSchema, filters, rapidsConf, poolConfBuilder,
metrics, queryUsesInputFile) with Logging {
metrics, queryUsesInputFile) {

logDebug("Using GpuDeltaParquetMultiFilePartitionReaderFactory for multi-threaded Parquet " +
"reading with deletion vectors")
Expand Down Expand Up @@ -636,11 +636,11 @@ case class GpuDeltaParquetFileFormatNativeDV(
val (rowGroupOffsets, rowGroupNumRows) =
RapidsDeletionVectors.getRowGroupMetadata(singleFileInfo.blocks)
clippedBlocks ++= singleFileInfo.blocks.zipWithIndex.map { case (block, i) =>
ParquetSingleDataBlockMeta(
new ParquetSingleDataBlockMeta(
singleFileInfo.filePath,
ParquetDataBlock(block, compressCfg),
new ParquetDataBlock(block, compressCfg),
metaAndFile.file.partitionValues,
ParquetSchemaWrapper(singleFileInfo.schema),
new ParquetSchemaWrapper(singleFileInfo.schema),
singleFileInfo.readSchema,
new DeltaParquetExtraInfo(
singleFileInfo.dateRebaseMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ class GpuCoalescingIcebergParquetReader(
conf.metrics)

info.blocks.map { block =>
ParquetSingleDataBlockMeta(
new ParquetSingleDataBlockMeta(
info.filePath,
ParquetDataBlock(block, CpuCompressionConfig.disabled()),
new ParquetDataBlock(block, CpuCompressionConfig.disabled()),
InternalRow.empty,
ParquetSchemaWrapper(info.schema),
new ParquetSchemaWrapper(info.schema),
info.readSchema,
IcebergParquetExtraInfo(
info.dateRebaseMode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FileUtils {
private FileUtils() {}

public static final class TempFile {
private final FSDataOutputStream outputStream;
private final Path path;

TempFile(FSDataOutputStream outputStream, Path path) {
this.outputStream = outputStream;
this.path = path;
}

public FSDataOutputStream getOutputStream() {
return outputStream;
}

public Path getPath() {
return path;
}
}

public static TempFile createTempFile(
Configuration conf, String pathPrefix, String pathSuffix) throws IOException {
FileSystem fs = new Path(pathPrefix).getFileSystem(conf);
Random rnd = new Random();
String suffix = pathSuffix != null ? pathSuffix : "";
while (true) {
Path path = new Path(pathPrefix + rnd.nextInt(Integer.MAX_VALUE) + suffix);
if (!fs.exists(path)) {
try {
return new TempFile(fs.create(path, false), path);
} catch (FileAlreadyExistsException e) {
// Retry if another writer won the race between exists and create.
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.HostMemoryBuffer;
import com.nvidia.spark.rapids.jni.kudo.KudoTable;
import com.nvidia.spark.rapids.jni.kudo.KudoTableHeader;

public class SpillableKudoTable implements AutoCloseable {
public final KudoTableHeader header;
public final long length;
private final SpillableHostBuffer shb;

public SpillableKudoTable(KudoTableHeader header, long length, SpillableHostBuffer shb) {
this.header = header;
this.length = length;
this.shb = shb;
}

public static SpillableKudoTable from(KudoTableHeader header, HostMemoryBuffer buffer) {
if (buffer == null) {
return new SpillableKudoTable(header, 0, null);
} else {
return new SpillableKudoTable(
header,
buffer.getLength(),
SpillableHostBuffer.apply(
buffer,
buffer.getLength(),
SpillPriorities.ACTIVE_BATCHING_PRIORITY));
}
}

public KudoTable makeKudoTable() {
if (shb == null) {
return new KudoTable(header, null);
} else {
return new KudoTable(header, shb.getHostBuffer());
}
}

@Override
public String toString() {
return "SpillableKudoTable{header=" + header + ", shb=" + shb + '}';
}

@Override
public void close() {
if (shb != null) {
shb.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.commons.io.output.CountingOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.rapids.shims.TrampolineConnectShims
import org.apache.spark.sql.rapids.execution.TrampolineUtil

private[rapids] class AvroSeekableInputStream(in: SeekableInput) extends InputStream
with SeekableInput {
Expand Down Expand Up @@ -82,7 +82,7 @@ case class Header(
@transient
lazy val schema: Schema = {
getMetaString(SCHEMA)
.map(s => TrampolineConnectShims.createSchemaParser().parse(s))
.map(s => TrampolineUtil.createSchemaParser().parse(s))
.orNull
}

Expand Down Expand Up @@ -127,26 +127,6 @@ object Header {
}
}

/**
* The each Avro block information
*
* @param blockStart the start of block
* @param blockSize the whole block size = the size between two sync buffers + sync buffer
* @param dataSize the block data size
* @param count how many entries in this block
*/
case class BlockInfo(blockStart: Long, blockSize: Long, dataSize: Long, count: Long)

/**
* The mutable version of the BlockInfo without block start.
* This is for reusing an existing instance when accessing data in the iterator pattern.
*
* @param blockSize the whole block size (the size between two sync buffers + sync buffer size)
* @param dataSize the data size in this block
* @param count how many entries in this block
*/
case class MutableBlockInfo(var blockSize: Long, var dataSize: Long, var count: Long)

/** The parent of the Rapids Avro file readers */
abstract class AvroFileReader(si: SeekableInput) extends AutoCloseable {
// Children should update this pointer accordingly.
Expand Down Expand Up @@ -328,7 +308,7 @@ class AvroMetaFileReader(si: SeekableInput) extends AvroFileReader(si) {
val dataSizeLongLen = BinaryData.encodeLong(blockDataSize, buf, 0)
// (len of entries) + (len of block size) + (block size) + (sync size)
val blockLength = countLongLen + dataSizeLongLen + blockDataSize + SYNC_SIZE
blocks += BlockInfo(curBlockStart, blockLength, blockDataSize, blockCount)
blocks += new BlockInfo(curBlockStart, blockLength, blockDataSize, blockCount)

// Do we need to check the SYNC BUFFER, or just let cudf do it?
curBlockStart += blockLength
Expand Down Expand Up @@ -405,11 +385,11 @@ class AvroDataFileReader(si: SeekableInput) extends AvroFileReader(si) {
throw new NoSuchElementException
}
if (reuse == null) {
MutableBlockInfo(curBlockSize, curDataSize, curCount)
new MutableBlockInfo(curBlockSize, curDataSize, curCount)
} else {
reuse.blockSize = curBlockSize
reuse.dataSize = curDataSize
reuse.count = curCount
reuse.setBlockSize(curBlockSize)
reuse.setDataSize(curDataSize)
reuse.setCount(curCount)
reuse
}
}
Expand Down
31 changes: 23 additions & 8 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
* Copyright (c) 2021-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,10 +28,15 @@ import com.nvidia.spark.rapids.jni.kudo.KudoSerializer
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.vectorized.ColumnarBatch

object DumpUtils extends Logging {
object DumpUtils {
private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

private def logWarning(msg: => String): Unit = {
log.warn(msg)
}

/**
* Debug utility to dump a host memory buffer to a file.
*
Expand All @@ -51,15 +56,16 @@ object DumpUtils extends Logging {
prefix: String,
suffix: String): String = {
try {
val (out, path) = FileUtils.createTempFile(conf, prefix, suffix)
val tempFile = FileUtils.createTempFile(conf, prefix, suffix)
val out = tempFile.getOutputStream
withResource(out) { _ =>
withResource(data.slice(offset, len)) { hmb =>
withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in =>
IOUtils.copy(in, out)
}
}
}
path.toString
tempFile.getPath.toString
} catch {
case e: Exception =>
log.error(s"Error attempting to dump data", e)
Expand All @@ -73,15 +79,16 @@ object DumpUtils extends Logging {
prefix: String,
suffix: String): String = {
try {
val (out, path) = FileUtils.createTempFile(conf, prefix, suffix)
val tempFile = FileUtils.createTempFile(conf, prefix, suffix)
val out = tempFile.getOutputStream
withResource(out) { _ =>
data.foreach { hmb =>
withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in =>
IOUtils.copy(in, out)
}
}
}
path.toString
tempFile.getPath.toString
} catch {
case e: Exception =>
log.error(s"Error attempting to dump data", e)
Expand Down Expand Up @@ -324,7 +331,15 @@ private class ColumnIndex() {
}
}

object ParquetDumper extends Logging {
object ParquetDumper {
private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

private def logDebug(msg: => String): Unit = {
if (log.isDebugEnabled) {
log.debug(msg)
}
}

val COMPRESS_TYPE = CompressionType.SNAPPY

def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
Expand Down
46 changes: 0 additions & 46 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/FileUtils.scala

This file was deleted.

Loading
Loading