diff --git a/docker-compose.yaml b/docker-compose.yaml index 6da8671730..7a5c533044 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -41,6 +41,25 @@ services: security_opt: - no-new-privileges:true + postgres: + image: postgres:latest + environment: + POSTGRES_DB: broker + POSTGRES_USER: broker + POSTGRES_PASSWORD: brokerpass + ports: + - "5432:5432" + + mysql: + image: mysql:latest + environment: + MYSQL_DATABASE: broker + MYSQL_USER: broker + MYSQL_PASSWORD: brokerpass + MYSQL_ROOT_PASSWORD: brokerpass + ports: + - "3306:3306" + faststream: build: . volumes: diff --git a/faststream/_internal/producer.py b/faststream/_internal/producer.py index 35558c5937..cfa615d6c0 100644 --- a/faststream/_internal/producer.py +++ b/faststream/_internal/producer.py @@ -10,8 +10,8 @@ class ProducerProto(Protocol[PublishCommandType_contra]): - _parser: "AsyncCallable" - _decoder: "AsyncCallable" + # _parser: "AsyncCallable" + # _decoder: "AsyncCallable" @abstractmethod async def publish(self, cmd: "PublishCommandType_contra") -> Any: diff --git a/faststream/sqla/__init__.py b/faststream/sqla/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/sqla/annotations.py b/faststream/sqla/annotations.py new file mode 100644 index 0000000000..ff710bf18f --- /dev/null +++ b/faststream/sqla/annotations.py @@ -0,0 +1,15 @@ +from typing import Annotated + +from faststream._internal.context import Context +from faststream.annotations import ContextRepo, Logger +from faststream.params import NoCast +from faststream.sqla.message import SqlaMessage as SM +from faststream.sqla.broker.broker import SqlaBroker as SB + +__all__ = ( + "SqlaMessage", + "SqlaBroker", +) + +SqlaMessage = Annotated[SM, Context("message")] +SqlaBroker = Annotated[SB, Context("broker")] \ No newline at end of file diff --git a/faststream/sqla/broker/__init__.py b/faststream/sqla/broker/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/sqla/broker/broker.py b/faststream/sqla/broker/broker.py new file mode 100644 index 0000000000..d27a3ab899 --- /dev/null +++ b/faststream/sqla/broker/broker.py @@ -0,0 +1,111 @@ +from datetime import datetime +import logging +from typing import Any, Iterable, Optional, Union, override +from fast_depends import Provider, dependency_provider +from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine +from faststream._internal.context.repository import ContextRepo +from fast_depends.library.serializer import SerializerProto +from faststream._internal.basic_types import LoggerProto, SendableMessage +from faststream._internal.broker import BrokerUsecase +from faststream._internal.constants import EMPTY +from faststream._internal.di.config import FastDependsConfig +from faststream.security import BaseSecurity +from faststream.specification.schema.broker import BrokerSpec +from faststream.specification.schema.extra.tag import Tag, TagDict +from faststream.sqla.broker.registrator import SqlaRegistrator +from faststream.sqla.configs.broker import SqlaBrokerConfig +from faststream.sqla.broker.logging import make_sqla_logger_state +from faststream.sqla.publisher.producer import SqlaProducer +from faststream.sqla.response import SqlaPublishCommand + + +class SqlaBroker( + SqlaRegistrator, + BrokerUsecase[ + Any, + Any, + ], +): + url: list[str] + + def __init__( + self, + *, + engine: AsyncEngine, + # broker base args + routers: Iterable[SqlaRegistrator] = (), + # AsyncAPI args + security: Optional["BaseSecurity"] = None, + specification_url: 
str | Iterable[str] | None = None, + protocol: str | None = None, + protocol_version: str | None = "auto", + description: str | None = None, + tags: Iterable[Union["Tag", "TagDict"]] = (), + # logging args + logger: Optional["LoggerProto"] = EMPTY, + log_level: int = logging.INFO, + # FastDepends args + # apply_types: bool = True, + # serializer: Optional["SerializerProto"] = EMPTY, + # provider: Optional["Provider"] = None, + # context: Optional["ContextRepo"] = None, + ) -> None: + + super().__init__( + routers=routers, + config=SqlaBrokerConfig( + producer=SqlaProducer( + engine=engine, + ), + logger=make_sqla_logger_state( + logger=logger, + log_level=log_level, + ), + # fd_config=FastDependsConfig( + # use_fastdepends=apply_types, + # serializer=serializer, + # provider=provider or dependency_provider, + # context=context or ContextRepo(), + # ), + extra_context={ + "broker": self, + }, + ), + specification=BrokerSpec( + description=description, + url=specification_url, + protocol=protocol, + protocol_version=protocol_version, + security=security, + tags=tags, + ), + ) + + async def _connect(self) -> Any: + return True + + async def start(self) -> None: + await self.connect() + await super().start() + + @override + async def publish( + self, + message: "SendableMessage", + *, + queue: str, + next_attempt_at: datetime | None = None, + connection: AsyncConnection | None = None, + ) -> None: + """ + Args: + next_attempt_at: datetime with timezone + """ + cmd = SqlaPublishCommand( + message=message, + queue=queue, + next_attempt_at=next_attempt_at, + connection=connection, + ) + + return await super()._basic_publish(cmd, producer=self.config.producer) \ No newline at end of file diff --git a/faststream/sqla/broker/logging.py b/faststream/sqla/broker/logging.py new file mode 100644 index 0000000000..574eabfc5f --- /dev/null +++ b/faststream/sqla/broker/logging.py @@ -0,0 +1,50 @@ +import logging +from functools import partial +from typing import TYPE_CHECKING, Any + +from faststream._internal.logger import DefaultLoggerStorage, make_logger_state +from faststream._internal.logger.logging import get_broker_logger + +if TYPE_CHECKING: + from faststream._internal.basic_types import LoggerProto + from faststream._internal.context import ContextRepo + + +class SqlaParamsStorage(DefaultLoggerStorage): + def __init__(self) -> None: + super().__init__() + + self.logger_log_level = logging.INFO + + def set_level(self, level: int) -> None: + self.logger_log_level = level + + def register_subscriber(self, params: dict[str, Any]) -> None: + return + + def get_logger(self, *, context: "ContextRepo") -> "LoggerProto": + message_id_ln = 10 + + # TODO: generate unique logger names to not share between brokers + if not (lg := self._get_logger_ref()): + lg = get_broker_logger( + name="sqla", + default_context={}, + message_id_ln=message_id_ln, + fmt="".join(( + "%(asctime)s %(levelname)-8s - ", + f"%(message_id)-{message_id_ln}s ", + "- %(message)s", + )), + context=context, + log_level=self.logger_log_level, + ) + self._logger_ref.add(lg) + + return lg + + +make_sqla_logger_state = partial( + make_logger_state, + default_storage_cls=SqlaParamsStorage, +) diff --git a/faststream/sqla/broker/registrator.py b/faststream/sqla/broker/registrator.py new file mode 100644 index 0000000000..fbcc2c6a05 --- /dev/null +++ b/faststream/sqla/broker/registrator.py @@ -0,0 +1,276 @@ +from typing import Annotated, Any, Sequence, cast, override +from faststream.sqla.publisher.usecase import LogicPublisher +from 
typing_extensions import deprecated + +from faststream._internal.types import PublisherMiddleware +from faststream.sqla.publisher.factory import create_publisher +from sqlalchemy.ext.asyncio import AsyncEngine +from faststream._internal.broker.registrator import Registrator +from faststream._internal.constants import EMPTY +from faststream.middlewares.acknowledgement.config import AckPolicy +from faststream.sqla.configs.broker import SqlaBrokerConfig +from faststream.sqla.subscriber.factory import create_subscriber +from faststream.sqla.retry import RetryStrategyProto + + +class SqlaRegistrator(Registrator[Any, Any]): + def subscriber( + self, + queues: list[str], + *, + engine: AsyncEngine, + max_workers: int, + retry_strategy: RetryStrategyProto, + max_fetch_interval: float, + min_fetch_interval: float, + fetch_batch_size: int, + overfetch_factor: float, + flush_interval: float, + release_stuck_interval: float, + graceful_shutdown_timeout: float, + release_stuck_timeout: int, + ack_policy: AckPolicy = AckPolicy.NACK_ON_ERROR, + ) -> Any: + """ + Args: + max_workers: + Number of workers to process messages concurrently. + min_fetch_interval: + The minimum allowed interval between consecutive fetches. The + minimum interval is used if the last fetch returned the same number + of messages as the fetch's limit. + max_fetch_interval: + The maximum allowed interval between consecutive fetches. The + maximum interval is used if the last fetch returned fewer messages + than the fetch's limit. + fetch_batch_size: + The maximum allowed number of messages to fetch in a single batch. + A fetch's actual limit might be lower if the free capacity of the + acquired-but-not-yet-in-processing buffer is smaller. + overfetch_factor: + The factor by which the fetch_batch_size is multiplied to determine + the capacity of the acquired-but-not-yet-in-processing buffer. + flush_interval: + The interval at which the state of messages for which the processing + attempt has been completed or aborted is flushed to the database. + release_stuck_interval: + The interval at which the PROCESSING-state messages are marked back + as PENDING if the release_stuck_timeout since acquired_at has passed. + + Flow: + On start, the subscriber spawns four types of concurrent loops: + + 1. Fetch loop: + Periodically fetches PENDING or RETRYABLE messages from the database, + simultaneously updating them in the database: marking as PROCESSING, + setting acquired_at to now, and incrementing attempts_count. Only + messages with next_attempt_at <= now are fetched, ordered by + next_attempt_at. The fetched messages are placed into an internal + queue. The fetch limit is the minimum of fetch_batch_size and the + free buffer capacity (fetch_batch_size * overfetch_factor minus + currently queued messages). If the last fetch was "full" (returned + as many messages as the limit), the next fetch happens after + min_fetch_interval; otherwise after max_fetch_interval. + + 2. Worker loops (max_workers instances): + Each worker takes a message from the internal queue and checks if + the attempt is allowed by the retry_strategy. If allowed, the message + is processed, if not, Reject'ed. Depending on the processing result, + AckPolicy, and manual Ack/Nack/Reject, the message is Ack'ed, Nack'ed, + or Reject'ed. For Nack'ed messages the retry_strategy is consulted to + determine if and when the message might be retried. If allowed to be + retried, the message is marked as RETRYABLE, otherwise as FAILED. 
+ Ack'ed messages are marked as COMPLETED and Reject'ed messages are + marked as FAILED. The message is then buffered for flushing. + + 3. Flush loop: + Periodically flushes the buffered message state changes to the + database. COMPLETED and FAILED messages are moved from the primary + table to the archive table. The state of RETRYABLE messages is + updated in the primary table. + + 4. Release stuck loop: + Periodically releases messages that have been stuck in PROCESSING + state for longer than release_stuck_timeout since acquired_at. These + messages are marked back as PENDING. + + On stop, all loops are gracefully stopped. Messages that have been + acquired but are not yet being processed are drained from the internal + queue and marked back as PENDING. The subscriber waits for all tasks to + complete within graceful_shutdown_timeout, then performs a final flush. + + Notes: + This design allows for work sharing between processes/nodes because + "select for update skip locked" is utilized. + + This design adheres to the "at least once" processing guarantee because + flushing changes to the database happens only after a processing + attempt. Messages might be processed more times than allowed by the + retry_strategy if, among other things, the flush doesn't happen due to + crash or failure after a message is processed. + + This design handles the poison message problem (messages that crash the + worker without the ability to catch the exception due to e.g. OOM + terminations) because attempts_count is incremented and retry_strategy + is consulted with prior to processing attempt. + + SQL queries: + Fetch: + WITH ready AS + (SELECT message.id AS id, + message.queue AS queue, + message.payload AS payload, + message.state AS state, + message.attempts_count AS attempts_count, + message.created_at AS created_at, + message.first_attempt_at AS first_attempt_at, + message.next_attempt_at AS next_attempt_at, + message.last_attempt_at AS last_attempt_at, + message.acquired_at AS acquired_at + FROM message + WHERE (message.state = $3::sqlamessagestate + OR message.state = $4::sqlamessagestate) + AND message.next_attempt_at <= now() + AND (message.queue = $5::VARCHAR OR message.queue = $6::VARCHAR) + ORDER BY message.next_attempt_at + LIMIT $7::INTEGER + FOR UPDATE SKIP LOCKED), + updated AS + (UPDATE message + SET state=$1::sqlamessagestate, + attempts_count=(message.attempts_count + $2::SMALLINT), + acquired_at=now() + WHERE message.id IN + (SELECT ready.id + FROM ready) RETURNING message.id, + message.queue, + message.payload, + message.state, + message.attempts_count, + message.created_at, + message.first_attempt_at, + message.next_attempt_at, + message.last_attempt_at, + message.acquired_at) + SELECT updated.id, + updated.queue, + updated.payload, + updated.state, + updated.attempts_count, + updated.created_at, + updated.first_attempt_at, + updated.next_attempt_at, + updated.last_attempt_at, + updated.acquired_at + FROM updated + ORDER BY updated.next_attempt_at; + + Flush: + For RETRYABLE messages: + UPDATE message + SET state=$1::sqlamessagestate, + first_attempt_at=$2::datetime, + next_attempt_at=$3::datetime, + last_attempt_at=$4::datetime, + acquired_at=$5::datetime + WHERE message.id = $6::BIGINT; + + For COMPLETED and FAILED messages: + BEGIN; + INSERT INTO message_archive ( + id, + queue, + payload, + state, + attempts_count, + created_at, + first_attempt_at, + last_attempt_at, + archived_at + ) + VALUES ( + $1::BIGINT, + $2::VARCHAR, + $3::BYTEA, + $4::sqlamessagestate, + $5::SMALLINT, 
+ $6::TIMESTAMP WITH TIME ZONE, + $7::TIMESTAMP WITH TIME ZONE, + $8::TIMESTAMP WITH TIME ZONE, + $9::TIMESTAMP WITH TIME ZONE + ); + DELETE + FROM message + WHERE message.id IN ($1::BIGINT); + COMMIT; + + Release stuck: + UPDATE message + SET state=$1::sqlamessagestate, + next_attempt_at=now(), + acquired_at=$2::TIMESTAMP WITH TIME ZONE + WHERE message.id IN + (SELECT message.id + FROM message + WHERE message.state = $3::sqlamessagestate + AND message.acquired_at < $4::TIMESTAMP WITH TIME ZONE) + """ + workers = max_workers or 1 + + subscriber = create_subscriber( + engine=engine, + queues=queues, + max_workers=max_workers, + retry_strategy=retry_strategy, + max_fetch_interval=max_fetch_interval, + min_fetch_interval=min_fetch_interval, + fetch_batch_size=fetch_batch_size, + overfetch_factor=overfetch_factor, + flush_interval=flush_interval, + release_stuck_interval=release_stuck_interval, + graceful_shutdown_timeout=graceful_shutdown_timeout, + release_stuck_timeout=release_stuck_timeout, + config=cast("SqlaBrokerConfig", self.config), + ack_policy=ack_policy, + ) + + super().subscriber(subscriber) + + # subscriber.add_call( + # parser_=parser, + # decoder_=decoder, + # dependencies_=dependencies, + # middlewares_=middlewares, + # ) + + return subscriber + + @override + def publisher( + self, + middlewares: Annotated[ + Sequence["PublisherMiddleware"], + deprecated( + "This option was deprecated in 0.6.0. Use router-level middlewares instead." + "Scheduled to remove in 0.7.0", + ), + ] = (), + title: str | None = None, + description: str | None = None, + schema: Any | None = None, + include_in_schema: bool = True, + ) -> "LogicPublisher": + publisher = create_publisher( + # Specific + broker_config=cast("SqlaBrokerConfig", self.config), + middlewares=middlewares, + # AsyncAPI + title_=title, + description_=description, + schema_=schema, + include_in_schema=include_in_schema, + ) + + super().publisher(publisher) + + return publisher \ No newline at end of file diff --git a/faststream/sqla/client.py b/faststream/sqla/client.py new file mode 100644 index 0000000000..4174923c8a --- /dev/null +++ b/faststream/sqla/client.py @@ -0,0 +1,336 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +import asyncio +import enum +import json +import logging +from collections.abc import Awaitable, Callable, Sequence +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +import os +import random +import signal +from time import perf_counter + +from sqlalchemy import ( + BigInteger, + Column, + DateTime, + Enum, + Index, + LargeBinary, + MetaData, + SmallInteger, + String, + Table, + bindparam, + delete, + func, + insert, + or_, + select, + text, + update, + case, +) +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine +from sqlalchemy.ext.asyncio import AsyncConnection +from faststream.exceptions import FeatureNotSupportedException, SetupError +from faststream.sqla.message import SqlaMessage, SqlaMessageState + + +metadata = MetaData() + + +message = Table( + "message", + metadata, + Column("id", BigInteger, primary_key=True), + Column("queue", String(255), nullable=False, index=True), + Column("payload", LargeBinary, nullable=False), + Column( + "state", + Enum(SqlaMessageState), + nullable=False, + index=True, + server_default=SqlaMessageState.PENDING.name, + ), + Column("attempts_count", SmallInteger, nullable=False, default=0), + Column("created_at", DateTime, nullable=False, default=lambda: 
datetime.now(timezone.utc).replace(tzinfo=None)), + Column("first_attempt_at", DateTime), + Column( + "next_attempt_at", + DateTime, + nullable=False, + default=lambda: datetime.now(timezone.utc).replace(tzinfo=None), + index=True, + ), + Column("last_attempt_at", DateTime), + Column("acquired_at", DateTime), +) + + +message_archive = Table( + "message_archive", + metadata, + Column("id", BigInteger, primary_key=True), + Column("queue", String(255), nullable=False, index=True), + Column("payload", LargeBinary, nullable=False), + Column("state", Enum(SqlaMessageState), nullable=False, index=True), + Column("attempts_count", SmallInteger, nullable=False), + Column("created_at", DateTime, nullable=False), + Column("first_attempt_at", DateTime), + Column("last_attempt_at", DateTime), + Column("archived_at", DateTime, nullable=False, default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)), +) + + +_MESSAGE_SELECT_COLUMNS = ( + message.c.id.label("id"), + message.c.queue.label("queue"), + message.c.payload.label("payload"), + message.c.state.label("state"), + message.c.attempts_count.label("attempts_count"), + message.c.created_at.label("created_at"), + message.c.first_attempt_at.label("first_attempt_at"), + message.c.next_attempt_at.label("next_attempt_at"), + message.c.last_attempt_at.label("last_attempt_at"), + message.c.acquired_at.label("acquired_at"), +) + + +class SqlaPostgresClient: + def __init__(self, engine: AsyncEngine): + self._engine = engine + + async def enqueue( + self, + payload: bytes, + *, + queue: str, + next_attempt_at: datetime | None = None, + connection: AsyncConnection | None = None, + ) -> None: + if next_attempt_at: + stmt = insert(message).values( + queue=queue, + payload=payload, + next_attempt_at=next_attempt_at, + ) + else: + stmt = insert(message).values( + queue=queue, + payload=payload, + ) + + if connection: + await connection.execute(stmt) + else: + async with self._engine.begin() as conn: + await conn.execute(stmt) + + async def fetch( + self, + queues: list[str], + *, + limit: int, + ) -> list[SqlaMessage]: + now = datetime.now(timezone.utc).replace(tzinfo=None) + print('now', now, "++++++++++++++++++++++++++++") + ready = ( + select(*_MESSAGE_SELECT_COLUMNS) + .where( + or_( + message.c.state == SqlaMessageState.PENDING, + message.c.state == SqlaMessageState.RETRYABLE, + ), + message.c.next_attempt_at <= now, + or_(*(message.c.queue == queue for queue in queues)), + ) + .order_by(message.c.next_attempt_at) + .limit(limit) + .with_for_update(skip_locked=True) + .cte("ready") + ) + updated = ( + update(message) + .where(message.c.id.in_(select(ready.c.id))) + .values( + state=SqlaMessageState.PROCESSING, + attempts_count=message.c.attempts_count + 1, + acquired_at=now, + last_attempt_at=now, + first_attempt_at=case((message.c.attempts_count == 0, now), else_=message.c.first_attempt_at), + ) + .returning(message) + .cte("updated") + ) + stmt = select(updated).order_by(updated.c.next_attempt_at) + async with self._engine.begin() as conn: + result = await conn.execute(stmt) + return [SqlaMessage(**row) for row in result.mappings()] + + async def retry(self, messages: Sequence[SqlaMessage]) -> None: + if not messages: + return + params = [ + { + "message_id": message.id, + "state": message.state, + "first_attempt_at": message.first_attempt_at, + "next_attempt_at": message.next_attempt_at, + "last_attempt_at": message.last_attempt_at, + } + for message in messages + ] + stmt = ( + update(message) + .where(message.c.id == bindparam("message_id")) + 
.values( + state=bindparam("state"), + first_attempt_at=bindparam("first_attempt_at"), + next_attempt_at=bindparam("next_attempt_at"), + last_attempt_at=bindparam("last_attempt_at"), + acquired_at=None, + ) + ) + async with self._engine.begin() as conn: + print('retry', params) + await conn.execute(stmt, params) + + async def archive(self, messages: Sequence[SqlaMessage]) -> None: + if not messages: + return + async with self._engine.begin() as conn: + values = [ + { + "id": item.id, + "queue": item.queue, + "payload": item.payload, + "state": item.state, + "attempts_count": item.attempts_count, + "created_at": item.created_at, + "first_attempt_at": item.first_attempt_at, + "last_attempt_at": item.last_attempt_at, + } + for item in messages + ] + stmt = message_archive.insert().values(values) + print('archive', values) + await conn.execute(stmt) + delete_stmt = delete(message).where(message.c.id.in_([item.id for item in messages])) + print('delete', messages) + await conn.execute(delete_stmt) + + async def release_stuck(self, timeout: int) -> None: + now = datetime.now(timezone.utc).replace(tzinfo=None) + select_stuck = ( + select(message.c.id) + .where( + message.c.state == SqlaMessageState.PROCESSING, + message.c.acquired_at < now - timedelta(seconds=timeout), + ) + ) + stmt = ( + update(message) + .where(message.c.id.in_(select_stuck)) + .values( + state=SqlaMessageState.PENDING, + next_attempt_at=now, + acquired_at=None, + ) + ) + async with self._engine.begin() as conn: + await conn.execute(stmt) + + +class SqlaMySqlClient(SqlaPostgresClient): + async def fetch( + self, + queues: list[str], + *, + limit: int, + ) -> list[SqlaMessage]: + now = datetime.now(timezone.utc).replace(tzinfo=None) + + async with self._engine.begin() as conn: + ready_stmt = ( + select(message.c.id.label("id")) + .where( + or_( + message.c.state == SqlaMessageState.PENDING, + message.c.state == SqlaMessageState.RETRYABLE, + ), + message.c.next_attempt_at <= now, + or_(*(message.c.queue == queue for queue in queues)), + ) + .order_by(message.c.next_attempt_at) + .limit(limit) + .with_for_update(skip_locked=True) + ) + + ready_result = await conn.execute(ready_stmt) + ready_ids = ready_result.scalars().all() + if not ready_ids: + return [] + + update_stmt = ( + update(message) + .where(message.c.id.in_(ready_ids)) + .values( + state=SqlaMessageState.PROCESSING, + attempts_count=message.c.attempts_count + 1, + acquired_at=now, + last_attempt_at=now, + first_attempt_at=case( + (message.c.attempts_count == 0, now), + else_=message.c.first_attempt_at, + ), + ) + ) + await conn.execute(update_stmt) + + fetch_stmt = ( + select(*_MESSAGE_SELECT_COLUMNS) + .where(message.c.id.in_(ready_ids)) + ) + fetched = await conn.execute(fetch_stmt) + rows = fetched.mappings().all() + + rows_by_id = {row["id"]: row for row in rows} + ordered_rows = [rows_by_id[id_] for id_ in ready_ids if id_ in rows_by_id] + return [SqlaMessage(**row) for row in ordered_rows] + + async def release_stuck(self, timeout: int) -> None: + now = datetime.now(timezone.utc).replace(tzinfo=None) + + select_stuck = ( + select(message.c.id) + .where( + message.c.state == SqlaMessageState.PROCESSING, + message.c.acquired_at < now - timedelta(seconds=timeout), + ) + .subquery() + ) + stmt = ( + update(message) + .where(message.c.id.in_(select(select_stuck.c.id))) + .values( + state=SqlaMessageState.PENDING, + next_attempt_at=now, + acquired_at=None, + ) + ) + async with self._engine.begin() as conn: + await conn.execute(stmt) + + +def create_sqla_client(engine: 
AsyncEngine) -> SqlaPostgresClient: + match engine.dialect.name.lower(): + case "mysql": + return SqlaMySqlClient(engine) + case "postgresql": + return SqlaPostgresClient(engine) + case _: + raise FeatureNotSupportedException diff --git a/faststream/sqla/configs/__init__.py b/faststream/sqla/configs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/sqla/configs/broker.py b/faststream/sqla/configs/broker.py new file mode 100644 index 0000000000..eef7ed5ca7 --- /dev/null +++ b/faststream/sqla/configs/broker.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass +from faststream._internal.configs.broker import BrokerConfig + + +@dataclass(kw_only=True) +class SqlaBrokerConfig(BrokerConfig): + ... \ No newline at end of file diff --git a/faststream/sqla/configs/subscriber.py b/faststream/sqla/configs/subscriber.py new file mode 100644 index 0000000000..8a16a6040b --- /dev/null +++ b/faststream/sqla/configs/subscriber.py @@ -0,0 +1,32 @@ +from dataclasses import dataclass, field + +from sqlalchemy.ext.asyncio import AsyncEngine +from faststream._internal.configs.endpoint import SubscriberUsecaseConfig +from faststream._internal.constants import EMPTY +from faststream.middlewares.acknowledgement.config import AckPolicy +from faststream.sqla.configs.broker import SqlaBrokerConfig +from faststream.sqla.retry import RetryStrategyProto + + +@dataclass(kw_only=True) +class SqlaSubscriberConfig(SubscriberUsecaseConfig): + _outer_config: "SqlaBrokerConfig" = field(default_factory=SqlaBrokerConfig) + + engine: AsyncEngine + queues: list[str] + max_workers: int + retry_strategy: RetryStrategyProto + max_fetch_interval: float + min_fetch_interval: float + fetch_batch_size: int + overfetch_factor: float + flush_interval: float + release_stuck_interval: float + graceful_shutdown_timeout: float + release_stuck_timeout: int + + @property + def ack_policy(self) -> AckPolicy: + if self._ack_policy is EMPTY: + return AckPolicy.NACK_ON_ERROR + return self._ack_policy \ No newline at end of file diff --git a/faststream/sqla/exceptions.py b/faststream/sqla/exceptions.py new file mode 100644 index 0000000000..0fc17f40d4 --- /dev/null +++ b/faststream/sqla/exceptions.py @@ -0,0 +1,6 @@ +from faststream.exceptions import FastStreamException + + +class DatetimeMissingTimezoneException(FastStreamException): + def __str__(self) -> str: + return "This requires a datetime with a non-None timezone" \ No newline at end of file diff --git a/faststream/sqla/message.py b/faststream/sqla/message.py new file mode 100644 index 0000000000..b2f0672eb1 --- /dev/null +++ b/faststream/sqla/message.py @@ -0,0 +1,125 @@ +from dataclasses import dataclass, field +from datetime import datetime, timezone +import enum +from typing import Any, Callable, Coroutine + +from faststream.message.message import StreamMessage +from faststream.sqla.retry import RetryStrategyProto + + +class SqlaMessageState(str, enum.Enum): + """ + The message starts out as PENDING. When it is acquired by a worker, it is marked as + PROCESSING. After being acquired, depending on processing result, AckPolicy, retry + strategy, and presence of manual acknowledgement, the message can be marked as + COMPLETED, FAILED, or RETRYABLE prior to or after a processing attempt. A message + that is COMPLETED or FAILED is archived and will not be processed again. A RETRYABLE + message might be retried. 
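+    # Lifecycle summary (illustrative, restating the description above):
+    #   PENDING    -> PROCESSING -> COMPLETED   ack; moved to message_archive
+    #   PENDING    -> PROCESSING -> RETRYABLE   nack with retry budget remaining
+    #   PENDING    -> PROCESSING -> FAILED      reject, or retries exhausted; archived
+    #   PROCESSING -> PENDING                   released after release_stuck_timeout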
+ """ + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + RETRYABLE = "retryable" + + +class SqlaMessage(StreamMessage): + retry_strategy: RetryStrategyProto + + def __init__( + self, + id: int, + queue: str, + state: SqlaMessageState, + payload: bytes, + attempts_count: int, + created_at: datetime, + first_attempt_at: datetime | None, + next_attempt_at: datetime | None, + last_attempt_at: datetime | None, + acquired_at: datetime | None, + ) -> None: + self.id = id + self.queue = queue + self.state = state + self.payload = payload + self.attempts_count = attempts_count + self.created_at = created_at + self.first_attempt_at = first_attempt_at + self.next_attempt_at = next_attempt_at + self.last_attempt_at = last_attempt_at + self.acquired_at = acquired_at + + self.state_locked = False + self.to_archive = False + + super().__init__(raw_message=self, body=payload) + + async def ack(self) -> None: + await self._update_state_if_not_locked(self._ack) + + async def nack(self) -> None: + await self._update_state_if_not_locked(self._nack) + + async def reject(self) -> None: + await self._update_state_if_not_locked(self._reject) + + def _mark_completed(self) -> None: + self.state = SqlaMessageState.COMPLETED + self.next_attempt_at = None + self.to_archive = True + + def _mark_retryable(self, *, next_attempt_at: datetime) -> None: + self.state = SqlaMessageState.RETRYABLE + self.next_attempt_at = next_attempt_at + + def _mark_failed(self) -> None: + self.state = SqlaMessageState.FAILED + self.next_attempt_at = None + self.to_archive = True + + def _mark_pending(self) -> None: + self.state = SqlaMessageState.PENDING + self.acquired_at = None + + def _allow_attempt(self) -> bool: + self.last_attempt_at = datetime.now(timezone.utc).replace(tzinfo=None) + print('last_attempt_at', self.last_attempt_at, "++++++++++++++++++++++++++++") + if self.attempts_count == 1: + self.first_attempt_at = self.last_attempt_at + if not self.retry_strategy.allow_attempt( + first_attempt_at=self.first_attempt_at, + attempts_count=self.attempts_count, + ): + self._mark_failed() + return False + return True + + async def _update_state_if_not_locked(self, method: Callable[[], Coroutine[Any, Any, None]]) -> None: + if self.state_locked: + return + + await method() + + self.state_locked = True + + async def _ack(self) -> None: + self._mark_completed() + + async def _nack(self) -> None: + if not ( + next_attempt_at := self.retry_strategy.get_next_attempt_at( + first_attempt_at=self.first_attempt_at, + last_attempt_at=self.last_attempt_at, + attempts_count=self.attempts_count, + ) + ): + self._mark_failed() + else: + self._mark_retryable(next_attempt_at=next_attempt_at) + + async def _reject(self) -> None: + self._mark_failed() + + def __repr__(self) -> str: # TODO + return f"SqlaMessage(id={self.id}, queue={self.queue})" diff --git a/faststream/sqla/parser.py b/faststream/sqla/parser.py new file mode 100644 index 0000000000..0371495614 --- /dev/null +++ b/faststream/sqla/parser.py @@ -0,0 +1,21 @@ +from dataclasses import dataclass +from typing import Any + +from faststream._internal.basic_types import DecodedMessage +from faststream.message.utils import decode_message + + +@dataclass +class SqlaParser: + async def parse_message( + self, + message: Any, + ) -> Any: + return message + + async def decode_message( + self, + msg: Any, + ) -> "DecodedMessage": + """Decodes a message.""" + return decode_message(msg) \ No newline at end of file diff --git 
a/faststream/sqla/publisher/__init__.py b/faststream/sqla/publisher/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/sqla/publisher/config.py b/faststream/sqla/publisher/config.py new file mode 100644 index 0000000000..a3ef2f3b69 --- /dev/null +++ b/faststream/sqla/publisher/config.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass, field + +from faststream._internal.configs import ( + PublisherSpecificationConfig, + PublisherUsecaseConfig, +) +from faststream.sqla.configs.broker import SqlaBrokerConfig + + +@dataclass(kw_only=True) +class SqlaPublisherSpecificationConfig(PublisherSpecificationConfig): + queue: str + + +@dataclass(kw_only=True) +class SqlaPublisherConfig(PublisherUsecaseConfig): + _outer_config: "SqlaBrokerConfig" = field(default_factory=SqlaBrokerConfig) diff --git a/faststream/sqla/publisher/factory.py b/faststream/sqla/publisher/factory.py new file mode 100644 index 0000000000..0fbf67e009 --- /dev/null +++ b/faststream/sqla/publisher/factory.py @@ -0,0 +1,42 @@ +from collections.abc import Sequence +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from faststream.sqla.configs.broker import SqlaBrokerConfig + +from .config import SqlaPublisherConfig +from .specification import SqlaPublisherSpecification +from .usecase import LogicPublisher + +if TYPE_CHECKING: + from faststream._internal.types import PublisherMiddleware + + +def create_publisher( + *, + # Publisher args + broker_config: "SqlaBrokerConfig", + middlewares: Sequence["PublisherMiddleware"], + # AsyncAPI args + schema_: Any | None, + title_: str | None, + description_: str | None, + include_in_schema: bool, +) -> LogicPublisher: + publisher_config = SqlaPublisherConfig( + middlewares=middlewares, + _outer_config=broker_config, + ) + + specification = SqlaPublisherSpecification() + # _outer_config=broker_config, + # specification_config=SqlaPublisherSpecificationConfig( + # queue=queue, + # schema_=schema_, + # title_=title_, + # description_=description_, + # include_in_schema=include_in_schema, + # ), + # ) + + return LogicPublisher(publisher_config, specification) diff --git a/faststream/sqla/publisher/producer.py b/faststream/sqla/publisher/producer.py new file mode 100644 index 0000000000..ff63a749b3 --- /dev/null +++ b/faststream/sqla/publisher/producer.py @@ -0,0 +1,72 @@ +from abc import abstractmethod +from typing import Any, Optional, override +from fast_depends.library.serializer import SerializerProto +from sqlalchemy.ext.asyncio import AsyncEngine +from faststream._internal.producer import ProducerProto +from faststream.exceptions import FeatureNotSupportedException +from faststream.message.utils import encode_message +from faststream.sqla.client import SqlaPostgresClient, create_sqla_client +from faststream.sqla.response import SqlaPublishCommand + + +class SqlaProducerProto(ProducerProto[SqlaPublishCommand]): + def connect( + self, + connection: Any, + serializer: Optional["SerializerProto"], + ) -> None: ... + + def disconnect(self) -> None: ... + + @abstractmethod + async def publish(self, cmd: "SqlaPublishCommand") -> None: + ... + + async def request(self, cmd: "SqlaPublishCommand") -> None: + msg = "SqlaBroker doesn't support synchronous requests." + raise FeatureNotSupportedException(msg) + + async def publish_batch(self, cmd: "SqlaPublishCommand") -> None: + msg = "SqlaBroker doesn't support publishing in batches." 
+ raise FeatureNotSupportedException(msg) + + +class SqlaProducer(SqlaProducerProto): + # _decoder: "AsyncCallable" + # _parser: "AsyncCallable" + + def __init__( + self, + *, + engine: AsyncEngine, + # parser: Optional["CustomCallable"], + # decoder: Optional["CustomCallable"], + ) -> None: + self.client = create_sqla_client(engine) + + self.serializer: SerializerProto | None = None + + # default = NatsParser(pattern="", is_ack_disabled=True) + # self._parser = ParserComposition(parser, default.parse_message) + # self._decoder = ParserComposition(decoder, default.decode_message) + + # self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState() + + def connect( + self, + connection: Any, + serializer: Optional["SerializerProto"], + ) -> None: + self.serializer = serializer + # self.__state = ConnectedState(connection) + + @override + async def publish(self, cmd: "SqlaPublishCommand") -> None: + payload, _ = encode_message(cmd.body, self.serializer) + + return await self.client.enqueue( + queue=cmd.destination, + payload=payload, + next_attempt_at=cmd.next_attempt_at, + connection=cmd.connection, + ) diff --git a/faststream/sqla/publisher/specification.py b/faststream/sqla/publisher/specification.py new file mode 100644 index 0000000000..b64d06d1e0 --- /dev/null +++ b/faststream/sqla/publisher/specification.py @@ -0,0 +1,2 @@ +class SqlaPublisherSpecification: + pass \ No newline at end of file diff --git a/faststream/sqla/publisher/usecase.py b/faststream/sqla/publisher/usecase.py new file mode 100644 index 0000000000..7aeea8c7ae --- /dev/null +++ b/faststream/sqla/publisher/usecase.py @@ -0,0 +1,72 @@ +from datetime import datetime +from typing import TYPE_CHECKING, Any, Iterable, Union + +from faststream._internal.types import PublisherMiddleware +from faststream.exceptions import FeatureNotSupportedException +from faststream.response.response import PublishCommand +from sqlalchemy.ext.asyncio import AsyncConnection +from typing_extensions import override + +from faststream._internal.endpoint.publisher import PublisherUsecase +from faststream.sqla.configs.broker import SqlaBrokerConfig +from faststream.sqla.publisher.config import SqlaPublisherConfig +from faststream.sqla.response import SqlaPublishCommand + +if TYPE_CHECKING: + from faststream._internal.basic_types import SendableMessage + from faststream._internal.endpoint.publisher import PublisherSpecification + + + +class LogicPublisher(PublisherUsecase): + _outer_config: "SqlaBrokerConfig" + + def __init__( + self, + config: "SqlaPublisherConfig", + specification: "PublisherSpecification[Any, Any]", + ) -> None: + super().__init__(config, specification) + + @override + async def publish( + self, + message: "SendableMessage", + queue: str, + next_attempt_at: datetime | None = None, + connection: AsyncConnection | None = None, + ) -> None: + """ + Args: + next_attempt_at: datetime with timezone + """ + cmd = SqlaPublishCommand( + message, + queue=queue, + next_attempt_at=next_attempt_at, + connection=connection, + ) + + return await self._basic_publish( + cmd, + producer=self._outer_config.producer, + _extra_middlewares=(), + ) + + @override + async def _publish( # TODO + self, + cmd: Union["PublishCommand", "SqlaPublishCommand"], + *, + _extra_middlewares: Iterable["PublisherMiddleware"], + ) -> None: + raise FeatureNotSupportedException + + @override + async def request( + self, + message: "SendableMessage", + queue: str, + next_attempt_at: datetime | None = None, + ) -> None: + raise FeatureNotSupportedException \ No 
newline at end of file diff --git a/faststream/sqla/response.py b/faststream/sqla/response.py new file mode 100644 index 0000000000..17c048699a --- /dev/null +++ b/faststream/sqla/response.py @@ -0,0 +1,40 @@ +from datetime import datetime +from typing import Any +from faststream._internal.basic_types import SendableMessage +from faststream.exceptions import FeatureNotSupportedException +from faststream.response.publish_type import PublishType +from faststream.response.response import PublishCommand +from sqlalchemy.ext.asyncio import AsyncConnection + +from faststream.sqla.exceptions import DatetimeMissingTimezoneException + + +class SqlaPublishCommand(PublishCommand): + def __init__( + self, + message: "SendableMessage", + *, + queue: str, + next_attempt_at: datetime | None = None, + connection: AsyncConnection | None = None, + ) -> None: + if next_attempt_at and next_attempt_at.tzinfo is None: + raise DatetimeMissingTimezoneException + + super().__init__( + body=message, + destination=queue, + _publish_type=PublishType.PUBLISH, + ) + self.next_attempt_at = next_attempt_at + if self.next_attempt_at: + self.next_attempt_at = self.next_attempt_at.replace(tzinfo=None) + self.connection = connection + + def add_headers( + self, + headers: dict[str, Any], + *, + override: bool = True, + ) -> None: + raise FeatureNotSupportedException \ No newline at end of file diff --git a/faststream/sqla/retry.py b/faststream/sqla/retry.py new file mode 100644 index 0000000000..fc343f1568 --- /dev/null +++ b/faststream/sqla/retry.py @@ -0,0 +1,166 @@ +import random +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Protocol + + +class RetryStrategyProto(Protocol): + def __init__(self, *args, **kwargs) -> None: + ... + + def get_next_attempt_at( + self, + *, + first_attempt_at: datetime, + last_attempt_at: datetime, + attempts_count: int, + ) -> datetime | None: + ... + + def allow_attempt( + self, + *, + first_attempt_at: datetime, + attempts_count: int, + ) -> bool: + ... + + +@dataclass(kw_only=True) +class RetryStrategyTemplate(ABC): + max_total_delay_seconds: float | None + max_attempts: int | None + + @abstractmethod + def _get_next_attempt_at( + self, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int + ) -> datetime: ... 
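+    # Illustrative usage sketch (not part of the template itself; numbers are
+    # arbitrary): a subclass only supplies the delay curve via
+    # _get_next_attempt_at, while this template applies the max_attempts and
+    # max_total_delay_seconds caps around it. Using the ConstantRetryStrategy
+    # defined below:
+    #
+    #     strategy = ConstantRetryStrategy(
+    #         delay_seconds=30,
+    #         max_attempts=5,
+    #         max_total_delay_seconds=3600,
+    #     )
+    #     next_at = strategy.get_next_attempt_at(
+    #         first_attempt_at=first_attempt_at,
+    #         last_attempt_at=last_attempt_at,
+    #         attempts_count=attempts_count,
+    #     )
+    #     # next_at is None once either cap would be exceeded,
+    #     # which is how a nack'ed message ends up FAILED.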
+ + def get_next_attempt_at( + self, + *, + first_attempt_at: datetime, + last_attempt_at: datetime, + attempts_count: int, + ) -> datetime | None: + if self.max_attempts and attempts_count >= self.max_attempts: + return None + next_attempt_at = self._get_next_attempt_at( + first_attempt_at, last_attempt_at, attempts_count + ) + if self.max_total_delay_seconds and next_attempt_at - first_attempt_at > timedelta( + seconds=self.max_total_delay_seconds + ): + return None + return next_attempt_at + + def allow_attempt( + self, + *, + first_attempt_at: datetime, + attempts_count: int, + ) -> bool: + if self.max_attempts and attempts_count > self.max_attempts: + return False + if self.max_total_delay_seconds and datetime.now(tz=timezone.utc).replace(tzinfo=None) - first_attempt_at > timedelta( + seconds=self.max_total_delay_seconds + ): + return False + return True + + +@dataclass(kw_only=True) +class ConstantRetryStrategy(RetryStrategyTemplate): + delay_seconds: float + + def _get_next_attempt_at( + self, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int + ) -> datetime: + return last_attempt_at + timedelta(seconds=self.delay_seconds) + + +@dataclass(kw_only=True) +class LinearRetryStrategy(RetryStrategyTemplate): + initial_delay_seconds: float + step_seconds: float + + def _get_next_attempt_at( + self, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int + ) -> datetime: + delay = self.initial_delay_seconds + self.step_seconds * (attempts_count - 1) + return last_attempt_at + timedelta(seconds=delay) + + +@dataclass(kw_only=True) +class ExponentialBackoffRetryStrategy(RetryStrategyTemplate): + initial_delay_seconds: float + multiplier: float = 2.0 + max_delay_seconds: float | None = None + + def _get_next_attempt_at( + self, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int + ) -> datetime: + delay = self.initial_delay_seconds * (self.multiplier ** (attempts_count - 1)) + if self.max_delay_seconds: + delay = min(delay, self.max_delay_seconds) + return last_attempt_at + timedelta(seconds=delay) + + +@dataclass(kw_only=True) +class JitterRetryStrategy(RetryStrategyTemplate): + base_delay_seconds: float + jitter_seconds: float + _random: random.Random = field(default_factory=random.Random) + + def _get_next_attempt_at( + self, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int + ) -> datetime: + jitter = self._random.uniform(-self.jitter_seconds, self.jitter_seconds) + delay = max(0, self.base_delay_seconds + jitter) + return last_attempt_at + timedelta(seconds=delay) + + +@dataclass(kw_only=True) +class ExponentialBackoffJitterRetryStrategy(RetryStrategyTemplate): + initial_delay_seconds: float + multiplier: float = 2.0 + max_delay_seconds: float | None = None + jitter_factor: float = 0.5 + _random: random.Random = field(default_factory=random.Random) + + def _get_next_attempt_at( + self, first_attempt_at: datetime, last_attempt_at: datetime, attempts_count: int + ) -> datetime: + delay = self.initial_delay_seconds * (self.multiplier ** (attempts_count - 1)) + if self.max_delay_seconds is not None: + delay = min(delay, self.max_delay_seconds) + jitter = self._random.uniform(0, delay * self.jitter_factor) + delay = delay + jitter + return last_attempt_at + timedelta(seconds=delay) + + +@dataclass(kw_only=True) +class NoRetryStrategy(RetryStrategyProto): + max_attempts = 1 + + def get_next_attempt_at( + self, + *, + first_attempt_at: datetime, + last_attempt_at: datetime, + attempts_count: int, + ) -> 
datetime | None: + if attempts_count >= self.max_attempts: + return None + raise ValueError + + def allow_attempt( + self, + *, + first_attempt_at: datetime, + attempts_count: int, + ) -> bool: + if attempts_count > self.max_attempts: + return False + return True \ No newline at end of file diff --git a/faststream/sqla/subscriber/__init__.py b/faststream/sqla/subscriber/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/sqla/subscriber/factory.py b/faststream/sqla/subscriber/factory.py new file mode 100644 index 0000000000..0f4550afaf --- /dev/null +++ b/faststream/sqla/subscriber/factory.py @@ -0,0 +1,52 @@ +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncEngine + +from faststream._internal.endpoint.subscriber.call_item import CallsCollection +from faststream._internal.endpoint.subscriber.specification import SubscriberSpecification +from faststream.middlewares.acknowledgement.config import AckPolicy +from faststream.sqla.configs.broker import SqlaBrokerConfig +from faststream.sqla.configs.subscriber import SqlaSubscriberConfig +from faststream.sqla.retry import RetryStrategyProto +from faststream.sqla.subscriber.specification import SqlaSubscriberSpecification +from faststream.sqla.subscriber.usecase import SqlaSubscriber + + +def create_subscriber( + engine: AsyncEngine, + queues: list[str], + max_workers: int, + retry_strategy: RetryStrategyProto, + max_fetch_interval: float, + min_fetch_interval: float, + fetch_batch_size: int, + overfetch_factor: float, + flush_interval: float, + release_stuck_interval: float, + graceful_shutdown_timeout: float, + release_stuck_timeout: int, + config: "SqlaBrokerConfig", + ack_policy: AckPolicy, +) -> Any: + subscriber_config = SqlaSubscriberConfig( + engine=engine, + queues=queues, + max_workers=max_workers, + retry_strategy=retry_strategy, + max_fetch_interval=max_fetch_interval, + min_fetch_interval=min_fetch_interval, + fetch_batch_size=fetch_batch_size, + overfetch_factor=overfetch_factor, + flush_interval=flush_interval, + release_stuck_interval=release_stuck_interval, + graceful_shutdown_timeout=graceful_shutdown_timeout, + release_stuck_timeout=release_stuck_timeout, + _outer_config=config, + _ack_policy=ack_policy, + ) + + calls = CallsCollection[Any]() + + specification = SqlaSubscriberSpecification() + + return SqlaSubscriber(subscriber_config, specification, calls) \ No newline at end of file diff --git a/faststream/sqla/subscriber/specification.py b/faststream/sqla/subscriber/specification.py new file mode 100644 index 0000000000..66921c0c03 --- /dev/null +++ b/faststream/sqla/subscriber/specification.py @@ -0,0 +1,3 @@ +class SqlaSubscriberSpecification: + def call_name(self) -> str: + return "STUB" \ No newline at end of file diff --git a/faststream/sqla/subscriber/usecase.py b/faststream/sqla/subscriber/usecase.py new file mode 100644 index 0000000000..ed05f8a746 --- /dev/null +++ b/faststream/sqla/subscriber/usecase.py @@ -0,0 +1,224 @@ +import asyncio +from contextlib import suppress +import contextlib +import logging +from typing import Any, Callable, Coroutine, Sequence, TypeVar + +from faststream._internal.endpoint.subscriber.call_item import CallsCollection +from faststream._internal.endpoint.subscriber.mixins import TasksMixin +from faststream._internal.endpoint.subscriber.specification import SubscriberSpecification +from faststream._internal.endpoint.subscriber.usecase import SubscriberUsecase +from faststream._internal.types import MsgType +from faststream.sqla.client 
import SqlaPostgresClient, create_sqla_client +from faststream.sqla.configs.subscriber import SqlaSubscriberConfig +from faststream.sqla.message import SqlaMessage +from faststream.sqla.parser import SqlaParser + + +_CoroutineReturnType = TypeVar("_CoroutineReturnType") + + +class StopEventSetError(Exception): + pass + + +class SqlaSubscriber(TasksMixin, SubscriberUsecase[Any]): + def __init__( + self, + config: "SqlaSubscriberConfig", + specification: "SubscriberSpecification[Any, Any]", + calls: "CallsCollection[MsgType]", + ) -> None: + self.parser = SqlaParser() + config.parser = self.parser.parse_message + config.decoder = self.parser.decode_message + super().__init__(config, specification, calls) + + self._repo = create_sqla_client(config.engine) + self._queues = config.queues + self._max_fetch_interval = config.max_fetch_interval + self._min_fetch_interval = config.min_fetch_interval + self._fetch_batch_size = config.fetch_batch_size + self._buffer_capacity = int(config.fetch_batch_size * config.overfetch_factor) + self._flush_interval = config.flush_interval + self._release_stuck_interval = config.release_stuck_interval + self._worker_count = config.max_workers + self._retry_strategy = config.retry_strategy + self._graceful_shutdown_timeout = config.graceful_shutdown_timeout + self._release_stuck_timeout = config.release_stuck_timeout + + self._message_queue: asyncio.Queue[SqlaMessage] = asyncio.Queue() + self._result_buffer: list[SqlaMessage] = [] + self._stop_event = asyncio.Event() + + async def start(self) -> None: + print("start") + for _ in range(self._worker_count): + self.add_task(self._worker_loop) + self._post_start() + self.add_task(self._fetch_loop) + self.add_task(self._flush_loop) + self.add_task(self._release_stuck_loop) + self._stop_task = self.add_task(self._stop_event.wait) + await super().start() + + async def stop(self) -> None: + print("stop") + self._stop_event.set() + await self._draing_and_flush_acquired() + with suppress(asyncio.TimeoutError): + await asyncio.wait_for(self._stop(), timeout=self._graceful_shutdown_timeout) + await super().stop() + + async def _stop(self) -> None: + await asyncio.gather(*self.tasks) + task_flush = self.add_task(self._flush_results) + await task_flush + + async def _wait_until_stop_event(self, coro: Coroutine[Any, Any, _CoroutineReturnType]) -> _CoroutineReturnType: + coro_task = asyncio.create_task(coro) + done, _ = await asyncio.wait([coro_task, self._stop_task], return_when=asyncio.FIRST_COMPLETED) + + if coro_task in done: + return await coro_task + + if self._stop_task in done: + coro_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await coro_task + raise StopEventSetError + + async def _fetch_loop(self) -> None: + print("fetch_loop") + while True: + if self._stop_event.is_set(): + break + + free_slots = self._buffer_capacity - self._message_queue.qsize() + if free_slots > 0: + limit = min(self._fetch_batch_size, free_slots) + + try: + batch = await self._repo.fetch(self._queues, limit=limit) + except asyncio.CancelledError: + return + except Exception as exc: + self._log(logging.ERROR, "SqlaClient error", exc_info=exc) + await asyncio.sleep(5) + continue + + for row in batch: + await self._message_queue.put(row) + + if free_slots and len(batch) == limit: + timeout_ = self._min_fetch_interval + else: + timeout_ = self._max_fetch_interval + + try: + await asyncio.wait_for(self._stop_event.wait(), timeout=timeout_) + except asyncio.TimeoutError: + continue + else: + break + print("fetch_loop exit") + + 
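+    # Worked example of the fetch sizing above (illustrative numbers, not part
+    # of the implementation): with fetch_batch_size=100 and overfetch_factor=1.5
+    # the buffer capacity is int(100 * 1.5) == 150. If 120 messages are already
+    # queued, free_slots is 30 and the next fetch is capped at min(100, 30) == 30.
+    # A "full" fetch (len(batch) == limit) schedules the next iteration after
+    # min_fetch_interval; a partial fetch waits max_fetch_interval instead.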
async def _worker_loop(self) -> None: + print("worker_loop") + while True: + try: + message = await self._wait_until_stop_event(self._message_queue.get()) + except StopEventSetError: + break + + message.retry_strategy = self._retry_strategy + if message._allow_attempt(): + await self.consume(message) + + self._buffer_results([message]) + self._message_queue.task_done() + print("worker_loop exit") + + async def _flush_loop(self) -> None: + print("flush_loop") + while True: + if self._stop_event.is_set(): + break + + try: + await asyncio.wait_for(self._stop_event.wait(), timeout=self._flush_interval) + except asyncio.TimeoutError: + try: + await self._flush_results() + except asyncio.CancelledError: + return + except Exception as exc: + self._log(logging.ERROR, "SqlaClient error", exc_info=exc) + await asyncio.sleep(5) + continue + + continue + else: + break + await self._flush_results() + print("flush_loop exit") + + async def _release_stuck_loop(self) -> None: + print("release_stuck_loop") + while True: + if self._stop_event.is_set(): + break + + try: + await self._repo.release_stuck(timeout=self._release_stuck_timeout) + except asyncio.CancelledError: + return + except Exception as exc: + self._log(logging.ERROR, "SqlaClient error", exc_info=exc) + await asyncio.sleep(5) + continue + + try: + await asyncio.wait_for( + self._stop_event.wait(), timeout=self._release_stuck_interval + ) + except asyncio.TimeoutError: + continue + else: + break + print("release_stuck_loop exit") + + def _buffer_results(self, messages: Sequence[SqlaMessage]) -> None: + print("buffer_results") + if not messages: + return + self._result_buffer.extend(messages) + + async def _flush_results(self) -> None: + print("flush_results") + updates = list(self._result_buffer) + if not updates: + return + self._result_buffer.clear() + await self._repo.retry([item for item in updates if not item.to_archive]) + await self._repo.archive([item for item in updates if item.to_archive]) + + def _drain_acquired(self) -> list[SqlaMessage]: + print("drain_acquired") + drained: list[SqlaMessage] = [] + while True: + try: + message = self._message_queue.get_nowait() + except asyncio.QueueEmpty: + break + drained.append(message) + self._message_queue.task_done() + return drained + + async def _draing_and_flush_acquired(self) -> None: + drained = self._drain_acquired() + if drained: + for message in drained: + message._mark_pending() + self._buffer_results(drained) + self.add_task(self._flush_results) diff --git a/pyproject.toml b/pyproject.toml index bd9397d42d..441292924e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,8 @@ dependencies = [ "fast-depends[pydantic]>=3.0.0", "typing-extensions>=4.12.0", "anyio>=4.0,<5", + "sqlalchemy>=2.0.44", + "asyncmy>=0.2.10", ] [project.optional-dependencies] @@ -107,7 +109,7 @@ docs = [ lint = [ "ruff==0.14.5", "bandit==1.8.6", - "semgrep==1.143.1", + "semgrep>=1", "codespell==2.4.1", "zizmor==1.16.3", "mypy==1.18.2", @@ -146,6 +148,7 @@ testing = [ "psutil==7.1.3", "prometheus-client>=0.20.0,<0.30.0", "opentelemetry-sdk>=1.24.0,<2.0.0", + "freezegun>=1.5.5", ] dev = [ @@ -205,6 +208,7 @@ markers = [ "redis", "slow", "connected", + "sqla", "all", ] asyncio_default_fixture_loop_scope = "function" diff --git a/tests/brokers/sqla/__init__.py b/tests/brokers/sqla/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/brokers/sqla/basic.py b/tests/brokers/sqla/basic.py new file mode 100644 index 0000000000..48fc5e4544 --- /dev/null +++ b/tests/brokers/sqla/basic.py 
@@ -0,0 +1,44 @@ +from typing import Any, AsyncGenerator + +import pytest +from sqlalchemy.ext.asyncio import AsyncEngine +from sqlalchemy import ( + BigInteger, + Column, + DateTime, + Enum, + Index, + LargeBinary, + MetaData, + SmallInteger, + String, + Table, + bindparam, + delete, + func, + insert, + or_, + select, + text, + update, +) +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from faststream.sqla.broker.broker import SqlaBroker +from faststream.sqla.message import SqlaMessageState +from tests.brokers.base.basic import BaseTestcaseConfig + + +class SqlaTestcaseConfig(BaseTestcaseConfig): + def get_broker( + self, + engine: AsyncEngine, + **kwargs: Any, + ) -> SqlaBroker: + return SqlaBroker(engine=engine, **kwargs) + + def patch_broker(self, broker: SqlaBroker, **kwargs: Any) -> SqlaBroker: + return broker + + def get_router(self, **kwargs: Any) -> None: + raise NotImplementedError diff --git a/tests/brokers/sqla/conftest.py b/tests/brokers/sqla/conftest.py new file mode 100644 index 0000000000..23b7b28b50 --- /dev/null +++ b/tests/brokers/sqla/conftest.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from datetime import datetime, timezone +import os +from typing import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import ( + BigInteger, + Column, + DateTime, + Enum, + LargeBinary, + MetaData, + SmallInteger, + String, + Table, + func, + text, +) +from sqlalchemy.dialects import mysql, postgresql +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from faststream.sqla.message import SqlaMessageState + + +@pytest_asyncio.fixture(params=["postgresql", "mysql"]) +async def engine(request: pytest.FixtureRequest) -> AsyncGenerator[AsyncEngine, None]: + backend = request.param + match backend: + case "postgresql": + url = "postgresql+asyncpg://broker:brokerpass@localhost:5432/broker" + case "mysql": + url = "mysql+asyncmy://broker:brokerpass@localhost:3306/broker" + case _: + raise ValueError + + engine = create_async_engine(url) + + try: + yield engine + finally: + await engine.dispose() + + +@pytest_asyncio.fixture +async def recreate_tables(engine: AsyncEngine) -> None: + match engine.dialect.name: + case "postgresql": + timestamp_type = postgresql.TIMESTAMP(precision=3) + case "mysql": + timestamp_type = mysql.TIMESTAMP(fsp=3) + case _: + raise ValueError + + metadata = MetaData() + + message = Table( + "message", + metadata, + Column("id", BigInteger, primary_key=True), + Column("queue", String(255), nullable=False, index=True), + Column("payload", LargeBinary, nullable=False), + Column( + "state", + Enum(SqlaMessageState), + nullable=False, + index=True, + server_default=SqlaMessageState.PENDING.name, + ), + Column("attempts_count", SmallInteger, nullable=False, default=0), + Column("created_at", timestamp_type, nullable=False, default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)), + Column("first_attempt_at", timestamp_type), + Column( + "next_attempt_at", + timestamp_type, + nullable=False, + default=lambda: datetime.now(timezone.utc).replace(tzinfo=None), + index=True, + ), + Column("last_attempt_at", timestamp_type), + Column("acquired_at", timestamp_type), + ) + + + message_archive = Table( + "message_archive", + metadata, + Column("id", BigInteger, primary_key=True), + Column("queue", String(255), nullable=False, index=True), + Column("payload", LargeBinary, nullable=False), + Column("state", Enum(SqlaMessageState), nullable=False, 
index=True), + Column("attempts_count", SmallInteger, nullable=False), + Column("created_at", timestamp_type, nullable=False), + Column("first_attempt_at", timestamp_type), + Column("last_attempt_at", timestamp_type), + Column("archived_at", timestamp_type, nullable=False, default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)), + ) + + async with engine.begin() as conn: + await conn.run_sync(metadata.drop_all, checkfirst=True) + await conn.run_sync(metadata.create_all) diff --git a/tests/brokers/sqla/test_consume.py b/tests/brokers/sqla/test_consume.py new file mode 100644 index 0000000000..fd4730586b --- /dev/null +++ b/tests/brokers/sqla/test_consume.py @@ -0,0 +1,646 @@ +import asyncio +import json +from datetime import datetime, timedelta, timezone +from typing import Any + +import pytest +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine + +from faststream.sqla.message import SqlaMessage, SqlaMessageState +from faststream.sqla.annotations import SqlaMessage as SqlaMessageAnnotation +from faststream.sqla.annotations import SqlaBroker as SqlaBrokerAnnotation +from faststream.sqla.broker.broker import SqlaBroker as SqlaBroker +from faststream.sqla.retry import ConstantRetryStrategy, NoRetryStrategy +from tests.brokers.sqla.basic import SqlaTestcaseConfig + + +# @pytest.mark.sqla() +@pytest.mark.connected() +class TestConsume(SqlaTestcaseConfig): + @pytest.mark.asyncio() + async def test_consume(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + attempted = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + nonlocal attempted + attempted.append(msg) + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + assert len(attempted) == 1 + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().one() + assert result["queue"] == "default1" + assert json.loads(result["payload"]) == {"message": "hello1"} + assert result["state"] == SqlaMessageState.COMPLETED.name + assert result["attempts_count"] == 1 + assert result["created_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) + assert result["first_attempt_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["first_attempt_at"] > result["created_at"] + assert result["last_attempt_at"] == result["first_attempt_at"] + assert result["archived_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["archived_at"] > result["first_attempt_at"] + + @pytest.mark.asyncio() + async def test_consume_retry_allowed(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=ConstantRetryStrategy(delay_seconds=10, max_total_delay_seconds=None, max_attempts=None), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> 
None: + return 1/0 + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message;")) + result = result.mappings().one() + assert result["queue"] == "default1" + assert json.loads(result["payload"]) == {"message": "hello1"} + assert result["state"] == SqlaMessageState.RETRYABLE.name + assert result["attempts_count"] == 1 + assert result["created_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None).replace(tzinfo=None) + assert result["first_attempt_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None).replace(tzinfo=None) and result["first_attempt_at"] > result["created_at"] + assert result["last_attempt_at"] == result["first_attempt_at"] + + assert result["next_attempt_at"] > datetime.now(tz=timezone.utc).replace(tzinfo=None).replace(tzinfo=None) + timedelta(seconds=5) + assert result["acquired_at"] == None + + @pytest.mark.asyncio() + @pytest.mark.parametrize("end_state", [SqlaMessageState.COMPLETED, SqlaMessageState.FAILED]) + async def test_consume_retry_not_allowed(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event, end_state: SqlaMessageState) -> None: + broker = self.get_broker(engine=engine) + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + if end_state == SqlaMessageState.FAILED: + return 1/0 + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().one() + assert result["queue"] == "default1" + assert json.loads(result["payload"]) == {"message": "hello1"} + assert result["state"] == end_state.name + assert result["attempts_count"] == 1 + assert result["created_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) + assert result["first_attempt_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["first_attempt_at"] > result["created_at"] + assert result["last_attempt_at"] == result["first_attempt_at"] + + assert result["archived_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["archived_at"] > result["first_attempt_at"] + + @pytest.mark.asyncio() + async def test_consume_retry_not_allowed_prior_to_attempt(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + attempted = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + nonlocal attempted + attempted.append(msg) + + await broker.publish({"message": "hello1"}, queue="default1") + async with engine.begin() as conn: + match engine.dialect.name: + case "postgresql": + stmt = text( + "UPDATE message " + "SET attempts_count = 1, first_attempt_at = NOW() " + "WHERE id = 1;" + ) + case "mysql": + stmt = text( + "UPDATE message " + 
"SET attempts_count = 1, first_attempt_at = UTC_TIMESTAMP(3) " + "WHERE id = 1;" + ) + result = await conn.execute( + stmt + ) + await broker.start() + + await asyncio.sleep(0.5) + + assert len(attempted) == 0 + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().one() + assert result["queue"] == "default1" + assert json.loads(result["payload"]) == {"message": "hello1"} + assert result["state"] == SqlaMessageState.FAILED.name + assert result["attempts_count"] == 2 + assert result["created_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) + assert result["first_attempt_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["first_attempt_at"] > result["created_at"] + assert result["last_attempt_at"] > result["first_attempt_at"] + + assert result["archived_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["archived_at"] > result["first_attempt_at"] + + @pytest.mark.asyncio() + async def test_consume_full_retry_flow(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + attempted = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=ConstantRetryStrategy(delay_seconds=0.01, max_total_delay_seconds=None, max_attempts=3), + max_fetch_interval=0.01, + min_fetch_interval=0.01, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + nonlocal attempted + attempted.append(msg) + return 1/0 + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + assert len(attempted) == 3 + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().one() + assert result["queue"] == "default1" + assert json.loads(result["payload"]) == {"message": "hello1"} + assert result["state"] == SqlaMessageState.FAILED.name + assert result["attempts_count"] == 3 + assert result["created_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) + assert result["first_attempt_at"] < datetime.now(tz=timezone.utc).replace(tzinfo=None) and result["first_attempt_at"] > result["created_at"] + assert result["last_attempt_at"] > result["first_attempt_at"] + + @pytest.mark.asyncio() + async def test_consume_by_queues(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + messages = [] + + @broker.subscriber( + engine=engine, + queues=["default1", "default2"], + max_workers=1, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + nonlocal messages + messages.append(msg["message"]) + if msg["message"] == "hello2": + event.set() + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.publish({"message": "hello3"}, queue="default3") + await broker.publish({"message": "hello2"}, queue="default2") + await broker.start() + + await asyncio.wait_for(event.wait(), timeout=self.timeout) + + assert messages == ["hello1", "hello2"] + + @pytest.mark.asyncio() + async def test_consume_by_next_attempt_at(self, 
engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + messages = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=0.01, + min_fetch_interval=0.01, + fetch_batch_size=1, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + nonlocal messages + messages.append(msg["message"]) + + await broker.publish({"message": "hello1"}, queue="default1", next_attempt_at=datetime.now(tz=timezone.utc) - timedelta(seconds=10)) + await broker.publish({"message": "hello2"}, queue="default1", next_attempt_at=datetime.now(tz=timezone.utc) + timedelta(seconds=10)) + await broker.publish({"message": "hello3"}, queue="default1", next_attempt_at=datetime.now(tz=timezone.utc) - timedelta(seconds=20)) + await broker.start() + + await asyncio.sleep(0.5) + + assert messages == ["hello3", "hello1"] + + @pytest.mark.asyncio() + async def test_consume_stop_current_messages_are_flushed(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=2, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=2, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + event.set() + await asyncio.sleep(1) + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.publish({"message": "hello2"}, queue="default1") + await broker.start() + await asyncio.wait_for(event.wait(), timeout=self.timeout) + await broker.stop() + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().all() + assert len(result) == 2 + assert result[0]["state"] == SqlaMessageState.COMPLETED.name + + @pytest.mark.asyncio() + async def test_consume_manual_ack_takes_precedence(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=2, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=2, + release_stuck_timeout=10, + ) + async def handler(msg: SqlaMessageAnnotation, msg_body: dict) -> None: + await msg.ack() + return 1/0 + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.publish({"message": "hello2"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().all() + assert len(result) == 2 + assert result[0]["state"] == SqlaMessageState.COMPLETED.name + assert result[1]["state"] == SqlaMessageState.COMPLETED.name + + @pytest.mark.asyncio() + async def test_consume_manual_nack_takes_precedence(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=2, + 
retry_strategy=ConstantRetryStrategy(delay_seconds=0, max_total_delay_seconds=None, max_attempts=3), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=2, + release_stuck_timeout=10, + ) + async def handler(msg: SqlaMessageAnnotation, msg_body: dict) -> None: + await msg.nack() + return + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.publish({"message": "hello2"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message;")) + result = result.mappings().all() + assert len(result) == 2 + assert result[0]["state"] == SqlaMessageState.RETRYABLE.name + assert result[1]["state"] == SqlaMessageState.RETRYABLE.name + + @pytest.mark.asyncio() + async def test_consume_manual_reject_takes_precedence(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=2, + retry_strategy=ConstantRetryStrategy(delay_seconds=0, max_total_delay_seconds=None, max_attempts=3), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=2, + release_stuck_timeout=10, + ) + async def handler(msg: SqlaMessageAnnotation, msg_body: dict) -> None: + await msg.reject() + return + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.publish({"message": "hello2"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive;")) + result = result.mappings().all() + assert len(result) == 2 + assert result[0]["state"] == SqlaMessageState.FAILED.name + assert result[1]["state"] == SqlaMessageState.FAILED.name + + @pytest.mark.asyncio() + async def test_consume_context_fields(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + message_ = None + broker_ = None + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=2, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=10, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.01, + release_stuck_interval=10, + graceful_shutdown_timeout=2, + release_stuck_timeout=10, + ) + async def handler(msg: SqlaMessageAnnotation, broker: SqlaBrokerAnnotation) -> None: + nonlocal message_ + nonlocal broker_ + message_ = msg + broker_ = broker + event.set() + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.start() + await asyncio.wait_for(event.wait(), timeout=self.timeout) + assert isinstance(message_, SqlaMessage) + assert isinstance(broker_, SqlaBroker) + + @pytest.mark.asyncio() + async def test_consume_concurrency(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + attempted = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=4, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=0, + fetch_batch_size=4, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + 
async def handler(msg: Any) -> None: + await asyncio.sleep(1) + nonlocal attempted + attempted.append(msg) + + for idx in range(8): + await broker.publish({"message": f"hello{idx+1}"}, queue="default1") + await broker.start() + + await asyncio.sleep(1.5) + assert len(attempted) == 4 + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive WHERE state = 'COMPLETED';")) + result = result.mappings().all() + assert len(result) == 4 + + await asyncio.sleep(1) + assert len(attempted) == 8 + + async with engine.begin() as conn: + result = await conn.execute(text("SELECT * FROM message_archive WHERE state = 'COMPLETED';")) + result = result.mappings().all() + assert len(result) == 8 + + @pytest.mark.asyncio() + async def test_consume_fetch_intervals(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + attempted = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=4, + retry_strategy=NoRetryStrategy(), + max_fetch_interval=10, + min_fetch_interval=0, + fetch_batch_size=4, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=10, + ) + async def handler(msg: Any) -> None: + nonlocal attempted + attempted.append(msg) + + for idx in range(7): + await broker.publish({"message": f"hello{idx+1}"}, queue="default1") + await broker.start() + + await asyncio.sleep(0.5) + await broker.publish({"message": f"hello{idx+1}"}, queue="default1") + await asyncio.sleep(0.1) + + assert len(attempted) == 7 + + @pytest.mark.asyncio() + async def test_consume_release_stuck(self, engine: AsyncEngine, recreate_tables: None, event: asyncio.Event) -> None: + broker = self.get_broker(engine=engine) + + attempted = [] + + @broker.subscriber( + engine=engine, + queues=["default1"], + max_workers=1, + retry_strategy=ConstantRetryStrategy(delay_seconds=0, max_total_delay_seconds=None, max_attempts=2), + max_fetch_interval=0, + min_fetch_interval=0, + fetch_batch_size=5, + overfetch_factor=1, + flush_interval=0.1, + release_stuck_interval=10, + graceful_shutdown_timeout=10, + release_stuck_timeout=60, + ) + async def handler(msg: Any) -> None: + nonlocal attempted + attempted.append(msg) + + await broker.publish({"message": "hello1"}, queue="default1") + await broker.publish({"message": "hello2"}, queue="default1") + async with engine.begin() as conn: + match engine.dialect.name: + case "postgresql": + stmt = text( + "UPDATE message " + "SET attempts_count = 1, " + "first_attempt_at = NOW() - INTERVAL '100 seconds', " + "acquired_at = NOW() - INTERVAL '100 seconds', " + "state = 'PROCESSING' " + "WHERE id = 1;" + ) + case "mysql": + stmt = text( + "UPDATE message " + "SET attempts_count = 1, " + "first_attempt_at = DATE_SUB(UTC_TIMESTAMP(3), INTERVAL 100 SECOND), " + "acquired_at = DATE_SUB(UTC_TIMESTAMP(3), INTERVAL 100 SECOND), " + "state = 'PROCESSING' " + "WHERE id = 1;" + ) + await conn.execute( + stmt + ) + + match engine.dialect.name: + case "postgresql": + stmt = text( + "UPDATE message " + "SET attempts_count = 1, " + "first_attempt_at = NOW() - INTERVAL '10 seconds', " + "acquired_at = NOW() - INTERVAL '10 seconds', " + "state = 'PROCESSING' " + "WHERE id = 2;" + ) + case "mysql": + stmt = text( + "UPDATE message " + "SET attempts_count = 1, " + "first_attempt_at = DATE_SUB(UTC_TIMESTAMP(3), INTERVAL 10 SECOND), " + "acquired_at = DATE_SUB(UTC_TIMESTAMP(3), INTERVAL 10 SECOND), " + "state = 
'PROCESSING' " + "WHERE id = 2;" + ) + await conn.execute( + stmt + ) + + await broker.start() + + await asyncio.sleep(0.5) + + assert len(attempted) == 1 \ No newline at end of file diff --git a/tests/brokers/sqla/test_retry_strategy.py b/tests/brokers/sqla/test_retry_strategy.py new file mode 100644 index 0000000000..8321952f24 --- /dev/null +++ b/tests/brokers/sqla/test_retry_strategy.py @@ -0,0 +1,258 @@ +import random +from datetime import datetime, timedelta, timezone + +from freezegun import freeze_time + +from faststream.sqla.retry import ( + ConstantRetryStrategy, + ExponentialBackoffJitterRetryStrategy, + ExponentialBackoffRetryStrategy, + JitterRetryStrategy, + LinearRetryStrategy, + NoRetryStrategy, + RetryStrategyTemplate, +) + + +def check_allow_attempt( + strategy: RetryStrategyTemplate, max_attempts: int, max_total_delay_seconds: int +) -> None: + first_attempt_at = datetime(2024, 1, 1, 12, 0, 0) + + with freeze_time(first_attempt_at): + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=1) is True + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=max_attempts) is True + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=max_attempts + 1) is False + + within_limit = first_attempt_at + timedelta(seconds=max_total_delay_seconds // 2) + with freeze_time(within_limit): + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=1) is True + + past_limit = first_attempt_at + timedelta(seconds=max_total_delay_seconds + 1) + with freeze_time(past_limit): + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=1) is False + + +def test_no_retry_strategy() -> None: + strategy = NoRetryStrategy() + first_attempt_at = datetime.now(timezone.utc) + + assert strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=first_attempt_at, + attempts_count=1, + ) is None + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=1) is True + assert strategy.allow_attempt(first_attempt_at=first_attempt_at, attempts_count=2) is False + + +def test_constant_retry_strategy() -> None: + strategy = ConstantRetryStrategy( + delay_seconds=10, max_total_delay_seconds=60, max_attempts=4 + ) + first_attempt_at = datetime.now(timezone.utc) + + result1 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=first_attempt_at, + attempts_count=1, + ) + assert result1 == first_attempt_at + timedelta(seconds=10) + + result2 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result1, + attempts_count=2, + ) + assert result2 == result1 + timedelta(seconds=10) + + result3 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result2, + attempts_count=3, + ) + assert result3 == result2 + timedelta(seconds=10) + + assert strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result3, + attempts_count=4, + ) is None + + check_allow_attempt(strategy, max_attempts=4, max_total_delay_seconds=60) + + +def test_linear_retry_strategy() -> None: + strategy = LinearRetryStrategy( + initial_delay_seconds=1, + step_seconds=2, + max_total_delay_seconds=60, + max_attempts=4, + ) + first_attempt_at = datetime.now(timezone.utc) + + result1 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=first_attempt_at, + attempts_count=1, + ) + assert result1 == first_attempt_at + 
timedelta(seconds=1) + + result2 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result1, + attempts_count=2, + ) + assert result2 == result1 + timedelta(seconds=3) + + result3 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result2, + attempts_count=3, + ) + assert result3 == result2 + timedelta(seconds=5) + + assert strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result3, + attempts_count=4, + ) is None + + check_allow_attempt(strategy, max_attempts=4, max_total_delay_seconds=60) + + +def test_exponential_backoff_retry_strategy() -> None: + strategy = ExponentialBackoffRetryStrategy( + initial_delay_seconds=1, + multiplier=2, + max_delay_seconds=5, + max_total_delay_seconds=60, + max_attempts=5, + ) + first_attempt_at = datetime.now(timezone.utc) + + result1 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=first_attempt_at, + attempts_count=1, + ) + assert result1 == first_attempt_at + timedelta(seconds=1) + + result2 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result1, + attempts_count=2, + ) + assert result2 == result1 + timedelta(seconds=2) + + result3 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result2, + attempts_count=3, + ) + assert result3 == result2 + timedelta(seconds=4) + + result4 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result3, + attempts_count=4, + ) + assert result4 == result3 + timedelta(seconds=5) + + assert strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result4, + attempts_count=5, + ) is None + + check_allow_attempt(strategy, max_attempts=5, max_total_delay_seconds=60) + + +def test_jitter_retry_strategy() -> None: + rng = random.Random(42) + strategy = JitterRetryStrategy( + base_delay_seconds=10, + jitter_seconds=2, + max_total_delay_seconds=60, + max_attempts=4, + _random=rng, + ) + first_attempt_at = datetime.now(timezone.utc) + + result1 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=first_attempt_at, + attempts_count=1, + ) + delay1 = (result1 - first_attempt_at).total_seconds() + assert 8 <= delay1 <= 12 + + result2 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result1, + attempts_count=2, + ) + delay2 = (result2 - result1).total_seconds() + assert 8 <= delay2 <= 12 + + result3 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result2, + attempts_count=3, + ) + delay3 = (result3 - result2).total_seconds() + assert 8 <= delay3 <= 12 + + assert strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result3, + attempts_count=4, + ) is None + + check_allow_attempt(strategy, max_attempts=4, max_total_delay_seconds=60) + + +def test_exponential_backoff_jitter_retry_strategy() -> None: + rng = random.Random(42) + strategy = ExponentialBackoffJitterRetryStrategy( + initial_delay_seconds=1, + multiplier=2, + max_delay_seconds=10, + jitter_factor=0.5, + max_total_delay_seconds=60, + max_attempts=4, + _random=rng, + ) + first_attempt_at = datetime.now(timezone.utc) + + result1 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=first_attempt_at, + attempts_count=1, + ) + delay1 = (result1 - first_attempt_at).total_seconds() + assert 1 <= delay1 <= 1.5 + + result2 = 
strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result1, + attempts_count=2, + ) + delay2 = (result2 - result1).total_seconds() + assert 2 <= delay2 <= 3 + + result3 = strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result2, + attempts_count=3, + ) + delay3 = (result3 - result2).total_seconds() + assert 4 <= delay3 <= 6 + + assert strategy.get_next_attempt_at( + first_attempt_at=first_attempt_at, + last_attempt_at=result3, + attempts_count=4, + ) is None + + check_allow_attempt(strategy, max_attempts=4, max_total_delay_seconds=60) diff --git a/uv.lock b/uv.lock index c74105a78c..ae50b627bc 100644 --- a/uv.lock +++ b/uv.lock @@ -753,6 +753,7 @@ source = { editable = "." } dependencies = [ { name = "anyio" }, { name = "fast-depends", extra = ["pydantic"] }, + { name = "sqlalchemy" }, { name = "typing-extensions" }, ] @@ -911,6 +912,7 @@ requires-dist = [ { name = "opentelemetry-sdk", marker = "extra == 'otel'", specifier = ">=1.24.0,<2.0.0" }, { name = "prometheus-client", marker = "extra == 'prometheus'", specifier = ">=0.20.0,<0.30.0" }, { name = "redis", marker = "extra == 'redis'", specifier = ">=5.0.0,<7.0.0" }, + { name = "sqlalchemy", specifier = ">=2.0.44" }, { name = "typer", marker = "extra == 'cli'", specifier = ">=0.9,!=0.12,<=0.19.2" }, { name = "typing-extensions", specifier = ">=4.12.0" }, { name = "watchfiles", marker = "extra == 'cli'", specifier = ">=0.15.0,<1.2.0" }, @@ -1104,6 +1106,67 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/86/f1/62a193f0227cf15a920390abe675f386dec35f7ae3ffe6da582d3ade42c7/googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8", size = 294530, upload-time = "2025-04-14T10:17:01.271Z" }, ] +[[package]] +name = "greenlet" +version = "3.2.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/03/b8/704d753a5a45507a7aab61f18db9509302ed3d0a27ac7e0359ec2905b1a6/greenlet-3.2.4.tar.gz", hash = "sha256:0dca0d95ff849f9a364385f36ab49f50065d76964944638be9691e1832e9f86d", size = 188260, upload-time = "2025-08-07T13:24:33.51Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7d/ed/6bfa4109fcb23a58819600392564fea69cdc6551ffd5e69ccf1d52a40cbc/greenlet-3.2.4-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:8c68325b0d0acf8d91dde4e6f930967dd52a5302cd4062932a6b2e7c2969f47c", size = 271061, upload-time = "2025-08-07T13:17:15.373Z" }, + { url = "https://files.pythonhosted.org/packages/2a/fc/102ec1a2fc015b3a7652abab7acf3541d58c04d3d17a8d3d6a44adae1eb1/greenlet-3.2.4-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:94385f101946790ae13da500603491f04a76b6e4c059dab271b3ce2e283b2590", size = 629475, upload-time = "2025-08-07T13:42:54.009Z" }, + { url = "https://files.pythonhosted.org/packages/c5/26/80383131d55a4ac0fb08d71660fd77e7660b9db6bdb4e8884f46d9f2cc04/greenlet-3.2.4-cp310-cp310-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f10fd42b5ee276335863712fa3da6608e93f70629c631bf77145021600abc23c", size = 640802, upload-time = "2025-08-07T13:45:25.52Z" }, + { url = "https://files.pythonhosted.org/packages/9f/7c/e7833dbcd8f376f3326bd728c845d31dcde4c84268d3921afcae77d90d08/greenlet-3.2.4-cp310-cp310-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c8c9e331e58180d0d83c5b7999255721b725913ff6bc6cf39fa2a45841a4fd4b", size = 636703, upload-time = "2025-08-07T13:53:12.622Z" }, + { 
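The retry-strategy tests in tests/brokers/sqla/test_retry_strategy.py above pin down a small interface: get_next_attempt_at(...) returns the timestamp of the next attempt (a constant delay after the previous one for ConstantRetryStrategy) or None once no further attempt may be scheduled, and allow_attempt(...) enforces both max_attempts and max_total_delay_seconds. A rough sketch consistent with that tested behaviour follows; ConstantRetrySketch is hypothetical and is not the class from faststream.sqla.retry.

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class ConstantRetrySketch:
    delay_seconds: float
    max_total_delay_seconds: float | None = None
    max_attempts: int | None = None

    def allow_attempt(self, *, first_attempt_at: datetime, attempts_count: int) -> bool:
        # Refuse once either the attempt counter or the total time budget is exhausted.
        if self.max_attempts is not None and attempts_count > self.max_attempts:
            return False
        if self.max_total_delay_seconds is not None:
            elapsed = datetime.now(tz=first_attempt_at.tzinfo) - first_attempt_at
            if elapsed > timedelta(seconds=self.max_total_delay_seconds):
                return False
        return True

    def get_next_attempt_at(
        self,
        *,
        first_attempt_at: datetime,
        last_attempt_at: datetime,
        attempts_count: int,
    ) -> datetime | None:
        # Constant backoff: the next attempt lands a fixed delay after the last one,
        # or None when the attempt budget is already used up.
        if self.max_attempts is not None and attempts_count >= self.max_attempts:
            return None
        return last_attempt_at + timedelta(seconds=self.delay_seconds)

freezegun (added to the testing extras above) is what makes the elapsed-time branch deterministic in the tests: the clock is frozen at first_attempt_at, just inside the budget, and just past it.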
url = "https://files.pythonhosted.org/packages/e9/49/547b93b7c0428ede7b3f309bc965986874759f7d89e4e04aeddbc9699acb/greenlet-3.2.4-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:58b97143c9cc7b86fc458f215bd0932f1757ce649e05b640fea2e79b54cedb31", size = 635417, upload-time = "2025-08-07T13:18:25.189Z" }, + { url = "https://files.pythonhosted.org/packages/7f/91/ae2eb6b7979e2f9b035a9f612cf70f1bf54aad4e1d125129bef1eae96f19/greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c2ca18a03a8cfb5b25bc1cbe20f3d9a4c80d8c3b13ba3df49ac3961af0b1018d", size = 584358, upload-time = "2025-08-07T13:18:23.708Z" }, + { url = "https://files.pythonhosted.org/packages/f7/85/433de0c9c0252b22b16d413c9407e6cb3b41df7389afc366ca204dbc1393/greenlet-3.2.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9fe0a28a7b952a21e2c062cd5756d34354117796c6d9215a87f55e38d15402c5", size = 1113550, upload-time = "2025-08-07T13:42:37.467Z" }, + { url = "https://files.pythonhosted.org/packages/a1/8d/88f3ebd2bc96bf7747093696f4335a0a8a4c5acfcf1b757717c0d2474ba3/greenlet-3.2.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8854167e06950ca75b898b104b63cc646573aa5fef1353d4508ecdd1ee76254f", size = 1137126, upload-time = "2025-08-07T13:18:20.239Z" }, + { url = "https://files.pythonhosted.org/packages/f1/29/74242b7d72385e29bcc5563fba67dad94943d7cd03552bac320d597f29b2/greenlet-3.2.4-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:f47617f698838ba98f4ff4189aef02e7343952df3a615f847bb575c3feb177a7", size = 1544904, upload-time = "2025-11-04T12:42:04.763Z" }, + { url = "https://files.pythonhosted.org/packages/c8/e2/1572b8eeab0f77df5f6729d6ab6b141e4a84ee8eb9bc8c1e7918f94eda6d/greenlet-3.2.4-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:af41be48a4f60429d5cad9d22175217805098a9ef7c40bfef44f7669fb9d74d8", size = 1611228, upload-time = "2025-11-04T12:42:08.423Z" }, + { url = "https://files.pythonhosted.org/packages/d6/6f/b60b0291d9623c496638c582297ead61f43c4b72eef5e9c926ef4565ec13/greenlet-3.2.4-cp310-cp310-win_amd64.whl", hash = "sha256:73f49b5368b5359d04e18d15828eecc1806033db5233397748f4ca813ff1056c", size = 298654, upload-time = "2025-08-07T13:50:00.469Z" }, + { url = "https://files.pythonhosted.org/packages/a4/de/f28ced0a67749cac23fecb02b694f6473f47686dff6afaa211d186e2ef9c/greenlet-3.2.4-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:96378df1de302bc38e99c3a9aa311967b7dc80ced1dcc6f171e99842987882a2", size = 272305, upload-time = "2025-08-07T13:15:41.288Z" }, + { url = "https://files.pythonhosted.org/packages/09/16/2c3792cba130000bf2a31c5272999113f4764fd9d874fb257ff588ac779a/greenlet-3.2.4-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1ee8fae0519a337f2329cb78bd7a8e128ec0f881073d43f023c7b8d4831d5246", size = 632472, upload-time = "2025-08-07T13:42:55.044Z" }, + { url = "https://files.pythonhosted.org/packages/ae/8f/95d48d7e3d433e6dae5b1682e4292242a53f22df82e6d3dda81b1701a960/greenlet-3.2.4-cp311-cp311-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:94abf90142c2a18151632371140b3dba4dee031633fe614cb592dbb6c9e17bc3", size = 644646, upload-time = "2025-08-07T13:45:26.523Z" }, + { url = "https://files.pythonhosted.org/packages/d5/5e/405965351aef8c76b8ef7ad370e5da58d57ef6068df197548b015464001a/greenlet-3.2.4-cp311-cp311-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:4d1378601b85e2e5171b99be8d2dc85f594c79967599328f95c1dc1a40f1c633", size = 640519, upload-time = "2025-08-07T13:53:13.928Z" }, + { 
url = "https://files.pythonhosted.org/packages/25/5d/382753b52006ce0218297ec1b628e048c4e64b155379331f25a7316eb749/greenlet-3.2.4-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:0db5594dce18db94f7d1650d7489909b57afde4c580806b8d9203b6e79cdc079", size = 639707, upload-time = "2025-08-07T13:18:27.146Z" }, + { url = "https://files.pythonhosted.org/packages/1f/8e/abdd3f14d735b2929290a018ecf133c901be4874b858dd1c604b9319f064/greenlet-3.2.4-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2523e5246274f54fdadbce8494458a2ebdcdbc7b802318466ac5606d3cded1f8", size = 587684, upload-time = "2025-08-07T13:18:25.164Z" }, + { url = "https://files.pythonhosted.org/packages/5d/65/deb2a69c3e5996439b0176f6651e0052542bb6c8f8ec2e3fba97c9768805/greenlet-3.2.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1987de92fec508535687fb807a5cea1560f6196285a4cde35c100b8cd632cc52", size = 1116647, upload-time = "2025-08-07T13:42:38.655Z" }, + { url = "https://files.pythonhosted.org/packages/3f/cc/b07000438a29ac5cfb2194bfc128151d52f333cee74dd7dfe3fb733fc16c/greenlet-3.2.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:55e9c5affaa6775e2c6b67659f3a71684de4c549b3dd9afca3bc773533d284fa", size = 1142073, upload-time = "2025-08-07T13:18:21.737Z" }, + { url = "https://files.pythonhosted.org/packages/67/24/28a5b2fa42d12b3d7e5614145f0bd89714c34c08be6aabe39c14dd52db34/greenlet-3.2.4-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c9c6de1940a7d828635fbd254d69db79e54619f165ee7ce32fda763a9cb6a58c", size = 1548385, upload-time = "2025-11-04T12:42:11.067Z" }, + { url = "https://files.pythonhosted.org/packages/6a/05/03f2f0bdd0b0ff9a4f7b99333d57b53a7709c27723ec8123056b084e69cd/greenlet-3.2.4-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:03c5136e7be905045160b1b9fdca93dd6727b180feeafda6818e6496434ed8c5", size = 1613329, upload-time = "2025-11-04T12:42:12.928Z" }, + { url = "https://files.pythonhosted.org/packages/d8/0f/30aef242fcab550b0b3520b8e3561156857c94288f0332a79928c31a52cf/greenlet-3.2.4-cp311-cp311-win_amd64.whl", hash = "sha256:9c40adce87eaa9ddb593ccb0fa6a07caf34015a29bf8d344811665b573138db9", size = 299100, upload-time = "2025-08-07T13:44:12.287Z" }, + { url = "https://files.pythonhosted.org/packages/44/69/9b804adb5fd0671f367781560eb5eb586c4d495277c93bde4307b9e28068/greenlet-3.2.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3b67ca49f54cede0186854a008109d6ee71f66bd57bb36abd6d0a0267b540cdd", size = 274079, upload-time = "2025-08-07T13:15:45.033Z" }, + { url = "https://files.pythonhosted.org/packages/46/e9/d2a80c99f19a153eff70bc451ab78615583b8dac0754cfb942223d2c1a0d/greenlet-3.2.4-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddf9164e7a5b08e9d22511526865780a576f19ddd00d62f8a665949327fde8bb", size = 640997, upload-time = "2025-08-07T13:42:56.234Z" }, + { url = "https://files.pythonhosted.org/packages/3b/16/035dcfcc48715ccd345f3a93183267167cdd162ad123cd93067d86f27ce4/greenlet-3.2.4-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f28588772bb5fb869a8eb331374ec06f24a83a9c25bfa1f38b6993afe9c1e968", size = 655185, upload-time = "2025-08-07T13:45:27.624Z" }, + { url = "https://files.pythonhosted.org/packages/31/da/0386695eef69ffae1ad726881571dfe28b41970173947e7c558d9998de0f/greenlet-3.2.4-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:5c9320971821a7cb77cfab8d956fa8e39cd07ca44b6070db358ceb7f8797c8c9", size = 649926, upload-time = "2025-08-07T13:53:15.251Z" }, + { 
url = "https://files.pythonhosted.org/packages/68/88/69bf19fd4dc19981928ceacbc5fd4bb6bc2215d53199e367832e98d1d8fe/greenlet-3.2.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c60a6d84229b271d44b70fb6e5fa23781abb5d742af7b808ae3f6efd7c9c60f6", size = 651839, upload-time = "2025-08-07T13:18:30.281Z" }, + { url = "https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586, upload-time = "2025-08-07T13:18:28.544Z" }, + { url = "https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281, upload-time = "2025-08-07T13:42:39.858Z" }, + { url = "https://files.pythonhosted.org/packages/3f/c7/12381b18e21aef2c6bd3a636da1088b888b97b7a0362fac2e4de92405f97/greenlet-3.2.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:20fb936b4652b6e307b8f347665e2c615540d4b42b3b4c8a321d8286da7e520f", size = 1151142, upload-time = "2025-08-07T13:18:22.981Z" }, + { url = "https://files.pythonhosted.org/packages/27/45/80935968b53cfd3f33cf99ea5f08227f2646e044568c9b1555b58ffd61c2/greenlet-3.2.4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ee7a6ec486883397d70eec05059353b8e83eca9168b9f3f9a361971e77e0bcd0", size = 1564846, upload-time = "2025-11-04T12:42:15.191Z" }, + { url = "https://files.pythonhosted.org/packages/69/02/b7c30e5e04752cb4db6202a3858b149c0710e5453b71a3b2aec5d78a1aab/greenlet-3.2.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:326d234cbf337c9c3def0676412eb7040a35a768efc92504b947b3e9cfc7543d", size = 1633814, upload-time = "2025-11-04T12:42:17.175Z" }, + { url = "https://files.pythonhosted.org/packages/e9/08/b0814846b79399e585f974bbeebf5580fbe59e258ea7be64d9dfb253c84f/greenlet-3.2.4-cp312-cp312-win_amd64.whl", hash = "sha256:a7d4e128405eea3814a12cc2605e0e6aedb4035bf32697f72deca74de4105e02", size = 299899, upload-time = "2025-08-07T13:38:53.448Z" }, + { url = "https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814, upload-time = "2025-08-07T13:15:50.011Z" }, + { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073, upload-time = "2025-08-07T13:42:57.23Z" }, + { url = "https://files.pythonhosted.org/packages/f7/0b/bc13f787394920b23073ca3b6c4a7a21396301ed75a655bcb47196b50e6e/greenlet-3.2.4-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:710638eb93b1fa52823aa91bf75326f9ecdfd5e0466f00789246a5280f4ba0fc", size = 655191, upload-time = "2025-08-07T13:45:29.752Z" }, + { url = "https://files.pythonhosted.org/packages/f2/d6/6adde57d1345a8d0f14d31e4ab9c23cfe8e2cd39c3baf7674b4b0338d266/greenlet-3.2.4-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c5111ccdc9c88f423426df3fd1811bfc40ed66264d35aa373420a34377efc98a", size = 649516, upload-time = "2025-08-07T13:53:16.314Z" }, + { 
url = "https://files.pythonhosted.org/packages/7f/3b/3a3328a788d4a473889a2d403199932be55b1b0060f4ddd96ee7cdfcad10/greenlet-3.2.4-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d76383238584e9711e20ebe14db6c88ddcedc1829a9ad31a584389463b5aa504", size = 652169, upload-time = "2025-08-07T13:18:32.861Z" }, + { url = "https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" }, + { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" }, + { url = "https://files.pythonhosted.org/packages/a2/15/0d5e4e1a66fab130d98168fe984c509249c833c1a3c16806b90f253ce7b9/greenlet-3.2.4-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:d25c5091190f2dc0eaa3f950252122edbbadbb682aa7b1ef2f8af0f8c0afefae", size = 1149210, upload-time = "2025-08-07T13:18:24.072Z" }, + { url = "https://files.pythonhosted.org/packages/1c/53/f9c440463b3057485b8594d7a638bed53ba531165ef0ca0e6c364b5cc807/greenlet-3.2.4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6e343822feb58ac4d0a1211bd9399de2b3a04963ddeec21530fc426cc121f19b", size = 1564759, upload-time = "2025-11-04T12:42:19.395Z" }, + { url = "https://files.pythonhosted.org/packages/47/e4/3bb4240abdd0a8d23f4f88adec746a3099f0d86bfedb623f063b2e3b4df0/greenlet-3.2.4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:ca7f6f1f2649b89ce02f6f229d7c19f680a6238af656f61e0115b24857917929", size = 1634288, upload-time = "2025-11-04T12:42:21.174Z" }, + { url = "https://files.pythonhosted.org/packages/0b/55/2321e43595e6801e105fcfdee02b34c0f996eb71e6ddffca6b10b7e1d771/greenlet-3.2.4-cp313-cp313-win_amd64.whl", hash = "sha256:554b03b6e73aaabec3745364d6239e9e012d64c68ccd0b8430c64ccc14939a8b", size = 299685, upload-time = "2025-08-07T13:24:38.824Z" }, + { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" }, + { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" }, + { url = "https://files.pythonhosted.org/packages/c0/aa/687d6b12ffb505a4447567d1f3abea23bd20e73a5bed63871178e0831b7a/greenlet-3.2.4-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:c17b6b34111ea72fc5a4e4beec9711d2226285f0386ea83477cbb97c30a3f3a5", size = 699218, upload-time = "2025-08-07T13:45:30.969Z" }, + { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" }, + { 
url = "https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" }, + { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" }, + { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" }, + { url = "https://files.pythonhosted.org/packages/0d/da/343cd760ab2f92bac1845ca07ee3faea9fe52bee65f7bcb19f16ad7de08b/greenlet-3.2.4-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:015d48959d4add5d6c9f6c5210ee3803a830dce46356e3bc326d6776bde54681", size = 1680760, upload-time = "2025-11-04T12:42:25.341Z" }, + { url = "https://files.pythonhosted.org/packages/e3/a5/6ddab2b4c112be95601c13428db1d8b6608a8b6039816f2ba09c346c08fc/greenlet-3.2.4-cp314-cp314-win_amd64.whl", hash = "sha256:e37ab26028f12dbb0ff65f29a8d3d44a765c61e729647bf2ddfbbed621726f01", size = 303425, upload-time = "2025-08-07T13:32:27.59Z" }, +] + [[package]] name = "griffe" version = "1.7.3" @@ -3064,6 +3127,51 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" }, ] +[[package]] +name = "sqlalchemy" +version = "2.0.44" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "greenlet", marker = "platform_machine == 'AMD64' or platform_machine == 'WIN32' or platform_machine == 'aarch64' or platform_machine == 'amd64' or platform_machine == 'ppc64le' or platform_machine == 'win32' or platform_machine == 'x86_64'" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f0/f2/840d7b9496825333f532d2e3976b8eadbf52034178aac53630d09fe6e1ef/sqlalchemy-2.0.44.tar.gz", hash = "sha256:0ae7454e1ab1d780aee69fd2aae7d6b8670a581d8847f2d1e0f7ddfbf47e5a22", size = 9819830, upload-time = "2025-10-10T14:39:12.935Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a2/a7/e9ccfa7eecaf34c6f57d8cb0bb7cbdeeff27017cc0f5d0ca90fdde7a7c0d/sqlalchemy-2.0.44-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7c77f3080674fc529b1bd99489378c7f63fcb4ba7f8322b79732e0258f0ea3ce", size = 2137282, upload-time = "2025-10-10T15:36:10.965Z" }, + { url = "https://files.pythonhosted.org/packages/b1/e1/50bc121885bdf10833a4f65ecbe9fe229a3215f4d65a58da8a181734cae3/sqlalchemy-2.0.44-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4c26ef74ba842d61635b0152763d057c8d48215d5be9bb8b7604116a059e9985", size = 2127322, upload-time = "2025-10-10T15:36:12.428Z" }, + { url = "https://files.pythonhosted.org/packages/46/f2/a8573b7230a3ce5ee4b961a2d510d71b43872513647398e595b744344664/sqlalchemy-2.0.44-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:f4a172b31785e2f00780eccab00bc240ccdbfdb8345f1e6063175b3ff12ad1b0", size = 3214772, upload-time = "2025-10-10T15:34:15.09Z" }, + { url = "https://files.pythonhosted.org/packages/4a/d8/c63d8adb6a7edaf8dcb6f75a2b1e9f8577960a1e489606859c4d73e7d32b/sqlalchemy-2.0.44-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f9480c0740aabd8cb29c329b422fb65358049840b34aba0adf63162371d2a96e", size = 3214434, upload-time = "2025-10-10T15:47:00.473Z" }, + { url = "https://files.pythonhosted.org/packages/ee/a6/243d277a4b54fae74d4797957a7320a5c210c293487f931cbe036debb697/sqlalchemy-2.0.44-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:17835885016b9e4d0135720160db3095dc78c583e7b902b6be799fb21035e749", size = 3155365, upload-time = "2025-10-10T15:34:17.932Z" }, + { url = "https://files.pythonhosted.org/packages/5f/f8/6a39516ddd75429fd4ee5a0d72e4c80639fab329b2467c75f363c2ed9751/sqlalchemy-2.0.44-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:cbe4f85f50c656d753890f39468fcd8190c5f08282caf19219f684225bfd5fd2", size = 3178910, upload-time = "2025-10-10T15:47:02.346Z" }, + { url = "https://files.pythonhosted.org/packages/43/f0/118355d4ad3c39d9a2f5ee4c7304a9665b3571482777357fa9920cd7a6b4/sqlalchemy-2.0.44-cp310-cp310-win32.whl", hash = "sha256:2fcc4901a86ed81dc76703f3b93ff881e08761c63263c46991081fd7f034b165", size = 2105624, upload-time = "2025-10-10T15:38:15.552Z" }, + { url = "https://files.pythonhosted.org/packages/61/83/6ae5f9466f8aa5d0dcebfff8c9c33b98b27ce23292df3b990454b3d434fd/sqlalchemy-2.0.44-cp310-cp310-win_amd64.whl", hash = "sha256:9919e77403a483ab81e3423151e8ffc9dd992c20d2603bf17e4a8161111e55f5", size = 2129240, upload-time = "2025-10-10T15:38:17.175Z" }, + { url = "https://files.pythonhosted.org/packages/e3/81/15d7c161c9ddf0900b076b55345872ed04ff1ed6a0666e5e94ab44b0163c/sqlalchemy-2.0.44-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0fe3917059c7ab2ee3f35e77757062b1bea10a0b6ca633c58391e3f3c6c488dd", size = 2140517, upload-time = "2025-10-10T15:36:15.64Z" }, + { url = "https://files.pythonhosted.org/packages/d4/d5/4abd13b245c7d91bdf131d4916fd9e96a584dac74215f8b5bc945206a974/sqlalchemy-2.0.44-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:de4387a354ff230bc979b46b2207af841dc8bf29847b6c7dbe60af186d97aefa", size = 2130738, upload-time = "2025-10-10T15:36:16.91Z" }, + { url = "https://files.pythonhosted.org/packages/cb/3c/8418969879c26522019c1025171cefbb2a8586b6789ea13254ac602986c0/sqlalchemy-2.0.44-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c3678a0fb72c8a6a29422b2732fe423db3ce119c34421b5f9955873eb9b62c1e", size = 3304145, upload-time = "2025-10-10T15:34:19.569Z" }, + { url = "https://files.pythonhosted.org/packages/94/2d/fdb9246d9d32518bda5d90f4b65030b9bf403a935cfe4c36a474846517cb/sqlalchemy-2.0.44-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3cf6872a23601672d61a68f390e44703442639a12ee9dd5a88bbce52a695e46e", size = 3304511, upload-time = "2025-10-10T15:47:05.088Z" }, + { url = "https://files.pythonhosted.org/packages/7d/fb/40f2ad1da97d5c83f6c1269664678293d3fe28e90ad17a1093b735420549/sqlalchemy-2.0.44-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:329aa42d1be9929603f406186630135be1e7a42569540577ba2c69952b7cf399", size = 3235161, upload-time = "2025-10-10T15:34:21.193Z" }, + { url = "https://files.pythonhosted.org/packages/95/cb/7cf4078b46752dca917d18cf31910d4eff6076e5b513c2d66100c4293d83/sqlalchemy-2.0.44-cp311-cp311-musllinux_1_2_x86_64.whl", hash = 
"sha256:70e03833faca7166e6a9927fbee7c27e6ecde436774cd0b24bbcc96353bce06b", size = 3261426, upload-time = "2025-10-10T15:47:07.196Z" }, + { url = "https://files.pythonhosted.org/packages/f8/3b/55c09b285cb2d55bdfa711e778bdffdd0dc3ffa052b0af41f1c5d6e582fa/sqlalchemy-2.0.44-cp311-cp311-win32.whl", hash = "sha256:253e2f29843fb303eca6b2fc645aca91fa7aa0aa70b38b6950da92d44ff267f3", size = 2105392, upload-time = "2025-10-10T15:38:20.051Z" }, + { url = "https://files.pythonhosted.org/packages/c7/23/907193c2f4d680aedbfbdf7bf24c13925e3c7c292e813326c1b84a0b878e/sqlalchemy-2.0.44-cp311-cp311-win_amd64.whl", hash = "sha256:7a8694107eb4308a13b425ca8c0e67112f8134c846b6e1f722698708741215d5", size = 2130293, upload-time = "2025-10-10T15:38:21.601Z" }, + { url = "https://files.pythonhosted.org/packages/62/c4/59c7c9b068e6813c898b771204aad36683c96318ed12d4233e1b18762164/sqlalchemy-2.0.44-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:72fea91746b5890f9e5e0997f16cbf3d53550580d76355ba2d998311b17b2250", size = 2139675, upload-time = "2025-10-10T16:03:31.064Z" }, + { url = "https://files.pythonhosted.org/packages/d6/ae/eeb0920537a6f9c5a3708e4a5fc55af25900216bdb4847ec29cfddf3bf3a/sqlalchemy-2.0.44-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:585c0c852a891450edbb1eaca8648408a3cc125f18cf433941fa6babcc359e29", size = 2127726, upload-time = "2025-10-10T16:03:35.934Z" }, + { url = "https://files.pythonhosted.org/packages/d8/d5/2ebbabe0379418eda8041c06b0b551f213576bfe4c2f09d77c06c07c8cc5/sqlalchemy-2.0.44-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b94843a102efa9ac68a7a30cd46df3ff1ed9c658100d30a725d10d9c60a2f44", size = 3327603, upload-time = "2025-10-10T15:35:28.322Z" }, + { url = "https://files.pythonhosted.org/packages/45/e5/5aa65852dadc24b7d8ae75b7efb8d19303ed6ac93482e60c44a585930ea5/sqlalchemy-2.0.44-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:119dc41e7a7defcefc57189cfa0e61b1bf9c228211aba432b53fb71ef367fda1", size = 3337842, upload-time = "2025-10-10T15:43:45.431Z" }, + { url = "https://files.pythonhosted.org/packages/41/92/648f1afd3f20b71e880ca797a960f638d39d243e233a7082c93093c22378/sqlalchemy-2.0.44-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0765e318ee9179b3718c4fd7ba35c434f4dd20332fbc6857a5e8df17719c24d7", size = 3264558, upload-time = "2025-10-10T15:35:29.93Z" }, + { url = "https://files.pythonhosted.org/packages/40/cf/e27d7ee61a10f74b17740918e23cbc5bc62011b48282170dc4c66da8ec0f/sqlalchemy-2.0.44-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2e7b5b079055e02d06a4308d0481658e4f06bc7ef211567edc8f7d5dce52018d", size = 3301570, upload-time = "2025-10-10T15:43:48.407Z" }, + { url = "https://files.pythonhosted.org/packages/3b/3d/3116a9a7b63e780fb402799b6da227435be878b6846b192f076d2f838654/sqlalchemy-2.0.44-cp312-cp312-win32.whl", hash = "sha256:846541e58b9a81cce7dee8329f352c318de25aa2f2bbe1e31587eb1f057448b4", size = 2103447, upload-time = "2025-10-10T15:03:21.678Z" }, + { url = "https://files.pythonhosted.org/packages/25/83/24690e9dfc241e6ab062df82cc0df7f4231c79ba98b273fa496fb3dd78ed/sqlalchemy-2.0.44-cp312-cp312-win_amd64.whl", hash = "sha256:7cbcb47fd66ab294703e1644f78971f6f2f1126424d2b300678f419aa73c7b6e", size = 2130912, upload-time = "2025-10-10T15:03:24.656Z" }, + { url = "https://files.pythonhosted.org/packages/45/d3/c67077a2249fdb455246e6853166360054c331db4613cda3e31ab1cadbef/sqlalchemy-2.0.44-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:ff486e183d151e51b1d694c7aa1695747599bb00b9f5f604092b54b74c64a8e1", 
size = 2135479, upload-time = "2025-10-10T16:03:37.671Z" }, + { url = "https://files.pythonhosted.org/packages/2b/91/eabd0688330d6fd114f5f12c4f89b0d02929f525e6bf7ff80aa17ca802af/sqlalchemy-2.0.44-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0b1af8392eb27b372ddb783b317dea0f650241cea5bd29199b22235299ca2e45", size = 2123212, upload-time = "2025-10-10T16:03:41.755Z" }, + { url = "https://files.pythonhosted.org/packages/b0/bb/43e246cfe0e81c018076a16036d9b548c4cc649de241fa27d8d9ca6f85ab/sqlalchemy-2.0.44-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2b61188657e3a2b9ac4e8f04d6cf8e51046e28175f79464c67f2fd35bceb0976", size = 3255353, upload-time = "2025-10-10T15:35:31.221Z" }, + { url = "https://files.pythonhosted.org/packages/b9/96/c6105ed9a880abe346b64d3b6ddef269ddfcab04f7f3d90a0bf3c5a88e82/sqlalchemy-2.0.44-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b87e7b91a5d5973dda5f00cd61ef72ad75a1db73a386b62877d4875a8840959c", size = 3260222, upload-time = "2025-10-10T15:43:50.124Z" }, + { url = "https://files.pythonhosted.org/packages/44/16/1857e35a47155b5ad927272fee81ae49d398959cb749edca6eaa399b582f/sqlalchemy-2.0.44-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:15f3326f7f0b2bfe406ee562e17f43f36e16167af99c4c0df61db668de20002d", size = 3189614, upload-time = "2025-10-10T15:35:32.578Z" }, + { url = "https://files.pythonhosted.org/packages/88/ee/4afb39a8ee4fc786e2d716c20ab87b5b1fb33d4ac4129a1aaa574ae8a585/sqlalchemy-2.0.44-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:1e77faf6ff919aa8cd63f1c4e561cac1d9a454a191bb864d5dd5e545935e5a40", size = 3226248, upload-time = "2025-10-10T15:43:51.862Z" }, + { url = "https://files.pythonhosted.org/packages/32/d5/0e66097fc64fa266f29a7963296b40a80d6a997b7ac13806183700676f86/sqlalchemy-2.0.44-cp313-cp313-win32.whl", hash = "sha256:ee51625c2d51f8baadf2829fae817ad0b66b140573939dd69284d2ba3553ae73", size = 2101275, upload-time = "2025-10-10T15:03:26.096Z" }, + { url = "https://files.pythonhosted.org/packages/03/51/665617fe4f8c6450f42a6d8d69243f9420f5677395572c2fe9d21b493b7b/sqlalchemy-2.0.44-cp313-cp313-win_amd64.whl", hash = "sha256:c1c80faaee1a6c3428cecf40d16a2365bcf56c424c92c2b6f0f9ad204b899e9e", size = 2127901, upload-time = "2025-10-10T15:03:27.548Z" }, + { url = "https://files.pythonhosted.org/packages/9c/5e/6a29fa884d9fb7ddadf6b69490a9d45fded3b38541713010dad16b77d015/sqlalchemy-2.0.44-py3-none-any.whl", hash = "sha256:19de7ca1246fbef9f9d1bff8f1ab25641569df226364a0e40457dc5457c54b05", size = 1928718, upload-time = "2025-10-10T15:29:45.32Z" }, +] + [[package]] name = "sse-starlette" version = "3.0.2"