Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
42efb93
wip
rubenthoms Oct 17, 2025
b13ae37
Backend implementations for persistence
rubenthoms Oct 17, 2025
deaa37c
Updated lock file
rubenthoms Oct 17, 2025
e5c52a9
fix: linting
rubenthoms Oct 17, 2025
348a5d7
Adding nosec comments to local dev code
rubenthoms Oct 17, 2025
9ade9ff
fix: mypy
rubenthoms Oct 17, 2025
3cc435a
fix: linting etc.
rubenthoms Oct 17, 2025
5e1877e
fix: regenerated frontend api files
rubenthoms Oct 19, 2025
126fca0
Adjusted generated api files
rubenthoms Oct 20, 2025
0000411
Removed snapshot previews
rubenthoms Oct 20, 2025
bcb6c6c
Removed SPA adjustments in nginx config
rubenthoms Oct 20, 2025
8be97b4
Merge branch 'persistence/main' into persistence/backend-database-access
rubenthoms Oct 20, 2025
2af12fa
Adjusted after review comments
rubenthoms Oct 20, 2025
bc856f7
First adjustments after review discussion
rubenthoms Oct 23, 2025
2009a18
wip
rubenthoms Oct 23, 2025
e56e498
Refactoring
rubenthoms Oct 24, 2025
90daf5b
Multiple bug fixes
rubenthoms Oct 24, 2025
00f2148
Removed unused imports
rubenthoms Oct 24, 2025
ebc509c
Final fixes
rubenthoms Oct 24, 2025
fed88fa
Re-generated api files, adjusted comments
rubenthoms Oct 27, 2025
2d79071
Further simplifications
rubenthoms Oct 27, 2025
23c985f
Documentation for endpoints
rubenthoms Oct 27, 2025
e15e22a
`FilterFactory` to ensure filter fields exist
rubenthoms Oct 27, 2025
dce4044
Re-generated api files
rubenthoms Oct 27, 2025
c021409
Removal of `limit` and `offset` from stores' `get_many_async`
rubenthoms Oct 27, 2025
169c8ba
Adjustments to comments
rubenthoms Oct 27, 2025
52c715d
Missing @no-cache decorator and removal of summary fields
rubenthoms Oct 27, 2025
1bfaad8
Multiple adjustments after review
rubenthoms Oct 27, 2025
64f0c05
Update backend_py/primary/primary/routers/persistence/router.py
rubenthoms Oct 27, 2025
742cbfd
Update backend_py/primary/primary/routers/persistence/router.py
rubenthoms Oct 27, 2025
b73484d
Update frontend/src/api/autogen/sdk.gen.ts
rubenthoms Oct 27, 2025
7cea0de
Update frontend/src/api/autogen/sdk.gen.ts
rubenthoms Oct 27, 2025
2955bab
Adjustments according to review comments
rubenthoms Oct 28, 2025
96d7ae1
Merge branch 'persistence/main' into persistence/backend-database-access
rubenthoms Oct 28, 2025
babcb4e
Marked util function as private
rubenthoms Oct 28, 2025
2770917
Re-generated api files
rubenthoms Oct 28, 2025
65dec0e
Comments for database document models
rubenthoms Oct 28, 2025
a07a523
formatting
rubenthoms Oct 28, 2025
dac5fff
Linting
rubenthoms Oct 28, 2025
fe366dc
Giving fake connection string for prod DB to skip local setup
rubenthoms Oct 28, 2025
2f886c3
formatting
rubenthoms Oct 28, 2025
608a875
Unused import
rubenthoms Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend_py/primary/primary/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@
REDIS_CACHE_URL = "redis://redis-cache:6379"

COSMOS_DB_PROD_CONNECTION_STRING = os.environ.get("WEBVIZ_DB_CONNECTION_STRING", None)
COSMOS_DB_EMULATOR_URI = "https://host.docker.internal:8081/"
COSMOS_DB_EMULATOR_KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"
COSMOS_DB_EMULATOR_URI = "https://cosmos-db-emulator:8081"
COSMOS_DB_EMULATOR_KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
5 changes: 1 addition & 4 deletions backend_py/primary/primary/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@
logging.getLogger("primary.routers.grid3d").setLevel(logging.DEBUG)
logging.getLogger("primary.routers.dev").setLevel(logging.DEBUG)
logging.getLogger("primary.routers.surface").setLevel(logging.DEBUG)
logging.getLogger("primary.persistence.cosmosdb").setLevel(logging.DEBUG)
logging.getLogger("primary.persistence.session_store").setLevel(logging.DEBUG)
logging.getLogger("primary.persistence.snapshot_store").setLevel(logging.DEBUG)
logging.getLogger("primary.persistence.tasks").setLevel(logging.DEBUG)
logging.getLogger("primary.persistence").setLevel(logging.DEBUG)
# logging.getLogger("primary.auth").setLevel(logging.DEBUG)
# logging.getLogger("uvicorn.error").setLevel(logging.DEBUG)
# logging.getLogger("uvicorn.access").setLevel(logging.DEBUG)
Expand Down
26 changes: 23 additions & 3 deletions backend_py/primary/primary/persistence/_utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
import hashlib
from typing import Any, cast
from typing import Any, TypeVar, cast

from azure.core.async_paging import AsyncPageIterator, AsyncItemPaged


T = TypeVar("T")


# Utility function to hash a JSON string using SHA-256
# This function mimics the behavior of TextEncoder in JavaScript, which encodes strings to
# UTF-8 before hashing. The output is a hexadecimal string representation of the hash.
#
# It is important that this function returns the same hash as the JavaScript version
def hash_sha256(json_string: str) -> str:
data = json_string.encode("utf-8") # Matches TextEncoder behavior
def hash_session_content_string(string: str) -> str:
data = string.encode("utf-8") # Matches TextEncoder behavior
hash_bytes = hashlib.sha256(data).digest()
hash_hex = "".join(f"{b:02x}" for b in hash_bytes)
return hash_hex


def cast_query_params(params: list[dict[str, Any]]) -> list[dict[str, object]]:
return cast(list[dict[str, object]], params)


def query_by_page(query_iterable: AsyncItemPaged[T], page_token: str | None) -> AsyncPageIterator[T]:
"""
Cosmosdb's `by_page` returns a more narrow subtype than anticipated. This makes
extra's like `.continuation_token` not show up in returned value's type.

This util function correctly casts the return value to the expected type
"""
pager = query_iterable.by_page(page_token)

if not isinstance(pager, AsyncPageIterator):
raise TypeError("Expected AsyncPageIterator from query_items_by_page_token_async")

return cast(AsyncPageIterator[T], pager)
51 changes: 37 additions & 14 deletions backend_py/primary/primary/persistence/cosmosdb/cosmos_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
from azure.cosmos import exceptions
from pydantic import BaseModel, ValidationError

from primary.persistence._utils import query_by_page

from .cosmos_database import CosmosDatabase
from .exceptions import (
DatabaseAccessError,
DatabaseAccessIntegrityError,
DatabaseAccessNotFoundError,
DatabaseAccessConflictError,
DatabaseAccessPreconditionFailedError,
Expand All @@ -21,16 +24,16 @@

T = TypeVar("T", bound=BaseModel)

"""
CosmosContainer provides access to a specific container in a Cosmos DB database.
It allows for querying, inserting, updating, and deleting items in the container.
It uses a Pydantic model for item validation and serialization.

It is designed to be used with asynchronous context management, ensuring proper resource cleanup.
"""
class CosmosContainer(Generic[T]):
"""
CosmosContainer provides access to a specific container in a Cosmos DB database.
It allows for querying, inserting, updating, and deleting items in the container.
It uses a Pydantic model for item validation and serialization.

It is designed to be used with asynchronous context management, ensuring proper resource cleanup.
"""

class CosmosContainer(Generic[T]):
def __init__(
self,
database_name: str,
Expand All @@ -46,19 +49,21 @@ def __init__(
self._validation_model: Type[T] = validation_model

@classmethod
def create(cls, database_name: str, container_name: str, validation_model: Type[T]) -> "CosmosContainer[T]":
def create_instance(
cls, database_name: str, container_name: str, validation_model: Type[T]
) -> "CosmosContainer[T]":
"""Create a CosmosContainer instance."""
database = CosmosDatabase.create(database_name)
database = CosmosDatabase.create_instance(database_name)
container = database.get_container(container_name)
logger.debug("[CosmosContainer] Created for container '%s' in database '%s'", container_name, database_name)
return cls(database_name, container_name, database, container, validation_model)

async def __aenter__(self) -> "CosmosContainer[T]": # pylint: disable=C9001
async def __aenter__(self) -> "CosmosContainer[T]":
return self

async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None: # pylint: disable=C9001
) -> None:
await self.close_async()

def _make_exception(self, op: str, exc: exceptions.CosmosHttpResponseError) -> DatabaseAccessError:
Expand Down Expand Up @@ -124,6 +129,24 @@ async def query_items_async(self, query: str, parameters: Optional[List[Dict[str
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("query_items_async", error)

async def query_items_by_page_token_async(
self,
query: str,
page_token: str | None,
parameters: Optional[List[Dict[str, object]]] = None,
page_size: Optional[int] = None,
) -> tuple[list[T], str | None]:
query_iterable = self._container.query_items(query=query, parameters=parameters, max_item_count=page_size)

pager = query_by_page(query_iterable, page_token)
page = await anext(pager)

token = pager.continuation_token

items = [self._validation_model.model_validate(item) async for item in page]

return (items, token)

async def get_item_async(self, item_id: str, partition_key: str) -> T:
try:
item = await self._container.read_item(item=item_id, partition_key=partition_key)
Expand Down Expand Up @@ -152,14 +175,14 @@ async def delete_item_async(self, item_id: str, partition_key: str) -> None:
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("delete_item_async", error) from error

async def update_item_async(self, item_id: str, partition_key: str, updated_item: T) -> None:
async def update_item_async(self, item_id: str, updated_item: T) -> None:
try:
validated = self._validation_model.model_validate(updated_item).model_dump(by_alias=True, mode="json")

if validated.get("id") and validated["id"] != item_id:
raise ValueError(f"id mismatch: payload id {validated['id']} != path id {item_id}")
raise DatabaseAccessIntegrityError(f"id mismatch: payload id {validated['id']} != path id {item_id}")

await self._container.replace_item(item=item_id, body=validated, partition_key=partition_key)
await self._container.replace_item(item=item_id, body=validated)

logger.debug("[CosmosContainer] Updated item '%s' in '%s'", item_id, self._container_name)
except ValidationError as validation_error:
Expand Down
13 changes: 10 additions & 3 deletions backend_py/primary/primary/persistence/cosmosdb/cosmos_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@


class CosmosDatabase:
"""
CosmosDatabase provides access to a Cosmos DB database.
It allows for getting container proxies within the database.

It is designed to be used with asynchronous context management, ensuring proper resource cleanup.
"""

def __init__(self, database_name: str, client: CosmosClient):
self._database_name = database_name
self._client = client
self._database = self._client.get_database_client(database_name)

@classmethod
def create(cls, database_name: str) -> "CosmosDatabase":
def create_instance(cls, database_name: str) -> "CosmosDatabase":
if COSMOS_DB_PROD_CONNECTION_STRING:
client = CosmosClient.from_connection_string(COSMOS_DB_PROD_CONNECTION_STRING)
elif COSMOS_DB_EMULATOR_URI and COSMOS_DB_EMULATOR_KEY:
Expand All @@ -26,12 +33,12 @@ def create(cls, database_name: str) -> "CosmosDatabase":
self = cls(database_name, client)
return self

async def __aenter__(self) -> "CosmosDatabase": # pylint: disable=C9001
async def __aenter__(self) -> "CosmosDatabase":
return self

async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None: # pylint: disable=C9001
) -> None:
await self.close_async()

def _make_exception(self, message: str) -> ServiceRequestError:
Expand Down
16 changes: 4 additions & 12 deletions backend_py/primary/primary/persistence/cosmosdb/error_converter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Dict, NoReturn, Optional, Type
from typing import Dict, NoReturn, Type

from primary.services.service_exceptions import Service, ServiceRequestError

Expand All @@ -27,15 +27,12 @@

def convert_data_access_error_to_service_error(
err: DatabaseAccessError,
*,
context: Optional[str] = None,
messages: Optional[Dict[Type[DatabaseAccessError], str]] = None,
) -> ServiceRequestError:
"""
Convert a DatabaseAccess* error to a ServiceRequestError (without raising).
You can customize messages per exception type via the 'messages' dict.
"""
msgs = {**_DEFAULT_MESSAGES, **(messages or {})}
msgs = {**_DEFAULT_MESSAGES}

# Find the most specific message for the concrete type
msg = None
Expand All @@ -55,22 +52,17 @@ def convert_data_access_error_to_service_error(
if getattr(err, "activity_id", None):
details.append(f"activity_id={err.activity_id}")

prefix = f"{context}: " if context else ""
suffix = f" ({', '.join(details)})" if details else ""
message = f"{prefix}{msg}{suffix}"
message = f"{msg}{suffix}"

# Chain the original exception for traceback preservation
return ServiceRequestError(message, Service.DATABASE)


def raise_service_error_from_database_access(
err: DatabaseAccessError,
*,
context: Optional[str] = None,
messages: Optional[Dict[Type[DatabaseAccessError], str]] = None,
) -> NoReturn:
"""
Convert and raise immediately, chaining the original error.
"""
service_err = convert_data_access_error_to_service_error(err, context=context, messages=messages)
service_err = convert_data_access_error_to_service_error(err)
raise service_err from err
4 changes: 4 additions & 0 deletions backend_py/primary/primary/persistence/cosmosdb/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ class DatabaseAccessThrottledError(DatabaseAccessError):

class DatabaseAccessTransportError(DatabaseAccessError):
"""Other transport / HTTP errors."""


class DatabaseAccessIntegrityError(DatabaseAccessError):
"""Data integrity error."""
Loading