Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
42efb93
wip
rubenthoms Oct 17, 2025
b13ae37
Backend implementations for persistence
rubenthoms Oct 17, 2025
deaa37c
Updated lock file
rubenthoms Oct 17, 2025
e5c52a9
fix: linting
rubenthoms Oct 17, 2025
348a5d7
Adding nosec comments to local dev code
rubenthoms Oct 17, 2025
9ade9ff
fix: mypy
rubenthoms Oct 17, 2025
3cc435a
fix: linting etc.
rubenthoms Oct 17, 2025
5e1877e
fix: regenerated frontend api files
rubenthoms Oct 19, 2025
126fca0
Adjusted generated api files
rubenthoms Oct 20, 2025
0000411
Removed snapshot previews
rubenthoms Oct 20, 2025
bcb6c6c
Removed SPA adjustments in nginx config
rubenthoms Oct 20, 2025
8be97b4
Merge branch 'persistence/main' into persistence/backend-database-access
rubenthoms Oct 20, 2025
2af12fa
Adjusted after review comments
rubenthoms Oct 20, 2025
bc856f7
First adjustments after review discussion
rubenthoms Oct 23, 2025
2009a18
wip
rubenthoms Oct 23, 2025
e56e498
Refactoring
rubenthoms Oct 24, 2025
90daf5b
Multiple bug fixes
rubenthoms Oct 24, 2025
00f2148
Removed unused imports
rubenthoms Oct 24, 2025
ebc509c
Final fixes
rubenthoms Oct 24, 2025
fed88fa
Re-generated api files, adjusted comments
rubenthoms Oct 27, 2025
2d79071
Further simplifications
rubenthoms Oct 27, 2025
23c985f
Documentation for endpoints
rubenthoms Oct 27, 2025
e15e22a
`FilterFactory` to ensure filter fields exist
rubenthoms Oct 27, 2025
dce4044
Re-generated api files
rubenthoms Oct 27, 2025
c021409
Removal of `limit` and `offset` from stores' `get_many_async`
rubenthoms Oct 27, 2025
169c8ba
Adjustments to comments
rubenthoms Oct 27, 2025
52c715d
Missing @no-cache decorator and removal of summary fields
rubenthoms Oct 27, 2025
1bfaad8
Multiple adjustments after review
rubenthoms Oct 27, 2025
64f0c05
Update backend_py/primary/primary/routers/persistence/router.py
rubenthoms Oct 27, 2025
742cbfd
Update backend_py/primary/primary/routers/persistence/router.py
rubenthoms Oct 27, 2025
b73484d
Update frontend/src/api/autogen/sdk.gen.ts
rubenthoms Oct 27, 2025
7cea0de
Update frontend/src/api/autogen/sdk.gen.ts
rubenthoms Oct 27, 2025
2955bab
Adjustments according to review comments
rubenthoms Oct 28, 2025
96d7ae1
Merge branch 'persistence/main' into persistence/backend-database-access
rubenthoms Oct 28, 2025
babcb4e
Marked util function as private
rubenthoms Oct 28, 2025
2770917
Re-generated api files
rubenthoms Oct 28, 2025
65dec0e
Comments for database document models
rubenthoms Oct 28, 2025
a07a523
formatting
rubenthoms Oct 28, 2025
dac5fff
Linting
rubenthoms Oct 28, 2025
fe366dc
Giving fake connection string for prod DB to skip local setup
rubenthoms Oct 28, 2025
2f886c3
formatting
rubenthoms Oct 28, 2025
608a875
Unused import
rubenthoms Oct 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
786 changes: 784 additions & 2 deletions backend_py/primary/poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions backend_py/primary/primary/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@
DEFAULT_STALE_WHILE_REVALIDATE = 3600 * 24 # 24 hour
REDIS_USER_SESSION_URL = "redis://redis-user-session:6379"
REDIS_CACHE_URL = "redis://redis-cache:6379"

COSMOS_DB_PROD_CONNECTION_STRING = os.environ.get("WEBVIZ_DB_CONNECTION_STRING", None)
COSMOS_DB_EMULATOR_URI = "https://host.docker.internal:8081/"
COSMOS_DB_EMULATOR_KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
7 changes: 7 additions & 0 deletions backend_py/primary/primary/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from starsessions.stores.redis import RedisStore
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware

from primary.persistence.setup_local_database import maybe_setup_local_database
from primary.auth.auth_helper import AuthHelper
from primary.auth.enforce_logged_in_middleware import EnforceLoggedInMiddleware
from primary.middleware.add_process_time_to_server_timing_middleware import AddProcessTimeToServerTimingMiddleware
Expand All @@ -34,6 +35,7 @@
from primary.routers.vfp.router import router as vfp_router
from primary.routers.well.router import router as well_router
from primary.routers.well_completions.router import router as well_completions_router
from primary.routers.persistence.router import router as persistence_router
from primary.services.sumo_access.sumo_fingerprinter import SumoFingerprinterFactory
from primary.services.utils.httpx_async_client_wrapper import HTTPX_ASYNC_CLIENT_WRAPPER
from primary.services.utils.task_meta_tracker import TaskMetaTrackerFactory
Expand All @@ -57,12 +59,16 @@
logging.getLogger("primary.routers.grid3d").setLevel(logging.DEBUG)
logging.getLogger("primary.routers.dev").setLevel(logging.DEBUG)
logging.getLogger("primary.routers.surface").setLevel(logging.DEBUG)
logging.getLogger("primary.persistence").setLevel(logging.DEBUG)
# logging.getLogger("primary.auth").setLevel(logging.DEBUG)
# logging.getLogger("uvicorn.error").setLevel(logging.DEBUG)
# logging.getLogger("uvicorn.access").setLevel(logging.DEBUG)

LOGGER = logging.getLogger(__name__)

# Setup Cosmos DB emulator database if running locally
maybe_setup_local_database()


def custom_generate_unique_id(route: APIRoute) -> str:
return f"{route.name}"
Expand Down Expand Up @@ -115,6 +121,7 @@ async def lifespan_handler_async(_fastapi_app: FastAPI) -> AsyncIterator[None]:
app.include_router(rft_router, prefix="/rft", tags=["rft"])
app.include_router(vfp_router, prefix="/vfp", tags=["vfp"])
app.include_router(dev_router, prefix="/dev", tags=["dev"], include_in_schema=False)
app.include_router(persistence_router, prefix="/persistence", tags=["persistence"])

auth_helper = AuthHelper()
app.include_router(auth_helper.router)
Expand Down
Empty file.
38 changes: 38 additions & 0 deletions backend_py/primary/primary/persistence/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import hashlib
from typing import Any, TypeVar, cast

from azure.core.async_paging import AsyncPageIterator, AsyncItemPaged


T = TypeVar("T")


# Utility function to hash a JSON string using SHA-256
# This function mimics the behavior of TextEncoder in JavaScript, which encodes strings to
# UTF-8 before hashing. The output is a hexadecimal string representation of the hash.
#
# It is important that this function returns the same hash as the JavaScript version
def hash_session_content_string(string: str) -> str:
data = string.encode("utf-8") # Matches TextEncoder behavior
hash_bytes = hashlib.sha256(data).digest()
hash_hex = "".join(f"{b:02x}" for b in hash_bytes)
return hash_hex


def cast_query_params(params: list[dict[str, Any]]) -> list[dict[str, object]]:
return cast(list[dict[str, object]], params)


def query_by_page(query_iterable: AsyncItemPaged[T], page_token: str | None) -> AsyncPageIterator[T]:
"""
Cosmosdb's `by_page` returns a more narrow subtype than anticipated. This makes
extra's like `.continuation_token` not show up in returned value's type.

This util function correctly casts the return value to the expected type
"""
pager = query_iterable.by_page(page_token)

if not isinstance(pager, AsyncPageIterator):
raise TypeError("Expected AsyncPageIterator from query_items_by_page_token_async")

return cast(AsyncPageIterator[T], pager)
Empty file.
234 changes: 234 additions & 0 deletions backend_py/primary/primary/persistence/cosmosdb/cosmos_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import logging
from types import TracebackType
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar
from azure.cosmos.aio import ContainerProxy
from azure.cosmos import exceptions
from pydantic import BaseModel, ValidationError

from primary.persistence._utils import query_by_page

from .cosmos_database import CosmosDatabase
from .exceptions import (
DatabaseAccessError,
DatabaseAccessIntegrityError,
DatabaseAccessNotFoundError,
DatabaseAccessConflictError,
DatabaseAccessPreconditionFailedError,
DatabaseAccessPermissionError,
DatabaseAccessThrottledError,
DatabaseAccessTransportError,
)


logger = logging.getLogger(__name__)

T = TypeVar("T", bound=BaseModel)


class CosmosContainer(Generic[T]):
"""
CosmosContainer provides access to a specific container in a Cosmos DB database.
It allows for querying, inserting, updating, and deleting items in the container.
It uses a Pydantic model for item validation and serialization.

It is designed to be used with asynchronous context management, ensuring proper resource cleanup.
"""

def __init__(
self,
database_name: str,
container_name: str,
database: CosmosDatabase,
container: ContainerProxy,
validation_model: Type[T],
):
self._database_name = database_name
self._container_name = container_name
self._database = database
self._container = container
self._validation_model: Type[T] = validation_model

@classmethod
def create(cls, database_name: str, container_name: str, validation_model: Type[T]) -> "CosmosContainer[T]":
"""Create a CosmosContainer instance."""
database = CosmosDatabase.create(database_name)
container = database.get_container(container_name)
logger.debug("[CosmosContainer] Created for container '%s' in database '%s'", container_name, database_name)
return cls(database_name, container_name, database, container, validation_model)

async def __aenter__(self) -> "CosmosContainer[T]":
return self

async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
await self.close_async()

def _make_exception(self, op: str, exc: exceptions.CosmosHttpResponseError) -> DatabaseAccessError:
"""Map Cosmos error to a data-access exception with rich context and re-raise."""
headers = getattr(exc, "headers", {}) or {}
status = getattr(exc, "status_code", None)
# Cosmos uses x-ms-substatus for more detail (e.g., 1002)
substatus_raw = headers.get("x-ms-substatus")
try:
substatus = int(substatus_raw) if substatus_raw is not None else None
except ValueError:
substatus = None
activity_id = headers.get("x-ms-activity-id")

msg = (
f"[{op}] Cosmos error on {self._database_name}/{self._container_name}: "
f"{getattr(exc, 'message', None) or str(exc)} "
f"(status={status}, substatus={substatus}, activity_id={activity_id})"
)

# Log with stack trace
logger.exception(
"[CosmosContainer] %s",
msg,
extra={
"database": self._database_name,
"container": self._container_name,
"operation": op,
"status_code": status,
"sub_status": substatus,
"activity_id": activity_id,
},
)

if status == 404:
return DatabaseAccessNotFoundError(msg, status_code=status, sub_status=substatus, activity_id=activity_id)
if status == 409:
return DatabaseAccessConflictError(msg, status_code=status, sub_status=substatus, activity_id=activity_id)
if status == 412:
return DatabaseAccessPreconditionFailedError(
msg, status_code=status, sub_status=substatus, activity_id=activity_id
)
if status in (401, 403):
return DatabaseAccessPermissionError(msg, status_code=status, sub_status=substatus, activity_id=activity_id)
if status in (429, 503):
# Typically retryable
return DatabaseAccessThrottledError(msg, status_code=status, sub_status=substatus, activity_id=activity_id)

# Fallback
return DatabaseAccessTransportError(msg, status_code=status, sub_status=substatus, activity_id=activity_id)

async def query_items_async(self, query: str, parameters: Optional[List[Dict[str, object]]] = None) -> List[T]:
try:
items_iterable = self._container.query_items(
query=query,
parameters=parameters or [],
)
items = [item async for item in items_iterable]
return [self._validation_model.model_validate(item) for item in items]
except ValidationError as validation_error:
logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error)
raise
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("query_items_async", error)

async def query_items_by_page_token_async(
self,
query: str,
page_token: str | None,
parameters: Optional[List[Dict[str, object]]] = None,
page_size: Optional[int] = None,
) -> tuple[list[T], str | None]:
query_iterable = self._container.query_items(query=query, parameters=parameters, max_item_count=page_size)

pager = query_by_page(query_iterable, page_token)
page = await anext(pager)

token = pager.continuation_token

items = [self._validation_model.model_validate(item) async for item in page]

return (items, token)

async def get_item_async(self, item_id: str, partition_key: str) -> T:
try:
item = await self._container.read_item(item=item_id, partition_key=partition_key)
return self._validation_model.model_validate(item)
except ValidationError as validation_error:
logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error)
raise
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("get_item_async", error) from error

async def insert_item_async(self, item: T) -> str:
try:
body: Dict[str, Any] = self._validation_model.model_validate(item).model_dump(by_alias=True, mode="json")
result = await self._container.create_item(body)
return result["id"]
except ValidationError as validation_error:
logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error)
raise
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("insert_item_async", error) from error

async def delete_item_async(self, item_id: str, partition_key: str) -> None:
try:
await self._container.delete_item(item=item_id, partition_key=partition_key)
logger.debug("[CosmosContainer] Deleted item '%s' from '%s'", item_id, self._container_name)
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("delete_item_async", error) from error

async def update_item_async(self, item_id: str, updated_item: T) -> None:
try:
validated = self._validation_model.model_validate(updated_item).model_dump(by_alias=True, mode="json")

if validated.get("id") and validated["id"] != item_id:
raise DatabaseAccessIntegrityError(f"id mismatch: payload id {validated['id']} != path id {item_id}")

await self._container.replace_item(item=item_id, body=validated)

logger.debug("[CosmosContainer] Updated item '%s' in '%s'", item_id, self._container_name)
except ValidationError as validation_error:
logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error)
raise
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("update_item_async", error) from error

async def patch_item_async(
self,
item_id: str,
partition_key: str,
patch_operations: list[dict],
*,
filter_predicate: str | None = None,
) -> None:
try:
await self._container.patch_item(
item=item_id,
partition_key=partition_key,
patch_operations=patch_operations,
filter_predicate=filter_predicate,
no_response=True,
)
logger.debug("[CosmosContainer] Patched item '%s' in '%s'", item_id, self._container_name)
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("patch_item_async", error) from error

async def query_projection_async(
self,
query: str,
parameters: Optional[List[dict]] = None,
) -> List[Dict[str, Any]]:
"""
Run a query that returns raw dicts (no Pydantic validation), useful for
projections like SELECT c.id, c.partitionKey.
"""
try:
items_iterable = self._container.query_items(
query=query,
parameters=parameters or [],
)
return [item async for item in items_iterable]
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception("query_projection_async", error) from error

async def close_async(self) -> None:
"""Close the container."""
if self._database:
logger.debug("[CosmosContainer] Closing container '%s/%s'", self._database_name, self._container_name)
await self._database.close_async()
61 changes: 61 additions & 0 deletions backend_py/primary/primary/persistence/cosmosdb/cosmos_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from types import TracebackType
from typing import Optional, Type
from azure.cosmos.aio import CosmosClient, ContainerProxy
from azure.cosmos import exceptions

from primary.config import COSMOS_DB_PROD_CONNECTION_STRING, COSMOS_DB_EMULATOR_URI, COSMOS_DB_EMULATOR_KEY
from primary.services.service_exceptions import Service, ServiceRequestError


class CosmosDatabase:
"""
CosmosDatabase provides access to a Cosmos DB database.
It allows for getting container proxies within the database.

It is designed to be used with asynchronous context management, ensuring proper resource cleanup.
"""

def __init__(self, database_name: str, client: CosmosClient):
self._database_name = database_name
self._client = client
self._database = self._client.get_database_client(database_name)

@classmethod
def create(cls, database_name: str) -> "CosmosDatabase":
if COSMOS_DB_PROD_CONNECTION_STRING:
client = CosmosClient.from_connection_string(COSMOS_DB_PROD_CONNECTION_STRING)
elif COSMOS_DB_EMULATOR_URI and COSMOS_DB_EMULATOR_KEY:
client = CosmosClient(COSMOS_DB_EMULATOR_URI, COSMOS_DB_EMULATOR_KEY, connection_verify=False)
else:
raise ServiceRequestError(
"No Cosmos DB production connection string or emulator URI/key provided.", Service.DATABASE
)
self = cls(database_name, client)
return self

async def __aenter__(self) -> "CosmosDatabase":
return self

async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
await self.close_async()

def _make_exception(self, message: str) -> ServiceRequestError:
return ServiceRequestError(f"CosmosDatabase ({self._database_name}): {message}", Service.DATABASE)

def get_container(self, container_name: str) -> ContainerProxy:
if not self._client or not self._database:
raise self._make_exception("Database client is not initialized or already closed.")
if not container_name or not isinstance(container_name, str):
raise self._make_exception("Invalid container name.")

try:
container = self._database.get_container_client(container_name)
return container
except exceptions.CosmosHttpResponseError as error:
raise self._make_exception(f"Unable to access container '{container_name}': {error.message}") from error

async def close_async(self) -> None:
if self._client:
await self._client.close()
Loading