diff --git a/docs/source/adapters/site.rst b/docs/source/adapters/site.rst index d2446919..3a4e5737 100644 --- a/docs/source/adapters/site.rst +++ b/docs/source/adapters/site.rst @@ -679,3 +679,73 @@ Available machine type configuration options Your favorite site is currently not supported? Please, have a look at how to contribute. + + +Satellite Site Adapter +---------------------- + +.. content-tabs:: left-col + + The :py:class:`~tardis.adapters.sites.satellite.SatelliteAdapter` integrates with a Satellite instance. + Drones run as local processes and claim a free remote host from the configured pool. Once a host is + available, the adapter is able to boot and shut down the remote resource through the Satellite API. + + When a resource is allocated for the first time, it is marked with a ``tardis_reservation_state`` parameter + (values ``free``, ``booting``, ``active`` and ``terminating``) in Satellite. ``booting`` and ``terminating`` are used to + identify reserved machines that are currently being booted or terminated. This flag prevents double allocation of powered-off resources that are still linked to a + booting or terminating drone. If TARDIS crashes and its drone database is lost, the parameter has to be reset manually. + +Available adapter configuration options +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. content-tabs:: left-col + + +----------------+------------------------------------------------------------------------------------------+-----------------+ + | Option | Short Description | Requirement | + +================+==========================================================================================+=================+ + | host | Hostname of the Satellite server. HTTPS and ``/api/v2/hosts`` are added automatically. | **Required** | + +----------------+------------------------------------------------------------------------------------------+-----------------+ + | ssl_cert | Path to a CA certificate used to validate the Satellite HTTPS endpoint. 
| **Required** | + +----------------+------------------------------------------------------------------------------------------+-----------------+ + | username | Satellite account used for API access; it needs the corresponding rights. | **Required** | + +----------------+------------------------------------------------------------------------------------------+-----------------+ + | secret | Personal access token or password of the Satellite account. | **Required** | + +----------------+------------------------------------------------------------------------------------------+-----------------+ + | max_age | The results of Satellite API calls are cached for ``max_age`` minutes. | **Required** | + +----------------+------------------------------------------------------------------------------------------+-----------------+ + | machine_pool | Sequence of Satellite host identifiers that form the pool of machines to allocate from. | **Required** | + | | Each entry is appended to ``https://<host>/api/v2/hosts/`` to address the machine. | | + +----------------+------------------------------------------------------------------------------------------+-----------------+ + + The Satellite adapter does not introduce additional machine type specific options. + Provide ``MachineMetaData`` entries for each machine type to describe cores, memory and disk. + +.. content-tabs:: right-col + + .. rubric:: Example configuration + + .. 
code-block:: yaml + + Sites: + - name: SatelliteSite + adapter: Satellite + quota: 20 + + SatelliteSite: + host: satellite.example.com + username: MaxMustermann + secret: super-secret-token + ssl_cert: /path/to/CA/cert.pem + max_age: 2 + machine_pool: + - compute-node-01 + - compute-node-02 + MachineTypes: + - machine-type-a + MachineTypeConfiguration: + machine-type-a: {} + MachineMetaData: + machine-type-a: + Cores: 16 + Memory: 64 + Disk: 400 diff --git a/tardis/adapters/sites/satellite.py b/tardis/adapters/sites/satellite.py new file mode 100644 index 00000000..cad9f4d3 --- /dev/null +++ b/tardis/adapters/sites/satellite.py @@ -0,0 +1,406 @@ +import asyncio +import logging +import ssl +from contextlib import contextmanager +from functools import partial +from typing import Optional + +import aiohttp + +from tardis.exceptions.tardisexceptions import TardisResourceStatusUpdateFailed +from tardis.interfaces.siteadapter import ResourceStatus, SiteAdapter +from tardis.utilities.attributedict import AttributeDict +from tardis.utilities.staticmapping import StaticMapping +from tardis.utilities.asynccachemap import AsyncCacheMap + +logger = logging.getLogger("cobald.runtime.tardis.interfaces.site") + + +class SatelliteClient: + """ + Async helper for interacting with Satellite instance. 
+ """ + + + def __init__( + self, + host: str, + username: str, + secret: str, + ssl_cert: str, + machine_pool: list[str], + max_age: int, + ) -> None: + + self._base_url = f"https://{host}/api/v2/hosts" + self.ssl_context = ssl.create_default_context(cafile=ssl_cert) + self.auth = aiohttp.BasicAuth(username, secret) + + self.machine_pool = machine_pool + + self.max_age = max_age * 60 + self.cached_status_coroutines = {} + + self._nxt_uuid_lock = asyncio.Lock() + + def _host_url(self, remote_resource_uuid: str = "") -> str: + if remote_resource_uuid == "": + return f"{self._base_url}/" + resource = remote_resource_uuid.strip("/") + return f"{self._base_url}/{resource}" + + async def _request( + self, + session: aiohttp.ClientSession, + method: str, + url: str, + *, + expect_json: bool = True, + headers={ + "Accept": "application/json", + "Foreman-Api-Version": "2", + }, + **kwargs, + ): + async with session.request( + method, + url, + ssl=self.ssl_context, + headers=headers, + **kwargs, + ) as response: + response.raise_for_status() + if expect_json: + return await response.json() + return None + + async def get_status(self, remote_resource_uuid: str, *, force: bool = False): + """ + Return cached version of self._get_status(). + + :param remote_resource_uuid: Satellite identifier of the host. + :type remote_resource_uuid: str + :param force: If True, force refresh of cached status. + :type force: bool + :return: Satellite host data enriched with parameters and power state. 
+ :rtype: dict + """ + if remote_resource_uuid not in self.cached_status_coroutines: + self.cached_status_coroutines[remote_resource_uuid] = AsyncCacheMap( + partial(self._get_status, remote_resource_uuid), + max_age=self.max_age, + ) + await self.cached_status_coroutines[remote_resource_uuid].update_status( + force=force + ) + return self.cached_status_coroutines[remote_resource_uuid] + + async def _get_status(self, remote_resource_uuid: str) -> dict: + """ + Return host data together with custom parameters and power details. + + :param remote_resource_uuid: Satellite identifier of the host. + :type remote_resource_uuid: str + :return: Satellite host data enriched with parameters and power state. + :rtype: dict + """ + async with aiohttp.ClientSession(auth=self.auth) as session: + host_url = self._host_url(remote_resource_uuid) + main_task = self._request(session, "GET", host_url) + params_task = self._request(session, "GET", f"{host_url}/parameters") + power_task = self._request(session, "GET", f"{host_url}/power") + main_response, param_response, power_response = await asyncio.gather( + main_task, params_task, power_task + ) + + # Flatten custom parameters for simpler lookups in later calls. + parameters = {} + for param in param_response.get("results", []): + name = param.get("name") + if not name: + continue + if "value" in param: + parameters[name] = param["value"] + if "id" in param: + parameters[f"{name}_id"] = param["id"] + + main_response["parameters"] = parameters + main_response["power"] = power_response + return main_response + + async def set_power(self, state: str, remote_resource_uuid: str) -> dict: + """ + Set the power state of a host and update its cached status. + + :param state: Desired power state as understood by the Satellite API ["on"|"off"]. + :type state: str + :param remote_resource_uuid: Satellite identifier of the host. + :type remote_resource_uuid: str + :return: Raw response from the Satellite power endpoint. 
+ :rtype: dict + """ + + if state not in ("on", "off"): + raise ValueError(f"Invalid power state {state}") + + async with aiohttp.ClientSession(auth=self.auth) as session: + logger.info(f"Set power {state} for {remote_resource_uuid}") + power_action_result = await self._request( + session, + "PUT", + f"{self._host_url(remote_resource_uuid)}/power", + json={"power_action": state}, + ) + await self.get_status(remote_resource_uuid, force=True) + return power_action_result + + async def get_next_uuid(self) -> str: + """ + Select the next free host by checking reservation and power state. + + :return: Identifier of a reserved and powered-off host ready for use. + :rtype: str + :raises TardisResourceStatusUpdateFailed: If no free host is available. + """ + + + async with self._nxt_uuid_lock: + for host in self.machine_pool: + resource_status = await self.get_status(host) + parameters = resource_status.get("parameters", {}) + reservation_state = parameters.get("tardis_reservation_state", "free") + is_free = reservation_state == "free" + + power_state = resource_status.get("power", {}).get("state") + is_powered_off = power_state == "off" + + if is_free and is_powered_off: + await self.set_satellite_parameter( + host, "tardis_reservation_state", "booting" + ) + logger.info(f"Allocated satellite host {host}") + return host + + logger.info("No free host found, skipping deployment") + raise TardisResourceStatusUpdateFailed("no free host found") + + async def set_satellite_parameter( + self, remote_resource_uuid: str, parameter: str, value: str + ) -> None: + """ + Create or update a Satellite host parameter using lower-case string values only and + updates its cached status. + + :param remote_resource_uuid: Satellite identifier of the host. + :type remote_resource_uuid: str + :param parameter: Name of the parameter to update. + :type parameter: str + :param value: New parameter value. 
+ :type value: str + """ + value = str(value).lower() + status_response = await self.get_status(remote_resource_uuid, force=True) + parameter_id = status_response.get("parameters", {}).get(f"{parameter}_id") + + async with aiohttp.ClientSession(auth=self.auth) as session: + if parameter_id is not None: + await self._request( + session, + "PUT", + f"{self._host_url(remote_resource_uuid)}/parameters/{parameter_id}", + json={"value": value}, + expect_json=False, + ) + logger.info( + f"Updated satellite parameter {parameter} to {value} for {remote_resource_uuid}" + ) + else: + await self._request( + session, + "POST", + f"{self._host_url(remote_resource_uuid)}/parameters", + json={"name": parameter, "value": value}, + expect_json=False, + ) + logger.info( + f"Created satellite parameter {parameter} with value {value} for {remote_resource_uuid}" + ) + await self.get_status(remote_resource_uuid, force=True) + + +class SatelliteAdapter(SiteAdapter): + """ + Translate Satellite host lifecycle operations to the SiteAdapter API. + """ + + def __init__(self, machine_type: str, site_name: str): + self._machine_type = machine_type + self._site_name = site_name + + self.client = SatelliteClient( + host=self.configuration.host, + username=self.configuration.username, + secret=self.configuration.secret, + ssl_cert=self.configuration.ssl_cert, + machine_pool=self.configuration.machine_pool, + max_age=self.configuration.max_age, + ) + + key_translator = StaticMapping( + remote_resource_uuid="remote_resource_uuid", + resource_status="resource_status", + ) + + translator_functions = StaticMapping( + status=lambda x, translator=StaticMapping(): translator[x], + ) + + self.handle_response = partial( + self.handle_response, + key_translator=key_translator, + translator_functions=translator_functions, + ) + + async def deploy_resource( + self, resource_attributes: AttributeDict + ) -> AttributeDict: + """ + Allocate an available host and ensure it is powered on. 
+ + :param resource_attributes: Attributes describing the drone to deploy. + :type resource_attributes: AttributeDict + :return: Normalised response containing at least the remote UUID. + :rtype: AttributeDict + """ + remote_resource_uuid = await self.client.get_next_uuid() + await self.client.set_power( + state="on", remote_resource_uuid=remote_resource_uuid + ) + + return self.handle_response({"remote_resource_uuid": remote_resource_uuid}) + + async def resource_status( + self, resource_attributes: AttributeDict + ) -> AttributeDict: + """ + Query Satellite information and translate to ResourceStatus. If the drone + is marked as terminating, free the host to be used in the next heartbeat interval. + + :param resource_attributes: Attributes describing the tracked drone. + :type resource_attributes: AttributeDict + :return: Normalised response containing the translated resource status. + :rtype: AttributeDict + """ + response = await self.client.get_status( + resource_attributes.remote_resource_uuid + ) + + power_state = response.get("power", {}).get("state") + reservation_state = response.get("parameters", {}).get( + "tardis_reservation_state" + ) + + status = self._resolve_status(power_state, reservation_state) + if status is ResourceStatus.Deleted: + await self.client.set_satellite_parameter( + resource_attributes.remote_resource_uuid, + "tardis_reservation_state", + "free", + ) + elif status is ResourceStatus.Running and reservation_state == "booting": + await self.client.set_satellite_parameter( + resource_attributes.remote_resource_uuid, + "tardis_reservation_state", + "active", + ) + return self.handle_response( + response, + resource_status=status, + remote_resource_uuid=resource_attributes.remote_resource_uuid, + ) + + async def stop_resource(self, resource_attributes: AttributeDict) -> AttributeDict: + """ + Request a power-off for the resource. + + :param resource_attributes: Attributes describing the drone to stop. 
+ :type resource_attributes: AttributeDict + :return: Normalised response including the resulting resource status. + :rtype: AttributeDict + """ + response = await self.client.set_power( + "off", resource_attributes.remote_resource_uuid + ) + has_error = "error" in response + if has_error: + logger.error( + "Failed to stop satellite resource %s: %s", + resource_attributes.remote_resource_uuid, + response, + ) + + status = ResourceStatus.Error if has_error else ResourceStatus.Stopped + return self.handle_response( + response, + resource_status=status, + remote_resource_uuid=resource_attributes.remote_resource_uuid, + ) + + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: + """ + Flag a host as terminating so a later status check frees it. + + :param resource_attributes: Attributes describing the drone to retire. + :type resource_attributes: AttributeDict + """ + await self.client.set_satellite_parameter( + resource_attributes.remote_resource_uuid, + "tardis_reservation_state", + "terminating", + ) + + @contextmanager + def handle_exceptions(self): + """ + Propagate Satellite-specific status failures unchanged. Especially if + no free host is available during deployment. + + :return: Context manager yielding control to the caller. + :rtype: contextmanager + """ + try: + yield + except TardisResourceStatusUpdateFailed: + raise + + def _resolve_status( + self, power_state: Optional[str], reservation_state: Optional[str] + ) -> ResourceStatus: + """ + Translate raw Satellite flags into the canonical ``ResourceStatus``. + + :param power_state: Reported power state of the host. + :type power_state: str or None + :param reservation_state: Reservation flag managed via host parameters. + :type reservation_state: str or None + :return: Resource status understood by TARDIS. 
+ :rtype: ResourceStatus + """ + if reservation_state == "booting": + # booting hosts report as running once their power state flips to on + if power_state == "on": + return ResourceStatus.Running + return ResourceStatus.Booting + + if power_state == "on": + return ResourceStatus.Running + + if power_state == "off": + # if resource is offline its either in stopping/terminating phase or (still) booting + if reservation_state == "terminating": + return ResourceStatus.Deleted + if reservation_state == "active": + return ResourceStatus.Stopped + + # each other state should be treated as error + return ResourceStatus.Error diff --git a/tardis/utilities/asynccachemap.py b/tardis/utilities/asynccachemap.py index d05427c0..382ad82a 100644 --- a/tardis/utilities/asynccachemap.py +++ b/tardis/utilities/asynccachemap.py @@ -30,11 +30,13 @@ def _async_lock(self): def last_update(self) -> datetime: return self._last_update - async def update_status(self) -> None: + async def update_status(self, force: bool = False) -> None: current_time = datetime.now() async with self._async_lock: - if (current_time - self._last_update) > timedelta(seconds=self._max_age): + if (current_time - self._last_update) > timedelta( + seconds=self._max_age + ) or force: try: data = await self._update_coroutine() except json.decoder.JSONDecodeError as je: diff --git a/tests/adapters_t/sites_t/test_satellite.py b/tests/adapters_t/sites_t/test_satellite.py new file mode 100644 index 00000000..1679e490 --- /dev/null +++ b/tests/adapters_t/sites_t/test_satellite.py @@ -0,0 +1,204 @@ +from tardis.adapters.sites.satellite import SatelliteAdapter +from tardis.utilities.attributedict import AttributeDict +from tardis.interfaces.siteadapter import ResourceStatus +from tardis.exceptions.tardisexceptions import TardisResourceStatusUpdateFailed +from tests.utilities.utilities import run_async + +from unittest import TestCase +from unittest.mock import AsyncMock, patch + + +class TestSatelliteAdapter(TestCase): + 
mock_config_patcher = None + mock_satelliteclient_patcher = None + + @classmethod + def setUpClass(cls): + cls.mock_config_patcher = patch("tardis.interfaces.siteadapter.Configuration") + cls.mock_config = cls.mock_config_patcher.start() + cls.mock_satelliteclient_patcher = patch( + "tardis.adapters.sites.satellite.SatelliteClient" + ) + cls.mock_satelliteclient = cls.mock_satelliteclient_patcher.start() + + @classmethod + def tearDownClass(cls): + cls.mock_config_patcher.stop() + cls.mock_satelliteclient_patcher.stop() + + def setUp(self): + + self.remote_resource_uuid = "uuid-test" + + self.config = self.mock_config.return_value + self.config.TestSite = AttributeDict( + host="https://test.satelliteclient.local", + username="TestUser", + secret="test123", + ssl_cert="/path/to/cert", + machine_pool=["testmachine"], + MachineTypes=["testmachine_type"], + max_age=5, + MachineMetaData=AttributeDict( + testmachine_type=AttributeDict(Cores=4, Memory=8, Disk=100) + ), + MachineTypeConfiguration=AttributeDict( + testmachine_type=AttributeDict( + instance_type="testmachine_type", + ) + ), + ) + + self.client = self.mock_satelliteclient.return_value + self.client.get_status = AsyncMock( + return_value={"status": "running", "id": self.remote_resource_uuid} + ) + + self.client.get_next_uuid = AsyncMock(return_value="uuid-new") + self.client.set_power = AsyncMock(return_value=None) + self.client.set_satellite_parameter = AsyncMock(return_value=None) + + self.satellite_adapter = SatelliteAdapter( + machine_type="testmachine_type", site_name="TestSite" + ) + + def tearDown(self): + self.mock_satelliteclient.reset_mock() + self.client.reset_mock() + + def test_machine_type(self): + self.assertEqual(self.satellite_adapter.machine_type, "testmachine_type") + + def test_site_name(self): + self.assertEqual(self.satellite_adapter.site_name, "TestSite") + + def test_deploy_resource(self): + self.assertEqual( + run_async( + self.satellite_adapter.deploy_resource, + 
resource_attributes=AttributeDict(drone_uuid="testsite-089123"), + ), + AttributeDict(remote_resource_uuid="uuid-new"), + ) + + self.client.get_next_uuid.assert_awaited_once() + + self.client.set_power.assert_awaited_once_with( + state="on", remote_resource_uuid="uuid-new" + ) + + def _assert_resource_status(self, response: dict, expected_status: ResourceStatus): + """Exercise resource_status and assert the expected ResourceStatus mapping.""" + self.client.get_status.return_value = response + + resource_attributes = AttributeDict( + remote_resource_uuid=self.remote_resource_uuid + ) + + result = run_async( + self.satellite_adapter.resource_status, + resource_attributes=resource_attributes, + ) + + self.assertEqual( + result, + AttributeDict( + remote_resource_uuid=self.remote_resource_uuid, + resource_status=expected_status, + ), + ) + + self.client.get_status.assert_awaited_once_with(self.remote_resource_uuid) + return self.client + + def test_resource_status_running(self): + response = { + "power": {"state": "on"}, + "parameters": {"tardis_reservation_state": "free"}, + } + client = self._assert_resource_status(response, ResourceStatus.Running) + client.set_satellite_parameter.assert_not_awaited() + + def test_resource_status_running_clears_booting(self): + response = { + "power": {"state": "on"}, + "parameters": {"tardis_reservation_state": "booting"}, + } + client = self._assert_resource_status(response, ResourceStatus.Running) + client.set_satellite_parameter.assert_awaited_once_with( + self.remote_resource_uuid, "tardis_reservation_state", "active" + ) + + def test_resource_status_booting(self): + response = { + "power": {"state": "off"}, + "parameters": {"tardis_reservation_state": "booting"}, + } + client = self._assert_resource_status(response, ResourceStatus.Booting) + client.set_satellite_parameter.assert_not_awaited() + + def test_resource_status_deleted(self): + response = { + "power": {"state": "off"}, + "parameters": {"tardis_reservation_state": 
"terminating"}, + } + client = self._assert_resource_status(response, ResourceStatus.Deleted) + + # Deleted resources should have their reservation flag cleared. + client.set_satellite_parameter.assert_awaited_once_with( + self.remote_resource_uuid, "tardis_reservation_state", "free" + ) + + def test_resource_status_stopped(self): + response = { + "power": {"state": "off"}, + "parameters": {"tardis_reservation_state": "active"}, + } + client = self._assert_resource_status(response, ResourceStatus.Stopped) + client.set_satellite_parameter.assert_not_awaited() + + def test_resource_status_error(self): + response = { + "power": {"state": "suspended"}, + "parameters": {"tardis_reservation_state": "free"}, + } + client = self._assert_resource_status(response, ResourceStatus.Error) + client.set_satellite_parameter.assert_not_awaited() + + def test_stop_resource(self): + self.client.set_power.return_value = {} + + resource_attributes = AttributeDict( + remote_resource_uuid=self.remote_resource_uuid + ) + result = run_async( + self.satellite_adapter.stop_resource, + resource_attributes=resource_attributes, + ) + + self.assertEqual( + result, + AttributeDict( + remote_resource_uuid=self.remote_resource_uuid, + resource_status=ResourceStatus.Stopped, + ), + ) + self.client.set_power.assert_awaited_once_with("off", self.remote_resource_uuid) + self.client.set_satellite_parameter.assert_not_awaited() + + def test_exception_handling(self): + def test_exception_handling(to_raise, to_catch): + with self.assertRaises(to_catch): + with self.satellite_adapter.handle_exceptions(): + raise to_raise + + matrix = [ + ( + TardisResourceStatusUpdateFailed("no free host available"), + TardisResourceStatusUpdateFailed, + ), + (RuntimeError("unexpected satellite error"), RuntimeError), + ] + + for to_raise, to_catch in matrix: + test_exception_handling(to_raise, to_catch) diff --git a/tests/utilities_t/test_asynccachemap.py b/tests/utilities_t/test_asynccachemap.py index 249cf918..436a1efd 
100644 --- a/tests/utilities_t/test_asynccachemap.py +++ b/tests/utilities_t/test_asynccachemap.py @@ -21,6 +21,11 @@ def setUp(self): self.command_failing_async_cache_map = AsyncCacheMap( update_coroutine=self.command_failing_update_function ) + self.force_counter = 0 + self.force_update_async_cache_map = AsyncCacheMap( + update_coroutine=self.force_update_function, + max_age=999 * 60, + ) async def json_failing_update_function(self): raise JSONDecodeError(msg="Bla", doc="Blubb", pos=99) @@ -33,6 +38,10 @@ async def command_failing_update_function(self): async def update_function(self): return self.test_data + async def force_update_function(self): + self.force_counter += 1 + return {"counter": self.force_counter} + def update_status(self): run_async(self.async_cache_map.update_status) @@ -71,6 +80,19 @@ def test_last_update(self): datetime.now() - self.async_cache_map.last_update < timedelta(seconds=1) ) + def test_force_update(self): + # get initial value (count) + run_async(self.force_update_async_cache_map.update_status) + self.assertEqual(self.force_update_async_cache_map["counter"], 1) + + # access cached value (no count) + run_async(self.force_update_async_cache_map.update_status) + self.assertEqual(self.force_update_async_cache_map["counter"], 1) + + # check forced update (count) + run_async(self.force_update_async_cache_map.update_status, force=True) + self.assertEqual(self.force_update_async_cache_map["counter"], 2) + def test_eq_async_cache_map(self): test_cache_map = AsyncCacheMap( update_coroutine=self.async_cache_map._update_coroutine