Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend_py/primary/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend_py/primary/primary/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
logging.getLogger("primary.routers.grid3d").setLevel(logging.DEBUG)
logging.getLogger("primary.routers.dev").setLevel(logging.DEBUG)
logging.getLogger("primary.routers.surface").setLevel(logging.DEBUG)
logging.getLogger("primary.services.user_session_manager").setLevel(logging.DEBUG)
# logging.getLogger("primary.auth").setLevel(logging.DEBUG)
# logging.getLogger("uvicorn.error").setLevel(logging.DEBUG)
# logging.getLogger("uvicorn.access").setLevel(logging.DEBUG)
Expand Down
23 changes: 18 additions & 5 deletions backend_py/primary/primary/routers/dev/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from webviz_pkg.core_utils.background_tasks import run_in_background_task

from primary.auth.auth_helper import AuthenticatedUser, AuthHelper
from primary.middleware.add_browser_cache import no_cache
from primary.services.user_session_manager.user_session_manager import UserSessionManager
from primary.services.user_session_manager.user_session_manager import UserComponent
from primary.services.user_session_manager.user_session_manager import _USER_SESSION_DEFS
Expand All @@ -32,6 +33,7 @@


@router.get("/provoke_error/{error_type}")
@no_cache
async def get_provoke_error(
# fmt:off
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
Expand Down Expand Up @@ -78,6 +80,7 @@ def _always_throws_error() -> None:


@router.get("/tasks/purge")
@no_cache
async def get_tasks_purge(
response: Response,
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
Expand All @@ -94,6 +97,7 @@ async def get_tasks_purge(


@router.get("/usersession/{user_component}/call")
@no_cache
async def get_usersession_call(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
user_component: Annotated[UserComponent, Path(description="User session component")],
Expand Down Expand Up @@ -131,6 +135,7 @@ async def get_usersession_call(


@router.get("/usersession/{user_component}/radixlist")
@no_cache
async def get_usersession_radixlist(user_component: UserComponent) -> list:
LOGGER.debug(f"usersession_radixlist() {user_component=}")

Expand All @@ -147,6 +152,7 @@ async def get_usersession_radixlist(user_component: UserComponent) -> list:


@router.get("/usersession/{user_component}/radixcreate")
@no_cache
async def get_usersession_radixcreate(user_component: UserComponent) -> str:
LOGGER.debug(f"usersession_radixcreate() {user_component=}")

Expand All @@ -172,6 +178,7 @@ async def get_usersession_radixcreate(user_component: UserComponent) -> str:


@router.get("/usersession/{user_component}/radixdelete")
@no_cache
async def get_usersession_radixdelete(user_component: UserComponent) -> str:
LOGGER.debug(f"usersession_radixdelete() {user_component=}")

Expand All @@ -184,6 +191,7 @@ async def get_usersession_radixdelete(user_component: UserComponent) -> str:


@router.get("/usersession/dirlist")
@no_cache
async def get_usersession_dirlist(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
user_component: UserComponent | None = None,
Expand All @@ -195,7 +203,7 @@ async def get_usersession_dirlist(
job_component_name = _USER_SESSION_DEFS[user_component].job_component_name

session_dir = UserSessionDirectory(authenticated_user.get_user_id())
session_info_arr = session_dir.get_session_info_arr(job_component_name)
session_info_arr = await session_dir.get_session_info_arr_async(job_component_name)

LOGGER.debug("======================")
for session_info in session_info_arr:
Expand All @@ -206,6 +214,7 @@ async def get_usersession_dirlist(


@router.get("/usersession/dirdel")
@no_cache
async def get_usersession_dirdel(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
user_component: UserComponent | None = None,
Expand All @@ -217,9 +226,9 @@ async def get_usersession_dirdel(
job_component_name = _USER_SESSION_DEFS[user_component].job_component_name

session_dir = UserSessionDirectory(authenticated_user.get_user_id())
session_dir.delete_session_info(job_component_name)
await session_dir.delete_session_info_async(job_component_name)

session_info_arr = session_dir.get_session_info_arr(None)
session_info_arr = await session_dir.get_session_info_arr_async(None)
LOGGER.debug("======================")
for session_info in session_info_arr:
LOGGER.debug(f"{session_info=}")
Expand All @@ -229,6 +238,7 @@ async def get_usersession_dirdel(


@router.get("/bgtask")
@no_cache
async def get_bgtask() -> str:
LOGGER.debug(f"bgtask() - start")

Expand All @@ -247,6 +257,7 @@ async def funcThatLogs(msg: str) -> None:


@router.get("/longtask/{duration_s}")
@no_cache
async def get_longtask(duration_s: int) -> str:
LOGGER.debug(f"get_longtask() {duration_s=} - start")

Expand All @@ -257,6 +268,7 @@ async def get_longtask(duration_s: int) -> str:


@router.get("/ri_surf")
@no_cache
async def get_ri_surf(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
) -> str:
Expand All @@ -272,7 +284,7 @@ async def get_ri_surf(

ijk_index_filter = IJKIndexFilter(min_i=0, max_i=0, min_j=0, max_j=0, min_k=0, max_k=0)

grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid)
grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid, None)
await grid_service.get_grid_geometry_async(ensemble_name, realization, grid_name, ijk_index_filter)
await grid_service.get_mapped_grid_properties_async(
ensemble_name, realization, grid_name, property_name, None, ijk_index_filter
Expand All @@ -282,6 +294,7 @@ async def get_ri_surf(


@router.get("/ri_isect")
@no_cache
async def get_ri_isect(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
) -> str:
Expand All @@ -308,7 +321,7 @@ async def get_ri_isect(
]
# fmt:on

grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid)
grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid, None)
await grid_service.get_polyline_intersection_async(
ensemble_name, realization, grid_name, property_name, None, xy_arr
)
Expand Down
74 changes: 71 additions & 3 deletions backend_py/primary/primary/routers/grid3d/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def get_grid_models_info(
# pylint: disable=too-many-arguments
async def get_grid_surface(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
instance_str: Annotated[str, Query(description="Component instance string")],
case_uuid: Annotated[str, Query(description="Sumo case uuid")],
ensemble_name: Annotated[str, Query(description="Ensemble name")],
grid_name: Annotated[str, Query(description="Grid name")],
Expand All @@ -71,7 +72,7 @@ async def get_grid_surface(

perf_metrics = PerfMetrics()

grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid)
grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid, instance_str)
perf_metrics.record_lap("create-service")

ijk_index_filter = IJKIndexFilter(min_i=i_min, max_i=i_max, min_j=j_min, max_j=j_max, min_k=k_min, max_k=k_max)
Expand Down Expand Up @@ -107,6 +108,7 @@ async def get_grid_surface(
# pylint: disable=too-many-arguments
async def get_grid_parameter(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
instance_str: Annotated[str, Query(description="Component instance string")],
case_uuid: Annotated[str, Query(description="Sumo case uuid")],
ensemble_name: Annotated[str, Query(description="Ensemble name")],
grid_name: Annotated[str, Query(description="Grid name")],
Expand All @@ -128,7 +130,7 @@ async def get_grid_parameter(

ijk_index_filter = IJKIndexFilter(min_i=i_min, max_i=i_max, min_j=j_min, max_j=j_max, min_k=k_min, max_k=k_max)

grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid)
grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid, instance_str)
perf_metrics.record_lap("create-service")

mapped_grid_properties = await grid_service.get_mapped_grid_properties_async(
Expand Down Expand Up @@ -159,6 +161,7 @@ async def get_grid_parameter(
@router.post("/get_polyline_intersection")
async def post_get_polyline_intersection(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
instance_str: Annotated[str, Query(description="Component instance string")],
case_uuid: Annotated[str, Query(description="Sumo case uuid")],
ensemble_name: Annotated[str, Query(description="Ensemble name")],
grid_name: Annotated[str, Query(description="Grid name")],
Expand All @@ -171,7 +174,7 @@ async def post_get_polyline_intersection(
) -> PolylineIntersection:
perf_metrics = PerfMetrics()

grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid)
grid_service = await UserGrid3dService.create_async(authenticated_user, case_uuid, instance_str)
perf_metrics.record_lap("create-service")

polyline_intersection = await grid_service.get_polyline_intersection_async(
Expand All @@ -189,6 +192,71 @@ async def post_get_polyline_intersection(
return polyline_intersection


import asyncio
from fastapi import APIRouter, Depends, HTTPException, Query, Body, status
from primary.services.user_session_manager.user_session_manager import (
UserComponent,
UserSessionManager,
SessionRunState,
)
from primary.middleware.add_browser_cache import no_cache


@router.get("/status_of_user_service")
@no_cache
async def get_status_of_user_service(
# fmt:off
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
instance_str: Annotated[str | None, Query(description="Component instance string")],
# fmt:on
) -> str:
instance_str = instance_str or "DEFAULT"
session_manager = UserSessionManager(authenticated_user.get_user_id(), authenticated_user.get_username())
session_run_state = await session_manager.get_session_status_async(UserComponent.GRID3D_RI, instance_str)

# Sleep for a while to simulate a long-running operation
await asyncio.sleep(2)

if session_run_state is None:
return "NOT_RUNNING"

return session_run_state.value


@router.get("/kill_service")
@no_cache
async def get_kill_service(
# fmt:off
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
instance_str: Annotated[str | None, Query(description="Component instance string")],
# fmt:on
) -> str:

LOGGER.debug(f"Entering kill_service endpoint")

instance_str = instance_str or "DEFAULT"
session_manager = UserSessionManager(authenticated_user.get_user_id(), authenticated_user.get_username())
kill_ok = await session_manager.delete_session_async(UserComponent.GRID3D_RI, instance_str)

return "KILLED" if kill_ok else "NOT_KILLED"


@router.get("/start_service")
@no_cache
async def get_start_service(
# fmt:off
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
instance_str: Annotated[str | None, Query(description="Component instance string")],
# fmt:on
) -> str:

LOGGER.debug(f"Entering start_service endpoint")

session_manager = UserSessionManager(authenticated_user.get_user_id(), authenticated_user.get_username())

raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented")


def _hack_ensure_b64_property_array_is_float(
props_b64arr: B64FloatArray | B64IntArray, undefined_int_value: int | None
) -> B64FloatArray:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ def __init__(
self._include_inactive_cells = False

@classmethod
async def create_async(cls, authenticated_user: AuthenticatedUser, case_uuid: str) -> "UserGrid3dService":
async def create_async(cls, authenticated_user: AuthenticatedUser, case_uuid: str, component_instance_str: str | None) -> "UserGrid3dService":
perf_metrics = PerfMetrics()

component_instance_str = component_instance_str or "DEFAULT"
session_manager = UserSessionManager(authenticated_user.get_user_id(), authenticated_user.get_username())
session_base_url = await session_manager.get_or_create_session_async(UserComponent.GRID3D_RI, None)
session_base_url = await session_manager.get_or_create_session_async(UserComponent.GRID3D_RI, component_instance_str)
if session_base_url is None:
raise ServiceUnavailableError("Failed to get user session URL", Service.USER_SESSION)
perf_metrics.record_lap("get-session")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import contextlib
from typing import Awaitable, Any, TypeVar, Coroutine
import redis.asyncio as redis


class AbortError(Exception):
pass


class AsyncAbortSignal:
def __init__(self) -> None:
self._event = asyncio.Event()
self._abort_reason: Any | None = None

@property
def aborted(self) -> bool:
return self._event.is_set()

@property
def reason(self) -> Any | None:
return self._abort_reason

async def wait(self) -> None:
await self._event.wait()

def _trigger(self, reason: Any = None) -> None:
self._abort_reason = reason
self._event.set()


class AsyncAbortController:
def __init__(self) -> None:
self._signal = AsyncAbortSignal()

@property
def signal(self) -> AsyncAbortSignal:
return self._signal

def abort(self, reason: Any = None) -> None:
self._signal._trigger(reason)



TResult = TypeVar("TResult")

async def run_with_abort(awaitable: Coroutine[Any, Any, TResult] | asyncio.Task[TResult], signal: AsyncAbortSignal) -> TResult:
work_task = awaitable if isinstance(awaitable, asyncio.Task) else asyncio.create_task(awaitable)
abort_watcher_task = asyncio.create_task(signal.wait())

done, _ = await asyncio.wait({work_task, abort_watcher_task}, return_when=asyncio.FIRST_COMPLETED)

if abort_watcher_task in done:
# Abort finished first so cancel the work, wait for its cancellation, then raise
work_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await work_task
raise AbortError(signal.reason)

abort_watcher_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await abort_watcher_task

return await work_task


async def redis_abort_listener(redis_client: redis.Redis, channel: str, controller: AsyncAbortController) -> None:
pubsub = redis_client.pubsub()
await pubsub.subscribe(channel)
try:
async for msg in pubsub.listen():
if msg.get("type") == "message":
reason = msg.get("data")
controller.abort(reason)
return
finally:
await pubsub.unsubscribe(channel)
await pubsub.close()
Loading
Loading