diff --git a/.github/workflows/pr_tests.yaml b/.github/workflows/pr_tests.yaml index d659b1e0aa..8c6a294e01 100644 --- a/.github/workflows/pr_tests.yaml +++ b/.github/workflows/pr_tests.yaml @@ -463,6 +463,69 @@ jobs: if-no-files-found: error include-hidden-files: true + test-mqtt-smoke: + if: github.event.pull_request.draft == false + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0 + with: + persist-credentials: false + - uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4 + with: + version: "latest" + - name: Set up Python + uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0 + with: + python-version: "3.13" + - name: Install Dependencies + run: | + uv pip install --system --group optionals --group testing . + - name: Test + run: > + pytest -n auto + -vv -m "mqtt and not connected" + + test-mqtt-real: + # if: github.event.pull_request.draft == false + runs-on: ubuntu-latest + # needs: + # - test-basic + # - test-mqtt-smoke + services: + artemis: + image: apache/activemq-artemis:latest-alpine + env: + ANONYMOUS_LOGIN: "true" + ports: + - 1883:1883 + steps: + - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0 + with: + persist-credentials: false + - uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4 + with: + version: "latest" + - name: Set up Python + uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0 + with: + python-version: "3.13" + - name: Install Dependencies + run: | + uv pip install --system --group optionals --group testing . + - name: Test + run: > + pytest --cov --cov-report= + -vv -m "(slow and mqtt and connected) or (mqtt and connected)" + - name: Rename coverage file + run: mkdir coverage && mv .coverage coverage/.coverage.mqtt + - name: Store coverage files + uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0 + with: + name: .coverage.mqtt + path: coverage + if-no-files-found: error + include-hidden-files: true + coverage-combine: if: github.event.pull_request.draft == false needs: diff --git a/docker-compose.yaml b/docker-compose.yaml index 6da8671730..732f8d0687 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -41,6 +41,15 @@ services: security_opt: - no-new-privileges:true + artemis: + image: apache/activemq-artemis:latest-alpine + environment: + ANONYMOUS_LOGIN: "true" + ports: + - 1883:1883 + security_opt: + - no-new-privileges:true + faststream: build: . 
volumes: diff --git a/faststream/exceptions.py b/faststream/exceptions.py index ffdf640891..38beccbceb 100644 --- a/faststream/exceptions.py +++ b/faststream/exceptions.py @@ -195,6 +195,11 @@ def __str__(self) -> str: pip install "faststream[nats]" """ +INSTALL_FASTSTREAM_MQTT = """ +To use MQTT with FastStream, please install dependencies:\n +pip install "faststream[mqtt]" +""" + INSTALL_UVICORN = """ To run FastStream ASGI App via CLI, please install uvicorn:\n pip install uvicorn diff --git a/faststream/mqtt/__init__.py b/faststream/mqtt/__init__.py new file mode 100644 index 0000000000..0ea0875204 --- /dev/null +++ b/faststream/mqtt/__init__.py @@ -0,0 +1,11 @@ +try: + from .broker import MQTTBroker +except ImportError as e: + if "'aiomqtt'" not in e.msg: + raise + + from faststream.exceptions import INSTALL_FASTSTREAM_MQTT + + raise ImportError(INSTALL_FASTSTREAM_MQTT) from e + +__all__ = ("MQTTBroker",) diff --git a/faststream/mqtt/broker/__init__.py b/faststream/mqtt/broker/__init__.py new file mode 100644 index 0000000000..5503800029 --- /dev/null +++ b/faststream/mqtt/broker/__init__.py @@ -0,0 +1,3 @@ +from .broker import MQTTBroker + +__all__ = ("MQTTBroker",) diff --git a/faststream/mqtt/broker/broker.py b/faststream/mqtt/broker/broker.py new file mode 100644 index 0000000000..f688cc2b5a --- /dev/null +++ b/faststream/mqtt/broker/broker.py @@ -0,0 +1,132 @@ +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any, Optional + +import aiomqtt +import anyio +from fast_depends import dependency_provider +from typing_extensions import override + +from faststream._internal.basic_types import SendableMessage +from faststream._internal.broker import BrokerUsecase +from faststream._internal.constants import EMPTY +from faststream._internal.context.repository import ContextRepo +from faststream._internal.di import FastDependsConfig +from faststream.mqtt.broker.registrator import MQTTRegistrator +from faststream.mqtt.configs.broker import MQTTBrokerConfig +from faststream.mqtt.response import MQTTPublishCommand +from faststream.response import PublishType +from faststream.security import BaseSecurity +from faststream.specification.schema import BrokerSpec, Tag, TagDict + +if TYPE_CHECKING: + from types import TracebackType + + from fast_depends import Provider + from fast_depends.library.serializer import SerializerProto + + +class MQTTBroker( + MQTTRegistrator, + BrokerUsecase[aiomqtt.Message, aiomqtt.Client], +): + def __init__( + self, + *, + # mqtt broker params + hostname: str = "localhost", + port: int = 1883, + username: str | None = None, + password: str | None = None, + keepalive: int = 60, + bind_address: str = "", + bind_port: int = 0, + routers: Iterable[MQTTRegistrator] = (), + # FastDepends args + apply_types: bool = True, + serializer: Optional["SerializerProto"] = EMPTY, + provider: Optional["Provider"] = None, + context: Optional["ContextRepo"] = None, + # AsyncAPI args + security: Optional["BaseSecurity"] = None, + specification_url: str | None = None, + protocol: str | None = None, + protocol_version: str | None = "auto", + description: str | None = None, + tags: Iterable["Tag | TagDict"] = (), + ) -> None: + if specification_url is None: + specification_url = hostname + super().__init__( + config=MQTTBrokerConfig( + hostname="localhost", + port=port, + username=username, + password=password, + keepalive=keepalive, + bind_address=bind_address, + bind_port=bind_port, + extra_context={ + "broker": self, + }, + fd_config=FastDependsConfig( + 
use_fastdepends=apply_types, + serializer=serializer, + provider=provider or dependency_provider, + context=context or ContextRepo(), + ), + ), + specification=BrokerSpec( + url=[specification_url], + protocol=None, + protocol_version=None, + description="MQTT Broker", + tags=[], + security=None, + ), + routers=routers, + ) + + @override + async def _connect(self) -> aiomqtt.Client: + return await self.config.broker_config.connect() + + async def start(self) -> None: + await self.connect() + await super().start() + + async def stop( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: Optional["TracebackType"] = None, + ) -> None: + await super().stop(exc_type, exc_val, exc_tb) + await self.config.broker_config.disconnect(exc_type, exc_val, exc_tb) + self._connection = None + + async def publish(self, message: "SendableMessage", topic: str) -> Any: + cmd = MQTTPublishCommand( + body=message, + destination=topic, + _publish_type=PublishType.PUBLISH, + ) + return await super()._basic_publish( + cmd, producer=self.config.broker_config.producer + ) + + async def ping(self, timeout: float | None) -> bool: + sleep_time = (timeout or 10) / 10 + ping_client = self.config.broker_config.create_client() + + with anyio.move_on_after(timeout) as cancel_scope: + while True: + if cancel_scope.cancel_called: + return False + try: + async with ping_client: + pass + except aiomqtt.MqttError: + await anyio.sleep(sleep_time) + else: + return True + return False diff --git a/faststream/mqtt/broker/registrator.py b/faststream/mqtt/broker/registrator.py new file mode 100644 index 0000000000..2fe42e677a --- /dev/null +++ b/faststream/mqtt/broker/registrator.py @@ -0,0 +1,46 @@ +from collections.abc import Iterable, Sequence +from typing import TYPE_CHECKING, Any, cast + +from aiomqtt import Message +from typing_extensions import override + +from faststream._internal.broker.registrator import Registrator +from faststream.mqtt.configs.broker import MQTTBrokerConfig +from faststream.mqtt.subscriber.factory import create_subscriber +from faststream.mqtt.subscriber.usecase import MQTTSubscriber + +if TYPE_CHECKING: + from fast_depends.dependencies import Dependant + + from faststream._internal.types import CustomCallable, SubscriberMiddleware + + +class MQTTRegistrator(Registrator[Message, MQTTBrokerConfig]): + @override + def subscriber( # type: ignore[override] + self, + topic: str, + *, + parser: "CustomCallable | None" = None, + decoder: "CustomCallable | None" = None, + dependencies: Iterable["Dependant"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), + title: str | None = None, + description: str | None = None, + include_in_schema: bool = True, + ) -> MQTTSubscriber: + subscriber = create_subscriber( + topic=topic, + config=cast("MQTTBrokerConfig", self.config), + title_=title, + description_=description, + include_in_schema=include_in_schema, + ) + super().subscriber(subscriber) + subscriber.add_call( + parser_=parser, + decoder_=decoder, + dependencies_=dependencies, + middlewares_=middlewares, + ) + return subscriber diff --git a/faststream/mqtt/configs/__init__.py b/faststream/mqtt/configs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/mqtt/configs/broker.py b/faststream/mqtt/configs/broker.py new file mode 100644 index 0000000000..b1279825e7 --- /dev/null +++ b/faststream/mqtt/configs/broker.py @@ -0,0 +1,57 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import 
aiomqtt +from paho.mqtt.client import MQTT_CLEAN_START_FIRST_ONLY, CleanStartOption + +from faststream._internal.configs import BrokerConfig +from faststream.mqtt.publisher.producer import AiomqttFastProducer + +if TYPE_CHECKING: + from types import TracebackType + + +@dataclass(kw_only=True) +class MQTTBrokerConfig(BrokerConfig): + hostname: str + port: int = 1883 + username: str | None = None + password: str | None = None + keepalive: int = 60 + bind_address: str = "" + bind_port: int = 0 + clean_start: CleanStartOption = MQTT_CLEAN_START_FIRST_ONLY + __client: aiomqtt.Client | None = field(init=False, default=None) + + async def connect(self) -> aiomqtt.Client: + return await self.client.__aenter__() + + async def disconnect( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: "TracebackType | None" = None, + ) -> None: + await self.client.__aexit__(exc_type, exc_val, exc_tb) + + @property + def client(self) -> aiomqtt.Client: + self.__client = self.__client or self.__get_client() + self.producer = AiomqttFastProducer(self.__client) + return self.__client + + def create_client(self) -> aiomqtt.Client: + return self.__get_client() + + def __get_client(self) -> aiomqtt.Client: + return aiomqtt.Client( + self.hostname, + self.port, + username=self.username, + password=self.password, + keepalive=self.keepalive, + bind_address=self.bind_address, + bind_port=self.bind_port, + clean_start=self.clean_start, + protocol=aiomqtt.ProtocolVersion.V5, + ) diff --git a/faststream/mqtt/message.py b/faststream/mqtt/message.py new file mode 100644 index 0000000000..217e4c9fea --- /dev/null +++ b/faststream/mqtt/message.py @@ -0,0 +1,11 @@ +from typing import Any + +from aiomqtt import Message + +from faststream.message import StreamMessage + + +class MQTTMessage(StreamMessage[Message]): + def __init__(self, *args: Any, topic: str, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.topic = topic diff --git a/faststream/mqtt/parser.py b/faststream/mqtt/parser.py new file mode 100644 index 0000000000..e00478094e --- /dev/null +++ b/faststream/mqtt/parser.py @@ -0,0 +1,34 @@ +from typing import TYPE_CHECKING, cast + +from faststream._internal.basic_types import DecodedMessage +from faststream._internal.constants import ContentType, ContentTypes +from faststream.message import decode_message +from faststream.mqtt.message import MQTTMessage + +if TYPE_CHECKING: + from aiomqtt import Message + + from faststream._internal.basic_types import DecodedMessage + from faststream.message import StreamMessage + + +class MQTTParser: + async def parse_message(self, message: "Message") -> MQTTMessage: + payload = message.payload + ct = ( + getattr(message.properties, "ContentType", ContentTypes.TEXT.value) # type: ignore[attr-defined] + if message.properties + else ContentTypes.TEXT.value + ) + return MQTTMessage( + raw_message=message, + body=payload, + topic=cast("str", message.topic), + content_type=ct, + ) + + async def decode_message( + self, + msg: "StreamMessage[Message]", + ) -> "DecodedMessage": + return decode_message(msg) diff --git a/faststream/mqtt/publisher/__init__.py b/faststream/mqtt/publisher/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/mqtt/publisher/producer.py b/faststream/mqtt/publisher/producer.py new file mode 100644 index 0000000000..9093ab876b --- /dev/null +++ b/faststream/mqtt/publisher/producer.py @@ -0,0 +1,50 @@ +from typing import Any + +import aiomqtt +from paho.mqtt.packettypes 
import PacketTypes +from paho.mqtt.properties import Properties +from typing_extensions import override + +from faststream._internal._compat import json_dumps +from faststream._internal.constants import ContentTypes +from faststream._internal.endpoint.utils import ParserComposition +from faststream._internal.producer import ProducerProto +from faststream._internal.types import CustomCallable +from faststream.exceptions import FeatureNotSupportedException +from faststream.mqtt.parser import MQTTParser +from faststream.mqtt.response import MQTTPublishCommand + + +class AiomqttFastProducer(ProducerProto[MQTTPublishCommand]): + def __init__( + self, + connection: aiomqtt.Client, + parser: "CustomCallable | None" = None, + decoder: "CustomCallable | None" = None, + ) -> None: + self._connection = connection + default = MQTTParser() + self._parser = ParserComposition(parser, default.parse_message) + self._decoder = ParserComposition(decoder, default.decode_message) + + @override + async def publish(self, cmd: "MQTTPublishCommand") -> None: + # todo support features + body = cmd.body + ct = ContentTypes.TEXT.value + if isinstance(body, (dict, list)): # TODO refactor + body = json_dumps(body) + ct = ContentTypes.JSON.value + properties = Properties(PacketTypes.PUBLISH) # type: ignore[no-untyped-call] + properties.ContentType = ct + await self._connection.publish(cmd.destination, body, properties=properties) + + @override + async def request(self, cmd: "MQTTPublishCommand") -> Any: + msg = "Request feature is not supported in mqtt" + raise FeatureNotSupportedException(msg) + + @override + async def publish_batch(self, cmd: "MQTTPublishCommand") -> Any: + msg = "Batch publishing is not implemented yet" + raise FeatureNotSupportedException(msg) diff --git a/faststream/mqtt/response.py b/faststream/mqtt/response.py new file mode 100644 index 0000000000..8c6f4e792c --- /dev/null +++ b/faststream/mqtt/response.py @@ -0,0 +1,47 @@ +from typing import TYPE_CHECKING, Any + +from typing_extensions import override + +from faststream.response import PublishCommand, PublishType +from faststream.response.response import Response + +if TYPE_CHECKING: + from faststream._internal.basic_types import SendableMessage + + +class MQTTResponse(Response): + def __init__( + self, + body: "SendableMessage", + *, + headers: dict[str, Any] | None = None, + correlation_id: str | None = None, + ) -> None: + super().__init__(body, headers=headers, correlation_id=correlation_id) + + @override + def as_publish_command(self) -> "MQTTPublishCommand": + return MQTTPublishCommand( + self.body, + headers=self.headers, + correlation_id=self.correlation_id, + _publish_type=PublishType.PUBLISH, + ) + + +class MQTTPublishCommand(PublishCommand): + @classmethod + def from_cmd( + cls, + cmd: "PublishCommand | MQTTPublishCommand", + *, + batch: bool = False, + ) -> "MQTTPublishCommand": + return cls( + cmd.body, + _publish_type=cmd.publish_type, + reply_to=cmd.reply_to, + destination=cmd.destination, + correlation_id=cmd.correlation_id, + headers=cmd.headers, + ) diff --git a/faststream/mqtt/subscriber/__init__.py b/faststream/mqtt/subscriber/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/mqtt/subscriber/config.py b/faststream/mqtt/subscriber/config.py new file mode 100644 index 0000000000..fcb3d9181d --- /dev/null +++ b/faststream/mqtt/subscriber/config.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass + +from faststream._internal.configs import ( + SubscriberSpecificationConfig, + 
SubscriberUsecaseConfig, +) +from faststream.middlewares import AckPolicy + + +@dataclass(kw_only=True) +class MQTTSubscriberSpecificationConfig(SubscriberSpecificationConfig): + topic: str + + +@dataclass(kw_only=True) +class MQTTSubscriberConfig(SubscriberUsecaseConfig): + topic: str + + @property + def ack_policy(self) -> AckPolicy: + # TODO: Implement ack policy logic + return AckPolicy.ACK_FIRST diff --git a/faststream/mqtt/subscriber/factory.py b/faststream/mqtt/subscriber/factory.py new file mode 100644 index 0000000000..9351b351e7 --- /dev/null +++ b/faststream/mqtt/subscriber/factory.py @@ -0,0 +1,36 @@ +from typing import Any + +from faststream._internal.endpoint.subscriber.call_item import CallsCollection +from faststream.mqtt.configs.broker import MQTTBrokerConfig +from faststream.mqtt.subscriber.config import ( + MQTTSubscriberConfig, + MQTTSubscriberSpecificationConfig, +) +from faststream.mqtt.subscriber.specification import MQTTSubscriberSpecification +from faststream.mqtt.subscriber.usecase import MQTTSubscriber + + +def create_subscriber( + *, + topic: str, + config: MQTTBrokerConfig, + # Specification args + title_: str | None, + description_: str | None, + include_in_schema: bool, +) -> MQTTSubscriber: + subscriber_config = MQTTSubscriberConfig(topic=topic, _outer_config=config) + + calls = CallsCollection[Any]() + + specification = MQTTSubscriberSpecification( + _outer_config=config, + calls=calls, + specification_config=MQTTSubscriberSpecificationConfig( + topic=topic, + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ), + ) + return MQTTSubscriber(subscriber_config, specification, calls) diff --git a/faststream/mqtt/subscriber/specification.py b/faststream/mqtt/subscriber/specification.py new file mode 100644 index 0000000000..a5b57ac6f8 --- /dev/null +++ b/faststream/mqtt/subscriber/specification.py @@ -0,0 +1,21 @@ +from faststream._internal.endpoint.subscriber import SubscriberSpecification +from faststream.mqtt.configs.broker import MQTTBrokerConfig +from faststream.mqtt.subscriber.config import MQTTSubscriberSpecificationConfig +from faststream.specification.schema import SubscriberSpec + + +class MQTTSubscriberSpecification( + SubscriberSpecification[MQTTBrokerConfig, MQTTSubscriberSpecificationConfig] +): + @property + def topics(self) -> list[str]: + return [self.config.topic] + + @property + def name(self) -> str: + if self.config.title_: + return self.config.title_ + return f"{self.config.topic}:{self.call_name}" + + def get_schema(self) -> dict[str, "SubscriberSpec"]: + raise NotImplementedError diff --git a/faststream/mqtt/subscriber/usecase.py b/faststream/mqtt/subscriber/usecase.py new file mode 100644 index 0000000000..1492532762 --- /dev/null +++ b/faststream/mqtt/subscriber/usecase.py @@ -0,0 +1,157 @@ +import asyncio +import logging +from collections.abc import AsyncIterator, Sequence +from typing import TYPE_CHECKING, Any + +import aiomqtt +from typing_extensions import override + +from faststream._internal.endpoint.publisher import PublisherProto +from faststream._internal.endpoint.subscriber import ( + SubscriberSpecification, + SubscriberUsecase, +) +from faststream._internal.endpoint.subscriber.mixins import TasksMixin +from faststream._internal.endpoint.utils import process_msg +from faststream.message import StreamMessage +from faststream.mqtt.parser import MQTTParser + +if TYPE_CHECKING: + from aiomqtt import Message + + from faststream._internal.endpoint.subscriber.call_item import CallsCollection + from 
faststream.mqtt.configs.broker import MQTTBrokerConfig + from faststream.mqtt.message import MQTTMessage + from faststream.mqtt.subscriber.config import ( + MQTTSubscriberConfig, + ) + + +class MQTTSubscriber(TasksMixin, SubscriberUsecase["Message"]): + _outer_config: "MQTTBrokerConfig" + + def __init__( + self, + config: "MQTTSubscriberConfig", + specification: "SubscriberSpecification[Any, Any]", + calls: "CallsCollection[Message]", + ) -> None: + parser = MQTTParser() + config.decoder = parser.decode_message + config.parser = parser.parse_message + super().__init__(config, specification, calls) + self.topic = config.topic + self.__client: aiomqtt.Client | None = None + self.__message_iterator: AsyncIterator[aiomqtt.Message] | None = None + + @override + async def start(self) -> None: + await super().start() + self.__client = self._outer_config.create_client() + await self.__client.__aenter__() + await self.__client.subscribe(self.topic) + + self._post_start() + if self.calls: + self.add_task(self.__run_consume_loop) + + @override + async def stop(self) -> None: + await super().stop() + if self.__client is not None: + await self.__client.unsubscribe(self.topic) + await self.__client.__aexit__(None, None, None) + self.__client = None + + @property + def _message_iterator(self) -> AsyncIterator[aiomqtt.Message]: + assert self.__client, "Client is not initialized" + if self.__message_iterator is None: + self.__message_iterator = self.__client.messages + return self.__message_iterator + + @override + async def __aiter__(self) -> AsyncIterator["MQTTMessage"]: # type: ignore[override] + assert not self.calls, ( + "You can't iterate over a subscriber with registered handlers." + ) + assert self.__client, "You should start subscriber at first." + + context = self._outer_config.fd_config.context + async_parser, async_decoder = self._get_parser_and_decoder() + + async for raw_message in self._message_iterator: + msg: MQTTMessage = await process_msg( # type: ignore[assignment] + msg=raw_message, + middlewares=( + m(raw_message, context=context) for m in self._broker_middlewares + ), + parser=async_parser, + decoder=async_decoder, + ) + yield msg + + @override + async def get_one( + self, + *, + timeout: float = 5.0, + no_ack: bool = True, + ) -> "MQTTMessage | None": + assert not self.calls, ( + "You can't use `get_one` method if subscriber has registered handlers." + ) + assert self.__client, "You should start subscriber at first." 
+ + try: + raw_message = await asyncio.wait_for( + self.__client.messages.__anext__(), timeout + ) + except TimeoutError: + return None + + context = self._outer_config.fd_config.context + + async_parser, async_decoder = self._get_parser_and_decoder() + + msg: MQTTMessage | None = await process_msg( # type: ignore[assignment] + msg=raw_message, + middlewares=( + m(raw_message, context=context) for m in self._broker_middlewares + ), + parser=async_parser, + decoder=async_decoder, + ) + return msg + + def _make_response_publisher( + self, + message: "StreamMessage[Any]", + ) -> Sequence["PublisherProto"]: + raise NotImplementedError + + async def get_msg(self) -> aiomqtt.Message: + return await self._message_iterator.__anext__() + + def get_log_context( + self, + message: StreamMessage[aiomqtt.Message] | None, + ) -> dict[str, str]: + topic = self.topic if message is None else message.raw_message.topic.value + return { + "topic": topic, + "message_id": "" if message is None else str(message.raw_message.mid), + } + + async def __run_consume_loop(self) -> None: + # simple implementation + while self.running: + try: + msg = await self.get_msg() + except aiomqtt.MqttError as e: # noqa: PERF203 + self._log(logging.ERROR, "MQTT error occurred", exc_info=e) + # TODO now it breaks + raise + else: + if msg: + await self.consume(msg) diff --git a/pyproject.toml b/pyproject.toml index f4f6b59c37..6358a43e89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,8 @@ nats = ["nats-py>=2.12.0,<=3.0.0"] redis = ["redis>=5.0.0,<8.0.0"] +mqtt = ["aiomqtt>=2.4.0"] + otel = ["opentelemetry-sdk>=1.24.0,<2.0.0"] cli = [ @@ -86,7 +88,7 @@ cli = [ prometheus = ["prometheus-client>=0.20.0,<0.30.0"] [dependency-groups] -optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel,cli,prometheus]"] +optionals = ["faststream[rabbit,kafka,confluent,nats,redis,mqtt,otel,cli,prometheus]"] docs = [ "mkdocs-material==9.7.0", @@ -203,6 +205,7 @@ markers = [ "confluent", "nats", "redis", + "mqtt", "slow", "connected", "all", diff --git a/tests/brokers/mqtt/__init__.py b/tests/brokers/mqtt/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/brokers/mqtt/basic.py b/tests/brokers/mqtt/basic.py new file mode 100644 index 0000000000..b923ee3677 --- /dev/null +++ b/tests/brokers/mqtt/basic.py @@ -0,0 +1,25 @@ +from typing import Any + +from faststream.mqtt import MQTTBroker +from tests.brokers.base.basic import BaseTestcaseConfig + + +class MQTTTestcaseConfig(BaseTestcaseConfig): + def get_broker( + self, + apply_types: bool = False, + **kwargs: Any, + ) -> MQTTBroker: + return MQTTBroker(apply_types=apply_types, **kwargs) + + def patch_broker(self, broker: MQTTBroker, **kwargs: Any) -> MQTTBroker: + return broker + + # TODO + # def get_router(self, **kwargs: Any) -> KafkaRouter: + # return MQTTRouter(**kwargs) + + +# class KafkaMemoryTestcaseConfig(KafkaTestcaseConfig): +# def patch_broker(self, broker: KafkaBroker, **kwargs: Any) -> KafkaBroker: +# return TestKafkaBroker(broker, **kwargs) diff --git a/tests/brokers/mqtt/conftest.py b/tests/brokers/mqtt/conftest.py new file mode 100644 index 0000000000..6b61faaa5c --- /dev/null +++ b/tests/brokers/mqtt/conftest.py @@ -0,0 +1,12 @@ +from dataclasses import dataclass + +import pytest + + +@dataclass +class Settings: ... 
+ + +@pytest.fixture(scope="session") +def settings() -> Settings: + return Settings() diff --git a/tests/brokers/mqtt/test_connect.py b/tests/brokers/mqtt/test_connect.py new file mode 100644 index 0000000000..f49a4659a8 --- /dev/null +++ b/tests/brokers/mqtt/test_connect.py @@ -0,0 +1,10 @@ +import pytest + +from faststream.mqtt import MQTTBroker +from tests.brokers.base.connection import BrokerConnectionTestcase + + +@pytest.mark.mqtt() +@pytest.mark.connected() +class TestConnection(BrokerConnectionTestcase): + broker = MQTTBroker diff --git a/tests/brokers/mqtt/test_consume.py b/tests/brokers/mqtt/test_consume.py new file mode 100644 index 0000000000..17fd7c2a88 --- /dev/null +++ b/tests/brokers/mqtt/test_consume.py @@ -0,0 +1,10 @@ +import pytest + +from tests.brokers.base.consume import BrokerRealConsumeTestcase + +from .basic import MQTTTestcaseConfig + + +@pytest.mark.mqtt() +@pytest.mark.connected() +class TestConsume(MQTTTestcaseConfig, BrokerRealConsumeTestcase): ... diff --git a/uv.lock b/uv.lock index dd694c39dd..2dda3bef6f 100644 --- a/uv.lock +++ b/uv.lock @@ -58,6 +58,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bf/0d/4cb57231ff650a01123a09075bf098d8fdaf94b15a1a58465066b2251e8b/aiokafka-0.12.0-cp313-cp313-win_amd64.whl", hash = "sha256:bdc0a83eb386d2384325d6571f8ef65b4cfa205f8d1c16d7863e8d10cacd995a", size = 363194, upload-time = "2024-10-26T20:52:59.434Z" }, ] +[[package]] +name = "aiomqtt" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "paho-mqtt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/45/9a/863bc34c64bc4acb9720a9950bfc77d6f324640cdf1f420bb5d9ee624975/aiomqtt-2.4.0.tar.gz", hash = "sha256:ab0f18fc5b7ffaa57451c407417d674db837b00a9c7d953cccd02be64f046c17", size = 82718, upload-time = "2025-05-03T20:21:27.748Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/98/0c/2720665998d97d3a9521c03b138a22247e035ba54c4738e934da33c68699/aiomqtt-2.4.0-py3-none-any.whl", hash = "sha256:721296e2b79df5f6c7c4dfc91700ae0166953a4127735c92637859619dbd84e4", size = 15908, upload-time = "2025-05-03T20:21:26.337Z" }, +] + [[package]] name = "aiormq" version = "6.8.1" @@ -766,6 +778,9 @@ confluent = [ kafka = [ { name = "aiokafka" }, ] +mqtt = [ + { name = "aiomqtt" }, +] nats = [ { name = "nats-py" }, ] @@ -792,7 +807,7 @@ dev = [ { name = "dirty-equals" }, { name = "email-validator" }, { name = "fastapi" }, - { name = "faststream", extra = ["cli", "confluent", "kafka", "nats", "otel", "prometheus", "rabbit", "redis"] }, + { name = "faststream", extra = ["cli", "confluent", "kafka", "mqtt", "nats", "otel", "prometheus", "rabbit", "redis"] }, { name = "httpx" }, { name = "mdx-include" }, { name = "mike" }, @@ -863,7 +878,7 @@ lint = [ { name = "zizmor" }, ] optionals = [ - { name = "faststream", extra = ["cli", "confluent", "kafka", "nats", "otel", "prometheus", "rabbit", "redis"] }, + { name = "faststream", extra = ["cli", "confluent", "kafka", "mqtt", "nats", "otel", "prometheus", "rabbit", "redis"] }, ] test-core = [ { name = "covdefaults" }, @@ -902,6 +917,7 @@ testing = [ requires-dist = [ { name = "aio-pika", marker = "extra == 'rabbit'", specifier = ">=9,<10" }, { name = "aiokafka", marker = "extra == 'kafka'", specifier = ">=0.9,<0.13" }, + { name = "aiomqtt", marker = "extra == 'mqtt'", specifier = ">=2.4.0" }, { name = "anyio", specifier = ">=4.0,<5" }, { name = "confluent-kafka", marker = "python_full_version >= '3.13' and extra == 'confluent'", specifier = 
">=2.6,!=2.8.1,<3" }, { name = "confluent-kafka", marker = "python_full_version < '3.13' and extra == 'confluent'", specifier = ">=2,!=2.8.1,<3" }, @@ -914,7 +930,7 @@ requires-dist = [ { name = "typing-extensions", specifier = ">=4.12.0" }, { name = "watchfiles", marker = "extra == 'cli'", specifier = ">=0.15.0,<1.2.0" }, ] -provides-extras = ["rabbit", "kafka", "confluent", "nats", "redis", "otel", "cli", "prometheus"] +provides-extras = ["rabbit", "kafka", "confluent", "nats", "redis", "mqtt", "otel", "cli", "prometheus"] [package.metadata.requires-dev] dev = [ @@ -926,7 +942,7 @@ dev = [ { name = "dirty-equals", specifier = "==0.11" }, { name = "email-validator", specifier = "==2.3.0" }, { name = "fastapi", specifier = "==0.123.0" }, - { name = "faststream", extras = ["rabbit", "kafka", "confluent", "nats", "redis", "otel", "cli", "prometheus"] }, + { name = "faststream", extras = ["rabbit", "kafka", "confluent", "nats", "redis", "mqtt", "otel", "cli", "prometheus"] }, { name = "httpx", specifier = "==0.28.1" }, { name = "mdx-include", specifier = "==1.4.2" }, { name = "mike", specifier = "==2.1.3" }, @@ -997,7 +1013,7 @@ lint = [ { name = "types-setuptools" }, { name = "zizmor", specifier = "==1.18.0" }, ] -optionals = [{ name = "faststream", extras = ["rabbit", "kafka", "confluent", "nats", "redis", "otel", "cli", "prometheus"] }] +optionals = [{ name = "faststream", extras = ["rabbit", "kafka", "confluent", "nats", "redis", "mqtt", "otel", "cli", "prometheus"] }] test-core = [ { name = "covdefaults", specifier = ">=2.3.0" }, { name = "dirty-equals", specifier = "==0.11" }, @@ -2070,6 +2086,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/90/96/04b8e52da071d28f5e21a805b19cb9390aa17a47462ac87f5e2696b9566d/paginate-0.5.7-py2.py3-none-any.whl", hash = "sha256:b885e2af73abcf01d9559fd5216b57ef722f8c42affbb63942377668e35c7591", size = 13746, upload-time = "2024-08-25T14:17:22.55Z" }, ] +[[package]] +name = "paho-mqtt" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/39/15/0a6214e76d4d32e7f663b109cf71fb22561c2be0f701d67f93950cd40542/paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834", size = 148848, upload-time = "2024-04-29T19:52:55.591Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219, upload-time = "2024-04-29T19:52:48.345Z" }, +] + [[package]] name = "pamqp" version = "3.3.0"