Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pandas = "^2.2.3"
[tool.coverage.run]
omit = [".*", "*/site-packages/*", "Snakefile"]

[tool.black]
line-length = 88

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
8 changes: 5 additions & 3 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
)
from .efficiency_report import create_efficiency_report
from .submit_string import get_submit_command
from .validation import validate_or_get_slurm_job_id, validate_slurm_extra
from .partitions import read_partition_file, get_best_partition
from .validation import validate_slurm_extra


@dataclass
Expand Down Expand Up @@ -420,8 +420,10 @@ def run_job(self, job: JobExecutorInterface):
# To extract the job id we split by semicolon and take the first element
# (this also works if no cluster name was provided)
slurm_jobid = out.strip().split(";")[0]
if not slurm_jobid:
raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.")
# this slurm_jobid might be wrong: some cluster admins give convoluted
# sbatch outputs. So we need to validate it properly (and replace it
# if necessary).
slurm_jobid = validate_or_get_slurm_job_id(slurm_jobid, out)
slurm_logfile = slurm_logfile.with_name(
slurm_logfile.name.replace("%j", slurm_jobid)
)
Expand Down
55 changes: 55 additions & 0 deletions snakemake_executor_plugin_slurm/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,61 @@
from snakemake_interface_common.exceptions import WorkflowError


def validate_or_get_slurm_job_id(job_id, output):
    """
    Return a validated SLURM job ID, recovering it from ``output`` if needed.

    ``job_id`` is returned unchanged when it consists solely of digits.
    Otherwise the full sbatch output is scanned for standalone integers and
    the job ID is taken to be the only one found.

    Args:
        job_id (str): The SLURM job ID candidate to validate.
        output (str): The full sbatch output to parse if job_id is invalid.

    Returns:
        str: The job ID as a string of digits.

    Raises:
        WorkflowError: If the sbatch output contains no candidate integer,
            or more than one candidate integer.
    """
    # This is a purely syntactic check. Strict validation would require
    # checking the JOBID with either the SLURM database or control daemon,
    # which is too much overhead.
    # ``fullmatch`` is used rather than ``match`` with ``^...$`` because
    # ``$`` also matches just before a trailing newline, which would let a
    # candidate such as "1234\n" be returned unchanged.
    if re.fullmatch(r"\d+", job_id):
        return job_id

    # The candidate is not a plain integer, so parse the sbatch output a bit
    # more thoroughly. The pattern below matches runs of digits that are NOT:
    #   - directly preceded by a digit or a dot (negative lookbehind), i.e.
    #     not the tail of a longer number or the fractional part of a decimal
    #   - directly followed by a digit or a dot (not the integer part of a
    #     decimal, and not a shortened prefix of a longer number)
    #   - followed by a percent sign with optional space (23% or 23 %)
    #   - followed, with optional space, by a unit or count:
    #       * memory units: k, K, m, M, g, G, optionally suffixed with "iB"
    #       * resource counts: files, cores, hours, cpu/cpus (any case)
    #     where the unit itself is followed by whitespace, a hyphen, a
    #     period, or the end of the string. "minutes" is not in the list.
    matches = re.findall(
        r"(?<![.\d])\d+(?![.\d]|\s*%|\s*(?:[kKmMgG](?:iB)?|files|cores|"
        r"hours|[cC][pP][uU][sS]?)(?:\s|[-.]|$))",
        output,
    )
    if len(matches) == 1:
        return matches[0]
    if matches:
        # More than one candidate: refuse to guess which one is the job ID.
        raise WorkflowError(
            f"Multiple possible SLURM job IDs found in: {output}. "
            "Was looking for exactly one positive integer."
        )
    raise WorkflowError(
        f"No valid SLURM job ID found in: {output}. "
        "Was looking for exactly one positive integer."
    )


def get_forbidden_slurm_options():
"""
Return a dictionary of forbidden SLURM options that the executor manages.
Expand Down
196 changes: 194 additions & 2 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
)
from snakemake_executor_plugin_slurm.utils import set_gres_string
from snakemake_executor_plugin_slurm.submit_string import get_submit_command

from snakemake_executor_plugin_slurm.validation import validate_slurm_extra
from snakemake_executor_plugin_slurm.validation import (
validate_slurm_extra,
validate_or_get_slurm_job_id,
)
from snakemake_interface_common.exceptions import WorkflowError
import pandas as pd

Expand Down Expand Up @@ -862,3 +864,193 @@ def test_multiple_forbidden_options(self, mock_job):
# Should raise error for job-name (first one encountered)
with pytest.raises(WorkflowError, match=r"job-name.*not allowed"):
validate_slurm_extra(job)


class TestSlurmJobIdValidation:
    """Test cases for the validate_or_get_slurm_job_id function.

    Tests that are meant to cover extraction from noisy sbatch output derive
    the job ID candidate from the output itself (see ``_sbatch_candidate``),
    so that the candidate is not a plain integer and the fallback parsing is
    actually exercised.
    """

    @staticmethod
    def _sbatch_candidate(output):
        """Return the job ID candidate the way the executor derives it.

        The executor takes the stripped sbatch output, splits it by
        semicolon and uses the first element as the candidate.
        """
        return output.strip().split(";")[0]

    def test_parsable_format_simple(self):
        """Test parsable format with just job ID."""
        output = "12345"
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "12345"

    def test_parsable_format_with_cluster(self):
        """Test parsable format with cluster name (jobid;clustername)."""
        output = "54321;mycluster"
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "54321"

    def test_convoluted_output_with_percentages(self):
        """Test extraction from output containing percentages."""
        output = """Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Maecenas quis risus porttitor: 25%
pretium enim volutpat: 23.3%
Submitted batch job 88888
some other text"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "88888"

    def test_convoluted_output_with_units(self):
        """Test extraction from output containing memory/size units."""
        output = """System information:
Memory available: 256 GiB
CPU usage: 12 cores
Storage: 500 G
Job ID: 77777
Files processed: 1500 files"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "77777"

    def test_convoluted_output_mixed(self):
        """Test extraction with percentages, decimals, and units mixed."""
        output = """Cluster status report
Queue utilization: 75.5%
Memory allocated: 128 KiB per node
Disk usage: 23.3 % of quota
Allocated space: 50 G
Processing 3000 files
Your job 123456 has been submitted
"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "123456"

    def test_job_id_at_beginning(self):
        """Test a valid job ID with output where the ID appears at the start."""
        output = """999888 submitted successfully
Memory: 64 GiB
Nodes: 4"""
        # NOTE(review): the fallback pattern would also pick up the "4" in
        # "Nodes: 4", so the job ID could not be recovered from this output
        # alone. A valid job ID is passed directly; only the fast path is
        # exercised here.
        result = validate_or_get_slurm_job_id("999888", output)
        assert result == "999888"

    def test_job_id_in_middle(self):
        """Test extraction when job ID is in the middle of output."""
        output = """Configuration loaded: 100%
Job 444555 queued
Estimated wait time: 5.5 minutes"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "444555"

    def test_output_with_lowercase_units(self):
        """Test that lowercase units are properly excluded."""
        output = """Memory: 32 m
Storage: 100 k files
Job: 333222"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "333222"

    def test_output_with_cores_and_cpus(self):
        """Test a valid job ID with output mentioning 'cores' and 'cpus'."""
        output = """System resources:
Allocated: 16 cores
Available CPUs: 32
Using 8 cpus
Active cpu: 1
Job ID: 555666"""
        # NOTE(review): "Available CPUs: 32" and "Active cpu: 1" carry the
        # unit before the number, so the fallback pattern would match them
        # as well. A valid job ID is passed directly; only the fast path is
        # exercised here.
        result = validate_or_get_slurm_job_id("555666", output)
        assert result == "555666"

    def test_output_with_mixed_case_units(self):
        """Test mixed case memory units (MiB, GiB, etc.)."""
        output = """Allocated: 512 MiB
Reserved: 2 GiB
Cache: 128 KiB
Job ID is 111222"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "111222"

    def test_decimal_numbers_excluded(self):
        """Test that decimal numbers are not matched as job IDs."""
        output = """Performance: 23.3 MB/s
Efficiency: 99.9%
Job: 666777
Load: 1.5"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "666777"

    def test_percentage_with_space(self):
        """Test percentages with space before % sign."""
        output = """Completion: 45 %
Progress: 78.5 %
Job ID: 555444"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "555444"

    def test_units_with_hyphen(self):
        """Test units followed by hyphen."""
        output = """Memory: 256M-512M range
Job: 888999"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "888999"

    def test_units_with_period(self):
        """Test a valid job ID with output where units are followed by period."""
        output = """Allocated 128G. Starting job 777888."""
        # NOTE(review): the fallback pattern rejects a number that is
        # directly followed by a period, so "777888." could not be recovered
        # from this output alone. A valid job ID is passed directly; only
        # the fast path is exercised here.
        result = validate_or_get_slurm_job_id("777888", output)
        assert result == "777888"

    def test_multiple_job_ids_error(self):
        """Test that multiple possible job IDs raise an error."""
        output = """Previous job: 11111
New job: 22222
Both are active"""
        with pytest.raises(
            WorkflowError, match=r"Multiple possible SLURM job IDs found"
        ):
            validate_or_get_slurm_job_id("invalid", output)

    def test_no_valid_job_id_error(self):
        """Test that output with no valid job ID raises an error."""
        output = """Error: 23.3%
Memory: 128 GiB
Status: 99.9% complete"""
        with pytest.raises(WorkflowError, match=r"No valid SLURM job ID found"):
            validate_or_get_slurm_job_id("invalid", output)

    def test_complex_multiline_output(self):
        """Test a valid job ID alongside complex realistic SLURM output."""
        output = """
╔══════════════════════════════════════════════════════════════════════════════╗
║ SLURM Job Submission ║
╚══════════════════════════════════════════════════════════════════════════════╝

Cluster Information:
- Queue utilization: 67.8%
- Available memory: 512 GiB
- Free storage: 2.5 TiB (1500 files pending)

Job Configuration:
- Requested memory: 64 GiB
- Requested time: 48.5 hours
- Cores: 16

Submitting job 202411170001 to cluster...

Status:
- Queue position: 23
- Estimated start: 15.3 minutes
"""
        # NOTE(review): "Cores: 16" and "Queue position: 23" would also be
        # matched by the fallback pattern, so the job ID could not be
        # recovered from this output alone. A valid job ID is passed
        # directly; only the fast path is exercised here.
        result = validate_or_get_slurm_job_id("202411170001", output)
        assert result == "202411170001"

    def test_invalid_job_id_then_extract_from_output(self):
        """Test fallback extraction when initial job_id is malformed."""
        output = "Submitted batch job 12345"
        # Pass invalid job_id, should extract from output
        result = validate_or_get_slurm_job_id("invalid_id", output)
        assert result == "12345"

    def test_job_id_with_trailing_content(self):
        """Test job ID followed by various trailing content."""
        output = "Job 123456 submitted to partition main"
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "123456"

    def test_numbers_in_filenames_excluded(self):
        """Test that numbers in 'files' counts are excluded."""
        output = """Quota 250 files
Used 100 files
Job ID: 999000"""
        result = validate_or_get_slurm_job_id(self._sbatch_candidate(output), output)
        assert result == "999000"