diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala index 2a2f60cc0..892d80236 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala @@ -1112,7 +1112,26 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I this.planBuilder.sinks.clear() } - + /** + * Write the data quanta in this instance to a database table. Triggers execution. + * + * @param tableName name of the target table + * @param mode write mode (e.g., "overwrite" or "append") + * @param columnNames names of the columns in the target table + * @param props database connection properties + */ + def writeTable(tableName: String, + mode: String, + columnNames: Array[String], + props: java.util.Properties): Unit = { + val sink = new TableSink[Out](props, mode, tableName, columnNames: _*) + sink.setName(s"Write to table $tableName") + this.connectTo(sink, 0) + this.planBuilder.sinks += sink + this.planBuilder.buildAndExecute() + this.planBuilder.sinks.clear() + } + /** * Write the data quanta in this instance to a text file. Triggers execution. * diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala index 709115797..ab7fc6da0 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala @@ -569,6 +569,40 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging this.dataQuanta().asInstanceOf[DataQuanta[Record]].writeParquet(url, overwrite, preferDataset) } + /** + * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TableSink]]. This triggers + * execution of the constructed [[WayangPlan]]. + * + * @param tableName name of the target table + * @param mode write mode (e.g., "overwrite" or "append") + * @param columnNames names of the columns in the target table + * @param props database connection properties + */ + def writeTable(tableName: String, + mode: String, + columnNames: Array[String], + props: java.util.Properties): Unit = + this.writeTable(tableName, mode, columnNames, props, null) + + /** + * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TableSink]]. This triggers + * execution of the constructed [[WayangPlan]]. + * + * @param tableName name of the target table + * @param mode write mode (e.g., "overwrite" or "append") + * @param columnNames names of the columns in the target table + * @param props database connection properties + * @param jobName optional name for the [[WayangPlan]] + */ + def writeTable(tableName: String, + mode: String, + columnNames: Array[String], + props: java.util.Properties, + jobName: String): Unit = { + if (jobName != null) this.javaPlanBuilder.withJobName(jobName) + this.dataQuanta().writeTable(tableName, mode, columnNames, props) + } + /** * Enriches the set of operations to [[Record]]-based ones. This instances must deal with data quanta of * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java index decc102f9..60fbd542f 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java @@ -65,7 +65,8 @@ public class Mappings { new AzureBlobStorageSourceMapping(), new ApacheIcebergSourceMapping(), new ApacheIcebergSinkMapping(), - new ParquetSinkMapping() + new ParquetSinkMapping(), + new TableSinkMapping() ); public static Collection GRAPH_MAPPINGS = Arrays.asList( diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/TableSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/TableSinkMapping.java new file mode 100644 index 000000000..395afaa44 --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/TableSinkMapping.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.java.operators.JavaTableSink; +import org.apache.wayang.java.platform.JavaPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link TableSink} to {@link JavaTableSink}. + */ +public class TableSinkMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + JavaPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern> operatorPattern = new OperatorPattern<>( + "sink", new TableSink(null, null, "", new String[0]), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new JavaTableSink<>(matchedOperator).at(epoch) + ); + } +} \ No newline at end of file diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java index 1e3642a9a..2b114b466 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -61,7 +61,8 @@ public class Mappings { new ZipWithIdMapping(), new KafkaTopicSinkMapping(), new KafkaTopicSourceMapping(), - new ParquetSinkMapping() + new ParquetSinkMapping(), + new TableSinkMapping() ); diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/TableSinkMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/TableSinkMapping.java new file mode 100644 index 000000000..2ed6485fe --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/TableSinkMapping.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.spark.operators.SparkTableSink; +import org.apache.wayang.spark.platform.SparkPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link TableSink} to {@link SparkTableSink}. + */ +public class TableSinkMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern> operatorPattern = new OperatorPattern<>( + "sink", new TableSink(null, null, "", new String[0]), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new SparkTableSink<>(matchedOperator).at(epoch) + ); + } +} \ No newline at end of file