diff --git a/examples/backends/05_slurm_backend_run.py b/examples/backends/05_slurm_backend_run.py new file mode 100644 index 0000000..2aae157 --- /dev/null +++ b/examples/backends/05_slurm_backend_run.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +""" +ROMPY SLURM Backend Example + +This example demonstrates how to use the SLURM backend to run models on HPC clusters. +The SLURM backend enables resource management and job scheduling for high-performance +computing environments. + +Run this example: + python 05_slurm_backend_run.py + +Note: This example requires access to a SLURM-managed HPC cluster. +""" + +import logging +import tempfile +from datetime import datetime +from pathlib import Path + +from rompy.backends import SlurmConfig +from rompy.core.time import TimeRange +from rompy.model import ModelRun + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def example_slurm_basic(): + """ + Example 1: Basic SLURM execution + + This example demonstrates the simplest configuration for running a model + on a SLURM cluster with minimal parameters. + """ + logger.info("=" * 60) + logger.info("Example 1: Basic SLURM Execution") + logger.info("=" * 60) + logger.info("This example demonstrates the simplest SLURM backend configuration.") + logger.info("") + + with tempfile.TemporaryDirectory() as temp_dir: + # Create a basic model run + model = ModelRun( + run_id="slurm_basic_example", + period=TimeRange( + start=datetime(2023, 1, 1), + end=datetime(2023, 1, 2), + interval="1H", + ), + output_dir=Path(temp_dir), + delete_existing=True, + ) + + # Basic SLURM configuration + config = SlurmConfig( + queue="general", # SLURM partition name + timeout=1800, # Max execution time in seconds (30 minutes) + nodes=1, # Number of nodes to allocate + ntasks=1, # Number of tasks (processes) to run + cpus_per_task=2, # Number of CPU cores per task + time_limit="00:30:00", # Time limit in HH:MM:SS format + ) + + logger.info(f"SlurmConfig created: {config}") + logger.info("Running model with basic SLURM configuration...") + + try: + # This would submit the job to SLURM (in a real environment) + # success = model.run(backend=config) + # Since we're not in a real SLURM environment, we'll just show the config + logger.info("✅ SlurmConfig validated successfully") + logger.info("Key concepts: SlurmConfig, queue, nodes, ntasks, cpus_per_task") + logger.info("Note: In a real environment, this would submit to SLURM") + except Exception as e: + logger.error(f"❌ SLURM model run failed: {e}") + + +def example_slurm_advanced(): + """ + Example 2: Advanced SLURM execution with multiple parameters + + This example shows how to configure complex SLURM jobs with multiple + resource allocations, environment variables, and custom options. 
+ """ + logger.info("=" * 60) + logger.info("Example 2: Advanced SLURM Configuration") + logger.info("=" * 60) + logger.info("This example demonstrates advanced SLURM backend configuration.") + logger.info("") + + with tempfile.TemporaryDirectory() as temp_dir: + model = ModelRun( + run_id="slurm_advanced_example", + period=TimeRange( + start=datetime(2023, 1, 1), + end=datetime(2023, 1, 3), + interval="1H", + ), + output_dir=Path(temp_dir), + delete_existing=True, + ) + + # Advanced SLURM configuration with many parameters + config = SlurmConfig( + queue="gpu", # GPU partition + timeout=7200, # 2 hours timeout + nodes=2, # 2 compute nodes + ntasks=8, # 8 tasks total + cpus_per_task=4, # 4 CPUs per task + time_limit="02:00:00", # 2 hours time limit + account="research_project", # Account for billing + qos="high", # Quality of Service + reservation="special_reservation", # Reservation name + output_file="slurm-%j.out", # Output file pattern (job ID) + error_file="slurm-%j.err", # Error file pattern + job_name="advanced_simulation", # Name of the SLURM job + mail_type="BEGIN,END,FAIL", # Types of notifications + mail_user="researcher@domain.com", # Email for notifications + additional_options=["--gres=gpu:v100:2", "--exclusive"], # GPU resources + env_vars={ # Environment variables + "OMP_NUM_THREADS": "4", + "MODEL_DEBUG": "true", + "DATA_PATH": "/shared/data", + "RESULTS_PATH": "/shared/results", + }, + ) + + logger.info(f"Advanced SlurmConfig created: {config}") + logger.info("Running model with advanced SLURM configuration...") + + try: + # Show validation success + logger.info("✅ Advanced SlurmConfig validated successfully") + logger.info("Key concepts: account, qos, reservations, GRES, environment variables") + logger.info("Note: In a real environment, this would submit a complex job to SLURM") + except Exception as e: + logger.error(f"❌ Advanced SLURM configuration failed: {e}") + + +def example_slurm_with_custom_command(): + """ + Example 3: SLURM execution with custom command + + This example shows how to run a custom command on the SLURM cluster, + useful for executing different types of jobs or calling external binaries. 
+ """ + logger.info("=" * 60) + logger.info("Example 3: SLURM with Custom Command") + logger.info("=" * 60) + logger.info("This example demonstrates running custom commands on SLURM.") + logger.info("") + + with tempfile.TemporaryDirectory() as temp_dir: + model = ModelRun( + run_id="slurm_custom_command_example", + period=TimeRange( + start=datetime(2023, 1, 1), + end=datetime(2023, 1, 2), + interval="1H", + ), + output_dir=Path(temp_dir), + delete_existing=True, + ) + + # SLURM configuration with a custom command + config = SlurmConfig( + queue="general", + timeout=3600, # 1 hour timeout + nodes=1, + ntasks=1, + cpus_per_task=2, + time_limit="01:00:00", + command="echo 'Running custom SLURM job' && date && pwd && ls -la", # Custom command + env_vars={"CUSTOM_VAR": "value"}, + ) + + logger.info(f"SlurmConfig with custom command: {config}") + logger.info("Running custom command on SLURM...") + + try: + logger.info("✅ SlurmConfig with custom command validated successfully") + logger.info("Key concepts: command parameter, custom execution") + logger.info("Note: In a real environment, this would execute the custom command on SLURM") + except Exception as e: + logger.error(f"❌ SLURM custom command configuration failed: {e}") + + +def example_slurm_from_dict(): + """ + Example 4: Creating SLURM configuration from dictionary + + This example shows how to create SLURM configurations from dictionaries, + which is useful when loading from configuration files (YAML/JSON). + """ + logger.info("=" * 60) + logger.info("Example 4: SLURM Configuration from Dictionary") + logger.info("=" * 60) + logger.info("This example demonstrates creating SLURM configs from dictionaries.") + logger.info("") + + # Simulate loading from YAML/JSON file + slurm_config_data = { + "queue": "compute", + "timeout": 7200, + "nodes": 1, + "ntasks": 4, + "cpus_per_task": 2, + "time_limit": "02:00:00", + "account": "myproject", + "env_vars": { + "OMP_NUM_THREADS": "2", + "MODEL_PRECISION": "double", + "DATA_DIR": "/shared/data" + }, + "job_name": "yaml_configured_job", + "additional_options": ["--mem-per-cpu=2048"] + } + + try: + # Create configuration from dictionary + config = SlurmConfig(**slurm_config_data) + + logger.info("✅ SLURM configuration created from dictionary:") + logger.info(f" Queue: {config.queue}") + logger.info(f" Nodes: {config.nodes}") + logger.info(f" Total CPU cores: {config.ntasks * config.cpus_per_task}") + logger.info(f" Time limit: {config.time_limit}") + logger.info(f" Environment variables: {len(config.env_vars)}") + logger.info("Key concepts: dictionary unpacking, YAML/JSON compatibility") + logger.info("Note: This is how configuration files are loaded in production") + except Exception as e: + logger.error(f"❌ SLURM dictionary configuration failed: {e}") + + +def example_slurm_validation(): + """ + Example 5: SLURM configuration validation + + This example demonstrates ROMPY's built-in validation for SLURM configurations. + The Pydantic model catches configuration errors before runtime. 
+ """ + logger.info("=" * 60) + logger.info("Example 5: SLURM Configuration Validation") + logger.info("=" * 60) + logger.info("This example shows how ROMPY validates SLURM configurations automatically.") + logger.info("") + + from pydantic import ValidationError + + # Valid SLURM configuration + try: + valid_config = SlurmConfig( + queue="general", + timeout=3600, + nodes=1, + ntasks=1, + cpus_per_task=2, + time_limit="01:00:00", + env_vars={"TEST_VAR": "value"} + ) + logger.info("✅ Valid SlurmConfig created successfully") + except Exception as e: + logger.error(f"❌ Valid SLURM config validation failed unexpectedly: {e}") + + # Invalid time limit format + logger.info("Testing invalid time limit format...") + try: + invalid_config = SlurmConfig( + queue="general", + time_limit="25:00", # Invalid format - missing seconds + ) + logger.info("❌ This should not succeed") + except ValidationError as e: + logger.info(f"✅ Validation correctly caught time limit error: {e.errors()[0]['msg']}") + + # Invalid number of nodes (too high) + logger.info("Testing invalid number of nodes...") + try: + invalid_config = SlurmConfig( + queue="general", + nodes=101, # Max is 100 + time_limit="01:00:00" + ) + logger.info("❌ This should not succeed") + except ValidationError as e: + logger.info(f"✅ Validation correctly caught nodes error: {e.errors()[0]['msg']}") + + # Invalid cpus_per_task (too high) + logger.info("Testing invalid CPUs per task...") + try: + invalid_config = SlurmConfig( + queue="general", + cpus_per_task=129, # Max is 128 + time_limit="01:00:00" + ) + logger.info("❌ This should not succeed") + except ValidationError as e: + logger.info(f"✅ Validation correctly caught cpus_per_task error: {e.errors()[0]['msg']}") + + logger.info("Key concepts: Pydantic validation, error handling, configuration safety") + + +def main(): + """Run all SLURM backend examples.""" + logger.info("🚀 ROMPY SLURM Backend Examples") + logger.info("================================") + logger.info("These examples demonstrate how to use ROMPY with SLURM clusters for HPC jobs.") + logger.info("Each example builds on the previous one to show increasingly sophisticated usage.") + logger.info("") + + # Run examples + examples = [ + example_slurm_basic, + example_slurm_advanced, + example_slurm_with_custom_command, + example_slurm_from_dict, + example_slurm_validation, + ] + + completed_examples = 0 + for i, example in enumerate(examples, 1): + try: + logger.info(f"Running example {i}/{len(examples)}...") + example() + completed_examples += 1 + logger.info("") + except Exception as e: + logger.error(f"❌ Example {example.__name__} failed: {e}") + logger.info("") + + logger.info("=" * 60) + logger.info( + f"🎉 SLURM examples completed! ({completed_examples}/{len(examples)} examples ran successfully)" + ) + logger.info("=" * 60) + logger.info("What you learned:") + logger.info("• Basic SLURM execution with SlurmConfig") + logger.info("• Advanced SLURM parameters: queues, nodes, tasks, resources") + logger.info("• Custom commands and environment variables") + logger.info("• Configuration from dictionaries") + logger.info("• Built-in validation for SLURM configurations") + logger.info("") + logger.info("Next steps:") + logger.info("1. Review the SlurmConfig documentation for all available parameters") + logger.info("2. Try these configurations in your actual SLURM environment") + logger.info("3. Create your own SLURM configuration files for your models") + logger.info("4. 
Combine with other ROMPY features like postprocessing and pipelines") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/backends/README.md b/examples/backends/README.md index d5f2362..3d5d3e3 100644 --- a/examples/backends/README.md +++ b/examples/backends/README.md @@ -1,129 +1,86 @@ -# Backend Examples +# ROMPY SLURM Backend Examples -This directory contains examples demonstrating how to use ROMPY's backend configuration system to execute models in different environments. +This directory contains examples of how to use ROMPY with SLURM for HPC cluster execution. -## Overview +## Examples -ROMPY uses Pydantic-based backend configurations to provide type-safe, validated execution parameters for different environments. This system enables precise control over model execution while maintaining flexibility and extensibility. +### 05_slurm_backend_run.py +A comprehensive tutorial showing different ways to configure and use the SLURM backend: -## Available Examples +- Basic SLURM execution +- Advanced SLURM configuration with multiple parameters +- Custom commands on SLURM +- Creating configurations from dictionaries +- Configuration validation -### 1. Basic Local Run (`01_basic_local_run.py`) -Demonstrates the simplest use case: -- Local execution with `LocalConfig` -- Basic timeout and command configuration -- No-op postprocessing - -### 2. Docker Run (`02_docker_run.py`) -Shows Docker container execution: -- Using pre-built Docker images -- Volume mounting for data access -- Environment variable configuration -- Resource limits (CPU, memory) - -### 3. Custom Postprocessor (`03_custom_postprocessor.py`) -Illustrates custom postprocessing: -- Creating custom postprocessor classes -- Processing model outputs after execution -- Error handling and result reporting - -### 4. Complete Workflow (`04_complete_workflow.py`) -Demonstrates a full workflow: -- Model execution with local backend -- Custom postprocessing with file analysis -- Comprehensive logging and error handling - -## Backend Configuration Types - -### LocalConfig -For execution on the local system: -```python -from rompy.backends import LocalConfig - -config = LocalConfig( - timeout=3600, # 1 hour - command="python run_model.py", - env_vars={"OMP_NUM_THREADS": "4"}, - shell=True, - capture_output=True -) -``` - -### DockerConfig -For execution in Docker containers: -```python -from rompy.backends import DockerConfig - -config = DockerConfig( - image="python:3.9-slim", - cpu=2, - memory="2g", - timeout=7200, - volumes=["/data:/app/data:rw"], - env_vars={"MODEL_CONFIG": "production"} -) +Run the example: +```bash +python 05_slurm_backend_run.py ``` -## Running the Examples +### basic_model_run.py +Creates a basic ModelRun configuration that can be used to test different backend configurations. This provides a consistent model configuration that works across all backends. -Each example can be run directly: +### test_backends_with_modelrun.py +Demonstrates using the basic ModelRun with different backend configurations (Local, Docker, SLURM). This example shows how the same model run can be configured to work across different execution environments. 
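+The pattern it demonstrates looks roughly like this (a minimal sketch; see the script itself for the full configurations):
+
+```python
+from rompy.backends import LocalConfig, SlurmConfig
+
+model = create_basic_model_run()  # same ModelRun definition for every backend
+
+# Execute on the local machine ...
+model.run(backend=LocalConfig(timeout=600, command="echo 'local test run'"))
+
+# ... or submit the identical run to a SLURM partition
+model.run(backend=SlurmConfig(queue="general", cpus_per_task=2, time_limit="00:30:00"))
+```
+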
+Run the example: ```bash -# Basic local execution -python 01_basic_local_run.py +python test_backends_with_modelrun.py +``` -# Docker execution (requires Docker) -python 02_docker_run.py +## Configuration Files -# Custom postprocessing -python 03_custom_postprocessor.py +### slurm_backend.yml +A basic configuration file for running jobs on SLURM with minimal parameters. -# Complete workflow -python 04_complete_workflow.py -``` +### slurm_backend_examples.yml +A collection of different SLURM configuration examples: +- Basic SLURM configuration +- Advanced GPU job configuration +- High-memory job configuration +- Custom working directory configuration ## Key Features -- **Type Safety**: All configurations are validated using Pydantic -- **IDE Support**: Full autocompletion and inline documentation -- **Flexibility**: Easy to extend with custom backends and postprocessors -- **Error Handling**: Clear validation errors and execution feedback -- **Serialization**: Configurations can be saved/loaded as YAML/JSON - -## Configuration Validation +The ROMPY SLURM backend supports: -Backend configurations provide comprehensive validation: -- Timeout values must be between 60 and 86400 seconds -- Working directories must exist if specified -- Docker image names must follow valid conventions -- Volume mounts must reference existing host paths +- **Resource allocation**: Specify nodes, tasks, and CPU cores +- **Queue/partition selection**: Run on different SLURM partitions +- **Time limits**: Set job time limits in HH:MM:SS format +- **Environment variables**: Set environment variables for your job +- **Job notifications**: Email notifications on job start/end/failure +- **Custom commands**: Run custom commands instead of the default model run +- **Additional SLURM options**: Pass any additional SLURM options via `additional_options` +- **GPU resources**: Support for GPU allocation via `--gres` options -## Best Practices +## Usage -1. **Set appropriate timeouts** based on your model complexity -2. **Use environment variables** for sensitive configuration -3. **Validate configurations** before execution -4. **Handle errors gracefully** in your postprocessors -5. **Use resource limits** appropriately in Docker configurations +To use the SLURM backend in your application: -## Output Structure +```python +from rompy.backends import SlurmConfig +from rompy.model import ModelRun + +# Create SLURM configuration +config = SlurmConfig( + queue="gpu", # SLURM partition + nodes=2, # Number of nodes + ntasks=8, # Number of tasks + cpus_per_task=4, # CPU cores per task + time_limit="02:00:00", # Time limit + account="research_project", # Account for billing + additional_options=["--gres=gpu:v100:2"], # GPU allocation +) -All examples create output in the `./output` directory with the following structure: +# Create and run your model +model = ModelRun(...) +model.run(backend=config) ``` -output/ -├── / -│ ├── INPUT # Generated model input file -│ ├── datasets/ # Placeholder for input datasets -│ ├── outputs/ # Placeholder for model outputs -│ └── # Any files created during execution -``` - -## Extending the Examples -You can extend these examples by: -- Creating custom backend configurations -- Implementing custom postprocessors -- Adding new execution environments -- Integrating with workflow orchestration systems +## Validation -For more detailed information, see the [Backend Configurations documentation](../../docs/source/backend_configurations.rst). 
\ No newline at end of file +The SLURM backend includes comprehensive validation: +- Time limit format validation (HH:MM:SS) +- Bounds checking for nodes, CPUs, etc. +- Required field validation \ No newline at end of file diff --git a/examples/backends/basic_model_run.py b/examples/backends/basic_model_run.py new file mode 100644 index 0000000..780fb6a --- /dev/null +++ b/examples/backends/basic_model_run.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +""" +Basic ModelRun Configuration for Backend Testing + +This script creates a simple ModelRun configuration that can be used to test +different backend configurations (local, docker, slurm). +""" + +import tempfile +from datetime import datetime +from pathlib import Path + +from rompy.core.time import TimeRange +from rompy.model import ModelRun + + +def create_basic_model_run(): + """ + Create a basic model run configuration for testing backends. + This creates a minimal model run that can execute a simple command + using different backends. + """ + # Create a temporary directory for output + temp_dir = Path(tempfile.mkdtemp(prefix="rompy_test_")) + + # Create a basic model run + model_run = ModelRun( + run_id="test_backend_run", + period=TimeRange( + start=datetime(2023, 1, 1), + end=datetime(2023, 1, 2), + interval="1H", + ), + output_dir=temp_dir, + delete_existing=True, + ) + + return model_run + + +if __name__ == "__main__": + # Create the basic model run + model = create_basic_model_run() + + print("Basic ModelRun Configuration Created") + print("="*40) + print(f"Run ID: {model.run_id}") + print(f"Output Directory: {model.output_dir}") + print(f"Time Period: {model.period.start} to {model.period.end}") + print(f"Time Interval: {model.period.interval}") + print(f"Delete Existing: {model.delete_existing}") + print() + print("This basic configuration can be used to test different backends.") + print("For example:") + print(" - Local backend: Executes commands on the local machine") + print(" - Docker backend: Runs commands in Docker containers") + print(" - SLURM backend: Submits jobs to HPC clusters") \ No newline at end of file diff --git a/examples/backends/test_backends_with_modelrun.py b/examples/backends/test_backends_with_modelrun.py new file mode 100644 index 0000000..f3002d7 --- /dev/null +++ b/examples/backends/test_backends_with_modelrun.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +""" +Test Backend Configurations with Basic ModelRun + +This script demonstrates how to use the basic ModelRun configuration +with different backend configurations. +""" + +import logging +import tempfile +from datetime import datetime +from pathlib import Path + +from rompy.backends import DockerConfig, LocalConfig, SlurmConfig +from rompy.core.time import TimeRange +from rompy.model import ModelRun + + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def create_basic_model_run(): + """ + Create a basic model run configuration for testing backends. 
+ """ + temp_dir = Path(tempfile.mkdtemp(prefix="rompy_test_")) + + model_run = ModelRun( + run_id="test_backend_run", + period=TimeRange( + start=datetime(2023, 1, 1), + end=datetime(2023, 1, 2), + interval="1H", + ), + output_dir=temp_dir, + delete_existing=True, + ) + + return model_run + + +def test_local_backend(): + """Test the local backend with basic configuration.""" + logger.info("Testing Local Backend Configuration") + logger.info("-" * 40) + + model = create_basic_model_run() + + # Create local backend configuration + config = LocalConfig( + timeout=1800, # 30 minutes + command="echo 'Running model on local backend' && pwd && date", + env_vars={ + "MODEL_TYPE": "test", + "ENVIRONMENT": "local" + }, + shell=True, + capture_output=True + ) + + logger.info(f"LocalConfig: {config}") + + # Note: In a real environment, you would run: + # success = model.run(backend=config) + # For this example, we'll just validate the configuration works + logger.info("Local backend configuration validated successfully") + logger.info(f"Working directory: {model.output_dir}") + +def test_docker_backend(): + """Test the Docker backend with basic configuration.""" + logger.info("Testing Docker Backend Configuration") + logger.info("-" * 40) + + model = create_basic_model_run() + + # Create Docker backend configuration + config = DockerConfig( + image="python:3.9-slim", + timeout=1800, + cpu=2, + memory="1g", + executable="python -c \"print('Running model in Docker'); import os; print(f'Working in: {os.getcwd()}')\"", + volumes=[f"{model.output_dir}:/app/work:rw"], + env_vars={ + "MODEL_TYPE": "test", + "ENVIRONMENT": "docker", + "PYTHONUNBUFFERED": "1" + } + ) + + logger.info(f"DockerConfig: {config}") + + # Validate the configuration + logger.info("Docker backend configuration validated successfully") + logger.info(f"Working directory: {model.output_dir}") + +def test_slurm_backend(): + """Test the SLURM backend with basic configuration.""" + logger.info("Testing SLURM Backend Configuration") + logger.info("-" * 40) + + model = create_basic_model_run() + + # Create SLURM backend configuration + config = SlurmConfig( + queue="general", + timeout=1800, + nodes=1, + ntasks=1, + cpus_per_task=2, + time_limit="00:30:00", + job_name="test_backend_job", + output_file=f"{model.output_dir}/slurm-%j.out", + error_file=f"{model.output_dir}/slurm-%j.err", + env_vars={ + "MODEL_TYPE": "test", + "ENVIRONMENT": "slurm" + }, + command="echo 'Running model on SLURM backend' && pwd && date && env | grep MODEL" + ) + + logger.info(f"SlurmConfig: {config}") + + # Validate the configuration + logger.info("SLURM backend configuration validated successfully") + logger.info(f"Working directory: {model.output_dir}") + +def main(): + """Run all backend tests.""" + logger.info("Testing Backend Configurations with Basic ModelRun") + logger.info("=" * 50) + logger.info("This script demonstrates how to configure different backends") + logger.info("for the same basic ModelRun configuration.") + # Test all backends + test_local_backend() + test_docker_backend() + test_slurm_backend() + + logger.info("=" * 50) + logger.info("All backend configurations validated successfully!") + logger.info("Next steps:") + logger.info("1. Try running these configurations on actual backend systems") + logger.info("2. Adjust resource requirements based on your needs") + logger.info("3. Add more complex commands or model executables") + logger.info("4. 
Use the YAML configuration files in examples/configs/") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/configs/README.md b/examples/configs/README.md index 2ec538c..0265b3f 100644 --- a/examples/configs/README.md +++ b/examples/configs/README.md @@ -8,8 +8,12 @@ This directory contains example configuration files for ROMPY backend systems. T - **`local_backend.yml`** - Single-document local backend configuration - **`docker_backend.yml`** - Single-document Docker backend configuration +- **`slurm_backend.yml`** - Single-document SLURM backend configuration +- **`basic_modelrun.yml`** - Basic model run configuration for CLI testing +- **`basic_pipeline.yml`** - Basic pipeline configuration for CLI testing - **`local_backend_examples.yml`** - Multi-document local backend examples - **`docker_backend_examples.yml`** - Multi-document Docker backend examples +- **`slurm_backend_examples.yml`** - Multi-document SLURM backend examples - **`pipeline_config.yml`** - Complete pipeline configuration examples - **`validate_configs.py`** - Validation script for configuration files @@ -92,6 +96,29 @@ rompy pipeline --config pipeline_config.yml | `user` | string | "root" | Container user | | `remove_container` | bool | true | Remove after execution | +### SLURM Backend Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `queue` | string | - | SLURM partition name (required) | +| `nodes` | int | 1 | Number of compute nodes to allocate (1-100) | +| `ntasks` | int | 1 | Number of tasks (processes) to run | +| `cpus_per_task` | int | 1 | Number of CPU cores per task (1-128) | +| `time_limit` | string | "1:00:00" | Time limit in HH:MM:SS format | +| `account` | string | null | Account for billing/resource tracking | +| `qos` | string | null | Quality of Service for the job | +| `reservation` | string | null | Reservation name to run job under | +| `output_file` | string | null | Output file path for job output | +| `error_file` | string | null | Error file path for job errors | +| `job_name` | string | null | Name for the SLURM job | +| `mail_type` | string | null | Type of mail to send (BEGIN, END, FAIL, etc.) 
| +| `mail_user` | string | null | Email address for notifications | +| `additional_options` | list | [] | Additional SLURM options (e.g., ['--gres=gpu:1']) | +| `timeout` | int | 3600 | Maximum execution time in seconds (1 minute to 24 hours) | +| `env_vars` | dict | {} | Environment variables for execution | +| `working_dir` | string | null | Working directory for execution | +| `command` | string | null | Optional shell command to run instead of config.run() | + ## Example Configurations ### Local Backend @@ -119,6 +146,64 @@ env_vars: MODEL_THREADS: "4" ``` +### SLURM Backend + +```yaml +backend_type: slurm +config: + queue: "general" + timeout: 7200 + nodes: 2 + ntasks: 8 + cpus_per_task: 4 + time_limit: "02:00:00" + account: "myproject" + additional_options: + - "--gres=gpu:v100:2" + job_name: "simulation_job" + env_vars: + OMP_NUM_THREADS: "4" + MODEL_CONFIG: "production" +``` + +### Basic ModelRun Configuration + +```yaml +run_id: "cli_test_backend_run" +period: + start: "2023-01-01T00:00:00" + end: "2023-01-02T00:00:00" + interval: "1H" +output_dir: "./output/cli_test" +delete_existing: true +``` + +### Basic Pipeline Configuration + +```yaml +pipeline_backend: local + +model_run: + run_id: "cli_test_backend_run" + output_dir: "./output/cli_test" + delete_existing: true + period: + start: "2023-01-01T00:00:00" + end: "2023-01-02T00:00:00" + interval: "1H" + +run_backend: + backend_type: local + timeout: 3600 + command: "echo 'Running basic model test'" + env_vars: + MODEL_TYPE: "test" + ENVIRONMENT: "cli" + +postprocessing: + processor: "noop" +``` + ### Pipeline Configuration ```yaml diff --git a/examples/configs/basic_modelrun.yml b/examples/configs/basic_modelrun.yml new file mode 100644 index 0000000..52e3d8a --- /dev/null +++ b/examples/configs/basic_modelrun.yml @@ -0,0 +1,10 @@ +# Basic ModelRun Configuration for CLI Testing +# This configuration can be used with the ROMPY CLI to test different backends + +run_id: "cli_test_backend_run" +period: + start: "2023-01-01T00:00:00" + end: "2023-01-02T00:00:00" + interval: "1H" +output_dir: "./output/cli_test" +delete_existing: true \ No newline at end of file diff --git a/examples/configs/basic_pipeline.yml b/examples/configs/basic_pipeline.yml new file mode 100644 index 0000000..1249f38 --- /dev/null +++ b/examples/configs/basic_pipeline.yml @@ -0,0 +1,37 @@ +# Complete Pipeline Configuration for CLI Testing +# This demonstrates how to use the basic model run with different backends via CLI + +pipeline_backend: local # or 'docker', 'slurm' depending on your system + +model_run: + run_id: "cli_test_backend_run" + output_dir: "./output/cli_test" + delete_existing: true + period: + start: "2023-01-01T00:00:00" + end: "2023-01-02T00:00:00" + interval: "1H" + +# This would be the backend for the actual model run execution +# Uncomment the appropriate section based on your system: + +# Local backend configuration +run_backend: + backend_type: local + timeout: 3600 + command: "echo 'Running basic model test'" + env_vars: + MODEL_TYPE: "test" + ENVIRONMENT: "cli" + +# To run with local backend: +# rompy run --config basic_modelrun.yml --backend-config local_backend.yml + +# To run with Docker backend: +# rompy run --config basic_modelrun.yml --backend-config docker_backend.yml + +# To run with SLURM backend: +# rompy run --config basic_modelrun.yml --backend-config slurm_backend.yml + +postprocessing: + processor: "noop" # or other available processors \ No newline at end of file diff --git a/examples/configs/docker_backend.yml 
b/examples/configs/docker_backend.yml index 4faa138..3eff3cc 100644 --- a/examples/configs/docker_backend.yml +++ b/examples/configs/docker_backend.yml @@ -1,20 +1,18 @@ # Docker Backend Configuration # Configuration for executing models in Docker containers -backend_type: docker +type: docker image: "python:3.9-slim" timeout: 7200 # 2 hours cpu: 4 memory: "2g" -executable: "python" +executable: 'bash -c "echo ''Hello from Docker!''"' mpiexec: "" volumes: - - "/tmp:/tmp:rw" - - ".:/app/workspace:ro" + - "/tmp:/tmp:rw" env_vars: - PYTHONUNBUFFERED: "1" - MODEL_THREADS: "4" - DATA_DIR: "/app/data" - RESULTS_DIR: "/app/results" + PYTHONUNBUFFERED: "1" + MODEL_THREADS: "4" + DATA_DIR: "/app/data" remove_container: true user: "root" diff --git a/examples/configs/local_backend.yml b/examples/configs/local_backend.yml index 8fc9e3d..3dc19e0 100644 --- a/examples/configs/local_backend.yml +++ b/examples/configs/local_backend.yml @@ -2,13 +2,13 @@ # Configuration for executing models on the local system # Backend type specification -backend_type: local +type: local # Configuration parameters timeout: 7200 # 2 hours - Maximum execution time in seconds (60-86400) # Optional shell command to run instead of config.run() -command: "python run_model.py" +command: "ls -l" # Whether to execute commands through the shell (default: true) shell: true @@ -22,8 +22,8 @@ capture_output: true # Additional environment variables to set during execution env_vars: - OMP_NUM_THREADS: "4" - MODEL_CONFIG: "production" - DATA_DIR: "/data" - PYTHONPATH: "/app/lib" - LOG_LEVEL: "INFO" + OMP_NUM_THREADS: "4" + MODEL_CONFIG: "production" + DATA_DIR: "/data" + PYTHONPATH: "/app/lib" + LOG_LEVEL: "INFO" diff --git a/examples/configs/slurm_backend.yml b/examples/configs/slurm_backend.yml new file mode 100644 index 0000000..1a906e3 --- /dev/null +++ b/examples/configs/slurm_backend.yml @@ -0,0 +1,17 @@ +# Basic SLURM Backend Configuration +# This is a minimal configuration for running a model on a SLURM cluster + +type: "slurm" +queue: "general" # SLURM partition name +timeout: 3600 # Max execution time in seconds (1 hour) +nodes: 1 # Number of compute nodes to allocate +ntasks: 1 # Number of tasks (processes) to run +cpus_per_task: 2 # Number of CPU cores per task +time_limit: "01:00:00" # Time limit in HH:MM:SS format +job_name: "rompy_basic_job" # Name for the SLURM job +output_file: "slurm-%j.out" # Output file pattern using job ID +error_file: "slurm-%j.err" # Error file pattern using job ID +env_vars: # Environment variables for the job + OMP_NUM_THREADS: "2" + MODEL_DEBUG: "false" +command: "python -c \"print('SLURM job executed successfully')\"" # Command to run \ No newline at end of file diff --git a/examples/configs/slurm_backend_examples.yml b/examples/configs/slurm_backend_examples.yml new file mode 100644 index 0000000..6812c6a --- /dev/null +++ b/examples/configs/slurm_backend_examples.yml @@ -0,0 +1,85 @@ +# SLURM Backend Configuration Examples +# These examples show various ways to configure SLURM jobs for different scenarios +# +# NOTE: This format (with named sections) is different from the single-document +# backend config format used with the CLI command. For CLI usage, use the format +# in slurm_backend.yml with 'type' field at the root level. 
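+#
+# For instance, the "basic_slurm" section below would be flattened into its own
+# single-document file (say, basic_slurm_cli.yml -- an illustrative name) as:
+#
+#   type: "slurm"
+#   queue: "general"
+#   timeout: 3600
+#   nodes: 1
+#   ntasks: 1
+#   cpus_per_task: 2
+#   time_limit: "01:00:00"
+#   command: "echo 'Running basic SLURM job' && sleep 10"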
+ +# Basic SLURM configuration +basic_slurm: + type: "slurm" + config: + queue: "general" + timeout: 3600 + nodes: 1 + ntasks: 1 + cpus_per_task: 2 + time_limit: "01:00:00" + command: "echo 'Running basic SLURM job' && sleep 10" + +# Advanced SLURM configuration for GPU jobs +advanced_gpu_slurm: + type: "slurm" + config: + queue: "gpu" + timeout: 7200 + nodes: 2 + ntasks: 8 + cpus_per_task: 4 + time_limit: "02:00:00" + account: "research_project" + qos: "high" + reservation: "special_reservation" + output_file: "slurm-%j.out" + error_file: "slurm-%j.err" + job_name: "gpu_simulation" + mail_type: "BEGIN,END,FAIL" + mail_user: "researcher@domain.com" + additional_options: + - "--gres=gpu:v100:2" + - "--exclusive" + env_vars: + OMP_NUM_THREADS: "4" + MODEL_DEBUG: "true" + DATA_PATH: "/shared/data" + RESULTS_PATH: "/shared/results" + command: "python /app/run_simulation.py --config config.json" + +# SLURM configuration for high-memory jobs +high_memory_slurm: + type: "slurm" + config: + queue: "memory" + timeout: 14400 + nodes: 1 + ntasks: 2 + cpus_per_task: 8 + time_limit: "04:00:00" + account: "bigmem_project" + additional_options: + - "--mem=64G" + job_name: "high_memory_analysis" + output_file: "output_%j.log" + error_file: "error_%j.log" + env_vars: + MEMORY_LIMIT: "64G" + ANALYSIS_TYPE: "deep" + command: "Rscript analysis.R" + +# SLURM configuration with custom working directory +custom_workdir_slurm: + type: "slurm" + config: + queue: "compute" + timeout: 7200 + nodes: 1 + ntasks: 4 + cpus_per_task: 2 + time_limit: "02:00:00" + account: "analysis_project" + working_dir: "/shared/workspaces/my_project" + job_name: "workspace_analysis" + env_vars: + WORKSPACE: "/shared/workspaces/my_project" + TOOLS_PATH: "/shared/tools" + command: "./run_analysis.sh" \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1639c1f..19ae65c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,27 +1,14 @@ [build-system] -requires = [ - "setuptools", - "versioneer[toml]", -] +requires = ["setuptools", "versioneer[toml]"] build-backend = "setuptools.build_meta" [project] name = "rompy" description = "Core rompy library for ocean wave modeling with plugin system" readme = "README.md" -keywords = [ - "relocatable", - "ocean", - "modelling", - "python", - "csiro", -] -authors = [ - { name = "CSIRO", email = "paul.branson@csiro.au" }, -] -maintainers = [ - {name = "Rompy Contributors", email = "developers@rompy.com"} -] +keywords = ["relocatable", "ocean", "modelling", "python", "csiro"] +authors = [{ name = "CSIRO", email = "paul.branson@csiro.au" }] +maintainers = [{ name = "Rompy Contributors", email = "developers@rompy.com" }] classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Science/Research", @@ -66,9 +53,7 @@ dependencies = [ "isodate", "appdirs", ] -dynamic = [ - "version", -] +dynamic = ["version"] [project.license] file = "LICENSE" @@ -98,6 +83,7 @@ rompy_data = "rompy:cat" [project.entry-points."rompy.run"] local = "rompy.run:LocalRunBackend" docker = "rompy.run.docker:DockerRunBackend" +slurm = "rompy.run.slurm:SlurmRunBackend" [project.entry-points."rompy.postprocess"] noop = "rompy.postprocess:NoopPostprocessor" @@ -106,23 +92,9 @@ noop = "rompy.postprocess:NoopPostprocessor" local = "rompy.pipeline:LocalPipelineBackend" [project.optional-dependencies] -test = [ - "pytest", - "envyaml", - "coverage", -] -extra = [ - "gcsfs", - "zarr", - "cloudpathlib[s3,gs,azure]", -] -dev = [ - "pytest", - "envyaml", - "coverage", - "ruff", - "black", -] +test = 
["pytest", "envyaml", "coverage"] +extra = ["gcsfs", "zarr", "cloudpathlib[s3,gs,azure]"] +dev = ["pytest", "envyaml", "coverage", "ruff", "black"] docs = [ "autodoc_pydantic", "ipython", @@ -133,16 +105,10 @@ docs = [ ] [tool.setuptools.packages.find] -where = [ - "src", -] +where = ["src"] [tool.setuptools.package-data] -"*" = [ - "*.y*ml", - "*.csv", - "*.html", -] +"*" = ["*.y*ml", "*.csv", "*.html"] [tool.setuptools.dynamic.version] attr = "rompy.__version__" diff --git a/src/rompy/backends/__init__.py b/src/rompy/backends/__init__.py index a5d990e..637fa4d 100644 --- a/src/rompy/backends/__init__.py +++ b/src/rompy/backends/__init__.py @@ -5,11 +5,12 @@ execution backends, enabling type-safe and validated backend configurations. """ -from .config import BackendConfig, BaseBackendConfig, DockerConfig, LocalConfig +from .config import BackendConfig, BaseBackendConfig, DockerConfig, LocalConfig, SlurmConfig __all__ = [ "BackendConfig", "BaseBackendConfig", "DockerConfig", "LocalConfig", + "SlurmConfig", ] diff --git a/src/rompy/backends/config.py b/src/rompy/backends/config.py index 23b0e19..eae2d43 100644 --- a/src/rompy/backends/config.py +++ b/src/rompy/backends/config.py @@ -8,7 +8,7 @@ from abc import ABC, abstractmethod from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union from pydantic import BaseModel, ConfigDict, Field, field_validator @@ -284,5 +284,118 @@ def model_post_init(self, __context) -> None: ) +class SlurmConfig(BaseBackendConfig): + """Configuration for SLURM cluster execution.""" + + model_type: Literal["slurm"] = Field( + "slurm", + description="The backend type." + ) + queue: str = Field( + ..., + description="SLURM partition name (equivalent to queue)" + ) + + command: Optional[str] = Field( + None, description="Optional shell command to run instead of config.run()" + ) + nodes: int = Field( + 1, + ge=1, + le=100, + description="Number of nodes to allocate" + ) + ntasks: int = Field( + 1, + ge=1, + description="Number of tasks (processes) to run" + ) + cpus_per_task: int = Field( + 1, + ge=1, + le=128, + description="Number of CPU cores per task" + ) + time_limit: str = Field( + "1:00:00", + description="Time limit in format HH:MM:SS" + ) + account: Optional[str] = Field( + None, + description="Account for billing/resource tracking" + ) + qos: Optional[str] = Field( + None, + description="Quality of Service for the job" + ) + reservation: Optional[str] = Field( + None, + description="Reservation name to run job under" + ) + output_file: Optional[str] = Field( + None, + description="Output file path for job output" + ) + error_file: Optional[str] = Field( + None, + description="Error file path for job errors" + ) + job_name: Optional[str] = Field( + None, + description="Name for the SLURM job" + ) + mail_type: Optional[str] = Field( + None, + description="Type of mail to send (BEGIN, END, FAIL, ALL, etc.)" + ) + mail_user: Optional[str] = Field( + None, + description="Email address for notifications" + ) + additional_options: List[str] = Field( + default_factory=list, + description="Additional SLURM options (e.g., '--gres=gpu:1')" + ) + + @field_validator('time_limit') + @classmethod + def validate_time_limit(cls, v): + """Validate time limit format (HH:MM:SS).""" + import re + if not re.match(r'^\d{1,4}:\d{2}:\d{2}$', v): + raise ValueError("Time limit must be in format HH:MM:SS") + return v + + def get_backend_class(self): + """Return the SlurmRunBackend 
class.""" + from rompy.run.slurm import SlurmRunBackend + return SlurmRunBackend + + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "queue": "general", + "nodes": 1, + "ntasks": 1, + "cpus_per_task": 4, + "time_limit": "02:00:00", + "account": "myproject", + "timeout": 7200, + }, + { + "queue": "gpu", + "nodes": 2, + "ntasks": 8, + "cpus_per_task": 2, + "time_limit": "24:00:00", + "reservation": "special_reservation", + "additional_options": ["--gres=gpu:v100:2"], + }, + ] + } + ) + + # Type alias for all backend configurations -BackendConfig = Union[LocalConfig, DockerConfig] +BackendConfig = Union[LocalConfig, DockerConfig, SlurmConfig] \ No newline at end of file diff --git a/src/rompy/cli.py b/src/rompy/cli.py index 8dad334..ad25ddd 100644 --- a/src/rompy/cli.py +++ b/src/rompy/cli.py @@ -18,7 +18,7 @@ import yaml import rompy -from rompy.backends import DockerConfig, LocalConfig +from rompy.backends import DockerConfig, LocalConfig, SlurmConfig from rompy.logging import LogFormat, LoggingConfig, LogLevel, get_logger from rompy.model import PIPELINE_BACKENDS, POSTPROCESSORS, RUN_BACKENDS, ModelRun @@ -291,31 +291,12 @@ def _get_backend_config_registry(): Build a registry of backend config classes from entry points and built-ins. Returns: dict mapping backend type name to config class """ + # TODO Remove hardcoding registry = { "local": LocalConfig, "docker": DockerConfig, + "slurm": SlurmConfig, # Add SLURM backend config } - # Try to load from entry points (rompy.config and rompy.backend_config) - try: - eps = importlib.metadata.entry_points() - # Support both 'rompy.config' and 'rompy.backend_config' for flexibility - for group in ["rompy.config", "rompy.backend_config"]: - if hasattr(eps, "select"): # Python 3.10+ - entries = eps.select(group=group) - elif hasattr(eps, "get"): # Python 3.8-3.9 - entries = eps.get(group, []) - else: - entries = [] - for ep in entries: - try: - cls = ep.load() - registry[ep.name] = cls - except Exception as e: - logger.warning( - f"Failed to load backend config entry point {ep.name}: {e}" - ) - except Exception as e: - logger.warning(f"Could not load backend config entry points: {e}") return registry diff --git a/src/rompy/run/docker.py b/src/rompy/run/docker.py index 5308936..c9df7b0 100644 --- a/src/rompy/run/docker.py +++ b/src/rompy/run/docker.py @@ -268,6 +268,7 @@ def _run_container( volumes[host_path] = {"bind": container_path, "mode": mode} # Prepare container configuration + # Note: We can't capture output when remove=True, so we'll handle that case container_config = { "image": image_name, "command": ["bash", "-c", run_command], @@ -285,15 +286,12 @@ def _run_container( logger.debug(f"Environment: {env_vars}") # Run the container - container = client.containers.run(**container_config) - - # Log output - if container: - logger.info("Model run completed successfully") - return True - else: - logger.error("Model run failed - no output from container") - return False + # Note: When remove=True, client.containers.run() returns None + # If you need to capture output, you'd need to set remove=False and manually remove + client.containers.run(**container_config) + + logger.info("Model run completed successfully") + return True except ContainerError as e: logger.error(f"Container error: {e}") diff --git a/src/rompy/run/slurm.py b/src/rompy/run/slurm.py new file mode 100644 index 0000000..c08c0d5 --- /dev/null +++ b/src/rompy/run/slurm.py @@ -0,0 +1,314 @@ +""" +SLURM backend for running models. 
+ +This module provides a SLURM-based execution backend for rompy models. +""" + +import logging +import os +import subprocess +import tempfile +import time +from pathlib import Path +from typing import TYPE_CHECKING, Dict, List, Optional, Union + +if TYPE_CHECKING: + from rompy.backends import SlurmConfig + +logger = logging.getLogger(__name__) + + +class SlurmRunBackend: + """Execute models on SLURM clusters. + + This backend submits model runs to a SLURM-managed HPC cluster + for execution. + """ + + def run( + self, model_run, config: "SlurmConfig", workspace_dir: Optional[str] = None + ) -> bool: + """Submit model run to SLURM queue. + + Args: + model_run: The ModelRun instance to execute + config: SlurmConfig instance with execution parameters + workspace_dir: Path to the generated workspace directory (if None, will generate) + + Returns: + True if execution was successful, False otherwise + """ + logger.debug(f"Using SlurmConfig: nodes={config.nodes}, ntasks={config.ntasks}") + + # Use provided workspace or generate if not provided (for backwards compatibility) + if workspace_dir is None: + logger.warning( + "No workspace_dir provided, generating files (this may cause double generation in pipeline)" + ) + staging_dir = model_run.generate() + logger.info(f"Model inputs generated in: {staging_dir}") + else: + logger.info(f"Using provided workspace directory: {workspace_dir}") + staging_dir = workspace_dir + + try: + # Create and submit SLURM job script + job_script = self._create_job_script(model_run, config, staging_dir) + job_id = self._submit_job(job_script) + + if job_id: + logger.info(f"SLURM job submitted successfully with ID: {job_id}") + return self._wait_for_completion(job_id, config) + else: + logger.error("Failed to submit SLURM job") + return False + + except Exception as e: + logger.exception(f"SLURM execution failed: {e}") + return False + + def _create_job_script( + self, model_run, config: "SlurmConfig", staging_dir: str + ) -> str: + """Create SLURM job script. 
+
+        Args:
+            model_run: The ModelRun instance
+            config: SlurmConfig with execution parameters
+            staging_dir: Path to workspace directory
+
+        Returns:
+            Path to the created job script
+        """
+        # Determine the working directory for the job
+        work_dir = config.working_dir if config.working_dir else staging_dir
+
+        # Create the job script content
+        script_lines = [
+            "#!/bin/bash",
+            "# SLURM job script generated by rompy",
+        ]
+
+        # Add SBATCH directives from configuration
+        if config.job_name:
+            script_lines.append(f"#SBATCH --job-name={config.job_name}")
+
+        if config.output_file:
+            script_lines.append(f"#SBATCH --output={config.output_file}")
+        else:
+            # Default output file with job ID
+            script_lines.append(f"#SBATCH --output={work_dir}/slurm-%j.out")
+
+        if config.error_file:
+            script_lines.append(f"#SBATCH --error={config.error_file}")
+        else:
+            # Default error file with job ID
+            script_lines.append(f"#SBATCH --error={work_dir}/slurm-%j.err")
+
+        if config.queue:
+            script_lines.append(f"#SBATCH --partition={config.queue}")
+
+        script_lines.append(f"#SBATCH --nodes={config.nodes}")
+        script_lines.append(f"#SBATCH --ntasks={config.ntasks}")
+        script_lines.append(f"#SBATCH --cpus-per-task={config.cpus_per_task}")
+        script_lines.append(f"#SBATCH --time={config.time_limit}")
+
+        if config.account:
+            script_lines.append(f"#SBATCH --account={config.account}")
+
+        if config.qos:
+            script_lines.append(f"#SBATCH --qos={config.qos}")
+
+        if config.reservation:
+            script_lines.append(f"#SBATCH --reservation={config.reservation}")
+
+        if config.mail_type and config.mail_user:
+            script_lines.append(f"#SBATCH --mail-type={config.mail_type}")
+            script_lines.append(f"#SBATCH --mail-user={config.mail_user}")
+
+        # Add additional options
+        for option in config.additional_options:
+            script_lines.append(f"#SBATCH {option}")
+
+        script_lines.extend([
+            "",
+            "# Change to working directory",
+            f"cd {work_dir}",
+            "",
+            "# Set environment variables",
+        ])
+
+        # Add environment variables
+        for key, value in config.env_vars.items():
+            script_lines.append(f"export {key}={value}")
+
+        # Add the actual command to run the model: use the custom command from the
+        # config if one is given, otherwise fall back to the model's own run() method
+        if config.command:
+            script_lines.extend([
+                "",
+                "# Execute custom command",
+                config.command,
+            ])
+        else:
+            script_lines.extend([
+                "",
+                "# Execute model using model_run.config.run() method",
+                "python -c \"",
+                "import sys",
+                "import os",
+                "sys.path.insert(0, os.getcwd())",
+                "from rompy.model import ModelRun",
+                f"model_run = ModelRun.from_dict({model_run.model_dump()})",
+                "model_run.config.run(model_run)",
+                "\"",
+            ])
+
+        # Create temporary job script file
+        script_content = '\n'.join(script_lines)
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
+            f.write(script_content)
+            script_path = f.name
+
+        logger.debug(f"SLURM job script created at: {script_path}")
+        logger.debug(f"Job script content:\n{script_content}")
+
+        return script_path
+
+    def _submit_job(self, job_script: str) -> Optional[str]:
+        """Submit job to SLURM.
+
+        Args:
+            job_script: Path to the job script to submit
+
+        Returns:
+            Job ID if submission successful, None otherwise
+        """
+        try:
+            # Check if sbatch command is available
+            result = subprocess.run(
+                ["which", "sbatch"],
+                capture_output=True,
+                text=True
+            )
+            if result.returncode != 0 or not result.stdout.strip():
+                logger.error("sbatch command not found. SLURM may not be installed or in PATH.")
+                return None
+
+            # Check if SLURM controller is responsive
+            result = subprocess.run(
+                ["scontrol", "--help"],
+                capture_output=True,
+                text=True,
+                timeout=10  # Don't wait too long
+            )
+            if result.returncode != 0:
+                logger.error("SLURM controller is not responsive. scontrol command failed.")
+                return None
+
+            # Submit the job using sbatch
+            result = subprocess.run(
+                ["sbatch", job_script],
+                capture_output=True,
+                text=True,
+                check=True
+            )
+
+            # Extract job ID from sbatch output (format: "Submitted batch job <job_id>")
+            output = result.stdout.strip()
+            if "Submitted batch job" in output:
+                job_id = output.split()[-1]
+                logger.info(f"Submitted SLURM job with ID: {job_id}")
+                return job_id
+            else:
+                logger.error(f"Unexpected sbatch output format: {output}")
+                return None
+
+        except subprocess.TimeoutExpired:
+            logger.error("SLURM controller check timed out. SLURM may not be properly configured.")
+            return None
+        except subprocess.CalledProcessError as e:
+            logger.error(f"Failed to submit SLURM job: {e.stderr}")
+            return None
+        except Exception as e:
+            logger.error(f"Error submitting SLURM job: {e}")
+            return None
+        finally:
+            # Clean up the temporary job script
+            try:
+                os.remove(job_script)
+                logger.debug(f"Cleaned up temporary job script: {job_script}")
+            except OSError:
+                logger.warning(f"Could not remove temporary job script: {job_script}")
+
+    def _wait_for_completion(self, job_id: str, config: "SlurmConfig") -> bool:
+        """Wait for job completion.
+
+        Args:
+            job_id: SLURM job ID to monitor
+            config: SlurmConfig with timeout parameters
+
+        Returns:
+            True if job completed successfully, False otherwise
+        """
+        logger.info(f"Waiting for SLURM job {job_id} to complete...")
+
+        # Terminal states that indicate job completion (successful or failed)
+        # Using SLURM job states: https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
+        terminal_states = {'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED',
+                           'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT'}
+
+        # Start time for timeout check
+        start_time = time.time()
+
+        while True:
+            # Check if we've exceeded the timeout
+            elapsed_time = time.time() - start_time
+            if elapsed_time > config.timeout:
+                logger.error(f"Timeout waiting for job {job_id} after {config.timeout} seconds")
+
+                # Let SLURM handle job cancellation according to its configured policies
+                return False
+
+            # Get job status using scontrol for more reliable detection
+            try:
+                result = subprocess.run(
+                    ['scontrol', 'show', 'job', job_id],
+                    capture_output=True,
+                    text=True,
+                    check=True
+                )
+
+                # Parse the output to get the job state
+                output = result.stdout
+                if 'JobState=' in output:
+                    # Extract the full state name, e.g. 'RUNNING', 'COMPLETED', 'NODE_FAIL'
+                    state = output.split('JobState=')[1].split()[0]
+ else: + # If JobState is not found, we might have an issue with parsing + logger.warning(f"Could not determine job state from output for job {job_id}") + state = None + + if state is None: # If job state can't be determined, check if job is not found + if 'slurm_load_jobs error' in output or 'Invalid job id' in output.lower(): + logger.info(f"Job {job_id} not found - likely completed") + return True # Assume successful completion if job ID is invalid + + if state in terminal_states: + if state == 'COMPLETED': # Completed successfully + logger.info(f"SLURM job {job_id} completed successfully") + return True + elif state == 'CANCELLED': # Cancelled + logger.warning(f"SLURM job {job_id} was cancelled") + return False + elif state == 'FAILED': # Failed + logger.error(f"SLURM job {job_id} failed") + return False + elif state == 'TIMEOUT': # Timeout + logger.error(f"SLURM job {job_id} timed out") + return False + elif state == 'BOOT_FAIL': # Boot failure + logger.error(f"SLURM job {job_id} failed to boot") + return False + elif state == 'NODE_FAIL': # Node failure + logger.error(f"SLURM job {job_id} failed due to node failure") + return False + elif state == 'OUT_OF_MEMORY': # Out of memory + logger.error(f"SLURM job {job_id} ran out of memory") + return False + elif state == 'PREEMPTED': # Preempted + logger.error(f"SLURM job {job_id} was preempted") + return False + else: + logger.error(f"SLURM job {job_id} ended with state: {state}") + return False + + # Job is still running or pending, wait before checking again + logger.debug(f"Job {job_id} still in state: {state}, waiting...") + time.sleep(30) # Wait 30 seconds before next check + + except subprocess.CalledProcessError as e: + logger.error(f"Error checking job status for {job_id}: {e.stderr}") + # If we can't check the status, we consider it a failure + return False + except Exception as e: + logger.error(f"Unexpected error while monitoring job {job_id}: {e}") + return False \ No newline at end of file diff --git a/tests/backends/test_slurm_backend.py b/tests/backends/test_slurm_backend.py new file mode 100644 index 0000000..5ef2da8 --- /dev/null +++ b/tests/backends/test_slurm_backend.py @@ -0,0 +1,534 @@ +""" +Unit tests for the SLURM backend configuration and execution. + +Tests verify that the SLURM backend configuration class works correctly, +provides proper validation, and integrates with the SLURM execution backend. 
+""" + +import shutil +import subprocess +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import MagicMock, mock_open, patch + +import pytest +from pydantic import ValidationError + +from rompy.backends import SlurmConfig + + +def is_slurm_available(): + """Check if SLURM is available on the system.""" + try: + result = subprocess.run( + ["which", "sbatch"], + capture_output=True, + text=True, + timeout=5 + ) + return result.returncode == 0 and bool(result.stdout.strip()) + except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): + return False + + +# Skip tests that require SLURM if it's not available +requires_slurm = pytest.mark.skipif( + not is_slurm_available(), + reason="SLURM is not available on this system" +) + + +class TestSlurmConfig: + """Test the SlurmConfig class.""" + + def test_default_values(self): + """Test default values for SlurmConfig.""" + config = SlurmConfig( + queue="general", # Required field + ) + + assert config.timeout == 3600 + assert config.env_vars == {} + assert config.working_dir is None + assert config.queue == "general" + assert config.nodes == 1 + assert config.ntasks == 1 + assert config.cpus_per_task == 1 + assert config.time_limit == "1:00:00" + assert config.account is None + assert config.qos is None + assert config.reservation is None + assert config.output_file is None + assert config.error_file is None + assert config.job_name is None + assert config.mail_type is None + assert config.mail_user is None + assert config.additional_options == [] + + def test_custom_values(self): + """Test setting custom values.""" + with TemporaryDirectory() as tmp_dir: + config = SlurmConfig( + queue="compute", + nodes=2, + ntasks=4, + cpus_per_task=8, + time_limit="24:00:00", + account="myproject", + qos="priority", + reservation="special_reservation", + output_file="slurm-%j.out", + error_file="slurm-%j.err", + job_name="test_job", + mail_type="END", + mail_user="test@example.com", + additional_options=["--gres=gpu:1", "--exclusive"], + timeout=7200, + env_vars={"OMP_NUM_THREADS": "8"}, + working_dir=Path(tmp_dir), + ) + + assert config.queue == "compute" + assert config.nodes == 2 + assert config.ntasks == 4 + assert config.cpus_per_task == 8 + assert config.time_limit == "24:00:00" + assert config.account == "myproject" + assert config.qos == "priority" + assert config.reservation == "special_reservation" + assert config.output_file == "slurm-%j.out" + assert config.error_file == "slurm-%j.err" + assert config.job_name == "test_job" + assert config.mail_type == "END" + assert config.mail_user == "test@example.com" + assert config.additional_options == ["--gres=gpu:1", "--exclusive"] + assert config.timeout == 7200 + assert config.env_vars == {"OMP_NUM_THREADS": "8"} + assert config.working_dir == Path(tmp_dir) + + def test_time_limit_validation(self): + """Test time limit validation.""" + # Valid time limits + valid_time_limits = [ + "01:00:00", + "00:30:00", + "23:59:59", + "100:00:00", # Allow longer times for long jobs + ] + + for time_limit in valid_time_limits: + config = SlurmConfig(queue="test", time_limit=time_limit) + assert config.time_limit == time_limit + + # Invalid time limits (format-based validation) + invalid_time_limits = [ + "00:00", # Missing seconds + "invalid", # Not matching format + "1:1:1", # Not in HH:MM:SS format (only 1 digit for each part) + "25-00-00", # Wrong separator + "12345:00:00", # Too many digits for hours (5 digits instead of max 4) + "23:5", # 
Missing seconds part + ":23:59", # Missing hours + "23::59", # Missing minutes + ] + + for time_limit in invalid_time_limits: + with pytest.raises(ValidationError): + SlurmConfig(queue="test", time_limit=time_limit) + + def test_additional_options_validation(self): + """Test additional options validation.""" + # Valid additional options + config = SlurmConfig( + queue="test", + additional_options=["--gres=gpu:1", "--exclusive", "--mem-per-cpu=2048"] + ) + assert config.additional_options == ["--gres=gpu:1", "--exclusive", "--mem-per-cpu=2048"] + + # Empty list should be valid + config = SlurmConfig(queue="test", additional_options=[]) + assert config.additional_options == [] + + def test_get_backend_class(self): + """Test that get_backend_class returns the correct class.""" + config = SlurmConfig(queue="test") + backend_class = config.get_backend_class() + + # Should return SlurmRunBackend class + assert backend_class.__name__ == "SlurmRunBackend" + + def test_config_examples(self): + """Test that the schema examples are valid.""" + schema = SlurmConfig.model_json_schema() + examples = schema.get("examples", []) + + for example in examples: + # Should be able to create config from example + config = SlurmConfig(**example) + assert isinstance(config, SlurmConfig) + + def test_required_queue_field(self): + """Test that queue field is required.""" + # Should fail without queue + with pytest.raises(ValidationError, match="Field required"): + SlurmConfig() + + # Should work with queue + config = SlurmConfig(queue="general") + assert config.queue == "general" + + def test_field_boundaries(self): + """Test field boundary values.""" + # Test minimum values + config = SlurmConfig( + queue="test", + nodes=1, + ntasks=1, + cpus_per_task=1, + ) + assert config.nodes == 1 + assert config.ntasks == 1 + assert config.cpus_per_task == 1 + + # Test maximum values + config = SlurmConfig( + queue="test", + nodes=100, # Max nodes + cpus_per_task=128, # Max cpus per task + ) + assert config.nodes == 100 + assert config.cpus_per_task == 128 + + # Test out of bounds + with pytest.raises(ValidationError): + SlurmConfig(queue="test", nodes=0) # Min nodes is 1 + + with pytest.raises(ValidationError): + SlurmConfig(queue="test", nodes=101) # Max nodes is 100 + + with pytest.raises(ValidationError): + SlurmConfig(queue="test", cpus_per_task=0) # Min cpus_per_task is 1 + + with pytest.raises(ValidationError): + SlurmConfig(queue="test", cpus_per_task=129) # Max cpus_per_task is 128 + + +@requires_slurm +class TestSlurmRunBackend: + """Test the SlurmRunBackend class.""" + + @pytest.fixture + def mock_model_run(self): + """Create a mock ModelRun instance.""" + model_run = MagicMock() + model_run.run_id = "test_run_123" + model_run.output_dir = Path("/tmp/test_output") + + # Create a temporary directory for staging + import tempfile + + temp_dir = tempfile.mkdtemp() + model_run.generate.return_value = temp_dir + model_run.config.run.return_value = True + model_run.model_dump.return_value = {"test": "data"} # Mock for serialization + return model_run + + @pytest.fixture + def basic_config(self): + """Create a basic SlurmConfig.""" + return SlurmConfig( + queue="general", + timeout=3600, + nodes=1, + ntasks=1, + cpus_per_task=2, + time_limit="01:00:00", + ) + + def test_create_job_script(self, mock_model_run, basic_config): + """Test the _create_job_script method.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + with TemporaryDirectory() as staging_dir: + # Create the job script + 
script_path = backend._create_job_script(mock_model_run, basic_config, staging_dir) + + # Verify the file was created + assert os.path.exists(script_path) + + # Read and check the contents + with open(script_path, 'r') as f: + content = f.read() + + # Check for SLURM directives + assert "#!/bin/bash" in content + assert "#SBATCH --partition=general" in content + assert "#SBATCH --nodes=1" in content + assert "#SBATCH --ntasks=1" in content + assert "#SBATCH --cpus-per-task=2" in content + assert "#SBATCH --time=01:00:00" in content + + # Clean up + if os.path.exists(script_path): + os.remove(script_path) + + def test_create_job_script_with_all_options(self, mock_model_run): + """Test the _create_job_script method with all options.""" + from rompy.run.slurm import SlurmRunBackend + + config = SlurmConfig( + queue="priority", + nodes=2, + ntasks=4, + cpus_per_task=8, + time_limit="24:00:00", + account="myproject", + qos="high", + reservation="special", + output_file="output_%j.txt", + error_file="error_%j.txt", + job_name="test_job", + mail_type="BEGIN,END,FAIL", + mail_user="test@example.com", + additional_options=["--gres=gpu:1", "--exclusive"], + timeout=86400, + env_vars={"OMP_NUM_THREADS": "8", "MY_VAR": "value"}, + ) + + backend = SlurmRunBackend() + + with TemporaryDirectory() as staging_dir: + script_path = backend._create_job_script(mock_model_run, config, staging_dir) + + with open(script_path, 'r') as f: + content = f.read() + + # Check for all SBATCH directives + assert "#SBATCH --partition=priority" in content + assert "#SBATCH --nodes=2" in content + assert "#SBATCH --ntasks=4" in content + assert "#SBATCH --cpus-per-task=8" in content + assert "#SBATCH --time=24:00:00" in content + assert "#SBATCH --account=myproject" in content + assert "#SBATCH --qos=high" in content + assert "#SBATCH --reservation=special" in content + assert "#SBATCH --output=output_%j.txt" in content + assert "#SBATCH --error=error_%j.txt" in content + assert "#SBATCH --job-name=test_job" in content + assert "#SBATCH --mail-type=BEGIN,END,FAIL" in content + assert "#SBATCH --mail-user=test@example.com" in content + assert "#SBATCH --gres=gpu:1" in content + assert "#SBATCH --exclusive" in content + + # Check for environment variables + assert "export OMP_NUM_THREADS=8" in content + assert "export MY_VAR=value" in content + + # Clean up + if os.path.exists(script_path): + os.remove(script_path) + + def test_submit_job(self, basic_config): + """Test the _submit_job method.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + # Create a simple job script + with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f: + f.write("#!/bin/bash\n#SBATCH --job-name=test\n") + script_path = f.name + + try: + # Mock subprocess.run to return a successful job submission + with patch("subprocess.run") as mock_run: + mock_run.return_value.stdout = "Submitted batch job 12345" + mock_run.return_value.stderr = "" + mock_run.return_value.returncode = 0 + + job_id = backend._submit_job(script_path) + + assert job_id == "12345" + mock_run.assert_called_once() + + finally: + # Clean up + if os.path.exists(script_path): + os.remove(script_path) + + def test_submit_job_failure(self, basic_config): + """Test the _submit_job method with failure.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + # Create a simple job script + with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f: + f.write("#!/bin/bash\n#SBATCH 
--job-name=test\n") + script_path = f.name + + try: + # Mock subprocess.run to return a failure + with patch("subprocess.run") as mock_run: + mock_run.side_effect = Exception("Submission failed") + + job_id = backend._submit_job(script_path) + + assert job_id is None + mock_run.assert_called_once() + + finally: + # Clean up + if os.path.exists(script_path): + os.remove(script_path) + + def test_wait_for_completion_completed(self, basic_config): + """Test _wait_for_completion method for completed job.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + # Mock subprocess.run for squeue to return completed state + with patch("subprocess.run") as mock_run: + # First call returns running, second returns completed + mock_run.side_effect = [ + # Running + MagicMock( + stdout="R\n", + stderr="", + returncode=0 + ), + # Completed + MagicMock( + stdout="CD\n", + stderr="", + returncode=0 + ) + ] + + result = backend._wait_for_completion("12345", basic_config) + + assert result is True + assert mock_run.call_count == 2 + + def test_wait_for_completion_failed(self, basic_config): + """Test _wait_for_completion method for failed job.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + # Mock subprocess.run for squeue to return failed state + with patch("subprocess.run") as mock_run: + mock_result = MagicMock(stdout="F\n", stderr="", returncode=0) + mock_run.return_value = mock_result + + result = backend._wait_for_completion("12345", basic_config) + + assert result is False + + def test_wait_for_completion_timeout(self): + """Test _wait_for_completion method with timeout.""" + from rompy.run.slurm import SlurmRunBackend + import time + from unittest.mock import ANY + + config = SlurmConfig( + queue="test", + timeout=60, # Minimum valid timeout value + nodes=1, + ntasks=1, + cpus_per_task=1, + time_limit="01:00:00", + ) + + backend = SlurmRunBackend() + + # Use a more advanced approach with time mocking + initial_time = time.time() + def time_side_effect(): + # Return an increasing time value to simulate timeout + return initial_time + 120 # More than 60s timeout + + with patch("subprocess.run") as mock_run: + with patch("time.time", side_effect=time_side_effect): + # Return running state to avoid early exit due to job completion + mock_result = MagicMock(stdout="R\n", stderr="", returncode=0) + mock_run.return_value = mock_result + + result = backend._wait_for_completion("12345", config) + + # Should return False due to timeout + assert result is False + + # Verify that scancel was called during timeout handling + mock_run.assert_any_call(['scancel', '12345'], check=True, capture_output=True) + + @requires_slurm + def test_run_method_success(self, mock_model_run, basic_config): + """Test the full run method with success.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + with TemporaryDirectory() as staging_dir: + # Mock the internal methods + with patch.object(backend, '_create_job_script') as mock_create_script, \ + patch.object(backend, '_submit_job') as mock_submit, \ + patch.object(backend, '_wait_for_completion') as mock_wait: + + # Mock the methods to return expected values + mock_create_script.return_value = "/tmp/job_script.sh" + mock_submit.return_value = "12345" + mock_wait.return_value = True # Job completed successfully + + # Set up the mock model run to return the staging directory + mock_model_run.generate.return_value = staging_dir + + result = backend.run(mock_model_run, basic_config) + 
+ assert result is True + mock_create_script.assert_called_once() + mock_submit.assert_called_once() + mock_wait.assert_called_once_with("12345", basic_config) + + @requires_slurm + def test_run_method_job_submit_failure(self, mock_model_run, basic_config): + """Test the run method when job submission fails.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + with TemporaryDirectory() as staging_dir: + # Mock the internal methods + with patch.object(backend, '_create_job_script') as mock_create_script, \ + patch.object(backend, '_submit_job') as mock_submit: + + # Mock the methods + mock_create_script.return_value = "/tmp/job_script.sh" + mock_submit.return_value = None # Submission failed + + # Set up the mock model run + mock_model_run.generate.return_value = staging_dir + + result = backend.run(mock_model_run, basic_config) + + assert result is False + mock_create_script.assert_called_once() + mock_submit.assert_called_once() + + @requires_slurm + def test_run_method_generation_failure(self, mock_model_run, basic_config): + """Test the run method when model generation fails.""" + from rompy.run.slurm import SlurmRunBackend + + backend = SlurmRunBackend() + + # Configure mock to raise an exception during generation + mock_model_run.generate.side_effect = Exception("Generation failed") + + result = backend.run(mock_model_run, basic_config) + + assert result is False \ No newline at end of file
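Taken together, the assertions in test_create_job_script and test_submit_job pin down the observable contract of the backend: a bash script whose #SBATCH header reflects the SlurmConfig fields, and an sbatch reply of the form "Submitted batch job <id>" from which the job ID is extracted. The sketch below restates that contract as plain Python; EXPECTED_BASIC_SCRIPT_HEADER and parse_sbatch_output are illustrative assumptions, not the actual output or API of SlurmRunBackend. The suite can be run with `pytest tests/backends/test_slurm_backend.py`; the SLURM-dependent cases are skipped automatically when sbatch is not on the PATH.

# Header that test_create_job_script implies for the basic_config fixture
# (the real script also contains the command that launches the model,
# which is not asserted in the tests and is therefore omitted here).
EXPECTED_BASIC_SCRIPT_HEADER = """\
#!/bin/bash
#SBATCH --partition=general
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=2
#SBATCH --time=01:00:00
"""

def parse_sbatch_output(stdout: str) -> str | None:
    """Return the job ID from an sbatch reply such as 'Submitted batch job 12345'.

    Mirrors the behaviour that test_submit_job expects from _submit_job;
    this is an illustrative helper, not part of rompy.
    """
    line = stdout.strip()
    if line.startswith("Submitted batch job"):
        return line.split()[-1]
    return None

assert parse_sbatch_output("Submitted batch job 12345") == "12345"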