Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ object GpuCheckDeltaInvariant extends Logging {
ExprChecks.projectOnly(
TypeSig.all,
TypeSig.all,
paramCheck = Seq(ParamCheck("input", TypeSig.all, TypeSig.all))),
paramCheck = Seq(new ParamCheck("input", TypeSig.all, TypeSig.all))),
(c, conf, p, r) => new GpuCheckDeltaInvariantMeta(c, conf, p, r))

def maybeConvertToGpu(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ trait DatabricksDeltaProviderBase extends DeltaProviderImplBase {
"Delta RTAS was tagged as unsupported and should not be converted to GPU")
}

protected case class DeltaWriteV1Config(
deltaLog: DeltaLog,
forceOverwrite: Boolean,
options: mutable.HashMap[String, String])
/**
 * Holder for the three values used when handling a Delta V1 write: a [[DeltaLog]], the
 * force-overwrite flag and the write options.
 *
 * This is a plain class, so there is no generated `apply`, `copy` or structural equality;
 * instances are created with `new` and the fields are read through their `val` accessors.
 *
 * NOTE(review): `options` is a mutable map and looks like it is the instance read from the
 * write's outer object rather than a defensive copy - confirm before mutating it.
 */
protected class DeltaWriteV1Config(
val deltaLog: DeltaLog,
val forceOverwrite: Boolean,
val options: mutable.HashMap[String, String])

private def extractWriteV1Config(
meta: RapidsMeta[_, _, _],
Expand Down Expand Up @@ -210,7 +210,7 @@ trait DatabricksDeltaProviderBase extends DeltaProviderImplBase {
f.get(outerObj).asInstanceOf[mutable.HashMap[String, String]]
}
if (forceOverwrite.isDefined && options.isDefined) {
Some(DeltaWriteV1Config(deltaLog, forceOverwrite.get, options.get))
Some(new DeltaWriteV1Config(deltaLog, forceOverwrite.get, options.get))
} else {
meta.willNotWorkOnGpu(s"write class has unsupported outer class $outerClass")
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

// Expression support shared across versions - defined outside class to avoid serialization issues
case class GpuIncrementMetricMeta(
cpuInc: IncrementMetric,
override val conf: RapidsConf,
p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends ExprMeta[IncrementMetric](cpuInc, conf, p, r) {
class GpuIncrementMetricMeta(
val cpuInc: IncrementMetric,
override val conf: RapidsConf,
val p: Option[RapidsMeta[_, _, _]],
val r: DataFromReplacementRule)
extends ExprMeta[IncrementMetric](cpuInc, conf, p, r) with Serializable {
override def convertToGpuImpl(): GpuExpression = {
val gpuChild = childExprs.head.convertToGpu()
GpuIncrementMetric(cpuInc, gpuChild)
Expand Down Expand Up @@ -88,7 +89,7 @@ abstract class DeltaProviderBase extends DeltaIOProvider {
GpuOverrides.expr[IncrementMetric](
"IncrementMetric",
ExprChecks.unaryProject(TypeSig.all, TypeSig.all, TypeSig.all, TypeSig.all),
(cpuInc, conf, p, r) => GpuIncrementMetricMeta(cpuInc, conf, p, r)
(cpuInc, conf, p, r) => new GpuIncrementMetricMeta(cpuInc, conf, p, r)
)
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
Expand All @@ -52,7 +51,7 @@ class GpuDeltaParquetFileFormatBase(
optimizationsEnabled: Boolean = true,
tablePath: Option[String] = None,
isCDCRead: Boolean = false
) extends com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormat with Logging {
) extends com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormat {

// Validate that we have either all of the arguments for a DV-enabled read or none of them.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.apache.parquet.schema.MessageType

import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.QuotingUtils
Expand Down Expand Up @@ -71,7 +70,7 @@ class GpuDeltaParquetFileFormatBase2(
optimizationsEnabled: Boolean = true,
tablePath: Option[String] = None,
isCDCRead: Boolean = false
) extends com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormat with Logging {
) extends com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormat {

// Validate that we have either all of the arguments for a DV-enabled read or none of them.

Expand Down Expand Up @@ -443,19 +442,19 @@ class GpuDeltaParquetFileFormatBase2(
* @param rowGroupNumRows number of rows in each row group
* @param partitionIndex index into rowsPerPartition / allPartValues this file contributes to
*/
case class PerFileDVEntry(
dvDescriptor: Option[String],
rowGroupOffsets: Array[Long],
rowGroupNumRows: Array[Int],
partitionIndex: Int)
class PerFileDVEntry(
// Plain class (not a case class): build with `new`; instances compare by reference.
// NOTE(review): presumably None when the file has no deletion vector - confirm.
val dvDescriptor: Option[String],
// NOTE(review): looks parallel to rowGroupNumRows, one element per row group - confirm.
val rowGroupOffsets: Array[Long],
val rowGroupNumRows: Array[Int],
val partitionIndex: Int)

/**
* Per-file deletion vector (DV) load result produced during [[prepareForDecode]].
*
* @param gpuBitmap spillable host buffer holding the serialized roaring bitmap of the file's
*                  deletion vector
* @param aliveCount number of alive (non-deleted) rows in the file
*/
case class SerializedRoaringBitmap(gpuBitmap: SpillableHostBuffer, aliveCount: Long)
class SerializedRoaringBitmap(val gpuBitmap: SpillableHostBuffer, val aliveCount: Long)

/**
* Per-batch DV info that replaces [[ParquetExtraInfo]] in [[CurrentChunkMeta]] after batch
Expand All @@ -464,13 +463,13 @@ class GpuDeltaParquetFileFormatBase2(
* - [[loadedDVResults]] is filled in by [[prepareForDecode]] after the copy phase.
* [[perFileEntries]] and [[loadedDVResults]] are always parallel sequences of the same length.
*/
case class DeltaBatchExtraInfo(
class DeltaBatchExtraInfo(
override val dateRebaseMode: DateTimeRebaseMode,
override val timestampRebaseMode: DateTimeRebaseMode,
override val hasInt96Timestamps: Boolean,
val perFileEntries: Seq[PerFileDVEntry],
// Filled by prepareForDecode() after the copy phase; empty until then.
val loadedDVResults: Seq[SerializedRoaringBitmap] = Seq.empty
val loadedDVResults: Seq[SerializedRoaringBitmap]
) extends ParquetExtraInfo(dateRebaseMode, timestampRebaseMode, hasInt96Timestamps) {
/**
* True if at least one file in this batch carries a deletion vector descriptor.
Expand All @@ -481,7 +480,8 @@ class GpuDeltaParquetFileFormatBase2(
* Returns a copy of this instance with [[loadedDVResults]] set.
*/
def withLoadedDVResults(loadedDVResults: Seq[SerializedRoaringBitmap]): DeltaBatchExtraInfo =
this.copy(loadedDVResults = loadedDVResults)
new DeltaBatchExtraInfo(dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
perFileEntries, loadedDVResults)

/**
* Closes the DV bitmaps in [[loadedDVResults]].
Expand Down Expand Up @@ -745,55 +745,53 @@ class GpuDeltaParquetFileFormatBase2(
/**
* Deletion vector metadata for a single host memory buffer containing a part of data.
*/
private case class SingleBufferDVMetadata(
maybeDvInfo: Option[SpillableDeletionVectorInfo]
)

private case class DeletionVectorMetadata(
metadatas: Array[SingleBufferDVMetadata]
)

private object DeletionVectorMetadata {
def forSingleBuffer(maybeDvInfo: Option[SpillableDeletionVectorInfo]) = {
DeletionVectorMetadata(
Array(
SingleBufferDVMetadata(maybeDvInfo)
)
private class SingleBufferDVMetadata(
val maybeDvInfo: Option[SpillableDeletionVectorInfo])

private class DeletionVectorMetadata(
val metadatas: Array[SingleBufferDVMetadata])

private def deletionVectorMetadataForSingleBuffer(
maybeDvInfo: Option[SpillableDeletionVectorInfo]): DeletionVectorMetadata = {
new DeletionVectorMetadata(
Array(
new SingleBufferDVMetadata(maybeDvInfo)
)
}
)
}

def combine(metadatas: Array[DeletionVectorMetadata]): DeletionVectorMetadata = {
DeletionVectorMetadata(metadatas.flatMap(_.metadatas))
}
/**
 * Merges several [[DeletionVectorMetadata]] values into a single one by concatenating their
 * per-buffer entries, in the order the inputs are given.
 *
 * @param metadatas the metadata values to merge; may be empty
 * @return a new [[DeletionVectorMetadata]] holding every per-buffer entry of every input
 */
private def combineDeletionVectorMetadata(
    metadatas: Array[DeletionVectorMetadata]): DeletionVectorMetadata = {
  val flattened = metadatas.flatMap(part => part.metadatas)
  new DeletionVectorMetadata(flattened)
}

private case class DeltaParquetHostMemoryEmptyMetaData(
private class DeltaParquetHostMemoryEmptyMetaData(
override val partitionedFile: PartitionedFile,
bufferSize: Long,
val bufferSize: Long,
override val bytesRead: Long,
dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode,
hasInt96Timestamps: Boolean,
clippedSchema: MessageType,
readSchema: StructType,
numRows: Long,
dvMetadata: Array[DeletionVectorMetadata],
override val allPartValues: Option[Array[(Long, InternalRow)]] = None)
val dateRebaseMode: DateTimeRebaseMode,
val timestampRebaseMode: DateTimeRebaseMode,
val hasInt96Timestamps: Boolean,
val clippedSchema: MessageType,
val readSchema: StructType,
val numRows: Long,
val dvMetadata: Array[DeletionVectorMetadata],
override val allPartValues: Option[Array[(Long, InternalRow)]])
extends HostMemoryEmptyMetaData {}

private case class DeltaParquetHostMemoryBuffersWithMetaData(
private class DeltaParquetHostMemoryBuffersWithMetaData(
override val partitionedFile: PartitionedFile,
override val memBuffersAndSizes: Array[SingleHMBAndMeta],
override val bytesRead: Long,
dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode,
hasInt96Timestamps: Boolean,
clippedSchema: MessageType,
readSchema: StructType,
val dateRebaseMode: DateTimeRebaseMode,
val timestampRebaseMode: DateTimeRebaseMode,
val hasInt96Timestamps: Boolean,
val clippedSchema: MessageType,
val readSchema: StructType,
override val allPartValues: Option[Array[(Long, InternalRow)]],
// deletion vector metadata. should be aligned with memBuffersAndSizes if deletion vectors
// are present.
dvMetadata: Array[DeletionVectorMetadata]
val dvMetadata: Array[DeletionVectorMetadata]
) extends HostMemoryBuffersWithMetaData {

override def consumeHeadBuffer(): HostMemoryBuffersWithMetaData = {
Expand All @@ -806,7 +804,17 @@ class GpuDeltaParquetFileFormatBase2(
} else {
(Array.empty[SingleHMBAndMeta], Array.empty[DeletionVectorMetadata])
}
this.copy(memBuffersAndSizes = remainingBuffers, dvMetadata = newDvMetadata)
new DeltaParquetHostMemoryBuffersWithMetaData(
partitionedFile,
remainingBuffers,
bytesRead,
dateRebaseMode,
timestampRebaseMode,
hasInt96Timestamps,
clippedSchema,
readSchema,
allPartValues,
newDvMetadata)
}
}

Expand Down Expand Up @@ -843,7 +851,7 @@ class GpuDeltaParquetFileFormatBase2(
}

closeOnExcept(maybeSerializedDV) { _ =>
val dvMetadata = DeletionVectorMetadata.forSingleBuffer(
val dvMetadata = deletionVectorMetadataForSingleBuffer(
maybeSerializedDV.map{ serializedDV =>
val (rowGroupOffsets, rowGroupNumRows) = RapidsDeletionVectors
.getRowGroupMetadata(blocks)
Expand All @@ -853,7 +861,7 @@ class GpuDeltaParquetFileFormatBase2(
rowGroupOffsets,
rowGroupNumRows)}
)
DeltaParquetHostMemoryEmptyMetaData(
new DeltaParquetHostMemoryEmptyMetaData(
partitionedFile,
bufferSize,
bytesRead,
Expand All @@ -863,7 +871,8 @@ class GpuDeltaParquetFileFormatBase2(
clippedSchema,
readSchema,
numRows,
Array(dvMetadata)
Array(dvMetadata),
None
)
}
}
Expand All @@ -872,9 +881,9 @@ class GpuDeltaParquetFileFormatBase2(
nonEmptyMeta: CombinedMeta): HostMemoryEmptyMetaData = {
val metaForEmpty = emptyMeta.metaForEmpty
val toCombine = emptyMeta.emptyMetas.map(_.asInstanceOf[DeltaParquetHostMemoryEmptyMetaData])
val combinedDVMeta = DeletionVectorMetadata.combine(toCombine.flatMap(_.dvMetadata))
val combinedDVMeta = combineDeletionVectorMetadata(toCombine.flatMap(_.dvMetadata))

DeltaParquetHostMemoryEmptyMetaData(
new DeltaParquetHostMemoryEmptyMetaData(
metaForEmpty.partitionedFile, // just pick one since not used
emptyMeta.emptyBufferSize,
emptyMeta.emptyTotalBytesRead,
Expand Down Expand Up @@ -912,7 +921,7 @@ class GpuDeltaParquetFileFormatBase2(
.map(_.asInstanceOf[ParquetDataBlock].dataBlock)
val (rowGroupOffsets, rowGroupNumRows) = RapidsDeletionVectors
.getRowGroupMetadata(dataBlocks)
DeletionVectorMetadata.forSingleBuffer(
deletionVectorMetadataForSingleBuffer(
maybeSerializedDV.map { serializedDV =>
serializedDV.incRefCount()
SpillableDeletionVectorInfo(
Expand All @@ -923,7 +932,7 @@ class GpuDeltaParquetFileFormatBase2(
})
}

DeltaParquetHostMemoryBuffersWithMetaData(
new DeltaParquetHostMemoryBuffersWithMetaData(
partitionedFile,
memBuffersAndSize,
bytesRead,
Expand All @@ -946,9 +955,9 @@ class GpuDeltaParquetFileFormatBase2(
val metaToUse = combinedMeta.firstNonEmpty
val toCombine = combinedMeta.toCombine
.collect { case hmb: DeltaParquetHostMemoryBuffersWithMetaData => hmb }
val combinedDVMeta = DeletionVectorMetadata.combine(toCombine.flatMap(_.dvMetadata))
val combinedDVMeta = combineDeletionVectorMetadata(toCombine.flatMap(_.dvMetadata))

DeltaParquetHostMemoryBuffersWithMetaData(
new DeltaParquetHostMemoryBuffersWithMetaData(
metaToUse.partitionedFile,
Array(newHmbBufferInfo),
offset,
Expand Down Expand Up @@ -1113,12 +1122,12 @@ class GpuDeltaParquetFileFormatBase2(
fileNumRows += extra.rowGroupNumRows
}

PerFileDVEntry(fileDesc, fileOffsets.toArray, fileNumRows.toArray, partitionIndex)
new PerFileDVEntry(fileDesc, fileOffsets.toArray, fileNumRows.toArray, partitionIndex)
}.toSeq

val batchExtra = new DeltaBatchExtraInfo(
meta.extraInfo.dateRebaseMode, meta.extraInfo.timestampRebaseMode,
meta.extraInfo.hasInt96Timestamps, fileEntries)
meta.extraInfo.hasInt96Timestamps, fileEntries, Seq.empty)
meta.copy(extraInfo = batchExtra)
}

Expand Down Expand Up @@ -1160,7 +1169,7 @@ class GpuDeltaParquetFileFormatBase2(
require(numDeleted <= totalRows,
s"Deletion vector cardinality ($numDeleted) exceeds " +
s"file row count ($totalRows)")
SerializedRoaringBitmap(gpuBitmap, totalRows - numDeleted)
new SerializedRoaringBitmap(gpuBitmap, totalRows - numDeleted)
}
}
})
Expand Down Expand Up @@ -1238,8 +1247,8 @@ class GpuDeltaParquetFileFormatBase2(
* A simple wrapper to adapt the DeletionVector.ParquetChunkedReader to the ChunkedReader interface
* expected by AbstractParquetTableReader.
*/
case class DeltaParquetChunkedReader(delegate: DeletionVector.ParquetChunkedReader)
extends ChunkedReader {
class DeltaParquetChunkedReader(val delegate: DeletionVector.ParquetChunkedReader)
extends ChunkedReader with Serializable {
override def hasNext: Boolean = delegate.hasNext
override def next: Table = delegate.readChunk()
override def close(): Unit = delegate.close()
Expand All @@ -1248,7 +1257,7 @@ case class DeltaParquetChunkedReader(delegate: DeletionVector.ParquetChunkedRead
/**
* A chunked reader for Parquet files with deletion vectors.
*/
case class DeltaParquetTableReader(
class DeltaParquetTableReader(
conf: Configuration,
chunkSizeByteLimit: Long,
maxChunkedReaderMemoryUsageSizeBytes: Long,
Expand All @@ -1268,11 +1277,11 @@ case class DeltaParquetTableReader(
conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, opts, buffers, metrics,
dateRebaseMode, timestampRebaseMode, isSchemaCaseSensitive, useFieldId, readDataSchema,
clippedParquetSchema, splits, debugDumpPrefix, debugDumpAlways
) {
) with Serializable {

logDebug("Using DeltaParquetTableReader for reading Parquet with deletion vectors")

override protected val reader = DeltaParquetChunkedReader(
override protected val reader = new DeltaParquetChunkedReader(
DeletionVector.newParquetChunkedReader(chunkSizeByteLimit,
maxChunkedReaderMemoryUsageSizeBytes, opts, buffers, dvInfos)
)
Expand All @@ -1287,7 +1296,7 @@ case class DeltaParquetTableReader(
}
}

object MakeParquetTableWithDVProducer extends Logging {
object MakeParquetTableWithDVProducer extends RapidsLocalLog {
def apply(
useChunkedReader: Boolean,
maxChunkedReaderMemoryUsageSizeBytes: Long,
Expand Down Expand Up @@ -1318,7 +1327,7 @@ object MakeParquetTableWithDVProducer extends Logging {
}
}
if (useChunkedReader) {
DeltaParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes,
new DeltaParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes,
opts, buffers, metrics, dateRebaseMode, timestampRebaseMode,
isSchemaCaseSensitive, useFieldId, readDataSchema, clippedParquetSchema,
splits, debugDumpPrefix, debugDumpAlways, deletionVectorInfos)
Expand Down
Loading
Loading