Merged

29 commits
4897ddf
refactor: first step to outsource query command
cmeesters Aug 12, 2025
44c42d7
refactor: working on the interface - attempt to make an option optional
cmeesters Aug 13, 2025
82cae3f
feat: adjusting dynamiccally adjust for query wait times instead of a…
cmeesters Aug 18, 2025
c484c90
feat: new switch between sacct and squeue as status commands
cmeesters Sep 19, 2025
7220326
fix: formatting
cmeesters Sep 19, 2025
7f7c5fb
feat: changed toml file to respect the 80 char limit
cmeesters Sep 19, 2025
383de18
fix: formatting
cmeesters Sep 19, 2025
b39af85
fix: the line lenght limit is 88
cmeesters Sep 19, 2025
8d9624a
fix: formatting
cmeesters Sep 19, 2025
1e2bb00
fix: formatting
cmeesters Sep 19, 2025
2702510
fix: superflous import
cmeesters Sep 19, 2025
419180a
fix: formatting
cmeesters Sep 19, 2025
bc851dd
Update snakemake_executor_plugin_slurm/job_status_query.py
cmeesters Sep 19, 2025
5a6cacb
Merge branch 'main' into feat/test_job_query
cmeesters Nov 21, 2025
5c3450b
fix: formatting2
cmeesters Nov 21, 2025
e8e360a
fix: corrected messages and logic behind them
cmeesters Nov 21, 2025
1dae920
fix: formatting
cmeesters Nov 21, 2025
3b19f8c
feat: finally enable command switch
cmeesters Nov 21, 2025
afb3026
fix: removed unused import
cmeesters Nov 21, 2025
d189548
fix: unicode to ascii x
cmeesters Nov 21, 2025
84abbc3
fix: clearer warning message for low MinAge settings
cmeesters Nov 21, 2025
0957dfb
fix: adjusted wording
cmeesters Nov 24, 2025
ca69118
Merge branch 'main' into feat/test_job_query
cmeesters Nov 27, 2025
9018ea8
fix: better message for low values of MinJobAge
cmeesters Nov 27, 2025
3f7970b
Fix typo in comment about command generation
cmeesters Nov 27, 2025
afc48dc
feat: added validators for cli settigns
cmeesters Nov 27, 2025
b650eaa
feat: new improved cli validation
cmeesters Nov 27, 2025
a40a3c0
fix: formatting
cmeesters Nov 27, 2025
41a52fd
fix: init_seconds_before_status_checks must be a positive integer - a…
cmeesters Nov 27, 2025
191 changes: 164 additions & 27 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -5,6 +5,7 @@

import csv
from io import StringIO
import logging
import os
from pathlib import Path
import re
@@ -32,76 +33,131 @@
delete_empty_dirs,
set_gres_string,
)
from .job_status_query import (
get_min_job_age,
is_query_tool_available,
should_recommend_squeue_status_command,
)
from .efficiency_report import create_efficiency_report
from .submit_string import get_submit_command


def _get_status_command_default():
"""Get smart default for status_command based on cluster configuration."""
sacct_available = is_query_tool_available("sacct")
squeue_available = is_query_tool_available("squeue")
# squeue is assumed to always be available on SLURM clusters

if not squeue_available and not sacct_available:
raise WorkflowError(
"Neither 'sacct' nor 'squeue' commands are available on this system. "
"At least one of these commands is required for job status queries."
)
if sacct_available:
return "sacct"
else:
return "squeue"


def _get_status_command_help():
"""Get help text with computed default."""
default_cmd = _get_status_command_default()
sacct_available = is_query_tool_available("sacct")
squeue_recommended = should_recommend_squeue_status_command()

base_help = "Command to query job status. Options: 'sacct', 'squeue'. "

if default_cmd == "sacct":
if sacct_available and not squeue_recommended:
info = (
"'sacct' detected and will be used "
"(MinJobAge may be too low for reliable 'squeue' usage)"
)
else:
info = "'sacct' detected and will be used"
else: # default_cmd == "squeue"
if squeue_recommended:
# cumbersome, due to black and the need to stay below 80 chars
msg_part1 = "'squeue' recommended (MinJobAge is sufficient )"
msg_part2 = " for reliable usage"
info = msg_part1 + msg_part2
elif not sacct_available:
info = (
"'sacct' not available, falling back to 'squeue'. "
"WARNING: 'squeue' may not work reliably if MinJobAge is "
"too low"
)
else:
info = (
"'squeue' will be used. "
"WARNING: MinJobAge may be too low for reliable 'squeue' usage"
)

return (
f"{base_help}Default: '{default_cmd}' ({info}). "
f"Set explicitly to override auto-detection."
)


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
"""Settings for the SLURM executor plugin."""

logdir: Optional[Path] = field(
default=None,
metadata={
"help": "Per default the SLURM log directory is relative to "
"the working directory."
"the working directory. "
"This flag allows to set an alternative directory.",
"env_var": False,
"required": False,
},
)

keep_successful_logs: bool = field(
default=False,
metadata={
"help": "Per default SLURM log files will be deleted upon sucessful "
"completion of a job. Whenever a SLURM job fails, its log "
"file will be preserved. "
"help": "Per default SLURM log files will be deleted upon "
"successful completion of a job. Whenever a SLURM job fails, "
"its log file will be preserved. "
"This flag allows to keep all SLURM log files, even those "
"of successful jobs.",
"env_var": False,
"required": False,
},
)

delete_logfiles_older_than: Optional[int] = field(
default=10,
metadata={
"help": "Per default SLURM log files in the SLURM log directory "
"of a workflow will be deleted after 10 days. For this, "
"best leave the default log directory unaltered. "
"Setting this flag allows to change this behaviour. "
"If set to <=0, no old files will be deleted. ",
"If set to <=0, no old files will be deleted.",
},
)

init_seconds_before_status_checks: Optional[int] = field(
default=40,
metadata={
"help": "Defines the time in seconds before the first status "
"check is performed after job submission.",
"env_var": False,
"required": False,
},
)
status_attempts: Optional[int] = field(
default=5,
metadata={
"help": "Defines the number of attempts to query the status of "
"all active jobs. If the status query fails, the next attempt "
"will be performed after the next status check interval."
"The default is 5 status attempts before giving up. The maximum "
"time between status checks is 180 seconds.",
"env_var": False,
"required": False,
"check is performed on submitted jobs.",
},
)

requeue: bool = field(
default=False,
metadata={
"help": "Allow requeuing preempted of failed jobs, "
"help": "Requeue jobs if they fail with exit code != 0, "
"if no cluster default. Results in "
"`sbatch ... --requeue ...` "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
)

no_account: bool = field(
default=False,
metadata={
@@ -111,6 +167,7 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)

efficiency_report: bool = field(
default=False,
metadata={
@@ -120,6 +177,7 @@
"required": False,
},
)

efficiency_report_path: Optional[Path] = field(
default=None,
metadata={
@@ -132,14 +190,39 @@
"required": False,
},
)

efficiency_threshold: Optional[float] = field(
default=0.8,
metadata={
"help": "The efficiency threshold for the efficiency report. "
"Jobs with an efficiency below this threshold will be reported. "
"This flag has no effect, if not set.",
"help": "Threshold for efficiency report. "
"Jobs with efficiency below this threshold will be reported.",
"env_var": False,
"required": False,
},
)

status_command: Optional[str] = field(
default_factory=_get_status_command_default,
metadata={
"help": _get_status_command_help(),
"env_var": False,
"required": False,
},
)

status_attempts: Optional[int] = field(
default=5,
metadata={
"help": "Defines the number of attempts to query the status of "
"all active jobs. If the status query fails, the next attempt "
"will be performed after the next status check interval. "
"The default is 5 status attempts before giving up. The maximum "
"time between status checks is 180 seconds.",
"env_var": False,
"required": False,
},
)

qos: Optional[str] = field(
default=None,
metadata={
@@ -148,6 +231,7 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)

reservation: Optional[str] = field(
default=None,
metadata={
@@ -157,6 +241,11 @@
},
)

def __post_init__(self):
"""Validate settings after initialization."""
# Add any validation logic here if needed in the future
pass


# Required:
# Specify common settings shared by various executors.
@@ -176,9 +265,6 @@ class ExecutorSettings(ExecutorSettingsBase):
pass_default_resources_args=True,
pass_envvar_declarations_to_cmd=False,
auto_deploy_default_storage_provider=False,
)


@@ -200,6 +286,57 @@ def __post_init__(self, test_mode: bool = False):
else Path(".snakemake/slurm_logs").resolve()
)

# Validate status_command configuration if the field exists
self._validate_status_command_settings()

def _validate_status_command_settings(self):
"""Validate and provide feedback about status_command configuration."""
if hasattr(self.workflow.executor_settings, "status_command"):
status_command = self.workflow.executor_settings.status_command
if status_command:
min_job_age = get_min_job_age()
sacct_available = is_query_tool_available("sacct")

# Calculate a dynamic threshold: three times the status check interval.
# The plugin waits 40 seconds before the first check and then increases
# the interval, so (3 * 10) + 40 = 70 seconds serves as the fallback
# interval below.
between_status_check_seconds = getattr(
self.workflow.executor_settings, "seconds_between_status_checks", 70
)
dynamic_check_threshold = 3 * between_status_check_seconds

if not sacct_available and status_command == "sacct":
self.logger.warning(
"The 'sacct' command is not available on this system. "
"Using 'squeue' instead for job status queries."
)
elif sacct_available and min_job_age is not None:
if (
min_job_age < dynamic_check_threshold
and status_command == "sacct"
):
self.logger.warning(
f"MinJobAge is {min_job_age} seconds "
f"(< {dynamic_check_threshold}s). This may cause 'sacct' to "
"report inaccurate job states, and the status_command "
"option may be unreliable. (Threshold is 3x the status "
f"check interval: 3 x {between_status_check_seconds}s "
f"= {dynamic_check_threshold}s)"
)
elif (
min_job_age >= dynamic_check_threshold
and status_command == "squeue"
):
self.logger.warning(
f"MinJobAge is {min_job_age} seconds "
f"(>= {dynamic_check_threshold}s). The 'squeue' command "
"should work reliably for status queries. (Threshold is "
f"3x the status check interval: "
f"3 x {between_status_check_seconds}s "
f"= {dynamic_check_threshold}s)"
)

def get_status_command(self):
"""Get the status command to use, with fallback logic."""
if hasattr(self.workflow.executor_settings, "status_command"):
return self.workflow.executor_settings.status_command
else:
# Fallback: determine the best command based on cluster configuration
return _get_status_command_default()

def shutdown(self) -> None:
"""
Shutdown the executor.
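
For reference: a minimal, self-contained sketch of the auto-detection and threshold logic this PR adds, handy for probing a cluster outside the plugin. Assumptions are flagged inline: `shutil.which` stands in for the plugin's `is_query_tool_available`, and the 40 s / 10 s figures mirror the comment in `_validate_status_command_settings`.

import shutil


def pick_status_command() -> str:
    """Mirror _get_status_command_default(): prefer 'sacct', else 'squeue'."""
    # shutil.which stands in for is_query_tool_available (assumption)
    sacct_available = shutil.which("sacct") is not None
    squeue_available = shutil.which("squeue") is not None
    if not sacct_available and not squeue_available:
        raise RuntimeError(
            "Neither 'sacct' nor 'squeue' is available; at least one "
            "is required for job status queries."
        )
    return "sacct" if sacct_available else "squeue"


def min_job_age_threshold(between_status_check_seconds: int = 70) -> int:
    """Threshold against which MinJobAge is compared for 'squeue' reliability.

    The 70 s fallback assumes the 40 s initial wait plus 3 x 10 s of
    interval growth, as in the validation comment above.
    """
    return 3 * between_status_check_seconds  # 210 s with the defaults


if __name__ == "__main__":
    print(pick_status_command(), min_job_age_threshold())

With the usual `--slurm-<setting>` CLI mapping, overriding auto-detection would presumably look like `snakemake --executor slurm --slurm-status-command squeue`.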