Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ Alexander Haas <[email protected]>
Eileen Kuehn <[email protected]>
matthias.schnepf <[email protected]>
ubdsv <[email protected]>
Max Kühn <[email protected]>
Rene Caspart <[email protected]>
Leon Schuhmacher <[email protected]>
R. Florian von Cube <[email protected]>
Benjamin Rottler <[email protected]>
Max Kühn <[email protected]>
Sebastian Wozniewski <[email protected]>
mschnepf <[email protected]>
swozniewski <[email protected]>
Expand Down
4 changes: 2 additions & 2 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.. Created by changelog.py at 2025-07-09, command
'/Users/giffler/.cache/pre-commit/repoecmh3ah8/py_env-python3.12/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst'
.. Created by changelog.py at 2025-08-13, command
'/Users/giffler/.cache/pre-commit/repoecmh3ah8/py_env-python3.13/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst'
based on the format of 'https://keepachangelog.com/'

#########
Expand Down
121 changes: 115 additions & 6 deletions tardis/adapters/batchsystems/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,115 @@
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.attributedict import AttributeDict

from datetime import datetime, timedelta
from functools import partial
from shlex import quote
from types import MappingProxyType
from typing import Iterable
import logging

logger = logging.getLogger("cobald.runtime.tardis.adapters.batchsystem.htcondor")


async def htcondor_get_collectors(
options: AttributeDict, executor: Executor
) -> list[str]:
"""
Asynchronously retrieve a list of HTCondor collector machine names.

Runs ``condor_status -collector -af:t Machine`` (plus any additional formatted
options) using the provided executor, then parses the tab-delimited output
to extract and return the list of collector machine names.

:param options: Additional options for the ``condor_status`` call, such as
``{'pool': 'htcondor.example'}``, which will be formatted and appended
to the command.
:type options: AttributeDict
:param executor: Executor used to run the ``condor_status`` command
asynchronously.
:type executor: Executor
:return: List of collector machine names.
:rtype: list[str]
"""
options_string = htcondor_cmd_option_formatter(options)
class_ads = ("Machine",)
cmd = f'condor_status -collector -af:t {" ".join(class_ads)}' # noqa: E231

if options_string:
cmd = f"{cmd} {options_string}"

condor_status = await executor.run_command(cmd)

return [
row["Machine"]
for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=class_ads,
delimiter="\t",
)
]


async def htcondor_get_collector_start_dates(
options: AttributeDict, executor: Executor
) -> list[datetime]:
"""
Asynchronously retrieve the master daemon start times from HTCondor for machines
running a collector daemon as well. Assuming both daemons have a similar start date.
Due to potential bug/feature in HTCondor, the DaemonStartTime of the Collector can
not be used directly.
(see https://www-auth.cs.wisc.edu/lists/htcondor-users/2025-July/msg00092.shtml)

Runs ``condor_status -master -af:t Machine DaemonStartTime`` (plus any
additional formatted options) using the provided executor, and parses
the tab-delimited output into a dictionary.

:param options: Additional options for the ``condor_status`` call, such as
``{'pool': 'htcondor.example'}``, which will be formatted and appended
to the command.
:type options: AttributeDict
:param executor: Executor used to run the ``condor_status`` command
asynchronously.
:type executor: Executor
:return: List of master daemon start time for host running a collector as well.
(in datetime format).
:rtype: list[datetime]
"""
options_string = htcondor_cmd_option_formatter(options)
class_ads = ("Machine", "DaemonStartTime")
htcondor_collectors = await htcondor_get_collectors(options, executor)

cmd = f'condor_status -master -af:t {" ".join(class_ads)}' # noqa: E231

if options_string:
cmd = f"{cmd} {options_string}"

condor_status = await executor.run_command(cmd)

return [
datetime.fromtimestamp(int(row["DaemonStartTime"]))
for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=class_ads,
delimiter="\t",
)
if row["Machine"] in htcondor_collectors
]


async def htcondor_status_updater(
options: AttributeDict, attributes: AttributeDict, executor: Executor
options: AttributeDict,
attributes: AttributeDict,
executor: Executor,
ro_cached_data: MappingProxyType,
) -> dict:
"""
Helper function to call ``condor_status -af`` asynchronously and to translate
the output into a dictionary
the output into a dictionary.

If the HTCondor Collector has been running for less than 3600 seconds,
previously cached status data is used for machines that were already
available before the restart; otherwise, fresh status data is used.

:param options: Additional options for the condor_status call. For example
``{'pool': 'htcondor.example'}`` will be translated into
Expand All @@ -31,11 +126,18 @@ async def htcondor_status_updater(
:param attributes: Additional fields to add to output of the
``condor_status -af`` response.
:type attributes: AttributeDict
:return: Dictionary containing the output of the ``condor_status`` command
:param executor: Executor to run the ``condor_status`` command asynchronously.
:type executor: Executor
:param ro_cached_data: Cached output from previous ``condor_status -af`` call
:type ro_cached_data: MappingProxyType
:return: Dictionary containing the processed output of the ``condor_status``
command, possibly merged with cached data depending on collector uptime.
:rtype: dict
"""

attributes_string = f'-af:t {" ".join(attributes.values())}'
collector_start_dates = await htcondor_get_collector_start_dates(options, executor)

attributes_string = f'-af:t {" ".join(attributes.values())}' # noqa: E231

options_string = htcondor_cmd_option_formatter(options)

Expand All @@ -44,11 +146,18 @@ async def htcondor_status_updater(
if options_string:
cmd = f"{cmd} {options_string}"

htcondor_status = {}
if (datetime.now() - max(collector_start_dates)) < timedelta(seconds=3600):
# If any collector has been running for less than 3600 seconds,
# use cached status for machines that were already available before the
# restart and update it with fresh data if available.
htcondor_status = {**ro_cached_data}
else:
htcondor_status = {}

try:
logger.debug(f"HTCondor status update is running. Command: {cmd}")
condor_status = await executor.run_command(cmd)

for row in csv_parser(
input_csv=condor_status.stdout,
fieldnames=tuple(attributes.keys()),
Expand All @@ -57,7 +166,6 @@ async def htcondor_status_updater(
):
status_key = row["TardisDroneUuid"] or row["Machine"].split(".")[0]
htcondor_status[status_key] = row

except CommandExecutionFailure as cef:
logger.warning(f"condor_status could not be executed due to {cef}!")
raise
Expand Down Expand Up @@ -101,6 +209,7 @@ def __init__(self):
self._executor,
),
max_age=config.BatchSystem.max_age * 60,
update_coroutine_receives_ro_cache=True,
)

async def disintegrate_machine(self, drone_uuid: str) -> None:
Expand Down
22 changes: 20 additions & 2 deletions tardis/utilities/asynccachemap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from collections.abc import Mapping
from datetime import datetime
from datetime import timedelta
from functools import partial
from types import MappingProxyType

import asyncio
import logging
Expand All @@ -11,10 +13,16 @@


class AsyncCacheMap(Mapping):
def __init__(self, update_coroutine, max_age: int = 60 * 15):
def __init__(
self,
update_coroutine,
max_age: int = 60 * 15,
update_coroutine_receives_ro_cache: bool = False,
):
self._update_coroutine = update_coroutine
self._max_age = max_age
self._last_update = datetime.fromtimestamp(0)
self._update_coroutine_receives_cache = update_coroutine_receives_ro_cache
self._data = {}
self._lock = None

Expand All @@ -26,17 +34,27 @@ def _async_lock(self):
self._lock = asyncio.Lock()
return self._lock

@property
def read_only_cache(self):
return MappingProxyType(self._data)

@property
def last_update(self) -> datetime:
return self._last_update

@property
def update_coroutine(self):
if self._update_coroutine_receives_cache:
return partial(self._update_coroutine, self.read_only_cache)
return self._update_coroutine

async def update_status(self) -> None:
current_time = datetime.now()

async with self._async_lock:
if (current_time - self._last_update) > timedelta(seconds=self._max_age):
try:
data = await self._update_coroutine()
data = await self.update_coroutine()
except json.decoder.JSONDecodeError as je:
logger.warning(
f"AsyncMap update_status failed: Could not decode json {je}"
Expand Down
10 changes: 5 additions & 5 deletions tardis/utilities/utils.py
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really part of the feature, but fixing Type Hints is always good when visting the code anyhow.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from contextlib import contextmanager
from io import StringIO
from typing import Any, Callable, Dict, List, TypeVar, Tuple
from typing import Any, Callable, Dict, Optional, TypeVar, Union


import csv
Expand All @@ -26,9 +26,9 @@ def htcondor_cmd_option_formatter(options: AttributeDict) -> str:

def csv_parser(
input_csv: str,
fieldnames: [List, Tuple],
fieldnames: Union[list[str], tuple[str, ...]],
delimiter: str = "\t",
replacements: dict = None,
replacements: Optional[dict[str, Any]] = None,
skipinitialspace: bool = False,
skiptrailingspace: bool = False,
):
Expand All @@ -38,11 +38,11 @@ def csv_parser(
:param input_csv: CSV formatted input
:type input_csv: str
:param fieldnames: corresponding field names
:type fieldnames: [List, Tuple]
:type fieldnames: Union[List[str], Tuple[str, ...]]
:param delimiter: delimiter between entries
:type delimiter: str
:param replacements: fields to be replaced
:type replacements: dict
:type replacements: Optional[Dict[str, str]]
:param skipinitialspace: ignore whitespace immediately following the delimiter
:type skipinitialspace: bool
:param skiptrailingspace: ignore whitespace at the end of each csv row
Expand Down
Loading
Loading