diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a7a3b1b..f3a63bf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -41,7 +41,8 @@ repos: responses, "sentry-arroyo>=2.18.2", types-pyYAML, - types-jsonschema + types-jsonschema, + "sentry-kafka-schemas>=1.2.0", ] files: ^sentry_streams/.+ - repo: https://github.com/pycqa/isort diff --git a/sentry_streams/pyproject.toml b/sentry_streams/pyproject.toml index 3289d24..ef0d452 100644 --- a/sentry_streams/pyproject.toml +++ b/sentry_streams/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "sentry-arroyo>=2.18.2", "pyyaml>=6.0.2", "jsonschema>=4.23.0", + "sentry-kafka-schemas>=1.2.0", ] [dependency-groups] diff --git a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py index 81a52b5..7512d1e 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/adapter.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/adapter.py @@ -17,6 +17,8 @@ from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload, KafkaProducer from arroyo.processing.processor import StreamProcessor from arroyo.types import Topic +from sentry_kafka_schemas import get_codec +from sentry_kafka_schemas.codecs import Codec from sentry_streams.adapters.arroyo.consumer import ( ArroyoConsumer, @@ -148,7 +150,17 @@ def source(self, step: Source) -> Route: """ source_name = step.name self.__sources.add_source(step) - self.__consumers[source_name] = ArroyoConsumer(source_name) + + # This is the Arroyo adapter, and it only supports consuming from StreamSource anyways + assert isinstance(step, StreamSource) + try: + schema: Codec[Any] = get_codec(step.stream_name) + except Exception: + raise ValueError(f"Kafka topic {step.stream_name} has no associated schema") + + self.__consumers[source_name] = ArroyoConsumer( + source_name, step.stream_name, schema, step.header_filter + ) return Route(source_name, []) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/broadcaster.py b/sentry_streams/sentry_streams/adapters/arroyo/broadcaster.py index 455fda9..3796d62 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/broadcaster.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/broadcaster.py @@ -7,6 +7,7 @@ from arroyo.types import FilteredPayload, Message, Partition, Value from sentry_streams.adapters.arroyo.routes import Route, RoutedValue +from sentry_streams.pipeline.message import Message as StreamsMessage @dataclass(eq=True) @@ -78,6 +79,7 @@ def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None: for branch in self.__downstream_branches: msg_copy = cast(Message[RoutedValue], deepcopy(message)) copy_payload = msg_copy.value.payload + streams_msg = copy_payload.payload routed_copy = Message( Value( committable=msg_copy.value.committable, @@ -87,7 +89,12 @@ def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None: source=copy_payload.route.source, waypoints=[*copy_payload.route.waypoints, branch], ), - payload=copy_payload.payload, + payload=StreamsMessage( + streams_msg.payload, + streams_msg.headers, + streams_msg.timestamp, + streams_msg.schema, + ), ), ) ) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/consumer.py b/sentry_streams/sentry_streams/adapters/arroyo/consumer.py index d7ea98d..fb2962b 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/consumer.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/consumer.py @@ -1,22 +1,21 @@ import logging +import time from dataclasses import dataclass, field -from typing import Any, Mapping, MutableSequence +from typing import Any, Mapping, MutableSequence, Optional, Tuple, Union -from arroyo.backends.kafka.consumer import KafkaPayload +from arroyo.backends.kafka.consumer import Headers, KafkaPayload from arroyo.processing.strategies import CommitOffsets from arroyo.processing.strategies.abstract import ( ProcessingStrategy, ProcessingStrategyFactory, ) from arroyo.processing.strategies.run_task import RunTask -from arroyo.types import ( - Commit, - Message, - Partition, -) +from arroyo.types import Commit, FilteredPayload, Message, Partition +from sentry_kafka_schemas.codecs import Codec from sentry_streams.adapters.arroyo.routes import Route, RoutedValue from sentry_streams.adapters.arroyo.steps import ArroyoStep +from sentry_streams.pipeline.message import Message as StreamsMessage logger = logging.getLogger(__name__) @@ -41,6 +40,9 @@ class ArroyoConsumer: """ source: str + stream_name: str + schema: Codec[Any] + header_filter: Optional[Tuple[str, bytes]] = None steps: MutableSequence[ArroyoStep] = field(default_factory=list) def add_step(self, step: ArroyoStep) -> None: @@ -59,9 +61,20 @@ def build_strategy(self, commit: Commit) -> ProcessingStrategy[Any]: follow. """ - def add_route(message: Message[KafkaPayload]) -> RoutedValue: + def add_route(message: Message[KafkaPayload]) -> Union[FilteredPayload, RoutedValue]: + headers: Headers = message.payload.headers + if self.header_filter and self.header_filter not in headers: + return FilteredPayload() + + broker_timestamp = message.timestamp.timestamp() if message.timestamp else time.time() value = message.payload.value - return RoutedValue(route=Route(source=self.source, waypoints=[]), payload=value) + + return RoutedValue( + route=Route(source=self.source, waypoints=[]), + payload=StreamsMessage( + payload=value, headers=headers, timestamp=broker_timestamp, schema=self.schema + ), + ) strategy: ProcessingStrategy[Any] = CommitOffsets(commit) for step in reversed(self.steps): diff --git a/sentry_streams/sentry_streams/adapters/arroyo/forwarder.py b/sentry_streams/sentry_streams/adapters/arroyo/forwarder.py index 904ffb5..da9bfdc 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/forwarder.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/forwarder.py @@ -28,8 +28,10 @@ def __init__( def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None: message_payload = message.value.payload if isinstance(message_payload, RoutedValue) and message_payload.route == self.__route: + # TODO: get headers from the StreamsMessage + assert isinstance(message_payload.payload.payload, bytes) kafka_payload = message.value.replace( - KafkaPayload(None, str(message_payload.payload).encode("utf-8"), []) + KafkaPayload(None, message_payload.payload.payload, []) ) self.__produce_step.submit(Message(kafka_payload)) else: diff --git a/sentry_streams/sentry_streams/adapters/arroyo/msg_wrapper.py b/sentry_streams/sentry_streams/adapters/arroyo/msg_wrapper.py new file mode 100644 index 0000000..264a86b --- /dev/null +++ b/sentry_streams/sentry_streams/adapters/arroyo/msg_wrapper.py @@ -0,0 +1,62 @@ +import time +from typing import Optional, TypeVar, Union, cast + +from arroyo.processing.strategies.abstract import ProcessingStrategy +from arroyo.types import FilteredPayload, Message, Value + +from sentry_streams.adapters.arroyo.routes import Route, RoutedValue +from sentry_streams.pipeline.message import Message as StreamsMessage + +TPayload = TypeVar("TPayload") + + +class MessageWrapper(ProcessingStrategy[Union[FilteredPayload, TPayload]]): + """ + Custom processing strategy which can wrap payloads coming from the previous step + into a Message. In the case that the previous step already forwards a Message + or a FilteredPayload, this strategy will simply forward that as well to the + next step. + """ + + def __init__( + self, + route: Route, + next_step: ProcessingStrategy[Union[FilteredPayload, RoutedValue]], + ) -> None: + self.__next_step = next_step + self.__route = route + + def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None: + now = time.time() + if not isinstance(message.payload, FilteredPayload): + + if isinstance(message.payload, RoutedValue): + # No need to wrap a StreamsMessage in StreamsMessage() again + # This case occurs when prior strategy is forwarding a message that belongs on a separate route + assert isinstance(message.payload.payload, StreamsMessage) + self.__next_step.submit(cast(Message[Union[FilteredPayload, RoutedValue]], message)) + else: + msg = StreamsMessage(message.payload, [], now, None) + + routed_msg: Message[RoutedValue] = Message( + Value( + committable=message.value.committable, + payload=RoutedValue(self.__route, msg), + ) + ) + self.__next_step.submit(routed_msg) + + else: + self.__next_step.submit(cast(Message[FilteredPayload], message)) + + def poll(self) -> None: + self.__next_step.poll() + + def join(self, timeout: Optional[float] = None) -> None: + self.__next_step.join(timeout) + + def close(self) -> None: + self.__next_step.close() + + def terminate(self) -> None: + self.__next_step.terminate() diff --git a/sentry_streams/sentry_streams/adapters/arroyo/reduce.py b/sentry_streams/sentry_streams/adapters/arroyo/reduce.py index 17d9699..d93a974 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/reduce.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/reduce.py @@ -90,7 +90,7 @@ def add(self, value: Any) -> Self: self.offsets[partition] = max(offsets[partition], self.offsets[partition]) else: - self.offsets.update(offsets) + self.offsets[partition] = offsets[partition] return self @@ -127,7 +127,7 @@ def __init__( window_size: float, window_slide: float, acc: Callable[[], Accumulator[Any, Any]], - next_step: ProcessingStrategy[TResult], + next_step: ProcessingStrategy[Union[FilteredPayload, TResult]], route: Route, ) -> None: @@ -135,7 +135,7 @@ def __init__( self.window_size = int(window_size) self.window_slide = int(window_slide) - self.next_step = next_step + self.msg_wrap_step = next_step self.start_time = time.time() self.route = route @@ -180,9 +180,8 @@ def __merge_and_flush(self, window_id: int) -> None: # If there is a gap in the data, it is possible to have empty flushes if payload: - result = RoutedValue(self.route, payload) - self.next_step.submit( - Message(Value(cast(TResult, result), merged_window.get_offsets())) + self.msg_wrap_step.submit( + Message(Value(cast(TResult, payload), merged_window.get_offsets())) ) # Refresh only the accumulator that was the first @@ -206,12 +205,12 @@ def __maybe_flush(self, cur_time: float) -> None: def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None: value = message.payload if isinstance(value, FilteredPayload): - self.next_step.submit(cast(Message[TResult], message)) + self.msg_wrap_step.submit(cast(Message[Union[FilteredPayload, TResult]], message)) return assert isinstance(value, RoutedValue) if value.route != self.route: - self.next_step.submit(cast(Message[TResult], message)) + self.msg_wrap_step.submit(cast(Message[Union[FilteredPayload, TResult]], message)) return cur_time = time.time() - self.start_time @@ -225,23 +224,23 @@ def poll(self) -> None: cur_time = time.time() - self.start_time self.__maybe_flush(cur_time) - self.next_step.poll() + self.msg_wrap_step.poll() def close(self) -> None: - self.next_step.close() + self.msg_wrap_step.close() def terminate(self) -> None: - self.next_step.terminate() + self.msg_wrap_step.terminate() def join(self, timeout: Optional[float] = None) -> None: - self.next_step.close() - self.next_step.join() + self.msg_wrap_step.close() + self.msg_wrap_step.join() def build_arroyo_windowed_reduce( streams_window: Window[MeasurementUnit], accumulator: Callable[[], Accumulator[Any, Any]], - next_step: ProcessingStrategy[Union[FilteredPayload, TPayload]], + msg_wrapper: ProcessingStrategy[Union[FilteredPayload, TPayload]], route: Route, ) -> ProcessingStrategy[Union[FilteredPayload, TPayload]]: @@ -268,7 +267,7 @@ def build_arroyo_windowed_reduce( size, slide, accumulator, - next_step, + msg_wrapper, route, ) @@ -294,7 +293,7 @@ def build_arroyo_windowed_reduce( arroyo_acc.accumulator, ), arroyo_acc.initial_value, - next_step, + msg_wrapper, ) case timedelta(): @@ -302,7 +301,7 @@ def build_arroyo_windowed_reduce( window_size.total_seconds(), window_size.total_seconds(), accumulator, - next_step, + msg_wrapper, route, ) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/routes.py b/sentry_streams/sentry_streams/adapters/arroyo/routes.py index 94092d5..143484b 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/routes.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/routes.py @@ -1,6 +1,8 @@ from dataclasses import dataclass from typing import Any, MutableSequence +from sentry_streams.pipeline.message import Message as StreamsMessage + @dataclass(frozen=True) class Route: @@ -23,4 +25,4 @@ class Route: @dataclass(frozen=True) class RoutedValue: route: Route - payload: Any + payload: StreamsMessage[Any] diff --git a/sentry_streams/sentry_streams/adapters/arroyo/steps.py b/sentry_streams/sentry_streams/adapters/arroyo/steps.py index 663dddc..834e5d7 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/steps.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/steps.py @@ -1,7 +1,7 @@ import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Callable, Generic, Union +from typing import Any, Callable, Generic, TypeVar, Union from arroyo.backends.abstract import Producer from arroyo.processing.strategies import CommitOffsets, Produce @@ -11,8 +11,10 @@ from sentry_streams.adapters.arroyo.broadcaster import Broadcaster from sentry_streams.adapters.arroyo.forwarder import Forwarder +from sentry_streams.adapters.arroyo.msg_wrapper import MessageWrapper from sentry_streams.adapters.arroyo.reduce import build_arroyo_windowed_reduce from sentry_streams.adapters.arroyo.routes import Route, RoutedValue +from sentry_streams.pipeline.message import Message as StreamsMessage from sentry_streams.pipeline.pipeline import ( Broadcast, Filter, @@ -24,6 +26,8 @@ logger = logging.getLogger(__name__) +TPayload = TypeVar("TPayload") + @dataclass class ArroyoStep(ABC): @@ -50,6 +54,12 @@ def build( raise NotImplementedError +# NOTE: All of the steps below now: +# Perform operations / evaluations on a StreamsMessage +# Receive a raw payload, which is NOT wrapped in a StreamsMessage (as the user provides this type signature) +# Wrap the raw payload in a StreamsMessage to send it along to the next step + + def process_message( route: Route, message: Message[Union[FilteredPayload, RoutedValue]], @@ -91,9 +101,14 @@ def transformer( return process_message( self.route, message, - lambda payload: RoutedValue( - route=payload.route, - payload=self.pipeline_step.resolved_function(payload.payload), + lambda routed_value: RoutedValue( + route=routed_value.route, + payload=StreamsMessage( + self.pipeline_step.resolved_function(routed_value.payload), + routed_value.payload.headers, + routed_value.payload.timestamp, + routed_value.payload.schema, + ), ), ) @@ -122,9 +137,19 @@ def transformer( return process_message( self.route, message, - lambda payload: ( - payload - if self.pipeline_step.resolved_function(payload.payload) + lambda routed_value: ( + RoutedValue( + self.route, + StreamsMessage( + routed_value.payload.payload, + routed_value.payload.headers, + routed_value.payload.timestamp, + routed_value.payload.schema, + ), + ) + if self.pipeline_step.resolved_function( + routed_value.payload + ) # The function used for filtering takes in a StreamsMessage else FilteredPayload() ), ) @@ -177,7 +202,13 @@ def append_branch_to_waypoints( result_branch = routing_func(payload.payload) result_branch_name = routing_table[result_branch].name payload.route.waypoints.append(result_branch_name) - return payload + + streams_msg = payload.payload + msg = StreamsMessage( + streams_msg.payload, streams_msg.headers, streams_msg.timestamp, streams_msg.schema + ) + + return RoutedValue(payload.route, msg) return RunTask( lambda message: process_message( @@ -219,9 +250,19 @@ def build( self, next: ProcessingStrategy[Union[FilteredPayload, RoutedValue]], commit: Commit ) -> ProcessingStrategy[Union[FilteredPayload, RoutedValue]]: # TODO: Support group by keys - windowed_reduce: ProcessingStrategy[Union[FilteredPayload, RoutedValue]] = ( + + msg_wrapper: ProcessingStrategy[Union[FilteredPayload, Any]] = MessageWrapper( + self.route, next + ) # Since the Reduce step produces aggregated raw payloads, we need to wrap them + # in a Message (a StreamsMessage) to prepare it for the next step. The next step + # expects a Message (a StreamsMessage). + + windowed_reduce: ProcessingStrategy[Union[FilteredPayload, Any]] = ( build_arroyo_windowed_reduce( - self.pipeline_step.windowing, self.pipeline_step.aggregate_fn, next, self.route + self.pipeline_step.windowing, + self.pipeline_step.aggregate_fn, + msg_wrapper, + self.route, ) ) diff --git a/sentry_streams/sentry_streams/examples/batching.py b/sentry_streams/sentry_streams/examples/batching.py index 57ca62e..4bc55cd 100644 --- a/sentry_streams/sentry_streams/examples/batching.py +++ b/sentry_streams/sentry_streams/examples/batching.py @@ -1,32 +1,24 @@ -import json +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric -from sentry_streams.pipeline import Batch, FlatMap, Map, streaming_source +from sentry_streams.pipeline import Batch, FlatMap, streaming_source from sentry_streams.pipeline.batch import unbatch -from sentry_streams.pipeline.function_template import InputType +from sentry_streams.pipeline.chain import Parser, Serializer +pipeline = streaming_source( + name="myinput", + stream_name="ingest-metrics", +) -def build_batch_str(batch: list[InputType]) -> str: - d = {"batch": batch} - - return json.dumps(d) - - -def build_message_str(message: str) -> str: - d = {"message": message} - - return json.dumps(d) - +# TODO: Figure out why the concrete type of InputType is not showing up in the type hint of chain1 +chain1 = pipeline.apply("parser", Parser(msg_type=IngestMetric)).apply( + "mybatch", Batch(batch_size=5) +) # User simply provides the batch size -pipeline = ( - streaming_source( - name="myinput", - stream_name="events", - ) - .apply("mybatch", Batch(batch_size=5)) # User simply provides the batch size - .apply("myunbatch", FlatMap(function=unbatch)) - .apply("mymap", Map(function=build_message_str)) - .sink( - "kafkasink", - stream_name="transformed-events", - ) # flush the batches to the Sink +chain2 = chain1.apply( + "myunbatch", + FlatMap(function=unbatch), ) + +chain3 = chain2.apply("serializer", Serializer()).sink( + "kafkasink2", stream_name="transformed-events" +) # flush the batches to the Sink diff --git a/sentry_streams/sentry_streams/examples/blq.py b/sentry_streams/sentry_streams/examples/blq.py index 285bfab..a13b853 100644 --- a/sentry_streams/sentry_streams/examples/blq.py +++ b/sentry_streams/sentry_streams/examples/blq.py @@ -1,26 +1,31 @@ +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric + from sentry_streams.examples.blq_fn import ( DownstreamBranch, - json_dump_message, should_send_to_blq, - unpack_kafka_message, ) -from sentry_streams.pipeline import Map, segment, streaming_source +from sentry_streams.pipeline import segment, streaming_source +from sentry_streams.pipeline.chain import Parser, Serializer storage_branch = ( - segment(name="recent") - .apply("dump_msg_recent", Map(json_dump_message)) + segment(name="recent", msg_type=IngestMetric) + .apply("serializer1", Serializer()) .broadcast( "send_message_to_DBs", routes=[ - segment("sbc").sink("kafkasink", stream_name="transformed-events"), - segment("clickhouse").sink("kafkasink2", stream_name="transformed-events-2"), + segment("sbc", msg_type=IngestMetric).sink( + "kafkasink", stream_name="transformed-events" + ), + segment("clickhouse", msg_type=IngestMetric).sink( + "kafkasink2", stream_name="transformed-events-2" + ), ], ) ) save_delayed_message = ( - segment(name="delayed") - .apply("dump_msg_delayed", Map(json_dump_message)) + segment(name="delayed", msg_type=IngestMetric) + .apply("serializer2", Serializer()) .sink( "kafkasink3", stream_name="transformed-events-3", @@ -30,9 +35,9 @@ pipeline = ( streaming_source( name="ingest", - stream_name="events", + stream_name="ingest-metrics", ) - .apply("unpack_message", Map(unpack_kafka_message)) + .apply("parser", Parser(msg_type=IngestMetric)) .route( "blq_router", routing_function=should_send_to_blq, diff --git a/sentry_streams/sentry_streams/examples/blq_fn.py b/sentry_streams/sentry_streams/examples/blq_fn.py index 07af987..1c440d5 100644 --- a/sentry_streams/sentry_streams/examples/blq_fn.py +++ b/sentry_streams/sentry_streams/examples/blq_fn.py @@ -1,8 +1,9 @@ -import json import time -from dataclasses import dataclass from enum import Enum -from typing import Any + +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric + +from sentry_streams.pipeline.message import Message # 10 minutes MAX_MESSAGE_LATENCY = 600 @@ -13,33 +14,9 @@ class DownstreamBranch(Enum): RECENT = "recent" -@dataclass -class Message: - value: Any - timestamp: float - - def to_dict(self) -> dict[str, Any]: - return { - "value": self.value, - "timestamp": self.timestamp, - } - - -def unpack_kafka_message(msg: str) -> Message: - d = json.loads(msg) - return Message( - value=d["value"], - timestamp=d["timestamp"], - ) - - -def should_send_to_blq(msg: Message) -> DownstreamBranch: - timestamp = msg.timestamp +def should_send_to_blq(msg: Message[IngestMetric]) -> DownstreamBranch: + timestamp = msg.payload["timestamp"] # We can do this because the type of the payload is known if timestamp < time.time() - MAX_MESSAGE_LATENCY: return DownstreamBranch.DELAYED else: return DownstreamBranch.RECENT - - -def json_dump_message(msg: Message) -> str: - return json.dumps(msg.to_dict()) diff --git a/sentry_streams/sentry_streams/examples/multi_chain.py b/sentry_streams/sentry_streams/examples/multi_chain.py index 70598ce..22ae306 100644 --- a/sentry_streams/sentry_streams/examples/multi_chain.py +++ b/sentry_streams/sentry_streams/examples/multi_chain.py @@ -1,23 +1,11 @@ -from json import JSONDecodeError, dumps, loads -from typing import Any, Mapping, cast +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.pipeline import Map, multi_chain, streaming_source +from sentry_streams.pipeline.chain import Parser, Serializer +from sentry_streams.pipeline.message import Message -def parse(msg: str) -> Mapping[str, Any]: - try: - parsed = loads(msg) - except JSONDecodeError: - return {"type": "invalid"} - - return cast(Mapping[str, Any], parsed) - - -def serialize(msg: Mapping[str, Any]) -> str: - return dumps(msg) - - -def do_something(msg: Mapping[str, Any]) -> Mapping[str, Any]: +def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]: # Do something with the message return msg @@ -25,29 +13,32 @@ def do_something(msg: Mapping[str, Any]) -> Mapping[str, Any]: pipeline = multi_chain( [ # Main Ingest chain - streaming_source("ingest", stream_name="ingest-events") - .apply("parse_msg", Map(parse)) + streaming_source("ingest", stream_name="ingest-metrics") + .apply("parse_msg", Parser(msg_type=IngestMetric)) .apply("process", Map(do_something)) - .apply("serialize", Map(serialize)) + .apply("serialize", Serializer()) .sink("eventstream", stream_name="events"), # Snuba chain to Clickhouse - streaming_source("snuba", stream_name="events") - .apply("snuba_parse_msg", Map(parse)) + streaming_source("snuba", stream_name="ingest-metrics") + .apply("snuba_parse_msg", Parser(msg_type=IngestMetric)) + .apply("snuba_serialize", Serializer()) .sink( "clickhouse", stream_name="someewhere", ), # Super Big Consumer chain - streaming_source("sbc", stream_name="events") - .apply("sbc_parse_msg", Map(parse)) + streaming_source("sbc", stream_name="ingest-metrics") + .apply("sbc_parse_msg", Parser(msg_type=IngestMetric)) + .apply("sbc_serialize", Serializer()) .sink( "sbc_sink", stream_name="someewhere", ), # Post process chain - streaming_source("post_process", stream_name="events") - .apply("post_parse_msg", Map(parse)) + streaming_source("post_process", stream_name="ingest-metrics") + .apply("post_parse_msg", Parser(msg_type=IngestMetric)) .apply("postprocess", Map(do_something)) + .apply("postprocess_serialize", Serializer()) .sink( "devnull", stream_name="someewhereelse", diff --git a/sentry_streams/sentry_streams/examples/transformer.py b/sentry_streams/sentry_streams/examples/transformer.py index cc1a800..7ed6883 100644 --- a/sentry_streams/sentry_streams/examples/transformer.py +++ b/sentry_streams/sentry_streams/examples/transformer.py @@ -1,49 +1,38 @@ from datetime import timedelta -from json import JSONDecodeError, dumps, loads -from typing import Any, Mapping, MutableSequence, Self, cast +from typing import MutableSequence, Self -from sentry_streams.pipeline import Filter, Map, streaming_source -from sentry_streams.pipeline.chain import Reducer +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric + +from sentry_streams.pipeline import streaming_source +from sentry_streams.pipeline.chain import ( + Parser, + Reducer, + Serializer, +) from sentry_streams.pipeline.function_template import Accumulator +from sentry_streams.pipeline.message import Message from sentry_streams.pipeline.window import SlidingWindow # The simplest possible pipeline. # - reads from Kafka -# - parses the event -# - filters the event based on an attribute -# - serializes the event into json +# - parses the metric data, validating against schema +# - batches messages together, emits aggregate results based on sliding window configuration +# - serializes the result into bytes # - produces the event on Kafka -def parse(msg: str) -> Mapping[str, Any]: - try: - parsed = loads(msg) - except JSONDecodeError: - return {"type": "invalid"} - - return cast(Mapping[str, Any], parsed) - - -def filter_not_event(msg: Mapping[str, Any]) -> bool: - return bool(msg["type"] == "event") - - -def serialize_msg(msg: Mapping[str, Any]) -> str: - return dumps(msg) - - -class TransformerBatch(Accumulator[Any, Any]): +class TransformerBatch(Accumulator[Message[IngestMetric], MutableSequence[IngestMetric]]): def __init__(self) -> None: - self.batch: MutableSequence[Any] = [] + self.batch: MutableSequence[IngestMetric] = [] - def add(self, value: Any) -> Self: - self.batch.append(value["test"]) + def add(self, value: Message[IngestMetric]) -> Self: + self.batch.append(value.payload) return self - def get_value(self) -> Any: - return "".join(self.batch) + def get_value(self) -> MutableSequence[IngestMetric]: + return self.batch def merge(self, other: Self) -> Self: self.batch.extend(other.batch) @@ -53,17 +42,27 @@ def merge(self, other: Self) -> Self: reduce_window = SlidingWindow(window_size=timedelta(seconds=6), window_slide=timedelta(seconds=2)) -pipeline = ( - streaming_source( - name="myinput", - stream_name="events", - ) - .apply("mymap", Map(function=parse)) - .apply("myfilter", Filter(function=filter_not_event)) - .apply("myreduce", Reducer(reduce_window, TransformerBatch)) - .apply("serializer", Map(function=serialize_msg)) - .sink( - "kafkasink2", - stream_name="transformed-events", - ) # flush the batches to the Sink -) +pipeline = streaming_source( + name="myinput", stream_name="ingest-metrics" +) # ExtensibleChain[Message[bytes]] + +chain1 = pipeline.apply( + "parser", + Parser( + msg_type=IngestMetric, + ), # pass in the standard message parser function +) # ExtensibleChain[Message[IngestMetric]] + +chain2 = chain1.apply( + "custom_batcher", Reducer(reduce_window, TransformerBatch) +) # ExtensibleChain[Message[MutableSequence[IngestMetric]]] + +chain3 = chain2.apply( + "serializer", + Serializer(), # pass in the standard message serializer function +) # ExtensibleChain[bytes] + +chain4 = chain3.sink( + "kafkasink2", + stream_name="transformed-events", +) # Chain diff --git a/sentry_streams/sentry_streams/pipeline/batch.py b/sentry_streams/sentry_streams/pipeline/batch.py index 4f6ee4f..bb7036b 100644 --- a/sentry_streams/sentry_streams/pipeline/batch.py +++ b/sentry_streams/sentry_streams/pipeline/batch.py @@ -1,9 +1,12 @@ -from typing import Generator, MutableSequence, Self +from typing import Any, Generator, MutableSequence, Optional, Self + +from sentry_kafka_schemas.codecs import Codec from sentry_streams.pipeline.function_template import Accumulator, InputType +from sentry_streams.pipeline.message import Message -class BatchBuilder(Accumulator[InputType, MutableSequence[InputType]]): +class BatchBuilder(Accumulator[Message[InputType], MutableSequence[InputType]]): """ Takes a generic input format, and batches into a generic batch representation with the same input type. Returns this batch representation. @@ -13,9 +16,10 @@ class BatchBuilder(Accumulator[InputType, MutableSequence[InputType]]): def __init__(self) -> None: self.batch: MutableSequence[InputType] = [] + self.schema: Optional[Codec[Any]] = None - def add(self, value: InputType) -> Self: - self.batch.append(value) + def add(self, value: Message[InputType]) -> Self: + self.batch.append(value.payload) return self @@ -28,7 +32,9 @@ def merge(self, other: Self) -> Self: return self -def unbatch(batch: MutableSequence[InputType]) -> Generator[InputType, None, None]: +def unbatch( + batch: Message[MutableSequence[InputType]], +) -> Generator[InputType, None, None]: """ Takes in a generic batch representation, outputs a Generator type for iterating over individual elements which compose the batch. @@ -36,5 +42,5 @@ def unbatch(batch: MutableSequence[InputType]) -> Generator[InputType, None, Non The data type of the elements remains the same through this operation. This operation may need to be followed by a Map or other transformation if a new output type is expected. """ - for message in batch: - yield message + for payload in batch.payload: + yield payload diff --git a/sentry_streams/sentry_streams/pipeline/chain.py b/sentry_streams/sentry_streams/pipeline/chain.py index 01bc70d..3cbad9c 100644 --- a/sentry_streams/sentry_streams/pipeline/chain.py +++ b/sentry_streams/sentry_streams/pipeline/chain.py @@ -7,9 +7,13 @@ Generic, Mapping, MutableSequence, + Optional, Sequence, + Tuple, + Type, TypeVar, Union, + cast, ) from sentry_streams.pipeline.function_template import ( @@ -19,6 +23,8 @@ InputType, OutputType, ) +from sentry_streams.pipeline.message import Message +from sentry_streams.pipeline.msg_parser import msg_parser, msg_serializer from sentry_streams.pipeline.pipeline import ( Aggregate, ) @@ -40,7 +46,6 @@ from sentry_streams.pipeline.window import MeasurementUnit, Window TRoute = TypeVar("TRoute") - TIn = TypeVar("TIn") TOut = TypeVar("TOut") @@ -69,33 +74,38 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: @dataclass -class Map(Applier[TIn, TOut], Generic[TIn, TOut]): - function: Union[Callable[[TIn], TOut], str] +class Map(Applier[Message[TIn], Message[TOut]], Generic[TIn, TOut]): + function: Union[Callable[[Message[TIn]], TOut], str] def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: return MapStep(name=name, ctx=ctx, inputs=[previous], function=self.function) @dataclass -class Filter(Applier[TIn, TIn], Generic[TIn]): - function: Union[Callable[[TIn], bool], str] +class Filter(Applier[Message[TIn], Message[TIn]], Generic[TIn]): + function: Union[Callable[[Message[TIn]], bool], str] def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: return FilterStep(name=name, ctx=ctx, inputs=[previous], function=self.function) @dataclass -class FlatMap(Applier[TIn, TOut], Generic[TIn, TOut]): - function: Union[Callable[[TIn], TOut], str] +class FlatMap(Applier[Message[MutableSequence[TIn]], Message[TOut]], Generic[TIn, TOut]): + function: Union[ + Callable[[Message[MutableSequence[TIn]]], TOut], str + ] # TODO: Consider making this type an Iterable rather than MutableSequence def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: return FlatMapStep(name=name, ctx=ctx, inputs=[previous], function=self.function) @dataclass -class Reducer(Applier[InputType, OutputType], Generic[MeasurementUnit, InputType, OutputType]): +class Reducer( + Applier[Message[InputType], Message[OutputType]], + Generic[MeasurementUnit, InputType, OutputType], +): window: Window[MeasurementUnit] - aggregate_func: Callable[[], Accumulator[InputType, OutputType]] + aggregate_func: Callable[[], Accumulator[Message[InputType], OutputType]] aggregate_backend: AggregationBackend[OutputType] | None = None group_by_key: GroupBy | None = None @@ -111,9 +121,48 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: ) +@dataclass +class Parser(Applier[Message[bytes], Message[TOut]], Generic[TOut]): + """ + A step to decode bytes, deserialize the resulting message and validate it against the schema + which corresponds to the message type provided. The message type should be one which + is supported by sentry-kafka-schemas. See examples/ for usage, this step can be plugged in + flexibly into a pipeline. Keep in mind, data up until this step will simply be bytes. + + Supports both JSON and protobuf. + """ + + msg_type: Type[TOut] + + def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: + return MapStep( + name=name, + ctx=ctx, + inputs=[previous], + function=msg_parser, + ) + + +@dataclass +class Serializer(Applier[Message[TIn], bytes], Generic[TIn]): + """ + A step to serialize and encode messages into bytes. These bytes can be written + to sink data to a Kafka topic, for example. This step will need to precede a + sink step which writes to Kafka. + """ + + def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: + return MapStep( + name=name, + ctx=ctx, + inputs=[previous], + function=msg_serializer, + ) + + @dataclass class Batch( - Applier[InputType, MutableSequence[InputType]], + Applier[Message[InputType], Message[MutableSequence[InputType]]], Generic[MeasurementUnit, InputType], ): batch_size: MeasurementUnit @@ -141,7 +190,7 @@ def __init__(self, name: str) -> None: self.name = name -class ExtensibleChain(Chain): +class ExtensibleChain(Chain, Generic[TIn]): """ Defines a streaming pipeline or a segment of a pipeline by chaining operators that define steps via function calls. @@ -178,7 +227,7 @@ def __init__(self, name: str) -> None: def _add_start(self, start: Step) -> None: self.__edge = start - def apply(self, name: str, applier: Applier[TIn, TOut]) -> ExtensibleChain: + def apply(self, name: str, applier: Applier[TIn, TOut]) -> ExtensibleChain[TOut]: """ Apply a transformation to the stream. The transformation is defined via an `Applier`. @@ -192,7 +241,7 @@ def apply(self, name: str, applier: Applier[TIn, TOut]) -> ExtensibleChain: """ assert self.__edge is not None self.__edge = applier.build_step(name, self, self.__edge) - return self + return cast(ExtensibleChain[TOut], self) def broadcast(self, name: str, routes: Sequence[Chain]) -> Chain: """ @@ -235,7 +284,11 @@ def route( return self - def sink(self, name: str, stream_name: str) -> Chain: + def sink( + self, + name: str, + stream_name: str, + ) -> Chain: """ Terminates the pipeline. @@ -246,25 +299,25 @@ def sink(self, name: str, stream_name: str) -> Chain: return self -def segment(name: str) -> ExtensibleChain: +def segment(name: str, msg_type: Type[TIn]) -> ExtensibleChain[Message[TIn]]: """ Creates a segment of a pipeline to be referenced in existing pipelines in route and broadcast steps. """ - pipeline: ExtensibleChain = ExtensibleChain(name) + pipeline: ExtensibleChain[Message[TIn]] = ExtensibleChain(name) pipeline._add_start(Branch(name=name, ctx=pipeline)) return pipeline -def streaming_source(name: str, stream_name: str) -> ExtensibleChain: +def streaming_source( + name: str, stream_name: str, header_filter: Optional[Tuple[str, bytes]] = None +) -> ExtensibleChain[Message[bytes]]: """ Create a pipeline that starts with a StreamingSource. """ - pipeline: ExtensibleChain = ExtensibleChain("root") + pipeline: ExtensibleChain[Message[bytes]] = ExtensibleChain("root") source = StreamSource( - name=name, - ctx=pipeline, - stream_name=stream_name, + name=name, ctx=pipeline, stream_name=stream_name, header_filter=header_filter ) pipeline._add_start(source) return pipeline diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py new file mode 100644 index 0000000..9b65201 --- /dev/null +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import ( + Any, + Generic, + MutableSequence, + Optional, + Tuple, + TypeVar, +) + +from sentry_kafka_schemas.codecs import Codec + +TIn = TypeVar("TIn") # TODO: Consider naming this TPayload + + +# A message with a generic payload +@dataclass(frozen=True) +class Message(Generic[TIn]): + payload: TIn + headers: MutableSequence[Tuple[str, bytes]] + timestamp: float + schema: Optional[ + Codec[Any] + ] # The schema of incoming messages. This is optional so Messages can be flexibly initialized in any part of the pipeline. We may want to change this down the road. + # TODO: Add support for an event timestamp diff --git a/sentry_streams/sentry_streams/pipeline/msg_parser.py b/sentry_streams/sentry_streams/pipeline/msg_parser.py new file mode 100644 index 0000000..9e06881 --- /dev/null +++ b/sentry_streams/sentry_streams/pipeline/msg_parser.py @@ -0,0 +1,27 @@ +import json +from typing import Any + +from sentry_streams.pipeline.message import Message + +# TODO: Push the following to docs +# Standard message decoders and encoders live here +# These are used in the defintions of Parser() and Serializer() steps, see chain/ + + +def msg_parser(msg: Message[bytes]) -> Any: + codec = msg.schema + payload = msg.payload + + assert ( + codec is not None + ) # Message cannot be deserialized without a schema, it is automatically inferred from the stream source + + decoded = codec.decode(payload, True) + + return decoded + + +def msg_serializer(msg: Message[Any]) -> bytes: + payload = msg.payload + + return json.dumps(payload).encode("utf-8") diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 028a15a..f463606 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -14,6 +14,7 @@ Optional, Sequence, Set, + Tuple, TypeVar, Union, cast, @@ -165,6 +166,7 @@ class StreamSource(Source): """ stream_name: str + header_filter: Optional[Tuple[str, bytes]] = None step_type: StepType = StepType.SOURCE def __post_init__(self) -> None: diff --git a/sentry_streams/tests/adapters/arroyo/conftest.py b/sentry_streams/tests/adapters/arroyo/conftest.py index faf291e..65f594a 100644 --- a/sentry_streams/tests/adapters/arroyo/conftest.py +++ b/sentry_streams/tests/adapters/arroyo/conftest.py @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import Any, Callable, MutableSequence, Self +from typing import Callable, MutableSequence, Self import pytest from arroyo.backends.kafka.consumer import KafkaPayload @@ -7,19 +7,31 @@ from arroyo.backends.local.storages.memory import MemoryMessageStorage from arroyo.types import Topic from arroyo.utils.clock import MockedClock - -from sentry_streams.pipeline import Filter, Map, Reducer, segment, streaming_source +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric + +from sentry_streams.pipeline.chain import ( + Filter, + Map, + Parser, + Reducer, + Serializer, + segment, + streaming_source, +) from sentry_streams.pipeline.function_template import Accumulator +from sentry_streams.pipeline.message import Message from sentry_streams.pipeline.pipeline import Pipeline from sentry_streams.pipeline.window import SlidingWindow +# def decode(msg: bytes) -> str: +# return msg.decode("utf-8") -def decode(msg: bytes) -> str: - return msg.decode("utf-8") +def basic_map(msg: Message[IngestMetric]) -> IngestMetric: + payload = msg.payload + payload["name"] = "new_metric" -def basic_map(msg: str) -> str: - return msg + "_mapped" + return payload @pytest.fixture @@ -29,20 +41,58 @@ def broker() -> LocalBroker[KafkaPayload]: broker.create_topic(Topic("events"), 1) broker.create_topic(Topic("transformed-events"), 1) broker.create_topic(Topic("transformed-events-2"), 1) + broker.create_topic(Topic("ingest-metrics"), 1) return broker -class TestTransformerBatch(Accumulator[Any, Any]): +@pytest.fixture +def metric() -> IngestMetric: + return { + "org_id": 420, + "project_id": 420, + "name": "s:sessions/user@none", + "tags": { + "sdk": "raven-node/2.6.3", + "environment": "production", + "release": "sentry-test@1.0.0", + }, + "timestamp": 1846062325, + "type": "s", + "retention_days": 90, + "value": [1617781333], + } + + +@pytest.fixture +def transformed_metric() -> IngestMetric: + return { + "org_id": 420, + "project_id": 420, + "name": "new_metric", + "tags": { + "sdk": "raven-node/2.6.3", + "environment": "production", + "release": "sentry-test@1.0.0", + }, + "timestamp": 1846062325, + "type": "s", + "retention_days": 90, + "value": [1617781333], + } + + +class TestTransformerBatch(Accumulator[Message[IngestMetric], MutableSequence[IngestMetric]]): + def __init__(self) -> None: - self.batch: MutableSequence[Any] = [] + self.batch: MutableSequence[IngestMetric] = [] - def add(self, value: Any) -> Self: - self.batch.append(value) + def add(self, value: Message[IngestMetric]) -> Self: + self.batch.append(value.payload) return self - def get_value(self) -> Any: - return "".join(self.batch) + def get_value(self) -> MutableSequence[IngestMetric]: + return self.batch def merge(self, other: Self) -> Self: self.batch.extend(other.batch) @@ -58,10 +108,11 @@ def transformer() -> Callable[[], TestTransformerBatch]: @pytest.fixture def pipeline() -> Pipeline: pipeline = ( - streaming_source("myinput", stream_name="events") - .apply("decoder", Map(decode)) - .apply("myfilter", Filter(lambda msg: msg == "go_ahead")) + streaming_source("myinput", stream_name="ingest-metrics") + .apply("decoder", Parser(msg_type=IngestMetric)) + .apply("myfilter", Filter(lambda msg: msg.payload["type"] == "s")) .apply("mymap", Map(basic_map)) + .apply("serializer", Serializer()) .sink("kafkasink", stream_name="transformed-events") ) @@ -73,12 +124,14 @@ def reduce_pipeline(transformer: Callable[[], TestTransformerBatch]) -> Pipeline reduce_window = SlidingWindow( window_size=timedelta(seconds=6), window_slide=timedelta(seconds=2) ) + pipeline = ( - streaming_source("myinput", "events") - .apply("decoder", Map(decode)) + streaming_source("myinput", stream_name="ingest-metrics") + .apply("decoder", Parser(msg_type=IngestMetric)) .apply("mymap", Map(basic_map)) .apply("myreduce", Reducer(reduce_window, transformer)) - .sink("kafkasink", "transformed-events") + .apply("serializer", Serializer()) + .sink("kafkasink", stream_name="transformed-events") ) return pipeline @@ -87,28 +140,28 @@ def reduce_pipeline(transformer: Callable[[], TestTransformerBatch]) -> Pipeline @pytest.fixture def router_pipeline() -> Pipeline: branch_1 = ( - segment("even_branch") - .apply("myfilter", Filter(lambda msg: msg == "go_ahead")) + segment("set_branch", IngestMetric) + .apply("serializer", Serializer()) .sink("kafkasink1", stream_name="transformed-events") ) branch_2 = ( - segment("odd_branch") - .apply("mymap", Map(basic_map)) + segment("not_set_branch", IngestMetric) + .apply("serializer2", Serializer()) .sink("kafkasink2", stream_name="transformed-events-2") ) pipeline = ( streaming_source( name="ingest", - stream_name="events", + stream_name="ingest-metrics", ) - .apply("decoder", Map(decode)) + .apply("decoder", Parser(msg_type=IngestMetric)) .route( "router", - routing_function=lambda msg: "even" if len(msg) % 2 == 0 else "odd", + routing_function=lambda msg: "set" if msg.payload["type"] == "s" else "not_set", routes={ - "even": branch_1, - "odd": branch_2, + "set": branch_1, + "not_set": branch_2, }, ) ) @@ -119,22 +172,24 @@ def router_pipeline() -> Pipeline: @pytest.fixture def broadcast_pipeline() -> Pipeline: branch_1 = ( - segment("even_branch") + segment("even_branch", IngestMetric) .apply("mymap1", Map(basic_map)) + .apply("serializer", Serializer()) .sink("kafkasink1", stream_name="transformed-events") ) branch_2 = ( - segment("odd_branch") + segment("odd_branch", IngestMetric) .apply("mymap2", Map(basic_map)) + .apply("serializer2", Serializer()) .sink("kafkasink2", stream_name="transformed-events-2") ) pipeline = ( streaming_source( name="ingest", - stream_name="events", + stream_name="ingest-metrics", ) - .apply("decoder", Map(decode)) + .apply("decoder", Parser(msg_type=IngestMetric)) .broadcast( "broadcast", routes=[ diff --git a/sentry_streams/tests/adapters/arroyo/test_adapter.py b/sentry_streams/tests/adapters/arroyo/test_adapter.py index 247b3b1..af70476 100644 --- a/sentry_streams/tests/adapters/arroyo/test_adapter.py +++ b/sentry_streams/tests/adapters/arroyo/test_adapter.py @@ -1,3 +1,4 @@ +import json from typing import cast from unittest import mock @@ -5,6 +6,7 @@ from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload, KafkaProducer from arroyo.backends.local.backend import LocalBroker from arroyo.types import Partition, Topic +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.adapters.arroyo.adapter import ( ArroyoAdapter, @@ -51,7 +53,12 @@ def test_kafka_sources() -> None: assert sources.get_consumer("source1") is not None -def test_adapter(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> None: +def test_adapter( + broker: LocalBroker[KafkaPayload], + pipeline: Pipeline, + metric: IngestMetric, + transformed_metric: IngestMetric, +) -> None: adapter = ArroyoAdapter.build( { "env": {}, @@ -60,7 +67,7 @@ def test_adapter(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> None: "kafkasink": {"kafkasink": {}}, }, }, - {"myinput": cast(KafkaConsumer, broker.get_consumer("events"))}, + {"myinput": cast(KafkaConsumer, broker.get_consumer("ingest-metrics"))}, {"kafkasink": cast(KafkaProducer, broker.get_producer())}, ) iterate_edges(pipeline, RuntimeTranslator(adapter)) @@ -69,14 +76,17 @@ def test_adapter(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> None: processor = adapter.get_processor("myinput") broker.produce( - Partition(Topic("events"), 0), KafkaPayload(None, "go_ahead".encode("utf-8"), []) + Partition(Topic("ingest-metrics"), 0), + KafkaPayload(None, json.dumps(metric).encode("utf-8"), []), ) broker.produce( - Partition(Topic("events"), 0), - KafkaPayload(None, "do_not_go_ahead".encode("utf-8"), []), + Partition(Topic("ingest-metrics"), 0), + KafkaPayload(None, json.dumps(metric).encode("utf-8"), []), ) + metric["type"] = "c" broker.produce( - Partition(Topic("events"), 0), KafkaPayload(None, "go_ahead".encode("utf-8"), []) + Partition(Topic("ingest-metrics"), 0), + KafkaPayload(None, json.dumps(metric).encode("utf-8"), []), ) processor._run_once() @@ -85,7 +95,8 @@ def test_adapter(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> None: topic = Topic("transformed-events") msg1 = broker.consume(Partition(topic, 0), 0) - assert msg1 is not None and msg1.payload.value == "go_ahead_mapped".encode("utf-8") + + assert msg1 is not None and msg1.payload.value == json.dumps(transformed_metric).encode("utf-8") msg2 = broker.consume(Partition(topic, 0), 1) - assert msg2 is not None and msg2.payload.value == "go_ahead_mapped".encode("utf-8") + assert msg2 is not None and msg2.payload.value == json.dumps(transformed_metric).encode("utf-8") assert broker.consume(Partition(topic, 0), 2) is None diff --git a/sentry_streams/tests/adapters/arroyo/test_broadcaster.py b/sentry_streams/tests/adapters/arroyo/test_broadcaster.py index 4c785d4..4e2ba17 100644 --- a/sentry_streams/tests/adapters/arroyo/test_broadcaster.py +++ b/sentry_streams/tests/adapters/arroyo/test_broadcaster.py @@ -1,16 +1,19 @@ +import time from unittest import mock from unittest.mock import call import pytest from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy from arroyo.types import FilteredPayload +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.adapters.arroyo.broadcaster import Broadcaster from sentry_streams.adapters.arroyo.routes import Route +from sentry_streams.pipeline.message import Message from tests.adapters.arroyo.message_helpers import make_value_msg -def test_submit_routedvalue() -> None: +def test_submit_routedvalue(metric: IngestMetric) -> None: next_step = mock.Mock(spec=ProcessingStrategy) broadcaster = Broadcaster( route=Route(source="source", waypoints=[]), @@ -18,21 +21,20 @@ def test_submit_routedvalue() -> None: next_step=next_step, ) - message = make_value_msg( - payload="test-payload", route=Route(source="source", waypoints=[]), offset=0 - ) + msg = Message(metric, [], time.time(), None) + message = make_value_msg(payload=msg, route=Route(source="source", waypoints=[]), offset=0) expected_calls = [ call.submit( make_value_msg( - payload="test-payload", + payload=msg, route=Route(source="source", waypoints=["branch_1"]), offset=0, ) ), call.submit( make_value_msg( - payload="test-payload", + payload=msg, route=Route(source="source", waypoints=["branch_2"]), offset=0, ) @@ -75,7 +77,7 @@ def test_submit_wrong_route() -> None: next_step.submit.assert_called_once_with(message) -def test_message_rejected() -> None: +def test_message_rejected(metric: IngestMetric) -> None: next_step = mock.Mock(spec=ProcessingStrategy) # raise MessageRejected on submit next_step.submit.side_effect = MessageRejected() @@ -86,13 +88,12 @@ def test_message_rejected() -> None: next_step=next_step, ) - message = make_value_msg( - payload="test-payload", route=Route(source="source", waypoints=[]), offset=0 - ) + msg = Message(metric, [], time.time(), None) + message = make_value_msg(payload=msg, route=Route(source="source", waypoints=[]), offset=0) message_rejected_expected_call = call( make_value_msg( - payload="test-payload", + payload=msg, route=Route(source="source", waypoints=["branch_1"]), offset=0, ) diff --git a/sentry_streams/tests/adapters/arroyo/test_consumer.py b/sentry_streams/tests/adapters/arroyo/test_consumer.py index a88f6a1..e7db0dc 100644 --- a/sentry_streams/tests/adapters/arroyo/test_consumer.py +++ b/sentry_streams/tests/adapters/arroyo/test_consumer.py @@ -1,4 +1,6 @@ +import json import time +from copy import deepcopy from datetime import timedelta from typing import Any, cast from unittest import mock @@ -7,6 +9,8 @@ from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.backends.local.backend import LocalBroker from arroyo.types import Commit, Partition, Topic +from sentry_kafka_schemas import get_codec +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.adapters.arroyo.consumer import ( ArroyoConsumer, @@ -31,15 +35,22 @@ ) from tests.adapters.arroyo.message_helpers import make_kafka_msg +SCHEMA = get_codec("ingest-metrics") -def test_single_route(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> None: + +def test_single_route( + broker: LocalBroker[KafkaPayload], + pipeline: Pipeline, + metric: IngestMetric, + transformed_metric: IngestMetric, +) -> None: """ Test the creation of an Arroyo Consumer from a number of pipeline steps. """ empty_route = Route(source="source1", waypoints=[]) - consumer = ArroyoConsumer(source="source1") + consumer = ArroyoConsumer(source="source1", stream_name="ingest-metrics", schema=SCHEMA) consumer.add_step( MapStep( route=empty_route, @@ -58,6 +69,12 @@ def test_single_route(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> pipeline_step=cast(Map, pipeline.steps["mymap"]), ) ) + consumer.add_step( + MapStep( + route=empty_route, + pipeline_step=cast(Map, pipeline.steps["serializer"]), + ) + ) consumer.add_step( StreamSinkStep( route=empty_route, @@ -68,44 +85,52 @@ def test_single_route(broker: LocalBroker[KafkaPayload], pipeline: Pipeline) -> factory = ArroyoStreamingFactory(consumer) commit = mock.Mock(spec=Commit) - strategy = factory.create_with_partitions(commit, {Partition(Topic("events"), 0): 0}) + strategy = factory.create_with_partitions(commit, {Partition(Topic("ingest-metrics"), 0): 0}) + + counter_metric = deepcopy(metric) + counter_metric["type"] = "c" - strategy.submit(make_kafka_msg("go_ahead", "events", 0)) + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 0)) strategy.poll() - strategy.submit(make_kafka_msg("do_not_go_ahead", "events", 2)) + strategy.submit(make_kafka_msg(json.dumps(counter_metric), "ingest-metrics", 2)) strategy.poll() - strategy.submit(make_kafka_msg("go_ahead", "events", 3)) + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 3)) strategy.poll() topic = Topic("transformed-events") msg1 = broker.consume(Partition(topic, 0), 0) - assert msg1 is not None and msg1.payload.value == "go_ahead_mapped".encode("utf-8") + assert msg1 is not None and msg1.payload.value == json.dumps(transformed_metric).encode("utf-8") msg2 = broker.consume(Partition(topic, 0), 1) - assert msg2 is not None and msg2.payload.value == "go_ahead_mapped".encode("utf-8") + assert msg2 is not None and msg2.payload.value == json.dumps(transformed_metric).encode("utf-8") assert broker.consume(Partition(topic, 0), 2) is None commit.assert_has_calls( [ call({}), - call({Partition(Topic("events"), 0): 1}), + call({Partition(Topic("ingest-metrics"), 0): 1}), call({}), - call({Partition(Topic("events"), 0): 3}), + call({Partition(Topic("ingest-metrics"), 0): 3}), call({}), call({}), call({}), - call({Partition(Topic("events"), 0): 4}), + call({Partition(Topic("ingest-metrics"), 0): 4}), call({}), ], ) -def test_broadcast(broker: LocalBroker[KafkaPayload], broadcast_pipeline: Pipeline) -> None: +def test_broadcast( + broker: LocalBroker[KafkaPayload], + broadcast_pipeline: Pipeline, + metric: IngestMetric, + transformed_metric: IngestMetric, +) -> None: """ Test the creation of an Arroyo Consumer from pipeline steps which contain a Broadcast. """ - consumer = ArroyoConsumer(source="source1") + consumer = ArroyoConsumer(source="source1", stream_name="ingest-metrics", schema=SCHEMA) consumer.add_step( MapStep( route=Route(source="source1", waypoints=[]), @@ -130,6 +155,18 @@ def test_broadcast(broker: LocalBroker[KafkaPayload], broadcast_pipeline: Pipeli pipeline_step=cast(Map, broadcast_pipeline.steps["mymap2"]), ) ) + consumer.add_step( + MapStep( + route=Route(source="source1", waypoints=["even_branch"]), + pipeline_step=cast(Map, broadcast_pipeline.steps["serializer"]), + ) + ) + consumer.add_step( + MapStep( + route=Route(source="source1", waypoints=["odd_branch"]), + pipeline_step=cast(Map, broadcast_pipeline.steps["serializer2"]), + ) + ) consumer.add_step( StreamSinkStep( route=Route(source="source1", waypoints=["even_branch"]), @@ -147,33 +184,41 @@ def test_broadcast(broker: LocalBroker[KafkaPayload], broadcast_pipeline: Pipeli factory = ArroyoStreamingFactory(consumer) commit = mock.Mock(spec=Commit) - strategy = factory.create_with_partitions(commit, {Partition(Topic("events"), 0): 0}) + strategy = factory.create_with_partitions(commit, {Partition(Topic("ingest-metrics"), 0): 0}) - strategy.submit(make_kafka_msg("go_ahead", "events", 0)) + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 0)) strategy.poll() - strategy.submit(make_kafka_msg("do_not_go_ahead", "events", 2)) + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 2)) strategy.poll() - strategy.submit(make_kafka_msg("go_ahead", "events", 3)) + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 3)) strategy.poll() topics = [Topic("transformed-events"), Topic("transformed-events-2")] for topic in topics: msg1 = broker.consume(Partition(topic, 0), 0) - assert msg1 is not None and msg1.payload.value == "go_ahead_mapped".encode("utf-8") + assert msg1 is not None and msg1.payload.value == json.dumps(transformed_metric).encode( + "utf-8" + ) msg2 = broker.consume(Partition(topic, 0), 1) - assert msg2 is not None and msg2.payload.value == "do_not_go_ahead_mapped".encode("utf-8") + assert msg2 is not None and msg2.payload.value == json.dumps(transformed_metric).encode( + "utf-8" + ) msg3 = broker.consume(Partition(topic, 0), 2) - assert msg3 is not None and msg3.payload.value == "go_ahead_mapped".encode("utf-8") + assert msg3 is not None and msg3.payload.value == json.dumps(transformed_metric).encode( + "utf-8" + ) -def test_multiple_routes(broker: LocalBroker[KafkaPayload], router_pipeline: Pipeline) -> None: +def test_multiple_routes( + broker: LocalBroker[KafkaPayload], router_pipeline: Pipeline, metric: IngestMetric +) -> None: """ Test the creation of an Arroyo Consumer from pipeline steps which contain branching routes. """ - consumer = ArroyoConsumer(source="source1") + consumer = ArroyoConsumer(source="source1", stream_name="ingest-metrics", schema=SCHEMA) consumer.add_step( MapStep( route=Route(source="source1", waypoints=[]), @@ -187,27 +232,27 @@ def test_multiple_routes(broker: LocalBroker[KafkaPayload], router_pipeline: Pip ) ) consumer.add_step( - FilterStep( - route=Route(source="source1", waypoints=["even_branch"]), - pipeline_step=cast(Filter, router_pipeline.steps["myfilter"]), + MapStep( + route=Route(source="source1", waypoints=["set_branch"]), + pipeline_step=cast(Map, router_pipeline.steps["serializer"]), ) ) consumer.add_step( MapStep( - route=Route(source="source1", waypoints=["odd_branch"]), - pipeline_step=cast(Map, router_pipeline.steps["mymap"]), + route=Route(source="source1", waypoints=["not_set_branch"]), + pipeline_step=cast(Map, router_pipeline.steps["serializer2"]), ) ) consumer.add_step( StreamSinkStep( - route=Route(source="source1", waypoints=["even_branch"]), + route=Route(source="source1", waypoints=["set_branch"]), producer=broker.get_producer(), topic_name="transformed-events", ) ) consumer.add_step( StreamSinkStep( - route=Route(source="source1", waypoints=["odd_branch"]), + route=Route(source="source1", waypoints=["not_set_branch"]), producer=broker.get_producer(), topic_name="transformed-events-2", ) @@ -215,50 +260,58 @@ def test_multiple_routes(broker: LocalBroker[KafkaPayload], router_pipeline: Pip factory = ArroyoStreamingFactory(consumer) commit = mock.Mock(spec=Commit) - strategy = factory.create_with_partitions(commit, {Partition(Topic("events"), 0): 0}) + strategy = factory.create_with_partitions(commit, {Partition(Topic("ingest-metrics"), 0): 0}) - strategy.submit(make_kafka_msg("go_ahead", "events", 0)) + counter_metric = deepcopy(metric) + counter_metric["type"] = "c" + + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 0)) strategy.poll() - strategy.submit(make_kafka_msg("do_not_go_ahead", "events", 2)) + strategy.submit(make_kafka_msg(json.dumps(counter_metric), "ingest-metrics", 2)) strategy.poll() - strategy.submit(make_kafka_msg("go_ahead", "events", 3)) + strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 3)) strategy.poll() - topic = Topic("transformed-events") - topic2 = Topic("transformed-events-2") + topic = Topic("transformed-events") # for set messages + topic2 = Topic("transformed-events-2") # for non-set messages msg1 = broker.consume(Partition(topic, 0), 0) - assert msg1 is not None and msg1.payload.value == "go_ahead".encode("utf-8") + assert msg1 is not None and msg1.payload.value == json.dumps(metric).encode("utf-8") msg2 = broker.consume(Partition(topic, 0), 1) - assert msg2 is not None and msg2.payload.value == "go_ahead".encode("utf-8") + assert msg2 is not None and msg2.payload.value == json.dumps(metric).encode("utf-8") msg3 = broker.consume(Partition(topic2, 0), 0) - assert msg3 is not None and msg3.payload.value == "do_not_go_ahead_mapped".encode("utf-8") + assert msg3 is not None and msg3.payload.value == json.dumps(counter_metric).encode("utf-8") commit.assert_has_calls( [ call({}), - call({Partition(topic=Topic(name="events"), index=0): 1}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 1}), call({}), call({}), call({}), call({}), - call({Partition(topic=Topic(name="events"), index=0): 3}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 3}), call({}), call({}), - call({Partition(topic=Topic(name="events"), index=0): 4}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 4}), call({}), call({}), ], ) -def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pipeline) -> None: +def test_standard_reduce( + broker: LocalBroker[KafkaPayload], + reduce_pipeline: Pipeline, + metric: IngestMetric, + transformed_metric: IngestMetric, +) -> None: """ Test a full "loop" of the sliding window algorithm. Checks for correct results, timestamps, and offset management strategy """ - consumer = ArroyoConsumer(source="source1") + consumer = ArroyoConsumer(source="source1", stream_name="ingest-metrics", schema=SCHEMA) consumer.add_step( MapStep( route=Route(source="source1", waypoints=[]), @@ -277,6 +330,12 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip pipeline_step=cast(Reduce[timedelta, Any, Any], reduce_pipeline.steps["myreduce"]), ) ) + consumer.add_step( + MapStep( + route=Route(source="source1", waypoints=[]), + pipeline_step=cast(Map, reduce_pipeline.steps["serializer"]), + ) + ) consumer.add_step( StreamSinkStep( route=Route(source="source1", waypoints=[]), @@ -287,31 +346,44 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip factory = ArroyoStreamingFactory(consumer) commit = mock.Mock(spec=Commit) - strategy = factory.create_with_partitions(commit, {Partition(Topic("logical-events"), 0): 0}) + strategy = factory.create_with_partitions(commit, {Partition(Topic("ingest-metrics"), 0): 0}) cur_time = time.time() # 6 messages + messages = [] + for i in range(6): + modified_metric = deepcopy(metric) + modified_metric["org_id"] = i + messages.append(modified_metric) + # Accumulators: [0,1] [2,3] [4,5] [6,7] [8,9] for i in range(6): - msg = f"msg{i}" with mock.patch("time.time", return_value=cur_time + 2 * i): - strategy.submit(make_kafka_msg(msg, "logical-events", i)) + strategy.submit(make_kafka_msg(json.dumps(messages[i]), "ingest-metrics", i)) # Last submit was at T+10, which means we've only flushed the first 3 windows + + transformed_msgs = [] + for i in range(6): + modified_metric = deepcopy(transformed_metric) + modified_metric["org_id"] = i + transformed_msgs.append(modified_metric) + topic = Topic("transformed-events") msg1 = broker.consume(Partition(topic, 0), 0) - assert msg1 is not None and msg1.payload.value == "msg0_mappedmsg1_mappedmsg2_mapped".encode( + + assert msg1 is not None and msg1.payload.value == json.dumps(transformed_msgs[:3]).encode( "utf-8" ) msg2 = broker.consume(Partition(topic, 0), 1) - assert msg2 is not None and msg2.payload.value == "msg1_mappedmsg2_mappedmsg3_mapped".encode( + assert msg2 is not None and msg2.payload.value == json.dumps(transformed_msgs[1:4]).encode( "utf-8" ) msg3 = broker.consume(Partition(topic, 0), 2) - assert msg3 is not None and msg3.payload.value == "msg2_mappedmsg3_mappedmsg4_mapped".encode( + assert msg3 is not None and msg3.payload.value == json.dumps(transformed_msgs[2:5]).encode( "utf-8" ) @@ -322,26 +394,42 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip strategy.poll() msg4 = broker.consume(Partition(topic, 0), 3) - assert msg4 is not None and msg4.payload.value == "msg3_mappedmsg4_mappedmsg5_mapped".encode( + assert msg4 is not None and msg4.payload.value == json.dumps(transformed_msgs[3:6]).encode( "utf-8" ) msg5 = broker.consume(Partition(topic, 0), 4) - assert msg5 is not None and msg5.payload.value == "msg4_mappedmsg5_mapped".encode("utf-8") + assert msg5 is not None and msg5.payload.value == json.dumps(transformed_msgs[4:6]).encode( + "utf-8" + ) msg6 = broker.consume(Partition(topic, 0), 5) - assert msg6 is not None and msg6.payload.value == "msg5_mapped".encode("utf-8") + assert msg6 is not None and msg6.payload.value == json.dumps([transformed_msgs[5]]).encode( + "utf-8" + ) # Up to this point everything is flushed out + messages = [] + for i in range(12, 14): + modified_metric = deepcopy(metric) + modified_metric["org_id"] = i + messages.append(modified_metric) # Submit data at T+24, T+26 (data comes in at a gap) for i in range(12, 14): - msg = f"msg{i}" with mock.patch("time.time", return_value=cur_time + 2 * i): - strategy.submit(make_kafka_msg(msg, "logical-events", i)) + strategy.submit(make_kafka_msg(json.dumps(messages[i - 12]), "ingest-metrics", i)) + + transformed_msgs = [] + for i in range(12, 14): + modified_metric = deepcopy(transformed_metric) + modified_metric["org_id"] = i + transformed_msgs.append(modified_metric) msg12 = broker.consume(Partition(topic, 0), 6) - assert msg12 is not None and msg12.payload.value == "msg12_mapped".encode("utf-8") + assert msg12 is not None and msg12.payload.value == json.dumps([transformed_msgs[0]]).encode( + "utf-8" + ) msg13 = broker.consume(Partition(topic, 0), 7) assert msg13 is None @@ -350,7 +438,9 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip strategy.poll() msg13 = broker.consume(Partition(topic, 0), 7) - assert msg13 is not None and msg13.payload.value == "msg12_mappedmsg13_mapped".encode("utf-8") + assert msg13 is not None and msg13.payload.value == json.dumps(transformed_msgs[:2]).encode( + "utf-8" + ) msg14 = broker.consume(Partition(topic, 0), 8) assert msg14 is None @@ -359,13 +449,17 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip strategy.poll() msg14 = broker.consume(Partition(topic, 0), 8) - assert msg14 is not None and msg14.payload.value == "msg12_mappedmsg13_mapped".encode("utf-8") + assert msg14 is not None and msg14.payload.value == json.dumps(transformed_msgs[:2]).encode( + "utf-8" + ) with mock.patch("time.time", return_value=cur_time + 2 * 16): strategy.poll() msg15 = broker.consume(Partition(topic, 0), 9) - assert msg15 is not None and msg15.payload.value == "msg13_mapped".encode("utf-8") + assert msg15 is not None and msg15.payload.value == json.dumps([transformed_msgs[1]]).encode( + "utf-8" + ) with mock.patch("time.time", return_value=cur_time + 2 * 17): strategy.poll() @@ -377,19 +471,19 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip commit.assert_has_calls( [ call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 1}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 1}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 2}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 2}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 3}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 3}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 4}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 4}), call({}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 5}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 5}), call({}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 6}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 6}), call({}), call({}), call({}), @@ -397,10 +491,10 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip call({}), call({}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 13}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 13}), call({}), call({}), - call({Partition(topic=Topic(name="logical-events"), index=0): 14}), + call({Partition(topic=Topic(name="ingest-metrics"), index=0): 14}), call({}), call({}), call({}), @@ -408,13 +502,18 @@ def test_standard_reduce(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip ) -def test_reduce_with_gap(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pipeline) -> None: +def test_reduce_with_gap( + broker: LocalBroker[KafkaPayload], + reduce_pipeline: Pipeline, + metric: IngestMetric, + transformed_metric: IngestMetric, +) -> None: """ Test a full "loop" of the sliding window algorithm. Checks for correct results, timestamps, and offset management strategy """ - consumer = ArroyoConsumer(source="source1") + consumer = ArroyoConsumer(source="source1", stream_name="ingest-metrics", schema=SCHEMA) consumer.add_step( MapStep( route=Route(source="source1", waypoints=[]), @@ -433,6 +532,12 @@ def test_reduce_with_gap(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip pipeline_step=cast(Reduce[timedelta, Any, Any], reduce_pipeline.steps["myreduce"]), ) ) + consumer.add_step( + MapStep( + route=Route(source="source1", waypoints=[]), + pipeline_step=cast(Map, reduce_pipeline.steps["serializer"]), + ) + ) consumer.add_step( StreamSinkStep( route=Route(source="source1", waypoints=[]), @@ -443,32 +548,44 @@ def test_reduce_with_gap(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip factory = ArroyoStreamingFactory(consumer) commit = mock.Mock(spec=Commit) - strategy = factory.create_with_partitions(commit, {Partition(Topic("logical-events"), 0): 0}) + strategy = factory.create_with_partitions(commit, {Partition(Topic("ingest-metrics"), 0): 0}) cur_time = time.time() - # 6 messages + # 6 messages to use in this test + # Give them an "ID" so we can test for correctness in the algorithm + messages = [] + for i in range(6): + modified_metric = deepcopy(metric) + modified_metric["org_id"] = i + messages.append(modified_metric) + # Accumulators: [0,1] [2,3] [4,5] [6,7] [8,9] for i in range(6): - msg = f"msg{i}" with mock.patch("time.time", return_value=cur_time + 2 * i): - strategy.submit(make_kafka_msg(msg, "logical-events", i)) + strategy.submit(make_kafka_msg(json.dumps(messages[i]), "ingest-metrics", i)) # Last submit was at T+10, which means we've only flushed the first 3 windows + transformed_msgs = [] + for i in range(6): + modified_metric = deepcopy(transformed_metric) + modified_metric["org_id"] = i + transformed_msgs.append(modified_metric) + topic = Topic("transformed-events") msg1 = broker.consume(Partition(topic, 0), 0) - assert msg1 is not None and msg1.payload.value == "msg0_mappedmsg1_mappedmsg2_mapped".encode( + assert msg1 is not None and msg1.payload.value == json.dumps(transformed_msgs[:3]).encode( "utf-8" ) msg2 = broker.consume(Partition(topic, 0), 1) - assert msg2 is not None and msg2.payload.value == "msg1_mappedmsg2_mappedmsg3_mapped".encode( + assert msg2 is not None and msg2.payload.value == json.dumps(transformed_msgs[1:4]).encode( "utf-8" ) msg3 = broker.consume(Partition(topic, 0), 2) - assert msg3 is not None and msg3.payload.value == "msg2_mappedmsg3_mappedmsg4_mapped".encode( + assert msg3 is not None and msg3.payload.value == json.dumps(transformed_msgs[2:5]).encode( "utf-8" ) @@ -481,29 +598,33 @@ def test_reduce_with_gap(broker: LocalBroker[KafkaPayload], reduce_pipeline: Pip strategy.poll() msg4 = broker.consume(Partition(topic, 0), 3) - assert msg4 is not None and msg4.payload.value == "msg3_mappedmsg4_mappedmsg5_mapped".encode( + assert msg4 is not None and msg4.payload.value == json.dumps(transformed_msgs[3:6]).encode( "utf-8" ) msg5 = broker.consume(Partition(topic, 0), 4) - assert msg5 is not None and msg5.payload.value == "msg4_mappedmsg5_mapped".encode("utf-8") + assert msg5 is not None and msg5.payload.value == json.dumps(transformed_msgs[4:6]).encode( + "utf-8" + ) msg6 = broker.consume(Partition(topic, 0), 5) - assert msg6 is not None and msg6.payload.value == "msg5_mapped".encode("utf-8") + assert msg6 is not None and msg6.payload.value == json.dumps([transformed_msgs[5]]).encode( + "utf-8" + ) commit.assert_has_calls( [ call({}), - call({Partition(Topic("logical-events"), 0): 1}), + call({Partition(Topic("ingest-metrics"), 0): 1}), call({}), - call({Partition(Topic("logical-events"), 0): 2}), + call({Partition(Topic("ingest-metrics"), 0): 2}), call({}), - call({Partition(Topic("logical-events"), 0): 3}), + call({Partition(Topic("ingest-metrics"), 0): 3}), call({}), - call({Partition(Topic("logical-events"), 0): 4}), + call({Partition(Topic("ingest-metrics"), 0): 4}), call({}), - call({Partition(Topic("logical-events"), 0): 5}), + call({Partition(Topic("ingest-metrics"), 0): 5}), call({}), - call({Partition(Topic("logical-events"), 0): 6}), + call({Partition(Topic("ingest-metrics"), 0): 6}), ] ) diff --git a/sentry_streams/tests/adapters/arroyo/test_forwarder.py b/sentry_streams/tests/adapters/arroyo/test_forwarder.py index bb5435b..83871ed 100644 --- a/sentry_streams/tests/adapters/arroyo/test_forwarder.py +++ b/sentry_streams/tests/adapters/arroyo/test_forwarder.py @@ -1,3 +1,5 @@ +import json +import time from datetime import datetime from typing import Mapping from unittest import mock @@ -6,13 +8,15 @@ from arroyo.processing.strategies import Produce from arroyo.processing.strategies.abstract import ProcessingStrategy from arroyo.types import BrokerValue, Message, Partition, Topic +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.adapters.arroyo.forwarder import Forwarder from sentry_streams.adapters.arroyo.routes import Route, RoutedValue +from sentry_streams.pipeline.message import Message as StreamsMessage from tests.adapters.arroyo.message_helpers import make_msg -def test_submit() -> None: +def test_submit(metric: IngestMetric) -> None: produce_step = mock.Mock(spec=Produce) next_step = mock.Mock(spec=ProcessingStrategy) forwarder = Forwarder( @@ -21,14 +25,17 @@ def test_submit() -> None: next_step=next_step, ) + produced_msg = StreamsMessage(json.dumps(metric).encode("utf-8"), [], time.time(), None) + msg = StreamsMessage(metric, [], time.time(), None) + messages: Mapping[str, Message[RoutedValue]] = { "correct": make_msg( - payload="test-payload", + payload=produced_msg, route=Route(source="source", waypoints=["correct_branch"]), offset=0, ), "wrong": make_msg( - payload="test-payload", + payload=msg, route=Route(source="source", waypoints=["wrong_branch"]), offset=1, ), @@ -36,7 +43,7 @@ def test_submit() -> None: expected_messages = { "correct": Message( value=BrokerValue( - payload=KafkaPayload(None, "test-payload".encode("utf-8"), []), + payload=KafkaPayload(None, json.dumps(msg.payload).encode("utf-8"), []), partition=Partition(Topic("test_topic"), 0), offset=0, timestamp=datetime(2025, 1, 1, 12, 0), @@ -45,7 +52,7 @@ def test_submit() -> None: "wrong": Message( value=BrokerValue( payload=RoutedValue( - route=Route(source="source", waypoints=["wrong_branch"]), payload="test-payload" + route=Route(source="source", waypoints=["wrong_branch"]), payload=msg ), partition=Partition(Topic("test_topic"), 0), offset=1, diff --git a/sentry_streams/tests/adapters/arroyo/test_steps.py b/sentry_streams/tests/adapters/arroyo/test_steps.py index db6922e..f7e3b90 100644 --- a/sentry_streams/tests/adapters/arroyo/test_steps.py +++ b/sentry_streams/tests/adapters/arroyo/test_steps.py @@ -1,19 +1,25 @@ +import json import time -from datetime import timedelta +from datetime import datetime, timedelta from typing import Callable from unittest import mock -from unittest.mock import call +from unittest.mock import ANY, call from arroyo.backends.abstract import Producer from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.processing.strategies.abstract import ProcessingStrategy from arroyo.types import ( + BrokerValue, Commit, FilteredPayload, + Message, + Partition, Topic, + Value, ) +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric -from sentry_streams.adapters.arroyo.routes import Route +from sentry_streams.adapters.arroyo.routes import Route, RoutedValue from sentry_streams.adapters.arroyo.steps import ( BroadcastStep, FilterStep, @@ -23,6 +29,7 @@ StreamSinkStep, ) from sentry_streams.examples.transformer import TransformerBatch +from sentry_streams.pipeline.message import Message as StreamsMessage from sentry_streams.pipeline.pipeline import ( Aggregate, Branch, @@ -36,7 +43,7 @@ from tests.adapters.arroyo.message_helpers import make_msg, make_value_msg -def test_map_step() -> None: +def test_map_step(metric: IngestMetric) -> None: """ Send messages for different routes through the Arroyo RunTask strategy generate by the pipeline Map step. @@ -45,16 +52,18 @@ def test_map_step() -> None: mapped_route = Route(source="source1", waypoints=["branch1"]) other_route = Route(source="source1", waypoints=["branch2"]) pipeline = Pipeline() - pipeline_map = Map(name="mymap", ctx=pipeline, inputs=[], function=lambda x: x + "_mapped") + pipeline_map = Map(name="mymap", ctx=pipeline, inputs=[], function=lambda msg: msg.payload) arroyo_map = MapStep(mapped_route, pipeline_map) next_strategy = mock.Mock(spec=ProcessingStrategy) strategy = arroyo_map.build(next_strategy, commit=mock.Mock(spec=Commit)) + test_msg = StreamsMessage(metric, [], time.time(), None) + messages = [ - make_msg("test_val", mapped_route, 0), - make_msg("test_val", other_route, 1), + make_msg(test_msg, mapped_route, 0), + make_msg(test_msg, other_route, 1), make_msg(FilteredPayload(), mapped_route, 3), ] @@ -64,11 +73,11 @@ def test_map_step() -> None: expected_calls = [ call.submit( - make_msg("test_val_mapped", mapped_route, 0), + make_msg(test_msg, mapped_route, 0), ), call.poll(), call.submit( - make_msg("test_val", other_route, 1), + make_msg(test_msg, other_route, 1), ), call.poll(), call.submit( @@ -80,7 +89,7 @@ def test_map_step() -> None: next_strategy.assert_has_calls(expected_calls) -def test_filter_step() -> None: +def test_filter_step(metric: IngestMetric, transformed_metric: IngestMetric) -> None: """ Send messages for different routes through the Arroyo RunTask strategy generate by the pipeline Filter step. @@ -90,17 +99,22 @@ def test_filter_step() -> None: pipeline = Pipeline() pipeline_filter = Filter( - name="myfilter", ctx=pipeline, inputs=[], function=lambda x: x == "test_val" + name="myfilter", + ctx=pipeline, + inputs=[], + function=lambda msg: msg.payload["name"] != "new_metric", ) arroyo_filter = FilterStep(mapped_route, pipeline_filter) next_strategy = mock.Mock(spec=ProcessingStrategy) strategy = arroyo_filter.build(next_strategy, commit=mock.Mock(spec=Commit)) + msg = StreamsMessage(metric, [], time.time(), None) + filtered_msg = StreamsMessage(transformed_metric, [], time.time(), None) messages = [ - make_msg("test_val", mapped_route, 0), - make_msg("not_test_val", mapped_route, 1), - make_msg("test_val", other_route, 2), + make_msg(msg, mapped_route, 0), + make_msg(filtered_msg, mapped_route, 1), + make_msg(msg, other_route, 2), make_msg(FilteredPayload(), mapped_route, 3), ] @@ -110,13 +124,13 @@ def test_filter_step() -> None: expected_calls = [ call.submit( - make_msg("test_val", mapped_route, 0), + make_msg(msg, mapped_route, 0), ), call.poll(), call.submit(make_msg(FilteredPayload(), mapped_route, 1)), call.poll(), call.submit( - make_msg("test_val", other_route, 2), + make_msg(msg, other_route, 2), ), call.poll(), call.submit( @@ -128,7 +142,7 @@ def test_filter_step() -> None: next_strategy.assert_has_calls(expected_calls) -def test_router() -> None: +def test_router(metric: IngestMetric, transformed_metric: IngestMetric) -> None: """ Verifies the Router step properly updates the waypoints of a RoutedValue message. """ @@ -136,8 +150,8 @@ def test_router() -> None: other_route = Route(source="source1", waypoints=["other_branch"]) pipeline = Pipeline() - def dummy_routing_func(message: str) -> str: - return "map" if message == "test_val" else "other" + def dummy_routing_func(message: StreamsMessage[IngestMetric]) -> str: + return "map" if message.payload["name"] != "new_metric" else "other" pipeline_router = Router( name="myrouter", @@ -154,10 +168,13 @@ def dummy_routing_func(message: str) -> str: next_strategy = mock.Mock(spec=ProcessingStrategy) strategy = arroyo_router.build(next_strategy, commit=mock.Mock(spec=Commit)) + msg = StreamsMessage(metric, [], time.time(), None) + filtered_msg = StreamsMessage(transformed_metric, [], time.time(), None) + messages = [ - make_msg("test_val", Route(source="source1", waypoints=[]), 0), - make_msg("not_test_val", Route(source="source1", waypoints=[]), 1), - make_msg("test_val", Route(source="source1", waypoints=[]), 2), + make_msg(msg, Route(source="source1", waypoints=[]), 0), + make_msg(filtered_msg, Route(source="source1", waypoints=[]), 1), + make_msg(msg, Route(source="source1", waypoints=[]), 2), make_msg(FilteredPayload(), Route(source="source1", waypoints=[]), 3), ] @@ -167,13 +184,13 @@ def dummy_routing_func(message: str) -> str: expected_calls = [ call.submit( - make_msg("test_val", mapped_route, 0), + make_msg(msg, mapped_route, 0), ), call.poll(), - call.submit(make_msg("not_test_val", other_route, 1)), + call.submit(make_msg(filtered_msg, other_route, 1)), call.poll(), call.submit( - make_msg("test_val", mapped_route, 2), + make_msg(msg, mapped_route, 2), ), call.poll(), call.submit( @@ -185,7 +202,7 @@ def dummy_routing_func(message: str) -> str: next_strategy.assert_has_calls(expected_calls) -def test_broadcast() -> None: +def test_broadcast(metric: IngestMetric, transformed_metric: IngestMetric) -> None: """ Verifies the Broadcast step properly updates the waypoints the messages it produces. """ @@ -207,14 +224,17 @@ def test_broadcast() -> None: next_strategy = mock.Mock(spec=ProcessingStrategy) strategy = arroyo_broadcast.build(next_strategy, commit=mock.Mock(spec=Commit)) + msg = StreamsMessage(metric, [], time.time(), None) + filtered_msg = StreamsMessage(transformed_metric, [], time.time(), None) + messages = [ make_value_msg( - "test_val", + msg, Route(source="source1", waypoints=[]), 0, ), make_value_msg( - "not_test_val", + filtered_msg, Route(source="source1", waypoints=[]), 1, ), @@ -230,11 +250,11 @@ def test_broadcast() -> None: strategy.poll() expected_calls = [ - call.submit(make_value_msg("test_val", mapped_route, 0)), - call.submit(make_value_msg("test_val", other_route, 0)), + call.submit(make_value_msg(msg, mapped_route, 0)), + call.submit(make_value_msg(msg, other_route, 0)), call.poll(), - call.submit(make_value_msg("not_test_val", mapped_route, 1)), - call.submit(make_value_msg("not_test_val", other_route, 1)), + call.submit(make_value_msg(filtered_msg, mapped_route, 1)), + call.submit(make_value_msg(filtered_msg, other_route, 1)), call.poll(), call.submit( make_value_msg( @@ -249,7 +269,7 @@ def test_broadcast() -> None: next_strategy.assert_has_calls(expected_calls) -def test_sink() -> None: +def test_sink(metric: IngestMetric) -> None: """ Sends routed messages through a Sink and verifies that only the messages for the specified sink are sent to the producer. @@ -263,9 +283,12 @@ def test_sink() -> None: next_strategy, commit=mock.Mock(spec=Commit) ) + # assume this is a serialized msg being produced to Kafka + msg = StreamsMessage(json.dumps(metric).encode("utf-8"), [], time.time(), None) + messages = [ - make_msg("test_val", mapped_route, 0), - make_msg("test_val", other_route, 1), + make_msg(msg, mapped_route, 0), + make_msg(msg, other_route, 1), make_msg(FilteredPayload(), mapped_route, 2), ] @@ -273,12 +296,10 @@ def test_sink() -> None: strategy.submit(message) strategy.poll() - producer.produce.assert_called_with( - Topic("test_topic"), KafkaPayload(None, "test_val".encode("utf-8"), []) - ) + producer.produce.assert_called_with(Topic("test_topic"), KafkaPayload(None, msg.payload, [])) -def test_reduce_step(transformer: Callable[[], TransformerBatch]) -> None: +def test_reduce_step(transformer: Callable[[], TransformerBatch], metric: IngestMetric) -> None: """ Send messages for different routes through the Arroyo RunTask strategy generate by the pipeline Reduce step. @@ -303,9 +324,10 @@ def test_reduce_step(transformer: Callable[[], TransformerBatch]) -> None: next_strategy = mock.Mock(spec=ProcessingStrategy) strategy = arroyo_reduce.build(next_strategy, commit=mock.Mock(spec=Commit)) + msg = StreamsMessage(metric, [], ANY, None) messages = [ - make_msg("test_val", mapped_route, 0), - make_msg("test_val", other_route, 1), # wrong route + make_msg(msg, mapped_route, 0), + make_msg(msg, other_route, 1), make_msg(FilteredPayload(), mapped_route, 3), # to be filtered out ] @@ -318,14 +340,45 @@ def test_reduce_step(transformer: Callable[[], TransformerBatch]) -> None: with mock.patch("time.time", return_value=cur_time + 8.0): strategy.poll() + new_msg = StreamsMessage( + [metric], [], ANY, None + ) # since Reduce produces a timestamp based on when the aggregate result is produced, we mock the timestamp + + other_route_msg = Message( + BrokerValue( + payload=RoutedValue(route=other_route, payload=msg), + partition=Partition(Topic("test_topic"), 0), + offset=1, + timestamp=datetime(2025, 1, 1, 12, 0), + ), + ) + + mapped_msg = Message( + Value( + payload=RoutedValue(route=mapped_route, payload=new_msg), + committable={Partition(Topic("test_topic"), 0): 1}, + timestamp=None, + ) + ) + + filtered_msg = Message( + BrokerValue( + payload=FilteredPayload(), + partition=Partition(Topic("test_topic"), 0), + offset=3, + timestamp=datetime(2025, 1, 1, 12, 0), + ), + ) + expected_calls = [ call.poll(), - call.submit(make_msg("test_val", other_route, 1)), + call.submit(other_route_msg), call.poll(), - call.submit(make_msg(FilteredPayload(), mapped_route, 3)), + call.submit(filtered_msg), call.poll(), - call.submit(make_value_msg("test_val", mapped_route, 1, include_timestamp=False)), + call.submit(mapped_msg), call.poll(), ] + next_strategy next_strategy.assert_has_calls(expected_calls) diff --git a/sentry_streams/tests/pipeline/test_chain.py b/sentry_streams/tests/pipeline/test_chain.py index 3303781..c9fc9df 100644 --- a/sentry_streams/tests/pipeline/test_chain.py +++ b/sentry_streams/tests/pipeline/test_chain.py @@ -3,6 +3,7 @@ from unittest import mock import pytest +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.pipeline.chain import ( Applier, @@ -56,10 +57,10 @@ def test_broadcast() -> None: .broadcast( "route_to_all", [ - segment(name="route1") + segment(name="route1", msg_type=IngestMetric) .apply("transform2", Map(lambda msg: msg)) .sink("myoutput1", "transformed-events-2"), - segment(name="route2") + segment(name="route2", msg_type=IngestMetric) .apply("transform3", Map(lambda msg: msg)) .sink("myoutput2", "transformed-events-3"), ], @@ -117,10 +118,10 @@ def test_router() -> None: "route_to_one", routing_function=routing_func, routes={ - Routes.ROUTE1: segment(name="route1") + Routes.ROUTE1: segment(name="route1", msg_type=IngestMetric) .apply("transform2", Map(lambda msg: msg)) .sink("myoutput1", "transformed-events-2"), - Routes.ROUTE2: segment(name="route2") + Routes.ROUTE2: segment(name="route2", msg_type=IngestMetric) .apply("transform3", Map(lambda msg: msg)) .sink("myoutput2", "transformed-events-3"), }, diff --git a/sentry_streams/tests/test_runner.py b/sentry_streams/tests/test_runner.py index d71a010..cc071ac 100644 --- a/sentry_streams/tests/test_runner.py +++ b/sentry_streams/tests/test_runner.py @@ -2,6 +2,7 @@ from typing import Any import pytest +from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric from sentry_streams.adapters.loader import load_adapter from sentry_streams.adapters.stream_adapter import PipelineConfig, RuntimeTranslator @@ -21,22 +22,22 @@ class RouterBranch(Enum): @pytest.fixture def create_pipeline() -> Pipeline: broadcast_branch_1 = ( - segment("branch1") + segment("branch1", IngestMetric) .apply("map2", Map(function=lambda x: x)) .route( "router1", routing_function=lambda x: RouterBranch.BRANCH1, routes={ - RouterBranch.BRANCH1: segment("map4_segment").apply( + RouterBranch.BRANCH1: segment("map4_segment", IngestMetric).apply( "map4", Map(function=lambda x: x) ), - RouterBranch.BRANCH2: segment("map5_segment").apply( + RouterBranch.BRANCH2: segment("map5_segment", IngestMetric).apply( "map5", Map(function=lambda x: x) ), }, ) ) - broadcast_branch_2 = segment("branch2").apply("map3", Map(function=lambda x: x)) + broadcast_branch_2 = segment("branch2", IngestMetric).apply("map3", Map(function=lambda x: x)) test_pipeline = ( streaming_source("source1", stream_name="foo") diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index 20fa279..19ec95a 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -1,5 +1,4 @@ version = 1 -revision = 1 requires-python = ">=3.11" [[package]] @@ -146,6 +145,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8f/d7/9322c609343d929e75e7e5e6255e614fcc67572cfd083959cdef3b7aad79/docutils-0.21.2-py3-none-any.whl", hash = "sha256:dafca5b9e384f0e419294eb4d2ff9fa826435bf15f15b7bd45723e8ad76811b2", size = 587408 }, ] +[[package]] +name = "fastjsonschema" +version = "2.21.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/8b/50/4b769ce1ac4071a1ef6d86b1a3fb56cdc3a37615e8c5519e1af96cdac366/fastjsonschema-2.21.1.tar.gz", hash = "sha256:794d4f0a58f848961ba16af7b9c85a3e88cd360df008c59aac6fc5ae9323b5d4", size = 373939 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/90/2b/0817a2b257fe88725c25589d89aec060581aabf668707a8d03b2e9e0cb2a/fastjsonschema-2.21.1-py3-none-any.whl", hash = "sha256:c9e5b7e908310918cf494a434eeb31384dd84a98b57a30bcb1f535015b554667", size = 23924 }, +] + [[package]] name = "filelock" version = "3.18.0" @@ -155,6 +163,56 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4d/36/2a115987e2d8c300a974597416d9de88f2444426de9571f4b59b2cca3acc/filelock-3.18.0-py3-none-any.whl", hash = "sha256:c401f4f8377c4464e6db25fff06205fd89bdd83b65eb0488ed1b160f780e21de", size = 16215 }, ] +[[package]] +name = "grpc-stubs" +version = "1.53.0.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "grpcio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/55/8d/14c6b8c2fa5d82ffe96aed53b1c38e2a9fb6a57c5836966545f3080e5adc/grpc-stubs-1.53.0.5.tar.gz", hash = "sha256:3e1b642775cbc3e0c6332cfcedfccb022176db87e518757bef3a1241397be406", size = 14259 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e0/86/147d2ccaf9b4b81407734b9abc1152aff39836e8e05be3bf069f9374c021/grpc_stubs-1.53.0.5-py3-none-any.whl", hash = "sha256:04183fb65a1b166a1febb9627e3d9647d3926ccc2dfe049fe7b6af243428dbe1", size = 16497 }, +] + +[[package]] +name = "grpcio" +version = "1.71.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1c/95/aa11fc09a85d91fbc7dd405dcb2a1e0256989d67bf89fa65ae24b3ba105a/grpcio-1.71.0.tar.gz", hash = "sha256:2b85f7820475ad3edec209d3d89a7909ada16caab05d3f2e08a7e8ae3200a55c", size = 12549828 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/63/04/a085f3ad4133426f6da8c1becf0749872a49feb625a407a2e864ded3fb12/grpcio-1.71.0-cp311-cp311-linux_armv7l.whl", hash = "sha256:d6aa986318c36508dc1d5001a3ff169a15b99b9f96ef5e98e13522c506b37eef", size = 5210453 }, + { url = "https://files.pythonhosted.org/packages/b4/d5/0bc53ed33ba458de95020970e2c22aa8027b26cc84f98bea7fcad5d695d1/grpcio-1.71.0-cp311-cp311-macosx_10_14_universal2.whl", hash = "sha256:d2c170247315f2d7e5798a22358e982ad6eeb68fa20cf7a820bb74c11f0736e7", size = 11347567 }, + { url = "https://files.pythonhosted.org/packages/e3/6d/ce334f7e7a58572335ccd61154d808fe681a4c5e951f8a1ff68f5a6e47ce/grpcio-1.71.0-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:e6f83a583ed0a5b08c5bc7a3fe860bb3c2eac1f03f1f63e0bc2091325605d2b7", size = 5696067 }, + { url = "https://files.pythonhosted.org/packages/05/4a/80befd0b8b1dc2b9ac5337e57473354d81be938f87132e147c4a24a581bd/grpcio-1.71.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4be74ddeeb92cc87190e0e376dbc8fc7736dbb6d3d454f2fa1f5be1dee26b9d7", size = 6348377 }, + { url = "https://files.pythonhosted.org/packages/c7/67/cbd63c485051eb78663355d9efd1b896cfb50d4a220581ec2cb9a15cd750/grpcio-1.71.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4dd0dfbe4d5eb1fcfec9490ca13f82b089a309dc3678e2edabc144051270a66e", size = 5940407 }, + { url = "https://files.pythonhosted.org/packages/98/4b/7a11aa4326d7faa499f764eaf8a9b5a0eb054ce0988ee7ca34897c2b02ae/grpcio-1.71.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:a2242d6950dc892afdf9e951ed7ff89473aaf744b7d5727ad56bdaace363722b", size = 6030915 }, + { url = "https://files.pythonhosted.org/packages/eb/a2/cdae2d0e458b475213a011078b0090f7a1d87f9a68c678b76f6af7c6ac8c/grpcio-1.71.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:0fa05ee31a20456b13ae49ad2e5d585265f71dd19fbd9ef983c28f926d45d0a7", size = 6648324 }, + { url = "https://files.pythonhosted.org/packages/27/df/f345c8daaa8d8574ce9869f9b36ca220c8845923eb3087e8f317eabfc2a8/grpcio-1.71.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3d081e859fb1ebe176de33fc3adb26c7d46b8812f906042705346b314bde32c3", size = 6197839 }, + { url = "https://files.pythonhosted.org/packages/f2/2c/cd488dc52a1d0ae1bad88b0d203bc302efbb88b82691039a6d85241c5781/grpcio-1.71.0-cp311-cp311-win32.whl", hash = "sha256:d6de81c9c00c8a23047136b11794b3584cdc1460ed7cbc10eada50614baa1444", size = 3619978 }, + { url = "https://files.pythonhosted.org/packages/ee/3f/cf92e7e62ccb8dbdf977499547dfc27133124d6467d3a7d23775bcecb0f9/grpcio-1.71.0-cp311-cp311-win_amd64.whl", hash = "sha256:24e867651fc67717b6f896d5f0cac0ec863a8b5fb7d6441c2ab428f52c651c6b", size = 4282279 }, + { url = "https://files.pythonhosted.org/packages/4c/83/bd4b6a9ba07825bd19c711d8b25874cd5de72c2a3fbf635c3c344ae65bd2/grpcio-1.71.0-cp312-cp312-linux_armv7l.whl", hash = "sha256:0ff35c8d807c1c7531d3002be03221ff9ae15712b53ab46e2a0b4bb271f38537", size = 5184101 }, + { url = "https://files.pythonhosted.org/packages/31/ea/2e0d90c0853568bf714693447f5c73272ea95ee8dad107807fde740e595d/grpcio-1.71.0-cp312-cp312-macosx_10_14_universal2.whl", hash = "sha256:b78a99cd1ece4be92ab7c07765a0b038194ded2e0a26fd654591ee136088d8d7", size = 11310927 }, + { url = "https://files.pythonhosted.org/packages/ac/bc/07a3fd8af80467390af491d7dc66882db43884128cdb3cc8524915e0023c/grpcio-1.71.0-cp312-cp312-manylinux_2_17_aarch64.whl", hash = "sha256:dc1a1231ed23caac1de9f943d031f1bc38d0f69d2a3b243ea0d664fc1fbd7fec", size = 5654280 }, + { url = "https://files.pythonhosted.org/packages/16/af/21f22ea3eed3d0538b6ef7889fce1878a8ba4164497f9e07385733391e2b/grpcio-1.71.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e6beeea5566092c5e3c4896c6d1d307fb46b1d4bdf3e70c8340b190a69198594", size = 6312051 }, + { url = "https://files.pythonhosted.org/packages/49/9d/e12ddc726dc8bd1aa6cba67c85ce42a12ba5b9dd75d5042214a59ccf28ce/grpcio-1.71.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d5170929109450a2c031cfe87d6716f2fae39695ad5335d9106ae88cc32dc84c", size = 5910666 }, + { url = "https://files.pythonhosted.org/packages/d9/e9/38713d6d67aedef738b815763c25f092e0454dc58e77b1d2a51c9d5b3325/grpcio-1.71.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:5b08d03ace7aca7b2fadd4baf291139b4a5f058805a8327bfe9aece7253b6d67", size = 6012019 }, + { url = "https://files.pythonhosted.org/packages/80/da/4813cd7adbae6467724fa46c952d7aeac5e82e550b1c62ed2aeb78d444ae/grpcio-1.71.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:f903017db76bf9cc2b2d8bdd37bf04b505bbccad6be8a81e1542206875d0e9db", size = 6637043 }, + { url = "https://files.pythonhosted.org/packages/52/ca/c0d767082e39dccb7985c73ab4cf1d23ce8613387149e9978c70c3bf3b07/grpcio-1.71.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:469f42a0b410883185eab4689060a20488a1a0a00f8bbb3cbc1061197b4c5a79", size = 6186143 }, + { url = "https://files.pythonhosted.org/packages/00/61/7b2c8ec13303f8fe36832c13d91ad4d4ba57204b1c723ada709c346b2271/grpcio-1.71.0-cp312-cp312-win32.whl", hash = "sha256:ad9f30838550695b5eb302add33f21f7301b882937460dd24f24b3cc5a95067a", size = 3604083 }, + { url = "https://files.pythonhosted.org/packages/fd/7c/1e429c5fb26122055d10ff9a1d754790fb067d83c633ff69eddcf8e3614b/grpcio-1.71.0-cp312-cp312-win_amd64.whl", hash = "sha256:652350609332de6dac4ece254e5d7e1ff834e203d6afb769601f286886f6f3a8", size = 4272191 }, + { url = "https://files.pythonhosted.org/packages/04/dd/b00cbb45400d06b26126dcfdbdb34bb6c4f28c3ebbd7aea8228679103ef6/grpcio-1.71.0-cp313-cp313-linux_armv7l.whl", hash = "sha256:cebc1b34ba40a312ab480ccdb396ff3c529377a2fce72c45a741f7215bfe8379", size = 5184138 }, + { url = "https://files.pythonhosted.org/packages/ed/0a/4651215983d590ef53aac40ba0e29dda941a02b097892c44fa3357e706e5/grpcio-1.71.0-cp313-cp313-macosx_10_14_universal2.whl", hash = "sha256:85da336e3649a3d2171e82f696b5cad2c6231fdd5bad52616476235681bee5b3", size = 11310747 }, + { url = "https://files.pythonhosted.org/packages/57/a3/149615b247f321e13f60aa512d3509d4215173bdb982c9098d78484de216/grpcio-1.71.0-cp313-cp313-manylinux_2_17_aarch64.whl", hash = "sha256:f9a412f55bb6e8f3bb000e020dbc1e709627dcb3a56f6431fa7076b4c1aab0db", size = 5653991 }, + { url = "https://files.pythonhosted.org/packages/ca/56/29432a3e8d951b5e4e520a40cd93bebaa824a14033ea8e65b0ece1da6167/grpcio-1.71.0-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:47be9584729534660416f6d2a3108aaeac1122f6b5bdbf9fd823e11fe6fbaa29", size = 6312781 }, + { url = "https://files.pythonhosted.org/packages/a3/f8/286e81a62964ceb6ac10b10925261d4871a762d2a763fbf354115f9afc98/grpcio-1.71.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7c9c80ac6091c916db81131d50926a93ab162a7e97e4428ffc186b6e80d6dda4", size = 5910479 }, + { url = "https://files.pythonhosted.org/packages/35/67/d1febb49ec0f599b9e6d4d0d44c2d4afdbed9c3e80deb7587ec788fcf252/grpcio-1.71.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:789d5e2a3a15419374b7b45cd680b1e83bbc1e52b9086e49308e2c0b5bbae6e3", size = 6013262 }, + { url = "https://files.pythonhosted.org/packages/a1/04/f9ceda11755f0104a075ad7163fc0d96e2e3a9fe25ef38adfc74c5790daf/grpcio-1.71.0-cp313-cp313-musllinux_1_1_i686.whl", hash = "sha256:1be857615e26a86d7363e8a163fade914595c81fec962b3d514a4b1e8760467b", size = 6643356 }, + { url = "https://files.pythonhosted.org/packages/fb/ce/236dbc3dc77cf9a9242adcf1f62538734ad64727fabf39e1346ad4bd5c75/grpcio-1.71.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:a76d39b5fafd79ed604c4be0a869ec3581a172a707e2a8d7a4858cb05a5a7637", size = 6186564 }, + { url = "https://files.pythonhosted.org/packages/10/fd/b3348fce9dd4280e221f513dd54024e765b21c348bc475516672da4218e9/grpcio-1.71.0-cp313-cp313-win32.whl", hash = "sha256:74258dce215cb1995083daa17b379a1a5a87d275387b7ffe137f1d5131e2cfbb", size = 3601890 }, + { url = "https://files.pythonhosted.org/packages/be/f8/db5d5f3fc7e296166286c2a397836b8b042f7ad1e11028d82b061701f0f7/grpcio-1.71.0-cp313-cp313-win_amd64.whl", hash = "sha256:22c3bc8d488c039a199f7a003a38cb7635db6656fa96437a8accde8322ce2366", size = 4273308 }, +] + [[package]] name = "identify" version = "2.6.9" @@ -278,6 +336,47 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4f/65/6079a46068dfceaeabb5dcad6d674f5f5c61a6fa5673746f42a9f4c233b3/MarkupSafe-3.0.2-cp313-cp313t-win_amd64.whl", hash = "sha256:e444a31f8db13eb18ada366ab3cf45fd4b31e4db1236a4448f68778c1d1a5a2f", size = 15739 }, ] +[[package]] +name = "msgpack" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/cb/d0/7555686ae7ff5731205df1012ede15dd9d927f6227ea151e901c7406af4f/msgpack-1.1.0.tar.gz", hash = "sha256:dd432ccc2c72b914e4cb77afce64aab761c1137cc698be3984eee260bcb2896e", size = 167260 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/5e/a4c7154ba65d93be91f2f1e55f90e76c5f91ccadc7efc4341e6f04c8647f/msgpack-1.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:3d364a55082fb2a7416f6c63ae383fbd903adb5a6cf78c5b96cc6316dc1cedc7", size = 150803 }, + { url = "https://files.pythonhosted.org/packages/60/c2/687684164698f1d51c41778c838d854965dd284a4b9d3a44beba9265c931/msgpack-1.1.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:79ec007767b9b56860e0372085f8504db5d06bd6a327a335449508bbee9648fa", size = 84343 }, + { url = "https://files.pythonhosted.org/packages/42/ae/d3adea9bb4a1342763556078b5765e666f8fdf242e00f3f6657380920972/msgpack-1.1.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6ad622bf7756d5a497d5b6836e7fc3752e2dd6f4c648e24b1803f6048596f701", size = 81408 }, + { url = "https://files.pythonhosted.org/packages/dc/17/6313325a6ff40ce9c3207293aee3ba50104aed6c2c1559d20d09e5c1ff54/msgpack-1.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e59bca908d9ca0de3dc8684f21ebf9a690fe47b6be93236eb40b99af28b6ea6", size = 396096 }, + { url = "https://files.pythonhosted.org/packages/a8/a1/ad7b84b91ab5a324e707f4c9761633e357820b011a01e34ce658c1dda7cc/msgpack-1.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5e1da8f11a3dd397f0a32c76165cf0c4eb95b31013a94f6ecc0b280c05c91b59", size = 403671 }, + { url = "https://files.pythonhosted.org/packages/bb/0b/fd5b7c0b308bbf1831df0ca04ec76fe2f5bf6319833646b0a4bd5e9dc76d/msgpack-1.1.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:452aff037287acb1d70a804ffd022b21fa2bb7c46bee884dbc864cc9024128a0", size = 387414 }, + { url = "https://files.pythonhosted.org/packages/f0/03/ff8233b7c6e9929a1f5da3c7860eccd847e2523ca2de0d8ef4878d354cfa/msgpack-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8da4bf6d54ceed70e8861f833f83ce0814a2b72102e890cbdfe4b34764cdd66e", size = 383759 }, + { url = "https://files.pythonhosted.org/packages/1f/1b/eb82e1fed5a16dddd9bc75f0854b6e2fe86c0259c4353666d7fab37d39f4/msgpack-1.1.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:41c991beebf175faf352fb940bf2af9ad1fb77fd25f38d9142053914947cdbf6", size = 394405 }, + { url = "https://files.pythonhosted.org/packages/90/2e/962c6004e373d54ecf33d695fb1402f99b51832631e37c49273cc564ffc5/msgpack-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:a52a1f3a5af7ba1c9ace055b659189f6c669cf3657095b50f9602af3a3ba0fe5", size = 396041 }, + { url = "https://files.pythonhosted.org/packages/f8/20/6e03342f629474414860c48aeffcc2f7f50ddaf351d95f20c3f1c67399a8/msgpack-1.1.0-cp311-cp311-win32.whl", hash = "sha256:58638690ebd0a06427c5fe1a227bb6b8b9fdc2bd07701bec13c2335c82131a88", size = 68538 }, + { url = "https://files.pythonhosted.org/packages/aa/c4/5a582fc9a87991a3e6f6800e9bb2f3c82972912235eb9539954f3e9997c7/msgpack-1.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd2906780f25c8ed5d7b323379f6138524ba793428db5d0e9d226d3fa6aa1788", size = 74871 }, + { url = "https://files.pythonhosted.org/packages/e1/d6/716b7ca1dbde63290d2973d22bbef1b5032ca634c3ff4384a958ec3f093a/msgpack-1.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:d46cf9e3705ea9485687aa4001a76e44748b609d260af21c4ceea7f2212a501d", size = 152421 }, + { url = "https://files.pythonhosted.org/packages/70/da/5312b067f6773429cec2f8f08b021c06af416bba340c912c2ec778539ed6/msgpack-1.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:5dbad74103df937e1325cc4bfeaf57713be0b4f15e1c2da43ccdd836393e2ea2", size = 85277 }, + { url = "https://files.pythonhosted.org/packages/28/51/da7f3ae4462e8bb98af0d5bdf2707f1b8c65a0d4f496e46b6afb06cbc286/msgpack-1.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:58dfc47f8b102da61e8949708b3eafc3504509a5728f8b4ddef84bd9e16ad420", size = 82222 }, + { url = "https://files.pythonhosted.org/packages/33/af/dc95c4b2a49cff17ce47611ca9ba218198806cad7796c0b01d1e332c86bb/msgpack-1.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4676e5be1b472909b2ee6356ff425ebedf5142427842aa06b4dfd5117d1ca8a2", size = 392971 }, + { url = "https://files.pythonhosted.org/packages/f1/54/65af8de681fa8255402c80eda2a501ba467921d5a7a028c9c22a2c2eedb5/msgpack-1.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:17fb65dd0bec285907f68b15734a993ad3fc94332b5bb21b0435846228de1f39", size = 401403 }, + { url = "https://files.pythonhosted.org/packages/97/8c/e333690777bd33919ab7024269dc3c41c76ef5137b211d776fbb404bfead/msgpack-1.1.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a51abd48c6d8ac89e0cfd4fe177c61481aca2d5e7ba42044fd218cfd8ea9899f", size = 385356 }, + { url = "https://files.pythonhosted.org/packages/57/52/406795ba478dc1c890559dd4e89280fa86506608a28ccf3a72fbf45df9f5/msgpack-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2137773500afa5494a61b1208619e3871f75f27b03bcfca7b3a7023284140247", size = 383028 }, + { url = "https://files.pythonhosted.org/packages/e7/69/053b6549bf90a3acadcd8232eae03e2fefc87f066a5b9fbb37e2e608859f/msgpack-1.1.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:398b713459fea610861c8a7b62a6fec1882759f308ae0795b5413ff6a160cf3c", size = 391100 }, + { url = "https://files.pythonhosted.org/packages/23/f0/d4101d4da054f04274995ddc4086c2715d9b93111eb9ed49686c0f7ccc8a/msgpack-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:06f5fd2f6bb2a7914922d935d3b8bb4a7fff3a9a91cfce6d06c13bc42bec975b", size = 394254 }, + { url = "https://files.pythonhosted.org/packages/1c/12/cf07458f35d0d775ff3a2dc5559fa2e1fcd06c46f1ef510e594ebefdca01/msgpack-1.1.0-cp312-cp312-win32.whl", hash = "sha256:ad33e8400e4ec17ba782f7b9cf868977d867ed784a1f5f2ab46e7ba53b6e1e1b", size = 69085 }, + { url = "https://files.pythonhosted.org/packages/73/80/2708a4641f7d553a63bc934a3eb7214806b5b39d200133ca7f7afb0a53e8/msgpack-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:115a7af8ee9e8cddc10f87636767857e7e3717b7a2e97379dc2054712693e90f", size = 75347 }, + { url = "https://files.pythonhosted.org/packages/c8/b0/380f5f639543a4ac413e969109978feb1f3c66e931068f91ab6ab0f8be00/msgpack-1.1.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:071603e2f0771c45ad9bc65719291c568d4edf120b44eb36324dcb02a13bfddf", size = 151142 }, + { url = "https://files.pythonhosted.org/packages/c8/ee/be57e9702400a6cb2606883d55b05784fada898dfc7fd12608ab1fdb054e/msgpack-1.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:0f92a83b84e7c0749e3f12821949d79485971f087604178026085f60ce109330", size = 84523 }, + { url = "https://files.pythonhosted.org/packages/7e/3a/2919f63acca3c119565449681ad08a2f84b2171ddfcff1dba6959db2cceb/msgpack-1.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:4a1964df7b81285d00a84da4e70cb1383f2e665e0f1f2a7027e683956d04b734", size = 81556 }, + { url = "https://files.pythonhosted.org/packages/7c/43/a11113d9e5c1498c145a8925768ea2d5fce7cbab15c99cda655aa09947ed/msgpack-1.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:59caf6a4ed0d164055ccff8fe31eddc0ebc07cf7326a2aaa0dbf7a4001cd823e", size = 392105 }, + { url = "https://files.pythonhosted.org/packages/2d/7b/2c1d74ca6c94f70a1add74a8393a0138172207dc5de6fc6269483519d048/msgpack-1.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0907e1a7119b337971a689153665764adc34e89175f9a34793307d9def08e6ca", size = 399979 }, + { url = "https://files.pythonhosted.org/packages/82/8c/cf64ae518c7b8efc763ca1f1348a96f0e37150061e777a8ea5430b413a74/msgpack-1.1.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:65553c9b6da8166e819a6aa90ad15288599b340f91d18f60b2061f402b9a4915", size = 383816 }, + { url = "https://files.pythonhosted.org/packages/69/86/a847ef7a0f5ef3fa94ae20f52a4cacf596a4e4a010197fbcc27744eb9a83/msgpack-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7a946a8992941fea80ed4beae6bff74ffd7ee129a90b4dd5cf9c476a30e9708d", size = 380973 }, + { url = "https://files.pythonhosted.org/packages/aa/90/c74cf6e1126faa93185d3b830ee97246ecc4fe12cf9d2d31318ee4246994/msgpack-1.1.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:4b51405e36e075193bc051315dbf29168d6141ae2500ba8cd80a522964e31434", size = 387435 }, + { url = "https://files.pythonhosted.org/packages/7a/40/631c238f1f338eb09f4acb0f34ab5862c4e9d7eda11c1b685471a4c5ea37/msgpack-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b4c01941fd2ff87c2a934ee6055bda4ed353a7846b8d4f341c428109e9fcde8c", size = 399082 }, + { url = "https://files.pythonhosted.org/packages/e9/1b/fa8a952be252a1555ed39f97c06778e3aeb9123aa4cccc0fd2acd0b4e315/msgpack-1.1.0-cp313-cp313-win32.whl", hash = "sha256:7c9a35ce2c2573bada929e0b7b3576de647b0defbd25f5139dcdaba0ae35a4cc", size = 69037 }, + { url = "https://files.pythonhosted.org/packages/b6/bc/8bd826dd03e022153bfa1766dcdec4976d6c818865ed54223d71f07862b3/msgpack-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:bce7d9e614a04d0883af0b3d4d501171fbfca038f12c77fa838d9f198147a23f", size = 75140 }, +] + [[package]] name = "mypy" version = "1.15.0" @@ -370,6 +469,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/74/a88bf1b1efeae488a0c0b7bdf71429c313722d1fc0f377537fbe554e6180/pre_commit-4.2.0-py2.py3-none-any.whl", hash = "sha256:a009ca7205f1eb497d10b845e52c838a98b6cdd2102a6c8e4540e94ee75c58bd", size = 220707 }, ] +[[package]] +name = "protobuf" +version = "5.29.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/17/7d/b9dca7365f0e2c4fa7c193ff795427cfa6290147e5185ab11ece280a18e7/protobuf-5.29.4.tar.gz", hash = "sha256:4f1dfcd7997b31ef8f53ec82781ff434a28bf71d9102ddde14d076adcfc78c99", size = 424902 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9a/b2/043a1a1a20edd134563699b0e91862726a0dc9146c090743b6c44d798e75/protobuf-5.29.4-cp310-abi3-win32.whl", hash = "sha256:13eb236f8eb9ec34e63fc8b1d6efd2777d062fa6aaa68268fb67cf77f6839ad7", size = 422709 }, + { url = "https://files.pythonhosted.org/packages/79/fc/2474b59570daa818de6124c0a15741ee3e5d6302e9d6ce0bdfd12e98119f/protobuf-5.29.4-cp310-abi3-win_amd64.whl", hash = "sha256:bcefcdf3976233f8a502d265eb65ea740c989bacc6c30a58290ed0e519eb4b8d", size = 434506 }, + { url = "https://files.pythonhosted.org/packages/46/de/7c126bbb06aa0f8a7b38aaf8bd746c514d70e6a2a3f6dd460b3b7aad7aae/protobuf-5.29.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:307ecba1d852ec237e9ba668e087326a67564ef83e45a0189a772ede9e854dd0", size = 417826 }, + { url = "https://files.pythonhosted.org/packages/a2/b5/bade14ae31ba871a139aa45e7a8183d869efe87c34a4850c87b936963261/protobuf-5.29.4-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:aec4962f9ea93c431d5714ed1be1c93f13e1a8618e70035ba2b0564d9e633f2e", size = 319574 }, + { url = "https://files.pythonhosted.org/packages/46/88/b01ed2291aae68b708f7d334288ad5fb3e7aa769a9c309c91a0d55cb91b0/protobuf-5.29.4-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:d7d3f7d1d5a66ed4942d4fefb12ac4b14a29028b209d4bfb25c68ae172059922", size = 319672 }, + { url = "https://files.pythonhosted.org/packages/12/fb/a586e0c973c95502e054ac5f81f88394f24ccc7982dac19c515acd9e2c93/protobuf-5.29.4-py3-none-any.whl", hash = "sha256:3fde11b505e1597f71b875ef2fc52062b6a9740e5f7c8997ce878b6009145862", size = 172551 }, +] + [[package]] name = "pygments" version = "2.19.1" @@ -394,6 +507,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634 }, ] +[[package]] +name = "python-rapidjson" +version = "1.8" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/0d/85/393d6bd7965402e92a77f1040935c5fc6403a0c7fa7018b03c6f4ba488c7/python-rapidjson-1.8.tar.gz", hash = "sha256:170c2ff97d01735f67afd0e1cb0aaa690cb69ae6016e020c6afd5e0ab9b39899", size = 222766 } + [[package]] name = "pyyaml" version = "6.0.2" @@ -564,6 +683,37 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/76/5a/98d1f61aa7d84b26b6b22c46e67331b4ba32fb7d7d2df85b7a57bc8b9f21/sentry_arroyo-2.20.6-py3-none-any.whl", hash = "sha256:d70eb2d6f599487d9f751f270fae5f93ef6287acf70701b80131e2bfb6a9e83a", size = 109268 }, ] +[[package]] +name = "sentry-kafka-schemas" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "fastjsonschema" }, + { name = "msgpack" }, + { name = "python-rapidjson" }, + { name = "pyyaml" }, + { name = "sentry-protos" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/89/b7/f2306a77ae1a1b27559dfa7d821aaf1d56f0caa272d187780999bee54ac4/sentry-kafka-schemas-1.2.0.tar.gz", hash = "sha256:06b9b686616f82e88625ca1ce8bcc0b8d40b3182942ac74d4f5ef135c3552fae", size = 142180 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c4/ea/0779ff5e4c43876d172e0ef36c3fa9ee18f8615e757a5d9a8c8809ab6f5a/sentry_kafka_schemas-1.2.0-py2.py3-none-any.whl", hash = "sha256:30a3798bdfb77d1c59c44bc72eca055a333faa31b98b2c6e04bcb8798eba16cb", size = 259391 }, +] + +[[package]] +name = "sentry-protos" +version = "0.1.70" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "grpc-stubs" }, + { name = "grpcio" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/23/4b/7ff02993ab817a9fb31185fd268d1378395a742b54ea225b6fbefb17065f/sentry_protos-0.1.70.tar.gz", hash = "sha256:87623fb9cc2c3950aace05abb5f652cb01375279c92a76a9fc7c10f834b5a37e", size = 122121 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/50/fb/f7886bebe25a72ee8801e74a1e29970a01e6cad701f5f62eef9505ea3b9f/sentry_protos-0.1.70-py3-none-any.whl", hash = "sha256:11b8a7f5252f2524296b722ec421290dfc2894bd26f19cc6ee55a1cbdd0c18b6", size = 195486 }, +] + [[package]] name = "sentry-streams" version = "0.0.16" @@ -573,6 +723,7 @@ dependencies = [ { name = "pyyaml" }, { name = "requests" }, { name = "sentry-arroyo" }, + { name = "sentry-kafka-schemas" }, ] [package.dev-dependencies] @@ -597,6 +748,7 @@ requires-dist = [ { name = "pyyaml", specifier = ">=6.0.2" }, { name = "requests", specifier = ">=2.32.3" }, { name = "sentry-arroyo", specifier = ">=2.18.2" }, + { name = "sentry-kafka-schemas", specifier = ">=1.2.0" }, ] [package.metadata.requires-dev]