Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry #172

Merged
merged 1 commit into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions flama/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from flama.http import Request, Response

__all__ = [
"AuthenticationMiddleware",
"BaseHTTPMiddleware",
"CORSMiddleware",
"ExceptionMiddleware",
Expand All @@ -42,14 +41,6 @@ async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: "
SessionMiddleware = None # type: ignore[assignment]


class AuthenticationMiddleware(starlette.middleware.authentication.AuthenticationMiddleware):
def __init__(self, app: "types.App", *args, **kwargs):
super().__init__(app, *args, **kwargs) # type: ignore[arg-type]

async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: "types.Send") -> None: # type: ignore[overrid]
return await super().__call__(scope, receive, send) # type: ignore[assignment]


class BaseHTTPMiddleware(starlette.middleware.base.BaseHTTPMiddleware):
def __init__(self, app: "types.App", *args, **kwargs):
super().__init__(app, *args, **kwargs) # type: ignore[arg-type]
Expand Down
2 changes: 2 additions & 0 deletions flama/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from flama.telemetry.data_structures import * # noqa
from flama.telemetry.middleware import * # noqa
184 changes: 184 additions & 0 deletions flama/telemetry/data_structures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import dataclasses
import datetime
import logging
import typing as t
from http.cookies import SimpleCookie

from flama import Flama, types
from flama.authentication.types import AccessToken, RefreshToken
from flama.exceptions import HTTPException
from flama.http import Request as HTTPRequest

logger = logging.getLogger(__name__)

__all__ = ["Endpoint", "Authentication", "Request", "Response", "Error", "TelemetryData"]


@dataclasses.dataclass
class Endpoint:
path: str
name: t.Optional[str]
tags: dict[str, t.Any]

@classmethod
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Endpoint":
app: Flama = scope["app"]
route, _ = app.router.resolve_route(scope)

return cls(path=str(route.path), name=route.name, tags=route.tags)

def to_dict(self) -> dict[str, t.Any]:
return {
"path": self.path,
"name": self.name,
"tags": self.tags,
}


@dataclasses.dataclass
class Authentication:
access: t.Optional[AccessToken]
refresh: t.Optional[RefreshToken]

@classmethod
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Authentication":
app: Flama = scope["app"]
context = {"scope": scope, "request": HTTPRequest(scope, receive=receive)}

try:
access = await app.injector.resolve(AccessToken).value(context)
except Exception:
access = None

try:
refresh = await app.injector.resolve(RefreshToken).value(context)
except Exception:
refresh = None

return cls(access=access, refresh=refresh)

def to_dict(self) -> dict[str, t.Any]:
return {
"access": self.access.to_dict() if self.access else None,
"refresh": self.refresh.to_dict() if self.refresh else None,
}


@dataclasses.dataclass
class Request:
headers: dict[str, t.Any]
cookies: dict[str, t.Any]
query_parameters: dict[str, t.Any]
path_parameters: dict[str, t.Any]
body: bytes = b""
timestamp: datetime.datetime = dataclasses.field(
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
)

@classmethod
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "Request":
app: Flama = scope["app"]
context = {"scope": scope, "request": HTTPRequest(scope, receive=receive), "route": app.resolve_route(scope)[0]}

headers = dict(await app.injector.resolve(types.Headers).value(context))
cookies = dict(await app.injector.resolve(types.Cookies).value(context))
query = dict(await app.injector.resolve(types.QueryParams).value(context))
path = dict(await app.injector.resolve(types.PathParams).value(context))

return cls(headers=headers, cookies=cookies, query_parameters=query, path_parameters=path)

def to_dict(self) -> dict[str, t.Any]:
return {
"timestamp": self.timestamp.isoformat(),
"headers": self.headers,
"cookies": self.cookies,
"query_parameters": self.query_parameters,
"path_parameters": self.path_parameters,
"body": self.body,
}


@dataclasses.dataclass
class Response:
headers: t.Optional[dict[str, t.Any]]
cookies: t.Optional[dict[str, t.Any]] = dataclasses.field(init=False)
body: bytes = b""
status_code: t.Optional[int] = None
timestamp: datetime.datetime = dataclasses.field(
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
)

def __post_init__(self):
if self.headers:
cookie = SimpleCookie()
cookie.load(self.headers.get("cookie", ""))
else:
cookie = {}

self.cookies = {
str(name): {**{str(k): str(v) for k, v in morsel.items()}, "value": morsel.value}
for name, morsel in cookie.items()
}

def to_dict(self) -> dict[str, t.Any]:
return {
"timestamp": self.timestamp.isoformat(),
"headers": self.headers,
"cookies": self.cookies,
"body": self.body,
"status_code": self.status_code,
}


@dataclasses.dataclass
class Error:
detail: str
status_code: t.Optional[int] = None
timestamp: datetime.datetime = dataclasses.field(
init=False, default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
)

@classmethod
async def from_exception(cls, *, exception: Exception) -> "Error":
if isinstance(exception, HTTPException):
return cls(status_code=exception.status_code, detail=str(exception.detail))

return cls(detail=str(exception))

def to_dict(self) -> dict[str, t.Any]:
return {
"timestamp": self.timestamp.isoformat(),
"detail": self.detail,
"status_code": self.status_code,
}


@dataclasses.dataclass
class TelemetryData:
type: t.Literal["http", "websocket"]
endpoint: Endpoint
authentication: Authentication
request: Request
response: t.Optional[Response] = None
error: t.Optional[Error] = None
extra: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict)

@classmethod
async def from_scope(cls, *, scope: types.Scope, receive: types.Receive, send: types.Send) -> "TelemetryData":
return cls(
type=scope["type"],
endpoint=await Endpoint.from_scope(scope=scope, receive=receive, send=send),
authentication=await Authentication.from_scope(scope=scope, receive=receive, send=send),
request=await Request.from_scope(scope=scope, receive=receive, send=send),
)

def to_dict(self) -> dict[str, t.Any]:
return {
"type": self.type,
"endpoint": self.endpoint.to_dict(),
"authentication": self.authentication.to_dict(),
"request": self.request.to_dict(),
"response": self.response.to_dict() if self.response else None,
"error": self.error.to_dict() if self.error else None,
"extra": self.extra,
}
155 changes: 155 additions & 0 deletions flama/telemetry/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import abc
import logging
import typing as t

from flama import Flama, concurrency, types
from flama.telemetry.data_structures import Error, Response, TelemetryData

logger = logging.getLogger(__name__)

__all__ = ["TelemetryMiddleware"]


PROJECT = "vortico-core"
SERVICE = "elektrococo"
TOPIC_ID = "telemetry-bus"

HookFunction = t.Callable[[TelemetryData], t.Union[None, t.Awaitable[None]]]


class Wrapper(abc.ABC):
def __init__(self, app: Flama, data: TelemetryData) -> None:
self.app = app
self.data = data

@classmethod
def build(cls, type: t.Literal["http", "websocket"], app: Flama, data: TelemetryData) -> "Wrapper":
if type == "websocket":
return WebSocketWrapper(app, data)

return HTTPWrapper(app, data)

async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
self._scope = scope
self._receive = receive
self._send = send
self._response_body = b""
self._response_headers = None
self._response_status_code = None

try:
await self.app(self._scope, self.receive, self.send)
self.data.response = Response(
headers=self._response_headers, body=self._response_body, status_code=self._response_status_code
)
except Exception as e:
self.data.error = await Error.from_exception(exception=e)
raise

@abc.abstractmethod
async def receive(self) -> types.Message: ...

@abc.abstractmethod
async def send(self, message: types.Message) -> None: ...


class HTTPWrapper(Wrapper):
async def receive(self) -> types.Message:
message = await self._receive()

if message["type"] == "http.request":
self.data.request.body += message.get("body", b"")

return message

async def send(self, message: types.Message) -> None:
if message["type"] == "http.response.start":
self._response_headers = {k.decode(): v.decode() for (k, v) in message.get("headers", [])}
self._response_status_code = message.get("status")
elif message["type"] == "http.response.body":
self._response_body += message.get("body", b"")

await self._send(message)


class WebSocketWrapper(Wrapper):
async def receive(self) -> types.Message:
message = await self._receive()

if message["type"] == "websocket.receive":
self._response_body += message.get("body", b"")
elif message["type"] == "websocket.disconnect":
self._response_status_code = message.get("code", None)
self._response_body = message.get("reason", "").encode()

return message

async def send(self, message: types.Message) -> None:
if message["type"] == "websocket.send":
self.data.request.body += message.get("bytes", message.get("text", "").encode())
elif message["type"] == "websocket.close":
self._response_status_code = message.get("code")
self._response_body = message.get("reason", "").encode()

await self._send(message)


class TelemetryDataCollector:
data: TelemetryData

def __init__(self, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
self.app = app
self._scope = scope
self._receive = receive
self._send = send

@classmethod
async def build(
cls, app: Flama, scope: types.Scope, receive: types.Receive, send: types.Send
) -> "TelemetryDataCollector":
self = cls(app, scope, receive, send)
self.data = await TelemetryData.from_scope(scope=scope, receive=receive, send=send)
return self

async def __call__(self) -> None:
await Wrapper.build(self._scope["type"], self.app, self.data)(
scope=self._scope, receive=self._receive, send=self._send
)


class TelemetryMiddleware:
def __init__(
self,
app: types.App,
log_level: int = logging.NOTSET,
*,
before: t.Optional[HookFunction] = None,
after: t.Optional[HookFunction] = None,
) -> None:
self.app: Flama = t.cast(Flama, app)
self._log_level = log_level
self._before = before
self._after = after

async def before(self, data: TelemetryData):
if self._before:
await concurrency.run(self._before, data)

async def after(self, data: TelemetryData):
if self._after:
await concurrency.run(self._after, data)

async def __call__(self, scope: types.Scope, receive: types.Receive, send: types.Send) -> None:
if scope["type"] not in ("http", "websocket"):
await self.app(scope, receive, send)
return

collector = await TelemetryDataCollector.build(self.app, scope, receive, send)

await self.before(collector.data)

try:
await collector()
finally:
await self.after(collector.data)
logger.log(self._log_level, "Telemetry: %s", str(collector.data))
Empty file added tests/telemetry/__init__.py
Empty file.
Loading
Loading