@@ -22,7 +22,7 @@ class DataFrameRegistry:
2222
2323 def __init__ (self ) -> None :
2424 self ._registry : dict [str , Stream ] = {}
25- self ._wall_clock_registry : dict [str , Stream ] = {}
25+ self ._wall_clock_registry : dict [str , tuple [ tuple [ Topic , ...], Stream ] ] = {}
2626 self ._topics : list [Topic ] = []
2727 self ._repartition_origins : set [str ] = set ()
2828 self ._topics_to_stream_ids : dict [str , set [str ]] = {}
@@ -74,15 +74,11 @@ def register_wall_clock(
7474 self , dataframe : "StreamingDataFrame" , stream : Stream
7575 ) -> None :
7676 """
77- Register a wall clock stream for the given topic.
77+ Register a wall clock root Stream for the given dataframe.
78+ Stores the dataframe's topics and the Stream under its stream_id, to be composed later.
7879 """
79- topics = dataframe .topics
80- if len (topics ) > 1 :
81- raise ValueError (
82- f"Expected a StreamingDataFrame with one topic, got { len (topics )} "
83- )
84- topic = topics [0 ]
85- self ._wall_clock_registry [topic .name ] = stream
80+ # TODO: Handle multiple wall clock streams per stream_id; a later registration currently overwrites the earlier one.
81+ self ._wall_clock_registry [dataframe .stream_id ] = (dataframe .topics , stream )
8682
8783 def register_groupby (
8884 self ,
@@ -128,28 +124,28 @@ def compose_all(
128124 :param sink: callable to accumulate the results of the execution, optional.
129125 :return: a {topic_name: composed} dict, where composed is a callable
130126 """
131- return self ._compose (registry = self ._registry , sink = sink )
132-
133- def compose_wall_clock (self ) -> dict [str , VoidExecutor ]:
134- """
135- Composes all the wall clock streams and returns a dict of format {<topic>: <VoidExecutor>}
136- :return: a {topic_name: composed} dict, where composed is a callable
137- """
138- return self ._compose (registry = self ._wall_clock_registry )
139-
140- def _compose (
141- self , registry : dict [str , Stream ], sink : Optional [VoidExecutor ] = None
142- ) -> dict [str , VoidExecutor ]:
143127 executors = {}
144128 # Go over the registered topics with root Streams and compose them
145- for topic , root_stream in registry .items ():
129+ for topic , root_stream in self . _registry .items ():
146130 # If a root stream is connected to other roots, ".compose()" will
147131 # return them all.
148132 # Use the registered root Stream to filter them out.
149133 root_executors = root_stream .compose (sink = sink )
150134 executors [topic ] = root_executors [root_stream ]
151135 return executors
152136
137+ def compose_wall_clock (self ) -> dict [tuple [str , ...], VoidExecutor ]:
138+ """
139+ Compose all wall clock Streams and return executors keyed by their topic names.
140+ Returns mapping: {(topic_name, ...): executor}
141+ """
142+ executors = {}
143+ for _ , (topics , root_stream ) in self ._wall_clock_registry .items ():
144+ root_executors = root_stream .compose ()
145+ _topics = tuple ({t .name for t in topics })
146+ executors [_topics ] = root_executors [root_stream ]
147+ return executors
148+
153149 def register_stream_id (self , stream_id : str , topic_names : list [str ]):
154150 """
155151 Register a mapping between the stream_id and topic names.
0 commit comments