Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,25 @@ services:
security_opt:
- no-new-privileges:true

postgres:
image: postgres:latest
environment:
POSTGRES_DB: broker
POSTGRES_USER: broker
POSTGRES_PASSWORD: brokerpass
ports:
- "5432:5432"

mysql:
image: mysql:latest
environment:
MYSQL_DATABASE: broker
MYSQL_USER: broker
MYSQL_PASSWORD: brokerpass
MYSQL_ROOT_PASSWORD: brokerpass
ports:
- "3306:3306"

faststream:
build: .
volumes:
Expand Down
4 changes: 2 additions & 2 deletions faststream/_internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@


class ProducerProto(Protocol[PublishCommandType_contra]):
_parser: "AsyncCallable"
_decoder: "AsyncCallable"
# _parser: "AsyncCallable"
# _decoder: "AsyncCallable"

@abstractmethod
async def publish(self, cmd: "PublishCommandType_contra") -> Any:
Expand Down
Empty file added faststream/sqla/__init__.py
Empty file.
15 changes: 15 additions & 0 deletions faststream/sqla/annotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Dependency-injection annotations for the SQLAlchemy broker.

Exposes ``Annotated`` aliases that resolve the current message and broker
from the FastStream context, plus the common re-exports (``ContextRepo``,
``Logger``, ``NoCast``) that every broker's ``annotations`` module provides.
"""

from typing import Annotated

from faststream._internal.context import Context
from faststream.annotations import ContextRepo, Logger
from faststream.params import NoCast
from faststream.sqla.broker.broker import SqlaBroker as SB
from faststream.sqla.message import SqlaMessage as SM

__all__ = (
    "ContextRepo",
    "Logger",
    "NoCast",
    "SqlaBroker",
    "SqlaMessage",
)

# Resolved from the DI context at injection time.
SqlaMessage = Annotated[SM, Context("message")]
SqlaBroker = Annotated[SB, Context("broker")]
Empty file.
111 changes: 111 additions & 0 deletions faststream/sqla/broker/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from datetime import datetime
import logging
from typing import Any, Iterable, Optional, Union, override
from fast_depends import Provider, dependency_provider
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
from faststream._internal.context.repository import ContextRepo
from fast_depends.library.serializer import SerializerProto
from faststream._internal.basic_types import LoggerProto, SendableMessage
from faststream._internal.broker import BrokerUsecase
from faststream._internal.constants import EMPTY
from faststream._internal.di.config import FastDependsConfig
from faststream.security import BaseSecurity
from faststream.specification.schema.broker import BrokerSpec
from faststream.specification.schema.extra.tag import Tag, TagDict
from faststream.sqla.broker.registrator import SqlaRegistrator
from faststream.sqla.configs.broker import SqlaBrokerConfig
from faststream.sqla.broker.logging import make_sqla_logger_state
from faststream.sqla.publisher.producer import SqlaProducer
from faststream.sqla.response import SqlaPublishCommand


class SqlaBroker(
    SqlaRegistrator,
    BrokerUsecase[
        Any,
        Any,
    ],
):
    """SQLAlchemy-backed FastStream broker.

    Publishes messages via a ``SqlaProducer`` bound to an externally
    managed ``AsyncEngine``; the engine owns the connection pool, so
    connecting is a no-op.
    """

    # Broker URL(s) exposed for specification rendering (set by the base class).
    url: list[str]

    def __init__(
        self,
        *,
        engine: AsyncEngine,
        # broker base args
        routers: Iterable[SqlaRegistrator] = (),
        # AsyncAPI args
        security: Optional["BaseSecurity"] = None,
        specification_url: str | Iterable[str] | None = None,
        protocol: str | None = None,
        protocol_version: str | None = "auto",
        description: str | None = None,
        tags: Iterable[Union["Tag", "TagDict"]] = (),
        # logging args
        logger: Optional["LoggerProto"] = EMPTY,
        log_level: int = logging.INFO,
    ) -> None:
        """Create the broker around *engine*.

        Args:
            engine: async SQLAlchemy engine used for all publishing.
            routers: routers whose handlers are included into this broker.
            security / specification_url / protocol / protocol_version /
                description / tags: AsyncAPI specification metadata.
            logger: logger override; ``EMPTY`` selects the default state.
            log_level: level for the default broker logger.
        """
        # TODO(review): FastDepends args (apply_types, serializer, provider,
        # context) are not wired up yet — pass a FastDependsConfig to
        # SqlaBrokerConfig once the DI integration is ready.
        super().__init__(
            routers=routers,
            config=SqlaBrokerConfig(
                producer=SqlaProducer(engine=engine),
                logger=make_sqla_logger_state(
                    logger=logger,
                    log_level=log_level,
                ),
                extra_context={
                    "broker": self,
                },
            ),
            specification=BrokerSpec(
                description=description,
                url=specification_url,
                protocol=protocol,
                protocol_version=protocol_version,
                security=security,
                tags=tags,
            ),
        )

    async def _connect(self) -> Any:
        # The engine manages its own pool; there is no connection to open here.
        return True

    async def start(self) -> None:
        """Mark the broker connected and start all registered subscribers."""
        await self.connect()
        await super().start()

    @override
    async def publish(
        self,
        message: "SendableMessage",
        *,
        queue: str,
        next_attempt_at: datetime | None = None,
        connection: AsyncConnection | None = None,
    ) -> None:
        """Publish *message* to *queue* through the SQL producer.

        Args:
            message: payload to publish.
            queue: logical queue name.
            next_attempt_at: timezone-aware datetime of the earliest
                delivery attempt; ``None`` means deliver as soon as possible.
            connection: reuse an existing connection/transaction instead of
                acquiring one from the engine — TODO confirm producer
                semantics when supplied.
        """
        cmd = SqlaPublishCommand(
            message=message,
            queue=queue,
            next_attempt_at=next_attempt_at,
            connection=connection,
        )

        return await super()._basic_publish(cmd, producer=self.config.producer)
50 changes: 50 additions & 0 deletions faststream/sqla/broker/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging
from functools import partial
from typing import TYPE_CHECKING, Any

from faststream._internal.logger import DefaultLoggerStorage, make_logger_state
from faststream._internal.logger.logging import get_broker_logger

if TYPE_CHECKING:
from faststream._internal.basic_types import LoggerProto
from faststream._internal.context import ContextRepo


class SqlaParamsStorage(DefaultLoggerStorage):
    """Logger storage for the SQLAlchemy broker.

    Holds only a log level — subscriber registration carries no extra
    context for this broker's log format.
    """

    def __init__(self) -> None:
        super().__init__()
        # Default until `set_level` is called.
        self.logger_log_level = logging.INFO

    def set_level(self, level: int) -> None:
        """Remember the level to apply when the logger is first built."""
        self.logger_log_level = level

    def register_subscriber(self, params: dict[str, Any]) -> None:
        # Subscriber params do not influence the log format here.
        return

    def get_logger(self, *, context: "ContextRepo") -> "LoggerProto":
        """Return the cached broker logger, creating it on first use."""
        message_id_ln = 10

        # TODO: generate unique logger names to not share between brokers
        logger = self._get_logger_ref()
        if not logger:
            logger = get_broker_logger(
                name="sqla",
                default_context={},
                message_id_ln=message_id_ln,
                fmt=(
                    "%(asctime)s %(levelname)-8s - "
                    f"%(message_id)-{message_id_ln}s "
                    "- %(message)s"
                ),
                context=context,
                log_level=self.logger_log_level,
            )
            self._logger_ref.add(logger)

        return logger


# Factory for the SQLAlchemy broker's logger state: `make_logger_state`
# pre-bound to use `SqlaParamsStorage` as the default storage class.
make_sqla_logger_state = partial(
    make_logger_state,
    default_storage_cls=SqlaParamsStorage,
)
Loading