Commit 1380e8f: address comments

huaxingao committed Jun 18, 2024
1 parent d3c5fca · commit 1380e8f
Showing 5 changed files with 16 additions and 38 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/pr_build.yml
@@ -254,15 +254,11 @@ jobs:
       matrix:
         java_version: [8, 17]
         test-target: [java]
-        spark-version: ['3.2', '3.3']
+        spark-version: ['3.3']
         scala-version: ['2.12', '2.13']
         exclude:
-          - java_version: 17
-            spark-version: '3.2'
           - java_version: 8
             spark-version: '3.3'
-          - spark-version: '3.2'
-            scala-version: '2.13'
       fail-fast: false
     name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
     runs-on: macos-14
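
For context on the workflow change above: GitHub Actions expands a job matrix as the cross product of the listed values minus the exclude entries, so dropping the '3.2' values also removes the two excludes that referenced them. A minimal standalone Scala sketch of that expansion (illustration only, not part of this repository; MatrixSketch and its value names are hypothetical):

object MatrixSketch extends App {
  // Values copied from the macos-14 matrix after this change.
  val javaVersions  = Seq("8", "17")
  val sparkVersions = Seq("3.3")
  val scalaVersions = Seq("2.12", "2.13")
  // The exclude entry that remains: java 8 with Spark 3.3.
  val excluded = Set(("8", "3.3"))

  // Cross product minus excludes, mirroring GitHub Actions matrix semantics.
  val jobs = for {
    javaVer  <- javaVersions
    sparkVer <- sparkVersions
    scalaVer <- scalaVersions
    if !excluded((javaVer, sparkVer))
  } yield s"java $javaVer / spark $sparkVer / scala $scalaVer"

  jobs.foreach(println) // only the java 17 combinations remain in this matrix
}
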
@@ -44,6 +44,10 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
 
   wrapped.logicalLink.foreach(setLogicalLink)
 
+  def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.keyGroupedPartitioning
+
+  def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions
+
   override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -19,24 +19,12 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 trait ShimCometBatchScanExec {
   def wrapped: BatchScanExec
 
-  // Only for Spark 3.3+
-  def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.getClass.getDeclaredMethods
-    .filter(_.getName == "keyGroupedPartitioning")
-    .flatMap(_.invoke(wrapped).asInstanceOf[Option[Seq[Expression]]])
-    .headOption
-
-  // Only for Spark 3.3+
-  def inputPartitions: Seq[InputPartition] = wrapped.getClass.getDeclaredMethods
-    .filter(_.getName == "inputPartitions")
-    .flatMap(_.invoke(wrapped).asInstanceOf[Seq[InputPartition]])
-
   // Only for Spark 3.4+
   def ordering: Option[Seq[SortOrder]] = wrapped.getClass.getDeclaredMethods
     .filter(_.getName == "ordering")
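
The ordering accessor that remains above still goes through reflection because BatchScanExec.ordering only exists on Spark 3.4+, while keyGroupedPartitioning and inputPartitions are now called directly since Spark 3.3 is the oldest supported version. A minimal standalone sketch of that reflection-by-method-name pattern (illustration only; Widget and maybeLabel are hypothetical, not Comet or Spark APIs):

class Widget {
  def maybeLabel: Option[String] = Some("hello")
}

object ReflectionSketch extends App {
  val widget = new Widget

  // Look the method up on the runtime class and invoke it only if it exists,
  // so the same calling code compiles even when the method is absent.
  val label: Option[String] = widget.getClass.getDeclaredMethods
    .filter(_.getName == "maybeLabel")                      // empty if the method is missing
    .flatMap(_.invoke(widget).asInstanceOf[Option[String]]) // reflective zero-arg call
    .headOption                                             // None when nothing was found

  println(label) // Some(hello)
}
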
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.{LongType, StructField, StructType}
 trait ShimCometScanExec {
   def wrapped: FileSourceScanExec
 
-  // TODO: remove after dropping Spark 3.2 support and directly call wrapped.metadataColumns
+  // TODO: remove after dropping Spark 3.3 support
   lazy val metadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods
     .filter(_.getName == "metadataColumns")
     .map { a => a.setAccessible(true); a }
@@ -49,7 +49,7 @@ trait ShimCometScanExec {
     .map { a => a.setAccessible(true); a }
     .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
 
-  // TODO: remove after dropping Spark 3.2 support and directly call new FileScanRDD
+  // TODO: remove after dropping Spark 3.4 support and directly call new FileScanRDD
   protected def newFileScanRDD(
       fsRelation: HadoopFsRelation,
       readFunction: PartitionedFile => Iterator[InternalRow],
@@ -58,10 +58,9 @@
       options: ParquetOptions): FileScanRDD =
     classOf[FileScanRDD].getDeclaredConstructors
       // Prevent to pick up incorrect constructors from any custom Spark forks.
-      .filter(c => List(3, 5, 6).contains(c.getParameterCount()))
+      .filter(c => List(5, 6).contains(c.getParameterCount()))
       .map { c =>
        c.getParameterCount match {
-          case 3 => c.newInstance(fsRelation.sparkSession, readFunction, filePartitions)
          case 5 =>
            c.newInstance(fsRelation.sparkSession, readFunction, filePartitions, readSchema, metadataColumns)
          case 6 =>
@@ -80,16 +79,12 @@
   // TODO: remove after dropping Spark 3.3 support and directly call
   //  QueryExecutionErrors.SparkException
   protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
-    if (sparkVersion >= "3.3") {
-      val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path)
-      classOf[SparkException].getDeclaredConstructors
-        .filter(_.getParameterCount == 3)
-        .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
-        .last
-        .asInstanceOf[SparkException]
-    } else { // Spark 3.2
-      new IllegalStateException(s"Invalid bucket file ${path}")
-    }
+    val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path)
+    classOf[SparkException].getDeclaredConstructors
+      .filter(_.getParameterCount == 3)
+      .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
+      .last
+      .asInstanceOf[SparkException]
   }
 
   // Copied from Spark 3.4 RowIndexUtil due to PARQUET-2161 (tracked in SPARK-39634)
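
Both newFileScanRDD and the simplified invalidBucketFile above build version-dependent objects by filtering getDeclaredConstructors on parameter count and instantiating whichever constructor the running Spark version provides. A minimal standalone sketch of that constructor-by-arity pattern (illustration only; Message stands in for Spark's FileScanRDD/SparkException and is hypothetical):

class Message(val code: String, val detail: String) {
  def this(code: String) = this(code, "")
}

object ConstructorSketch extends App {
  val msg = classOf[Message].getDeclaredConstructors
    // Keep only the arities this code knows how to populate.
    .filter(c => List(1, 2).contains(c.getParameterCount))
    .map { c =>
      c.getParameterCount match {
        case 1 => c.newInstance("INVALID_BUCKET_FILE")
        case 2 => c.newInstance("INVALID_BUCKET_FILE", "path=/tmp/part-0.parquet")
      }
    }
    .last
    .asInstanceOf[Message]

  println(s"${msg.code}: ${msg.detail}")
}
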
@@ -19,16 +19,11 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 trait ShimCometBatchScanExec {
   def wrapped: BatchScanExec
 
-  def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.keyGroupedPartitioning
-
-  def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions
-
   def ordering: Option[Seq[SortOrder]] = wrapped.ordering
 }
