
ref: Type out chains and steps #99

Open · wants to merge 20 commits into base: main
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -41,7 +41,8 @@ repos:
responses,
"sentry-arroyo>=2.18.2",
types-pyYAML,
types-jsonschema
types-jsonschema,
"sentry-kafka-schemas>=1.2.0",
]
files: ^sentry_streams/.+
- repo: https://github.com/pycqa/isort
1 change: 1 addition & 0 deletions sentry_streams/pyproject.toml
@@ -14,6 +14,7 @@ dependencies = [
"sentry-arroyo>=2.18.2",
"pyyaml>=6.0.2",
"jsonschema>=4.23.0",
"sentry-kafka-schemas>=1.2.0",
]

[dependency-groups]
8 changes: 7 additions & 1 deletion sentry_streams/sentry_streams/adapters/arroyo/adapter.py
@@ -148,7 +148,13 @@ def source(self, step: Source) -> Route:
"""
source_name = step.name
self.__sources.add_source(step)
self.__consumers[source_name] = ArroyoConsumer(source_name)

# This is the Arroyo adapter, and it only supports consuming from StreamSource anyways
assert isinstance(step, StreamSource)

self.__consumers[source_name] = ArroyoConsumer(
source_name, step.stream_name, step.header_filter
)

return Route(source_name, [])

38 changes: 28 additions & 10 deletions sentry_streams/sentry_streams/adapters/arroyo/consumer.py
@@ -1,22 +1,21 @@
import logging
from dataclasses import dataclass, field
from typing import Any, Mapping, MutableSequence
from typing import Any, Mapping, MutableSequence, Optional, Tuple, Union

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.backends.kafka.consumer import Headers, KafkaPayload
from arroyo.processing.strategies import CommitOffsets
from arroyo.processing.strategies.abstract import (
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import (
Commit,
Message,
Partition,
)
from arroyo.types import Commit, FilteredPayload, Message, Partition
from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec

from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
from sentry_streams.adapters.arroyo.steps import ArroyoStep
from sentry_streams.pipeline.message import Message as StreamsMessage

logger = logging.getLogger(__name__)

@@ -41,6 +40,8 @@ class ArroyoConsumer:
"""

source: str
stream_name: str
header_filter: Optional[Tuple[str, bytes]] = None
steps: MutableSequence[ArroyoStep] = field(default_factory=list)

def add_step(self, step: ArroyoStep) -> None:
@@ -59,9 +60,26 @@ def build_strategy(self, commit: Commit) -> ProcessingStrategy[Any]:
follow.
"""

def add_route(message: Message[KafkaPayload]) -> RoutedValue:
value = message.payload.value
return RoutedValue(route=Route(source=self.source, waypoints=[]), payload=value)
def add_route(message: Message[KafkaPayload]) -> Union[FilteredPayload, RoutedValue]:
filtered = False
if self.header_filter:
headers: Headers = message.payload.headers
if self.header_filter not in headers:
filtered = True

if filtered:
return FilteredPayload()
Collaborator commented on lines +64 to +71:

if self.header_filter:
    headers: Headers = message.payload.headers
    if self.header_filter not in headers:
        return FilteredPayload()

else:
value = message.payload.value
try:
schema: Codec[Any] = get_codec(self.stream_name)
Collaborator: I think you should not need to run the get_codec function for every message. I'd get the instance of the schema only once and attach it as a flyweight object to all messages. (See the sketch after this file's diff.)

except Exception:
raise ValueError(f"Kafka topic {self.stream_name} has no associated schema")

return RoutedValue(
route=Route(source=self.source, waypoints=[]),
payload=StreamsMessage(schema=schema, payload=value),
@ayirr7 (Member, Author) on Apr 22, 2025: This identifies the schema of the messages based on the topic they come from. I chose to do it this way, wrapping the value in this Message and passing Message throughout the pipeline, so that the user can do flexible parsing/deserialization of messages. This way, instead of baking it into the Source, parsing can happen anywhere in the pipeline via the Parser() step.

If we can just bake parsing/deserialization into the Source, then all of this is not needed.

)

strategy: ProcessingStrategy[Any] = CommitOffsets(commit)
for step in reversed(self.steps):
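A minimal sketch of the flyweight idea from the review comment above, assuming ArroyoConsumer stays a dataclass: resolve the codec once when the consumer is built and reuse it for every message, so add_route no longer calls get_codec per message. The _codec field name and the trimmed field list are illustrative, not part of this PR.

from dataclasses import dataclass, field
from typing import Any, Optional, Tuple

from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec


@dataclass
class ArroyoConsumer:
    source: str
    stream_name: str
    header_filter: Optional[Tuple[str, bytes]] = None
    # Resolved once per consumer instead of once per message.
    _codec: Codec[Any] = field(init=False)

    def __post_init__(self) -> None:
        try:
            self._codec = get_codec(self.stream_name)
        except Exception:
            raise ValueError(f"Kafka topic {self.stream_name} has no associated schema")

add_route would then reference self._codec directly, and a missing schema fails at consumer construction time rather than on the first message.
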
4 changes: 1 addition & 3 deletions sentry_streams/sentry_streams/adapters/arroyo/forwarder.py
@@ -28,9 +28,7 @@ def __init__(
def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None:
message_payload = message.value.payload
if isinstance(message_payload, RoutedValue) and message_payload.route == self.__route:
kafka_payload = message.value.replace(
KafkaPayload(None, str(message_payload.payload).encode("utf-8"), [])
)
kafka_payload = message.value.replace(KafkaPayload(None, message_payload.payload, []))
self.__produce_step.submit(Message(kafka_payload))
else:
self.__next_step.submit(message)
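The forwarder change above assumes the payload reaching a sink is already bytes, i.e. an upstream Serializer step has encoded the typed message before it is wrapped in KafkaPayload. A rough sketch of that assumption (JSON encoding is a guess for illustration, not the library's actual Serializer):

import json
from typing import Any


def serialize_payload(payload: Any) -> bytes:
    # Hypothetical stand-in for what a Serializer step might emit; the
    # forwarder can pass this straight into KafkaPayload without calling
    # str(...).encode("utf-8") itself.
    return json.dumps(payload).encode("utf-8")
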
2 changes: 1 addition & 1 deletion sentry_streams/sentry_streams/adapters/arroyo/reduce.py
@@ -90,7 +90,7 @@ def add(self, value: Any) -> Self:
self.offsets[partition] = max(offsets[partition], self.offsets[partition])

else:
self.offsets.update(offsets)
self.offsets[partition] = offsets[partition]

return self

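A small illustration, not taken from the PR, of what the one-line change above avoids: inside the per-partition loop, dict.update() rewrites every partition's offset from the incoming mapping, which can clobber an offset already reconciled with max() in the other branch; indexing writes only the partition currently being handled.

acc = {0: 50}             # partition 0 already tracked at offset 50
incoming = {0: 12, 1: 7}  # offsets carried by the incoming value
partition = 1             # the partition taking the else-branch

# Old behaviour: acc.update(incoming) -> {0: 12, 1: 7},
# regressing partition 0 from 50 down to 12.
# New behaviour: only the partition being handled is written.
acc[partition] = incoming[partition]
assert acc == {0: 50, 1: 7}
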
54 changes: 27 additions & 27 deletions sentry_streams/sentry_streams/examples/batching.py
@@ -1,32 +1,32 @@
import json

from sentry_streams.pipeline import Batch, FlatMap, Map, streaming_source
from sentry_streams.pipeline.batch import unbatch
from sentry_streams.pipeline.function_template import InputType


def build_batch_str(batch: list[InputType]) -> str:
d = {"batch": batch}

return json.dumps(d)
from typing import Callable, MutableSequence, Union, cast

from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric

def build_message_str(message: str) -> str:
d = {"message": message}

return json.dumps(d)
from sentry_streams.pipeline import Batch, FlatMap, streaming_source
from sentry_streams.pipeline.batch import unbatch
from sentry_streams.pipeline.chain import Parser, Serializer
from sentry_streams.pipeline.message import Message

pipeline = streaming_source(
name="myinput",
stream_name="ingest-metrics",
)

pipeline = (
streaming_source(
name="myinput",
stream_name="events",
)
.apply("mybatch", Batch(batch_size=5)) # User simply provides the batch size
.apply("myunbatch", FlatMap(function=unbatch))
.apply("mymap", Map(function=build_message_str))
.sink(
"kafkasink",
stream_name="transformed-events",
) # flush the batches to the Sink
# TODO: Figure out why the concrete type of InputType is not showing up in the type hint of chain1
chain1 = pipeline.apply("parser", Parser(msg_type=IngestMetric)).apply(
"mybatch", Batch(batch_size=5)
) # User simply provides the batch size

chain2 = chain1.apply(
"myunbatch",
FlatMap(
function=cast(
Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str],
unbatch,
Member (Author): Because unbatch is a function with generic arguments, the user unfortunately has to explicitly cast it to the concrete type in their pipeline. This shouldn't be difficult because the incoming ExtensibleChain already has concrete type hints.

Obviously this specific cast() can also be avoided if users write and use a custom unbatcher (see the sketch after this example).

Collaborator: I wonder whether it would be easier if we had Batch and Unbatch appliers without having to go through the FlatMap.

)
),
)

chain3 = chain2.apply("serializer", Serializer()).sink(
"kafkasink2", stream_name="transformed-events"
) # flush the batches to the Sink
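Per the author's comment above, the cast() goes away if the example defines its own concretely-typed unbatcher. A sketch under the assumption that the generic unbatch helper can simply be wrapped; unbatch_metrics is a hypothetical name, and its signature just mirrors the cast target in the example above.

from typing import MutableSequence

from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric

from sentry_streams.pipeline.batch import unbatch
from sentry_streams.pipeline.message import Message


def unbatch_metrics(
    batch: Message[MutableSequence[IngestMetric]],
) -> Message[IngestMetric]:
    # Concretely typed wrapper around the generic unbatch helper, so the
    # pipeline definition can reference it without cast().
    return unbatch(batch)


# chain2 = chain1.apply("myunbatch", FlatMap(function=unbatch_metrics))
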
27 changes: 16 additions & 11 deletions sentry_streams/sentry_streams/examples/blq.py
@@ -1,26 +1,31 @@
from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric

from sentry_streams.examples.blq_fn import (
DownstreamBranch,
json_dump_message,
should_send_to_blq,
unpack_kafka_message,
)
from sentry_streams.pipeline import Map, segment, streaming_source
from sentry_streams.pipeline import segment, streaming_source
from sentry_streams.pipeline.chain import Parser, Serializer

storage_branch = (
segment(name="recent")
.apply("dump_msg_recent", Map(json_dump_message))
segment(name="recent", msg_type=IngestMetric)
.apply("serializer1", Serializer())
.broadcast(
"send_message_to_DBs",
routes=[
segment("sbc").sink("kafkasink", stream_name="transformed-events"),
segment("clickhouse").sink("kafkasink2", stream_name="transformed-events-2"),
segment("sbc", msg_type=IngestMetric).sink(
"kafkasink", stream_name="transformed-events"
),
segment("clickhouse", msg_type=IngestMetric).sink(
"kafkasink2", stream_name="transformed-events-2"
),
],
)
)

save_delayed_message = (
segment(name="delayed")
.apply("dump_msg_delayed", Map(json_dump_message))
segment(name="delayed", msg_type=IngestMetric)
.apply("serializer2", Serializer())
.sink(
"kafkasink3",
stream_name="transformed-events-3",
@@ -30,9 +35,9 @@
pipeline = (
streaming_source(
name="ingest",
stream_name="events",
stream_name="ingest-metrics",
)
.apply("unpack_message", Map(unpack_kafka_message))
.apply("parser", Parser(msg_type=IngestMetric))
.route(
"blq_router",
routing_function=should_send_to_blq,
35 changes: 6 additions & 29 deletions sentry_streams/sentry_streams/examples/blq_fn.py
@@ -1,8 +1,9 @@
import json
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any

from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric

from sentry_streams.pipeline.message import Message

# 10 minutes
MAX_MESSAGE_LATENCY = 600
@@ -13,33 +14,9 @@ class DownstreamBranch(Enum):
RECENT = "recent"


@dataclass
class Message:
value: Any
timestamp: float

def to_dict(self) -> dict[str, Any]:
return {
"value": self.value,
"timestamp": self.timestamp,
}


def unpack_kafka_message(msg: str) -> Message:
d = json.loads(msg)
return Message(
value=d["value"],
timestamp=d["timestamp"],
)


def should_send_to_blq(msg: Message) -> DownstreamBranch:
timestamp = msg.timestamp
def should_send_to_blq(msg: Message[IngestMetric]) -> DownstreamBranch:
timestamp = msg.payload["timestamp"] # We can do this because the type of the payload is known
if timestamp < time.time() - MAX_MESSAGE_LATENCY:
return DownstreamBranch.DELAYED
else:
return DownstreamBranch.RECENT


def json_dump_message(msg: Message) -> str:
return json.dumps(msg.to_dict())
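The typed access msg.payload["timestamp"] above relies on the Message wrapper this PR introduces in sentry_streams.pipeline.message. Its definition is not part of this diff; a minimal sketch consistent with how it is constructed in consumer.py (StreamsMessage(schema=schema, payload=value)) might look roughly like this, purely as an assumption:

from dataclasses import dataclass
from typing import Any, Generic, TypeVar

from sentry_kafka_schemas.codecs import Codec

TPayload = TypeVar("TPayload")


@dataclass(frozen=True)
class Message(Generic[TPayload]):
    # Codec resolved from the source topic; a Parser step can use it to
    # decode raw bytes into the typed payload (e.g. IngestMetric), after
    # which payload access is type-safe.
    schema: Codec[Any]
    payload: TPayload
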
41 changes: 16 additions & 25 deletions sentry_streams/sentry_streams/examples/multi_chain.py
@@ -1,53 +1,44 @@
from json import JSONDecodeError, dumps, loads
from typing import Any, Mapping, cast
from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric

from sentry_streams.pipeline import Map, multi_chain, streaming_source
from sentry_streams.pipeline.chain import Parser, Serializer
from sentry_streams.pipeline.message import Message


def parse(msg: str) -> Mapping[str, Any]:
try:
parsed = loads(msg)
except JSONDecodeError:
return {"type": "invalid"}

return cast(Mapping[str, Any], parsed)


def serialize(msg: Mapping[str, Any]) -> str:
return dumps(msg)


def do_something(msg: Mapping[str, Any]) -> Mapping[str, Any]:
def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]:
# Do something with the message
return msg


pipeline = multi_chain(
[
# Main Ingest chain
streaming_source("ingest", stream_name="ingest-events")
.apply("parse_msg", Map(parse))
streaming_source("ingest", stream_name="ingest-metrics")
.apply("parse_msg", Parser(msg_type=IngestMetric))
.apply("process", Map(do_something))
.apply("serialize", Map(serialize))
.apply("serialize", Serializer())
.sink("eventstream", stream_name="events"),
# Snuba chain to Clickhouse
streaming_source("snuba", stream_name="events")
.apply("snuba_parse_msg", Map(parse))
streaming_source("snuba", stream_name="ingest-metrics")
.apply("snuba_parse_msg", Parser(msg_type=IngestMetric))
.apply("snuba_serialize", Serializer())
.sink(
"clickhouse",
stream_name="someewhere",
),
# Super Big Consumer chain
streaming_source("sbc", stream_name="events")
.apply("sbc_parse_msg", Map(parse))
streaming_source("sbc", stream_name="ingest-metrics")
.apply("sbc_parse_msg", Parser(msg_type=IngestMetric))
.apply("sbc_serialize", Serializer())
.sink(
"sbc_sink",
stream_name="someewhere",
),
# Post process chain
streaming_source("post_process", stream_name="events")
.apply("post_parse_msg", Map(parse))
streaming_source("post_process", stream_name="ingest-metrics")
.apply("post_parse_msg", Parser(msg_type=IngestMetric))
.apply("postprocess", Map(do_something))
.apply("postprocess_serialize", Serializer())
.sink(
"devnull",
stream_name="someewhereelse",