|
| 1 | +""" |
| 2 | +An minimal async client for the JAWS central server. |
| 3 | +""" |
| 4 | + |
| 5 | +import aiohttp |
| 6 | +import datetime |
| 7 | +import logging |
| 8 | +from typing import Any |
| 9 | + |
| 10 | +from cdmtaskservice.arg_checkers import require_string as _require_string |
| 11 | + |
| 12 | + |
| 13 | +class JAWSClient: |
| 14 | + """ The JAWS client. """ |
| 15 | + |
| 16 | + @classmethod |
| 17 | + async def create(self, url: str, token: str): |
| 18 | + """ |
| 19 | + Initialize the client. |
| 20 | + |
| 21 | + url - the JAWS Central URL. |
| 22 | + token - the JAWS token. |
| 23 | + """ |
| 24 | + cli = JAWSClient(url, token) |
| 25 | + user = await cli._user() # test connection & token |
| 26 | + logging.getLogger(__name__).info(f"Initialized JAWS client with user {user}") |
| 27 | + return cli |
| 28 | + |
| 29 | + def __init__(self, url: str, token: str): |
| 30 | + url = _require_string(url, "url") |
| 31 | + if not url.endswith("/"): |
| 32 | + url += "/" |
| 33 | + self._sess = aiohttp.ClientSession( |
| 34 | + base_url=url, |
| 35 | + headers={"Authorization": f"Bearer {_require_string(token, 'token')}"} |
| 36 | + ) |
| 37 | + |
| 38 | + async def _get(self, url, params=None) -> dict[str, Any]: |
| 39 | + async with self._sess.get(url, params=params) as res: |
| 40 | + # Any jaws errors would be 500 errors since should just be querying known jobs, so |
| 41 | + # don't worry too much about exceptions. Expand later if needed |
| 42 | + # May need to add retries |
| 43 | + # May need to to add some sort of down notification or detection |
| 44 | + res.raise_for_status() |
| 45 | + if res.ok: # assume here that if we get a 2XX we get JSON. Fix if necessary |
| 46 | + return await res.json() |
| 47 | + |
| 48 | + async def _user(self) -> str: |
| 49 | + res = await self._get("user") |
| 50 | + return res["uid"] |
| 51 | + |
| 52 | + async def status(self, run_id: str) -> dict[str, Any]: |
| 53 | + """ |
| 54 | + Get the status of a JAWS run. |
| 55 | + """ |
| 56 | + res = await self._get( |
| 57 | + f"run/{_require_string(run_id, 'run_id')}", |
| 58 | + params={"verbose": "true", "local_tz": "UTC"} |
| 59 | + ) |
| 60 | + res["submitted"] = _add_tz(res["submitted"]) |
| 61 | + res["updated"] = _add_tz(res["updated"]) |
| 62 | + return res |
| 63 | + |
| 64 | + async def close(self): |
| 65 | + await self._sess.close() |
| 66 | + |
| 67 | + |
| 68 | +def _add_tz(timestr: str) -> datetime.datetime: |
| 69 | + return datetime.datetime.fromisoformat(timestr).replace(tzinfo=datetime.timezone.utc) |
0 commit comments