Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ Alexander Haas <[email protected]>
Eileen Kuehn <[email protected]>
matthias.schnepf <[email protected]>
ubdsv <[email protected]>
Max Kühn <[email protected]>
Rene Caspart <[email protected]>
Leon Schuhmacher <[email protected]>
R. Florian von Cube <[email protected]>
Benjamin Rottler <[email protected]>
Max Kühn <[email protected]>
Sebastian Wozniewski <[email protected]>
mschnepf <[email protected]>
swozniewski <[email protected]>
Expand Down
4 changes: 2 additions & 2 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.. Created by changelog.py at 2025-07-09, command
'/Users/giffler/.cache/pre-commit/repoecmh3ah8/py_env-python3.12/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst'
.. Created by changelog.py at 2025-08-14, command
'/Users/giffler/.cache/pre-commit/repoecmh3ah8/py_env-python3.13/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst'
based on the format of 'https://keepachangelog.com/'

#########
Expand Down
156 changes: 138 additions & 18 deletions tardis/adapters/batchsystems/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,134 @@
from ...interfaces.batchsystemadapter import MachineStatus
from ...interfaces.executor import Executor
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.utils import htcondor_cmd_option_formatter
from ...utilities.utils import csv_parser
from ...utilities.utils import (
csv_parser,
htcondor_cmd_option_formatter,
htcondor_status_cmd_composer,
)
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.attributedict import AttributeDict

from datetime import datetime, timedelta
from functools import partial
from shlex import quote
from types import MappingProxyType
from typing import Iterable
import logging

logger = logging.getLogger("cobald.runtime.tardis.adapters.batchsystem.htcondor")


async def htcondor_get_collectors(
    options: AttributeDict, executor: Executor
) -> list[str]:
    """
    Asynchronously retrieve a list of HTCondor collector machine names.

    Runs ``condor_status -collector -af:t Machine`` (plus any additionally
    formatted options) via the given executor and parses the tab-delimited
    output to extract the collector machine names.

    :param options: Additional options for the ``condor_status`` call, such as
        ``{'pool': 'htcondor.example'}``, which will be formatted and appended
        to the command.
    :type options: AttributeDict
    :param executor: Executor used to run the ``condor_status`` command
        asynchronously.
    :type executor: Executor
    :return: List of collector machine names.
    :rtype: list[str]
    """
    # Only the Machine class ad is needed to identify the collector hosts.
    queried_class_ads = AttributeDict(Machine="Machine")
    # Add the -collector query selector; build a fresh AttributeDict so the
    # caller's (mutable) options mapping is left untouched.
    query_options = AttributeDict(collector=None, **options)
    status_cmd = htcondor_status_cmd_composer(
        attributes=queried_class_ads,
        options=query_options,
    )

    try:
        command_result = await executor.run_command(status_cmd)
    except CommandExecutionFailure as cef:
        logger.warning(f"condor_status could not be executed due to {cef}!")
        raise

    parsed_rows = csv_parser(
        input_csv=command_result.stdout,
        fieldnames=tuple(queried_class_ads.keys()),
        delimiter="\t",
    )
    return [row["Machine"] for row in parsed_rows]


async def htcondor_get_collector_start_dates(
    options: AttributeDict, executor: Executor
) -> list[datetime]:
    """
    Asynchronously retrieve the master daemon start times from HTCondor for
    machines that also run a collector daemon, assuming both daemons have a
    similar start date. Due to a potential bug/feature in HTCondor, the
    DaemonStartTime of the Collector cannot be used directly.
    (see https://www-auth.cs.wisc.edu/lists/htcondor-users/2025-July/msg00092.shtml)

    Runs ``condor_status -master -af:t Machine DaemonStartTime`` (plus any
    additionally formatted options) via the given executor and parses the
    tab-delimited output.

    :param options: Additional options for the ``condor_status`` call, such as
        ``{'pool': 'htcondor.example'}``, which will be formatted and appended
        to the command.
    :type options: AttributeDict
    :param executor: Executor used to run the ``condor_status`` command
        asynchronously.
    :type executor: Executor
    :return: List of master daemon start times (as :class:`datetime.datetime`)
        for hosts that also run a collector.
    :rtype: list[datetime]
    """
    queried_class_ads = AttributeDict(Machine="Machine", DaemonStartTime="DaemonStartTime")
    collector_machines = await htcondor_get_collectors(options, executor)

    # Add the -master query selector; build a fresh AttributeDict so the
    # caller's (mutable) options mapping is left untouched.
    query_options = AttributeDict(master=None, **options)

    status_cmd = htcondor_status_cmd_composer(
        attributes=queried_class_ads,
        options=query_options,
    )

    try:
        command_result = await executor.run_command(status_cmd)
    except CommandExecutionFailure as cef:
        logger.warning(f"condor_status could not be executed due to {cef}!")
        raise

    start_dates = []
    for row in csv_parser(
        input_csv=command_result.stdout,
        fieldnames=tuple(queried_class_ads.keys()),
        delimiter="\t",
    ):
        # Keep only masters that run on a collector host.
        if row["Machine"] in collector_machines:
            start_dates.append(datetime.fromtimestamp(int(row["DaemonStartTime"])))
    return start_dates


async def htcondor_status_updater(
options: AttributeDict, attributes: AttributeDict, executor: Executor
options: AttributeDict,
attributes: AttributeDict,
executor: Executor,
ro_cached_data: MappingProxyType,
) -> dict:
"""
Helper function to call ``condor_status -af`` asynchronously and to translate
the output into a dictionary
the output into a dictionary.

If the HTCondor Collector has been running for less than 3600 seconds,
previously cached status data is used for machines that were already
available before the restart; otherwise, fresh status data is used.

:param options: Additional options for the condor_status call. For example
``{'pool': 'htcondor.example'}`` will be translated into
Expand All @@ -31,24 +140,35 @@ async def htcondor_status_updater(
:param attributes: Additional fields to add to output of the
``condor_status -af`` response.
:type attributes: AttributeDict
:return: Dictionary containing the output of the ``condor_status`` command
:param executor: Executor to run the ``condor_status`` command asynchronously.
:type executor: Executor
:param ro_cached_data: Cached output from previous ``condor_status -af`` call
:type ro_cached_data: MappingProxyType
:return: Dictionary containing the processed output of the ``condor_status``
command, possibly merged with cached data depending on collector uptime.
:rtype: dict
"""

attributes_string = f'-af:t {" ".join(attributes.values())}'
collector_start_dates = await htcondor_get_collector_start_dates(options, executor)

options_string = htcondor_cmd_option_formatter(options)
cmd = htcondor_status_cmd_composer(
attributes=attributes,
options=options,
constraint="PartitionableSlot=?=True",
)

cmd = f"condor_status {attributes_string} -constraint PartitionableSlot=?=True"

if options_string:
cmd = f"{cmd} {options_string}"

htcondor_status = {}
if (datetime.now() - max(collector_start_dates)) < timedelta(seconds=3600):
# If any collector has been running for less than 3600 seconds,
# use cached status for machines that were already available before the
# restart and update it with fresh data if available.
htcondor_status = {**ro_cached_data}
else:
htcondor_status = {}

try:
logger.debug(f"HTCondor status update is running. Command: {cmd}")
condor_status = await executor.run_command(cmd)

for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=tuple(attributes.keys()),
Expand All @@ -57,7 +177,6 @@ async def htcondor_status_updater(
):
status_key = row["TardisDroneUuid"] or row["Machine"].split(".")[0]
htcondor_status[status_key] = row

except CommandExecutionFailure as cef:
logger.warning(f"condor_status could not be executed due to {cef}!")
raise
Expand All @@ -81,7 +200,7 @@ def __init__(self):
try:
self.htcondor_options = config.BatchSystem.options
except AttributeError:
self.htcondor_options = {}
self.htcondor_options = AttributeDict()

attributes = dict(
Machine="Machine",
Expand All @@ -101,6 +220,7 @@ def __init__(self):
self._executor,
),
max_age=config.BatchSystem.max_age * 60,
update_coroutine_receives_ro_cache=True,
)

async def disintegrate_machine(self, drone_uuid: str) -> None:
Expand Down Expand Up @@ -245,11 +365,11 @@ async def get_utilisation(self, drone_uuid: str) -> float:
@property
def machine_meta_data_translation_mapping(self) -> AttributeDict:
    """
    The machine metadata translation mapping is used to translate units of
    the machine metadata in ``TARDIS`` to values expected by the
    HTCondor batch system adapter.

    :return: Machine metadata translation mapping
        (``Cores`` in units of 1, ``Memory`` and ``Disk`` in KiB-based units)
    :rtype: AttributeDict
    """
    # HTCondor expects Memory in MiB-derived and Disk in KiB-derived units;
    # the factors convert the TARDIS base units accordingly.
    return AttributeDict(Cores=1, Memory=1024, Disk=1024 * 1024)
24 changes: 22 additions & 2 deletions tardis/utilities/asynccachemap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from collections.abc import Mapping
from datetime import datetime
from datetime import timedelta
from functools import partial
from types import MappingProxyType

import asyncio
import logging
Expand All @@ -11,10 +13,16 @@


class AsyncCacheMap(Mapping):
def __init__(self, update_coroutine, max_age: int = 60 * 15):
def __init__(
    self,
    update_coroutine,
    max_age: int = 60 * 15,
    update_coroutine_receives_ro_cache: bool = False,
):
    """
    Initialize an asynchronously refreshed, cached mapping.

    :param update_coroutine: Coroutine function that produces fresh data
        for the cache.
    :param max_age: Maximum cache age in seconds before the next
        ``update_status`` call refreshes it (default: 15 minutes).
    :param update_coroutine_receives_ro_cache: When ``True``, the update
        coroutine is called with a read-only view of the current cache as
        its first argument.
    """
    self._update_coroutine = update_coroutine
    self._max_age = max_age
    # Epoch start guarantees the very first access triggers a refresh.
    self._last_update = datetime.fromtimestamp(0)
    self._update_coroutine_receives_cache = update_coroutine_receives_ro_cache
    self._data = {}
    # The asyncio lock is created lazily (see _async_lock) — presumably so it
    # is bound to the running event loop; verify against _async_lock.
    self._lock = None

Expand All @@ -26,17 +34,27 @@ def _async_lock(self):
self._lock = asyncio.Lock()
return self._lock

@property
def read_only_cache(self):
    """
    Return a read-only view of the cached data.

    The :class:`types.MappingProxyType` wrapper lets callers observe the
    internal dictionary without being able to mutate it.
    """
    return MappingProxyType(self._data)

@property
def last_update(self) -> datetime:
    """
    Timestamp of the last cache update (epoch start until the first update).
    """
    return self._last_update

@property
def update_coroutine(self):
    """
    Return the coroutine function used to refresh the cache.

    When the map was configured with ``update_coroutine_receives_ro_cache``,
    the configured coroutine is pre-bound to a read-only view of the current
    cache via :func:`functools.partial`; otherwise it is returned unchanged.
    """
    if not self._update_coroutine_receives_cache:
        return self._update_coroutine
    return partial(self._update_coroutine, self.read_only_cache)

async def update_status(self) -> None:
current_time = datetime.now()

async with self._async_lock:
if (current_time - self._last_update) > timedelta(seconds=self._max_age):
try:
data = await self._update_coroutine()
data = await self.update_coroutine()
except json.decoder.JSONDecodeError as je:
logger.warning(
f"AsyncMap update_status failed: Could not decode json {je}"
Expand Down Expand Up @@ -66,4 +84,6 @@ def __eq__(self, other):
and self._last_update == other._last_update
and self._data == other._data
and self._lock == other._lock
and self._update_coroutine_receives_cache
== other._update_coroutine_receives_cache
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is extremely specific now. I guess you can just check for identity as return self is other (which then also doesn't need the type check).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed that as well. Thanks for spotting this.

)
46 changes: 41 additions & 5 deletions tardis/utilities/utils.py
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really part of the feature, but fixing Type Hints is always good when visiting the code anyhow.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from contextlib import contextmanager
from io import StringIO
from typing import Any, Callable, Dict, List, TypeVar, Tuple
from typing import Any, Callable, Dict, Optional, TypeVar, Union


import csv
Expand All @@ -24,11 +24,47 @@ def htcondor_cmd_option_formatter(options: AttributeDict) -> str:
return cmd_option_formatter(options, prefix="-", separator=" ")


def htcondor_status_cmd_composer(
    attributes: "AttributeDict",
    options: "Optional[AttributeDict]" = None,
    constraint: Optional[str] = None,
) -> str:
    """
    Compose a ``condor_status`` command string from attributes (class ads),
    options, and an optional constraint. This function does not execute the
    command, it only returns the assembled command string.

    :param attributes: Mapping of attribute names to values, used to construct
        the ``-af:t`` argument.
    :type attributes: AttributeDict
    :param options: Additional HTCondor command-line options, formatted by
        :func:`htcondor_cmd_option_formatter`.
    :type options: Optional[AttributeDict]
    :param constraint: Constraint expression to filter results
        (e.g. ``PartitionableSlot=?=True``).
    :type constraint: Optional[str]
    :return: Fully assembled ``condor_status`` command string.
    :rtype: str
    """
    # Collect the parts and join once at the end; this avoids a trailing
    # space when optional parts are absent or format to an empty string.
    cmd_parts = ["condor_status", f'-af:t {" ".join(attributes.values())}']

    if constraint:
        cmd_parts.append(f"-constraint {constraint}")

    if options:
        options_string = htcondor_cmd_option_formatter(options)
        # Guard against an empty formatter result, which would otherwise
        # leave trailing whitespace in the command.
        if options_string:
            cmd_parts.append(options_string)

    return " ".join(cmd_parts)


def csv_parser(
input_csv: str,
fieldnames: [List, Tuple],
fieldnames: Union[list[str], tuple[str, ...]],
delimiter: str = "\t",
replacements: dict = None,
replacements: Optional[dict[str, Any]] = None,
skipinitialspace: bool = False,
skiptrailingspace: bool = False,
):
Expand All @@ -38,11 +74,11 @@ def csv_parser(
:param input_csv: CSV formatted input
:type input_csv: str
:param fieldnames: corresponding field names
:type fieldnames: [List, Tuple]
:type fieldnames: Union[List[str], Tuple[str, ...]]
:param delimiter: delimiter between entries
:type delimiter: str
:param replacements: fields to be replaced
:type replacements: dict
:type replacements: Optional[Dict[str, str]]
:param skipinitialspace: ignore whitespace immediately following the delimiter
:type skipinitialspace: bool
:param skiptrailingspace: ignore whitespace at the end of each csv row
Expand Down
Loading
Loading