Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion faststream/_internal/configs/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from faststream._internal.logger import LoggerState
from faststream._internal.producer import ProducerProto, ProducerUnset

from faststream._internal.configs.settings import SettingsContainer

if TYPE_CHECKING:
from fast_depends.dependencies import Dependant

Expand All @@ -18,7 +20,7 @@
class BrokerConfig:
prefix: str = ""
include_in_schema: bool | None = True

settings: SettingsContainer = None
broker_middlewares: Sequence["BrokerMiddleware[Any]"] = ()
broker_parser: Optional["CustomCallable"] = None
broker_decoder: Optional["CustomCallable"] = None
Expand Down
16 changes: 16 additions & 0 deletions faststream/_internal/configs/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any


class Settings:
def __init__(self, key: str) -> None:
self.key = key


class SettingsContainer:
def __init__(self, **kwargs: Any) -> None:
self._items = dict(kwargs)

def resolve_from(self, item: Any) -> Any:
if isinstance(item, Settings):
return self._items.get(item.key)
return item
3 changes: 3 additions & 0 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import anyio
from aio_pika import IncomingMessage, RobustConnection, connect_robust
from faststream._internal.configs.settings import SettingsContainer
from typing_extensions import deprecated, override

from faststream.__about__ import SERVICE_NAME
Expand Down Expand Up @@ -107,6 +108,7 @@ def __init__(
# FastDepends args
apply_types: bool = True,
serializer: Optional["SerializerProto"] = EMPTY,
settings: SettingsContainer = EMPTY
) -> None:
"""Initialize the RabbitBroker.

Expand Down Expand Up @@ -180,6 +182,7 @@ def __init__(
# Basic args
routers=routers,
config=RabbitBrokerConfig(
settings=settings,
channel_manager=cm,
producer=producer,
declarer=declarer,
Expand Down
2 changes: 2 additions & 0 deletions faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def routing(
queue: Union["RabbitQueue", str, None] = None,
routing_key: str = "",
) -> str:
self.queue = self._outer_config.settings.resolve_from(self.queue)
if not routing_key:
if q := RabbitQueue.validate(queue):
routing_key = q.routing()
Expand All @@ -80,6 +81,7 @@ def routing(
return routing_key

async def start(self) -> None:
self.queue = self._outer_config.settings.resolve_from(self.queue)
if self.exchange is not None:
await self._outer_config.declarer.declare_exchange(self.exchange)
return await super().start()
Expand Down
14 changes: 9 additions & 5 deletions faststream/rabbit/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from faststream._internal.endpoint.subscriber import SubscriberUsecase
from faststream._internal.endpoint.utils import process_msg
from faststream._internal.configs.settings import Settings
from faststream.rabbit.parser import AioPikaParser
from faststream.rabbit.publisher.fake import RabbitFakePublisher
from faststream.rabbit.schemas import RabbitExchange
Expand Down Expand Up @@ -40,9 +41,8 @@ def __init__(
specification: "SubscriberSpecification[Any, Any]",
calls: "CallsCollection[IncomingMessage]",
) -> None:
parser = AioPikaParser(pattern=config.queue.path_regex)
config.decoder = parser.decode_message
config.parser = parser.parse_message
config.decoder = None
config.parser = None
super().__init__(
config,
specification=specification,
Expand Down Expand Up @@ -70,6 +70,10 @@ def routing(self) -> str:
@override
async def start(self) -> None:
"""Starts the consumer for the RabbitMQ queue."""
self.queue = self._outer_config.settings.resolve_from(self.queue)
parser = AioPikaParser(pattern=self.queue.path_regex)
self._decoder = parser.decode_message
self._parser = parser.parse_message
await super().start()

queue_to_bind = self.queue.add_prefix(self._outer_config.prefix)
Expand Down Expand Up @@ -214,7 +218,7 @@ def build_log_context(
exchange: Optional["RabbitExchange"] = None,
) -> dict[str, str]:
return {
"queue": queue.name,
"queue": getattr(queue, "name", ""),
"exchange": getattr(exchange, "name", ""),
"message_id": getattr(message, "message_id", ""),
}
Expand All @@ -225,6 +229,6 @@ def get_log_context(
) -> dict[str, str]:
return self.build_log_context(
message=message,
queue=self.queue,
queue=self._outer_config.settings.resolve_from(self.queue),
exchange=self.exchange,
)
Loading