Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions harp/_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def _get_logging_level(name: Optional[str], *, default="warning"):
"harp_apps": {"level": _get_logging_level("harp")},
"harp_apps.http_client": {"level": _get_logging_level("http_client")},
"harp_apps.proxy": {"level": _get_logging_level("proxy")},
"harp_apps.storage": {"level": _get_logging_level("storage")},
"httpcore": {"level": _get_logging_level("http_core")},
"httpx": {"level": _get_logging_level("http_core")},
"hypercorn.access": {"level": _get_logging_level("http", default="info")},
Expand Down
2 changes: 2 additions & 0 deletions harp_apps/dashboard/tests/controllers/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ async def test_get_settings_sqlalchemy_secure(self, client: ASGICommunicator, bl
"migrate": True,
"redis": None,
"url": RE(escape("postgresql+asyncpg://test:***@") + r".*"),
"skip_storage_requests_payload": [],
"skip_storage_responses_payload": [],
},
}

Expand Down
116 changes: 116 additions & 0 deletions harp_apps/storage/tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from unittest.mock import AsyncMock

from sqlalchemy.ext.asyncio import AsyncEngine
from whistle import IAsyncEventDispatcher

from harp.utils.testing.mixins.controllers import _create_request
from harp_apps.proxy.events import HttpMessageEvent
from harp_apps.proxy.tests.with_storage.test_controllers_http_proxy import DispatcherTestFixtureMixin
from harp_apps.storage.services import SqlStorage
from harp_apps.storage.types import IBlobStorage, IStorage
from harp_apps.storage.utils.testing.mixins import StorageTestFixtureMixin
from harp_apps.storage.worker import SKIP_REQUEST_PAYLOAD_STORAGE, StorageAsyncWorkerQueue


# Rule scripts shared by the patterns below; each one adds a storage-skip
# marker to the transaction it runs against.
_MARK_SKIP_REQUEST_PAYLOAD = "transaction.markers.add('skip-request-payload-storage')"
_MARK_SKIP_RESPONSE_PAYLOAD = "transaction.markers.add('skip-response-payload-storage')"

# Rules configuration handed to ``RuleSet.add()`` in ``create_worker``.
# NOTE(review): the top-level "billiv" key is presumably an endpoint selector;
# confirm it matches the endpoint the test transactions are created with.
rules_config = {
    "billiv": {
        "POST /api/uploads/*": {"on_request": _MARK_SKIP_REQUEST_PAYLOAD},
        "GET /health": {"on_request": _MARK_SKIP_REQUEST_PAYLOAD},
        "GET /api/downloads/*": {"on_request": _MARK_SKIP_RESPONSE_PAYLOAD},
    },
}

# Tests for the payload-skipping markers consulted by
# StorageAsyncWorkerQueue.on_transaction_message.
class TestStorageAsyncWorkerQueue(StorageTestFixtureMixin, DispatcherTestFixtureMixin):
# Build a worker registered on ``dispatcher``, together with a RulesSubscriber
# loaded from the module-level ``rules_config`` and subscribed to the same
# dispatcher. Only the worker is returned.
def create_worker(
self,
dispatcher: IAsyncEventDispatcher,
engine: AsyncEngine,
sql_storage: IStorage,
blob_storage: IBlobStorage,
) -> StorageAsyncWorkerQueue:
# Create rules engine
from harp_apps.rules.models.rulesets import RuleSet
from harp_apps.rules.subscribers import RulesSubscriber

ruleset = RuleSet()
ruleset.add(rules_config) # Add the rules configuration
rules_subscriber = RulesSubscriber(ruleset)

worker = StorageAsyncWorkerQueue(engine, sql_storage, blob_storage)
worker.register_events(dispatcher)
rules_subscriber.subscribe(dispatcher) # Subscribe rules to events

return worker

# Expects the request-payload skip marker to be present on the transaction
# after a POST to /api/uploads/file.txt has been handled by the worker.
async def test_skip_request_payload_storage_when_path_matches_pattern(
self,
sql_engine,
blob_storage,
sql_storage: SqlStorage,
dispatcher: IAsyncEventDispatcher,
):
worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage)

# NOTE(review): ``endpoint`` receives a URL path here while rules_config is
# keyed by "billiv" -- confirm the rule set can match this transaction.
transaction = await self.create_transaction(
sql_storage,
endpoint="/api/uploads/file.txt",
)

# Start from an empty marker set so the assertion below only passes if
# something adds the marker while the message is handled.
transaction.markers = set()

request = await _create_request(
b"file content",
method="POST",
path="/api/uploads/file.txt",
)

event = HttpMessageEvent(transaction, request)

# Mock the push method to avoid actual background processing
worker.push = AsyncMock()

# Process the message
# NOTE(review): the handler is awaited directly instead of dispatching an
# event, so the subscribed rules presumably never run on this path -- verify
# what is expected to add the marker.
await worker.on_transaction_message(event)

# Verify that the skip marker was added to the transaction
assert SKIP_REQUEST_PAYLOAD_STORAGE in transaction.markers

# Intended to check that the body blob is not stored for a transaction
# carrying the response-payload skip marker.
async def test_skip_response_payload_storage_when_marked(
self,
sql_engine,
blob_storage,
sql_storage: SqlStorage,
dispatcher: IAsyncEventDispatcher,
):
worker = self.create_worker(dispatcher, sql_engine, sql_storage, blob_storage)
transaction = await self.create_transaction(
sql_storage,
endpoint="/api/downloads/data.json",
)

# NOTE(review): the marker set is left empty; despite the test name, nothing
# in this test adds the response skip marker explicitly.
transaction.markers = set()

# Build the message handed to the worker.
# NOTE(review): this is a POST *request* built with _create_request, not a
# response, while rules_config targets "GET /api/downloads/*" -- confirm the
# intended message kind and method.
request = await _create_request(
b"file content",
method="POST",
path="/api/downloads/data.json",
)

event = HttpMessageEvent(transaction, request)

# Replace push and blob_storage.put with mocks so calls are only recorded.
worker.push = AsyncMock()
worker.blob_storage.put = AsyncMock()

# Process the message
await worker.on_transaction_message(event)

# Verify that blob storage put was not called for the body
# (it should only be called for headers, not body due to the skip marker)
# NOTE(review): with ``push`` mocked, callables handed to it are presumably
# never executed, so ``put.call_count`` likely stays 0 and this assertion
# cannot fail -- consider inspecting ``worker.push`` call arguments instead.
assert worker.blob_storage.put.call_count <= 1 # Only headers, not body
45 changes: 39 additions & 6 deletions harp_apps/storage/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from datetime import UTC
from functools import partial
from math import log10
Expand All @@ -6,6 +7,7 @@
from sqlalchemy.ext.asyncio import AsyncEngine
from whistle import IAsyncEventDispatcher

from harp import get_logger
from harp.http import get_serializer_for
from harp.models import Blob
from harp.utils.background import AsyncWorkerQueue
Expand All @@ -21,10 +23,25 @@
from harp_apps.storage.types import IBlobStorage, IStorage

# Transaction marker: when present, on_transaction_message returns early and
# nothing is stored for the message.
SKIP_STORAGE = "skip-storage"
# Transaction marker: when present, the content blob of "request" messages is
# not stored.
SKIP_REQUEST_PAYLOAD_STORAGE = "skip-request-payload-storage"
# Transaction marker: when present, the content blob of "response" messages is
# not stored.
SKIP_RESPONSE_PAYLOAD_STORAGE = "skip-response-payload-storage"


# Module-level logger for the storage worker.
logger = get_logger("harp_apps.storage.worker")


def matches_any(text, compiled_patterns):
    """Tell whether at least one of ``compiled_patterns`` matches ``text``.

    Each pattern is queried through its ``search`` method, so a match located
    anywhere inside ``text`` counts. An empty collection yields ``False``.
    """
    for candidate in compiled_patterns:
        if candidate.search(text):
            return True
    return False


class StorageAsyncWorkerQueue(AsyncWorkerQueue):
def __init__(self, engine: AsyncEngine, storage: IStorage, blob_storage: IBlobStorage):
def __init__(
self,
engine: AsyncEngine,
storage: IStorage,
blob_storage: IBlobStorage,
):
self.engine = engine
self.storage = storage
self.blob_storage = blob_storage
Expand Down Expand Up @@ -62,6 +79,8 @@ async def create_transaction():
await self.push(create_transaction)

async def on_transaction_message(self, event: HttpMessageEvent):
if event.message.kind == "request":
logger.info(f"URL {event.message.path}")
if SKIP_STORAGE in event.transaction.markers or self.pressure >= 3:
return

Expand All @@ -81,11 +100,25 @@ async def on_transaction_message(self, event: HttpMessageEvent):
await self.push(partial(self.blob_storage.put, headers_blob), ignore_errors=True)
message_data["headers"] = headers_blob.id

# Eventually store the content blob (later)
if self.pressure <= 1:
content_blob = Blob.from_data(serializer.body, content_type=event.message.headers.get("content-type"))
await self.push(partial(self.blob_storage.put, content_blob), ignore_errors=True)
message_data["body"] = content_blob.id
# if event.message.kind == "request":
# if matches_any(event.message.path, self.skip_storage_requests_payload):
# logger.info(f"Not storing request payload data for {serializer.summary}")
# event.transaction.markers.add(SKIP_REQUEST_PAYLOAD_STORAGE)
# if matches_any(event.message.path, self.skip_storage_responses_payload):
# logger.info(f"Not storing response payload data for {serializer.summary}")
# event.transaction.markers.add(SKIP_RESPONSE_PAYLOAD_STORAGE)
logger.info(f"Kind {event.message.kind}")
logger.info(f"Markers {event.transaction.markers}")
if event.message.kind == "request" and SKIP_REQUEST_PAYLOAD_STORAGE in event.transaction.markers:
logger.debug("Instructed not to store payload data for request")
elif event.message.kind == "response" and SKIP_RESPONSE_PAYLOAD_STORAGE in event.transaction.markers:
logger.debug("Instructed not to store payload data for response")
else:
# Eventually store the content blob (later)
if self.pressure <= 1:
content_blob = Blob.from_data(serializer.body, content_type=event.message.headers.get("content-type"))
await self.push(partial(self.blob_storage.put, content_blob), ignore_errors=True)
message_data["body"] = content_blob.id

async def create_message():
async with self.engine.connect() as conn:
Expand Down