Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a31a91e
Started on ensemble fingerprinting endpoint
sigurdp Oct 9, 2025
207f168
Removed option to filter on class name in SumoFingerprinter, adjusted…
sigurdp Oct 9, 2025
446acfd
Merge branch 'main' into ensemble-fingerprinting
sigurdp Oct 15, 2025
40bf64a
Typing
sigurdp Oct 15, 2025
1d5ea9f
wip
rubenthoms Oct 15, 2025
23578ca
wip
rubenthoms Oct 15, 2025
c98fea0
wip
rubenthoms Oct 15, 2025
6768b2a
Minor improvement
rubenthoms Oct 16, 2025
f8376ff
fix: linting
rubenthoms Oct 16, 2025
94e5f3a
Adjusted api code
rubenthoms Oct 16, 2025
c4a056d
Naming and file locations
rubenthoms Oct 16, 2025
1b1001e
Merge remote-tracking branch 'equinor/main' into ensemble-fingerprinting
rubenthoms Oct 16, 2025
83d8f03
fix: merge conflicts after merging in main
rubenthoms Oct 16, 2025
eb4f876
Final adjustments
rubenthoms Oct 16, 2025
d0bfcd7
fix: renaming not committed
rubenthoms Oct 16, 2025
81c4989
fix: import
rubenthoms Oct 16, 2025
305bd6d
wip
rubenthoms Oct 16, 2025
0c92fb3
fix: support null input for ensemble idents
rubenthoms Oct 16, 2025
2021fb4
Merge branch 'ensemble-fingerprinting' into add-cache-busting-to-modules
rubenthoms Oct 16, 2025
abf7e77
fix: refactoring after discussion
rubenthoms Oct 16, 2025
a90d01e
Updated schema
sigurdp Oct 16, 2025
c86c038
Merge branch 'ensemble-fingerprinting' into add-cache-busting-to-modules
rubenthoms Oct 16, 2025
2c812e9
wip
rubenthoms Oct 16, 2025
cb80a9c
Removed unused endpoint and supporting functions/models
sigurdp Oct 16, 2025
2c432c1
Finished all queries
rubenthoms Oct 16, 2025
67aa96a
Merge remote-tracking branch 'sigurdp/ensemble-fingerprinting' into a…
rubenthoms Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions backend_py/primary/primary/routers/explore/router.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from typing import List
import asyncio
import logging
from typing import List, Coroutine, Any


from fastapi import APIRouter, Depends, Path, Query, Body

from fastapi import APIRouter, Depends, Path, Query, Body, Response

from primary.auth.auth_helper import AuthHelper
from primary.middleware.add_browser_cache import no_cache
from primary.services.sumo_access.case_inspector import CaseInspector
from primary.services.sumo_access.sumo_inspector import SumoInspector
from primary.services.sumo_access.sumo_fingerprinter import get_sumo_fingerprinter_for_user
from primary.services.utils.authenticated_user import AuthenticatedUser
from primary.middleware.add_browser_cache import no_cache
from primary.utils.response_perf_metrics import ResponsePerfMetrics

from . import schemas

LOGGER = logging.getLogger(__name__)

router = APIRouter()


Expand Down Expand Up @@ -67,7 +70,6 @@ async def get_cases(


@router.get("/cases/{case_uuid}/ensembles/{ensemble_name}")
@no_cache
async def get_ensemble_details(
authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
case_uuid: str = Path(description="Sumo case uuid"),
Expand All @@ -80,7 +82,6 @@ async def get_ensemble_details(
realizations = await case_inspector.get_realizations_in_ensemble_async(ensemble_name)
field_identifiers = await case_inspector.get_field_identifiers_async()
stratigraphic_column_identifier = await case_inspector.get_stratigraphic_column_identifier_async()
timestamps = await case_inspector.get_ensemble_timestamps_async(ensemble_name)
standard_results = await case_inspector.get_standard_results_in_ensemble_async(ensemble_name)

if len(field_identifiers) != 1:
Expand All @@ -93,41 +94,42 @@ async def get_ensemble_details(
realizations=realizations,
fieldIdentifier=field_identifiers[0],
stratigraphicColumnIdentifier=stratigraphic_column_identifier,
timestamps=schemas.EnsembleTimestamps(
caseUpdatedAtUtcMs=timestamps.case_updated_at_utc_ms,
dataUpdatedAtUtcMs=timestamps.data_updated_at_utc_ms,
),
standardResults=standard_results,
)


@router.post("/ensembles/get_timestamps")
@no_cache
async def post_get_timestamps_for_ensembles(
@router.post("/ensembles/refresh_fingerprints")
async def post_refresh_fingerprints_for_ensembles(
# fmt:off
response: Response,
authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
ensemble_idents: list[schemas.EnsembleIdent] = Body(
description="A list of ensemble idents (aka; case uuid and ensemble name)"
),
) -> list[schemas.EnsembleTimestamps]:
ensemble_idents: list[schemas.EnsembleIdent] = Body(description="Ensembles to refresh and get fingerprints for, specified as pairs of caseUuid,ensembleName"),
# fmt:on
) -> list[str | None]:
"""
Fetches ensemble timestamps for a list of ensembles
Retrieves freshly calculated fingerprints for a list of ensembles
"""
return await asyncio.gather(
*[_get_ensemble_timestamps_for_ident_async(authenticated_user, ident) for ident in ensemble_idents]
)
perf_metrics = ResponsePerfMetrics(response)

# For how long should we cache the calculated fingerprints?
# Given that currently we will have the frontend call this endpoint every 5 minutes, a TTL of 5 minutes seems reasonable
fingerprinter = get_sumo_fingerprinter_for_user(authenticated_user=authenticated_user, cache_ttl_s=5 * 60)

async def _get_ensemble_timestamps_for_ident_async(
authenticated_user: AuthenticatedUser, ensemble_ident: schemas.EnsembleIdent
) -> schemas.EnsembleTimestamps:
case_uuid = ensemble_ident.caseUuid
ensemble_name = ensemble_ident.ensembleName
coros_arr: list[Coroutine[Any, Any, str]] = []
for ident in ensemble_idents:
coros_arr.append(fingerprinter.calc_and_store_ensemble_fp_async(ident.caseUuid, ident.ensembleName))

case_inspector = CaseInspector.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid)
raw_results = await asyncio.gather(*coros_arr, return_exceptions=True)
perf_metrics.record_lap("calc-and-write-fingerprints")

timestamps = await case_inspector.get_ensemble_timestamps_async(ensemble_name)
ret_fingerprints: list[str | None] = []
for res in raw_results:
if isinstance(res, str):
ret_fingerprints.append(res)
else:
LOGGER.warning(f"Unable to calculate fingerprint for ensemble {ident}: {res}")
ret_fingerprints.append(None)

return schemas.EnsembleTimestamps(
caseUpdatedAtUtcMs=timestamps.case_updated_at_utc_ms,
dataUpdatedAtUtcMs=timestamps.data_updated_at_utc_ms,
)
LOGGER.debug(f"Calculated and refreshed {len(ret_fingerprints)} fingerprints in: {perf_metrics.to_string()}")

return ret_fingerprints
1 change: 0 additions & 1 deletion backend_py/primary/primary/routers/explore/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ class EnsembleDetails(BaseModel):
caseUuid: str
realizations: Sequence[int]
stratigraphicColumnIdentifier: str
timestamps: EnsembleTimestamps
standardResults: Sequence[str]
11 changes: 5 additions & 6 deletions backend_py/primary/primary/routers/surface/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
async def determine_surf_task_fingerprint_async(
authenticated_user: AuthenticatedUser, addr: StatisticalSurfaceAddress
) -> str:
# For how long should we cache the ensemble fingerprint?
# The TTL for fingerprints should be aligned with the polling interval we use for busting the client side browser cache.
# Actually, the backend code that provides data for the the client side browser caching should probably go via the very same SumoFingerprinter cache.
fingerprinter = get_sumo_fingerprinter_for_user(authenticated_user=authenticated_user, cache_ttl_s=30)
# For how long should we cache the ensemble fingerprint here?
# Note that the explore endpoint that calculates/refreshes fingerprints sets a TTL of 5 minutes.
# Be a bit defensive here and set a TTL of 2 minutes.
fingerprinter = get_sumo_fingerprinter_for_user(authenticated_user=authenticated_user, cache_ttl_s=2 * 60)

# Note that we limit the ensemble portion of the fingerprint to surfaces
ensemble_fp = await fingerprinter.get_or_calc_ensemble_fp_async(addr.case_uuid, addr.ensemble_name, "surface")
ensemble_fp = await fingerprinter.get_or_calc_ensemble_fp_async(addr.case_uuid, addr.ensemble_name)

# Note that we include the ensemble fingerprint in the task hash/fingerprint
task_fp = sha256((addr.to_addr_str() + ensemble_fp).encode()).hexdigest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from fmu.sumo.explorer.objects import Case, SearchContext

from webviz_pkg.core_utils.perf_metrics import PerfMetrics
from webviz_pkg.core_utils.timestamp_utils import iso_str_to_timestamp_utc_ms
from primary.services.service_exceptions import (
Service,
NoDataError,
Expand All @@ -19,15 +18,9 @@
LOGGER = logging.getLogger(__name__)


class EnsembleTimestamps(BaseModel):
case_updated_at_utc_ms: int
data_updated_at_utc_ms: int


class EnsembleInfo(BaseModel):
name: str
realization_count: int
timestamps: EnsembleTimestamps


class CaseInspector:
Expand All @@ -49,33 +42,6 @@ async def _get_or_create_case_context_async(self) -> Case:

return self._cached_case_context

async def _get_case_updated_timestamp_async(self) -> int:
case = await self._get_or_create_case_context_async()
timestamp_str = case.metadata["_sumo"]["timestamp"] # Returns a datetime string.
return iso_str_to_timestamp_utc_ms(timestamp_str)

async def _get_ensemble_data_update_timestamp_async(self, ensemble_name: str) -> int:
timer = PerfMetrics()
case_context = await self._get_or_create_case_context_async()

search_context = SearchContext(self._sumo_client).filter(
uuid=case_context.uuid, ensemble=ensemble_name, realization=True
)

data_timestamp_int = await search_context.metrics.max_async("_sumo.timestamp")

timer.record_lap("aggregate_data_timestamps")
LOGGER.debug(f"get_last_data_change_timestamp_async {timer.to_string()}")

return data_timestamp_int or -1

async def get_ensemble_timestamps_async(self, ensemble_name: str) -> EnsembleTimestamps:
case_updated_at = await self._get_case_updated_timestamp_async()
# Data is occasionally None. This is likely due to data errors, so we just default to 0 (since an ensemble with bad data probably won't be used)
data_updated_at = await self._get_ensemble_data_update_timestamp_async(ensemble_name) or 0

return EnsembleTimestamps(case_updated_at_utc_ms=case_updated_at, data_updated_at_utc_ms=data_updated_at)

async def get_case_name_async(self) -> str:
"""Get name of the case"""
case = await self._get_or_create_case_context_async()
Expand All @@ -86,9 +52,7 @@ async def _get_ensemble_info_async(self, ensemble_uuid: str) -> EnsembleInfo:
ensemble_obj = await search_context.get_ensemble_by_uuid_async(ensemble_uuid)
realization_count = len(await ensemble_obj.realizations_async)

ensemble_timestamps = await self.get_ensemble_timestamps_async(ensemble_obj.name)

return EnsembleInfo(name=ensemble_obj.name, realization_count=realization_count, timestamps=ensemble_timestamps)
return EnsembleInfo(name=ensemble_obj.name, realization_count=realization_count)

async def get_ensembles_async(self) -> list[EnsembleInfo]:
"""Get list of ensembles for a case"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,52 @@ def __init__(self, authenticated_user: AuthenticatedUser, redis_client: redis.Re
self._redis_client = redis_client
self._cache_ttl_s = cache_ttl_s

async def get_or_calc_ensemble_fp_async(self, case_uuid: str, ensemble_name: str, class_name: str | None) -> str:
async def get_or_calc_ensemble_fp_async(self, case_uuid: str, ensemble_name: str) -> str:
"""
Get from cache or calculate the fingerprint string for contents of an ensemble.
See calc_ensemble_fp_async() for details.
"""
perf_metrics = PerfMetrics()

redis_key = self._make_full_redis_key(case_uuid=case_uuid, ensemble_name=ensemble_name, class_name=class_name)
redis_key = self._make_full_redis_key(case_uuid=case_uuid, ensemble_name=ensemble_name)

cached_fp = await self._redis_client.get(redis_key)
perf_metrics.record_lap("redis-get")
if cached_fp is not None:
# LOGGER.debug(f"get_or_calc_ensemble_fp_async() - from cache in: {perf_metrics.to_string()} [{cached_fp=}]")
LOGGER.debug(f"get_or_calc_ensemble_fp_async() - from cache in: {perf_metrics.to_string()} [{cached_fp=}]")
return cached_fp

new_fp = await calc_ensemble_fp_async(self._sumo_client, case_uuid, ensemble_name, class_name)
new_fp = await calc_ensemble_fp_async(self._sumo_client, case_uuid, ensemble_name, None)
perf_metrics.record_lap("calc-fp")

# Schedule the Redis set call, but don't await it
asyncio.create_task(self._redis_client.set(name=redis_key, value=new_fp, ex=self._cache_ttl_s))
perf_metrics.record_lap("schedule-redis-set")

# LOGGER.debug(f"get_or_calc_ensemble_fp_async() - calculated in: {perf_metrics.to_string()} [{new_fp=}]")
LOGGER.debug(f"get_or_calc_ensemble_fp_async() - calculated in: {perf_metrics.to_string()} [{new_fp=}]")
return new_fp

def _make_full_redis_key(self, case_uuid: str, ensemble_name: str, class_name: str | None) -> str:
return (
f"{_REDIS_KEY_PREFIX}:user:{self._user_id}:case:{case_uuid}:ens:{ensemble_name}:class:{class_name or 'ALL'}"
)
async def calc_and_store_ensemble_fp_async(self, case_uuid: str, ensemble_name: str) -> str:
"""
Calculate and unconditionally store fingerprint string for contents of an ensemble, also returning the result.
This method does not check the cache first, it will always calculate a new fingerprint and write it to the cache.
See calc_ensemble_fp_async() for details.
"""
perf_metrics = PerfMetrics()

redis_key = self._make_full_redis_key(case_uuid=case_uuid, ensemble_name=ensemble_name)

new_fp = await calc_ensemble_fp_async(self._sumo_client, case_uuid, ensemble_name, None)
perf_metrics.record_lap("calc-fp")

await self._redis_client.set(name=redis_key, value=new_fp, ex=self._cache_ttl_s)
perf_metrics.record_lap("redis-set")

LOGGER.debug(f"calc_and_store_ensemble_fp_async() - calculated in: {perf_metrics.to_string()} [{new_fp=}]")
return new_fp

def _make_full_redis_key(self, case_uuid: str, ensemble_name: str) -> str:
return f"{_REDIS_KEY_PREFIX}:user:{self._user_id}:case:{case_uuid}:ens:{ensemble_name}"


def get_sumo_fingerprinter_for_user(authenticated_user: AuthenticatedUser, cache_ttl_s: int) -> SumoFingerprinter:
Expand Down
2 changes: 1 addition & 1 deletion frontend/open-api/cachebusting-plugin/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { Config } from "./types";
export const defaultConfig: Plugin.Config<Config> = {
name: "cache-busting",
output: "types",
cacheKey: "t",
cacheKey: "zCacheBust",
// No need to define this
_handlerLegacy: () => {},
_handler: handler,
Expand Down
2 changes: 1 addition & 1 deletion frontend/open-api/cachebusting-plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const handler: Plugin.Handler<Config> = ({ context, plugin }) => {
location: "query",
explode: false,
name: cacheKey,
schema: { type: "number" },
schema: { type: "string" },
style: "form",
};
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/open-api/cachebusting-plugin/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface Config {

/**
* The query parameter to use for caching
* @default "t"
* @default "f"
*/
cacheKey?: string | IR.ParameterObject;
}
34 changes: 18 additions & 16 deletions frontend/src/api/autogen/@tanstack/react-query.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
getFields,
getCases,
getEnsembleDetails,
postGetTimestampsForEnsembles,
postRefreshFingerprintsForEnsembles,
getVectorList,
getDeltaEnsembleVectorList,
getRealizationsVectorData,
Expand Down Expand Up @@ -84,9 +84,9 @@ import type {
GetFieldsData_api,
GetCasesData_api,
GetEnsembleDetailsData_api,
PostGetTimestampsForEnsemblesData_api,
PostGetTimestampsForEnsemblesError_api,
PostGetTimestampsForEnsemblesResponse_api,
PostRefreshFingerprintsForEnsemblesData_api,
PostRefreshFingerprintsForEnsemblesError_api,
PostRefreshFingerprintsForEnsemblesResponse_api,
GetVectorListData_api,
GetDeltaEnsembleVectorListData_api,
GetRealizationsVectorDataData_api,
Expand Down Expand Up @@ -258,35 +258,37 @@ export const getEnsembleDetailsOptions = (options: Options<GetEnsembleDetailsDat
});
};

export const postGetTimestampsForEnsemblesQueryKey = (options: Options<PostGetTimestampsForEnsemblesData_api>) => [
createQueryKey("postGetTimestampsForEnsembles", options),
];
export const postRefreshFingerprintsForEnsemblesQueryKey = (
options: Options<PostRefreshFingerprintsForEnsemblesData_api>,
) => [createQueryKey("postRefreshFingerprintsForEnsembles", options)];

export const postGetTimestampsForEnsemblesOptions = (options: Options<PostGetTimestampsForEnsemblesData_api>) => {
export const postRefreshFingerprintsForEnsemblesOptions = (
options: Options<PostRefreshFingerprintsForEnsemblesData_api>,
) => {
return queryOptions({
queryFn: async ({ queryKey, signal }) => {
const { data } = await postGetTimestampsForEnsembles({
const { data } = await postRefreshFingerprintsForEnsembles({
...options,
...queryKey[0],
signal,
throwOnError: true,
});
return data;
},
queryKey: postGetTimestampsForEnsemblesQueryKey(options),
queryKey: postRefreshFingerprintsForEnsemblesQueryKey(options),
});
};

export const postGetTimestampsForEnsemblesMutation = (
options?: Partial<Options<PostGetTimestampsForEnsemblesData_api>>,
export const postRefreshFingerprintsForEnsemblesMutation = (
options?: Partial<Options<PostRefreshFingerprintsForEnsemblesData_api>>,
) => {
const mutationOptions: UseMutationOptions<
PostGetTimestampsForEnsemblesResponse_api,
AxiosError<PostGetTimestampsForEnsemblesError_api>,
Options<PostGetTimestampsForEnsemblesData_api>
PostRefreshFingerprintsForEnsemblesResponse_api,
AxiosError<PostRefreshFingerprintsForEnsemblesError_api>,
Options<PostRefreshFingerprintsForEnsemblesData_api>
> = {
mutationFn: async (localOptions) => {
const { data } = await postGetTimestampsForEnsembles({
const { data } = await postRefreshFingerprintsForEnsembles({
...options,
...localOptions,
throwOnError: true,
Expand Down
Loading
Loading