common/src/main/scala/org/apache/comet/CometConf.scala (7 additions, 0 deletions)
@@ -601,6 +601,13 @@ object CometConf extends ShimCometConf {
       .toSequence
       .createWithDefault(Seq("Range,InMemoryTableScan,RDDScan"))
 
+  val COMET_PREFER_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.preferToColumnar.enabled")
+      .internal()
+      .doc("TODO: doc")
+      .booleanConf
+      .createWithDefault(true)
+
   val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.caseConversion.enabled")
       .doc(
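Note on the new entry: it follows the existing ConfigEntry builder pattern (internal, boolean, default true), keyed as spark.comet.preferToColumnar.enabled; its doc string is still a placeholder. A minimal usage sketch, assuming a stock SparkSession (the session wiring below is illustrative and not part of this change):

import org.apache.spark.sql.SparkSession

// Internal Comet flags are set like any other Spark conf entry.
val spark = SparkSession
  .builder()
  .master("local[1]")
  .config("spark.comet.preferToColumnar.enabled", "true") // key from the hunk above
  .getOrCreate()

Inside Comet itself the value would be read back through the ConfigEntry, e.g. CometConf.COMET_PREFER_TO_ARROW_ENABLED.get() (assuming the usual get() accessor on Comet's ConfigEntry).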
@@ -154,7 +154,13 @@ object Utils extends CometTypeShim {
         name,
         fieldType,
         Seq(toArrowField("element", elementType, containsNull, timeZoneId)).asJava)
-    case StructType(fields) =>
+    case st @ StructType(fields) =>
+      if (st.names.toSet.size != fields.length) {
+        throw new SparkException(
+          "Duplicated field names in Arrow Struct are not allowed," +
+            s" got ${st.names.mkString("[", ", ", "]")}.")
+      }
+
       val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
       new Field(
         name,
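The new guard rejects ambiguous schemas up front: an Arrow Struct cannot distinguish two children with the same name, so the conversion now fails fast with a SparkException instead of producing a corrupt schema downstream. A self-contained sketch of the predicate, using only spark-sql types (the surrounding toArrowField context is not reproduced):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A struct with a duplicated child name, as the new check would see it.
val st = StructType(Seq(
  StructField("a", StringType),
  StructField("a", StringType))) // duplicate name "a"

// Same predicate as the diff: a Set collapses duplicates, so the sizes diverge.
val hasDuplicates = st.names.toSet.size != st.fields.length
assert(hasDuplicates) // toArrowField would now throw for this input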
@@ -25,6 +25,7 @@
 import org.apache.spark.sql.ExtendedExplainGenerator
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
 import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometExplainInfo.getActualPlan

@@ -158,6 +159,7 @@ object CometExplainInfo {
     case p: InputAdapter => getActualPlan(p.child)
     case p: QueryStageExec => getActualPlan(p.plan)
     case p: WholeStageCodegenExec => getActualPlan(p.child)
+    case p: ReusedExchangeExec => getActualPlan(p.child)
     case p => p
   }
 }
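getActualPlan peels wrapper nodes until it reaches the operator that does the actual work; adding ReusedExchangeExec means the explain traversal no longer stops at an exchange that Spark deduplicated via reuse. A toy sketch of the same unwrap pattern (the node types below are invented for illustration; they are not Spark's):

// Toy model of the recursive unwrap in CometExplainInfo.getActualPlan.
sealed trait Node
case class Wrapper(child: Node) extends Node // stands in for InputAdapter, ReusedExchangeExec, ...
case class Leaf(label: String) extends Node

@scala.annotation.tailrec
def actualPlan(n: Node): Node = n match {
  case Wrapper(c) => actualPlan(c) // keep unwrapping, as each wrapper case above does
  case other => other // a real operator: stop here
}

assert(actualPlan(Wrapper(Wrapper(Leaf("scan")))) == Leaf("scan"))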