diff --git a/harp/_logging.py b/harp/_logging.py index 0747a265..2e9775be 100644 --- a/harp/_logging.py +++ b/harp/_logging.py @@ -87,6 +87,7 @@ def _get_logging_level(name: Optional[str], *, default="warning"): "harp_apps": {"level": _get_logging_level("harp")}, "harp_apps.http_client": {"level": _get_logging_level("http_client")}, "harp_apps.proxy": {"level": _get_logging_level("proxy")}, + "harp_apps.storage": {"level": _get_logging_level("storage")}, "httpcore": {"level": _get_logging_level("http_core")}, "httpx": {"level": _get_logging_level("http_core")}, "hypercorn.access": {"level": _get_logging_level("http", default="info")}, diff --git a/harp_apps/dashboard/tests/controllers/test_system.py b/harp_apps/dashboard/tests/controllers/test_system.py index 45d39227..8bb4783f 100644 --- a/harp_apps/dashboard/tests/controllers/test_system.py +++ b/harp_apps/dashboard/tests/controllers/test_system.py @@ -159,6 +159,8 @@ async def test_get_settings_sqlalchemy_secure(self, client: ASGICommunicator, bl "migrate": True, "redis": None, "url": RE(escape("postgresql+asyncpg://test:***@") + r".*"), + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], }, } diff --git a/harp_apps/storage/tests/test_worker.py b/harp_apps/storage/tests/test_worker.py new file mode 100644 index 00000000..684f522e --- /dev/null +++ b/harp_apps/storage/tests/test_worker.py @@ -0,0 +1,116 @@ +from unittest.mock import AsyncMock + +from sqlalchemy.ext.asyncio import AsyncEngine +from whistle import IAsyncEventDispatcher + +from harp.utils.testing.mixins.controllers import _create_request +from harp_apps.proxy.events import HttpMessageEvent +from harp_apps.proxy.tests.with_storage.test_controllers_http_proxy import DispatcherTestFixtureMixin +from harp_apps.storage.services import SqlStorage +from harp_apps.storage.types import IBlobStorage, IStorage +from harp_apps.storage.utils.testing.mixins import StorageTestFixtureMixin +from harp_apps.storage.worker import SKIP_REQUEST_PAYLOAD_STORAGE, StorageAsyncWorkerQueue + + +rules_config = { + "billiv":{ + "POST /api/uploads/*": { + "on_request": "transaction.markers.add('skip-request-payload-storage')" + }, + "GET /health": { + "on_request": "transaction.markers.add('skip-request-payload-storage')" + }, + "GET /api/downloads/*": { + "on_request": "transaction.markers.add('skip-response-payload-storage')" + } + } +} + +class TestStorageAsyncWorkerQueue(StorageTestFixtureMixin, DispatcherTestFixtureMixin): + def create_worker( + self, + dispatcher: IAsyncEventDispatcher, + engine: AsyncEngine, + sql_storage: IStorage, + blob_storage: IBlobStorage, + ) -> StorageAsyncWorkerQueue: + # Create rules engine + from harp_apps.rules.models.rulesets import RuleSet + from harp_apps.rules.subscribers import RulesSubscriber + + ruleset = RuleSet() + ruleset.add(rules_config) # Add the rules configuration + rules_subscriber = RulesSubscriber(ruleset) + + worker = StorageAsyncWorkerQueue(engine, sql_storage, blob_storage) + worker.register_events(dispatcher) + rules_subscriber.subscribe(dispatcher) # Subscribe rules to events + + return worker + + async def test_skip_request_payload_storage_when_path_matches_pattern( + self, + sql_engine, + blob_storage, + sql_storage: SqlStorage, + dispatcher: IAsyncEventDispatcher, + ): + worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage) + + transaction = await self.create_transaction( + sql_storage, + endpoint="/api/uploads/file.txt", + ) + + transaction.markers = set() + + request = await _create_request( + b"file content", + method="POST", + path="/api/uploads/file.txt", + ) + + event = HttpMessageEvent(transaction, request) + + # Mock the push method to avoid actual background processing + worker.push = AsyncMock() + + # Process the message + await worker.on_transaction_message(event) + + # Verify that the skip marker was added to the transaction + assert SKIP_REQUEST_PAYLOAD_STORAGE in transaction.markers + + async def test_skip_response_payload_storage_when_marked( + self, + sql_engine, + blob_storage, + sql_storage: SqlStorage, + dispatcher: IAsyncEventDispatcher, + ): + worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage) + transaction = await self.create_transaction( + sql_storage, + endpoint="/api/downloads/data.json", + ) + + transaction.markers = set() + + # Create a mock response message + request = await _create_request( + b"file content", + method="POST", + path="/api/downloads/data.json", + ) + + event = HttpMessageEvent(transaction, request) + + worker.push = AsyncMock() + worker.blob_storage.put = AsyncMock() + + # Process the message + await worker.on_transaction_message(event) + + # Verify that blob storage put was not called for the body + # (it should only be called for headers, not body due to the skip marker) + assert worker.blob_storage.put.call_count <= 1 # Only headers, not body diff --git a/harp_apps/storage/worker.py b/harp_apps/storage/worker.py index 9ba4763b..9773e2f3 100644 --- a/harp_apps/storage/worker.py +++ b/harp_apps/storage/worker.py @@ -1,3 +1,4 @@ +import re from datetime import UTC from functools import partial from math import log10 @@ -6,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine from whistle import IAsyncEventDispatcher +from harp import get_logger from harp.http import get_serializer_for from harp.models import Blob from harp.utils.background import AsyncWorkerQueue @@ -21,10 +23,25 @@ from harp_apps.storage.types import IBlobStorage, IStorage SKIP_STORAGE = "skip-storage" +SKIP_REQUEST_PAYLOAD_STORAGE = "skip-request-payload-storage" +SKIP_RESPONSE_PAYLOAD_STORAGE = "skip-response-payload-storage" + + +logger = get_logger("harp_apps.storage.worker") + + +def matches_any(text, compiled_patterns): + """Check if any compiled pattern matches the text""" + return any(pattern.search(text) for pattern in compiled_patterns) class StorageAsyncWorkerQueue(AsyncWorkerQueue): - def __init__(self, engine: AsyncEngine, storage: IStorage, blob_storage: IBlobStorage): + def __init__( + self, + engine: AsyncEngine, + storage: IStorage, + blob_storage: IBlobStorage, + ): self.engine = engine self.storage = storage self.blob_storage = blob_storage @@ -62,6 +79,8 @@ async def create_transaction(): await self.push(create_transaction) async def on_transaction_message(self, event: HttpMessageEvent): + if event.message.kind == "request": + logger.info(f"URL {event.message.path}") if SKIP_STORAGE in event.transaction.markers or self.pressure >= 3: return @@ -81,11 +100,25 @@ async def on_transaction_message(self, event: HttpMessageEvent): await self.push(partial(self.blob_storage.put, headers_blob), ignore_errors=True) message_data["headers"] = headers_blob.id - # Eventually store the content blob (later) - if self.pressure <= 1: - content_blob = Blob.from_data(serializer.body, content_type=event.message.headers.get("content-type")) - await self.push(partial(self.blob_storage.put, content_blob), ignore_errors=True) - message_data["body"] = content_blob.id + # if event.message.kind == "request": + # if matches_any(event.message.path, self.skip_storage_requests_payload): + # logger.info(f"Not storing request payload data for {serializer.summary}") + # event.transaction.markers.add(SKIP_REQUEST_PAYLOAD_STORAGE) + # if matches_any(event.message.path, self.skip_storage_responses_payload): + # logger.info(f"Not storing response payload data for {serializer.summary}") + # event.transaction.markers.add(SKIP_RESPONSE_PAYLOAD_STORAGE) + logger.info(f"Kind {event.message.kind}") + logger.info(f"Markers {event.transaction.markers}") + if event.message.kind == "request" and SKIP_REQUEST_PAYLOAD_STORAGE in event.transaction.markers: + logger.debug("Instructed not to store payload data for request") + elif event.message.kind == "response" and SKIP_RESPONSE_PAYLOAD_STORAGE in event.transaction.markers: + logger.debug("Instructed not to store payload data for response") + else: + # Eventually store the content blob (later) + if self.pressure <= 1: + content_blob = Blob.from_data(serializer.body, content_type=event.message.headers.get("content-type")) + await self.push(partial(self.blob_storage.put, content_blob), ignore_errors=True) + message_data["body"] = content_blob.id async def create_message(): async with self.engine.connect() as conn: