
@AnishMahto AnishMahto commented Nov 14, 2025

What changes were proposed in this pull request?

Today, read options attached to any UnresolvedRelation that is analyzed by the pipelines flow analyzer are dropped. This PR fixes that bug, and in doing so also makes the following micro refactors:

  • Get rid of StreamingReadOptions/BatchReadOptions. Previously, no field of either class was ever populated; the classes were instead used only to determine whether a streaming read or a batch read was being executed.
  • Propagate the streaming or batch dataframe reader as the sole source of truth for options to execute reads with, rather than passing in both a reader and read options side-by-side.
  • Correct the Table class hierarchy. Table is a GraphElement, but it is not an Input. Because it previously inherited from Input, it had a load override, but that was dead code: logically, a Table could never be passed into the polymorphic call sites of Input.load.
  • Get rid of AnalysisWarning, whose exceptions were also dead code.
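
A minimal sketch of the shape these refactors describe, in plain Scala with no Spark dependency. Every name here (`Reader`, `BatchReader`, `StreamingReader`, `ReadPlan`, `planRead`, `View`) is a hypothetical stand-in chosen for illustration, not a class from the PR. The point is only that the reader value carries both the read mode and the options, and that a table-like element need not expose `load` at all.

```scala
object ReaderAsSourceOfTruth {
  // Hypothetical stand-in for a batch or streaming DataFrame reader.
  sealed trait Reader { def options: Map[String, String] }
  final case class BatchReader(options: Map[String, String] = Map.empty) extends Reader
  final case class StreamingReader(options: Map[String, String] = Map.empty) extends Reader

  // Hypothetical description of the read that would be executed.
  final case class ReadPlan(identifier: String, isStreaming: Boolean, options: Map[String, String])

  // The reader alone decides both the mode and the options;
  // no separate "read options" argument travels alongside it.
  def planRead(identifier: String, reader: Reader): ReadPlan = reader match {
    case BatchReader(opts)     => ReadPlan(identifier, isStreaming = false, opts)
    case StreamingReader(opts) => ReadPlan(identifier, isStreaming = true, opts)
  }

  // Simplified hierarchy: only Input exposes load.
  trait GraphElement { def name: String }
  trait Input extends GraphElement { def load(reader: Reader): ReadPlan }

  final case class View(name: String) extends Input {
    def load(reader: Reader): ReadPlan = planRead(name, reader)
  }

  // A Table is a GraphElement but not an Input, so it has no load method.
  final case class Table(name: String) extends GraphElement
}
```

With this shape, a call such as `View("v").load(StreamingReader(Map("k" -> "v")))` keeps the option attached to the reader all the way to the resulting plan, and there is no second argument that could disagree with it.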

Why are the changes needed?

Prior to these changes, any options specified in UnresolvedRelation.options were dropped when analyzed via FlowAnalysis.analyze. To my knowledge, in a vanilla installation of Spark (e.g. without Delta io) there are today no options that could be dropped and that would otherwise have been respected when an UnresolvedRelation is created (e.g. via spark.read.table), but at the very least this future-proofs against a definite bug.

How was this patch tested?

org.apache.spark.sql.pipelines.analysis.ReadOptionsPropagationOnAnalysisSuite

@github-actions github-actions bot added the SQL label Nov 14, 2025
@AnishMahto AnishMahto changed the title from "[SPARK-53890] Test (and fix) read/readstream options are respected for pipelines" to "[SPARK-53890][SDP] Test (and fix) read/readstream options are respected for pipelines" Nov 14, 2025
mem.addData(1, 2)
registerPersistedView("complete-view", query = dfFlowFunc(Seq(1, 2).toDF("x")))
registerPersistedView("incremental-view", query = dfFlowFunc(mem.toDF()))
registerTable("`complete-table`", query = Option(readFlowFunc("complete-view")))
Contributor Author

With my changes, the identifier passed into readFlowFunc is now actually parsed by the catalyst parser, which is why this invalid flow function name only now throws an invalid name exception. I am simply quoting the name to resolve this.
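
To illustrate the quoting fix, here is a small self-contained sketch of backtick-quoting an identifier part when it contains characters such as a hyphen. The helper `quoteIfNeeded` and the rule it applies (letters, digits and underscores are safe unquoted, unless the part is purely numeric) are assumptions made for this example; they approximate, but are not guaranteed to match, the rules the catalyst parser enforces.

```scala
object IdentifierQuoting {
  // Assumption for this sketch: a part made only of letters, digits and
  // underscores can be left unquoted, unless it consists of digits alone.
  private val Unquoted = "[a-zA-Z0-9_]+".r

  // Wraps the part in backticks when needed, doubling any embedded backtick.
  def quoteIfNeeded(part: String): String =
    if (Unquoted.pattern.matcher(part).matches() && !part.forall(_.isDigit)) part
    else "`" + part.replace("`", "``") + "`"
}
```

Under these assumptions, `quoteIfNeeded("complete-table")` yields the backtick-quoted form used in the diff above, while a name such as `complete_table` is returned unchanged.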

@AnishMahto
Contributor Author

@sryza Ready for review
