From b13756ab9e6febdb33861fa35c84d8ac312af23b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Antonio=20Perdiguero=20L=C3=B3pez?= Date: Fri, 31 Jan 2025 17:34:52 +0100 Subject: [PATCH] :construction: WIP telemetry --- flama/telemetry/__init__.py | 2 + flama/telemetry/data_structures.py | 171 ++++++++++++++++++++++++ flama/telemetry/middleware.py | 155 +++++++++++++++++++++ tests/telemetry/__init__.py | 0 tests/telemetry/test_data_structures.py | 53 ++++++++ tests/telemetry/test_middleware.py | 113 ++++++++++++++++ 6 files changed, 494 insertions(+) create mode 100644 flama/telemetry/__init__.py create mode 100644 flama/telemetry/data_structures.py create mode 100644 flama/telemetry/middleware.py create mode 100644 tests/telemetry/__init__.py create mode 100644 tests/telemetry/test_data_structures.py create mode 100644 tests/telemetry/test_middleware.py diff --git a/flama/telemetry/__init__.py b/flama/telemetry/__init__.py new file mode 100644 index 00000000..f750ed92 --- /dev/null +++ b/flama/telemetry/__init__.py @@ -0,0 +1,2 @@ +from flama.telemetry.data_structures import * # noqa +from flama.telemetry.middleware import * # noqa diff --git a/flama/telemetry/data_structures.py b/flama/telemetry/data_structures.py new file mode 100644 index 00000000..ee3fc4e4 --- /dev/null +++ b/flama/telemetry/data_structures.py @@ -0,0 +1,171 @@ +import dataclasses +import datetime +import logging +import typing as t +from http.cookies import SimpleCookie + +from flama import Flama, types +from flama.authentication.types import AccessToken, RefreshToken +from flama.exceptions import HTTPException +from flama.http import Request as HTTPRequest + +logger = logging.getLogger(__name__) + +__all__ = ["Endpoint", "Authentication", "Request", "Response", "Error", "TelemetryData"] + + +@dataclasses.dataclass +class Endpoint: + path: str + name: t.Optional[str] + tags: dict[str, t.Any] + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Endpoint": + app: Flama = scope["app"] + + route, _ = app.router.resolve_route(scope) + + return cls(path=str(route.path), name=route.name, tags=route.tags) + + def to_dict(self) -> dict[str, t.Any]: + return {"path": self.path, "name": self.name, "tags": self.tags} + + +@dataclasses.dataclass +class Authentication: + access: t.Optional[AccessToken] + refresh: t.Optional[RefreshToken] + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Authentication": + app: Flama = scope["app"] + context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)} + + try: + access = await app.injector.resolve(AccessToken).value(context) + except Exception: + access = None + + try: + refresh = await app.injector.resolve(RefreshToken).value(context) + except Exception: + refresh = None + + return cls(access=access, refresh=refresh) + + def to_dict(self) -> dict[str, t.Any]: + return {"access": self.access, "refresh": self.refresh} + + +@dataclasses.dataclass +class Request: + headers: dict[str, t.Any] + cookies: dict[str, t.Any] + query_parameters: dict[str, t.Any] + path_parameters: dict[str, t.Any] + body: bytes = b"" + timestamp: datetime.datetime = dataclasses.field( + init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Request": + app: Flama = scope["app"] + context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)} + + headers = dict(await app.injector.resolve(types.Headers).value(context)) + cookies = dict(await app.injector.resolve(types.Cookies).value(context)) + query = dict(await app.injector.resolve(types.QueryParams).value(context)) + path = dict(await app.injector.resolve(types.PathParams).value(context)) + + return cls(headers=headers, cookies=cookies, query_parameters=query, path_parameters=path) + + def to_dict(self) -> dict[str, t.Any]: + return { + "timestamp": self.timestamp.isoformat(), + "headers": self.headers, + "cookies": self.headers, + "query_parameters": self.query_parameters, + "path_parameters": self.path_parameters, + "body": self.body, + } + + +@dataclasses.dataclass +class Response: + headers: t.Optional[dict[str, t.Any]] + cookies: t.Optional[dict[str, t.Any]] = dataclasses.field(init=False) + body: bytes = b"" + status_code: t.Optional[int] = None + timestamp: datetime.datetime = dataclasses.field( + init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) + + def __post_init__(self): + if self.headers: + cookie = SimpleCookie() + cookie.load(self.headers.get("cookie", "")) + self.cookies = { + str(name): {**{str(k): str(v) for k, v in morsel.items()}, "value": morsel.value} + for name, morsel in cookie.items() + } + + def to_dict(self) -> dict[str, t.Any]: + return { + "timestamp": self.timestamp.isoformat(), + "headers": self.headers, + "cookies": self.headers, + "body": self.body, + "status_code": self.status_code, + } + + +@dataclasses.dataclass +class Error: + detail: str + status_code: t.Optional[int] = None + timestamp: datetime.datetime = dataclasses.field( + init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) + + @classmethod + async def from_exception(cls, *, exception: Exception) -> "Error": + if isinstance(exception, HTTPException): + return cls(status_code=exception.status_code, detail=str(exception.detail)) + + return cls(detail=str(exception)) + + def to_dict(self) -> dict[str, t.Any]: + return {"timestamp": self.timestamp.isoformat(), "detail": self.detail, "status_code": self.status_code} + + +@dataclasses.dataclass +class TelemetryData: + type: t.Literal["http", "websocket"] + endpoint: Endpoint + authentication: Authentication + request: Request + response: t.Optional[Response] = None + error: t.Optional[Error] = None + extra: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict) + + @classmethod + async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "TelemetryData": + return cls( + type=scope["type"], + endpoint=await Endpoint.from_scope(scope=scope, receive=receive, send=send), + authentication=await Authentication.from_scope(scope=scope, receive=receive, send=send), + request=await Request.from_scope(scope=scope, receive=receive, send=send), + ) + + def to_dict(self) -> dict[str, t.Any]: + return { + "type": self.type, + "endpoint": self.endpoint.to_dict(), + "authentication": self.authentication.to_dict(), + "request": self.request.to_dict(), + "response": self.response.to_dict() if self.response else None, + "error": self.error.to_dict() if self.error else None, + "extra": self.extra, + } diff --git a/flama/telemetry/middleware.py b/flama/telemetry/middleware.py new file mode 100644 index 00000000..c52a37e4 --- /dev/null +++ b/flama/telemetry/middleware.py @@ -0,0 +1,155 @@ +import abc +import logging +import typing as t + +from flama import Flama, concurrency, types +from flama.telemetry.data_structures import Error, Response, TelemetryData + +logger = logging.getLogger(__name__) + +__all__ = ["TelemetryMiddleware"] + + +PROJECT = "vortico-core" +SERVICE = "elektrococo" +TOPIC_ID = "telemetry-bus" + +HookFunction = t.Callable[[TelemetryData], t.Union[None, t.Awaitable[None]]] + + +class Wrapper(abc.ABC): + def __init__(self, app: Flama, data: TelemetryData) -> None: + self.app = app + self.data = data + + @classmethod + def build(cls, type: t.Literal["http", "websocket"], app: Flama, data: TelemetryData) -> "Wrapper": + if type == "websocket": + return WebSocketWrapper(app, data) + + return HTTPWrapper(app, data) + + async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None: + self._scope = scope + self._receive = receive + self._send = send + self._response_body = b"" + self._response_headers = None + self._response_status_code = None + + try: + await self.app(self._scope, self.receive, self.send) + self.data.response = Response(headers=self._response_headers, status_code=self._response_status_code) + except Exception as e: + self.data.error = await Error.from_exception(exception=e) + raise + + @abc.abstractmethod + async def receive(self) -> types.Message: + ... + + @abc.abstractmethod + async def send(self, message: types.Message) -> None: + ... + + +class HTTPWrapper(Wrapper): + async def receive(self) -> types.Message: + message = await self._receive() + + if message["type"] == "http.request": + self.data.request.body += message.get("body", b"") + + return message + + async def send(self, message: types.Message) -> None: + if message["type"] == "http.response.start": + self._response_headers = message.get("headers", []) + self._response_status_code = message.get("status") + elif message["type"] == "http.response.body": + self._response_body += message.get("body", b"") + + await self._send(message) + + +class WebSocketWrapper(Wrapper): + async def receive(self) -> types.Message: + message = await self._receive() + + if message["type"] == "websocket.receive": + self._response_body += message.get("body", b"") + elif message["type"] == "websocket.disconnect": + self._response_status_code = message.get("code", None) + self._response_body = message.get("reason", "").encode() + + return message + + async def send(self, message: types.Message) -> None: + if message["type"] == "websocket.send": + self.data.request.body += message.get("bytes", message.get("text", "").encode()) + elif message["type"] == "websocket.close": + self._response_status_code = message.get("code") + self._response_body = message.get("reason", "").encode() + + await self._send(message) + + +class TelemetryDataCollector: + data: TelemetryData + + def __init__(self, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send) -> None: + self.app = app + self._scope = scope + self._receive = receive + self._send = send + + @classmethod + async def build( + cls, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send + ) -> "TelemetryDataCollector": + self = cls(app, scope, receive, send) + self.data = await TelemetryData.from_scope(scope=scope, receive=receive, send=send) + return self + + async def __call__(self) -> None: + await Wrapper.build(self._scope["type"], self.app, self.data)( + scope=self._scope, receive=self._receive, send=self._send + ) + + +class TelemetryMiddleware: + def __init__( + self, + app: types.App, + log_level: int = logging.INFO, + *, + before: t.Optional[HookFunction] = None, + after: t.Optional[HookFunction] = None, + ) -> None: + self.app: Flama = t.cast(Flama, app) + self._log_level = log_level + self._before = before + self._after = after + + async def before(self, data: TelemetryData): + if self._before: + await concurrency.run(self._before, data) + + async def after(self, data: TelemetryData): + if self._after: + await concurrency.run(self._after, data) + + async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None: + if scope["type"] not in ("http", "websocket"): + await self.app(scope, receive, send) + return + + collector = await TelemetryDataCollector.build(self.app, scope, receive, send) + + await self.before(collector.data) + + try: + await collector() + finally: + await self.after(collector.data) + logger.log(self._log_level, "Telemetry: %s", str(collector.data)) diff --git a/tests/telemetry/__init__.py b/tests/telemetry/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/telemetry/test_data_structures.py b/tests/telemetry/test_data_structures.py new file mode 100644 index 00000000..e8aef71f --- /dev/null +++ b/tests/telemetry/test_data_structures.py @@ -0,0 +1,53 @@ +import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from flama.telemetry.data_structures import Error, TelemetryData + + +@pytest.fixture(scope="function", autouse=True) +def add_routes(app): + @app.route("/") + def root(): + return {"puppy": "Canna"} + + +@pytest.fixture(scope="function") +def asgi_scope(app, asgi_scope): + asgi_scope["app"] = app + return asgi_scope + + +class TestCaseAuthentication: + def test_from_scope(self, asgi_scope, asgi_receive, asgi_send): + ... + + +class TestCaseEndpoint: + def test_from_scope(self, asgi_scope, asgi_receive, asgi_send): + ... + + +class TestCaseRequest: + def test_from_scope(self, asgi_scope, asgi_receive, asgi_send): + ... + + +class TestCaseError: + async def test_from_exception(self): + now = datetime.datetime.now() + with patch("datetime.datetime", MagicMock(now=MagicMock(return_value=now))): + try: + raise ValueError("Foo") + except ValueError as e: + error = await Error.from_exception(exception=e) + + assert error.to_dict() == {"detail": "Foo", "status_code": None, "timestamp": now.isoformat()} + + +class TestCaseTelemetryData: + async def test_from_scope(self, asgi_scope, asgi_receive, asgi_send): + data = await TelemetryData.from_scope(scope=asgi_scope, receive=asgi_receive, send=asgi_send) + + assert data.to_dict() == {} diff --git a/tests/telemetry/test_middleware.py b/tests/telemetry/test_middleware.py new file mode 100644 index 00000000..ca1a77ab --- /dev/null +++ b/tests/telemetry/test_middleware.py @@ -0,0 +1,113 @@ +import uuid + +import pytest + +from flama import Flama +from flama.authentication.components import AccessTokenComponent +from flama.authentication.middleware import AuthenticationMiddleware +from flama.middleware import Middleware + +TOKENS = { + "permission": b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJwZXJtaXNzaW9ucyI6IFsiZmxhbWEudGVzdC5hdXRoI" + b"l19LCAiaWF0IjogMTY5ODQxMjkzMX0=.NLhM8r2g1I_oHG0zAAsRqDAuwPVvzI95Lnz2K7uupmo=", + "role": b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjogeyJyb2xlcyI6IHsiZm9vIjogWyJmbGFtYS50ZXN0LmF1dGgiXX1" + b"9LCAiaWF0IjogMTY5ODQxMjI5OH0=.WK2PwipkiLATHsKwIsiljS_31h0-T6U0hZzoI62Skiw=", + "empty": b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJkYXRhIjoge30sICJpYXQiOiAxNjk4NDEzMDg3fQ==.xPwYcCE0Tq6UxVbhW" + b"RGqT8vRliJxHAqHs12X0oHE1Vg=", +} + + +class TestCaseAuthenticationMiddleware: + @pytest.fixture(scope="function") + def secret(self): + return uuid.UUID(int=0) + + @pytest.fixture(scope="function") + def app(self, secret): + return Flama( + schema=None, + docs=None, + components=[AccessTokenComponent(secret=secret.bytes)], + middleware=[Middleware(AuthenticationMiddleware)], + ) + + @pytest.fixture(scope="function", autouse=True) + def add_endpoints(self, app): + @app.route("/no-auth/") + def no_auth(): + return {"foo": "no-auth"} + + @app.route("/auth/", tags={"permissions": ["flama.test.auth"]}) + def auth(): + return {"foo": "auth"} + + @pytest.fixture(scope="function") + def headers(self, request): + if request.param is None: + return None + + try: + return {"access_token": f"Bearer {TOKENS[request.param].decode()}"} + except KeyError: + raise ValueError(f"Invalid token {request.param}") + + @pytest.fixture(scope="function") + def cookies(self, request): + if request.param is None: + return None + + try: + return {"access_token": TOKENS[request.param].decode()} + except KeyError: + raise ValueError(f"Invalid token {request.param}") + + @pytest.mark.parametrize( + ["path", "headers", "cookies", "status_code", "result"], + ( + pytest.param("/auth/", "permission", None, 200, {"foo": "auth"}, id="auth_header_token_permission"), + pytest.param("/auth/", "role", None, 200, {"foo": "auth"}, id="auth_header_token_role"), + pytest.param( + "/auth/", + "empty", + None, + 403, + {"detail": "Insufficient permissions", "error": None, "status_code": 403}, + id="auth_header_token_empty", + ), + pytest.param("/auth/", None, "permission", 200, {"foo": "auth"}, id="auth_cookie_token_permission"), + pytest.param("/auth/", None, "role", 200, {"foo": "auth"}, id="auth_cookie_token_role"), + pytest.param( + "/auth/", + None, + "empty", + 403, + {"detail": "Insufficient permissions", "error": None, "status_code": 403}, + id="auth_cookie_token_empty", + ), + pytest.param( + "/auth/", + None, + None, + 401, + {"detail": "Unauthorized", "error": None, "status_code": 401}, + id="auth_no_token", + ), + pytest.param( + "/no-auth/", "permission", None, 200, {"foo": "no-auth"}, id="no_auth_header_token_permission" + ), + pytest.param("/no-auth/", "role", None, 200, {"foo": "no-auth"}, id="no_auth_header_token_role"), + pytest.param( + "/no-auth/", None, "permission", 200, {"foo": "no-auth"}, id="no_auth_cookie_token_permission" + ), + pytest.param("/no-auth/", None, "role", 200, {"foo": "no-auth"}, id="no_auth_cookie_token_role"), + pytest.param("/no-auth/", None, None, 200, {"foo": "no-auth"}, id="no_auth_no_token"), + ), + indirect=["headers", "cookies"], + ) + async def test_request(self, client, path, headers, cookies, status_code, result): + client.headers = headers + client.cookies = cookies + response = await client.request("get", path) + + assert response.status_code == status_code + assert response.json() == result