From cb065fd3a962ab3dcd21954a03f62cf1622d83aa Mon Sep 17 00:00:00 2001 From: Manu Date: Tue, 2 Sep 2025 17:35:44 +0200 Subject: [PATCH 1/3] Allow transaction body to not be stored. In case someone is using harp to process requests for which they don't want the body to be stored (for compliance, security, ...) they can now configure the storage worker to skip storing the request body (and/or the response body) for some routes given as regular expression in the config. The new config looks like this: ```yaml storage: skip_storage_requests_payload: - '/api/uploads/.*' - '/health' skip_storage_responses_payload: - '/api/downloads/.*' ``` --- harp/_logging.py | 1 + .../tests/controllers/test_system.py | 25 ++++-- .../tests/with_storage/test_defaults.py | 2 + harp_apps/storage/services.yml | 3 + harp_apps/storage/settings/__init__.py | 4 + harp_apps/storage/tests/test_application.py | 2 + harp_apps/storage/tests/test_settings.py | 47 ++++++++++ harp_apps/storage/tests/test_worker.py | 89 +++++++++++++++++++ harp_apps/storage/worker.py | 52 +++++++++-- 9 files changed, 214 insertions(+), 11 deletions(-) create mode 100644 harp_apps/storage/tests/test_worker.py diff --git a/harp/_logging.py b/harp/_logging.py index 0747a265..77fc4c7b 100644 --- a/harp/_logging.py +++ b/harp/_logging.py @@ -87,6 +87,7 @@ def _get_logging_level(name: Optional[str], *, default="warning"): "harp_apps": {"level": _get_logging_level("harp")}, "harp_apps.http_client": {"level": _get_logging_level("http_client")}, "harp_apps.proxy": {"level": _get_logging_level("proxy")}, + "harp_apps.storage.worker": {"level": _get_logging_level("proxy")}, "httpcore": {"level": _get_logging_level("http_core")}, "httpx": {"level": _get_logging_level("http_core")}, "hypercorn.access": {"level": _get_logging_level("http", default="info")}, diff --git a/harp_apps/dashboard/tests/controllers/test_system.py b/harp_apps/dashboard/tests/controllers/test_system.py index 45d39227..d03f7dc5 100644 --- a/harp_apps/dashboard/tests/controllers/test_system.py +++ b/harp_apps/dashboard/tests/controllers/test_system.py @@ -28,6 +28,8 @@ async def test_get_settings_empty(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], }, } @@ -43,6 +45,8 @@ async def test_get_settings_redis(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": {"url": "redis://redis.example.com:1234/42"}, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], }, } @@ -57,6 +61,8 @@ async def test_get_settings_sqlalchemy(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], }, } @@ -71,6 +77,8 @@ async def test_get_settings_sqlalchemy_secure(self, controller: SystemController "url": RE(r".*://test:\*\*\*@.*"), "blobs": ANY, "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], }, } @@ -90,7 +98,8 @@ async def test_get_settings_empty(self, client: ASGICommunicator): assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null}}' + b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null,' + b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' ) @parametrize_with_settings({}) @@ -103,7 +112,8 @@ async def test_get_settings_empty_redis(self, client: ASGICommunicator): assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null}}' + b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null,' + b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' ) @parametrize_with_settings( @@ -121,7 +131,8 @@ async def test_get_settings_sqlalchemy(self, client: ASGICommunicator, blob_stor assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null}}' + b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null,' + b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' ) @parametrize_with_settings( @@ -142,7 +153,8 @@ async def test_get_settings_sqlalchemy_blobs_in_redis(self, client: ASGICommunic assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null}}' + b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null,' + b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' ) @parametrize_with_database_urls("postgresql") @@ -159,6 +171,8 @@ async def test_get_settings_sqlalchemy_secure(self, client: ASGICommunicator, bl "migrate": True, "redis": None, "url": RE(escape("postgresql+asyncpg://test:***@") + r".*"), + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], }, } @@ -181,5 +195,6 @@ async def test_get_settings_sqlalchemy_blobs_in_redis_secure(self, client: ASGIC assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":{"url":"redis://us' - b'er:***@localhost:6379/0"}}}' + b'er:***@localhost:6379/0"},' + b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' ) diff --git a/harp_apps/http_client/tests/with_storage/test_defaults.py b/harp_apps/http_client/tests/with_storage/test_defaults.py index ecbb2912..babd68e5 100644 --- a/harp_apps/http_client/tests/with_storage/test_defaults.py +++ b/harp_apps/http_client/tests/with_storage/test_defaults.py @@ -67,6 +67,8 @@ async def test_defaults_with_storage(self, applications): "migrate": ANY, "url": ANY, "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [] } assert type(system.provider.get(IBlobStorage)).__name__ == "SqlBlobStorage" diff --git a/harp_apps/storage/services.yml b/harp_apps/storage/services.yml index 82827d9e..98435bac 100644 --- a/harp_apps/storage/services.yml +++ b/harp_apps/storage/services.yml @@ -19,6 +19,9 @@ services: - name: "storage.worker" description: "Storage async worker" base: harp_apps.storage.worker.StorageAsyncWorkerQueue + arguments: + - "skip_storage_requests_payload" : !cfg "skip_storage_requests_payload" + "skip_storage_responses_payload" : !cfg "skip_storage_responses_payload" - condition: [!cfg "blobs.type == 'sql'", !!bool "true"] services: diff --git a/harp_apps/storage/settings/__init__.py b/harp_apps/storage/settings/__init__.py index a1b4e2a7..6eeef8db 100644 --- a/harp_apps/storage/settings/__init__.py +++ b/harp_apps/storage/settings/__init__.py @@ -1,5 +1,7 @@ from typing import Optional +from pydantic import Field + from .blobs import BlobStorageSettings from .database import DatabaseSettings from .redis import RedisSettings @@ -9,3 +11,5 @@ class StorageSettings(DatabaseSettings): migrate: bool = True blobs: BlobStorageSettings = BlobStorageSettings() redis: Optional[RedisSettings] = None + skip_storage_requests_payload: list[str] = Field(default_factory=list) + skip_storage_responses_payload: list[str] = Field(default_factory=list) diff --git a/harp_apps/storage/tests/test_application.py b/harp_apps/storage/tests/test_application.py index 18da8d33..698fbe1a 100644 --- a/harp_apps/storage/tests/test_application.py +++ b/harp_apps/storage/tests/test_application.py @@ -15,6 +15,8 @@ class TestStorageApplication(BaseTestForApplications): "migrate": True, "redis": None, "url": "sqlite+aiosqlite:///:memory:?cache=shared", + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], } @pytest.mark.parametrize( diff --git a/harp_apps/storage/tests/test_settings.py b/harp_apps/storage/tests/test_settings.py index 8a9b7c48..a8de1a1e 100644 --- a/harp_apps/storage/tests/test_settings.py +++ b/harp_apps/storage/tests/test_settings.py @@ -11,6 +11,8 @@ def test_empty_settings(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "sql"}, "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], } assert asdict(settings) == {} @@ -24,6 +26,8 @@ def test_secure(): "url": "postgresql+asyncpg://user:***@localhost:5432/db", "blobs": {"type": "sql"}, "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], } assert asdict(settings, mode="python") == { @@ -35,6 +39,8 @@ def test_secure(): "url": "postgresql+asyncpg://user:password@localhost:5432/db", "blobs": {"type": "sql"}, "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], } assert asdict(settings, secure=False) == { @@ -49,6 +55,8 @@ def test_override_blob_storage_type(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "redis"}, "redis": None, + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], } assert asdict(settings) == { @@ -63,6 +71,8 @@ def test_override_redis_url(): "migrate": True, "redis": {"url": "redis://example.com:1234/42"}, "url": "sqlite+aiosqlite:///:memory:?cache=shared", + 'skip_storage_requests_payload': [], + 'skip_storage_responses_payload': [], } assert asdict(settings) == { "blobs": {"type": "redis"}, @@ -74,3 +84,40 @@ def test_settings_normalization_does_not_hide_password(): app = Application(settings_type=StorageSettings) settings = app.normalize({"url": "postgresql://user:password@localhost:5432/db"}) assert settings["url"] == "postgresql+asyncpg://user:password@localhost:5432/db" + + +def test_skip_storage_payload_settings(): + settings = StorageSettings.from_kwargs( + skip_storage_requests_payload=["/api/uploads/*", "/health"], + skip_storage_responses_payload=["/api/downloads/*"] + ) + + assert asdict(settings, verbose=True) == { + "migrate": True, + "url": "sqlite+aiosqlite:///:memory:?cache=shared", + "blobs": {"type": "sql"}, + "redis": None, + 'skip_storage_requests_payload': ["/api/uploads/*", "/health"], + 'skip_storage_responses_payload': ["/api/downloads/*"], + } + + assert asdict(settings) == { + 'skip_storage_requests_payload': ["/api/uploads/*", "/health"], + 'skip_storage_responses_payload': ["/api/downloads/*"], + } + + +def test_matches_any_pattern_matching(): + from harp_apps.storage.worker import matches_any + import re + + patterns = [re.compile(r"/api/uploads/.*"), re.compile(r"/health")] + + # Test matching cases + assert matches_any("/api/uploads/file.txt", patterns) == True + assert matches_any("/health", patterns) == True + assert matches_any("/api/uploads/subfolder/image.png", patterns) == True + + # Test non-matching cases + assert matches_any("/api/downloads/file.txt", patterns) == False + assert matches_any("/status", patterns) == False diff --git a/harp_apps/storage/tests/test_worker.py b/harp_apps/storage/tests/test_worker.py new file mode 100644 index 00000000..82426d13 --- /dev/null +++ b/harp_apps/storage/tests/test_worker.py @@ -0,0 +1,89 @@ +from unittest.mock import AsyncMock + +from sqlalchemy.ext.asyncio import AsyncEngine +from whistle import IAsyncEventDispatcher + +from harp.utils.testing.mixins.controllers import _create_request +from harp_apps.proxy.events import HttpMessageEvent +from harp_apps.proxy.tests.with_storage.test_controllers_http_proxy import DispatcherTestFixtureMixin +from harp_apps.storage.services import SqlStorage +from harp_apps.storage.types import IStorage, IBlobStorage +from harp_apps.storage.worker import SKIP_REQUEST_PAYLOAD_STORAGE, StorageAsyncWorkerQueue +from harp_apps.storage.utils.testing.mixins import StorageTestFixtureMixin + + +class TestStorageAsyncWorkerQueue(StorageTestFixtureMixin, DispatcherTestFixtureMixin): + def create_worker( + self, + dispatcher: IAsyncEventDispatcher, + engine: AsyncEngine, + sql_storage: IStorage, + blob_storage: IBlobStorage, + ) -> StorageAsyncWorkerQueue: + worker = StorageAsyncWorkerQueue( + engine, + sql_storage, + blob_storage, + skip_storage_requests_payload=[r"/api/uploads/.*", r"/health"], + skip_storage_responses_payload=[r"/api/downloads/.*"], + ) + worker.register_events(dispatcher) + return worker + + async def test_skip_request_payload_storage_when_path_matches_pattern( + self, + sql_engine, blob_storage, + sql_storage: SqlStorage, + dispatcher: IAsyncEventDispatcher, + ): + worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage) + + transaction = await self.create_transaction( + sql_storage, + endpoint="/api/uploads/file.txt", + ) + + transaction.markers = set() + + request = await _create_request(b"file content", method='POST', path="/api/uploads/file.txt",) + + event = HttpMessageEvent(transaction, request) + + # Mock the push method to avoid actual background processing + worker.push = AsyncMock() + + # Process the message + await worker.on_transaction_message(event) + + # Verify that the skip marker was added to the transaction + assert SKIP_REQUEST_PAYLOAD_STORAGE in transaction.markers + + async def test_skip_response_payload_storage_when_marked( + self, + sql_engine, + blob_storage, + sql_storage: SqlStorage, + dispatcher: IAsyncEventDispatcher, + ): + worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage) + transaction = await self.create_transaction( + sql_storage, + endpoint="/api/downloads/data.json", + ) + + transaction.markers = set() + + # Create a mock response message + request = await _create_request(b"file content", method='POST', path="/api/downloads/data.json",) + + event = HttpMessageEvent(transaction, request) + + worker.push = AsyncMock() + worker.blob_storage.put = AsyncMock() + + # Process the message + await worker.on_transaction_message(event) + + # Verify that blob storage put was not called for the body + # (it should only be called for headers, not body due to the skip marker) + assert worker.blob_storage.put.call_count <= 1 # Only headers, not body diff --git a/harp_apps/storage/worker.py b/harp_apps/storage/worker.py index 9ba4763b..5369e282 100644 --- a/harp_apps/storage/worker.py +++ b/harp_apps/storage/worker.py @@ -1,3 +1,4 @@ +import re from datetime import UTC from functools import partial from math import log10 @@ -6,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine from whistle import IAsyncEventDispatcher +from harp import get_logger from harp.http import get_serializer_for from harp.models import Blob from harp.utils.background import AsyncWorkerQueue @@ -21,15 +23,40 @@ from harp_apps.storage.types import IBlobStorage, IStorage SKIP_STORAGE = "skip-storage" +SKIP_REQUEST_PAYLOAD_STORAGE = "skip-request-payload-storage" +SKIP_RESPONSE_PAYLOAD_STORAGE = "skip-response-payload-storage" + + +logger = get_logger("harp_apps.storage.worker") + + +def matches_any(text, compiled_patterns): + """Check if any compiled pattern matches the text""" + return any(pattern.search(text) for pattern in compiled_patterns) class StorageAsyncWorkerQueue(AsyncWorkerQueue): - def __init__(self, engine: AsyncEngine, storage: IStorage, blob_storage: IBlobStorage): + def __init__( + self, + engine: AsyncEngine, + storage: IStorage, + blob_storage: IBlobStorage, + skip_storage_requests_payload: list[str] | None = None, + skip_storage_responses_payload: list[str] | None = None, + ): self.engine = engine self.storage = storage self.blob_storage = blob_storage super().__init__() self.seen = set() + skip_storage_requests_payload = skip_storage_requests_payload or [] + skip_storage_responses_payload = skip_storage_responses_payload or [] + self.skip_storage_requests_payload = [ + re.compile(pattern) for pattern in skip_storage_requests_payload + ] + self.skip_storage_responses_payload = [ + re.compile(pattern) for pattern in skip_storage_responses_payload + ] def register_events(self, dispatcher: IAsyncEventDispatcher): dispatcher.add_listener(EVENT_TRANSACTION_STARTED, self.on_transaction_started) @@ -81,11 +108,24 @@ async def on_transaction_message(self, event: HttpMessageEvent): await self.push(partial(self.blob_storage.put, headers_blob), ignore_errors=True) message_data["headers"] = headers_blob.id - # Eventually store the content blob (later) - if self.pressure <= 1: - content_blob = Blob.from_data(serializer.body, content_type=event.message.headers.get("content-type")) - await self.push(partial(self.blob_storage.put, content_blob), ignore_errors=True) - message_data["body"] = content_blob.id + if event.message.kind == 'request': + if matches_any(event.message.path, self.skip_storage_requests_payload): + logger.info(f"Not storing request payload data for {serializer.summary}") + event.transaction.markers.add(SKIP_REQUEST_PAYLOAD_STORAGE) + if matches_any(event.message.path, self.skip_storage_responses_payload): + logger.info(f"Not storing response payload data for {serializer.summary}") + event.transaction.markers.add(SKIP_RESPONSE_PAYLOAD_STORAGE) + + if event.message.kind == 'request' and SKIP_REQUEST_PAYLOAD_STORAGE in event.transaction.markers: + logger.debug("Instructed not to store payload data for request") + elif event.message.kind == 'response' and SKIP_RESPONSE_PAYLOAD_STORAGE in event.transaction.markers: + logger.debug("Instructed not to store payload data for response") + else: + # Eventually store the content blob (later) + if self.pressure <= 1: + content_blob = Blob.from_data(serializer.body, content_type=event.message.headers.get("content-type")) + await self.push(partial(self.blob_storage.put, content_blob), ignore_errors=True) + message_data["body"] = content_blob.id async def create_message(): async with self.engine.connect() as conn: From 5e3af7750271577a37eef493904abe539b79bbb5 Mon Sep 17 00:00:00 2001 From: Manu Date: Wed, 3 Sep 2025 11:36:10 +0200 Subject: [PATCH 2/3] Formatting issues --- .../tests/controllers/test_system.py | 16 ++--- .../tests/with_storage/test_defaults.py | 4 +- harp_apps/storage/settings/__init__.py | 2 +- harp_apps/storage/tests/test_application.py | 4 +- harp_apps/storage/tests/test_settings.py | 44 ++++++------- harp_apps/storage/tests/test_worker.py | 61 +++++++++++-------- harp_apps/storage/worker.py | 26 ++++---- 7 files changed, 81 insertions(+), 76 deletions(-) diff --git a/harp_apps/dashboard/tests/controllers/test_system.py b/harp_apps/dashboard/tests/controllers/test_system.py index d03f7dc5..288ec090 100644 --- a/harp_apps/dashboard/tests/controllers/test_system.py +++ b/harp_apps/dashboard/tests/controllers/test_system.py @@ -28,8 +28,8 @@ async def test_get_settings_empty(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], }, } @@ -45,8 +45,8 @@ async def test_get_settings_redis(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": {"url": "redis://redis.example.com:1234/42"}, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], }, } @@ -61,8 +61,8 @@ async def test_get_settings_sqlalchemy(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], }, } @@ -77,8 +77,8 @@ async def test_get_settings_sqlalchemy_secure(self, controller: SystemController "url": RE(r".*://test:\*\*\*@.*"), "blobs": ANY, "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], }, } diff --git a/harp_apps/http_client/tests/with_storage/test_defaults.py b/harp_apps/http_client/tests/with_storage/test_defaults.py index babd68e5..ac0ef806 100644 --- a/harp_apps/http_client/tests/with_storage/test_defaults.py +++ b/harp_apps/http_client/tests/with_storage/test_defaults.py @@ -67,8 +67,8 @@ async def test_defaults_with_storage(self, applications): "migrate": ANY, "url": ANY, "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [] + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } assert type(system.provider.get(IBlobStorage)).__name__ == "SqlBlobStorage" diff --git a/harp_apps/storage/settings/__init__.py b/harp_apps/storage/settings/__init__.py index 6eeef8db..70b01d88 100644 --- a/harp_apps/storage/settings/__init__.py +++ b/harp_apps/storage/settings/__init__.py @@ -12,4 +12,4 @@ class StorageSettings(DatabaseSettings): blobs: BlobStorageSettings = BlobStorageSettings() redis: Optional[RedisSettings] = None skip_storage_requests_payload: list[str] = Field(default_factory=list) - skip_storage_responses_payload: list[str] = Field(default_factory=list) + skip_storage_responses_payload: list[str] = Field(default_factory=list) diff --git a/harp_apps/storage/tests/test_application.py b/harp_apps/storage/tests/test_application.py index 698fbe1a..25a0b314 100644 --- a/harp_apps/storage/tests/test_application.py +++ b/harp_apps/storage/tests/test_application.py @@ -15,8 +15,8 @@ class TestStorageApplication(BaseTestForApplications): "migrate": True, "redis": None, "url": "sqlite+aiosqlite:///:memory:?cache=shared", - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } @pytest.mark.parametrize( diff --git a/harp_apps/storage/tests/test_settings.py b/harp_apps/storage/tests/test_settings.py index a8de1a1e..240c23dd 100644 --- a/harp_apps/storage/tests/test_settings.py +++ b/harp_apps/storage/tests/test_settings.py @@ -11,8 +11,8 @@ def test_empty_settings(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "sql"}, "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } assert asdict(settings) == {} @@ -26,8 +26,8 @@ def test_secure(): "url": "postgresql+asyncpg://user:***@localhost:5432/db", "blobs": {"type": "sql"}, "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } assert asdict(settings, mode="python") == { @@ -39,8 +39,8 @@ def test_secure(): "url": "postgresql+asyncpg://user:password@localhost:5432/db", "blobs": {"type": "sql"}, "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } assert asdict(settings, secure=False) == { @@ -55,8 +55,8 @@ def test_override_blob_storage_type(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "redis"}, "redis": None, - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } assert asdict(settings) == { @@ -71,8 +71,8 @@ def test_override_redis_url(): "migrate": True, "redis": {"url": "redis://example.com:1234/42"}, "url": "sqlite+aiosqlite:///:memory:?cache=shared", - 'skip_storage_requests_payload': [], - 'skip_storage_responses_payload': [], + "skip_storage_requests_payload": [], + "skip_storage_responses_payload": [], } assert asdict(settings) == { "blobs": {"type": "redis"}, @@ -88,8 +88,7 @@ def test_settings_normalization_does_not_hide_password(): def test_skip_storage_payload_settings(): settings = StorageSettings.from_kwargs( - skip_storage_requests_payload=["/api/uploads/*", "/health"], - skip_storage_responses_payload=["/api/downloads/*"] + skip_storage_requests_payload=["/api/uploads/*", "/health"], skip_storage_responses_payload=["/api/downloads/*"] ) assert asdict(settings, verbose=True) == { @@ -97,27 +96,28 @@ def test_skip_storage_payload_settings(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "sql"}, "redis": None, - 'skip_storage_requests_payload': ["/api/uploads/*", "/health"], - 'skip_storage_responses_payload': ["/api/downloads/*"], + "skip_storage_requests_payload": ["/api/uploads/*", "/health"], + "skip_storage_responses_payload": ["/api/downloads/*"], } assert asdict(settings) == { - 'skip_storage_requests_payload': ["/api/uploads/*", "/health"], - 'skip_storage_responses_payload': ["/api/downloads/*"], + "skip_storage_requests_payload": ["/api/uploads/*", "/health"], + "skip_storage_responses_payload": ["/api/downloads/*"], } def test_matches_any_pattern_matching(): - from harp_apps.storage.worker import matches_any import re + from harp_apps.storage.worker import matches_any + patterns = [re.compile(r"/api/uploads/.*"), re.compile(r"/health")] # Test matching cases - assert matches_any("/api/uploads/file.txt", patterns) == True - assert matches_any("/health", patterns) == True - assert matches_any("/api/uploads/subfolder/image.png", patterns) == True + assert matches_any("/api/uploads/file.txt", patterns) + assert matches_any("/health", patterns) + assert matches_any("/api/uploads/subfolder/image.png", patterns) # Test non-matching cases - assert matches_any("/api/downloads/file.txt", patterns) == False - assert matches_any("/status", patterns) == False + assert not matches_any("/api/downloads/file.txt", patterns) + assert not matches_any("/status", patterns) diff --git a/harp_apps/storage/tests/test_worker.py b/harp_apps/storage/tests/test_worker.py index 82426d13..01863c40 100644 --- a/harp_apps/storage/tests/test_worker.py +++ b/harp_apps/storage/tests/test_worker.py @@ -7,18 +7,18 @@ from harp_apps.proxy.events import HttpMessageEvent from harp_apps.proxy.tests.with_storage.test_controllers_http_proxy import DispatcherTestFixtureMixin from harp_apps.storage.services import SqlStorage -from harp_apps.storage.types import IStorage, IBlobStorage -from harp_apps.storage.worker import SKIP_REQUEST_PAYLOAD_STORAGE, StorageAsyncWorkerQueue +from harp_apps.storage.types import IBlobStorage, IStorage from harp_apps.storage.utils.testing.mixins import StorageTestFixtureMixin +from harp_apps.storage.worker import SKIP_REQUEST_PAYLOAD_STORAGE, StorageAsyncWorkerQueue class TestStorageAsyncWorkerQueue(StorageTestFixtureMixin, DispatcherTestFixtureMixin): def create_worker( - self, - dispatcher: IAsyncEventDispatcher, - engine: AsyncEngine, - sql_storage: IStorage, - blob_storage: IBlobStorage, + self, + dispatcher: IAsyncEventDispatcher, + engine: AsyncEngine, + sql_storage: IStorage, + blob_storage: IBlobStorage, ) -> StorageAsyncWorkerQueue: worker = StorageAsyncWorkerQueue( engine, @@ -31,10 +31,11 @@ def create_worker( return worker async def test_skip_request_payload_storage_when_path_matches_pattern( - self, - sql_engine, blob_storage, - sql_storage: SqlStorage, - dispatcher: IAsyncEventDispatcher, + self, + sql_engine, + blob_storage, + sql_storage: SqlStorage, + dispatcher: IAsyncEventDispatcher, ): worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage) @@ -44,26 +45,30 @@ async def test_skip_request_payload_storage_when_path_matches_pattern( ) transaction.markers = set() - - request = await _create_request(b"file content", method='POST', path="/api/uploads/file.txt",) - + + request = await _create_request( + b"file content", + method="POST", + path="/api/uploads/file.txt", + ) + event = HttpMessageEvent(transaction, request) - + # Mock the push method to avoid actual background processing worker.push = AsyncMock() - + # Process the message await worker.on_transaction_message(event) - + # Verify that the skip marker was added to the transaction assert SKIP_REQUEST_PAYLOAD_STORAGE in transaction.markers async def test_skip_response_payload_storage_when_marked( - self, - sql_engine, - blob_storage, - sql_storage: SqlStorage, - dispatcher: IAsyncEventDispatcher, + self, + sql_engine, + blob_storage, + sql_storage: SqlStorage, + dispatcher: IAsyncEventDispatcher, ): worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage) transaction = await self.create_transaction( @@ -72,18 +77,22 @@ async def test_skip_response_payload_storage_when_marked( ) transaction.markers = set() - + # Create a mock response message - request = await _create_request(b"file content", method='POST', path="/api/downloads/data.json",) + request = await _create_request( + b"file content", + method="POST", + path="/api/downloads/data.json", + ) event = HttpMessageEvent(transaction, request) worker.push = AsyncMock() worker.blob_storage.put = AsyncMock() - + # Process the message await worker.on_transaction_message(event) - + # Verify that blob storage put was not called for the body # (it should only be called for headers, not body due to the skip marker) assert worker.blob_storage.put.call_count <= 1 # Only headers, not body diff --git a/harp_apps/storage/worker.py b/harp_apps/storage/worker.py index 5369e282..8c608138 100644 --- a/harp_apps/storage/worker.py +++ b/harp_apps/storage/worker.py @@ -37,12 +37,12 @@ def matches_any(text, compiled_patterns): class StorageAsyncWorkerQueue(AsyncWorkerQueue): def __init__( - self, - engine: AsyncEngine, - storage: IStorage, - blob_storage: IBlobStorage, - skip_storage_requests_payload: list[str] | None = None, - skip_storage_responses_payload: list[str] | None = None, + self, + engine: AsyncEngine, + storage: IStorage, + blob_storage: IBlobStorage, + skip_storage_requests_payload: list[str] | None = None, + skip_storage_responses_payload: list[str] | None = None, ): self.engine = engine self.storage = storage @@ -51,12 +51,8 @@ def __init__( self.seen = set() skip_storage_requests_payload = skip_storage_requests_payload or [] skip_storage_responses_payload = skip_storage_responses_payload or [] - self.skip_storage_requests_payload = [ - re.compile(pattern) for pattern in skip_storage_requests_payload - ] - self.skip_storage_responses_payload = [ - re.compile(pattern) for pattern in skip_storage_responses_payload - ] + self.skip_storage_requests_payload = [re.compile(pattern) for pattern in skip_storage_requests_payload] + self.skip_storage_responses_payload = [re.compile(pattern) for pattern in skip_storage_responses_payload] def register_events(self, dispatcher: IAsyncEventDispatcher): dispatcher.add_listener(EVENT_TRANSACTION_STARTED, self.on_transaction_started) @@ -108,7 +104,7 @@ async def on_transaction_message(self, event: HttpMessageEvent): await self.push(partial(self.blob_storage.put, headers_blob), ignore_errors=True) message_data["headers"] = headers_blob.id - if event.message.kind == 'request': + if event.message.kind == "request": if matches_any(event.message.path, self.skip_storage_requests_payload): logger.info(f"Not storing request payload data for {serializer.summary}") event.transaction.markers.add(SKIP_REQUEST_PAYLOAD_STORAGE) @@ -116,9 +112,9 @@ async def on_transaction_message(self, event: HttpMessageEvent): logger.info(f"Not storing response payload data for {serializer.summary}") event.transaction.markers.add(SKIP_RESPONSE_PAYLOAD_STORAGE) - if event.message.kind == 'request' and SKIP_REQUEST_PAYLOAD_STORAGE in event.transaction.markers: + if event.message.kind == "request" and SKIP_REQUEST_PAYLOAD_STORAGE in event.transaction.markers: logger.debug("Instructed not to store payload data for request") - elif event.message.kind == 'response' and SKIP_RESPONSE_PAYLOAD_STORAGE in event.transaction.markers: + elif event.message.kind == "response" and SKIP_RESPONSE_PAYLOAD_STORAGE in event.transaction.markers: logger.debug("Instructed not to store payload data for response") else: # Eventually store the content blob (later) From 1bd43786521b91af4420b325243093d6cea0882d Mon Sep 17 00:00:00 2001 From: Manu Date: Fri, 3 Oct 2025 10:55:18 +0200 Subject: [PATCH 3/3] wip --- harp/_logging.py | 2 +- .../tests/controllers/test_system.py | 23 ++------- .../tests/with_storage/test_defaults.py | 2 - harp_apps/storage/services.yml | 3 -- harp_apps/storage/settings/__init__.py | 4 -- harp_apps/storage/tests/test_application.py | 2 - harp_apps/storage/tests/test_settings.py | 47 ------------------- harp_apps/storage/tests/test_worker.py | 32 ++++++++++--- harp_apps/storage/worker.py | 25 +++++----- 9 files changed, 42 insertions(+), 98 deletions(-) diff --git a/harp/_logging.py b/harp/_logging.py index 77fc4c7b..2e9775be 100644 --- a/harp/_logging.py +++ b/harp/_logging.py @@ -87,7 +87,7 @@ def _get_logging_level(name: Optional[str], *, default="warning"): "harp_apps": {"level": _get_logging_level("harp")}, "harp_apps.http_client": {"level": _get_logging_level("http_client")}, "harp_apps.proxy": {"level": _get_logging_level("proxy")}, - "harp_apps.storage.worker": {"level": _get_logging_level("proxy")}, + "harp_apps.storage": {"level": _get_logging_level("storage")}, "httpcore": {"level": _get_logging_level("http_core")}, "httpx": {"level": _get_logging_level("http_core")}, "hypercorn.access": {"level": _get_logging_level("http", default="info")}, diff --git a/harp_apps/dashboard/tests/controllers/test_system.py b/harp_apps/dashboard/tests/controllers/test_system.py index 288ec090..8bb4783f 100644 --- a/harp_apps/dashboard/tests/controllers/test_system.py +++ b/harp_apps/dashboard/tests/controllers/test_system.py @@ -28,8 +28,6 @@ async def test_get_settings_empty(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], }, } @@ -45,8 +43,6 @@ async def test_get_settings_redis(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": {"url": "redis://redis.example.com:1234/42"}, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], }, } @@ -61,8 +57,6 @@ async def test_get_settings_sqlalchemy(self, controller: SystemController): "migrate": True, "url": "sqlite+aiosqlite:///:memory:", "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], }, } @@ -77,8 +71,6 @@ async def test_get_settings_sqlalchemy_secure(self, controller: SystemController "url": RE(r".*://test:\*\*\*@.*"), "blobs": ANY, "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], }, } @@ -98,8 +90,7 @@ async def test_get_settings_empty(self, client: ASGICommunicator): assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null,' - b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' + b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null}}' ) @parametrize_with_settings({}) @@ -112,8 +103,7 @@ async def test_get_settings_empty_redis(self, client: ASGICommunicator): assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null,' - b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' + b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null}}' ) @parametrize_with_settings( @@ -131,8 +121,7 @@ async def test_get_settings_sqlalchemy(self, client: ASGICommunicator, blob_stor assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null,' - b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' + b':memory:","migrate":true,"blobs":{"type":"sql"},"redis":null}}' ) @parametrize_with_settings( @@ -153,8 +142,7 @@ async def test_get_settings_sqlalchemy_blobs_in_redis(self, client: ASGICommunic assert response["headers"] == ((b"content-type", b"application/json"),) assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' - b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null,' - b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' + b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":null}}' ) @parametrize_with_database_urls("postgresql") @@ -195,6 +183,5 @@ async def test_get_settings_sqlalchemy_blobs_in_redis_secure(self, client: ASGIC assert response["body"] == ( b'{"applications":["harp_apps.storage"],"storage":{"url":"sqlite+aiosqlite:///' b':memory:","migrate":true,"blobs":{"type":"redis"},"redis":{"url":"redis://us' - b'er:***@localhost:6379/0"},' - b'"skip_storage_requests_payload":[],"skip_storage_responses_payload":[]}}' + b'er:***@localhost:6379/0"}}}' ) diff --git a/harp_apps/http_client/tests/with_storage/test_defaults.py b/harp_apps/http_client/tests/with_storage/test_defaults.py index ac0ef806..ecbb2912 100644 --- a/harp_apps/http_client/tests/with_storage/test_defaults.py +++ b/harp_apps/http_client/tests/with_storage/test_defaults.py @@ -67,8 +67,6 @@ async def test_defaults_with_storage(self, applications): "migrate": ANY, "url": ANY, "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } assert type(system.provider.get(IBlobStorage)).__name__ == "SqlBlobStorage" diff --git a/harp_apps/storage/services.yml b/harp_apps/storage/services.yml index 98435bac..82827d9e 100644 --- a/harp_apps/storage/services.yml +++ b/harp_apps/storage/services.yml @@ -19,9 +19,6 @@ services: - name: "storage.worker" description: "Storage async worker" base: harp_apps.storage.worker.StorageAsyncWorkerQueue - arguments: - - "skip_storage_requests_payload" : !cfg "skip_storage_requests_payload" - "skip_storage_responses_payload" : !cfg "skip_storage_responses_payload" - condition: [!cfg "blobs.type == 'sql'", !!bool "true"] services: diff --git a/harp_apps/storage/settings/__init__.py b/harp_apps/storage/settings/__init__.py index 70b01d88..a1b4e2a7 100644 --- a/harp_apps/storage/settings/__init__.py +++ b/harp_apps/storage/settings/__init__.py @@ -1,7 +1,5 @@ from typing import Optional -from pydantic import Field - from .blobs import BlobStorageSettings from .database import DatabaseSettings from .redis import RedisSettings @@ -11,5 +9,3 @@ class StorageSettings(DatabaseSettings): migrate: bool = True blobs: BlobStorageSettings = BlobStorageSettings() redis: Optional[RedisSettings] = None - skip_storage_requests_payload: list[str] = Field(default_factory=list) - skip_storage_responses_payload: list[str] = Field(default_factory=list) diff --git a/harp_apps/storage/tests/test_application.py b/harp_apps/storage/tests/test_application.py index 25a0b314..18da8d33 100644 --- a/harp_apps/storage/tests/test_application.py +++ b/harp_apps/storage/tests/test_application.py @@ -15,8 +15,6 @@ class TestStorageApplication(BaseTestForApplications): "migrate": True, "redis": None, "url": "sqlite+aiosqlite:///:memory:?cache=shared", - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } @pytest.mark.parametrize( diff --git a/harp_apps/storage/tests/test_settings.py b/harp_apps/storage/tests/test_settings.py index 240c23dd..8a9b7c48 100644 --- a/harp_apps/storage/tests/test_settings.py +++ b/harp_apps/storage/tests/test_settings.py @@ -11,8 +11,6 @@ def test_empty_settings(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "sql"}, "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } assert asdict(settings) == {} @@ -26,8 +24,6 @@ def test_secure(): "url": "postgresql+asyncpg://user:***@localhost:5432/db", "blobs": {"type": "sql"}, "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } assert asdict(settings, mode="python") == { @@ -39,8 +35,6 @@ def test_secure(): "url": "postgresql+asyncpg://user:password@localhost:5432/db", "blobs": {"type": "sql"}, "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } assert asdict(settings, secure=False) == { @@ -55,8 +49,6 @@ def test_override_blob_storage_type(): "url": "sqlite+aiosqlite:///:memory:?cache=shared", "blobs": {"type": "redis"}, "redis": None, - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } assert asdict(settings) == { @@ -71,8 +63,6 @@ def test_override_redis_url(): "migrate": True, "redis": {"url": "redis://example.com:1234/42"}, "url": "sqlite+aiosqlite:///:memory:?cache=shared", - "skip_storage_requests_payload": [], - "skip_storage_responses_payload": [], } assert asdict(settings) == { "blobs": {"type": "redis"}, @@ -84,40 +74,3 @@ def test_settings_normalization_does_not_hide_password(): app = Application(settings_type=StorageSettings) settings = app.normalize({"url": "postgresql://user:password@localhost:5432/db"}) assert settings["url"] == "postgresql+asyncpg://user:password@localhost:5432/db" - - -def test_skip_storage_payload_settings(): - settings = StorageSettings.from_kwargs( - skip_storage_requests_payload=["/api/uploads/*", "/health"], skip_storage_responses_payload=["/api/downloads/*"] - ) - - assert asdict(settings, verbose=True) == { - "migrate": True, - "url": "sqlite+aiosqlite:///:memory:?cache=shared", - "blobs": {"type": "sql"}, - "redis": None, - "skip_storage_requests_payload": ["/api/uploads/*", "/health"], - "skip_storage_responses_payload": ["/api/downloads/*"], - } - - assert asdict(settings) == { - "skip_storage_requests_payload": ["/api/uploads/*", "/health"], - "skip_storage_responses_payload": ["/api/downloads/*"], - } - - -def test_matches_any_pattern_matching(): - import re - - from harp_apps.storage.worker import matches_any - - patterns = [re.compile(r"/api/uploads/.*"), re.compile(r"/health")] - - # Test matching cases - assert matches_any("/api/uploads/file.txt", patterns) - assert matches_any("/health", patterns) - assert matches_any("/api/uploads/subfolder/image.png", patterns) - - # Test non-matching cases - assert not matches_any("/api/downloads/file.txt", patterns) - assert not matches_any("/status", patterns) diff --git a/harp_apps/storage/tests/test_worker.py b/harp_apps/storage/tests/test_worker.py index 01863c40..684f522e 100644 --- a/harp_apps/storage/tests/test_worker.py +++ b/harp_apps/storage/tests/test_worker.py @@ -12,6 +12,20 @@ from harp_apps.storage.worker import SKIP_REQUEST_PAYLOAD_STORAGE, StorageAsyncWorkerQueue +rules_config = { + "billiv":{ + "POST /api/uploads/*": { + "on_request": "transaction.markers.add('skip-request-payload-storage')" + }, + "GET /health": { + "on_request": "transaction.markers.add('skip-request-payload-storage')" + }, + "GET /api/downloads/*": { + "on_request": "transaction.markers.add('skip-response-payload-storage')" + } + } +} + class TestStorageAsyncWorkerQueue(StorageTestFixtureMixin, DispatcherTestFixtureMixin): def create_worker( self, @@ -20,14 +34,18 @@ def create_worker( sql_storage: IStorage, blob_storage: IBlobStorage, ) -> StorageAsyncWorkerQueue: - worker = StorageAsyncWorkerQueue( - engine, - sql_storage, - blob_storage, - skip_storage_requests_payload=[r"/api/uploads/.*", r"/health"], - skip_storage_responses_payload=[r"/api/downloads/.*"], - ) + # Create rules engine + from harp_apps.rules.models.rulesets import RuleSet + from harp_apps.rules.subscribers import RulesSubscriber + + ruleset = RuleSet() + ruleset.add(rules_config) # Add the rules configuration + rules_subscriber = RulesSubscriber(ruleset) + + worker = StorageAsyncWorkerQueue(engine, sql_storage, blob_storage) worker.register_events(dispatcher) + rules_subscriber.subscribe(dispatcher) # Subscribe rules to events + return worker async def test_skip_request_payload_storage_when_path_matches_pattern( diff --git a/harp_apps/storage/worker.py b/harp_apps/storage/worker.py index 8c608138..9773e2f3 100644 --- a/harp_apps/storage/worker.py +++ b/harp_apps/storage/worker.py @@ -41,18 +41,12 @@ def __init__( engine: AsyncEngine, storage: IStorage, blob_storage: IBlobStorage, - skip_storage_requests_payload: list[str] | None = None, - skip_storage_responses_payload: list[str] | None = None, ): self.engine = engine self.storage = storage self.blob_storage = blob_storage super().__init__() self.seen = set() - skip_storage_requests_payload = skip_storage_requests_payload or [] - skip_storage_responses_payload = skip_storage_responses_payload or [] - self.skip_storage_requests_payload = [re.compile(pattern) for pattern in skip_storage_requests_payload] - self.skip_storage_responses_payload = [re.compile(pattern) for pattern in skip_storage_responses_payload] def register_events(self, dispatcher: IAsyncEventDispatcher): dispatcher.add_listener(EVENT_TRANSACTION_STARTED, self.on_transaction_started) @@ -85,6 +79,8 @@ async def create_transaction(): await self.push(create_transaction) async def on_transaction_message(self, event: HttpMessageEvent): + if event.message.kind == "request": + logger.info(f"URL {event.message.path}") if SKIP_STORAGE in event.transaction.markers or self.pressure >= 3: return @@ -104,14 +100,15 @@ async def on_transaction_message(self, event: HttpMessageEvent): await self.push(partial(self.blob_storage.put, headers_blob), ignore_errors=True) message_data["headers"] = headers_blob.id - if event.message.kind == "request": - if matches_any(event.message.path, self.skip_storage_requests_payload): - logger.info(f"Not storing request payload data for {serializer.summary}") - event.transaction.markers.add(SKIP_REQUEST_PAYLOAD_STORAGE) - if matches_any(event.message.path, self.skip_storage_responses_payload): - logger.info(f"Not storing response payload data for {serializer.summary}") - event.transaction.markers.add(SKIP_RESPONSE_PAYLOAD_STORAGE) - + # if event.message.kind == "request": + # if matches_any(event.message.path, self.skip_storage_requests_payload): + # logger.info(f"Not storing request payload data for {serializer.summary}") + # event.transaction.markers.add(SKIP_REQUEST_PAYLOAD_STORAGE) + # if matches_any(event.message.path, self.skip_storage_responses_payload): + # logger.info(f"Not storing response payload data for {serializer.summary}") + # event.transaction.markers.add(SKIP_RESPONSE_PAYLOAD_STORAGE) + logger.info(f"Kind {event.message.kind}") + logger.info(f"Markers {event.transaction.markers}") if event.message.kind == "request" and SKIP_REQUEST_PAYLOAD_STORAGE in event.transaction.markers: logger.debug("Instructed not to store payload data for request") elif event.message.kind == "response" and SKIP_RESPONSE_PAYLOAD_STORAGE in event.transaction.markers: