Skip to content

Commit 8e0ea73

Browse files
authored
Add ASGI FastStream support ✨ (#64)
* Add faststream config * Update config * Add FastStream bootstrapper * Fix annotations * Update * Update * Update * Add health checks * Update * Add prometheus config * Update * Update * Update * Fix logging * Update prometheus config * Update * Update * Update * Update * Fix tests * Update * Update * Update
1 parent f603581 commit 8e0ea73

File tree

9 files changed

+412
-16
lines changed

9 files changed

+412
-16
lines changed

README.md

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ With <b>microbootstrap</b>, you receive an application with lightweight built-in
4545

4646
Those instruments can be bootstrapped for:
4747

48-
- `fastapi`
49-
- `litestar`
48+
- `fastapi`,
49+
- `litestar`,
50+
- or `faststream` service,
51+
- or even a service that doesn't use one of these frameworks.
5052

5153
Interested? Let's dive right in ⚡
5254

@@ -71,22 +73,30 @@ Interested? Let's dive right in ⚡
7173

7274
## Installation
7375

74-
You can install the package using either `pip` or `poetry`.
7576
Also, you can specify extras during installation for concrete framework:
7677

7778
- `fastapi`
7879
- `litestar`
80+
- `faststream` (ASGI app)
81+
82+
Also we have `granian` extra that is requires for `create_granian_server`.
83+
84+
For uv:
85+
86+
```bash
87+
uv add "microbootstrap[fastapi]"
88+
```
7989

8090
For poetry:
8191

8292
```bash
83-
$ poetry add microbootstrap -E fastapi
93+
poetry add microbootstrap -E fastapi
8494
```
8595

8696
For pip:
8797

8898
```bash
89-
$ pip install microbootstrap[fastapi]
99+
pip install "microbootstrap[fastapi]"
90100
```
91101

92102
## Quickstart
@@ -224,7 +234,7 @@ These settings are subsequently passed to the [sentry-sdk](https://pypi.org/proj
224234

225235
### Prometheus
226236

227-
Prometheus integration presents a challenge because the underlying libraries for `FastAPI` and `Litestar` differ significantly, making it impossible to unify them under a single interface. As a result, the Prometheus settings for `FastAPI` and `Litestar` must be configured separately.
237+
Prometheus integration presents a challenge because the underlying libraries for `FastAPI`, `Litestar` and `FastStream` differ significantly, making it impossible to unify them under a single interface. As a result, the Prometheus settings for `FastAPI`, `Litestar` and `FastStream` must be configured separately.
228238

229239
#### FastAPI
230240

@@ -234,7 +244,7 @@ To bootstrap prometheus you have to provide `prometheus_metrics_path`
234244
from microbootstrap.settings import FastApiSettings
235245

236246

237-
class YourFastApiSettings(FastApiSettings):
247+
class YourSettings(FastApiSettings):
238248
service_name: str
239249

240250
prometheus_metrics_path: str = "/metrics"
@@ -255,7 +265,7 @@ Parameters description:
255265
- `prometheus_instrument_params` - will be passed to `Instrumentor.instrument(...)`.
256266
- `prometheus_expose_params` - will be passed to `Instrumentor.expose(...)`.
257267

258-
FastApi prometheus bootstrapper uses [prometheus-fastapi-instrumentator](https://github.com/trallnag/prometheus-fastapi-instrumentator) that's why there are three different dict for parameters.
268+
FastAPI prometheus bootstrapper uses [prometheus-fastapi-instrumentator](https://github.com/trallnag/prometheus-fastapi-instrumentator) that's why there are three different dict for parameters.
259269

260270
#### Litestar
261271

@@ -265,7 +275,7 @@ To bootstrap prometheus you have to provide `prometheus_metrics_path`
265275
from microbootstrap.settings import LitestarSettings
266276

267277

268-
class YourFastApiSettings(LitestarSettings):
278+
class YourSettings(LitestarSettings):
269279
service_name: str
270280

271281
prometheus_metrics_path: str = "/metrics"
@@ -280,6 +290,30 @@ Parameters description:
280290
- `prometheus_metrics_path` - path to metrics handler.
281291
- `prometheus_additional_params` - will be passed to `litestar.contrib.prometheus.PrometheusConfig`.
282292

293+
#### FastStream
294+
295+
To bootstrap prometheus you have to provide `prometheus_metrics_path` and `prometheus_middleware_cls`:
296+
297+
```python
298+
from microbootstrap import FastStreamSettings
299+
from faststream.redis.prometheus import RedisPrometheusMiddleware
300+
301+
302+
class YourSettings(FastStreamSettings):
303+
service_name: str
304+
305+
prometheus_metrics_path: str = "/metrics"
306+
prometheus_middleware_cls: type[FastStreamPrometheusMiddlewareProtocol] | None = RedisPrometheusMiddleware
307+
308+
... # Other settings here
309+
```
310+
311+
Parameters description:
312+
313+
- `service_name` - will be attached to metric's names, there are no name restrictions.
314+
- `prometheus_metrics_path` - path to metrics handler.
315+
- `prometheus_middleware_cls` - Prometheus middleware for your broker.
316+
283317
### Opentelemetry
284318

285319
To bootstrap Opentelemetry, you must provide several parameters:
@@ -293,7 +327,7 @@ To bootstrap Opentelemetry, you must provide several parameters:
293327
However, additional parameters can also be supplied if needed.
294328

295329
```python
296-
from microbootstrap.settings import BaseServiceSettings
330+
from microbootstrap.settings import BaseServiceSettings, FastStreamPrometheusMiddlewareProtocol
297331
from microbootstrap.instruments.opentelemetry_instrument import OpenTelemetryInstrumentor
298332

299333

@@ -326,6 +360,21 @@ Parameters description:
326360

327361
These settings are subsequently passed to [opentelemetry](https://opentelemetry.io/), finalizing your Opentelemetry integration.
328362

363+
#### FastStream
364+
365+
For FastStream you also should pass `opentelemetry_middleware_cls` - OpenTelemetry middleware for your broker
366+
367+
```python
368+
from microbootstrap import FastStreamSettings, FastStreamTelemetryMiddlewareProtocol
369+
from faststream.redis.opentelemetry import RedisTelemetryMiddleware
370+
371+
372+
class YourSettings(FastStreamSettings):
373+
...
374+
opentelemetry_middleware_cls: type[FastStreamTelemetryMiddlewareProtocol] | None = RedisTelemetryMiddleware
375+
...
376+
```
377+
329378
### Logging
330379

331380
<b>microbootstrap</b> provides in-memory JSON logging through the use of [structlog](https://pypi.org/project/structlog/).
@@ -412,6 +461,18 @@ Parameter descriptions:
412461
- `swagger_offline_docs` - A boolean value that, when set to True, allows the Swagger JS bundles to be accessed offline. This is because the service starts to host via static.
413462
- `swagger_extra_params` - Additional parameters to pass into the OpenAPI configuration.
414463

464+
#### FastStream AsyncAPI documentation
465+
466+
AsyncAPI documentation is available by default under `/asyncapi` route. You can change that by setting `asyncapi_path`:
467+
468+
```python
469+
from microbootstrap import FastStreamSettings
470+
471+
472+
class YourSettings(FastStreamSettings):
473+
asyncapi_path: str | None = None
474+
```
475+
415476
### Health checks
416477

417478
```python

microbootstrap/__init__.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,36 @@
11
from microbootstrap.instruments.cors_instrument import CorsConfig
22
from microbootstrap.instruments.health_checks_instrument import HealthChecksConfig
33
from microbootstrap.instruments.logging_instrument import LoggingConfig
4-
from microbootstrap.instruments.opentelemetry_instrument import OpentelemetryConfig
5-
from microbootstrap.instruments.prometheus_instrument import FastApiPrometheusConfig, LitestarPrometheusConfig
4+
from microbootstrap.instruments.opentelemetry_instrument import (
5+
FastStreamOpentelemetryConfig,
6+
FastStreamTelemetryMiddlewareProtocol,
7+
OpentelemetryConfig,
8+
)
9+
from microbootstrap.instruments.prometheus_instrument import (
10+
FastApiPrometheusConfig,
11+
FastStreamPrometheusConfig,
12+
FastStreamPrometheusMiddlewareProtocol,
13+
LitestarPrometheusConfig,
14+
)
615
from microbootstrap.instruments.sentry_instrument import SentryConfig
716
from microbootstrap.instruments.swagger_instrument import SwaggerConfig
8-
from microbootstrap.settings import FastApiSettings, InstrumentsSetupperSettings, LitestarSettings
17+
from microbootstrap.settings import (
18+
FastApiSettings,
19+
FastStreamSettings,
20+
InstrumentsSetupperSettings,
21+
LitestarSettings,
22+
)
923

1024

1125
__all__ = (
1226
"CorsConfig",
1327
"FastApiPrometheusConfig",
1428
"FastApiSettings",
29+
"FastStreamOpentelemetryConfig",
30+
"FastStreamPrometheusConfig",
31+
"FastStreamPrometheusMiddlewareProtocol",
32+
"FastStreamSettings",
33+
"FastStreamTelemetryMiddlewareProtocol",
1534
"HealthChecksConfig",
1635
"InstrumentsSetupperSettings",
1736
"LitestarPrometheusConfig",
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from __future__ import annotations
2+
import json
3+
import typing
4+
5+
import prometheus_client
6+
import structlog
7+
import typing_extensions
8+
from faststream.asgi import AsgiFastStream, AsgiResponse
9+
from faststream.asgi import get as handle_get
10+
11+
from microbootstrap.bootstrappers.base import ApplicationBootstrapper
12+
from microbootstrap.config.faststream import FastStreamConfig
13+
from microbootstrap.instruments.health_checks_instrument import HealthChecksInstrument
14+
from microbootstrap.instruments.logging_instrument import LoggingInstrument
15+
from microbootstrap.instruments.opentelemetry_instrument import (
16+
BaseOpentelemetryInstrument,
17+
FastStreamOpentelemetryConfig,
18+
)
19+
from microbootstrap.instruments.prometheus_instrument import FastStreamPrometheusConfig, PrometheusInstrument
20+
from microbootstrap.instruments.sentry_instrument import SentryInstrument
21+
from microbootstrap.settings import FastStreamSettings
22+
23+
24+
class KwargsAsgiFastStream(AsgiFastStream):
25+
def __init__(self, **kwargs: typing.Any) -> None: # noqa: ANN401
26+
# `broker` argument is positional-only
27+
super().__init__(kwargs.pop("broker", None), **kwargs)
28+
29+
30+
class FastStreamBootstrapper(ApplicationBootstrapper[FastStreamSettings, AsgiFastStream, FastStreamConfig]):
31+
application_config = FastStreamConfig()
32+
application_type = KwargsAsgiFastStream
33+
34+
def bootstrap_before(self: typing_extensions.Self) -> dict[str, typing.Any]:
35+
return {
36+
"title": self.settings.service_name,
37+
"version": self.settings.service_version,
38+
"description": self.settings.service_description,
39+
"on_shutdown": [self.teardown],
40+
"on_startup": [self.console_writer.print_bootstrap_table],
41+
"asyncapi_path": self.settings.asyncapi_path,
42+
}
43+
44+
45+
FastStreamBootstrapper.use_instrument()(SentryInstrument)
46+
47+
48+
@FastStreamBootstrapper.use_instrument()
49+
class FastStreamOpentelemetryInstrument(BaseOpentelemetryInstrument[FastStreamOpentelemetryConfig]):
50+
def is_ready(self) -> bool:
51+
return bool(self.instrument_config.opentelemetry_middleware_cls and super().is_ready())
52+
53+
def bootstrap_after(self, application: AsgiFastStream) -> AsgiFastStream: # type: ignore[override]
54+
if self.instrument_config.opentelemetry_middleware_cls and application.broker:
55+
application.broker.add_middleware(
56+
self.instrument_config.opentelemetry_middleware_cls(tracer_provider=self.tracer_provider)
57+
)
58+
return application
59+
60+
@classmethod
61+
def get_config_type(cls) -> type[FastStreamOpentelemetryConfig]:
62+
return FastStreamOpentelemetryConfig
63+
64+
65+
@FastStreamBootstrapper.use_instrument()
66+
class FastStreamLoggingInstrument(LoggingInstrument):
67+
def bootstrap_before(self) -> dict[str, typing.Any]:
68+
return {"logger": structlog.get_logger("microbootstrap-faststream")}
69+
70+
71+
@FastStreamBootstrapper.use_instrument()
72+
class FastStreamPrometheusInstrument(PrometheusInstrument[FastStreamPrometheusConfig]):
73+
def is_ready(self) -> bool:
74+
return bool(self.instrument_config.prometheus_middleware_cls and super().is_ready())
75+
76+
def bootstrap_before(self) -> dict[str, typing.Any]:
77+
self.collector_registry = prometheus_client.CollectorRegistry()
78+
return {
79+
"asgi_routes": (
80+
(
81+
self.instrument_config.prometheus_metrics_path,
82+
prometheus_client.make_asgi_app(self.collector_registry),
83+
),
84+
)
85+
}
86+
87+
def bootstrap_after(self, application: AsgiFastStream) -> AsgiFastStream: # type: ignore[override]
88+
if self.instrument_config.prometheus_middleware_cls and application.broker:
89+
application.broker.add_middleware(
90+
self.instrument_config.prometheus_middleware_cls(registry=self.collector_registry)
91+
)
92+
return application
93+
94+
@classmethod
95+
def get_config_type(cls) -> type[FastStreamPrometheusConfig]:
96+
return FastStreamPrometheusConfig
97+
98+
99+
@FastStreamBootstrapper.use_instrument()
100+
class FastStreamHealthChecksInstrument(HealthChecksInstrument):
101+
def bootstrap_before(self) -> dict[str, typing.Any]:
102+
@handle_get
103+
async def check_health(scope: typing.Any) -> AsgiResponse: # noqa: ANN401, ARG001
104+
health_check_data = await self.health_check.check_health()
105+
return (
106+
AsgiResponse(json.dumps(health_check_data).encode(), 200, headers={"content-type": "text/plain"})
107+
if health_check_data["health_status"]
108+
else AsgiResponse(b"Service is unhealthy", 500, headers={"content-type": "application/json"})
109+
)
110+
111+
return {"asgi_routes": ((self.instrument_config.health_checks_path, check_health),)}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from __future__ import annotations
2+
import dataclasses
3+
import typing
4+
5+
6+
if typing.TYPE_CHECKING:
7+
import faststream.asyncapi.schema as asyncapi
8+
from faststream.asgi.types import ASGIApp
9+
from faststream.broker.core.usecase import BrokerUsecase
10+
from faststream.types import AnyDict, AnyHttpUrl, Lifespan
11+
12+
13+
@dataclasses.dataclass
14+
class FastStreamConfig:
15+
broker: BrokerUsecase[typing.Any, typing.Any] | None = None
16+
asgi_routes: typing.Sequence[tuple[str, ASGIApp]] = ()
17+
lifespan: Lifespan | None = None
18+
terms_of_service: AnyHttpUrl | None = None
19+
license: asyncapi.License | asyncapi.LicenseDict | AnyDict | None = None
20+
contact: asyncapi.Contact | asyncapi.ContactDict | AnyDict | None = None
21+
tags: typing.Sequence[asyncapi.Tag | asyncapi.TagDict | AnyDict] | None = None
22+
external_docs: asyncapi.ExternalDocs | asyncapi.ExternalDocsDict | AnyDict | None = None
23+
identifier: str | None = None
24+
on_startup: typing.Sequence[typing.Callable[..., typing.Any]] = ()
25+
after_startup: typing.Sequence[typing.Callable[..., typing.Any]] = ()
26+
on_shutdown: typing.Sequence[typing.Callable[..., typing.Any]] = ()
27+
after_shutdown: typing.Sequence[typing.Callable[..., typing.Any]] = ()

0 commit comments

Comments
 (0)