[Persistence]: Backend implementation #1211
Merged
rubenthoms merged 42 commits into equinor:persistence/main from rubenthoms:persistence/backend-database-access on Oct 28, 2025
Changes from 32 commits
Commits
42efb93
wip
rubenthoms b13ae37
Backend implementations for persistence
rubenthoms deaa37c
Updated lock file
rubenthoms e5c52a9
fix: linting
rubenthoms 348a5d7
Adding nosec comments to local dev code
rubenthoms 9ade9ff
fix: mypy
rubenthoms 3cc435a
fix: linting etc.
rubenthoms 5e1877e
fix: regenerated frontend api files
rubenthoms 126fca0
Adjusted generated api files
rubenthoms 0000411
Removed snapshot previews
rubenthoms bcb6c6c
Removed SPA adjustments in nginx config
rubenthoms 8be97b4
Merge branch 'persistence/main' into persistence/backend-database-access
rubenthoms 2af12fa
Adjusted after review comments
rubenthoms bc856f7
First adjustments after review discussion
rubenthoms 2009a18
wip
rubenthoms e56e498
Refactoring
rubenthoms 90daf5b
Multiple bug fixes
rubenthoms 00f2148
Removed unused imports
rubenthoms ebc509c
Final fixes
rubenthoms fed88fa
Re-generated api files, adjusted comments
rubenthoms 2d79071
Further simplifications
rubenthoms 23c985f
Documentation for endpoints
rubenthoms e15e22a
`FilterFactory` to ensure filter fields exist
rubenthoms dce4044
Re-generated api files
rubenthoms c021409
Removal of `limit` and `offset` from stores' `get_many_async`
rubenthoms 169c8ba
Adjustments to comments
rubenthoms 52c715d
Missing @no-cache decorator and removal of summary fields
rubenthoms 1bfaad8
Multiple adjustments after review
rubenthoms 64f0c05
Update backend_py/primary/primary/routers/persistence/router.py
rubenthoms 742cbfd
Update backend_py/primary/primary/routers/persistence/router.py
rubenthoms b73484d
Update frontend/src/api/autogen/sdk.gen.ts
rubenthoms 7cea0de
Update frontend/src/api/autogen/sdk.gen.ts
rubenthoms 2955bab
Adjustments according to review comments
rubenthoms 96d7ae1
Merge branch 'persistence/main' into persistence/backend-database-access
rubenthoms babcb4e
Marked util function as private
rubenthoms 2770917
Re-generated api files
rubenthoms 65dec0e
Comments for database document models
rubenthoms a07a523
formatting
rubenthoms dac5fff
Linting
rubenthoms fe366dc
Giving fake connection string for prod DB to skip local setup
rubenthoms 2f886c3
formatting
rubenthoms 608a875
Unused import
rubenthoms
Empty file.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| import hashlib | ||
| from typing import Any, TypeVar, cast | ||
|
|
||
| from azure.core.async_paging import AsyncPageIterator, AsyncItemPaged | ||
|
|
||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
||
| # Utility function to hash a JSON string using SHA-256 | ||
| # This function mimics the behavior of TextEncoder in JavaScript, which encodes strings to | ||
| # UTF-8 before hashing. The output is a hexadecimal string representation of the hash. | ||
| # | ||
| # It is important that this function returns the same hash as the JavaScript version | ||
| def hash_session_content_string(string: str) -> str: | ||
| data = string.encode("utf-8") # Matches TextEncoder behavior | ||
| hash_bytes = hashlib.sha256(data).digest() | ||
| hash_hex = "".join(f"{b:02x}" for b in hash_bytes) | ||
| return hash_hex | ||
|
|
||
|
|
||
| def cast_query_params(params: list[dict[str, Any]]) -> list[dict[str, object]]: | ||
| return cast(list[dict[str, object]], params) | ||
|
|
||
|
|
||
| def query_by_page(query_iterable: AsyncItemPaged[T], page_token: str | None) -> AsyncPageIterator[T]: | ||
| """ | ||
| Cosmos DB's `by_page` is annotated with a broader return type than the `AsyncPageIterator` it | ||
| returns at runtime, so extras like `.continuation_token` do not show up in the returned value's type. | ||
|
|
||
| This util function correctly casts the return value to the expected type | ||
| """ | ||
| pager = query_iterable.by_page(page_token) | ||
|
|
||
| if not isinstance(pager, AsyncPageIterator): | ||
| raise TypeError("Expected AsyncPageIterator from by_page") | ||
|
|
||
| return cast(AsyncPageIterator[T], pager) |
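
The comment on `hash_session_content_string` requires the Python hash to equal the one computed in JavaScript, but the diff shows no check for it. Below is a minimal sketch of such a check, assuming only the standard library. It reimplements the helper exactly as shown above and compares it with the published SHA-256 test vector for `"abc"` and with `hashlib`'s own `hexdigest()`. `ABC_SHA256` and `SAMPLE` are names introduced here for illustration. The JavaScript counterpart is not part of this diff, so matching it on real session payloads would still need a shared fixture.

```python
import hashlib


def hash_session_content_string(string: str) -> str:
    # Same steps as the helper in the diff: UTF-8 encode, SHA-256, lowercase hex.
    data = string.encode("utf-8")
    hash_bytes = hashlib.sha256(data).digest()
    return "".join(f"{b:02x}" for b in hash_bytes)


# Standard SHA-256 test vector for the ASCII string "abc".
ABC_SHA256 = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"

# Hypothetical session payload (an assumption, not taken from the PR).
# The character "ø" occupies two bytes in UTF-8, so encoding matters here.
SAMPLE = '{"title": "Frøya"}'

if __name__ == "__main__":
    # Compare against the known vector and against hashlib's built-in hex output.
    print(hash_session_content_string("abc") == ABC_SHA256)
    print(hash_session_content_string(SAMPLE) == hashlib.sha256(SAMPLE.encode("utf-8")).hexdigest())
```

Since `hexdigest()` produces the same lowercase hex string, the manual `join` could probably be replaced by it without changing the output.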
Empty file.
234 changes: 234 additions & 0 deletions
backend_py/primary/primary/persistence/cosmosdb/cosmos_container.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,234 @@ | ||
| import logging | ||
| from types import TracebackType | ||
| from typing import Any, Dict, Generic, List, Optional, Type, TypeVar | ||
| from azure.cosmos.aio import ContainerProxy | ||
| from azure.cosmos import exceptions | ||
| from pydantic import BaseModel, ValidationError | ||
|
|
||
| from primary.persistence._utils import query_by_page | ||
|
|
||
| from .cosmos_database import CosmosDatabase | ||
| from .exceptions import ( | ||
| DatabaseAccessError, | ||
| DatabaseAccessIntegrityError, | ||
| DatabaseAccessNotFoundError, | ||
| DatabaseAccessConflictError, | ||
| DatabaseAccessPreconditionFailedError, | ||
| DatabaseAccessPermissionError, | ||
| DatabaseAccessThrottledError, | ||
| DatabaseAccessTransportError, | ||
| ) | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| T = TypeVar("T", bound=BaseModel) | ||
|
|
||
|
|
||
| class CosmosContainer(Generic[T]): | ||
| """ | ||
| CosmosContainer provides access to a specific container in a Cosmos DB database. | ||
| It allows for querying, inserting, updating, and deleting items in the container. | ||
| It uses a Pydantic model for item validation and serialization. | ||
|
|
||
| It is designed to be used with asynchronous context management, ensuring proper resource cleanup. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| database_name: str, | ||
| container_name: str, | ||
| database: CosmosDatabase, | ||
| container: ContainerProxy, | ||
| validation_model: Type[T], | ||
| ): | ||
| self._database_name = database_name | ||
| self._container_name = container_name | ||
| self._database = database | ||
| self._container = container | ||
| self._validation_model: Type[T] = validation_model | ||
|
|
||
| @classmethod | ||
| def create(cls, database_name: str, container_name: str, validation_model: Type[T]) -> "CosmosContainer[T]": | ||
| """Create a CosmosContainer instance.""" | ||
| database = CosmosDatabase.create(database_name) | ||
| container = database.get_container(container_name) | ||
| logger.debug("[CosmosContainer] Created for container '%s' in database '%s'", container_name, database_name) | ||
| return cls(database_name, container_name, database, container, validation_model) | ||
|
|
||
| async def __aenter__(self) -> "CosmosContainer[T]": | ||
| return self | ||
|
|
||
| async def __aexit__( | ||
| self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] | ||
| ) -> None: | ||
| await self.close_async() | ||
|
|
||
| def _make_exception(self, op: str, exc: exceptions.CosmosHttpResponseError) -> DatabaseAccessError: | ||
| """Map a Cosmos error to a data-access exception with rich context. The caller is expected to raise it.""" | ||
| headers = getattr(exc, "headers", {}) or {} | ||
| status = getattr(exc, "status_code", None) | ||
| # Cosmos uses x-ms-substatus for more detail (e.g., 1002) | ||
| substatus_raw = headers.get("x-ms-substatus") | ||
| try: | ||
| substatus = int(substatus_raw) if substatus_raw is not None else None | ||
| except ValueError: | ||
| substatus = None | ||
| activity_id = headers.get("x-ms-activity-id") | ||
|
|
||
| msg = ( | ||
| f"[{op}] Cosmos error on {self._database_name}/{self._container_name}: " | ||
| f"{getattr(exc, 'message', None) or str(exc)} " | ||
| f"(status={status}, substatus={substatus}, activity_id={activity_id})" | ||
| ) | ||
|
|
||
| # Log with stack trace | ||
| logger.exception( | ||
| "[CosmosContainer] %s", | ||
| msg, | ||
| extra={ | ||
| "database": self._database_name, | ||
| "container": self._container_name, | ||
| "operation": op, | ||
| "status_code": status, | ||
| "sub_status": substatus, | ||
| "activity_id": activity_id, | ||
| }, | ||
| ) | ||
|
|
||
| if status == 404: | ||
| return DatabaseAccessNotFoundError(msg, status_code=status, sub_status=substatus, activity_id=activity_id) | ||
| if status == 409: | ||
| return DatabaseAccessConflictError(msg, status_code=status, sub_status=substatus, activity_id=activity_id) | ||
| if status == 412: | ||
| return DatabaseAccessPreconditionFailedError( | ||
| msg, status_code=status, sub_status=substatus, activity_id=activity_id | ||
| ) | ||
| if status in (401, 403): | ||
| return DatabaseAccessPermissionError(msg, status_code=status, sub_status=substatus, activity_id=activity_id) | ||
| if status in (429, 503): | ||
| # Typically retryable | ||
| return DatabaseAccessThrottledError(msg, status_code=status, sub_status=substatus, activity_id=activity_id) | ||
|
|
||
| # Fallback | ||
| return DatabaseAccessTransportError(msg, status_code=status, sub_status=substatus, activity_id=activity_id) | ||
|
|
||
| async def query_items_async(self, query: str, parameters: Optional[List[Dict[str, object]]] = None) -> List[T]: | ||
| try: | ||
| items_iterable = self._container.query_items( | ||
| query=query, | ||
| parameters=parameters or [], | ||
| ) | ||
| items = [item async for item in items_iterable] | ||
| return [self._validation_model.model_validate(item) for item in items] | ||
| except ValidationError as validation_error: | ||
| logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error) | ||
| raise | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("query_items_async", error) from error | ||
|
|
||
| async def query_items_by_page_token_async( | ||
| self, | ||
| query: str, | ||
| page_token: str | None, | ||
| parameters: Optional[List[Dict[str, object]]] = None, | ||
| page_size: Optional[int] = None, | ||
| ) -> tuple[list[T], str | None]: | ||
| query_iterable = self._container.query_items(query=query, parameters=parameters, max_item_count=page_size) | ||
|
|
||
| pager = query_by_page(query_iterable, page_token) | ||
| page = await anext(pager) | ||
|
|
||
| token = pager.continuation_token | ||
|
|
||
| items = [self._validation_model.model_validate(item) async for item in page] | ||
|
|
||
| return (items, token) | ||
|
|
||
| async def get_item_async(self, item_id: str, partition_key: str) -> T: | ||
| try: | ||
| item = await self._container.read_item(item=item_id, partition_key=partition_key) | ||
| return self._validation_model.model_validate(item) | ||
| except ValidationError as validation_error: | ||
| logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error) | ||
| raise | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("get_item_async", error) from error | ||
|
|
||
| async def insert_item_async(self, item: T) -> str: | ||
| try: | ||
| body: Dict[str, Any] = self._validation_model.model_validate(item).model_dump(by_alias=True, mode="json") | ||
| result = await self._container.create_item(body) | ||
| return result["id"] | ||
| except ValidationError as validation_error: | ||
| logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error) | ||
| raise | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("insert_item_async", error) from error | ||
|
|
||
| async def delete_item_async(self, item_id: str, partition_key: str) -> None: | ||
| try: | ||
| await self._container.delete_item(item=item_id, partition_key=partition_key) | ||
| logger.debug("[CosmosContainer] Deleted item '%s' from '%s'", item_id, self._container_name) | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("delete_item_async", error) from error | ||
|
|
||
| async def update_item_async(self, item_id: str, updated_item: T) -> None: | ||
| try: | ||
| validated = self._validation_model.model_validate(updated_item).model_dump(by_alias=True, mode="json") | ||
|
|
||
| if validated.get("id") and validated["id"] != item_id: | ||
| raise DatabaseAccessIntegrityError(f"id mismatch: payload id {validated['id']} != path id {item_id}") | ||
|
|
||
| await self._container.replace_item(item=item_id, body=validated) | ||
|
|
||
| logger.debug("[CosmosContainer] Updated item '%s' in '%s'", item_id, self._container_name) | ||
| except ValidationError as validation_error: | ||
| logger.error("[CosmosContainer] Validation error in '%s': %s", self._container_name, validation_error) | ||
| raise | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("update_item_async", error) from error | ||
|
|
||
| async def patch_item_async( | ||
| self, | ||
| item_id: str, | ||
| partition_key: str, | ||
| patch_operations: list[dict], | ||
| *, | ||
| filter_predicate: str | None = None, | ||
| ) -> None: | ||
| try: | ||
| await self._container.patch_item( | ||
| item=item_id, | ||
| partition_key=partition_key, | ||
| patch_operations=patch_operations, | ||
| filter_predicate=filter_predicate, | ||
| no_response=True, | ||
| ) | ||
| logger.debug("[CosmosContainer] Patched item '%s' in '%s'", item_id, self._container_name) | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("patch_item_async", error) from error | ||
|
|
||
| async def query_projection_async( | ||
| self, | ||
| query: str, | ||
| parameters: Optional[List[dict]] = None, | ||
| ) -> List[Dict[str, Any]]: | ||
| """ | ||
| Run a query that returns raw dicts (no Pydantic validation), useful for | ||
| projections like SELECT c.id, c.partitionKey. | ||
| """ | ||
| try: | ||
| items_iterable = self._container.query_items( | ||
| query=query, | ||
| parameters=parameters or [], | ||
| ) | ||
| return [item async for item in items_iterable] | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception("query_projection_async", error) from error | ||
|
|
||
| async def close_async(self) -> None: | ||
| """Close the underlying database client.""" | ||
| if self._database: | ||
| logger.debug("[CosmosContainer] Closing container '%s/%s'", self._database_name, self._container_name) | ||
| await self._database.close_async() |
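
To illustrate how a caller might consume `query_items_by_page_token_async`, here is a rough sketch of the page-token loop. It does not touch Azure: `FakePager`, `fetch_page` and `fetch_all` are hypothetical stand-ins, and the integer-offset token is purely an assumption for the demo (real Cosmos DB continuation tokens are opaque strings that should be passed back unchanged). What it shares with the method above is the shape: take one page from the pager, read `continuation_token`, return both.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional, Tuple


class FakePager:
    """Hypothetical stand-in for the pager returned by `query_by_page`.

    It yields one page per step and exposes the token for the following
    page as `continuation_token` (None once the last page was handed out).
    """

    def __init__(self, rows: List[Dict[str, str]], page_size: int, token: Optional[str]) -> None:
        self._rows = rows
        self._page_size = page_size
        self._start = int(token) if token else 0  # demo-only token format
        self.continuation_token: Optional[str] = token

    def __aiter__(self) -> "FakePager":
        return self

    async def __anext__(self) -> AsyncIterator[Dict[str, str]]:
        if self._start >= len(self._rows):
            raise StopAsyncIteration
        chunk = self._rows[self._start : self._start + self._page_size]
        self._start += len(chunk)
        self.continuation_token = str(self._start) if self._start < len(self._rows) else None

        async def page() -> AsyncIterator[Dict[str, str]]:
            for row in chunk:
                yield row

        return page()


async def fetch_page(
    rows: List[Dict[str, str]], page_token: Optional[str], page_size: int
) -> Tuple[List[Dict[str, str]], Optional[str]]:
    """Same shape as query_items_by_page_token_async: one page plus the next token."""
    pager = FakePager(rows, page_size, page_token)
    page = await pager.__anext__()  # equivalent to `await anext(pager)` on Python 3.10+
    items = [item async for item in page]
    return items, pager.continuation_token


async def fetch_all(rows: List[Dict[str, str]], page_size: int) -> List[Dict[str, str]]:
    """Keep requesting pages until no continuation token is returned."""
    collected: List[Dict[str, str]] = []
    token: Optional[str] = None
    while True:
        items, token = await fetch_page(rows, token, page_size)
        collected.extend(items)
        if token is None:
            return collected


if __name__ == "__main__":
    demo_rows = [{"id": str(i)} for i in range(5)]
    print(asyncio.run(fetch_all(demo_rows, page_size=2)))
```

A `None` token signals the last page in this sketch, which matches the `str | None` return type above. Whether a query with no matches yields an empty page or no page at all is not something the diff shows, so a real caller may want to guard the first `anext` call.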
61 changes: 61 additions & 0 deletions
backend_py/primary/primary/persistence/cosmosdb/cosmos_database.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| from types import TracebackType | ||
| from typing import Optional, Type | ||
| from azure.cosmos.aio import CosmosClient, ContainerProxy | ||
| from azure.cosmos import exceptions | ||
|
|
||
| from primary.config import COSMOS_DB_PROD_CONNECTION_STRING, COSMOS_DB_EMULATOR_URI, COSMOS_DB_EMULATOR_KEY | ||
| from primary.services.service_exceptions import Service, ServiceRequestError | ||
|
|
||
|
|
||
| class CosmosDatabase: | ||
| """ | ||
| CosmosDatabase provides access to a Cosmos DB database. | ||
| It allows for getting container proxies within the database. | ||
|
|
||
| It is designed to be used with asynchronous context management, ensuring proper resource cleanup. | ||
| """ | ||
|
|
||
| def __init__(self, database_name: str, client: CosmosClient): | ||
| self._database_name = database_name | ||
| self._client = client | ||
| self._database = self._client.get_database_client(database_name) | ||
|
|
||
| @classmethod | ||
| def create(cls, database_name: str) -> "CosmosDatabase": | ||
| if COSMOS_DB_PROD_CONNECTION_STRING: | ||
| client = CosmosClient.from_connection_string(COSMOS_DB_PROD_CONNECTION_STRING) | ||
| elif COSMOS_DB_EMULATOR_URI and COSMOS_DB_EMULATOR_KEY: | ||
| client = CosmosClient(COSMOS_DB_EMULATOR_URI, COSMOS_DB_EMULATOR_KEY, connection_verify=False) | ||
| else: | ||
| raise ServiceRequestError( | ||
| "No Cosmos DB production connection string or emulator URI/key provided.", Service.DATABASE | ||
| ) | ||
| return cls(database_name, client) | ||
|
|
||
| async def __aenter__(self) -> "CosmosDatabase": | ||
| return self | ||
|
|
||
| async def __aexit__( | ||
| self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] | ||
| ) -> None: | ||
| await self.close_async() | ||
|
|
||
| def _make_exception(self, message: str) -> ServiceRequestError: | ||
| return ServiceRequestError(f"CosmosDatabase ({self._database_name}): {message}", Service.DATABASE) | ||
|
|
||
| def get_container(self, container_name: str) -> ContainerProxy: | ||
| if not self._client or not self._database: | ||
| raise self._make_exception("Database client is not initialized or already closed.") | ||
| if not container_name or not isinstance(container_name, str): | ||
| raise self._make_exception("Invalid container name.") | ||
|
|
||
| try: | ||
| container = self._database.get_container_client(container_name) | ||
| return container | ||
| except exceptions.CosmosHttpResponseError as error: | ||
| raise self._make_exception(f"Unable to access container '{container_name}': {error.message}") from error | ||
|
|
||
| async def close_async(self) -> None: | ||
| if self._client: | ||
| await self._client.close() |
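
Both `CosmosDatabase` and `CosmosContainer` rely on `__aexit__` calling `close_async()`. Below is a small sketch of what that buys, assuming a hypothetical `FakeClient` in place of the real async client and a trimmed-down `DatabaseSketch` class (neither exists in the PR). The client is closed even when the body of the `async with` block raises, and the exception still reaches the caller.

```python
import asyncio
from types import TracebackType
from typing import Optional, Type


class FakeClient:
    """Hypothetical stand-in for the async client; records whether close() was awaited."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class DatabaseSketch:
    """Mirrors the context-manager shape of CosmosDatabase, without any Azure calls."""

    def __init__(self, client: FakeClient) -> None:
        self._client = client

    async def __aenter__(self) -> "DatabaseSketch":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Returning None (falsy) lets an exception from the body propagate after cleanup.
        await self.close_async()

    async def close_async(self) -> None:
        if self._client:
            await self._client.close()


async def run_with_failure(client: FakeClient) -> str:
    """Raise inside the block and report the error message that reached the caller."""
    try:
        async with DatabaseSketch(client):
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError as error:
        return str(error)


if __name__ == "__main__":
    demo_client = FakeClient()
    print(asyncio.run(run_with_failure(demo_client)), demo_client.closed)
```

In the PR, `CosmosContainer.close_async()` delegates to the database object it created, so wrapping a container in `async with` should release the client in the same way.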