Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion faststream/confluent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
try:
from .annotations import KafkaMessage
from .broker import KafkaBroker, KafkaPublisher, KafkaRoute, KafkaRouter
from .response import KafkaPublishCommand, KafkaResponse
from .response import KafkaPublishCommand, KafkaPublishMessage, KafkaResponse
from .schemas import TopicPartition
from .testing import TestKafkaBroker

Expand All @@ -19,6 +19,7 @@
"KafkaBroker",
"KafkaMessage",
"KafkaPublishCommand",
"KafkaPublishMessage",
"KafkaPublisher",
"KafkaResponse",
"KafkaRoute",
Expand Down
6 changes: 0 additions & 6 deletions faststream/confluent/publisher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from functools import wraps
from typing import TYPE_CHECKING, Any

from faststream.exceptions import SetupError

from .config import KafkaPublisherConfig, KafkaPublisherSpecificationConfig
from .specification import KafkaPublisherSpecification
from .usecase import BatchPublisher, DefaultPublisher
Expand Down Expand Up @@ -54,10 +52,6 @@ def create_publisher(

publisher: BatchPublisher | DefaultPublisher
if batch:
if key:
msg = "You can't setup `key` with batch publisher"
raise SetupError(msg)

publisher = BatchPublisher(publisher_config, specification)
publish_method = "_basic_publish_batch"

Expand Down
4 changes: 2 additions & 2 deletions faststream/confluent/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:

headers_to_send = cmd.headers_to_publish()

for msg in cmd.batch_bodies:
for message_position, msg in enumerate(cmd.batch_bodies):
message, content_type = encode_message(msg, serializer=self.serializer)

if content_type:
Expand All @@ -169,7 +169,7 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
final_headers = headers_to_send.copy()

batch.append(
key=None,
key=cmd.key_for(message_position),
value=message,
timestamp=cmd.timestamp_ms,
headers=[(i, j.encode()) for i, j in final_headers.items()],
Expand Down
12 changes: 11 additions & 1 deletion faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,20 @@ async def request(


class BatchPublisher(LogicPublisher):
def __init__(
self,
config: "KafkaPublisherConfig",
specification: "PublisherSpecification[Any, Any]",
) -> None:
super().__init__(config, specification)
self.key = config.key

@override
async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
key: bytes | str | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
Expand All @@ -229,7 +238,7 @@ async def publish(
) -> None:
cmd = KafkaPublishCommand(
*messages,
key=None,
key=key or self.key,
topic=topic or self.topic,
partition=partition or self.partition,
reply_to=reply_to or self.reply_to,
Expand Down Expand Up @@ -261,6 +270,7 @@ async def _publish(
cmd.reply_to = cmd.reply_to or self.reply_to

cmd.partition = cmd.partition or self.partition
cmd.key = cmd.key or self.key

await self._basic_publish_batch(
cmd,
Expand Down
40 changes: 37 additions & 3 deletions faststream/confluent/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,36 @@
from typing_extensions import override

from faststream.response.publish_type import PublishType
from faststream.response.response import BatchPublishCommand, PublishCommand, Response
from faststream.response.response import (
BatchPublishCommand,
PublishCommand,
Response,
extract_per_message_keys_and_bodies,
key_for_index,
)

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage


class KafkaResponse(Response):
"""Kafka-specific response object for outgoing messages.

Can be used in two ways:
1. As a return value from handler to send a response message
2. Directly in publish_batch() to set per-message attributes (key, headers, etc.)

For publish operations, consider using the more semantic alias `KafkaPublishMessage`.
"""

def __init__(
self,
body: "SendableMessage",
*,
headers: dict[str, Any] | None = None,
correlation_id: str | None = None,
timestamp_ms: int | None = None,
key: bytes | str | None = None,
key: bytes | Any | None = None,
) -> None:
super().__init__(
body=body,
Expand All @@ -28,6 +43,11 @@ def __init__(
self.timestamp_ms = timestamp_ms
self.key = key

@override
def get_publish_key(self) -> bytes | Any | None:
"""Return the Kafka message key for publishing."""
return self.key

@override
def as_publish_command(self) -> "KafkaPublishCommand":
return KafkaPublishCommand(
Expand All @@ -50,7 +70,7 @@ def __init__(
*messages: "SendableMessage",
topic: str,
_publish_type: PublishType,
key: bytes | str | None = None,
key: bytes | Any | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
Expand All @@ -77,6 +97,12 @@ def __init__(
# request option
self.timeout = timeout

# per-message keys support
keys, normalized = extract_per_message_keys_and_bodies(self.batch_bodies)
if normalized is not None:
self.batch_bodies = normalized
self._per_message_keys = keys

@classmethod
def from_cmd(
cls,
Expand All @@ -100,6 +126,9 @@ def from_cmd(
_publish_type=cmd.publish_type,
)

def key_for(self, index: int) -> Any | None:
return key_for_index(self._per_message_keys, self.key, index)

def headers_to_publish(self) -> dict[str, str]:
headers = {}

Expand All @@ -110,3 +139,8 @@ def headers_to_publish(self) -> dict[str, str]:
headers["reply_to"] = self.reply_to

return headers | self.headers


# Semantic alias for publish operations
# More intuitive name when using in publish_batch() rather than as handler return value
KafkaPublishMessage = KafkaResponse
3 changes: 2 additions & 1 deletion faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,13 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
topic=cmd.destination,
partition=cmd.partition,
timestamp_ms=cmd.timestamp_ms,
key=cmd.key_for(message_position),
headers=cmd.headers,
correlation_id=cmd.correlation_id,
reply_to=cmd.reply_to,
serializer=self.broker.config.fd_config._serializer,
)
for message in cmd.batch_bodies
for message_position, message in enumerate(cmd.batch_bodies)
)

if isinstance(handler, BatchSubscriber):
Expand Down
3 changes: 2 additions & 1 deletion faststream/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from .annotations import KafkaMessage
from .broker import KafkaBroker, KafkaPublisher, KafkaRoute, KafkaRouter
from .response import KafkaPublishCommand, KafkaResponse
from .response import KafkaPublishCommand, KafkaPublishMessage, KafkaResponse
from .testing import TestKafkaBroker

except ImportError as e:
Expand All @@ -22,6 +22,7 @@
"KafkaBroker",
"KafkaMessage",
"KafkaPublishCommand",
"KafkaPublishMessage",
"KafkaPublisher",
"KafkaResponse",
"KafkaRoute",
Expand Down
6 changes: 0 additions & 6 deletions faststream/kafka/publisher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
Any,
)

from faststream.exceptions import SetupError

from .config import KafkaPublisherConfig, KafkaPublisherSpecificationConfig
from .specification import KafkaPublisherSpecification
from .usecase import BatchPublisher, DefaultPublisher
Expand Down Expand Up @@ -56,10 +54,6 @@ def create_publisher(
)

if batch:
if key:
msg = "You can't setup `key` with batch publisher"
raise SetupError(msg)

publisher: BatchPublisher | DefaultPublisher = BatchPublisher(
publisher_config,
specification,
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def publish_batch(
final_headers = headers_to_send.copy()

metadata = batch.append(
key=None,
key=cmd.key_for(message_position),
value=message,
timestamp=cmd.timestamp_ms,
headers=[(i, j.encode()) for i, j in final_headers.items()],
Expand Down
22 changes: 21 additions & 1 deletion faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,20 @@ async def request(


class BatchPublisher(LogicPublisher):
def __init__(
self,
config: "KafkaPublisherConfig",
specification: "PublisherSpecification[Any, Any]",
) -> None:
super().__init__(config, specification)
self.key = config.key

@overload
async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
key: bytes | Any | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
Expand All @@ -316,6 +325,7 @@ async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
key: bytes | Any | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
Expand All @@ -329,6 +339,7 @@ async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
key: bytes | Any | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
Expand All @@ -342,6 +353,7 @@ async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
key: bytes | Any | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
Expand All @@ -356,6 +368,13 @@ async def publish(
Messages bodies to send.
topic:
Topic where the message will be published.
key:
A single key to associate with every message in this batch. If a
partition is not specified and the producer uses the default
partitioner, messages with the same key will be routed to the
same partition. Must be bytes or serializable to bytes via the
configured key serializer. If omitted, falls back to the
publisher's default key (if configured).
partition:
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`
Expand All @@ -378,7 +397,7 @@ async def publish(
"""
cmd = KafkaPublishCommand(
*messages,
key=None,
key=key or self.key,
topic=topic or self.topic,
partition=partition or self.partition,
reply_to=reply_to or self.reply_to,
Expand Down Expand Up @@ -410,6 +429,7 @@ async def _publish(
cmd.reply_to = cmd.reply_to or self.reply_to

cmd.partition = cmd.partition or self.partition
cmd.key = cmd.key or self.key

await self._basic_publish_batch(
cmd,
Expand Down
36 changes: 35 additions & 1 deletion faststream/kafka/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,28 @@
from typing_extensions import override

from faststream.response.publish_type import PublishType
from faststream.response.response import BatchPublishCommand, PublishCommand, Response
from faststream.response.response import (
BatchPublishCommand,
PublishCommand,
Response,
extract_per_message_keys_and_bodies,
key_for_index,
)

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage


class KafkaResponse(Response):
"""Kafka-specific response object for outgoing messages.

Can be used in two ways:
1. As a return value from handler to send a response message
2. Directly in publish_batch() to set per-message attributes (key, headers, etc.)

For publish operations, consider using the more semantic alias `KafkaPublishMessage`.
"""

def __init__(
self,
body: "SendableMessage",
Expand All @@ -28,6 +43,11 @@ def __init__(
self.timestamp_ms = timestamp_ms
self.key = key

@override
def get_publish_key(self) -> bytes | None:
"""Return the Kafka message key for publishing."""
return self.key

@override
def as_publish_command(self) -> "KafkaPublishCommand":
return KafkaPublishCommand(
Expand Down Expand Up @@ -77,6 +97,12 @@ def __init__(
# request option
self.timeout = timeout

# per-message keys support
keys, normalized = extract_per_message_keys_and_bodies(self.batch_bodies)
if normalized is not None:
self.batch_bodies = normalized
self._per_message_keys = keys

@classmethod
def from_cmd(
cls,
Expand All @@ -100,6 +126,9 @@ def from_cmd(
_publish_type=cmd.publish_type,
)

def key_for(self, index: int) -> Any | None:
return key_for_index(self._per_message_keys, self.key, index)

def headers_to_publish(self) -> dict[str, str]:
headers = {}

Expand All @@ -110,3 +139,8 @@ def headers_to_publish(self) -> dict[str, str]:
headers["reply_to"] = self.reply_to

return headers | self.headers


# Semantic alias for publish operations
# More intuitive name when using in publish_batch() rather than as handler return value
KafkaPublishMessage = KafkaResponse
Loading
Loading