From 0dad5a8682ac56875f323cd378b4a6e92808c5cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Antonio=20Perdiguero=20L=C3=B3pez?= Date: Sun, 16 Feb 2025 22:17:45 +0100 Subject: [PATCH] :sparkles: Module for gathering telemetry data --- flama/middleware.py | 9 - flama/telemetry/__init__.py | 2 + flama/telemetry/data_structures.py | 184 ++++++++++++ flama/telemetry/middleware.py | 155 ++++++++++ tests/telemetry/__init__.py | 0 tests/telemetry/test_data_structures.py | 359 ++++++++++++++++++++++++ tests/telemetry/test_middleware.py | 276 ++++++++++++++++++ 7 files changed, 976 insertions(+), 9 deletions(-) create mode 100644 flama/telemetry/__init__.py create mode 100644 flama/telemetry/data_structures.py create mode 100644 flama/telemetry/middleware.py create mode 100644 tests/telemetry/__init__.py create mode 100644 tests/telemetry/test_data_structures.py create mode 100644 tests/telemetry/test_middleware.py diff --git a/flama/middleware.py b/flama/middleware.py index 101c4b68..e5e39735 100644 --- a/flama/middleware.py +++ b/flama/middleware.py @@ -16,7 +16,6 @@ from flama.http import Request, Response __all__ = [ - "AuthenticationMiddleware", "BaseHTTPMiddleware", "CORSMiddleware", "ExceptionMiddleware", @@ -42,14 +41,6 @@ async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: " SessionMiddleware = None # type: ignore[assignment] -class AuthenticationMiddleware(starlette.middleware.authentication.AuthenticationMiddleware): - def __init__(self, app: "types.App", *args, **kwargs): - super().__init__(app, *args, **kwargs) # type: ignore[arg-type] - - async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: "types.Send") -> None: # type: ignore[overrid] - return await super().__call__(scope, receive, send) # type: ignore[assignment] - - class BaseHTTPMiddleware(starlette.middleware.base.BaseHTTPMiddleware): def __init__(self, app: "types.App", *args, **kwargs): super().__init__(app, *args, **kwargs) # type: ignore[arg-type] diff --git a/flama/telemetry/__init__.py b/flama/telemetry/__init__.py new file mode 100644 index 00000000..f750ed92 --- /dev/null +++ b/flama/telemetry/__init__.py @@ -0,0 +1,2 @@ +from flama.telemetry.data_structures import * # noqa +from flama.telemetry.middleware import * # noqa diff --git a/flama/telemetry/data_structures.py b/flama/telemetry/data_structures.py new file mode 100644 index 00000000..bcd68bd2 --- /dev/null +++ b/flama/telemetry/data_structures.py @@ -0,0 +1,184 @@ +import dataclasses +import datetime +import logging +import typing as t +from http.cookies import SimpleCookie + +from flama import Flama, types +from flama.authentication.types import AccessToken, RefreshToken +from flama.exceptions import HTTPException +from flama.http import Request as HTTPRequest + +logger = logging.getLogger(__name__) + +__all__ = ["Endpoint", "Authentication", "Request", "Response", "Error", "TelemetryData"] + + +@dataclasses.dataclass +class Endpoint: + path: str + name: t.Optional[str] + tags: dict[str, t.Any] + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Endpoint": + app: Flama = scope["app"] + route, _ = app.router.resolve_route(scope) + + return cls(path=str(route.path), name=route.name, tags=route.tags) + + def to_dict(self) -> dict[str, t.Any]: + return { + "path": self.path, + "name": self.name, + "tags": self.tags, + } + + +@dataclasses.dataclass +class Authentication: + access: t.Optional[AccessToken] + refresh: t.Optional[RefreshToken] + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Authentication": + app: Flama = scope["app"] + context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)} + + try: + access = await app.injector.resolve(AccessToken).value(context) + except Exception: + access = None + + try: + refresh = await app.injector.resolve(RefreshToken).value(context) + except Exception: + refresh = None + + return cls(access=access, refresh=refresh) + + def to_dict(self) -> dict[str, t.Any]: + return { + "access": self.access.to_dict() if self.access else None, + "refresh": self.refresh.to_dict() if self.refresh else None, + } + + +@dataclasses.dataclass +class Request: + headers: dict[str, t.Any] + cookies: dict[str, t.Any] + query_parameters: dict[str, t.Any] + path_parameters: dict[str, t.Any] + body: bytes = b"" + timestamp: datetime.datetime = dataclasses.field( + init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Request": + app: Flama = scope["app"] + context = {"scope": scope, "request": HTTPRequest(scope, receive=receive), "route": app.resolve_route(scope)[0]} + + headers = dict(await app.injector.resolve(types.Headers).value(context)) + cookies = dict(await app.injector.resolve(types.Cookies).value(context)) + query = dict(await app.injector.resolve(types.QueryParams).value(context)) + path = dict(await app.injector.resolve(types.PathParams).value(context)) + + return cls(headers=headers, cookies=cookies, query_parameters=query, path_parameters=path) + + def to_dict(self) -> dict[str, t.Any]: + return { + "timestamp": self.timestamp.isoformat(), + "headers": self.headers, + "cookies": self.cookies, + "query_parameters": self.query_parameters, + "path_parameters": self.path_parameters, + "body": self.body, + } + + +@dataclasses.dataclass +class Response: + headers: t.Optional[dict[str, t.Any]] + cookies: t.Optional[dict[str, t.Any]] = dataclasses.field(init=False) + body: bytes = b"" + status_code: t.Optional[int] = None + timestamp: datetime.datetime = dataclasses.field( + init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) + + def __post_init__(self): + if self.headers: + cookie = SimpleCookie() + cookie.load(self.headers.get("cookie", "")) + else: + cookie = {} + + self.cookies = { + str(name): {**{str(k): str(v) for k, v in morsel.items()}, "value": morsel.value} + for name, morsel in cookie.items() + } + + def to_dict(self) -> dict[str, t.Any]: + return { + "timestamp": self.timestamp.isoformat(), + "headers": self.headers, + "cookies": self.cookies, + "body": self.body, + "status_code": self.status_code, + } + + +@dataclasses.dataclass +class Error: + detail: str + status_code: t.Optional[int] = None + timestamp: datetime.datetime = dataclasses.field( + init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) + + @classmethod + async def from_exception(cls, *, exception: Exception) -> "Error": + if isinstance(exception, HTTPException): + return cls(status_code=exception.status_code, detail=str(exception.detail)) + + return cls(detail=str(exception)) + + def to_dict(self) -> dict[str, t.Any]: + return { + "timestamp": self.timestamp.isoformat(), + "detail": self.detail, + "status_code": self.status_code, + } + + +@dataclasses.dataclass +class TelemetryData: + type: t.Literal["http", "websocket"] + endpoint: Endpoint + authentication: Authentication + request: Request + response: t.Optional[Response] = None + error: t.Optional[Error] = None + extra: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict) + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "TelemetryData": + return cls( + type=scope["type"], + endpoint=await Endpoint.from_scope(scope=scope, receive=receive, send=send), + authentication=await Authentication.from_scope(scope=scope, receive=receive, send=send), + request=await Request.from_scope(scope=scope, receive=receive, send=send), + ) + + def to_dict(self) -> dict[str, t.Any]: + return { + "type": self.type, + "endpoint": self.endpoint.to_dict(), + "authentication": self.authentication.to_dict(), + "request": self.request.to_dict(), + "response": self.response.to_dict() if self.response else None, + "error": self.error.to_dict() if self.error else None, + "extra": self.extra, + } diff --git a/flama/telemetry/middleware.py b/flama/telemetry/middleware.py new file mode 100644 index 00000000..3cd2119f --- /dev/null +++ b/flama/telemetry/middleware.py @@ -0,0 +1,155 @@ +import abc +import logging +import typing as t + +from flama import Flama, concurrency, types +from flama.telemetry.data_structures import Error, Response, TelemetryData + +logger = logging.getLogger(__name__) + +__all__ = ["TelemetryMiddleware"] + + +PROJECT = "vortico-core" +SERVICE = "elektrococo" +TOPIC_ID = "telemetry-bus" + +HookFunction = t.Callable[[TelemetryData], t.Union[None, t.Awaitable[None]]] + + +class Wrapper(abc.ABC): + def __init__(self, app: Flama, data: TelemetryData) -> None: + self.app = app + self.data = data + + @classmethod + def build(cls, type: t.Literal["http", "websocket"], app: Flama, data: TelemetryData) -> "Wrapper": + if type == "websocket": + return WebSocketWrapper(app, data) + + return HTTPWrapper(app, data) + + async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None: + self._scope = scope + self._receive = receive + self._send = send + self._response_body = b"" + self._response_headers = None + self._response_status_code = None + + try: + await self.app(self._scope, self.receive, self.send) + self.data.response = Response( + headers=self._response_headers, body=self._response_body, status_code=self._response_status_code + ) + except Exception as e: + self.data.error = await Error.from_exception(exception=e) + raise + + @abc.abstractmethod + async def receive(self) -> types.Message: ... + + @abc.abstractmethod + async def send(self, message: types.Message) -> None: ... + + +class HTTPWrapper(Wrapper): + async def receive(self) -> types.Message: + message = await self._receive() + + if message["type"] == "http.request": + self.data.request.body += message.get("body", b"") + + return message + + async def send(self, message: types.Message) -> None: + if message["type"] == "http.response.start": + self._response_headers = {k.decode(): v.decode() for (k, v) in message.get("headers", [])} + self._response_status_code = message.get("status") + elif message["type"] == "http.response.body": + self._response_body += message.get("body", b"") + + await self._send(message) + + +class WebSocketWrapper(Wrapper): + async def receive(self) -> types.Message: + message = await self._receive() + + if message["type"] == "websocket.receive": + self._response_body += message.get("body", b"") + elif message["type"] == "websocket.disconnect": + self._response_status_code = message.get("code", None) + self._response_body = message.get("reason", "").encode() + + return message + + async def send(self, message: types.Message) -> None: + if message["type"] == "websocket.send": + self.data.request.body += message.get("bytes", message.get("text", "").encode()) + elif message["type"] == "websocket.close": + self._response_status_code = message.get("code") + self._response_body = message.get("reason", "").encode() + + await self._send(message) + + +class TelemetryDataCollector: + data: TelemetryData + + def __init__(self, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send) -> None: + self.app = app + self._scope = scope + self._receive = receive + self._send = send + + @classmethod + async def build( + cls, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send + ) -> "TelemetryDataCollector": + self = cls(app, scope, receive, send) + self.data = await TelemetryData.from_scope(scope=scope, receive=receive, send=send) + return self + + async def __call__(self) -> None: + await Wrapper.build(self._scope["type"], self.app, self.data)( + scope=self._scope, receive=self._receive, send=self._send + ) + + +class TelemetryMiddleware: + def __init__( + self, + app: types.App, + log_level: int = logging.NOTSET, + *, + before: t.Optional[HookFunction] = None, + after: t.Optional[HookFunction] = None, + ) -> None: + self.app: Flama = t.cast(Flama, app) + self._log_level = log_level + self._before = before + self._after = after + + async def before(self, data: TelemetryData): + if self._before: + await concurrency.run(self._before, data) + + async def after(self, data: TelemetryData): + if self._after: + await concurrency.run(self._after, data) + + async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None: + if scope["type"] not in ("http", "websocket"): + await self.app(scope, receive, send) + return + + collector = await TelemetryDataCollector.build(self.app, scope, receive, send) + + await self.before(collector.data) + + try: + await collector() + finally: + await self.after(collector.data) + logger.log(self._log_level, "Telemetry: %s", str(collector.data)) diff --git a/tests/telemetry/__init__.py b/tests/telemetry/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/telemetry/test_data_structures.py b/tests/telemetry/test_data_structures.py new file mode 100644 index 00000000..30380c72 --- /dev/null +++ b/tests/telemetry/test_data_structures.py @@ -0,0 +1,359 @@ +import datetime +import http +import uuid +from unittest.mock import MagicMock, patch + +import pytest + +from flama import Flama, authentication +from flama.exceptions import HTTPException +from flama.telemetry.data_structures import Authentication, Endpoint, Error, Request, Response, TelemetryData + +TOKEN = ( + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJmb28iOiAiYmFyIn0sICJpYXQiOiAwfQ==.J3zdedMZSFNOimstjJat0V" + "28rM_b1UU62XCp9dg_5kg=" +) + + +@pytest.fixture(scope="function") +def secret(): + return uuid.UUID(int=0) + + +@pytest.fixture(scope="function") +def app(secret): + return Flama( + schema=None, + docs=None, + components=[ + authentication.AccessTokenComponent(secret=secret.bytes), + authentication.RefreshTokenComponent(secret=secret.bytes), + ], + ) + + +@pytest.fixture(scope="function", autouse=True) +def add_routes(app): + @app.route("/") + def root(): ... + + @app.route("/query_parameters/") + def query_parameters(x: int): + return {"x": x} + + @app.route("/path_parameters/{x:int}/") + def path_parameters(x: int): + return {"x": x} + + @app.route("/tags/", tags={"foo": "bar"}) + def tags(): ... + + @app.route("/name/", name="foo") + def name(): ... + + +@pytest.fixture(scope="function") +def asgi_scope(app, asgi_scope): + asgi_scope["app"] = app + return asgi_scope + + +class TestCaseAuthentication: + @pytest.mark.parametrize( + ["scope", "result"], + [ + pytest.param( + {"path": "/"}, + {"access": None, "refresh": None}, + id="no_auth", + ), + pytest.param( + {"path": "/", "headers": [(b"cookie", f"access_token={TOKEN}".encode())]}, + { + "access": { + "header": {"alg": "HS256", "typ": "JWT"}, + "payload": {"data": {"foo": "bar"}, "iat": 0}, + }, + "refresh": None, + }, + id="access", + ), + pytest.param( + {"path": "/", "headers": [(b"cookie", f"refresh_token={TOKEN}".encode())]}, + { + "access": None, + "refresh": { + "header": {"alg": "HS256", "typ": "JWT"}, + "payload": {"data": {"foo": "bar"}, "iat": 0}, + }, + }, + id="refresh", + ), + pytest.param( + { + "path": "/", + "headers": [(b"cookie", f"access_token={TOKEN}; refresh_token={TOKEN}".encode())], + }, + { + "access": { + "header": {"alg": "HS256", "typ": "JWT"}, + "payload": {"data": {"foo": "bar"}, "iat": 0}, + }, + "refresh": { + "header": {"alg": "HS256", "typ": "JWT"}, + "payload": {"data": {"foo": "bar"}, "iat": 0}, + }, + }, + id="access_and_refresh", + ), + ], + ) + async def test_from_scope(self, asgi_scope, asgi_receive, asgi_send, scope, result): + asgi_scope.update(scope) + now = datetime.datetime.now() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + data = await Authentication.from_scope(scope=asgi_scope, receive=asgi_receive, send=asgi_send) + + assert data.to_dict() == result + + +class TestCaseEndpoint: + @pytest.mark.parametrize( + ["scope", "result"], + [ + pytest.param( + {"path": "/"}, + {"name": "root", "path": "/", "tags": {}}, + id="default_name", + ), + pytest.param( + {"path": "/name/"}, + {"name": "foo", "path": "/name/", "tags": {}}, + id="explicit_name", + ), + pytest.param( + {"path": "/tags/"}, + {"name": "tags", "path": "/tags/", "tags": {"foo": "bar"}}, + id="tags", + ), + ], + ) + async def test_from_scope(self, asgi_scope, asgi_receive, asgi_send, scope, result): + asgi_scope.update(scope) + now = datetime.datetime.now() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + data = await Endpoint.from_scope(scope=asgi_scope, receive=asgi_receive, send=asgi_send) + + assert data.to_dict() == result + + +class TestCaseRequest: + @pytest.mark.parametrize( + ["scope", "result"], + [ + pytest.param( + {"path": "/"}, + { + "body": b"", + "cookies": {}, + "headers": {}, + "path_parameters": {}, + "query_parameters": {}, + }, + id="empty", + ), + pytest.param( + {"path": "/path_parameters/1/"}, + { + "body": b"", + "cookies": {}, + "headers": {}, + "path_parameters": {"x": 1}, + "query_parameters": {}, + }, + id="path_parameters", + ), + pytest.param( + {"path": "/query_parameters/", "query_string": b"x=1"}, + { + "body": b"", + "cookies": {}, + "headers": {}, + "path_parameters": {}, + "query_parameters": {"x": "1"}, + }, + id="query_parameters", + ), + pytest.param( + {"path": "/", "headers": [(b"foo", b"bar")]}, + { + "body": b"", + "cookies": {}, + "headers": {"foo": "bar"}, + "path_parameters": {}, + "query_parameters": {}, + }, + id="headers", + ), + pytest.param( + {"path": "/", "headers": [(b"cookie", b"foo=bar")]}, + { + "body": b"", + "cookies": { + "foo": { + "comment": "", + "domain": "", + "expires": "", + "httponly": "", + "max-age": "", + "path": "", + "samesite": "", + "secure": "", + "value": "bar", + "version": "", + } + }, + "headers": {"cookie": "foo=bar"}, + "path_parameters": {}, + "query_parameters": {}, + }, + id="cookies", + ), + ], + ) + async def test_from_scope(self, asgi_scope, asgi_receive, asgi_send, scope, result): + asgi_scope.update(scope) + now = datetime.datetime.now() + result["timestamp"] = now.isoformat() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + data = await Request.from_scope(scope=asgi_scope, receive=asgi_receive, send=asgi_send) + + assert data.to_dict() == result + + +class TestCaseResponse: + @pytest.mark.parametrize( + ["headers", "body", "status_code", "result"], + [ + pytest.param( + {}, + b"", + None, + { + "body": b"", + "cookies": {}, + "headers": {}, + "status_code": None, + }, + id="empty", + ), + pytest.param( + {}, + b"foo", + http.HTTPStatus.OK, + { + "body": b"foo", + "cookies": {}, + "headers": {}, + "status_code": http.HTTPStatus.OK, + }, + id="body", + ), + pytest.param( + {"foo": "bar"}, + b"", + http.HTTPStatus.OK, + { + "body": b"", + "cookies": {}, + "headers": {"foo": "bar"}, + "status_code": http.HTTPStatus.OK, + }, + id="headers", + ), + pytest.param( + {"cookie": "foo=bar"}, + b"", + http.HTTPStatus.OK, + { + "body": b"", + "cookies": { + "foo": { + "comment": "", + "domain": "", + "expires": "", + "httponly": "", + "max-age": "", + "path": "", + "samesite": "", + "secure": "", + "value": "bar", + "version": "", + } + }, + "headers": {"cookie": "foo=bar"}, + "status_code": http.HTTPStatus.OK, + }, + id="cookies", + ), + ], + ) + def test_init(self, headers, body, status_code, result): + now = datetime.datetime.now() + result["timestamp"] = now.isoformat() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + data = Response(headers=headers, body=body, status_code=status_code) + + assert data.to_dict() == result + + +class TestCaseError: + @pytest.mark.parametrize( + ["exception", "result"], + [ + pytest.param( + ValueError("foo"), + {"detail": "foo", "status_code": None}, + id="exception", + ), + pytest.param( + HTTPException(status_code=400, detail="foo"), + {"detail": "foo", "status_code": 400}, + id="http", + ), + ], + ) + async def test_from_exception(self, exception, result): + now = datetime.datetime.now() + result["timestamp"] = now.isoformat() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + try: + raise exception + except Exception as e: + error = await Error.from_exception(exception=e) + + assert error.to_dict() == result + + +class TestCaseTelemetryData: + async def test_from_scope(self, asgi_scope, asgi_receive, asgi_send): + now = datetime.datetime.now() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + data = await TelemetryData.from_scope(scope=asgi_scope, receive=asgi_receive, send=asgi_send) + + assert data.to_dict() == { + "authentication": {"access": None, "refresh": None}, + "endpoint": {"name": "root", "path": "/", "tags": {}}, + "error": None, + "extra": {}, + "request": { + "body": b"", + "cookies": {}, + "headers": {}, + "path_parameters": {}, + "query_parameters": {}, + "timestamp": now.isoformat(), + }, + "response": None, + "type": "http", + } diff --git a/tests/telemetry/test_middleware.py b/tests/telemetry/test_middleware.py new file mode 100644 index 00000000..3ae0e51b --- /dev/null +++ b/tests/telemetry/test_middleware.py @@ -0,0 +1,276 @@ +import datetime +import http +import importlib.metadata +import uuid +from unittest.mock import AsyncMock, MagicMock, call, patch + +import pytest + +from flama import Flama, authentication, types +from flama.middleware import Middleware +from flama.telemetry import Authentication, Endpoint, Error, Request, Response, TelemetryData, TelemetryMiddleware + +SECRET = uuid.UUID(int=0) + +TOKEN = ( + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJmb28iOiAiYmFyIn0sICJpYXQiOiAwfQ==.J3zdedMZSFNOimstjJat0V" + "28rM_b1UU62XCp9dg_5kg=" +) +DECODED_TOKEN = authentication.JWT.decode(TOKEN.encode(), SECRET.bytes) + + +class TestCaseTelemetryMiddleware: + # TODO: WebSocketWrapper is not tested + + @pytest.fixture(scope="function") + def app(self): + return Flama( + schema=None, + docs=None, + components=[ + authentication.AccessTokenComponent(secret=SECRET.bytes), + authentication.RefreshTokenComponent(secret=SECRET.bytes), + ], + ) + + @pytest.fixture(scope="function", autouse=True) + def add_endpoints(self, app): + @app.post("/{x:int}/", name="foo", tags={"foo": "bar"}) + def root(x: int, y: int, body: types.Body): + return {"x": x, "y": y, "body": body} + + @app.post("/error/", name="error") + def error(): + raise ValueError("foo") + + @pytest.mark.parametrize( + [ + "path", + "request_params", + "request_body", + "request_cookies", + "response", + "exception", + "before", + "after", + "data", + ], + [ + pytest.param( + "/1/", + {"y": 1}, + b"body", + {"access_token": TOKEN}, + {"x": 1, "y": 1, "body": "body"}, + None, + None, + None, + None, + id="ok_no_hooks", + ), + pytest.param( + "/1/", + {"y": 1}, + b"body", + {"access_token": TOKEN}, + {"x": 1, "y": 1, "body": "body"}, + None, + MagicMock(), + MagicMock(), + TelemetryData( + type="http", + endpoint=Endpoint(path="/{x}/", name="foo", tags={"foo": "bar"}), + authentication=Authentication(access=authentication.AccessToken(DECODED_TOKEN), refresh=None), + request=Request( + headers={ + "host": "localapp", + "accept": "*/*", + "accept-encoding": "gzip, deflate", + "connection": "keep-alive", + "user-agent": f"flama/{importlib.metadata.version('flama')}", + "content-length": "4", + "cookie": "access_token=eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJmb28iOiAiY" + "mFyIn0sICJpYXQiOiAwfQ==.J3zdedMZSFNOimstjJat0V28rM_b1UU62XCp9dg_5kg=", + }, + cookies={ + "access_token": { + "expires": "", + "path": "", + "comment": "", + "domain": "", + "max-age": "", + "secure": "", + "httponly": "", + "version": "", + "samesite": "", + "value": "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJmb28iOiAiYmFyIn0sICJ" + "pYXQiOiAwfQ==.J3zdedMZSFNOimstjJat0V28rM_b1UU62XCp9dg_5kg=", + } + }, + query_parameters={"y": "1"}, + path_parameters={"x": 1}, + body=b"body", + ), + response=Response( + headers={"content-length": "27", "content-type": "application/json"}, + body=b'{"x":1,"y":1,"body":"body"}', + status_code=http.HTTPStatus.OK, + ), + ), + id="ok_sync_hooks", + ), + pytest.param( + "/1/", + {"y": 1}, + b"body", + {"access_token": TOKEN}, + {"x": 1, "y": 1, "body": "body"}, + None, + AsyncMock(), + AsyncMock(), + TelemetryData( + type="http", + endpoint=Endpoint(path="/{x}/", name="foo", tags={"foo": "bar"}), + authentication=Authentication(access=authentication.AccessToken(DECODED_TOKEN), refresh=None), + request=Request( + headers={ + "host": "localapp", + "accept": "*/*", + "accept-encoding": "gzip, deflate", + "connection": "keep-alive", + "user-agent": f"flama/{importlib.metadata.version('flama')}", + "content-length": "4", + "cookie": "access_token=eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJmb28iOiAiY" + "mFyIn0sICJpYXQiOiAwfQ==.J3zdedMZSFNOimstjJat0V28rM_b1UU62XCp9dg_5kg=", + }, + cookies={ + "access_token": { + "expires": "", + "path": "", + "comment": "", + "domain": "", + "max-age": "", + "secure": "", + "httponly": "", + "version": "", + "samesite": "", + "value": "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJmb28iOiAiYmFyIn0sICJ" + "pYXQiOiAwfQ==.J3zdedMZSFNOimstjJat0V28rM_b1UU62XCp9dg_5kg=", + } + }, + query_parameters={"y": "1"}, + path_parameters={"x": 1}, + body=b"body", + ), + response=Response( + headers={"content-length": "27", "content-type": "application/json"}, + body=b'{"x":1,"y":1,"body":"body"}', + status_code=http.HTTPStatus.OK, + ), + ), + id="ok_async_hooks", + ), + pytest.param( + "/error/", + {}, + None, + {}, + None, + ValueError("foo"), + None, + None, + None, + id="error_no_hooks", + ), + pytest.param( + "/error/", + {}, + None, + {}, + None, + ValueError("foo"), + MagicMock(), + MagicMock(), + TelemetryData( + type="http", + endpoint=Endpoint(path="/error/", name="error", tags={}), + authentication=Authentication(access=None, refresh=None), + request=Request( + headers={ + "host": "localapp", + "accept": "*/*", + "accept-encoding": "gzip, deflate", + "connection": "keep-alive", + "user-agent": f"flama/{importlib.metadata.version('flama')}", + "content-length": "0", + }, + cookies={}, + query_parameters={}, + path_parameters={}, + body=b"", + ), + error=Error(detail="foo", status_code=None), + ), + id="error_sync_hooks", + ), + pytest.param( + "/error/", + {}, + None, + {}, + None, + ValueError("foo"), + AsyncMock(), + AsyncMock(), + TelemetryData( + type="http", + endpoint=Endpoint(path="/error/", name="error", tags={}), + authentication=Authentication(access=None, refresh=None), + request=Request( + headers={ + "host": "localapp", + "accept": "*/*", + "accept-encoding": "gzip, deflate", + "connection": "keep-alive", + "user-agent": f"flama/{importlib.metadata.version('flama')}", + "content-length": "0", + }, + cookies={}, + query_parameters={}, + path_parameters={}, + body=b"", + ), + error=Error(detail="foo", status_code=None), + ), + id="error_async_hooks", + ), + ], + indirect=["exception"], + ) + async def test_request( + self, app, client, path, request_params, request_body, request_cookies, response, exception, before, after, data + ): + app.add_middleware(Middleware(TelemetryMiddleware, before=before, after=after)) + + client.cookies = request_cookies + + now = datetime.datetime.now() + + if data: + data.request.timestamp = now + if data.response: + data.response.timestamp = now + if data.error: + data.error.timestamp = now + + with exception, patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + r = await client.post(path, params=request_params, content=request_body) + + assert r.status_code == http.HTTPStatus.OK + assert r.json() == response + + if before: + assert before.call_args_list == [call(data)] + + if after: + assert after.call_args_list == [call(data)]