Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions backend_py/primary/primary/auth/auth_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
LOGGER = logging.getLogger(__name__)

# Alias for the literal that lists the resource names we use
_ResourceName: TypeAlias = Literal["graph", "sumo", "smda", "ssdl"]
_ResourceName: TypeAlias = Literal["graph", "sumo", "smda", "ssdl", "pdm"]


class _TokenEntry(BaseModel):
Expand All @@ -39,6 +39,7 @@ def to_authenticated_user(self) -> AuthenticatedUser:
sumo_token_entry = self.access_tokens.get("sumo")
smda_token_entry = self.access_tokens.get("smda")
ssdl_token_entry = self.access_tokens.get("ssdl")
pdm_token_entry = self.access_tokens.get("pdm")

authenticated_user_obj = AuthenticatedUser(
user_id=self.user_id,
Expand All @@ -48,6 +49,7 @@ def to_authenticated_user(self) -> AuthenticatedUser:
"sumo_access_token": sumo_token_entry.token if sumo_token_entry else None,
"smda_access_token": smda_token_entry.token if smda_token_entry else None,
"ssdl_access_token": ssdl_token_entry.token if ssdl_token_entry else None,
"pdm_access_token": pdm_token_entry.token if pdm_token_entry else None,
},
)

Expand All @@ -58,16 +60,18 @@ class AuthHelper:
def __init__(self) -> None:
self.router = APIRouter()
self.router.add_api_route(path="/login", endpoint=self._login_route, methods=["GET"])
self.router.add_api_route(path="/auth-callback", endpoint=self._authorized_callback_route, methods=["GET"])
self.router.add_api_route(
path="/auth-callback",
endpoint=self._authorized_callback_route,
methods=["GET"],
)

@no_cache
async def _login_route(self, request: Request, redirect_url_after_login: Optional[str] = None) -> RedirectResponse:
await starsessions.load_session(request)
request.session.clear()

all_scopes_list = config.GRAPH_SCOPES.copy()
for value in config.RESOURCE_SCOPES_DICT.values():
all_scopes_list.extend(value)

if "CODESPACE_NAME" in os.environ:
# Developer is using GitHub codespace, so we use the GitHub codespace port forward URL
Expand Down Expand Up @@ -112,7 +116,10 @@ async def _authorized_callback_route(self, request: Request) -> Response:
)

if "error" in token_dict:
return Response(f"Error validating redirected auth response, error: {token_dict['error']}", 400)
return Response(
f"Error validating redirected auth response, error: {token_dict['error']}",
400,
)

_save_token_cache_in_session(request, token_cache)

Expand All @@ -127,7 +134,9 @@ async def _authorized_callback_route(self, request: Request) -> Response:
return Response("Login OK")

@staticmethod
def get_authenticated_user(request_with_session: Request) -> Optional[AuthenticatedUser]:
def get_authenticated_user(
request_with_session: Request,
) -> Optional[AuthenticatedUser]:
perf_metrics = PerfMetrics()

# We may already have created and stored the AuthenticatedUser object in the request's state
Expand Down Expand Up @@ -311,7 +320,9 @@ def _acquire_refreshed_identity_and_tokens(
return new_auth_info


def _create_msal_confidential_client_app(token_cache: msal.TokenCache) -> msal.ConfidentialClientApplication:
def _create_msal_confidential_client_app(
token_cache: msal.TokenCache,
) -> msal.ConfidentialClientApplication:
authority = f"https://login.microsoftonline.com/{config.TENANT_ID}"
return msal.ConfidentialClientApplication(
client_id=config.CLIENT_ID,
Expand All @@ -322,7 +333,9 @@ def _create_msal_confidential_client_app(token_cache: msal.TokenCache) -> msal.C
)


def _load_user_auth_info_from_session(request_with_session: Request) -> _UserAuthInfo | None:
def _load_user_auth_info_from_session(
request_with_session: Request,
) -> _UserAuthInfo | None:
serialized_user_auth_info = request_with_session.session.get("user_auth_info")
if not serialized_user_auth_info:
return None
Expand All @@ -339,7 +352,9 @@ def _save_user_auth_info_in_session(request_with_session: Request, user_auth_inf
request_with_session.session["user_auth_info"] = user_auth_info.model_dump_json()


def _load_token_cache_from_session(request_with_session: Request) -> msal.SerializableTokenCache:
def _load_token_cache_from_session(
request_with_session: Request,
) -> msal.SerializableTokenCache:
token_cache = msal.SerializableTokenCache()

serialized_token_cache = request_with_session.session.get("token_cache")
Expand Down
2 changes: 2 additions & 0 deletions backend_py/primary/primary/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SMDA_RESOURCE_SCOPE = os.environ["WEBVIZ_SMDA_RESOURCE_SCOPE"]
ENTERPRISE_SUBSCRIPTION_KEY = os.environ["WEBVIZ_ENTERPRISE_SUBSCRIPTION_KEY"]
SSDL_RESOURCE_SCOPE = os.environ["WEBVIZ_SSDL_RESOURCE_SCOPE"]
PDM_RESOURCE_SCOPE = os.environ["WEBVIZ_PDM_RESOURCE_SCOPE"]
SUMO_ENV = os.getenv("WEBVIZ_SUMO_ENV", "prod")
GRAPH_SCOPES = ["User.Read", "User.ReadBasic.All"]
VDS_HOST_ADDRESS = os.environ["WEBVIZ_VDS_HOST_ADDRESS"]
Expand All @@ -27,6 +28,7 @@
"sumo": [f"api://{sumo_app_reg[SUMO_ENV]['RESOURCE_ID']}/access_as_user"],
"smda": [SMDA_RESOURCE_SCOPE],
"ssdl": [SSDL_RESOURCE_SCOPE],
"pdm": [PDM_RESOURCE_SCOPE],
}

DEFAULT_CACHE_MAX_AGE = 3600 # 1 hour
Expand Down
11 changes: 9 additions & 2 deletions backend_py/primary/primary/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

from primary.auth.auth_helper import AuthHelper
from primary.auth.enforce_logged_in_middleware import EnforceLoggedInMiddleware
from primary.middleware.add_process_time_to_server_timing_middleware import AddProcessTimeToServerTimingMiddleware
from primary.middleware.add_process_time_to_server_timing_middleware import (
AddProcessTimeToServerTimingMiddleware,
)

from primary.middleware.add_browser_cache import AddBrowserCacheMiddleware
from primary.routers.dev.router import router as dev_router
Expand All @@ -25,6 +27,7 @@
from primary.routers.inplace_volumes.router import router as inplace_volumes_router
from primary.routers.observations.router import router as observations_router
from primary.routers.parameters.router import router as parameters_router
from primary.routers.flow_data.router import router as flow_data_router
from primary.routers.polygons.router import router as polygons_router
from primary.routers.pvt.router import router as pvt_router
from primary.routers.rft.router import router as rft_router
Expand All @@ -40,7 +43,10 @@
from primary.utils.azure_monitor_setup import setup_azure_monitor_telemetry
from primary.utils.exception_handlers import configure_service_level_exception_handlers
from primary.utils.exception_handlers import override_default_fastapi_exception_handlers
from primary.utils.logging_setup import ensure_console_log_handler_is_configured, setup_normal_log_levels
from primary.utils.logging_setup import (
ensure_console_log_handler_is_configured,
setup_normal_log_levels,
)

from . import config

Expand Down Expand Up @@ -110,6 +116,7 @@ async def lifespan_handler_async(_fastapi_app: FastAPI) -> AsyncIterator[None]:
app.include_router(well_router, prefix="/well", tags=["well"])
app.include_router(seismic_router, prefix="/seismic", tags=["seismic"])
app.include_router(polygons_router, prefix="/polygons", tags=["polygons"])
app.include_router(flow_data_router, prefix="/flow_data", tags=["flow_data"])
app.include_router(graph_router, prefix="/graph", tags=["graph"])
app.include_router(observations_router, prefix="/observations", tags=["observations"])
app.include_router(rft_router, prefix="/rft", tags=["rft"])
Expand Down
Empty file.
37 changes: 37 additions & 0 deletions backend_py/primary/primary/routers/flow_data/converters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from primary.services.pdm_access.types import (
WellProductionData as PdmWellProductionData,
WellInjectionData as PdmWellInjectionData,
)
from .schemas import WellProductionData, WellInjectionData


def per_well_production_data_to_api(
well_production: list[PdmWellProductionData],
) -> list[WellProductionData]:
return [
WellProductionData(
wellboreUuid=wp.wellbore_uuid,
wellboreUwi=wp.wellbore_uwbi,
startDate=wp.start_date,
endDate=wp.end_date,
oilProductionSm3=wp.oil_production_sm3,
gasProductionSm3=wp.gas_production_sm3,
waterProductionM3=wp.water_production_m3,
)
for wp in well_production
]


def per_well_injection_data_to_api(
well_injection: list[PdmWellInjectionData],
) -> list[WellInjectionData]:
return [
WellInjectionData(
wellboreUuid=wi.wellbore_uuid,
startDate=wi.start_date,
endDate=wi.end_date,
waterInjection=wi.water_injection,
gasInjection=wi.gas_injection,
)
for wi in well_injection
]
49 changes: 49 additions & 0 deletions backend_py/primary/primary/routers/flow_data/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging

from fastapi import APIRouter, Depends, Query

from primary.services.pdm_access.pdm_access import PDMAccess
from primary.services.utils.authenticated_user import AuthenticatedUser
from primary.auth.auth_helper import AuthHelper

from . import schemas
from . import converters

LOGGER = logging.getLogger(__name__)

router = APIRouter()


@router.get("/production-data/")
async def get_production_data(
# fmt:off
authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
field_identifier: str = Query(description="Official field identifier"),
start_date: str = Query(description="Start date in YYYY-MM-DD"),
end_date: str = Query(description="End date in YYYY-MM-DD"),
# fmt:on
) -> list[schemas.WellProductionData]:
"""Get allocated production per well in the time interval"""
pdm_access = PDMAccess(authenticated_user.get_pdm_access_token())
prod_data = await pdm_access.get_per_well_production_in_time_interval_async(
field_identifier=field_identifier, start_date=start_date, end_date=end_date
)
return converters.per_well_production_data_to_api(prod_data)


# Injection Endpoint
@router.get("/injection-data/")
async def get_injection_data(
# fmt:off
authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
field_identifier: str = Query(description="Official field identifier"),
start_date: str = Query(description="Start date in YYYY-MM-DD"),
end_date: str = Query(description="End date in YYYY-MM-DD"),
# fmt:on
) -> list[schemas.WellInjectionData]:
"""Get allocated injection per well in the time interval"""
pdm_access = PDMAccess(authenticated_user.get_pdm_access_token())
data = await pdm_access.get_per_well_injection_in_time_interval_async(
field_identifier=field_identifier, start_date=start_date, end_date=end_date
)
return converters.per_well_injection_data_to_api(data)
19 changes: 19 additions & 0 deletions backend_py/primary/primary/routers/flow_data/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from pydantic import BaseModel


class WellProductionData(BaseModel):
wellboreUuid: str
wellboreUwi: str
startDate: str
endDate: str
oilProductionSm3: float
gasProductionSm3: float
waterProductionM3: float


class WellInjectionData(BaseModel):
wellboreUuid: str
startDate: str
endDate: str
waterInjection: float
gasInjection: float
Empty file.
55 changes: 55 additions & 0 deletions backend_py/primary/primary/services/pdm_access/_pdm_get_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import logging
from typing import List, Optional

from webviz_pkg.core_utils.perf_timer import PerfTimer

from primary import config
from primary.services.utils.httpx_async_client_wrapper import HTTPX_ASYNC_CLIENT_WRAPPER
from primary.services.service_exceptions import (
Service,
InvalidDataError,
InvalidParameterError,
AuthorizationError,
)

LOGGER = logging.getLogger(__name__)


async def pdm_get_request_async(access_token: str, endpoint: str, params: Optional[dict] = None) -> List[dict]:
"""
Generic GET request to PDM API.
"""
urlstring = f"https://api.gateway.equinor.com/pdm-internal-api/v3/api/{endpoint}?"
params = params if params else {}
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {access_token}",
"Ocp-Apim-Subscription-Key": config.ENTERPRISE_SUBSCRIPTION_KEY,
}
timer = PerfTimer()

response = await HTTPX_ASYNC_CLIENT_WRAPPER.client.get(urlstring, params=params, headers=headers, timeout=60)
results = []
if response.status_code == 200:
results = response.json()

elif response.status_code == 401:
LOGGER.debug(f"Unauthorized access to PDM endpoint {endpoint}: {response.text}")
raise AuthorizationError("Unauthorized access to PDM", Service.PDM)
elif response.status_code == 403:
LOGGER.debug(f"Forbidden access to PDM endpoint {endpoint}: {response.text}")
raise AuthorizationError("Forbidden access to PDM", Service.PDM)
elif response.status_code == 404:
LOGGER.debug(f"Endpoint {endpoint} not found: {response.text}")
raise InvalidDataError(
f"Endpoint {endpoint} either does not exists or can not be found",
Service.PDM,
)

# Capture other errors
else:
LOGGER.debug(f"Error fetching from PDM endpoint {endpoint} (status {response.status_code}): {response.text}")
raise InvalidParameterError(f"Can not fetch data from endpoint {endpoint}", Service.PDM)

LOGGER.debug(f"TIME PDM fetch {endpoint} took {timer.lap_s():.2f} seconds")
return results
59 changes: 59 additions & 0 deletions backend_py/primary/primary/services/pdm_access/pdm_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging

from .types import WellProductionData, WellInjectionData, PRODCOLUMNS, INJCOLUMNS
from ._pdm_get_request import pdm_get_request_async
from .utils.calculate_totals_from_daily import (
calculate_total_production_from_daily,
calculate_total_injection_from_daily,
)

LOGGER = logging.getLogger(__name__)


class PDMEndpoints:
WELL_PROD_DAY = "flex/WellBoreProdDayCompact"
WELL_INJ_DAY = "flex/WellBoreInjDayCompact"


class PDMAccess:
def __init__(self, access_token: str):
self._pdm_token = access_token

async def _pdm_get_request_async(self, endpoint: str, params: dict) -> list[dict]:
return await pdm_get_request_async(access_token=self._pdm_token, endpoint=endpoint, params=params)

async def get_per_well_production_in_time_interval_async(
self,
field_identifier: str,
start_date: str,
end_date: str,
) -> list[WellProductionData]:
params = {
"GOV_FIELD_NAME": field_identifier,
"PROD_DAY": f"RANGE({start_date} | {end_date})",
"TOP": "ALL",
"COLUMNS": ",".join(PRODCOLUMNS),
}
results = await self._pdm_get_request_async(endpoint=PDMEndpoints.WELL_PROD_DAY, params=params)

if not results:
return []

return calculate_total_production_from_daily(results, start_date=start_date, end_date=end_date)

async def get_per_well_injection_in_time_interval_async(
self,
field_identifier: str,
start_date: str,
end_date: str,
) -> list[WellInjectionData]:
params = {
"GOV_FIELD_NAME": field_identifier,
"PROD_DAY": f"RANGE({start_date} | {end_date})",
"TOP": "ALL",
"COLUMNS": ",".join(INJCOLUMNS),
}
results = await self._pdm_get_request_async(endpoint=PDMEndpoints.WELL_INJ_DAY, params=params)
if not results:
return []
return calculate_total_injection_from_daily(results, start_date=start_date, end_date=end_date)
Loading
Loading