
Add fluent API for TableSink and Java/Spark optimizer mappings #752

Merged

zkaoudi merged 1 commit into apache:main from gknz:feature/table-sink-fluent-api on May 4, 2026

Conversation


gknz commented on Apr 27, 2026

Hey again Wayang community!

This PR adds a fluent API method, writeTable, for the TableSink operator in both DataQuanta.scala and DataQuantaBuilder.scala, so users can express table-write pipelines with the same syntax already available for other sinks (writeTextFile, writeParquet, writeKafkaTopic, etc.).

It also fills a gap: the JavaTableSink and SparkTableSink operators currently have no optimizer mappings, so the planner never selects them and they are effectively unreachable through normal pipelines. Two small mapping classes are added to fix that.

What this PR adds

Fluent API methods (wayang-api-scala-java)

  • DataQuanta.writeTable(tableName, mode, columnNames, props) — creates a logical TableSink, wires it into the plan, and triggers execution. Follows the same pattern as the existing writeKafkaTopic and writeParquet methods.
  • DataQuantaBuilder.writeTable(...) with two overloads (with and without an optional jobName) — the Java-facing fluent layer that delegates to the Scala DataQuanta method, following the same overload pattern as the existing writeParquet methods. A sketch of the call shape follows below.
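For a sense of the call shape, here is a minimal sketch through the Java-facing builder. The parameter order (tableName, mode, columnNames, props) follows the description above; the table, columns, write mode, connection properties, and input data are placeholder values, not code from the PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;

public class WriteTableCallShape {

    public static void main(String[] args) {
        // A context with the Java platform enabled.
        WayangContext context = new WayangContext().withPlugin(Java.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

        // Illustrative JDBC connection properties for the sink.
        Properties props = new Properties();
        props.setProperty("url", "jdbc:postgresql://localhost:5432/mydb");
        props.setProperty("user", "wayang");
        props.setProperty("password", "secret");

        // Some in-memory rows to write (placeholder data).
        List<Record> rows = Arrays.asList(new Record(1, "alice"), new Record(2, "bob"));

        // Overload without a job name: (tableName, mode, columnNames, props).
        planBuilder
                .loadCollection(rows)
                .writeTable("target_table", "overwrite", new String[]{"id", "name"}, props);

        // The second overload additionally takes a job name, e.g.:
        // .writeTable("target_table", "overwrite", columnNames, props, "my-job");
    }
}
```

Like the other fluent sinks, writeTable is a terminal operation: calling it wires the TableSink into the plan and triggers execution.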

Optimizer mappings

  • TableSinkMapping in wayang-java (registered in Mappings.java) — transforms the logical TableSink into JavaTableSink for the Java platform.
  • TableSinkMapping in wayang-spark (registered in Mappings.java) — transforms the logical TableSink into SparkTableSink for the Spark platform.

Without these mappings the optimizer cannot route a logical TableSink to either execution operator: pipelines that rely on the optimizer (including any fluent pipeline) fail because no platform claims the sink ("Could not find a single execution plan."). Both mapping classes follow the exact pattern of the existing ParquetSinkMapping in each platform.
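For reference, a minimal sketch of the Java-platform mapping, modeled on the same single-operator pattern as ParquetSinkMapping; the TableSink template-constructor arguments and the JavaTableSink copy constructor are assumptions based on that pattern, not the PR's actual code:

```java
package org.apache.wayang.java.mapping;

import java.util.Collection;
import java.util.Collections;

import org.apache.wayang.basic.operators.TableSink;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.java.operators.JavaTableSink;
import org.apache.wayang.java.platform.JavaPlatform;

/**
 * Maps the logical TableSink to JavaTableSink so the optimizer can select it
 * for the Java platform (modeled on the existing sink mappings).
 */
public class TableSinkMapping implements Mapping {

    @Override
    public Collection<PlanTransformation> getTransformations() {
        return Collections.singleton(new PlanTransformation(
                this.createSubplanPattern(),
                this.createReplacementSubplanFactory(),
                JavaPlatform.getInstance()
        ));
    }

    private SubplanPattern createSubplanPattern() {
        // Match any logical TableSink in the plan; the template operator's
        // constructor arguments are placeholders (assumed, not from the PR).
        final OperatorPattern<TableSink> operatorPattern = new OperatorPattern<>(
                "sink", new TableSink(null, new String[0]), false
        );
        return SubplanPattern.createSingleton(operatorPattern);
    }

    private ReplacementSubplanFactory createReplacementSubplanFactory() {
        // Replace the matched logical sink with the Java execution operator;
        // a copy constructor taking the logical operator is assumed here.
        return new ReplacementSubplanFactory.OfSingleOperators<TableSink>(
                (matchedOperator, epoch) -> new JavaTableSink(matchedOperator).at(epoch)
        );
    }
}
```

The Spark variant is identical in shape, substituting SparkTableSink and SparkPlatform; registration is then a one-line addition to the mapping collection in each platform's Mappings.java.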

Testing

I verified end-to-end with two pipelines against a real PostgreSQL instance:

  1. Read → Write. planBuilder.readTable(source).writeTable(...) — copied a six-row source table into a new target table.
  2. Read → Filter → Write. planBuilder.readTable(source).filter(...).writeTable(...) — applied a Java filter and wrote the single matching row to a new target table.

Both pipelines produce execution plans where the optimizer correctly selects JavaTableSink after the new mapping is registered, confirming that the fluent method, the underlying TableSink plan wiring, and the new mappings all work together. Without the mappings, the same plans fail at optimization with "Could not find a single execution plan."
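In code, the two pipelines look roughly like the sketch below. The table and column names, the filter predicate, and the "create" write mode are illustrative; PostgresTableSource and the wayang.postgres.jdbc.* configuration keys are assumed from the existing Postgres platform, and the writeTable signature follows the description above:

```java
import java.util.Properties;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.postgres.Postgres;
import org.apache.wayang.postgres.operators.PostgresTableSource;

public class TableSinkVerification {

    public static void main(String[] args) {
        // Context with the Java platform and the Postgres source registered.
        WayangContext context = new WayangContext()
                .withPlugin(Java.basicPlugin())
                .withPlugin(Postgres.plugin());

        // Connection settings for the test database (illustrative values).
        context.getConfiguration().setProperty(
                "wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/testdb");
        context.getConfiguration().setProperty("wayang.postgres.jdbc.user", "wayang");
        context.getConfiguration().setProperty("wayang.postgres.jdbc.password", "secret");

        // Sink-side JDBC properties, as taken by the new writeTable(...).
        Properties props = new Properties();
        props.setProperty("url", "jdbc:postgresql://localhost:5432/testdb");

        // Pipeline 1: Read -> Write. Copies the six-row source table.
        new JavaPlanBuilder(context)
                .withJobName("read-write")
                .readTable(new PostgresTableSource("source_table", "id", "name"))
                .writeTable("target_copy", "create", new String[]{"id", "name"}, props);

        // Pipeline 2: Read -> Filter -> Write. Writes the single matching row.
        new JavaPlanBuilder(context)
                .withJobName("read-filter-write")
                .readTable(new PostgresTableSource("source_table", "id", "name"))
                .filter(record -> "wayang".equals(record.getField(1)))
                .writeTable("target_filtered", "create", new String[]{"id", "name"}, props);
    }
}
```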

Notes

  • This PR works against current main. Once the in-database JdbcTableSinkOperator PR is merged, the same fluent writeTable(...) call will automatically benefit from the in-database execution path when the source and sink share a database platform; no changes to the fluent API will be required, since the optimizer simply gains additional TableSink mappings to choose from.
  • The two new mapping classes are straightforward adaptations of the existing ParquetSinkMapping pattern; they were simply absent from the original JavaTableSink/SparkTableSink contributions.

Files

  • wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala — added writeTable
  • wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala — added writeTable (two overloads)
  • wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/TableSinkMapping.java — new
  • wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java — registration
  • wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/TableSinkMapping.java — new
  • wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java — registration

- Add writeTable() method in DataQuanta.scala for the underlying fluent API
- Add writeTable() overloads in DataQuantaBuilder.scala for the Java-facing
  fluent API (with and without optional jobName)
- Add TableSinkMapping in wayang-java to register JavaTableSink with the
  optimizer; previously the JavaTableSink operator existed but had no
  mapping, making it unreachable through the optimizer
- Add TableSinkMapping in wayang-spark to register SparkTableSink with the
  optimizer for the same reason
- Register both mappings in their respective Mappings.java files

This enables fluent pipelines like planBuilder.readTable(source).writeTable(...)
to be routed by the optimizer to the appropriate platform-specific sink.
zkaoudi merged commit ccb2540 into apache:main on May 4, 2026
4 checks passed