88import warnings
99from collections import defaultdict
1010from datetime import datetime
11+ from itertools import chain
1112from pathlib import Path
1213from typing import Callable , List , Literal , Optional , Protocol , Tuple , Type , Union , cast
1314
@@ -1026,46 +1027,72 @@ def _process_message(self, dataframe_composed):
10261027 self ._on_message_processed (topic_name , partition , offset )
10271028
10281029 def _process_wall_clock (self , wall_clock_executors ):
1030+ # Emit time-based "ticks" when the wall-clock interval elapses.
1031+ # For each executor (grouped by topics), select one partition per partition id
1032+ # and determine an offset to include in MessageContext.
10291033 if not self ._wall_clock_active :
10301034 return
10311035
1036+ # Rate-limit by interval; skip until enough time has elapsed since last send.
10321037 now = datetime .now ().timestamp ()
10331038 if self ._wall_clock_last_sent > now - self ._wall_clock_interval :
10341039 return
10351040
1041+ # Synthetic "tick" payload (no value/key, headers empty, timestamp in ms).
10361042 value , key , timestamp , headers = None , None , int (now * 1000 ), {}
10371043
1038- # Offsets processed in the current, open checkpoint (in-flight)
1039- tp_offsets = self ._processing_context .checkpoint .tp_offsets
1040- assignment = self ._consumer .assignment ()
1041-
1042- for topics , executor in wall_clock_executors .items ():
1043- seen_partitions : set [int ] = set ()
1044- selected_partitions : list [tuple [str , int , int ]] = []
1045-
1046- for tp in assignment :
1047- if tp .topic in topics and tp .partition not in seen_partitions :
1048- offset = tp_offsets .get ((tp .topic , tp .partition ))
1049- if offset is None :
1050- # TODO: We can call only once for all required partitions
1051- committed_tp = self ._consumer .committed ([tp ], timeout = 30 )[0 ]
1052- if committed_tp .error :
1053- raise RuntimeError (
1054- "Failed to get committed offsets for "
1055- f'"{ committed_tp .topic } [{ committed_tp .partition } ]" '
1056- f"from the broker: { committed_tp .error } "
1057- )
1058- if committed_tp .offset >= 0 :
1059- offset = committed_tp .offset - 1
1060-
1061- # TODO: Handle the case when the offset is None
1062- # This means that the wall clock is triggered before any messages
1063- if offset is not None :
1064- seen_partitions .add (tp .partition )
1065- selected_partitions .append ((tp .topic , tp .partition , offset ))
1066-
1067- # Execute callback for each selected topic-partition with its offset
1068- for topic , partition , offset in selected_partitions :
1044+ # In-flight processed offsets within the current (open) checkpoint.
1045+ processed_offsets = self ._processing_context .checkpoint .tp_offsets
1046+ # Only consider currently assigned topic-partitions.
1047+ assigned_tps = self ._consumer .assignment ()
1048+ # Cache known offsets to avoid resolving them multiple times for different executors.
1049+ # Keyed by (topic, partition) to avoid relying on TopicPartition instance identity.
1050+ known_offsets : dict [tuple [str , int ], int ] = {}
1051+
1052+ for topics , executor in wall_clock_executors :
1053+ # candidate_partitions: partitions still needing an offset resolved
1054+ candidate_partitions : dict [int , set [TopicPartition ]] = defaultdict (set )
1055+ # selected_partitions: final partition_id -> (topic, offset)
1056+ selected_partitions : dict [int , tuple [str , int ]] = {}
1057+
1058+ for tp in assigned_tps :
1059+ known_offset = known_offsets .get ((tp .topic , tp .partition ))
1060+ if known_offset is not None :
1061+ selected_partitions [tp .partition ] = (tp .topic , known_offset )
1062+ continue
1063+
1064+ if tp .topic in topics and tp .partition not in selected_partitions :
1065+ # Prefer the most recent known processed offset if available.
1066+ if processed_offset := processed_offsets .get (
1067+ (tp .topic , tp .partition )
1068+ ):
1069+ # Use offset from the in-flight checkpoint.
1070+ selected_partitions [tp .partition ] = (tp .topic , processed_offset )
1071+ known_offsets [(tp .topic , tp .partition )] = processed_offset
1072+ else :
1073+ # Will resolve via committed offsets below.
1074+ candidate_partitions [tp .partition ].add (tp )
1075+
1076+ if candidate_partitions :
1077+ # Best-effort: fetch committed offsets in batch for unresolved partitions.
1078+ committed_tps = self ._consumer .committed (
1079+ list (chain (* candidate_partitions .values ())), timeout = 30
1080+ )
1081+ for tp in committed_tps :
1082+ if tp .error :
1083+ raise RuntimeError (
1084+ f"Failed to get committed offsets for "
1085+ f'"{ tp .topic } [{ tp .partition } ]" from the broker: { tp .error } '
1086+ )
1087+ if tp .partition not in selected_partitions :
1088+ # Committed offset is "next to consume"; last processed is offset - 1.
1089+ # The "invalid/unset" broker offset is negative.
1090+ offset = tp .offset - 1 if tp .offset >= 0 else tp .offset
1091+ selected_partitions [tp .partition ] = (tp .topic , offset )
1092+ known_offsets [(tp .topic , tp .partition )] = offset
1093+
1094+ # Execute callback for each selected topic-partition with its offset.
1095+ for partition , (topic , offset ) in selected_partitions .items ():
10691096 row = Row (
10701097 value = value ,
10711098 key = key ,
@@ -1087,7 +1114,7 @@ def _process_wall_clock(self, wall_clock_executors):
10871114 if not to_suppress :
10881115 raise
10891116
1090- # TODO: should we use a "new" now or the one from before the processing?
1117+ # Record the emission time for rate-limiting.
10911118 self ._wall_clock_last_sent = now
10921119
10931120 def _on_assign (self , _ , topic_partitions : List [TopicPartition ]):
0 commit comments