From 328461cefe35151fddfb774bd504d4a3ffe433e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Fri, 17 Oct 2025 21:02:00 +0300 Subject: [PATCH 1/9] feat: Added OperationReply models --- .../asyncapi/v3_0_0/schema/operation_reply.py | 21 +++++++++++++++++++ .../asyncapi/v3_0_0/schema/operations.py | 3 +++ 2 files changed, 24 insertions(+) create mode 100644 faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py new file mode 100644 index 0000000000..6371d795c5 --- /dev/null +++ b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py @@ -0,0 +1,21 @@ +from pydantic import BaseModel, Field +from .utils import Reference + + +from faststream._internal._compat import PYDANTIC_V2 + + +class OperationReplyAddress(BaseModel): + description: str | None = None + location: str + +class OperationReply(BaseModel): + address: OperationReplyAddress + channel: Reference + messages: list[Reference] = Field(default_factory=list) + + if PYDANTIC_V2: + model_config = {"extra": "allow"} + else: + class Config: + extra = "allow" \ No newline at end of file diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operations.py b/faststream/specification/asyncapi/v3_0_0/schema/operations.py index 05ca29fbf0..8e17f25d66 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operations.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operations.py @@ -9,6 +9,7 @@ from .bindings import OperationBinding from .channels import Channel +from .operation_reply import OperationReply from .tag import Tag from .utils import Reference @@ -43,6 +44,8 @@ class Operation(BaseModel): security: dict[str, list[str]] | None = None + reply: OperationReply | None = None + # TODO # traits From 839efb4436fec9cb9205a70dc0048477f43238fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Mon, 20 Oct 2025 01:16:33 +0300 Subject: [PATCH 2/9] feat: Added base for generate reply --- .../endpoint/subscriber/specification.py | 29 +++++++- faststream/rabbit/subscriber/specification.py | 5 ++ faststream/specification/asyncapi/message.py | 21 ++++++ .../specification/asyncapi/v3_0_0/generate.py | 71 +++++++++++++++++++ .../asyncapi/v3_0_0/schema/__init__.py | 2 + .../asyncapi/v3_0_0/schema/operation_reply.py | 21 ++++-- .../asyncapi/v3_0_0/schema/operations.py | 2 + .../specification/schema/operation/model.py | 1 + 8 files changed, 145 insertions(+), 7 deletions(-) diff --git a/faststream/_internal/endpoint/subscriber/specification.py b/faststream/_internal/endpoint/subscriber/specification.py index 369649713c..20de4cd1fb 100644 --- a/faststream/_internal/endpoint/subscriber/specification.py +++ b/faststream/_internal/endpoint/subscriber/specification.py @@ -7,7 +7,7 @@ from faststream._internal.configs import BrokerConfig, SubscriberSpecificationConfig from faststream.exceptions import SetupError -from faststream.specification.asyncapi.message import parse_handler_params +from faststream.specification.asyncapi.message import parse_handler_params, parse_handler_return from faststream.specification.asyncapi.utils import to_camelcase if TYPE_CHECKING: @@ -78,6 +78,33 @@ def get_payloads(self) -> list[tuple["dict[str, Any]", str]]: return payloads + def get_reply_payloads(self) -> list[tuple["dict[str, Any]", str]]: + payloads: list[tuple[dict[str, Any], str]] = [] + + call_name = self.call_name + + for h in self.calls: + if h.dependant is None: + msg = "You should setup `Handler` at first." + raise SetupError(msg) + + reply_body = parse_handler_return( + h.dependant, + prefix=f"{self.config.title_ or call_name}:ReplyMessage", + ) + payloads.append((reply_body, to_camelcase(h.name))) + + if not self.calls: + payloads.append( + ( + { + "title": f"{self.config.title_ or call_name}:ReplyMessage:Payload", + }, + to_camelcase(call_name), + ), + ) + + return payloads @property @abstractmethod def name(self) -> str: diff --git a/faststream/rabbit/subscriber/specification.py b/faststream/rabbit/subscriber/specification.py index 78408c8b5d..c3b2b22d3a 100644 --- a/faststream/rabbit/subscriber/specification.py +++ b/faststream/rabbit/subscriber/specification.py @@ -31,6 +31,7 @@ def name(self) -> str: def get_schema(self) -> dict[str, SubscriberSpec]: payloads = self.get_payloads() + reply_payloads = self.get_reply_payloads() queue = self.config.queue.add_prefix(self._outer_config.prefix) @@ -59,6 +60,10 @@ def get_schema(self) -> dict[str, SubscriberSpec]: title=f"{channel_name}:Message", payload=resolve_payloads(payloads), ), + reply_message=Message( + title=f"{channel_name}:ReplyMessage", + payload=resolve_payloads(reply_payloads), + ) ), bindings=ChannelBinding( amqp=amqp.ChannelBinding( diff --git a/faststream/specification/asyncapi/message.py b/faststream/specification/asyncapi/message.py index 16946a69da..5dd39198bb 100644 --- a/faststream/specification/asyncapi/message.py +++ b/faststream/specification/asyncapi/message.py @@ -35,6 +35,27 @@ def parse_handler_params(call: "CallModel", prefix: str = "") -> dict[str, Any]: return body +def parse_handler_return(call: "CallModel", prefix: str = "") -> dict[str, Any]: + """Parses the handler parameters.""" + model_container = getattr(call, "serializer") + model = cast("type[BaseModel] | None", getattr(model_container, "model", None)) + assert model + out = model_container.response_option['return'] + + body = get_model_schema( + create_model( + model.__name__, + **{out.field_name: (out.field_type, out.default_value)}, # type: ignore[call-overload] + ), + prefix=prefix, + exclude=tuple(call.custom_fields.keys()), + ) + + if body is None: + return {"title": "EmptyPayload", "type": "null"} + + return body + @overload def get_response_schema(call: None, prefix: str = "") -> None: ... diff --git a/faststream/specification/asyncapi/v3_0_0/generate.py b/faststream/specification/asyncapi/v3_0_0/generate.py index b08b86f823..d0da565dfb 100644 --- a/faststream/specification/asyncapi/v3_0_0/generate.py +++ b/faststream/specification/asyncapi/v3_0_0/generate.py @@ -19,6 +19,7 @@ Operation, Reference, Server, + OperationReply, Tag, ) from faststream.specification.asyncapi.v3_0_0.schema.bindings import ( @@ -64,6 +65,7 @@ def get_app_schema( channels, operations = get_broker_channels(broker) messages: dict[str, Message] = {} + reply_messages: dict[str, Message] = {} payloads: dict[str, dict[str, Any]] = {} for channel in channels.values(): @@ -89,6 +91,17 @@ def get_app_schema( channel.messages = msgs + for operation_name, operation in operations.items(): + reply_msgs: dict[str, Message | Reference] = {} + for message in operation.reply.messages: + reply_msgs['ReplyMessage'] = _resolve_reply_payloads( + 'ReplyMessage', + message, + payloads, + reply_messages, + ) + + messages.update(reply_messages) return ApplicationSchema( info=ApplicationInfo( title=title, @@ -190,6 +203,9 @@ def get_broker_channels( ], channel=Reference(**{"$ref": f"#/channels/{channel_key}"}), operation=sub_channel.operation, + reply=OperationReply( + messages=[Message.from_spec(sub_channel.operation.reply_message)], + ) ) for pub in filter(lambda p: p.specification.include_in_schema, broker.publishers): @@ -256,6 +272,61 @@ def get_asgi_routes( def _get_http_binding_method(methods: Sequence[str]) -> str: return next((method for method in methods if method != "HEAD"), "HEAD") +def _resolve_reply_payloads( + message_name: str, + m: Message, + payloads: dict[str, Any], + reply_messages: dict[str, Any], +) -> Reference: + assert isinstance(m.payload, dict) + + m.payload = move_pydantic_refs(m.payload, DEF_KEY) + + message_name = clear_key(message_name) + + if DEF_KEY in m.payload: + payloads.update(m.payload.pop(DEF_KEY)) + + one_of = m.payload.get("oneOf", None) + if isinstance(one_of, dict): + one_of_list = [] + processed_payloads: dict[str, dict[str, Any]] = {} + for name, payload in one_of.items(): + # Promote nested Pydantic $defs from each payload into components/schemas + # so that referenced nested models are available globally. + if isinstance(payload, dict) and DEF_KEY in payload: + defs = payload.pop(DEF_KEY) or {} + for def_name, def_schema in defs.items(): + payloads[clear_key(def_name)] = def_schema + processed_payloads[clear_key(name)] = payload + one_of_list.append(Reference(**{"$ref": f"#/components/schemas/{name}"})) + + payloads.update(processed_payloads) + m.payload["oneOf"] = one_of_list + assert m.title + reply_messages[clear_key(m.title)] = m + return Reference( + **{"$ref": f"#/components/messages/{message_name}"}, + ) + + payloads.update(m.payload.pop(DEF_KEY, {})) + payload_name = m.payload.get("title", f"{message_name}:Payload") + payload_name = clear_key(payload_name) + + if payload_name in payloads and payloads[payload_name] != m.payload: + warnings.warn( + f"Overwriting the message schema, data types have the same name: `{payload_name}`", + RuntimeWarning, + stacklevel=1, + ) + + payloads[payload_name] = m.payload + m.payload = {"$ref": f"#/components/schemas/{payload_name}"} + assert m.title + reply_messages[clear_key(m.title)] = m + return Reference( + **{"$ref": f"#/components/messages/{message_name}"}, + ) def _resolve_msg_payloads( message_name: str, diff --git a/faststream/specification/asyncapi/v3_0_0/schema/__init__.py b/faststream/specification/asyncapi/v3_0_0/schema/__init__.py index e0cbcbd7b2..42b5c3a3c1 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/__init__.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/__init__.py @@ -6,6 +6,7 @@ from .license import License from .message import CorrelationId, Message from .operations import Operation +from .operation_reply import OperationReply from .schema import ApplicationSchema from .servers import Server, ServerVariable from .tag import Tag @@ -23,6 +24,7 @@ "License", "Message", "Operation", + "OperationReply", "Parameter", "Reference", "Server", diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py index 6371d795c5..5fac70648c 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py @@ -1,8 +1,11 @@ from pydantic import BaseModel, Field -from .utils import Reference - +from typing_extensions import Self from faststream._internal._compat import PYDANTIC_V2 +from faststream.specification.asyncapi.v3_0_0.schema.message import Message +from faststream.specification.schema import Operation + +from .utils import Reference class OperationReplyAddress(BaseModel): @@ -10,12 +13,18 @@ class OperationReplyAddress(BaseModel): location: str class OperationReply(BaseModel): - address: OperationReplyAddress - channel: Reference - messages: list[Reference] = Field(default_factory=list) + messages: list[Message | Reference] + channel: Reference | None = None + address: OperationReplyAddress | None = None if PYDANTIC_V2: model_config = {"extra": "allow"} else: class Config: - extra = "allow" \ No newline at end of file + extra = "allow" + + @classmethod + def from_sub(cls, messages: list[Reference] ) -> Self: + return cls( + messages=messages + ) \ No newline at end of file diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operations.py b/faststream/specification/asyncapi/v3_0_0/schema/operations.py index 8e17f25d66..238a34c510 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operations.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operations.py @@ -65,12 +65,14 @@ def from_sub( messages: list[Reference], channel: Reference, operation: OperationSpec, + reply: OperationReply, ) -> Self: return cls( action=Action.RECEIVE, messages=messages, channel=channel, bindings=OperationBinding.from_sub(operation.bindings), + reply=reply, summary=None, description=None, security=None, diff --git a/faststream/specification/schema/operation/model.py b/faststream/specification/schema/operation/model.py index 58f426dc17..3e8d44d37c 100644 --- a/faststream/specification/schema/operation/model.py +++ b/faststream/specification/schema/operation/model.py @@ -8,3 +8,4 @@ class Operation: message: Message bindings: OperationBinding | None + reply_message: Message | None From 997933afb2dd7095d6c3ab402e51d5c7faee353e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sat, 1 Nov 2025 15:13:51 +0300 Subject: [PATCH 3/9] feat: Added channel info to reply --- .../endpoint/subscriber/specification.py | 1 + faststream/rabbit/publisher/specification.py | 1 + faststream/specification/asyncapi/message.py | 11 +++---- .../specification/asyncapi/v3_0_0/generate.py | 29 +++++++++++++++---- .../asyncapi/v3_0_0/schema/operation_reply.py | 10 +------ .../asyncapi/v3_0_0/schema/operations.py | 3 +- .../specification/schema/operation/model.py | 2 +- .../specification/schema/reply/__init__.py | 3 ++ .../specification/schema/reply/model.py | 14 +++++++++ 9 files changed, 53 insertions(+), 21 deletions(-) create mode 100644 faststream/specification/schema/reply/__init__.py create mode 100644 faststream/specification/schema/reply/model.py diff --git a/faststream/_internal/endpoint/subscriber/specification.py b/faststream/_internal/endpoint/subscriber/specification.py index 20de4cd1fb..2530dfc6eb 100644 --- a/faststream/_internal/endpoint/subscriber/specification.py +++ b/faststream/_internal/endpoint/subscriber/specification.py @@ -105,6 +105,7 @@ def get_reply_payloads(self) -> list[tuple["dict[str, Any]", str]]: ) return payloads + @property @abstractmethod def name(self) -> str: diff --git a/faststream/rabbit/publisher/specification.py b/faststream/rabbit/publisher/specification.py index 834ba356b5..34fcd916cc 100644 --- a/faststream/rabbit/publisher/specification.py +++ b/faststream/rabbit/publisher/specification.py @@ -70,6 +70,7 @@ def get_schema(self) -> dict[str, "PublisherSpec"]: served_words=2 if self.config.title_ is None else 1, ), ), + reply_message=None ), bindings=ChannelBinding( amqp=amqp.ChannelBinding( diff --git a/faststream/specification/asyncapi/message.py b/faststream/specification/asyncapi/message.py index 5dd39198bb..6d208ea2c8 100644 --- a/faststream/specification/asyncapi/message.py +++ b/faststream/specification/asyncapi/message.py @@ -37,14 +37,15 @@ def parse_handler_params(call: "CallModel", prefix: str = "") -> dict[str, Any]: def parse_handler_return(call: "CallModel", prefix: str = "") -> dict[str, Any]: """Parses the handler parameters.""" - model_container = getattr(call, "serializer") - model = cast("type[BaseModel] | None", getattr(model_container, "model", None)) - assert model - out = model_container.response_option['return'] + model_container = getattr(call, "serializer", call) + response_option = getattr(model_container, "response_option", None) + if not response_option: + return {"title": "EmptyPayload", "type": "null"} + out = response_option["return"] body = get_model_schema( create_model( - model.__name__, + "", **{out.field_name: (out.field_type, out.default_value)}, # type: ignore[call-overload] ), prefix=prefix, diff --git a/faststream/specification/asyncapi/v3_0_0/generate.py b/faststream/specification/asyncapi/v3_0_0/generate.py index d0da565dfb..442cda404a 100644 --- a/faststream/specification/asyncapi/v3_0_0/generate.py +++ b/faststream/specification/asyncapi/v3_0_0/generate.py @@ -1,7 +1,7 @@ import string import warnings from collections.abc import Sequence -from typing import TYPE_CHECKING, Any, Optional, Union +from typing import Callable, TYPE_CHECKING, Any, Optional, Union from urllib.parse import urlparse from faststream._internal._compat import DEF_KEY @@ -26,6 +26,7 @@ OperationBinding, http as http_bindings, ) +from faststream.specification.asyncapi.v3_0_0.schema.operation_reply import OperationReplyAddress from faststream.specification.asyncapi.v3_0_0.schema.operations import Action if TYPE_CHECKING: @@ -93,15 +94,19 @@ def get_app_schema( for operation_name, operation in operations.items(): reply_msgs: dict[str, Message | Reference] = {} + if not operation.reply: + continue for message in operation.reply.messages: reply_msgs['ReplyMessage'] = _resolve_reply_payloads( - 'ReplyMessage', + f'{operation_name.removesuffix("Subscribe")}:ReplyMessage', message, payloads, reply_messages, ) + operation.reply.messages = list(reply_msgs.values()) messages.update(reply_messages) + return ApplicationSchema( info=ApplicationInfo( title=title, @@ -179,6 +184,7 @@ def get_broker_channels( """Get the broker channels for an application.""" channels = {} operations = {} + operations_by_handler: dict[Callable, Operation] = {} for sub in filter(lambda s: s.specification.include_in_schema, broker.subscribers): for sub_key, sub_channel in sub.schema().items(): @@ -194,7 +200,7 @@ def get_broker_channels( channels[channel_key] = channel_obj - operations[f"{channel_key}Subscribe"] = Operation.from_sub( + operation = Operation.from_sub( messages=[ Reference(**{ "$ref": f"#/channels/{channel_key}/messages/{msg_name}", @@ -204,9 +210,17 @@ def get_broker_channels( channel=Reference(**{"$ref": f"#/channels/{channel_key}"}), operation=sub_channel.operation, reply=OperationReply( - messages=[Message.from_spec(sub_channel.operation.reply_message)], - ) + messages=[Message.from_spec(sub_channel.operation.reply_message)] if sub_channel.operation.reply_message else [], + address=OperationReplyAddress( + description=None, + location="$message.header#/replyTo", + ), + channel=None, + ) if not sub._no_reply else None, ) + operations[f"{channel_key}Subscribe"] = operation + for call in sub.specification.calls: + operations_by_handler[call.handler._original_call] = operation for pub in filter(lambda p: p.specification.include_in_schema, broker.publishers): for pub_key, pub_channel in pub.schema().items(): @@ -231,6 +245,11 @@ def get_broker_channels( channel=Reference(**{"$ref": f"#/channels/{channel_key}"}), operation=pub_channel.operation, ) + for call in pub.specification.calls: + sub_operation = operations_by_handler.get(call) + if sub_operation is None or sub_operation.reply is None: + continue + sub_operation.reply.channel = Reference(**{"$ref": f"#/channels/{channel_key}"}) return channels, operations diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py index 5fac70648c..323038b73d 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py @@ -1,9 +1,7 @@ -from pydantic import BaseModel, Field -from typing_extensions import Self +from pydantic import BaseModel from faststream._internal._compat import PYDANTIC_V2 from faststream.specification.asyncapi.v3_0_0.schema.message import Message -from faststream.specification.schema import Operation from .utils import Reference @@ -22,9 +20,3 @@ class OperationReply(BaseModel): else: class Config: extra = "allow" - - @classmethod - def from_sub(cls, messages: list[Reference] ) -> Self: - return cls( - messages=messages - ) \ No newline at end of file diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operations.py b/faststream/specification/asyncapi/v3_0_0/schema/operations.py index 238a34c510..3bcf57e60c 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operations.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operations.py @@ -65,7 +65,7 @@ def from_sub( messages: list[Reference], channel: Reference, operation: OperationSpec, - reply: OperationReply, + reply: OperationReply | None, ) -> Self: return cls( action=Action.RECEIVE, @@ -91,6 +91,7 @@ def from_pub( messages=messages, channel=channel, bindings=OperationBinding.from_pub(operation.bindings), + reply=None, summary=None, description=None, security=None, diff --git a/faststream/specification/schema/operation/model.py b/faststream/specification/schema/operation/model.py index 3e8d44d37c..b2780c3cdf 100644 --- a/faststream/specification/schema/operation/model.py +++ b/faststream/specification/schema/operation/model.py @@ -8,4 +8,4 @@ class Operation: message: Message bindings: OperationBinding | None - reply_message: Message | None + reply_message: Message | None = None diff --git a/faststream/specification/schema/reply/__init__.py b/faststream/specification/schema/reply/__init__.py new file mode 100644 index 0000000000..09ef2e907f --- /dev/null +++ b/faststream/specification/schema/reply/__init__.py @@ -0,0 +1,3 @@ +from .model import OperationReply, OperationReplyAddress + +__all__ = ("OperationReply", "OperationReplyAddress") diff --git a/faststream/specification/schema/reply/model.py b/faststream/specification/schema/reply/model.py new file mode 100644 index 0000000000..449518fa9d --- /dev/null +++ b/faststream/specification/schema/reply/model.py @@ -0,0 +1,14 @@ +from dataclasses import dataclass + +from faststream.specification.schema import Message + + +@dataclass +class OperationReplyAddress: + location: str + description: str | None = None + +@dataclass +class OperationReply: + message: Message | None + address: OperationReplyAddress | None From b6a7f7b54958b5ed88089f7c6c153cfe88fc43c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sat, 1 Nov 2025 16:07:20 +0300 Subject: [PATCH 4/9] feat: Added reply to kafka subscriber --- faststream/confluent/subscriber/specification.py | 5 +++++ faststream/kafka/subscriber/specification.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/faststream/confluent/subscriber/specification.py b/faststream/confluent/subscriber/specification.py index 95680561a1..86386038e2 100644 --- a/faststream/confluent/subscriber/specification.py +++ b/faststream/confluent/subscriber/specification.py @@ -31,6 +31,7 @@ def name(self) -> str: def get_schema(self) -> dict[str, SubscriberSpec]: payloads = self.get_payloads() + reply_payloads = self.get_reply_payloads() channels = {} for t in self.topics: @@ -43,6 +44,10 @@ def get_schema(self) -> dict[str, SubscriberSpec]: title=f"{handler_name}:Message", payload=resolve_payloads(payloads), ), + reply_message=Message( + title=f"{handler_name}:ReplyMessage", + payload=resolve_payloads(reply_payloads), + ), bindings=None, ), bindings=ChannelBinding( diff --git a/faststream/kafka/subscriber/specification.py b/faststream/kafka/subscriber/specification.py index 5325069e9e..367f509821 100644 --- a/faststream/kafka/subscriber/specification.py +++ b/faststream/kafka/subscriber/specification.py @@ -31,6 +31,7 @@ def name(self) -> str: def get_schema(self) -> dict[str, SubscriberSpec]: payloads = self.get_payloads() + reply_payloads = self.get_reply_payloads() channels = {} for t in self.topics: @@ -43,6 +44,10 @@ def get_schema(self) -> dict[str, SubscriberSpec]: title=f"{handler_name}:Message", payload=resolve_payloads(payloads), ), + reply_message=Message( + title=f"{handler_name}:ReplyMessage", + payload=resolve_payloads(reply_payloads), + ), bindings=None, ), bindings=ChannelBinding( From 1cd974b190699123f1cbc349e203a051d56c8f1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sat, 1 Nov 2025 16:14:23 +0300 Subject: [PATCH 5/9] feat: Added reply to nats subscriber --- faststream/nats/subscriber/specification.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/faststream/nats/subscriber/specification.py b/faststream/nats/subscriber/specification.py index 31f3f8b445..25152aa13b 100644 --- a/faststream/nats/subscriber/specification.py +++ b/faststream/nats/subscriber/specification.py @@ -23,6 +23,7 @@ def name(self) -> str: def get_schema(self) -> dict[str, SubscriberSpec]: payloads = self.get_payloads() + reply_payloads = self.get_reply_payloads() return { self.name: SubscriberSpec( @@ -32,6 +33,10 @@ def get_schema(self) -> dict[str, SubscriberSpec]: title=f"{self.name}:Message", payload=resolve_payloads(payloads), ), + reply_message=Message( + title=f"{self.name}:ReplyMessage", + payload=resolve_payloads(reply_payloads), + ), bindings=None, ), bindings=ChannelBinding( From b0b761b1e526b6e3881f81d43a738d9421d2fc1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sat, 1 Nov 2025 16:14:42 +0300 Subject: [PATCH 6/9] feat: Added reply to redis subscriber --- faststream/redis/subscriber/specification.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/faststream/redis/subscriber/specification.py b/faststream/redis/subscriber/specification.py index 1ea1721b1d..83c49f3923 100644 --- a/faststream/redis/subscriber/specification.py +++ b/faststream/redis/subscriber/specification.py @@ -20,6 +20,7 @@ class RedisSubscriberSpecification( ): def get_schema(self) -> dict[str, SubscriberSpec]: payloads = self.get_payloads() + reply_payloads = self.get_reply_payloads() return { self.name: SubscriberSpec( @@ -29,6 +30,10 @@ def get_schema(self) -> dict[str, SubscriberSpec]: title=f"{self.name}:Message", payload=resolve_payloads(payloads), ), + reply_message=Message( + title=f"{self.name}:ReplyMessage", + payload=resolve_payloads(reply_payloads), + ), bindings=None, ), bindings=ChannelBinding( From 907af9ae0a8e6f2067da60f40aa2b4dc35f5643c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sat, 1 Nov 2025 16:42:17 +0300 Subject: [PATCH 7/9] chore: lint --- .../endpoint/subscriber/specification.py | 5 ++- faststream/rabbit/publisher/specification.py | 1 - faststream/rabbit/subscriber/specification.py | 2 +- faststream/specification/asyncapi/message.py | 1 + .../specification/asyncapi/v3_0_0/generate.py | 40 ++++++++++++------- .../asyncapi/v3_0_0/schema/__init__.py | 2 +- .../asyncapi/v3_0_0/schema/operation_reply.py | 2 + .../specification/schema/reply/model.py | 1 + 8 files changed, 36 insertions(+), 18 deletions(-) diff --git a/faststream/_internal/endpoint/subscriber/specification.py b/faststream/_internal/endpoint/subscriber/specification.py index 2530dfc6eb..985cfacf0c 100644 --- a/faststream/_internal/endpoint/subscriber/specification.py +++ b/faststream/_internal/endpoint/subscriber/specification.py @@ -7,7 +7,10 @@ from faststream._internal.configs import BrokerConfig, SubscriberSpecificationConfig from faststream.exceptions import SetupError -from faststream.specification.asyncapi.message import parse_handler_params, parse_handler_return +from faststream.specification.asyncapi.message import ( + parse_handler_params, + parse_handler_return, +) from faststream.specification.asyncapi.utils import to_camelcase if TYPE_CHECKING: diff --git a/faststream/rabbit/publisher/specification.py b/faststream/rabbit/publisher/specification.py index 34fcd916cc..834ba356b5 100644 --- a/faststream/rabbit/publisher/specification.py +++ b/faststream/rabbit/publisher/specification.py @@ -70,7 +70,6 @@ def get_schema(self) -> dict[str, "PublisherSpec"]: served_words=2 if self.config.title_ is None else 1, ), ), - reply_message=None ), bindings=ChannelBinding( amqp=amqp.ChannelBinding( diff --git a/faststream/rabbit/subscriber/specification.py b/faststream/rabbit/subscriber/specification.py index c3b2b22d3a..737634cb60 100644 --- a/faststream/rabbit/subscriber/specification.py +++ b/faststream/rabbit/subscriber/specification.py @@ -63,7 +63,7 @@ def get_schema(self) -> dict[str, SubscriberSpec]: reply_message=Message( title=f"{channel_name}:ReplyMessage", payload=resolve_payloads(reply_payloads), - ) + ), ), bindings=ChannelBinding( amqp=amqp.ChannelBinding( diff --git a/faststream/specification/asyncapi/message.py b/faststream/specification/asyncapi/message.py index 6d208ea2c8..d1a2e06f70 100644 --- a/faststream/specification/asyncapi/message.py +++ b/faststream/specification/asyncapi/message.py @@ -35,6 +35,7 @@ def parse_handler_params(call: "CallModel", prefix: str = "") -> dict[str, Any]: return body + def parse_handler_return(call: "CallModel", prefix: str = "") -> dict[str, Any]: """Parses the handler parameters.""" model_container = getattr(call, "serializer", call) diff --git a/faststream/specification/asyncapi/v3_0_0/generate.py b/faststream/specification/asyncapi/v3_0_0/generate.py index 442cda404a..847136496d 100644 --- a/faststream/specification/asyncapi/v3_0_0/generate.py +++ b/faststream/specification/asyncapi/v3_0_0/generate.py @@ -1,7 +1,7 @@ import string import warnings from collections.abc import Sequence -from typing import Callable, TYPE_CHECKING, Any, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union from urllib.parse import urlparse from faststream._internal._compat import DEF_KEY @@ -17,20 +17,22 @@ License, Message, Operation, + OperationReply, Reference, Server, - OperationReply, Tag, ) from faststream.specification.asyncapi.v3_0_0.schema.bindings import ( OperationBinding, http as http_bindings, ) -from faststream.specification.asyncapi.v3_0_0.schema.operation_reply import OperationReplyAddress +from faststream.specification.asyncapi.v3_0_0.schema.operation_reply import ( + OperationReplyAddress, +) from faststream.specification.asyncapi.v3_0_0.schema.operations import Action if TYPE_CHECKING: - from faststream._internal.basic_types import AnyHttpUrl + from faststream._internal.basic_types import AnyCallable, AnyHttpUrl from faststream._internal.broker import BrokerUsecase from faststream._internal.types import ConnectionType, MsgType from faststream.asgi.handlers import HttpHandler @@ -97,8 +99,10 @@ def get_app_schema( if not operation.reply: continue for message in operation.reply.messages: - reply_msgs['ReplyMessage'] = _resolve_reply_payloads( - f'{operation_name.removesuffix("Subscribe")}:ReplyMessage', + assert isinstance(message, Message) + + reply_msgs["ReplyMessage"] = _resolve_reply_payloads( + f"{operation_name.removesuffix('Subscribe')}:ReplyMessage", message, payloads, reply_messages, @@ -184,7 +188,7 @@ def get_broker_channels( """Get the broker channels for an application.""" channels = {} operations = {} - operations_by_handler: dict[Callable, Operation] = {} + operations_by_handler: dict[AnyCallable, Operation] = {} for sub in filter(lambda s: s.specification.include_in_schema, broker.subscribers): for sub_key, sub_channel in sub.schema().items(): @@ -210,13 +214,17 @@ def get_broker_channels( channel=Reference(**{"$ref": f"#/channels/{channel_key}"}), operation=sub_channel.operation, reply=OperationReply( - messages=[Message.from_spec(sub_channel.operation.reply_message)] if sub_channel.operation.reply_message else [], + messages=[Message.from_spec(sub_channel.operation.reply_message)] + if sub_channel.operation.reply_message + else [], address=OperationReplyAddress( description=None, location="$message.header#/replyTo", ), channel=None, - ) if not sub._no_reply else None, + ) + if not sub._no_reply + else None, ) operations[f"{channel_key}Subscribe"] = operation for call in sub.specification.calls: @@ -249,7 +257,9 @@ def get_broker_channels( sub_operation = operations_by_handler.get(call) if sub_operation is None or sub_operation.reply is None: continue - sub_operation.reply.channel = Reference(**{"$ref": f"#/channels/{channel_key}"}) + sub_operation.reply.channel = Reference(**{ + "$ref": f"#/channels/{channel_key}" + }) return channels, operations @@ -291,11 +301,12 @@ def get_asgi_routes( def _get_http_binding_method(methods: Sequence[str]) -> str: return next((method for method in methods if method != "HEAD"), "HEAD") + def _resolve_reply_payloads( - message_name: str, - m: Message, - payloads: dict[str, Any], - reply_messages: dict[str, Any], + message_name: str, + m: Message, + payloads: dict[str, Any], + reply_messages: dict[str, Any], ) -> Reference: assert isinstance(m.payload, dict) @@ -347,6 +358,7 @@ def _resolve_reply_payloads( **{"$ref": f"#/components/messages/{message_name}"}, ) + def _resolve_msg_payloads( message_name: str, m: Message, diff --git a/faststream/specification/asyncapi/v3_0_0/schema/__init__.py b/faststream/specification/asyncapi/v3_0_0/schema/__init__.py index 42b5c3a3c1..317f57b750 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/__init__.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/__init__.py @@ -5,8 +5,8 @@ from .info import ApplicationInfo from .license import License from .message import CorrelationId, Message -from .operations import Operation from .operation_reply import OperationReply +from .operations import Operation from .schema import ApplicationSchema from .servers import Server, ServerVariable from .tag import Tag diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py index 323038b73d..4cc9fb5985 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operation_reply.py @@ -10,6 +10,7 @@ class OperationReplyAddress(BaseModel): description: str | None = None location: str + class OperationReply(BaseModel): messages: list[Message | Reference] channel: Reference | None = None @@ -18,5 +19,6 @@ class OperationReply(BaseModel): if PYDANTIC_V2: model_config = {"extra": "allow"} else: + class Config: extra = "allow" diff --git a/faststream/specification/schema/reply/model.py b/faststream/specification/schema/reply/model.py index 449518fa9d..3ec3441cfa 100644 --- a/faststream/specification/schema/reply/model.py +++ b/faststream/specification/schema/reply/model.py @@ -8,6 +8,7 @@ class OperationReplyAddress: location: str description: str | None = None + @dataclass class OperationReply: message: Message | None From 961c2868a9ef0327d5b17860f4638f09dd108e5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sat, 1 Nov 2025 17:01:01 +0300 Subject: [PATCH 8/9] chore: extract _resolve_payloads_common as common part --- .../specification/asyncapi/v3_0_0/generate.py | 110 ++++++++---------- 1 file changed, 46 insertions(+), 64 deletions(-) diff --git a/faststream/specification/asyncapi/v3_0_0/generate.py b/faststream/specification/asyncapi/v3_0_0/generate.py index 847136496d..515a9c94b8 100644 --- a/faststream/specification/asyncapi/v3_0_0/generate.py +++ b/faststream/specification/asyncapi/v3_0_0/generate.py @@ -302,18 +302,18 @@ def _get_http_binding_method(methods: Sequence[str]) -> str: return next((method for method in methods if method != "HEAD"), "HEAD") -def _resolve_reply_payloads( - message_name: str, - m: Message, +def _resolve_payloads_common( + *, + m: "Message", payloads: dict[str, Any], - reply_messages: dict[str, Any], -) -> Reference: + messages_target: dict[str, Any], + message_ref: str, + default_payload_title: str, +) -> "Reference": assert isinstance(m.payload, dict) m.payload = move_pydantic_refs(m.payload, DEF_KEY) - message_name = clear_key(message_name) - if DEF_KEY in m.payload: payloads.update(m.payload.pop(DEF_KEY)) @@ -328,19 +328,24 @@ def _resolve_reply_payloads( defs = payload.pop(DEF_KEY) or {} for def_name, def_schema in defs.items(): payloads[clear_key(def_name)] = def_schema + processed_payloads[clear_key(name)] = payload - one_of_list.append(Reference(**{"$ref": f"#/components/schemas/{name}"})) + one_of_list.append( + Reference(**{"$ref": f"#/components/schemas/{name}"}) + ) payloads.update(processed_payloads) m.payload["oneOf"] = one_of_list + assert m.title - reply_messages[clear_key(m.title)] = m - return Reference( - **{"$ref": f"#/components/messages/{message_name}"}, - ) + messages_target[clear_key(m.title)] = m + + return Reference(**{"$ref": message_ref}) + payloads.update(m.payload.pop(DEF_KEY, {})) - payload_name = m.payload.get("title", f"{message_name}:Payload") + + payload_name = m.payload.get("title", default_payload_title) payload_name = clear_key(payload_name) if payload_name in payloads and payloads[payload_name] != m.payload: @@ -352,67 +357,44 @@ def _resolve_reply_payloads( payloads[payload_name] = m.payload m.payload = {"$ref": f"#/components/schemas/{payload_name}"} + assert m.title - reply_messages[clear_key(m.title)] = m - return Reference( - **{"$ref": f"#/components/messages/{message_name}"}, + messages_target[clear_key(m.title)] = m + + return Reference(**{"$ref": message_ref}) + + +def _resolve_reply_payloads( + message_name: str, + m: "Message", + payloads: dict[str, Any], + reply_messages: dict[str, Any], +) -> "Reference": + message_name = clear_key(message_name) + + return _resolve_payloads_common( + m=m, + payloads=payloads, + messages_target=reply_messages, + message_ref=f"#/components/messages/{message_name}", + default_payload_title=f"{message_name}:Payload", ) def _resolve_msg_payloads( message_name: str, - m: Message, + m: "Message", channel_name: str, payloads: dict[str, Any], messages: dict[str, Any], -) -> Reference: - assert isinstance(m.payload, dict) - - m.payload = move_pydantic_refs(m.payload, DEF_KEY) - +) -> "Reference": message_name = clear_key(message_name) channel_name = clear_key(channel_name) - if DEF_KEY in m.payload: - payloads.update(m.payload.pop(DEF_KEY)) - - one_of = m.payload.get("oneOf", None) - if isinstance(one_of, dict): - one_of_list = [] - processed_payloads: dict[str, dict[str, Any]] = {} - for name, payload in one_of.items(): - # Promote nested Pydantic $defs from each payload into components/schemas - # so that referenced nested models are available globally. - if isinstance(payload, dict) and DEF_KEY in payload: - defs = payload.pop(DEF_KEY) or {} - for def_name, def_schema in defs.items(): - payloads[clear_key(def_name)] = def_schema - processed_payloads[clear_key(name)] = payload - one_of_list.append(Reference(**{"$ref": f"#/components/schemas/{name}"})) - - payloads.update(processed_payloads) - m.payload["oneOf"] = one_of_list - assert m.title - messages[clear_key(m.title)] = m - return Reference( - **{"$ref": f"#/components/messages/{channel_name}:{message_name}"}, - ) - - payloads.update(m.payload.pop(DEF_KEY, {})) - payload_name = m.payload.get("title", f"{channel_name}:{message_name}:Payload") - payload_name = clear_key(payload_name) - - if payload_name in payloads and payloads[payload_name] != m.payload: - warnings.warn( - f"Overwriting the message schema, data types have the same name: `{payload_name}`", - RuntimeWarning, - stacklevel=1, - ) - - payloads[payload_name] = m.payload - m.payload = {"$ref": f"#/components/schemas/{payload_name}"} - assert m.title - messages[clear_key(m.title)] = m - return Reference( - **{"$ref": f"#/components/messages/{channel_name}:{message_name}"}, + return _resolve_payloads_common( + m=m, + payloads=payloads, + messages_target=messages, + message_ref=f"#/components/messages/{channel_name}:{message_name}", + default_payload_title=f"{channel_name}:{message_name}:Payload", ) From bf9ce34ffdea3b293b3702f6f42d146e07652f39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=B5=D1=84=D0=B0=D0=BD=20=C2=ABGambit=C2=BB?= =?UTF-8?q?=20=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B5=D0=BD=D0=BA=D0=BE?= Date: Sun, 2 Nov 2025 13:50:46 +0300 Subject: [PATCH 9/9] chore: made reply not required parameter --- faststream/specification/asyncapi/v3_0_0/schema/operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/specification/asyncapi/v3_0_0/schema/operations.py b/faststream/specification/asyncapi/v3_0_0/schema/operations.py index 3bcf57e60c..7bdfe9b2f4 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/operations.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/operations.py @@ -65,7 +65,7 @@ def from_sub( messages: list[Reference], channel: Reference, operation: OperationSpec, - reply: OperationReply | None, + reply: OperationReply | None = None, ) -> Self: return cls( action=Action.RECEIVE,