address comments and remove more 3.2 related code
huaxingao committed Jun 17, 2024
1 parent 99f2d5c commit 0e936a8
Showing 16 changed files with 27 additions and 270 deletions.
@@ -20,10 +20,9 @@
package org.apache.comet.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
import org.apache.spark.sql.internal.SQLConf

object CometParquetUtils extends ShimCometParquetUtils {
object CometParquetUtils {
private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
@@ -27,10 +27,9 @@ import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types._

import org.apache.comet.parquet.CometParquetUtils

/**
* This class is copied & slightly modified from [[ParquetReadSupport]] in Spark. Changes:
* - This doesn't extend from Parquet's `ReadSupport` class since that is used for row-based
@@ -53,7 +52,7 @@ object CometParquetReadSupport {
ignoreMissingIds: Boolean): MessageType = {
if (!ignoreMissingIds &&
!containsFieldIds(parquetSchema) &&
CometParquetUtils.hasFieldIds(catalystSchema)) {
ParquetUtils.hasFieldIds(catalystSchema)) {
throw new RuntimeException(
"Spark read schema expects field Ids, " +
"but Parquet file schema doesn't contain any field Ids.\n" +
@@ -334,14 +333,14 @@
}

def matchIdField(f: StructField): Type = {
val fieldId = CometParquetUtils.getFieldId(f)
val fieldId = ParquetUtils.getFieldId(f)
idToParquetFieldMap
.get(fieldId)
.map { parquetTypes =>
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw CometParquetUtils.foundDuplicateFieldInFieldIdLookupModeError(
throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
fieldId,
parquetTypesString)
} else {
@@ -355,9 +354,9 @@
}
}

val shouldMatchById = useFieldId && CometParquetUtils.hasFieldIds(structType)
val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
structType.map { f =>
if (shouldMatchById && CometParquetUtils.hasFieldId(f)) {
if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
matchIdField(f)
} else if (caseSensitive) {
matchCaseSensitiveField(f)
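For context on the field-id matching above: Spark attaches a Parquet field id to a column through `StructField` metadata, and `ParquetUtils.hasFieldIds` / `getFieldId` look it up under the `parquet.field.id` key. A minimal sketch (the column name and id value are illustrative):

```scala
import org.apache.spark.sql.types._

// Field-id matching keys off StructField metadata: Spark's ParquetUtils.hasFieldIds /
// getFieldId read the "parquet.field.id" entry, so a schema meant to be matched by id
// carries that key on each field.
val idMetadata = new MetadataBuilder().putLong("parquet.field.id", 1L).build()
val schemaWithIds = StructType(Seq(
  StructField("c1", IntegerType, nullable = true, metadata = idMetadata)))
```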
@@ -26,6 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -66,8 +67,8 @@ class CometSparkToParquetSchemaConverter(
*/
def convertField(field: StructField): Type = {
val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
if (useFieldId && CometParquetUtils.hasFieldId(field)) {
converted.withId(CometParquetUtils.getFieldId(field))
if (useFieldId && ParquetUtils.hasFieldId(field)) {
converted.withId(ParquetUtils.getFieldId(field))
} else {
converted
}

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion docs/source/contributor-guide/adding_a_new_expression.md
@@ -173,7 +173,7 @@ If the expression you're adding has different behavior across different Spark ve

## Shimming to Support Different Spark Versions

By adding shims for each Spark version, you can provide a consistent interface for the expression across different Spark versions. For example, `unhex` added a new optional parameter in Spark 3.4 to control whether it should `failOnError`. So for versions 3.2 and 3.3, the shim is:
By adding shims for each Spark version, you can provide a consistent interface for the expression across different Spark versions. For example, `unhex` added a new optional parameter in Spark 3.4 to control whether it should `failOnError`. So for version 3.3, the shim is:

```scala
trait CometExprShim {
  // ...
}
```
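As a rough sketch of the pattern (not the exact Comet source; the `unhexSerde` helper name and return shape are assumptions), the per-version shim hides the constructor difference so the serde code sees one interface:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Unhex}

// Spark 3.3 flavor of the shim: Unhex has no failOnError parameter yet,
// so the shim supplies a constant `false` to keep the caller uniform.
trait CometExprShim {
  protected def unhexSerde(unhex: Unhex): (Expression, Expression) =
    (unhex.child, Literal(false))
}

// Spark 3.4 flavor (same trait name, compiled from its own source tree):
// forward the expression's own failOnError flag instead.
//   protected def unhexSerde(unhex: Unhex): (Expression, Expression) =
//     (unhex.child, Literal(unhex.failOnError))
```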
2 changes: 1 addition & 1 deletion docs/source/user-guide/installation.md
@@ -28,7 +28,7 @@ Make sure the following requirements are met and software installed on your mach

## Requirements

- Apache Spark 3.2, 3.3, or 3.4
- Apache Spark 3.3 or 3.4
- JDK 8 and up
- GLIBC 2.17 (Centos 7) and up

2 changes: 1 addition & 1 deletion docs/source/user-guide/overview.md
@@ -40,7 +40,7 @@ The following diagram illustrates the architecture of Comet:

## Current Status

The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
The project is currently integrated into Apache Spark 3.3 and 3.4.

## Feature Parity with Apache Spark

@@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.shims.ShimCometScanExec
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -401,19 +402,20 @@ case class CometScanExec(
new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf),
metrics)

newDataSourceRDD(
new DataSourceRDD(
fsRelation.sparkSession.sparkContext,
partitions.map(Seq(_)),
partitionReaderFactory,
true,
Map.empty)
} else {
newFileScanRDD(
fsRelation,
new FileScanRDD(
fsRelation.sparkSession,
readFile,
partitions,
new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf))
Seq.empty,
new FileSourceOptions(CaseInsensitiveMap(relation.options)))
}
}

@@ -583,14 +583,14 @@ class CometShuffleWriteProcessor(
}

/**
* Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.2.
* Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.3.
*/
private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

/**
* Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.2.
* Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.3.
*/
private[spark] class ConstantPartitioner extends Partitioner {
override def numPartitions: Int = 1
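The collapsed remainder of `ConstantPartitioner` is small; a complete version plausibly looks like the sketch below (the `getPartition` body is an assumed completion, not copied from the collapsed hunk):

```scala
import org.apache.spark.Partitioner

// A partitioner that sends every record to the single partition 0.
private[spark] class ConstantPartitioner extends Partitioner {
  override def numPartitions: Int = 1
  override def getPartition(key: Any): Int = 0
}
```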

This file was deleted.

@@ -25,7 +25,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

trait ShimCometBroadcastExchangeExec {
// TODO: remove after dropping Spark 3.2 and 3.3 support
// TODO: remove after dropping Spark 3.3 support
protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] = {
// Spark 3.4 has new API `broadcastInternal` to broadcast the relation without caching the
// unserialized object.
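The version split can be bridged with reflection; the sketch below is an assumed shape rather than the exact Comet implementation, preferring Spark 3.4's `broadcastInternal` when present and falling back to the public `broadcast` otherwise:

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Sketch: Spark 3.4 exposes broadcastInternal(value, serializedOnly) so the driver
// does not have to cache the deserialized relation; on 3.3 fall back to broadcast().
def doBroadcastSketch[T: ClassTag](sc: SparkContext, value: T): Broadcast[Any] = {
  sc.getClass.getDeclaredMethods
    .find(_.getName == "broadcastInternal")
    .map { m =>
      m.setAccessible(true)
      // The ClassTag appears as the trailing implicit parameter in bytecode.
      m.invoke(sc, value.asInstanceOf[AnyRef], java.lang.Boolean.TRUE, implicitly[ClassTag[T]])
        .asInstanceOf[Broadcast[Any]]
    }
    .getOrElse(sc.broadcast(value).asInstanceOf[Broadcast[Any]])
}
```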
@@ -40,60 +40,15 @@ import org.apache.spark.sql.types.{LongType, StructField, StructType}
trait ShimCometScanExec {
def wrapped: FileSourceScanExec

// TODO: remove after dropping Spark 3.2 support and directly call wrapped.metadataColumns
lazy val metadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods
.filter(_.getName == "metadataColumns")
.map { a => a.setAccessible(true); a }
.flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])

// TODO: remove after dropping Spark 3.2 and 3.3 support and directly call
// TODO: remove after dropping Spark 3.3 support and directly call
// wrapped.fileConstantMetadataColumns
lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
wrapped.getClass.getDeclaredMethods
.filter(_.getName == "fileConstantMetadataColumns")
.map { a => a.setAccessible(true); a }
.flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])

// TODO: remove after dropping Spark 3.2 support and directly call new DataSourceRDD
protected def newDataSourceRDD(
sc: SparkContext,
inputPartitions: Seq[Seq[InputPartition]],
partitionReaderFactory: PartitionReaderFactory,
columnarReads: Boolean,
customMetrics: Map[String, SQLMetric]): DataSourceRDD = {
implicit def flattenSeq(p: Seq[Seq[InputPartition]]): Seq[InputPartition] = p.flatten
new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads, customMetrics)
}

// TODO: remove after dropping Spark 3.2 support and directly call new FileScanRDD
protected def newFileScanRDD(
fsRelation: HadoopFsRelation,
readFunction: PartitionedFile => Iterator[InternalRow],
filePartitions: Seq[FilePartition],
readSchema: StructType,
options: ParquetOptions): FileScanRDD =
classOf[FileScanRDD].getDeclaredConstructors
// Prevent to pick up incorrect constructors from any custom Spark forks.
.filter(c => List(3, 5, 6).contains(c.getParameterCount()) )
.map { c =>
c.getParameterCount match {
case 3 => c.newInstance(fsRelation.sparkSession, readFunction, filePartitions)
case 5 =>
c.newInstance(fsRelation.sparkSession, readFunction, filePartitions, readSchema, metadataColumns)
case 6 =>
c.newInstance(
fsRelation.sparkSession,
readFunction,
filePartitions,
readSchema,
fileConstantMetadataColumns,
options)
}
}
.last
.asInstanceOf[FileScanRDD]

// TODO: remove after dropping Spark 3.2 and 3.3 support and directly call
// TODO: remove after dropping Spark 3.3 support and directly call
// QueryExecutionErrors.SparkException
protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
if (sparkVersion >= "3.3") {
2 changes: 1 addition & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -999,7 +999,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
} else if (CometSparkSessionExtensions.isSpark34Plus) {
// for Spark 3.4 we expect to reproduce the error message exactly
assert(cometMessage == sparkMessage)
} else if (CometSparkSessionExtensions.isSpark33Plus) {
} else {
// for Spark 3.3 we just need to strip the prefix from the Comet message
// before comparing
val cometMessageModified = cometMessage
