Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ Alexander Haas <[email protected]>
Eileen Kuehn <[email protected]>
matthias.schnepf <[email protected]>
ubdsv <[email protected]>
Max Kühn <[email protected]>
Rene Caspart <[email protected]>
Leon Schuhmacher <[email protected]>
R. Florian von Cube <[email protected]>
Benjamin Rottler <[email protected]>
Max Kühn <[email protected]>
Sebastian Wozniewski <[email protected]>
mschnepf <[email protected]>
swozniewski <[email protected]>
Expand Down
4 changes: 2 additions & 2 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.. Created by changelog.py at 2025-07-09, command
'/Users/giffler/.cache/pre-commit/repoecmh3ah8/py_env-python3.12/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst'
.. Created by changelog.py at 2025-10-01, command
'/Users/giffler/.cache/pre-commit/repoecmh3ah8/py_env-python3.13/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst'
based on the format of 'https://keepachangelog.com/'

#########
Expand Down
177 changes: 146 additions & 31 deletions tardis/adapters/batchsystems/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,178 @@
from ...interfaces.batchsystemadapter import MachineStatus
from ...interfaces.executor import Executor
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.utils import htcondor_cmd_option_formatter
from ...utilities.utils import csv_parser
from ...utilities.utils import (
csv_parser,
htcondor_cmd_option_formatter,
htcondor_status_cmd_composer,
)
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.attributedict import AttributeDict

from datetime import datetime, timedelta
from functools import partial
from shlex import quote
from types import MappingProxyType
from typing import Iterable
import logging

logger = logging.getLogger("cobald.runtime.tardis.adapters.batchsystem.htcondor")


async def htcondor_get_collectors(
options: AttributeDict, executor: Executor
) -> list[str]:
"""
Asynchronously retrieve a list of HTCondor collector machine names.

Runs ``condor_status -collector -af:t Machine`` (plus any additional formatted
options) using the provided executor, then parses the tab-delimited output
to extract and return the list of collector machine names.

:param options: Additional options for the ``condor_status`` call, such as
``{'pool': 'htcondor.example'}``, which will be formatted and appended
to the command.
:param executor: Executor used to run the ``condor_status`` command
asynchronously.
:return: List of collector machine names.
"""
class_ads = AttributeDict(Machine="Machine")
# Add collector query option, copy options since it is mutable
options = options | {"collector": None}
cmd = htcondor_status_cmd_composer(
attributes=class_ads,
options=options,
)

try:
condor_status = await executor.run_command(cmd)
except CommandExecutionFailure as cef:
logger.warning(f"condor_status could not be executed due to {cef}!")
raise

return [
row["Machine"]
for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=tuple(class_ads.keys()),
delimiter="\t",
)
]


async def htcondor_get_collector_start_dates(
options: AttributeDict, executor: Executor
) -> dict[str, datetime]:
"""
Asynchronously retrieve the master daemon start times from HTCondor for machines
running a collector daemon as well. Assuming both daemons have a similar start date.
Due to potential bug/feature in HTCondor, the DaemonStartTime of the Collector can
not be used directly.
(see https://www-auth.cs.wisc.edu/lists/htcondor-users/2025-July/msg00092.shtml)

Runs ``condor_status -master -af:t Machine DaemonStartTime`` (plus any
additional formatted options) using the provided executor, and parses
the tab-delimited output into a dictionary.

:param options: Additional options for the ``condor_status`` call, such as
``{'pool': 'htcondor.example'}``, which will be formatted and appended
to the command.
:param executor: Executor used to run the ``condor_status`` command
asynchronously.
:return: List of master daemon start time for host running a collector as well.
(in datetime format).
"""
class_ads = AttributeDict(Machine="Machine", DaemonStartTime="DaemonStartTime")
htcondor_collectors = await htcondor_get_collectors(options, executor)

# Add master query option, copy options since it is mutable
options = options | {"master": None}

cmd = htcondor_status_cmd_composer(
attributes=class_ads,
options=options,
constraint=" || ".join(f'Machine == "{fqdn}"' for fqdn in htcondor_collectors),
)

try:
condor_status = await executor.run_command(cmd)
except CommandExecutionFailure as cef:
logger.warning(f"condor_status could not be executed due to {cef}!")
raise

return {
row["Machine"]: datetime.fromtimestamp(int(row["DaemonStartTime"]))
for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=tuple(class_ads.keys()),
delimiter="\t",
)
if row["Machine"] in htcondor_collectors
}


async def htcondor_status_updater(
options: AttributeDict, attributes: AttributeDict, executor: Executor
options: AttributeDict,
attributes: AttributeDict,
executor: Executor,
cached_data: MappingProxyType,
) -> dict:
"""
Helper function to call ``condor_status -af`` asynchronously and to translate
the output into a dictionary
the output into a dictionary.

If the HTCondor Collector has been running for less than 3600 seconds,
previously cached status data is used for machines that were already
available before the restart; otherwise, fresh status data is used.

:param options: Additional options for the condor_status call. For example
``{'pool': 'htcondor.example'}`` will be translated into
``condor_status -af ... -pool htcondor.example``
:type options: AttributeDict
:param attributes: Additional fields to add to output of the
``condor_status -af`` response.
:type attributes: AttributeDict
:return: Dictionary containing the output of the ``condor_status`` command
:rtype: dict
:param executor: Executor to run the ``condor_status`` command asynchronously.
:param ro_cached_data: Cached output from previous ``condor_status -af`` call
:return: Dictionary containing the processed output of the ``condor_status``
command, possibly merged with cached data depending on collector uptime.
"""
# copy options, since they are mutable
options = AttributeDict(**options)

attributes_string = f'-af:t {" ".join(attributes.values())}'
collector_start_dates = await htcondor_get_collector_start_dates(options, executor)

options_string = htcondor_cmd_option_formatter(options)
threshold = timedelta(seconds=3600)
now = datetime.now()

cmd = f"condor_status {attributes_string} -constraint PartitionableSlot=?=True"
earliest_start_date = min(collector_start_dates.values())
latest_start_date = max(collector_start_dates.values())

if options_string:
cmd = f"{cmd} {options_string}"
if (now - earliest_start_date) < threshold:
# If all collectors have been running for less than 3600 seconds,
# use cached status for machines that were already available before the
# restart and update it with fresh data if available.
htcondor_status = cached_data.copy()

htcondor_status = {}
elif (now - latest_start_date) < threshold:
# If any collector has been running for more than 3600 seconds, use the oldest
# available one.
htcondor_status = {}
oldest_collector = min(collector_start_dates, key=collector_start_dates.get)
options.pool = oldest_collector

else:
# Repopulate HTCondor status
htcondor_status = {}

cmd = htcondor_status_cmd_composer(
attributes=attributes,
options=options,
constraint="PartitionableSlot=?=True",
)

try:
logger.debug(f"HTCondor status update is running. Command: {cmd}")
condor_status = await executor.run_command(cmd)

for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=tuple(attributes.keys()),
Expand All @@ -57,7 +184,6 @@ async def htcondor_status_updater(
):
status_key = row["TardisDroneUuid"] or row["Machine"].split(".")[0]
htcondor_status[status_key] = row

except CommandExecutionFailure as cef:
logger.warning(f"condor_status could not be executed due to {cef}!")
raise
Expand All @@ -81,7 +207,7 @@ def __init__(self):
try:
self.htcondor_options = config.BatchSystem.options
except AttributeError:
self.htcondor_options = {}
self.htcondor_options = AttributeDict()

attributes = dict(
Machine="Machine",
Expand All @@ -101,6 +227,7 @@ def __init__(self):
self._executor,
),
max_age=config.BatchSystem.max_age * 60,
provide_cache=True,
)

async def disintegrate_machine(self, drone_uuid: str) -> None:
Expand All @@ -109,7 +236,6 @@ async def disintegrate_machine(self, drone_uuid: str) -> None:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: None
"""
return
Expand All @@ -121,7 +247,6 @@ async def drain_machine(self, drone_uuid: str) -> None:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: None
"""
await self._htcondor_status.update_status()
Expand Down Expand Up @@ -157,7 +282,6 @@ async def integrate_machine(self, drone_uuid: str) -> None:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: None
"""
return None
Expand All @@ -170,9 +294,7 @@ async def get_resource_ratios(self, drone_uuid: str) -> Iterable[float]:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: Iterable of float containing the ratios
:rtype: Iterable[float]
"""
await self._htcondor_status.update_status()
try:
Expand All @@ -192,9 +314,7 @@ async def get_allocation(self, drone_uuid: str) -> float:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: The allocation of a worker node as described above.
:rtype: float
"""
return max(await self.get_resource_ratios(drone_uuid), default=0.0)

Expand All @@ -205,10 +325,8 @@ async def get_machine_status(self, drone_uuid: str) -> MachineStatus:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: The machine status in HTCondor (Available, Draining, Drained,
NotAvailable)
:rtype: MachineStatus
"""
status_mapping = {
("Unclaimed", "Idle"): MachineStatus.Available,
Expand Down Expand Up @@ -236,20 +354,17 @@ async def get_utilisation(self, drone_uuid: str) -> float:

:param drone_uuid: Uuid of the worker node, for some sites corresponding
to the host name of the drone.
:type drone_uuid: str
:return: The utilisation of a worker node as described above.
:rtype: float
"""
return min(await self.get_resource_ratios(drone_uuid), default=0.0)

@property
def machine_meta_data_translation_mapping(self) -> AttributeDict:
"""
The machine meta data translation mapping is used to translate units of
the machine meta data in ``TARDIS`` to values expected by the
The machine metadata translation mapping is used to translate units of
the machine metadata in ``TARDIS`` to values expected by the
HTCondor batch system adapter.

:return: Machine meta data translation mapping
:rtype: AttributeDict
:return: Machine metadata translation mapping
"""
return AttributeDict(Cores=1, Memory=1024, Disk=1024 * 1024)
33 changes: 21 additions & 12 deletions tardis/utilities/asynccachemap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from collections.abc import Mapping
from datetime import datetime
from datetime import timedelta
from functools import partial
from types import MappingProxyType

import asyncio
import logging
Expand All @@ -11,10 +13,16 @@


class AsyncCacheMap(Mapping):
def __init__(self, update_coroutine, max_age: int = 60 * 15):
def __init__(
self,
update_coroutine,
max_age: int = 60 * 15,
provide_cache: bool = False,
):
self._update_coroutine = update_coroutine
self._max_age = max_age
self._last_update = datetime.fromtimestamp(0)
self._provide_cache = provide_cache
self._data = {}
self._lock = None

Expand All @@ -26,17 +34,27 @@ def _async_lock(self):
self._lock = asyncio.Lock()
return self._lock

@property
def read_only_cache(self):
return MappingProxyType(self._data)

@property
def last_update(self) -> datetime:
return self._last_update

@property
def update_coroutine(self):
if self._provide_cache:
return partial(self._update_coroutine, self.read_only_cache)
return self._update_coroutine

async def update_status(self) -> None:
current_time = datetime.now()

async with self._async_lock:
if (current_time - self._last_update) > timedelta(seconds=self._max_age):
try:
data = await self._update_coroutine()
data = await self.update_coroutine()
except json.decoder.JSONDecodeError as je:
logger.warning(
f"AsyncMap update_status failed: Could not decode json {je}"
Expand All @@ -57,13 +75,4 @@ def __len__(self):
return len(self._data)

def __eq__(self, other):
if not isinstance(other, AsyncCacheMap):
return False

return (
self._update_coroutine == other._update_coroutine
and self._max_age == other._max_age
and self._last_update == other._last_update
and self._data == other._data
and self._lock == other._lock
)
return self is other
3 changes: 3 additions & 0 deletions tardis/utilities/attributedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ def __delattr__(self, item):
raise AttributeError(
f"{item} is not a valid attribute. Dict contains {str(self)}."
) from None

def __or__(self, other):
return AttributeDict({**self, **other})
Loading