Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions .github/workflows/codspeed_benchmarks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# CodSpeed benchmark workflow: spins up all supported brokers as service
# containers and runs the pytest-codspeed suite under wall-time mode.
name: Benchmarks

# Manual trigger only — benchmarks are expensive, so they are not run per-push.
on:
  workflow_dispatch:

jobs:
  benchmarks:
    name: Run benchmarks
    # CodSpeed's dedicated macro-benchmark runner (required for walltime mode).
    runs-on: codspeed-macro
    services:
      # Single-node Kafka in KRaft mode (combined broker + controller, no ZooKeeper).
      kafka:
        image: confluentinc/cp-kafka:8.0.0
        ports:
          - 9092:9092
        env:
          KAFKA_NODE_ID: "1"
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
          # Clients on the runner connect via the published port on localhost.
          KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://127.0.0.1:9092"
          KAFKA_PROCESS_ROLES: "broker,controller"
          # Single broker, so the offsets topic cannot be replicated.
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_BROKER_ID: "1"
          # "kafka" resolves inside the service container network (service name alias).
          KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
          KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
          KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
          KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
          CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
      rabbitmq:
        image: rabbitmq:alpine
        ports:
          - 5672:5672
      # JetStream-enabled NATS image.
      nats:
        image: diementros/nats:js
        ports:
          - 4222:4222
      redis:
        image: redis:alpine
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v4
      - name: Install uv
        uses: astral-sh/setup-uv@v5
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      - name: Install Dependencies
        run: |
          uv pip install --system --group optionals --group testing .
      # Pin Pydantic to a stable v2 release (no prereleases) for comparable runs.
      - name: Install Pydantic v2
        run: uv pip install --system --prerelease=disallow "pydantic>=2.0.0,<3.0.0"
      - name: Run the benchmarks
        uses: CodSpeedHQ/action@v4
        with:
          mode: walltime
          run: uv run pytest benchmarks/ --codspeed
          token: ${{ secrets.CODSPEED_TOKEN }}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ htmlcov
token
.DS_Store
*.egg-info
.codspeed

docs/site/
docs/site_build/
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,9 @@ More integration features can be found [here](https://faststream.ag2.ai/latest/g

---

## Benchmarks

We use [CodSpeed](https://codspeed.io) to run benchmarks for both FastStream itself and the raw broker clients.

## Stay in touch

Please show your support and stay in touch by:
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ docker run --rm \
--network=host \
python:3.12-slim \
/bin/bash -c "
pip install 'faststream[rabbit,redis,nats,kafka,confluent]==0.6.0rc0' fast-depends psutil && \
pip install 'faststream[rabbit,redis,nats,kafka,confluent]==0.6.0rc0' fast-depends psutil pytest && \
python bench.py"
```
11 changes: 9 additions & 2 deletions benchmarks/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ class TestCase(Protocol):
broker_type: str
comment: str

def setup_method(self) -> None: ...

@asynccontextmanager
async def start(self) -> AsyncIterator[float]: ...

@asynccontextmanager
async def test_consume_message(self) -> None: ...


@dataclass
class MeasureResult:
Expand Down Expand Up @@ -57,9 +62,11 @@ async def main(case: TestCase, measure_time: int) -> MeasureResult:


if __name__ == "__main__":
from rabbit_cases.basic import RabbitTestCase
from rabbit_cases.test_aiopika import TestRabbitCase

case: TestCase = TestRabbitCase()

case: TestCase = RabbitTestCase()
case.setup_method()

bench_file = Path(__file__).resolve().parent / "benches.csv"

Expand Down
22 changes: 11 additions & 11 deletions benchmarks/benches.csv
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FastStream Version;Broker;Total Events;Event per second;Elapsed Time;Measure Time;Python Version;Comments;Host Memory
0.5.48;RabbitMQ;530557;883.1;600.7908401489258;2025-08-15T16:27:12.832939+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;RabbitMQ;584133;972.42;600.7002210617065;2025-08-15T14:55:56.580142+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;NATS;804013;1339.69;600.1491076946259;2025-08-15T17:13:45.211546+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;NATS;954535;1590.36;600.2000441551208;2025-08-15T17:37:55.896376+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;Redis;683286;1138.22;600.3099718093872;2025-08-15T18:29:38.791560+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;Redis;653702;1088.94;600.3076493740082;2025-08-15T18:42:33.665307+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;Kafka;315477;525.64;600.1727740764618;2025-08-17T12:15:45.879776+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;Kafka;351082;584.99;600.1487319469452;2025-08-17T12:33:43.495466+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;Confluent;357784;595.93;600.3742282390594;2025-08-17T14:55:24.771995+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;Confluent;385732;642.49;600.3732979297638;2025-08-17T15:37:25.498074+00:00;3.12.11;Consume Any Message;15.03 GB
FastStream Version;Broker;Total Events;Event per second;Elapsed Time;Measure Time;Python Version;Comments;Host Memory
0.5.48;RabbitMQ;530557;883.1;600.7908401489258;2025-08-15T16:27:12.832939+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;RabbitMQ;584133;972.42;600.7002210617065;2025-08-15T14:55:56.580142+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;NATS;804013;1339.69;600.1491076946259;2025-08-15T17:13:45.211546+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;NATS;954535;1590.36;600.2000441551208;2025-08-15T17:37:55.896376+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;Redis;683286;1138.22;600.3099718093872;2025-08-15T18:29:38.791560+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;Redis;653702;1088.94;600.3076493740082;2025-08-15T18:42:33.665307+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;Kafka;315477;525.64;600.1727740764618;2025-08-17T12:15:45.879776+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;Kafka;351082;584.99;600.1487319469452;2025-08-17T12:33:43.495466+00:00;3.12.11;Consume Any Message;15.03 GB
0.5.48;Confluent;357784;595.93;600.3742282390594;2025-08-17T14:55:24.771995+00:00;3.12.11;Consume Any Message;15.03 GB
0.6.0rc0;Confluent;385732;642.49;600.3732979297638;2025-08-17T15:37:25.498074+00:00;3.12.11;Consume Any Message;15.03 GB
Empty file.
11 changes: 11 additions & 0 deletions benchmarks/confluent_cases/schemas/msgspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from msgspec import Struct


class BaseSchema(Struct):
    """Flat msgspec payload used by the Confluent benchmark cases."""

    name: str
    age: int
    fullname: str


class Schema(BaseSchema):
    """Nested benchmark payload: a record plus a list of child records."""

    children: list[BaseSchema]
11 changes: 11 additions & 0 deletions benchmarks/confluent_cases/schemas/pydantic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pydantic import BaseModel


class BaseSchema(BaseModel):
    """Flat Pydantic payload used by the Confluent benchmark cases."""

    name: str
    age: int
    fullname: str


class Schema(BaseSchema):
    """Nested benchmark payload: a record plus a list of child records."""

    children: list[BaseSchema]
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import asyncio
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import pytest

from faststream.confluent import KafkaBroker, TopicPartition


class ConfluentTestCase:
@pytest.mark.asyncio()
@pytest.mark.benchmark(
min_time=150,
max_time=300,
)
class TestConfluentCase:
comment = "Consume Any Message"
broker_type = "Confluent"

def __init__(self) -> None:
def setup_method(self) -> None:
self.EVENTS_PROCESSED = 0

broker = self.broker = KafkaBroker(logger=None, graceful_timeout=10)
Expand Down Expand Up @@ -41,3 +49,8 @@ async def start(self) -> AsyncIterator[float]:
})

yield start_time

async def test_consume_message(self) -> None:
async with self.start() as start_time:
await asyncio.sleep(6.0)
assert self.EVENTS_PROCESSED > 1
92 changes: 92 additions & 0 deletions benchmarks/confluent_cases/test_confluent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import asyncio
import json
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import pytest
from confluent_kafka import Consumer, Producer, TopicPartition

from faststream._internal.utils.functions import run_in_executor

from .schemas.pydantic import Schema


@pytest.mark.asyncio()
@pytest.mark.benchmark(
    min_time=150,
    max_time=300,
)
class TestConfluentCase:
    """Baseline benchmark: raw confluent-kafka client with Pydantic parsing.

    A seed message is published to the "in" topic; a worker thread then
    consumes, validates and re-publishes each message, forming a closed
    echo loop whose throughput is counted in ``EVENTS_PROCESSED``.
    """

    comment = "Pure confluent client with pydantic"
    broker_type = "Confluent"

    def setup_method(self) -> None:
        # Fresh counter per test run; incremented only by the worker thread.
        self.EVENTS_PROCESSED = 0

        self.producer = Producer({
            "bootstrap.servers": "localhost:9092",
        })

        self.consumer = Consumer({
            "bootstrap.servers": "localhost:9092",
            "group.id": "test-group",
            "enable.auto.commit": True,
            "auto.offset.reset": "earliest",
        })
        # Read the "in" topic, partition 0, from offset 0 — skips the
        # consumer-group rebalance that subscribe() would trigger.
        self.consumer.assign([TopicPartition("in", 0, 0)])

    @asynccontextmanager
    async def start(self) -> AsyncIterator[float]:
        """Run the echo loop in a thread; yield the wall-clock start time."""
        stop_event = asyncio.Event()

        def acked(err, msg) -> None:  # noqa: ANN001
            if err is not None:
                print(f"Failed to deliver message: {msg!s}: {err!s}")

        def handle() -> None:
            # Blocking consume/produce loop; runs in the default executor so
            # the event loop stays free.
            while not stop_event.is_set():
                try:
                    msg = self.consumer.poll(timeout=0.01)
                except RuntimeError:
                    # Consumer was closed from the main thread — exit cleanly.
                    break
                if msg is None:
                    continue
                # FIX: poll() also delivers broker-level error events; calling
                # .value() on those would feed garbage into json.loads.
                if msg.error():
                    continue
                self.EVENTS_PROCESSED += 1
                data = json.loads(msg.value().decode("utf-8"))
                parsed = Schema(**data)
                self.producer.produce("in", value=parsed.model_dump_json().encode("utf-8"), callback=acked)
                # Synchronous flush keeps the loop closed (one in-flight message).
                self.producer.flush()

        # FIX: get_event_loop() is deprecated inside coroutines (3.10+);
        # get_running_loop() is the correct call here.
        loop = asyncio.get_running_loop()
        start_time = time.time()
        executor_task = loop.run_in_executor(None, handle)

        # Seed message that bootstraps the echo loop.
        value = json.dumps({
            "name": "John",
            "age": 39,
            "fullname": "LongString" * 8,
            "children": [
                {
                    "name": "Mike",
                    "age": 8,
                    "fullname": "LongString" * 8,
                },
            ],
        }).encode("utf-8")

        # produce()/poll() can block on the client's internal queue, so keep
        # them off the event loop as well.
        await run_in_executor(None, self.producer.produce, "in", value=value)
        await run_in_executor(None, self.producer.poll, 0)

        try:
            yield start_time
        finally:
            stop_event.set()
            await executor_task
            await run_in_executor(None, self.producer.flush)
            await run_in_executor(None, self.consumer.close)

    async def test_consume_message(self) -> None:
        async with self.start() as start_time:
            await asyncio.sleep(6.0)
            assert self.EVENTS_PROCESSED > 1
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import asyncio
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fast_depends.msgspec import MsgSpecSerializer
from msgspec import Struct
import pytest

from fast_depends.msgspec import MsgSpecSerializer
from faststream.confluent import KafkaBroker


class BaseSchema(Struct):
name: str
age: int
fullname: str
from .schemas.msgspec import Schema


class Schema(BaseSchema):
children: list[BaseSchema]


class ConfluentTestCase:
@pytest.mark.asyncio()
@pytest.mark.benchmark(
min_time=150,
max_time=300,
)
class TestConfluentCase:
comment = "Consume Msgspec Struct"
broker_type = "Confluent"

def __init__(self) -> None:
def setup_method(self) -> None:
self.EVENTS_PROCESSED = 0

broker = self.broker = KafkaBroker(
Expand Down Expand Up @@ -55,3 +53,8 @@ async def start(self) -> AsyncIterator[float]:
})

yield start_time

async def test_consume_message(self) -> None:
async with self.start() as start_time:
await asyncio.sleep(6.0)
assert self.EVENTS_PROCESSED > 1
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
import asyncio
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from pydantic import BaseModel
import pytest

from faststream.confluent import KafkaBroker

from .schemas.pydantic import Schema

class BaseSchema(BaseModel):
name: str
age: int
fullname: str


class Schema(BaseSchema):
children: list[BaseSchema]


class ConfluentTestCase:
@pytest.mark.asyncio()
@pytest.mark.benchmark(
min_time=150,
max_time=300,
)
class TestConfluentCase:
comment = "Consume Pydantic Model"
broker_type = "Confluent"

def __init__(self) -> None:
def setup_method(self) -> None:
self.EVENTS_PROCESSED = 0

broker = self.broker = KafkaBroker(logger=None, graceful_timeout=10)
Expand Down Expand Up @@ -50,3 +48,8 @@ async def start(self) -> AsyncIterator[float]:
})

yield start_time

async def test_consume_message(self) -> None:
async with self.start() as start_time:
await asyncio.sleep(6.0)
assert self.EVENTS_PROCESSED > 1
Empty file.
11 changes: 11 additions & 0 deletions benchmarks/kafka_cases/schemas/msgspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from msgspec import Struct


class BaseSchema(Struct):
    """Flat msgspec payload used by the Kafka benchmark cases."""

    name: str
    age: int
    fullname: str


class Schema(BaseSchema):
    """Nested benchmark payload: a record plus a list of child records."""

    children: list[BaseSchema]
Loading
Loading