
ref: Type out chains and steps #99


Open · wants to merge 20 commits into main

Conversation

Member

@ayirr7 ayirr7 commented Apr 14, 2025

Does the following:

  • Links the types of the different steps of an ExtensibleChain
  • Provides a simple Message class to carry message metadata through the pipeline. This allows for decoupling certain steps, like parsing/deserialization, from the stream source step.
  • Provides basic message decoders/encoders out of the box in the API (JSON, protobuf)
  • Adds basic header filtering in the source
  • Adds schema validation, transforming raw bytes into a schema-enforced message type
  • Adds general type checks

Still to do:

  • Unit test cleanup
  • Improve a bunch of naming (of interfaces, files, etc.) -- I will get to this
  • Unfortunately there are a couple of small typing hitches, which I have called out in my PR comments below

@ayirr7 ayirr7 marked this pull request as draft April 14, 2025 21:41
Member Author

ayirr7 commented Apr 18, 2025

Got some offline reviews from @untitaker

Currently working on:

  • Adding schema validation to the Parser step and updating an example to use that
  • Seeing if the types of certain steps should be narrower (i.e. what the Serializer outputs)

@ayirr7 ayirr7 changed the title from "Riya/chain typing" to "ref: Type out chains and steps" Apr 21, 2025


# a message with a generic payload
class Message(Generic[TIn]):
Member Author


I am intentionally trying to keep Message as bare-bones as possible. For example, I don't think the user should have to think about:

  • headers after the source step
  • partition key
  • offset management

However, they can provide a schema for messages and any additional info, e.g. for the generic metrics pipeline, maybe they annotate each message with the metric type.

Do let me know if something important is missing though.
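For reference, a minimal sketch of the bare-bones shape I have in mind (the exact type of the schema field is illustrative here; the real class is in the diff below):

from typing import Generic, TypeVar

TIn = TypeVar("TIn")


# a message with a generic payload and no partition/offset bookkeeping
class Message(Generic[TIn]):
    def __init__(self, payload: TIn, schema: str) -> None:
        self.payload = payload  # the message body, possibly already parsed
        self.schema = schema    # user-provided schema identifier (type assumed)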

Collaborator


I agree on entirely hiding offset management and partitions. They have no place in this abstraction.
I would revisit a few decisions though:

  • Headers. They are something the application logic may have to deal with for routing messages before payload parsing. The platform itself will need a way to make sense of a message without parsing the payload. For example, we will want to mark invalid or stale messages.
  • Timestamp. I think a concept of message timestamp is needed. Then the question is whether to introduce the broker timestamp and a second, optional event timestamp.

A few ideas:

  • Separate the concept of pre-parsing and post-parsing messages. Before parsing you can only access headers and the timestamp. After parsing you can access the parsed payload as well. This can be done with different Message classes or with the payload type. The goal is to discourage people from parsing bytes on their own: if the user wants to access the payload, it has to be a parsed payload. (See the sketch after this list.)
  • Expose headers and separate them: internal ones are managed by the platform and mutable by the platform; application ones are read-only and have to be present at the source. We will figure out later how to provide a more flexible set of application headers.
  • Add a timestamp field which is the source timestamp. We will figure out the event timestamp another time.
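A rough sketch of the first idea (class names and fields here are hypothetical):

from datetime import datetime
from typing import Generic, Mapping, TypeVar

TPayload = TypeVar("TPayload")


class RawMessage:
    """Pre-parsing: only headers and the source timestamp are accessible."""

    def __init__(self, headers: Mapping[str, bytes], timestamp: datetime) -> None:
        self.headers = headers
        self.timestamp = timestamp


class ParsedMessage(RawMessage, Generic[TPayload]):
    """Post-parsing: the payload is accessible and is always already parsed."""

    def __init__(
        self, headers: Mapping[str, bytes], timestamp: datetime, payload: TPayload
    ) -> None:
        super().__init__(headers, timestamp)
        self.payload = payload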

Member


> Expose headers and separate them: internal ones are managed by the platform and mutable by the platform; application ones are read-only and have to be present at the source. We will figure out later how to provide a more flexible set of application headers.

With tasks, we have headers as a simple mutable map. Separating the headers out seems like additional complexity we could avoid. Our SDKs use message headers to pass trace_id and trace baggage, which I assume you'll want for streaming as well.


return RoutedValue(
    route=Route(source=self.source, waypoints=[]),
    payload=StreamsMessage(schema=schema, payload=value),
)
Member Author

@ayirr7 ayirr7 Apr 22, 2025


This identifies the schema of the messages based on the topic they come from. I chose to do it this way, wrapping the value in this Message and passing Message throughout the pipeline, so that the user can do flexible parsing/deserialization of messages. This way, instead of baking it into the Source, we can do parsing at any point in the pipeline using the Parser() step.

If we can just bake parsing/deserialization into the Source, then all of this is not needed.
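For illustration, parsing placed mid-chain rather than in the source (the .apply() wiring and the Filter step are assumptions; Parser() is the step referenced above):

chain = (
    streaming_source("mysource", stream_name="ingest-metrics")  # ExtensibleChain[Message[bytes]]
    .apply("myfilter", Filter(function=my_header_check))        # ExtensibleChain[Message[bytes]]
    .apply("myparser", Parser())                                # ExtensibleChain[Message[IngestMetric]]
)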

"myunbatch",
FlatMap(
function=cast(
Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str],
Member Author


Because unbatch is a function with generic arguments, the user unfortunately has to explicitly cast it to the concrete type in their pipeline. This shouldn't be difficult, because the incoming ExtensibleChain already has concrete type hints.

Obviously this specific cast() can also be avoided if users write and use a custom unbatcher; see the sketch below.
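For illustration, the two options side by side (unbatch, Message and IngestMetric are from the example; unbatch_metrics is a hypothetical helper):

# Option 1: cast the generic unbatch to the concrete type expected at
# this point in the chain (as in the snippet above).
function = cast(
    Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str],
    unbatch,
)

# Option 2: write a purpose-built unbatcher with concrete type hints,
# so no cast is needed.
def unbatch_metrics(
    batch: Message[MutableSequence[IngestMetric]],
) -> Message[IngestMetric]:
    ...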

Collaborator


I wonder whether it would be easier if we had Batch and Unbatch appliers without having to go through the FlatMap.
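Something like this, perhaps, mirroring how Map subclasses Applier in this PR (a hypothetical sketch; Applier, Message and TIn as defined elsewhere in this PR):

class Unbatch(Applier[Message[MutableSequence[TIn]], Message[TIn]], Generic[TIn]):
    """Unbatches a message of a sequence into individual messages.
    The concrete payload type is inferred from the incoming chain,
    so users would not need an explicit cast()."""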

    Serializer(serializer=json_serializer),
)  # ExtensibleChain[bytes]

chain4 = chain3.sink(
Member Author

@ayirr7 ayirr7 Apr 22, 2025


I split up the pipeline like this in the example so that you can see the inferred type hints of each chain in a code editor.
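i.e., roughly like this, with each intermediate chain's inferred type shown as a comment (the .apply() calls and the sink arguments are placeholders; the Serializer and chain4 lines echo the snippet above):

chain1 = streaming_source("mysource", stream_name="ingest-metrics")
# ExtensibleChain[Message[bytes]]

chain2 = chain1.apply("myparser", Parser())
# ExtensibleChain[Message[IngestMetric]]

chain3 = chain2.apply("myserializer", Serializer(serializer=json_serializer))
# ExtensibleChain[bytes]

chain4 = chain3.sink("mysink")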

@ayirr7 ayirr7 marked this pull request as ready for review April 22, 2025 00:17
Collaborator

@fpacifici fpacifici left a comment


I think it goes in a good direction.

See the inline comments for details, but my main feedback is to be more strict on the message interface. Specifically:

  • I would separate the concept of a serialized message (pre-parsing and post-serialization, where only the headers are available) from a parsed message (where the payload is available already parsed). We will figure out later whether to relax this constraint. This would also make it clear that serialization has to happen before a sink.
  • Maybe later, introduce a difference in terms of types between serializable messages (where we can break segments) and any other type (where we cannot break segments).

Also, what happens today for invalid messages? We will have to add support for the DLQ. It is ok to do that separately, but please call it out in the parser.

TRoute = TypeVar("TRoute")

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


# a message with a generic payload
class Message(Generic[TIn]):
Collaborator


Agreed.
I think you want to preserve the timestamp though. That can be useful.
re: schema, I think it may be a good idea.


from sentry_kafka_schemas.codecs import Codec

TIn = TypeVar("TIn")
Collaborator


nit: this would be better named TPayload. TIn was used in the chain to represent the input of a function, as opposed to TOut.




        self.payload = payload
        self.schema = schema

    def replace_payload(self, p: TIn) -> None:
Collaborator


If you make this class a dataclass, there is a standard way to replace fields in an immutable dataclass: https://docs.python.org/3/library/dataclasses.html#dataclasses.replace. You can keep the class immutable while still allowing the payload to be replaced, by creating a new message object.
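i.e., roughly (the schema field's type is illustrative):

import dataclasses
from typing import Generic, TypeVar

TIn = TypeVar("TIn")


@dataclasses.dataclass(frozen=True)
class Message(Generic[TIn]):
    payload: TIn
    schema: str


# Replacing the payload creates a new Message; the original stays immutable.
msg = Message(payload=b"raw bytes", schema="ingest-metrics")
updated = dataclasses.replace(msg, payload=b"new payload")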

-class Map(Applier[TIn, TOut], Generic[TIn, TOut]):
-    function: Union[Callable[[TIn], TOut], str]
+class Map(Applier[Message[TIn], Message[TOut]], Generic[TIn, TOut]):
+    function: Union[Callable[[Message[TIn]], Message[TOut]], str]
Collaborator


I am not sure about the typing of the function here.
A Map transforms the payload of a message. Maybe we can consider letting the Map add headers. The Map should definitely not transform the timestamp. I think we should not allow the Map to change the identity of the Message object or to instantiate a new one.

Consider passing the message in as immutable and only allowing the payload out.
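i.e., something along these lines: the function can read the whole message but can only return a payload, and the platform keeps the Message identity, headers and timestamp (a hypothetical variant of the Map above):

class Map(Applier[Message[TIn], Message[TOut]], Generic[TIn, TOut]):
    # The function receives the message read-only and returns only the new
    # payload; the platform wraps it back into the same Message.
    function: Union[Callable[[Message[TIn]], TOut], str]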

    pipeline._add_start(Branch(name=name, ctx=pipeline))
    return pipeline


-def streaming_source(name: str, stream_name: str) -> ExtensibleChain:
+def streaming_source(
+    name: str, stream_name: str, header_filter: Optional[Tuple[str, bytes]] = None
Collaborator


If we support header_filter here, we need to support it in the router as well; that is going to be a common scenario.
If you want to do that in a separate PR, that's ok; please file a ticket.

Comment on lines +64 to +71
filtered = False
if self.header_filter:
    headers: Headers = message.payload.headers
    if self.header_filter not in headers:
        filtered = True

if filtered:
    return FilteredPayload()
Collaborator


if self.header_filter:
    headers: Headers = message.payload.headers
    if self.header_filter not in headers:
        return FilteredPayload()

else:
    value = message.payload.value
    try:
        schema: Codec[Any] = get_codec(self.stream_name)
Collaborator


I think you should not need to run the get_codec function for every message. I'd get the instance of the schema only once and attach it as a flyweight object to all messages.
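e.g., resolving the codec once when the source is constructed and sharing that one instance across messages (a sketch; the class and attribute names are assumptions):

class KafkaSource:
    def __init__(self, name: str, stream_name: str) -> None:
        self.source = name
        self.stream_name = stream_name
        # Resolved once; every message then shares this codec instance.
        self.schema: Codec[Any] = get_codec(stream_name)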

"myunbatch",
FlatMap(
function=cast(
Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it would be easier if we had Batch and Unbatch appliers without having to go through the FlatMap.

@@ -0,0 +1,31 @@
from typing import Any

from sentry_streams.pipeline.message import Message
Collaborator


Unit tests please, ensuring this works for protobuf as well.

Member


You'll need a published protobuf message type to do that. If you need a hand with sentry-protos, let me know.
