Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,69 @@ jobs:
if-no-files-found: error
include-hidden-files: true

test-mqtt-smoke:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
persist-credentials: false
- uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
with:
version: "latest"
- name: Set up Python
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.13"
- name: Install Dependencies
run: |
uv pip install --system --group optionals --group testing .
- name: Test
run: >
pytest -n auto
-vv -m "mqtt and not connected"

test-mqtt-real:
# if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
# needs:
# - test-basic
# - test-mqtt-smoke
services:
artemis:
image: apache/activemq-artemis:latest-alpine
env:
ANONYMOUS_LOGIN: "true"
ports:
- 1883:1883
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
persist-credentials: false
- uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
with:
version: "latest"
- name: Set up Python
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.13"
- name: Install Dependencies
run: |
uv pip install --system --group optionals --group testing .
- name: Test
run: >
pytest --cov --cov-report=
-vv -m "(slow and mqtt and connected) or (mqtt and connected)"
- name: Rename coverage file
run: mkdir coverage && mv .coverage coverage/.coverage.mqtt
- name: Store coverage files
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
with:
name: .coverage.mqtt
path: coverage
if-no-files-found: error
include-hidden-files: true

coverage-combine:
if: github.event.pull_request.draft == false
needs:
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ services:
security_opt:
- no-new-privileges:true

artemis:
image: apache/activemq-artemis:latest-alpine
environment:
ANONYMOUS_LOGIN: "true"
ports:
- 1883:1883
security_opt:
- no-new-privileges:true

faststream:
build: .
volumes:
Expand Down
5 changes: 5 additions & 0 deletions faststream/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ def __str__(self) -> str:
pip install "faststream[nats]"
"""

INSTALL_FASTSTREAM_MQTT = """
To use MQTT with FastStream, please install dependencies:\n
pip install "faststream[mqtt]"
"""

INSTALL_UVICORN = """
To run FastStream ASGI App via CLI, please install uvicorn:\n
pip install uvicorn
Expand Down
11 changes: 11 additions & 0 deletions faststream/mqtt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
try:
from .broker import MQTTBroker
except ImportError as e:
if "'aiomqtt'" not in e.msg:
raise

from faststream.exceptions import INSTALL_FASTSTREAM_MQTT

raise ImportError(INSTALL_FASTSTREAM_MQTT) from e

__all__ = ("MQTTBroker",)
3 changes: 3 additions & 0 deletions faststream/mqtt/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .broker import MQTTBroker

__all__ = ("MQTTBroker",)
132 changes: 132 additions & 0 deletions faststream/mqtt/broker/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Optional

import aiomqtt
import anyio
from fast_depends import dependency_provider
from typing_extensions import override

from faststream._internal.basic_types import SendableMessage
from faststream._internal.broker import BrokerUsecase
from faststream._internal.constants import EMPTY
from faststream._internal.context.repository import ContextRepo
from faststream._internal.di import FastDependsConfig
from faststream.mqtt.broker.registrator import MQTTRegistrator
from faststream.mqtt.configs.broker import MQTTBrokerConfig
from faststream.mqtt.response import MQTTPublishCommand
from faststream.response import PublishType
from faststream.security import BaseSecurity
from faststream.specification.schema import BrokerSpec, Tag, TagDict

if TYPE_CHECKING:
from types import TracebackType

from fast_depends import Provider
from fast_depends.library.serializer import SerializerProto


class MQTTBroker(
MQTTRegistrator,
BrokerUsecase[aiomqtt.Message, aiomqtt.Client],
):
def __init__(
self,
*,
# mqtt broker params
hostname: str = "localhost",
port: int = 1883,
username: str | None = None,
password: str | None = None,
keepalive: int = 60,
bind_address: str = "",
bind_port: int = 0,
routers: Iterable[MQTTRegistrator] = (),
# FastDepends args
apply_types: bool = True,
serializer: Optional["SerializerProto"] = EMPTY,
provider: Optional["Provider"] = None,
context: Optional["ContextRepo"] = None,
# AsyncAPI args
security: Optional["BaseSecurity"] = None,
specification_url: str | None = None,
protocol: str | None = None,
protocol_version: str | None = "auto",
description: str | None = None,
tags: Iterable["Tag | TagDict"] = (),
) -> None:
if specification_url is None:
specification_url = hostname
super().__init__(
config=MQTTBrokerConfig(
hostname="localhost",
port=port,
username=username,
password=password,
keepalive=keepalive,
bind_address=bind_address,
bind_port=bind_port,
extra_context={
"broker": self,
},
fd_config=FastDependsConfig(
use_fastdepends=apply_types,
serializer=serializer,
provider=provider or dependency_provider,
context=context or ContextRepo(),
),
),
specification=BrokerSpec(
url=[specification_url],
protocol=None,
protocol_version=None,
description="MQTT Broker",
tags=[],
security=None,
),
routers=routers,
)

@override
async def _connect(self) -> aiomqtt.Client:
return await self.config.broker_config.connect()

async def start(self) -> None:
await self.connect()
await super().start()

async def stop(
self,
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: Optional["TracebackType"] = None,
) -> None:
await super().stop(exc_type, exc_val, exc_tb)
await self.config.broker_config.disconnect(exc_type, exc_val, exc_tb)
self._connection = None

async def publish(self, message: "SendableMessage", topic: str) -> Any:
cmd = MQTTPublishCommand(
body=message,
destination=topic,
_publish_type=PublishType.PUBLISH,
)
return await super()._basic_publish(
cmd, producer=self.config.broker_config.producer
)

async def ping(self, timeout: float | None) -> bool:
sleep_time = (timeout or 10) / 10
ping_client = self.config.broker_config.create_client()

with anyio.move_on_after(timeout) as cancel_scope:
while True:
if cancel_scope.cancel_called:
return False
try:
async with ping_client:
pass
except aiomqtt.MqttError:
await anyio.sleep(sleep_time)
else:
return True
return False
46 changes: 46 additions & 0 deletions faststream/mqtt/broker/registrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any, cast

from aiomqtt import Message
from typing_extensions import override

from faststream._internal.broker.registrator import Registrator
from faststream.mqtt.configs.broker import MQTTBrokerConfig
from faststream.mqtt.subscriber.factory import create_subscriber
from faststream.mqtt.subscriber.usecase import MQTTSubscriber

if TYPE_CHECKING:
from fast_depends.dependencies import Dependant

from faststream._internal.types import CustomCallable, SubscriberMiddleware


class MQTTRegistrator(Registrator[Message, MQTTBrokerConfig]):
@override
def subscriber( # type: ignore[override]
self,
topic: str,
*,
parser: "CustomCallable | None" = None,
decoder: "CustomCallable | None" = None,
dependencies: Iterable["Dependant"] = (),
middlewares: Sequence["SubscriberMiddleware[Any]"] = (),
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> MQTTSubscriber:
subscriber = create_subscriber(
topic=topic,
config=cast("MQTTBrokerConfig", self.config),
title_=title,
description_=description,
include_in_schema=include_in_schema,
)
super().subscriber(subscriber)
subscriber.add_call(
parser_=parser,
decoder_=decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
return subscriber
Empty file.
57 changes: 57 additions & 0 deletions faststream/mqtt/configs/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import aiomqtt
from paho.mqtt.client import MQTT_CLEAN_START_FIRST_ONLY, CleanStartOption

from faststream._internal.configs import BrokerConfig
from faststream.mqtt.publisher.producer import AiomqttFastProducer

if TYPE_CHECKING:
from types import TracebackType


@dataclass(kw_only=True)
class MQTTBrokerConfig(BrokerConfig):
hostname: str
port: int = 1883
username: str | None = None
password: str | None = None
keepalive: int = 60
bind_address: str = ""
bind_port: int = 0
clean_start: CleanStartOption = MQTT_CLEAN_START_FIRST_ONLY
__client: aiomqtt.Client | None = field(init=False, default=None)

async def connect(self) -> aiomqtt.Client:
return await self.client.__aenter__()

async def disconnect(
self,
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: "TracebackType | None" = None,
) -> None:
await self.client.__aexit__(exc_type, exc_val, exc_tb)

@property
def client(self) -> aiomqtt.Client:
self.__client = self.__client or self.__get_client()
self.producer = AiomqttFastProducer(self.__client)
return self.__client

def create_client(self) -> aiomqtt.Client:
return self.__get_client()

def __get_client(self) -> aiomqtt.Client:
return aiomqtt.Client(
self.hostname,
self.port,
username=self.username,
password=self.password,
keepalive=self.keepalive,
bind_address=self.bind_address,
bind_port=self.bind_port,
clean_start=self.clean_start,
protocol=aiomqtt.ProtocolVersion.V5,
)
11 changes: 11 additions & 0 deletions faststream/mqtt/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Any

from aiomqtt import Message

from faststream.message import StreamMessage


class MQTTMessage(StreamMessage[Message]):
def __init__(self, *args: Any, topic: str, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.topic = topic
34 changes: 34 additions & 0 deletions faststream/mqtt/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import TYPE_CHECKING, cast

from faststream._internal.basic_types import DecodedMessage
from faststream._internal.constants import ContentType, ContentTypes
from faststream.message import decode_message
from faststream.mqtt.message import MQTTMessage

if TYPE_CHECKING:
from aiomqtt import Message

from faststream._internal.basic_types import DecodedMessage
from faststream.message import StreamMessage


class MQTTParser:
async def parse_message(self, message: "Message") -> MQTTMessage:
payload = message.payload
ct = (
getattr(message.properties, "ContentType", ContentTypes.TEXT.value) # type: ignore[attr-defined]
if message.properties
else ContentTypes.TEXT.value
)
return MQTTMessage(
raw_message=message,
body=payload,
topic=cast("str", message.topic),
content_type=ct,
)

async def decode_message(
self,
msg: "StreamMessage[Message]",
) -> "DecodedMessage":
return decode_message(msg)
Empty file.
Loading
Loading