Commit 8726d1f (parent: 4ae8188)

feat(spark): support Hive DDL / Insert operations

Add support for DdlRel and WriteRel for Hive in Spark.

Signed-off-by: Andrew Coleman <[email protected]>

5 files changed: +264 -37 lines
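
A minimal round-trip sketch of what this commit enables. It assumes the
module's existing ToSubstraitRel/ToLogicalPlan converters expose the convert
entry points used by the module's tests (treat this as a sketch, not the
exact API); the session settings, table name and query are illustrative.

import io.substrait.spark.logical.{ToLogicalPlan, ToSubstraitRel}
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local")
  .enableHiveSupport() // makes spark.sql.catalogImplementation resolve to "hive"
  .getOrCreate()

// Run a CTAS and capture its optimized logical plan (a CommandResult).
val ctas = spark.sql("CREATE TABLE t AS SELECT 1 AS id").queryExecution.optimizedPlan
// Convert the Spark command tree to a Substrait plan (now a WriteRel)...
val substraitPlan = new ToSubstraitRel().convert(ctas)
// ...and decode it back into a runnable Spark logical plan.
val roundTripped = new ToLogicalPlan(spark).convert(substraitPlan)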

spark/.gitignore

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+metastore_db
+spark-warehouse
+/src/test/resources/write-a.csv
+derby.log

spark/build.gradle.kts

Lines changed: 5 additions & 1 deletion
@@ -118,6 +118,7 @@ dependencies {
   implementation(libs.scala.library)
   api(libs.spark.core)
   api(libs.spark.sql)
+  implementation(libs.spark.hive)
   implementation(libs.spark.catalyst)
   implementation(libs.slf4j.api)

@@ -150,6 +151,9 @@ tasks {
   test {
     dependsOn(":core:shadowJar")
     useJUnitPlatform { includeEngines("scalatest") }
-    jvmArgs("--add-exports=java.base/sun.nio.ch=ALL-UNNAMED")
+    jvmArgs(
+      "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED",
+      "--add-opens=java.base/java.net=ALL-UNNAMED",
+    )
   }
 }

spark/src/main/scala/io/substrait/spark/logical/ToLogicalPlan.scala

Lines changed: 79 additions & 30 deletions
@@ -29,27 +29,31 @@ import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOute
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, LeafRunnableCommand}
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateTableCommand, DataWritingCommand, DropTableCommand, LeafRunnableCommand}
 import org.apache.spark.sql.execution.datasources.{FileFormat => SparkFileFormat, HadoopFsRelation, InMemoryFileIndex, InsertIntoHadoopFsRelationCommand, LogicalRelation, V1Writes}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}

 import io.substrait.`type`.{NamedStruct, StringTypeVisitor, Type}
 import io.substrait.{expression => exp}
 import io.substrait.expression.{Expression => SExpression}
 import io.substrait.plan.Plan
 import io.substrait.relation
-import io.substrait.relation.{ExtensionWrite, LocalFiles, NamedWrite}
+import io.substrait.relation.{ExtensionWrite, LocalFiles, NamedDdl, NamedWrite}
+import io.substrait.relation.AbstractDdlRel.{DdlObject, DdlOp}
 import io.substrait.relation.AbstractWriteRel.{CreateMode, WriteOp}
 import io.substrait.relation.Expand.{ConsistentField, SwitchingField}
 import io.substrait.relation.Set.SetOp
 import io.substrait.relation.files.FileFormat
 import io.substrait.util.EmptyVisitationContext
 import org.apache.hadoop.fs.Path

+import java.net.URI
+
 import scala.annotation.nowarn
 import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.mutable.ArrayBuffer

@@ -439,35 +443,44 @@ class ToLogicalPlan(spark: SparkSession = SparkSession.builder().getOrCreate())

   override def visit(write: NamedWrite, context: EmptyVisitationContext): LogicalPlan = {
     val child = write.getInput.accept(this, context)
-
-    val (table, database, catalog) = write.getNames.asScala match {
-      case Seq(table) => (table, None, None)
-      case Seq(database, table) => (table, Some(database), None)
-      case Seq(catalog, database, table) => (table, Some(database), Some(catalog))
-      case names =>
-        throw new UnsupportedOperationException(
-          s"NamedWrite requires up to three names ([[catalog,] database,] table): $names")
+    val table = catalogTable(write.getNames.asScala)
+    val isHive = spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) match {
+      case "hive" => true
+      case _ => false
     }
-    val id = TableIdentifier(table, database, catalog)
-    val catalogTable = CatalogTable(
-      id,
-      CatalogTableType.MANAGED,
-      CatalogStorageFormat.empty,
-      new StructType(),
-      Some("parquet")
-    )
     write.getOperation match {
       case WriteOp.CTAS =>
         withChild(child) {
-          CreateDataSourceTableAsSelectCommand(
-            catalogTable,
-            saveMode(write.getCreateMode),
+          if (isHive) {
+            CreateHiveTableAsSelectCommand(
+              table,
+              child,
+              write.getTableSchema.names().asScala,
+              saveMode(write.getCreateMode)
+            )
+          } else {
+            CreateDataSourceTableAsSelectCommand(
+              table,
+              saveMode(write.getCreateMode),
+              child,
+              write.getTableSchema.names().asScala
+            )
+          }
+        }
+      case WriteOp.INSERT if isHive =>
+        withChild(child) {
+          InsertIntoHiveTable(
+            table,
+            Map.empty,
             child,
+            write.getCreateMode == CreateMode.REPLACE_IF_EXISTS,
+            false,
             write.getTableSchema.names().asScala
           )
         }
       case op => throw new UnsupportedOperationException(s"Write mode $op not supported")
     }
+
   }

   override def visit(write: ExtensionWrite, context: EmptyVisitationContext): LogicalPlan = {

@@ -493,14 +506,7 @@ class ToLogicalPlan(spark: SparkSession = SparkSession.builder().getOrCreate())
     val (format, options) = convertFileFormat(file.getFileFormat.get)

     val name = file.getPath.get.split('/').reverse.head
-    val id = TableIdentifier(name)
-    val table = CatalogTable(
-      id,
-      CatalogTableType.MANAGED,
-      CatalogStorageFormat.empty,
-      new StructType(),
-      None
-    )
+    val table = catalogTable(Seq(name))

     withChild(child) {
       V1Writes.apply(

@@ -521,6 +527,49 @@ class ToLogicalPlan(spark: SparkSession = SparkSession.builder().getOrCreate())
     }
   }

+  override def visit(ddl: NamedDdl, context: EmptyVisitationContext): LogicalPlan = {
+    val table = catalogTable(ddl.getNames.asScala, ToSparkType.toStructType(ddl.getTableSchema))
+
+    (ddl.getOperation, ddl.getObject) match {
+      case (DdlOp.CREATE, DdlObject.TABLE) => CreateTableCommand(table, false)
+      case (DdlOp.DROP, DdlObject.TABLE) => DropTableCommand(table.identifier, false, false, false)
+      case (DdlOp.DROP_IF_EXIST, DdlObject.TABLE) =>
+        DropTableCommand(table.identifier, true, false, false)
+      case op => throw new UnsupportedOperationException(s"Ddl operation $op not supported")
+    }
+  }
+
+  private def catalogTable(
+      names: Seq[String],
+      schema: StructType = new StructType()): CatalogTable = {
+    val (table, database, catalog) = names match {
+      case Seq(table) => (table, None, None)
+      case Seq(database, table) => (table, Some(database), None)
+      case Seq(catalog, database, table) => (table, Some(database), Some(catalog))
+      case names =>
+        throw new UnsupportedOperationException(
+          s"NamedWrite requires up to three names ([[catalog,] database,] table): $names")
+    }
+
+    val loc = spark.conf.get(StaticSQLConf.WAREHOUSE_PATH.key)
+    val storage = CatalogStorageFormat(
+      locationUri = Some(URI.create(f"$loc/$table")),
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      compressed = false,
+      properties = Map.empty
+    )
+    val id = TableIdentifier(table, database, catalog)
+    CatalogTable(
+      id,
+      CatalogTableType.MANAGED,
+      storage,
+      schema,
+      Some("parquet")
+    )
+  }
+
   private def saveMode(mode: CreateMode): SaveMode = mode match {
     case CreateMode.APPEND_IF_EXISTS => SaveMode.Append
     case CreateMode.REPLACE_IF_EXISTS => SaveMode.Overwrite
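
The new private catalogTable helper centralizes the name handling that
visit(NamedWrite) previously did inline. A small illustration of the mapping,
using a hypothetical fully qualified name:

// One to three names resolve to (table, database, catalog); anything longer
// is rejected with an UnsupportedOperationException.
val names = Seq("spark_catalog", "default", "t")
val (table, database, catalog) = names match {
  case Seq(t) => (t, None, None)
  case Seq(db, t) => (t, Some(db), None)
  case Seq(cat, db, t) => (t, Some(db), Some(cat))
}
// => ("t", Some("default"), Some("spark_catalog")); the managed table is then
// placed at "$spark.sql.warehouse.dir/t" with a default "parquet" provider.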

spark/src/main/scala/io/substrait/spark/logical/ToSubstraitRel.scala

Lines changed: 63 additions & 6 deletions
@@ -22,27 +22,31 @@ import io.substrait.spark.expression._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Sum}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateTableCommand, DropTableCommand}
 import org.apache.spark.sql.execution.datasources.{FileFormat => DSFileFormat, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, V1WriteCommand, WriteFiles}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, V2SessionCatalog}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
 import org.apache.spark.sql.types.{NullType, StructType}

 import io.substrait.`type`.{NamedStruct, Type}
 import io.substrait.{proto, relation}
 import io.substrait.debug.TreePrinter
 import io.substrait.expression.{Expression => SExpression, ExpressionCreator}
+import io.substrait.expression.Expression.StructLiteral
 import io.substrait.extension.ExtensionCollector
 import io.substrait.hint.Hint
 import io.substrait.plan.Plan
+import io.substrait.relation.AbstractDdlRel.{DdlObject, DdlOp}
 import io.substrait.relation.AbstractWriteRel.{CreateMode, OutputMode, WriteOp}
 import io.substrait.relation.RelProtoConverter
 import io.substrait.relation.Set.SetOp

@@ -54,7 +58,7 @@ import io.substrait.utils.Util
 import java.util
 import java.util.{Collections, Optional}

-import scala.collection.JavaConverters.asJavaIterableConverter
+import scala.collection.JavaConverters.{asJavaIterableConverter, seqAsJavaList}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

@@ -75,9 +79,7 @@ class ToSubstraitRel extends AbstractLogicalPlanVisitor with Logging {
   override def default(p: LogicalPlan): relation.Rel = p match {
     case c: CommandResult => visit(c.commandLogicalPlan)
     case w: WriteFiles => visit(w.child)
-    case c: V1WriteCommand => convertDataWritingCommand(c)
-    case CreateDataSourceTableAsSelectCommand(table, mode, query, names) =>
-      convertCTAS(table, mode, query, names)
+    case c: Command => convertCommand(c)
     case p: LeafNode => convertReadOperator(p)
     case s: SubqueryAlias => visit(s.child)
     case v: View => visit(v.child)

@@ -566,6 +568,28 @@
     }
   }

+  private def convertCommand(command: Command): relation.Rel = command match {
+    case c: V1WriteCommand => convertDataWritingCommand(c)
+    case CreateDataSourceTableAsSelectCommand(table, mode, query, names) =>
+      convertCTAS(table, mode, query, names)
+    case CreateHiveTableAsSelectCommand(table, query, names, mode) =>
+      convertCTAS(table, mode, query, names)
+    case CreateTableCommand(table, _) =>
+      convertCreateTable(table.identifier.unquotedString.split("\\."), table.schema)
+    case DropTableCommand(tableName, ifExists, _, _) =>
+      convertDropTable(tableName.unquotedString.split("\\."), ifExists)
+    case CreateTable(ResolvedIdentifier(c: V2SessionCatalog, id), tableSchema, _, _, _)
+        if id.namespace().length > 0 =>
+      val names = Seq(c.name(), id.namespace()(0), id.name())
+      convertCreateTable(names, tableSchema)
+    case DropTable(ResolvedIdentifier(c: V2SessionCatalog, id), ifExists, _)
+        if id.namespace().length > 0 =>
+      val names = Seq(c.name(), id.namespace()(0), id.name())
+      convertDropTable(names, ifExists)
+    case _ =>
+      throw new UnsupportedOperationException(s"Unable to convert command: $command")
+  }
+
   private def convertDataWritingCommand(command: V1WriteCommand): relation.AbstractWriteRel =
     command match {
       case InsertIntoHadoopFsRelationCommand(

@@ -600,6 +624,16 @@
           .tableSchema(outputSchema(child.output, outputColumnNames))
           .detail(FileHolder(file))
           .build()
+      case InsertIntoHiveTable(table, _, child, overwrite, _, outputColumnNames, _, _, _, _, _) =>
+        relation.NamedWrite
+          .builder()
+          .input(visit(child))
+          .operation(WriteOp.INSERT)
+          .outputMode(OutputMode.UNSPECIFIED)
+          .createMode(if (overwrite) CreateMode.REPLACE_IF_EXISTS else CreateMode.ERROR_IF_EXISTS)
+          .names(seqAsJavaList(table.identifier.unquotedString.split("\\.").toList))
+          .tableSchema(outputSchema(child.output, outputColumnNames))
+          .build()
       case _ =>
         throw new UnsupportedOperationException(s"Unable to convert command: ${command.getClass}")
     }

@@ -619,6 +653,29 @@
       .tableSchema(outputSchema(query.output, outputColumnNames))
       .build()

+  private def convertCreateTable(names: Seq[String], schema: StructType): relation.NamedDdl = {
+    relation.NamedDdl
+      .builder()
+      .operation(DdlOp.CREATE)
+      .`object`(DdlObject.TABLE)
+      .names(seqAsJavaList(names))
+      .tableSchema(ToSubstraitType.toNamedStruct(schema))
+      .tableDefaults(StructLiteral.builder.nullable(true).build())
+      .build()
+  }
+
+  private def convertDropTable(names: Seq[String], ifExists: Boolean): relation.NamedDdl = {
+    relation.NamedDdl
+      .builder()
+      .operation(if (ifExists) DdlOp.DROP_IF_EXIST else DdlOp.DROP)
+      .`object`(DdlObject.TABLE)
+      .names(seqAsJavaList(names))
+      .tableSchema(
+        NamedStruct.builder().struct(Type.Struct.builder().nullable(true).build()).build())
+      .tableDefaults(StructLiteral.builder.nullable(true).build())
+      .build()
+  }
+
   private def createMode(mode: SaveMode): CreateMode = mode match {
     case SaveMode.Append => CreateMode.APPEND_IF_EXISTS
     case SaveMode.Overwrite => CreateMode.REPLACE_IF_EXISTS
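
For reference, a self-contained sketch of the NamedDdl relation that the new
convertDropTable emits for a hypothetical DROP TABLE IF EXISTS db.t; the empty
nullable struct stands in for the (unknown) schema of the table being dropped,
exactly as in the method above.

import io.substrait.`type`.{NamedStruct, Type}
import io.substrait.expression.Expression.StructLiteral
import io.substrait.relation
import io.substrait.relation.AbstractDdlRel.{DdlObject, DdlOp}

import scala.collection.JavaConverters.seqAsJavaList

val dropRel = relation.NamedDdl
  .builder()
  .operation(DdlOp.DROP_IF_EXIST) // plain DROP when ifExists is false
  .`object`(DdlObject.TABLE)
  .names(seqAsJavaList(List("db", "t"))) // from "db.t".split("\\.")
  .tableSchema(
    NamedStruct.builder().struct(Type.Struct.builder().nullable(true).build()).build())
  .tableDefaults(StructLiteral.builder.nullable(true).build())
  .build()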
