Skip to content

Commit 800c11e

Browse files
authored
ref: Type out chains and steps (#99)
* basic typing between steps, serializer and deserializer * some basic header filtering in source * some minor clean up * remove random comment * some Message typing * wip use Message[] * remove concept of Message * schema validation basics * working transformer example * fix sm batchin * fix blq.py typing * type checking is fixed * assert StreamSources in arroyo adapter * somem cleanups * more cleanup * support protobuf * change name to msg_parser * some comment * remove deserializer and serializer * an immutable message interface * somehow fixed all of the tests * add some comments, don't get_codec for every msg * fix sm test * make flatmap use mutablesequence * make reduce tests better * more comments
1 parent b299e3c commit 800c11e

29 files changed

+1018
-397
lines changed

.pre-commit-config.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ repos:
4141
responses,
4242
"sentry-arroyo>=2.18.2",
4343
types-pyYAML,
44-
types-jsonschema
44+
types-jsonschema,
45+
"sentry-kafka-schemas>=1.2.0",
4546
]
4647
files: ^sentry_streams/.+
4748
- repo: https://github.com/pycqa/isort

sentry_streams/pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies = [
1414
"sentry-arroyo>=2.18.2",
1515
"pyyaml>=6.0.2",
1616
"jsonschema>=4.23.0",
17+
"sentry-kafka-schemas>=1.2.0",
1718
]
1819

1920
[dependency-groups]

sentry_streams/sentry_streams/adapters/arroyo/adapter.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload, KafkaProducer
1818
from arroyo.processing.processor import StreamProcessor
1919
from arroyo.types import Topic
20+
from sentry_kafka_schemas import get_codec
21+
from sentry_kafka_schemas.codecs import Codec
2022

2123
from sentry_streams.adapters.arroyo.consumer import (
2224
ArroyoConsumer,
@@ -148,7 +150,17 @@ def source(self, step: Source) -> Route:
148150
"""
149151
source_name = step.name
150152
self.__sources.add_source(step)
151-
self.__consumers[source_name] = ArroyoConsumer(source_name)
153+
154+
# This is the Arroyo adapter, and it only supports consuming from StreamSource anyways
155+
assert isinstance(step, StreamSource)
156+
try:
157+
schema: Codec[Any] = get_codec(step.stream_name)
158+
except Exception:
159+
raise ValueError(f"Kafka topic {step.stream_name} has no associated schema")
160+
161+
self.__consumers[source_name] = ArroyoConsumer(
162+
source_name, step.stream_name, schema, step.header_filter
163+
)
152164

153165
return Route(source_name, [])
154166

sentry_streams/sentry_streams/adapters/arroyo/broadcaster.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from arroyo.types import FilteredPayload, Message, Partition, Value
88

99
from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
10+
from sentry_streams.pipeline.message import Message as StreamsMessage
1011

1112

1213
@dataclass(eq=True)
@@ -78,6 +79,7 @@ def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None:
7879
for branch in self.__downstream_branches:
7980
msg_copy = cast(Message[RoutedValue], deepcopy(message))
8081
copy_payload = msg_copy.value.payload
82+
streams_msg = copy_payload.payload
8183
routed_copy = Message(
8284
Value(
8385
committable=msg_copy.value.committable,
@@ -87,7 +89,12 @@ def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None:
8789
source=copy_payload.route.source,
8890
waypoints=[*copy_payload.route.waypoints, branch],
8991
),
90-
payload=copy_payload.payload,
92+
payload=StreamsMessage(
93+
streams_msg.payload,
94+
streams_msg.headers,
95+
streams_msg.timestamp,
96+
streams_msg.schema,
97+
),
9198
),
9299
)
93100
)

sentry_streams/sentry_streams/adapters/arroyo/consumer.py

+22-9
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
import logging
2+
import time
23
from dataclasses import dataclass, field
3-
from typing import Any, Mapping, MutableSequence
4+
from typing import Any, Mapping, MutableSequence, Optional, Tuple, Union
45

5-
from arroyo.backends.kafka.consumer import KafkaPayload
6+
from arroyo.backends.kafka.consumer import Headers, KafkaPayload
67
from arroyo.processing.strategies import CommitOffsets
78
from arroyo.processing.strategies.abstract import (
89
ProcessingStrategy,
910
ProcessingStrategyFactory,
1011
)
1112
from arroyo.processing.strategies.run_task import RunTask
12-
from arroyo.types import (
13-
Commit,
14-
Message,
15-
Partition,
16-
)
13+
from arroyo.types import Commit, FilteredPayload, Message, Partition
14+
from sentry_kafka_schemas.codecs import Codec
1715

1816
from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
1917
from sentry_streams.adapters.arroyo.steps import ArroyoStep
18+
from sentry_streams.pipeline.message import Message as StreamsMessage
2019

2120
logger = logging.getLogger(__name__)
2221

@@ -41,6 +40,9 @@ class ArroyoConsumer:
4140
"""
4241

4342
source: str
43+
stream_name: str
44+
schema: Codec[Any]
45+
header_filter: Optional[Tuple[str, bytes]] = None
4446
steps: MutableSequence[ArroyoStep] = field(default_factory=list)
4547

4648
def add_step(self, step: ArroyoStep) -> None:
@@ -59,9 +61,20 @@ def build_strategy(self, commit: Commit) -> ProcessingStrategy[Any]:
5961
follow.
6062
"""
6163

62-
def add_route(message: Message[KafkaPayload]) -> RoutedValue:
64+
def add_route(message: Message[KafkaPayload]) -> Union[FilteredPayload, RoutedValue]:
65+
headers: Headers = message.payload.headers
66+
if self.header_filter and self.header_filter not in headers:
67+
return FilteredPayload()
68+
69+
broker_timestamp = message.timestamp.timestamp() if message.timestamp else time.time()
6370
value = message.payload.value
64-
return RoutedValue(route=Route(source=self.source, waypoints=[]), payload=value)
71+
72+
return RoutedValue(
73+
route=Route(source=self.source, waypoints=[]),
74+
payload=StreamsMessage(
75+
payload=value, headers=headers, timestamp=broker_timestamp, schema=self.schema
76+
),
77+
)
6578

6679
strategy: ProcessingStrategy[Any] = CommitOffsets(commit)
6780
for step in reversed(self.steps):

sentry_streams/sentry_streams/adapters/arroyo/forwarder.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ def __init__(
2828
def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None:
2929
message_payload = message.value.payload
3030
if isinstance(message_payload, RoutedValue) and message_payload.route == self.__route:
31+
# TODO: get headers from the StreamsMessage
32+
assert isinstance(message_payload.payload.payload, bytes)
3133
kafka_payload = message.value.replace(
32-
KafkaPayload(None, str(message_payload.payload).encode("utf-8"), [])
34+
KafkaPayload(None, message_payload.payload.payload, [])
3335
)
3436
self.__produce_step.submit(Message(kafka_payload))
3537
else:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import time
2+
from typing import Optional, TypeVar, Union, cast
3+
4+
from arroyo.processing.strategies.abstract import ProcessingStrategy
5+
from arroyo.types import FilteredPayload, Message, Value
6+
7+
from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
8+
from sentry_streams.pipeline.message import Message as StreamsMessage
9+
10+
TPayload = TypeVar("TPayload")
11+
12+
13+
class MessageWrapper(ProcessingStrategy[Union[FilteredPayload, TPayload]]):
14+
"""
15+
Custom processing strategy which can wrap payloads coming from the previous step
16+
into a Message. In the case that the previous step already forwards a Message
17+
or a FilteredPayload, this strategy will simply forward that as well to the
18+
next step.
19+
"""
20+
21+
def __init__(
22+
self,
23+
route: Route,
24+
next_step: ProcessingStrategy[Union[FilteredPayload, RoutedValue]],
25+
) -> None:
26+
self.__next_step = next_step
27+
self.__route = route
28+
29+
def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None:
30+
now = time.time()
31+
if not isinstance(message.payload, FilteredPayload):
32+
33+
if isinstance(message.payload, RoutedValue):
34+
# No need to wrap a StreamsMessage in StreamsMessage() again
35+
# This case occurs when prior strategy is forwarding a message that belongs on a separate route
36+
assert isinstance(message.payload.payload, StreamsMessage)
37+
self.__next_step.submit(cast(Message[Union[FilteredPayload, RoutedValue]], message))
38+
else:
39+
msg = StreamsMessage(message.payload, [], now, None)
40+
41+
routed_msg: Message[RoutedValue] = Message(
42+
Value(
43+
committable=message.value.committable,
44+
payload=RoutedValue(self.__route, msg),
45+
)
46+
)
47+
self.__next_step.submit(routed_msg)
48+
49+
else:
50+
self.__next_step.submit(cast(Message[FilteredPayload], message))
51+
52+
def poll(self) -> None:
53+
self.__next_step.poll()
54+
55+
def join(self, timeout: Optional[float] = None) -> None:
56+
self.__next_step.join(timeout)
57+
58+
def close(self) -> None:
59+
self.__next_step.close()
60+
61+
def terminate(self) -> None:
62+
self.__next_step.terminate()

sentry_streams/sentry_streams/adapters/arroyo/reduce.py

+16-17
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def add(self, value: Any) -> Self:
9090
self.offsets[partition] = max(offsets[partition], self.offsets[partition])
9191

9292
else:
93-
self.offsets.update(offsets)
93+
self.offsets[partition] = offsets[partition]
9494

9595
return self
9696

@@ -127,15 +127,15 @@ def __init__(
127127
window_size: float,
128128
window_slide: float,
129129
acc: Callable[[], Accumulator[Any, Any]],
130-
next_step: ProcessingStrategy[TResult],
130+
next_step: ProcessingStrategy[Union[FilteredPayload, TResult]],
131131
route: Route,
132132
) -> None:
133133

134134
self.window_count = int(window_size / window_slide)
135135
self.window_size = int(window_size)
136136
self.window_slide = int(window_slide)
137137

138-
self.next_step = next_step
138+
self.msg_wrap_step = next_step
139139
self.start_time = time.time()
140140
self.route = route
141141

@@ -180,9 +180,8 @@ def __merge_and_flush(self, window_id: int) -> None:
180180

181181
# If there is a gap in the data, it is possible to have empty flushes
182182
if payload:
183-
result = RoutedValue(self.route, payload)
184-
self.next_step.submit(
185-
Message(Value(cast(TResult, result), merged_window.get_offsets()))
183+
self.msg_wrap_step.submit(
184+
Message(Value(cast(TResult, payload), merged_window.get_offsets()))
186185
)
187186

188187
# Refresh only the accumulator that was the first
@@ -206,12 +205,12 @@ def __maybe_flush(self, cur_time: float) -> None:
206205
def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None:
207206
value = message.payload
208207
if isinstance(value, FilteredPayload):
209-
self.next_step.submit(cast(Message[TResult], message))
208+
self.msg_wrap_step.submit(cast(Message[Union[FilteredPayload, TResult]], message))
210209
return
211210

212211
assert isinstance(value, RoutedValue)
213212
if value.route != self.route:
214-
self.next_step.submit(cast(Message[TResult], message))
213+
self.msg_wrap_step.submit(cast(Message[Union[FilteredPayload, TResult]], message))
215214
return
216215

217216
cur_time = time.time() - self.start_time
@@ -225,23 +224,23 @@ def poll(self) -> None:
225224
cur_time = time.time() - self.start_time
226225
self.__maybe_flush(cur_time)
227226

228-
self.next_step.poll()
227+
self.msg_wrap_step.poll()
229228

230229
def close(self) -> None:
231-
self.next_step.close()
230+
self.msg_wrap_step.close()
232231

233232
def terminate(self) -> None:
234-
self.next_step.terminate()
233+
self.msg_wrap_step.terminate()
235234

236235
def join(self, timeout: Optional[float] = None) -> None:
237-
self.next_step.close()
238-
self.next_step.join()
236+
self.msg_wrap_step.close()
237+
self.msg_wrap_step.join()
239238

240239

241240
def build_arroyo_windowed_reduce(
242241
streams_window: Window[MeasurementUnit],
243242
accumulator: Callable[[], Accumulator[Any, Any]],
244-
next_step: ProcessingStrategy[Union[FilteredPayload, TPayload]],
243+
msg_wrapper: ProcessingStrategy[Union[FilteredPayload, TPayload]],
245244
route: Route,
246245
) -> ProcessingStrategy[Union[FilteredPayload, TPayload]]:
247246

@@ -268,7 +267,7 @@ def build_arroyo_windowed_reduce(
268267
size,
269268
slide,
270269
accumulator,
271-
next_step,
270+
msg_wrapper,
272271
route,
273272
)
274273

@@ -294,15 +293,15 @@ def build_arroyo_windowed_reduce(
294293
arroyo_acc.accumulator,
295294
),
296295
arroyo_acc.initial_value,
297-
next_step,
296+
msg_wrapper,
298297
)
299298

300299
case timedelta():
301300
return TimeWindowedReduce(
302301
window_size.total_seconds(),
303302
window_size.total_seconds(),
304303
accumulator,
305-
next_step,
304+
msg_wrapper,
306305
route,
307306
)
308307

sentry_streams/sentry_streams/adapters/arroyo/routes.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from dataclasses import dataclass
22
from typing import Any, MutableSequence
33

4+
from sentry_streams.pipeline.message import Message as StreamsMessage
5+
46

57
@dataclass(frozen=True)
68
class Route:
@@ -23,4 +25,4 @@ class Route:
2325
@dataclass(frozen=True)
2426
class RoutedValue:
2527
route: Route
26-
payload: Any
28+
payload: StreamsMessage[Any]

0 commit comments

Comments
 (0)