Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions faststream/confluent/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ def __init__(
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Optional[Sequence[float]] = None,
is_multiprocess: bool = False,
multiprocess_dir: Optional[str] = None,
) -> None:
super().__init__(
settings_provider_factory=settings_provider_factory,
registry=registry,
app_name=app_name,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
is_multiprocess=is_multiprocess,
multiprocess_dir=multiprocess_dir,
)
4 changes: 4 additions & 0 deletions faststream/kafka/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ def __init__(
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Optional[Sequence[float]] = None,
is_multiprocess: bool = False,
multiprocess_dir: Optional[str] = None,
) -> None:
super().__init__(
settings_provider_factory=settings_provider_factory,
registry=registry,
app_name=app_name,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
is_multiprocess=is_multiprocess,
multiprocess_dir=multiprocess_dir,
)
4 changes: 4 additions & 0 deletions faststream/nats/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ def __init__(
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Optional[Sequence[float]] = None,
is_multiprocess: bool = False,
multiprocess_dir: Optional[str] = None,
) -> None:
super().__init__(
settings_provider_factory=settings_provider_factory,
registry=registry,
app_name=app_name,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
is_multiprocess=is_multiprocess,
multiprocess_dir=multiprocess_dir,
)
9 changes: 9 additions & 0 deletions faststream/prometheus/middleware.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence

from prometheus_client import multiprocess

from faststream import BaseMiddleware
from faststream.exceptions import IgnoredException
from faststream.prometheus.consts import (
Expand Down Expand Up @@ -182,6 +184,8 @@ def __init__(
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Optional[Sequence[float]] = None,
is_multiprocess: bool = False,
multiprocess_dir: Optional[str] = None,
):
if app_name is EMPTY:
app_name = metrics_prefix
Expand All @@ -197,6 +201,11 @@ def __init__(
app_name=app_name,
)

if is_multiprocess:
if multiprocess_dir is None:
raise ValueError("Multiprocess mode requires multiprocess_dir.")
multiprocess.MultiProcessCollector(registry, path=multiprocess_dir)

def __call__(self, msg: Optional[Any]) -> BaseMiddleware:
return PrometheusMiddleware(
msg=msg,
Expand Down
4 changes: 4 additions & 0 deletions faststream/rabbit/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ def __init__(
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Optional[Sequence[float]] = None,
is_multiprocess: bool = False,
multiprocess_dir: Optional[str] = None,
) -> None:
super().__init__(
settings_provider_factory=lambda _: RabbitMetricsSettingsProvider(),
registry=registry,
app_name=app_name,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
is_multiprocess=is_multiprocess,
multiprocess_dir=multiprocess_dir,
)
4 changes: 4 additions & 0 deletions faststream/redis/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ def __init__(
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Optional[Sequence[float]] = None,
is_multiprocess: bool = False,
multiprocess_dir: Optional[str] = None,
) -> None:
super().__init__(
settings_provider_factory=settings_provider_factory,
registry=registry,
app_name=app_name,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
is_multiprocess=is_multiprocess,
multiprocess_dir=multiprocess_dir,
)
52 changes: 52 additions & 0 deletions tests/prometheus/confluent/test_confluent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -77,3 +79,53 @@ def get_broker(self, apply_types: bool = False, **kwargs):
apply_types=apply_types,
**kwargs,
)


class TestKafkaPrometheusMiddleware:
def test_multiprocess_requires_directory(self):
with pytest.raises(ValueError): # noqa: PT011
KafkaPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=None,
)

def test_multiprocess_with_directory(self):
with TemporaryDirectory() as temp_dir:
middleware = KafkaPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)
assert middleware._metrics_container._registry is not None

def test_multiprocess_directory_creation(self):
with TemporaryDirectory() as temp_dir:
middleware = KafkaPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)

assert Path(temp_dir).exists()

# In multiprocess mode, the registry should have a MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert any("MultiProcessCollector" in str(c) for c in collectors)

def test_single_process_no_directory(self):
middleware = KafkaPrometheusMiddleware(
registry=CollectorRegistry(), app_name="test-app", is_multiprocess=False
)
assert middleware._metrics_container._registry is not None

# In single process mode, there should be no MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert not any("MultiProcessCollector" in str(c) for c in collectors)
52 changes: 52 additions & 0 deletions tests/prometheus/kafka/test_kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -80,3 +82,53 @@ def get_broker(self, apply_types: bool = False, **kwargs):
apply_types=apply_types,
**kwargs,
)


class TestKafkaPrometheusMiddleware:
def test_multiprocess_requires_directory(self):
with pytest.raises(ValueError): # noqa: PT011
KafkaPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=None,
)

def test_multiprocess_with_directory(self):
with TemporaryDirectory() as temp_dir:
middleware = KafkaPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)
assert middleware._metrics_container._registry is not None

def test_multiprocess_directory_creation(self):
with TemporaryDirectory() as temp_dir:
middleware = KafkaPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)

assert Path(temp_dir).exists()

# In multiprocess mode, the registry should have a MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert any("MultiProcessCollector" in str(c) for c in collectors)

def test_single_process_no_directory(self):
middleware = KafkaPrometheusMiddleware(
registry=CollectorRegistry(), app_name="test-app", is_multiprocess=False
)
assert middleware._metrics_container._registry is not None

# In single process mode, there should be no MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert not any("MultiProcessCollector" in str(c) for c in collectors)
52 changes: 52 additions & 0 deletions tests/prometheus/nats/test_nats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -84,3 +86,53 @@ def get_broker(self, apply_types: bool = False, **kwargs):
apply_types=apply_types,
**kwargs,
)


class TestNatsPrometheusMiddleware:
def test_multiprocess_requires_directory(self):
with pytest.raises(ValueError): # noqa: PT011
NatsPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=None,
)

def test_multiprocess_with_directory(self):
with TemporaryDirectory() as temp_dir:
middleware = NatsPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)
assert middleware._metrics_container._registry is not None

def test_multiprocess_directory_creation(self):
with TemporaryDirectory() as temp_dir:
middleware = NatsPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)

assert Path(temp_dir).exists()

# In multiprocess mode, the registry should have a MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert any("MultiProcessCollector" in str(c) for c in collectors)

def test_single_process_no_directory(self):
middleware = NatsPrometheusMiddleware(
registry=CollectorRegistry(), app_name="test-app", is_multiprocess=False
)
assert middleware._metrics_container._registry is not None

# In single process mode, there should be no MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert not any("MultiProcessCollector" in str(c) for c in collectors)
53 changes: 53 additions & 0 deletions tests/prometheus/rabbit/test_rabbit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from pathlib import Path
from tempfile import TemporaryDirectory

import pytest
from prometheus_client import CollectorRegistry

Expand Down Expand Up @@ -40,3 +43,53 @@ def get_broker(self, apply_types: bool = False, **kwargs):
apply_types=apply_types,
**kwargs,
)


class TestKafkaPrometheusMiddleware:
def test_multiprocess_requires_directory(self):
with pytest.raises(ValueError): # noqa: PT011
RabbitPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=None,
)

def test_multiprocess_with_directory(self):
with TemporaryDirectory() as temp_dir:
middleware = RabbitPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)
assert middleware._metrics_container._registry is not None

def test_multiprocess_directory_creation(self):
with TemporaryDirectory() as temp_dir:
middleware = RabbitPrometheusMiddleware(
registry=CollectorRegistry(),
app_name="test-app",
is_multiprocess=True,
multiprocess_dir=temp_dir,
)

assert Path(temp_dir).exists()

# In multiprocess mode, the registry should have a MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert any("MultiProcessCollector" in str(c) for c in collectors)

def test_single_process_no_directory(self):
middleware = RabbitPrometheusMiddleware(
registry=CollectorRegistry(), app_name="test-app", is_multiprocess=False
)
assert middleware._metrics_container._registry is not None

# In single process mode, there should be no MultiProcessCollector
collectors = list(
middleware._metrics_container._registry._collector_to_names.keys()
)
assert not any("MultiProcessCollector" in str(c) for c in collectors)
Loading
Loading