Skip to content

Commit d23d044

Browse files
committed
Refactor _as_wall_clock wrapper
1 parent e5f8887 commit d23d044

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

quixstreams/dataframe/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import timedelta
1+
from datetime import datetime, timedelta
22
from typing import Union
33

44

@@ -22,3 +22,7 @@ def ensure_milliseconds(delta: Union[int, timedelta]) -> int:
2222
f'Timedelta must be either "int" representing milliseconds '
2323
f'or "datetime.timedelta", got "{type(delta)}"'
2424
)
25+
26+
27+
def now() -> int:
28+
return int(datetime.now().timestamp() * 1000)

quixstreams/dataframe/windows/base.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
Stream,
2222
TransformExpandedCallback,
2323
TransformFunction,
24-
TransformWallClockExpandedCallback,
2524
)
2625
from quixstreams.core.stream.exceptions import InvalidOperation
2726
from quixstreams.models.topics.manager import TopicManager
@@ -47,7 +46,7 @@
4746
Iterable[Message],
4847
]
4948

50-
WallClockCallback = Callable[[int, WindowedPartitionTransaction], Iterable[Message]]
49+
WallClockCallback = Callable[[WindowedPartitionTransaction], Iterable[Message]]
5150

5251

5352
class Window(abc.ABC):
@@ -79,7 +78,6 @@ def process_window(
7978
@abstractmethod
8079
def process_wall_clock(
8180
self,
82-
timestamp_ms: int,
8381
transaction: WindowedPartitionTransaction,
8482
) -> Iterable[WindowKeyResult]:
8583
pass
@@ -122,7 +120,7 @@ def _apply_window(
122120
func=windowed_func, expand=True
123121
)
124122
wall_clock_stream = Stream(
125-
TransformFunction(wall_clock_transform_func, expand=True, wall_clock=True)
123+
func=TransformFunction(wall_clock_transform_func, expand=True)
126124
)
127125
sdf = self._dataframe.__dataframe_clone__(stream=windowed_stream)
128126
return sdf.concat_wall_clock(wall_clock_stream)
@@ -169,10 +167,10 @@ def window_callback(
169167
yield (window, key, window["start"], None)
170168

171169
def wall_clock_callback(
172-
timestamp: int, transaction: WindowedPartitionTransaction
170+
transaction: WindowedPartitionTransaction,
173171
) -> Iterable[Message]:
174172
# TODO: Check if this will work for sliding windows
175-
for key, window in self.process_wall_clock(timestamp, transaction):
173+
for key, window in self.process_wall_clock(transaction):
176174
yield (window, key, window["start"], None)
177175

178176
return self._apply_window(
@@ -225,7 +223,7 @@ def window_callback(
225223
yield (window, key, window["start"], None)
226224

227225
def wall_clock_callback(
228-
timestamp: int, transaction: WindowedPartitionTransaction
226+
transaction: WindowedPartitionTransaction,
229227
) -> Iterable[Message]:
230228
# TODO: Implement wall_clock callback
231229
return []
@@ -475,17 +473,19 @@ def _as_wall_clock(
475473
processing_context: "ProcessingContext",
476474
store_name: str,
477475
stream_id: str,
478-
) -> TransformWallClockExpandedCallback:
476+
) -> TransformExpandedCallback:
479477
@functools.wraps(func)
480-
def wrapper(timestamp: int) -> Iterable[Message]:
478+
def wrapper(
479+
value: Any, key: Any, timestamp: int, headers: Any
480+
) -> Iterable[Message]:
481481
ctx = message_context()
482482
transaction = cast(
483483
WindowedPartitionTransaction,
484484
processing_context.checkpoint.get_store_transaction(
485485
stream_id=stream_id, partition=ctx.partition, store_name=store_name
486486
),
487487
)
488-
return func(timestamp, transaction)
488+
return func(transaction)
489489

490490
return wrapper
491491

quixstreams/dataframe/windows/count_based.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ def process_window(
191191

192192
def process_wall_clock(
193193
self,
194-
timestamp_ms: int,
195194
transaction: WindowedPartitionTransaction,
196195
) -> Iterable[WindowKeyResult]:
197196
return []

quixstreams/dataframe/windows/time_based.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import TYPE_CHECKING, Any, Iterable, Literal, Optional
44

55
from quixstreams.context import message_context
6+
from quixstreams.dataframe.utils import now
67
from quixstreams.state import WindowedPartitionTransaction, WindowedState
78

89
from .base import (
@@ -202,16 +203,12 @@ def process_window(
202203

203204
def process_wall_clock(
204205
self,
205-
timestamp_ms: int,
206206
transaction: WindowedPartitionTransaction,
207207
) -> Iterable[WindowKeyResult]:
208-
latest_expired_window_end = transaction.get_latest_expired(prefix=b"")
209-
latest_timestamp = max(timestamp_ms, latest_expired_window_end)
210-
max_expired_window_end = latest_timestamp - self._grace_ms
211208
return self.expire_by_partition(
212-
transaction,
213-
max_expired_window_end,
214-
self.collect,
209+
transaction=transaction,
210+
max_expired_end=now() - self._grace_ms,
211+
collect=self.collect,
215212
advance_last_expired_timestamp=False,
216213
)
217214

0 commit comments

Comments
 (0)