diff --git a/Cargo.toml b/Cargo.toml
index a0c0f9d4..882c270a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ members = [
"torc-server",
"torc-slurm-job-runner",
"torc-dash",
+ "torc-mcp-server",
]
resolver = "2"
@@ -96,6 +97,10 @@ hyper-tls = "0.5"
hyper-openssl = "0.9"
openssl = "0.10"
+# MCP server
+rmcp = { version = "0.1", features = ["server", "macros", "transport-io"] }
+schemars = "1.0"
+
[package]
name = "torc"
version.workspace = true
@@ -159,6 +164,7 @@ client = [
"dep:signal-hook",
"dep:libc",
"dep:nvml-wrapper",
+ "dep:sha2",
"config",
]
tui = [
diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md
index 1477c8ce..68976d86 100644
--- a/docs/src/SUMMARY.md
+++ b/docs/src/SUMMARY.md
@@ -24,6 +24,7 @@
- [Parallelization Strategies](./explanation/parallelization.md)
- [Workflow Actions](./explanation/workflow-actions.md)
- [Slurm Workflows](./explanation/slurm-workflows.md)
+ - [Automatic Failure Recovery](./explanation/automatic-recovery.md)
- [Design](./explanation/design/README.md)
- [Server API Handler](./explanation/design/server.md)
- [Central Database](./explanation/design/database.md)
@@ -75,6 +76,8 @@
- [Map Python functions across workers](./tutorials/map_python_function_across_workers.md)
- [Filtering CLI Output with Nushell](./tutorials/filtering-with-nushell.md)
- [Custom HPC Profile](./tutorials/custom-hpc-profile.md)
+ - [MCP Server with Claude Code](./tutorials/mcp-server.md)
+ - [Automatic Failure Recovery](./tutorials/automatic-recovery.md)
---
diff --git a/docs/src/explanation/README.md b/docs/src/explanation/README.md
index 507ef945..366aa433 100644
--- a/docs/src/explanation/README.md
+++ b/docs/src/explanation/README.md
@@ -16,3 +16,4 @@ This section provides understanding-oriented discussions of Torc's key concepts
- Ready queue optimization for large workflows
- Parallelization strategies and job allocation approaches
- Workflow actions for automation and dynamic resource allocation
+- AI-assisted recovery for diagnosing and fixing job failures
diff --git a/docs/src/explanation/automatic-recovery.md b/docs/src/explanation/automatic-recovery.md
new file mode 100644
index 00000000..86b168fa
--- /dev/null
+++ b/docs/src/explanation/automatic-recovery.md
@@ -0,0 +1,196 @@
+# Automatic Failure Recovery
+
+This document explains how Torc's automatic failure recovery system works, its design principles, and when to use automatic vs manual recovery.
+
+## Overview
+
+Torc provides **automatic failure recovery** through the `torc watch --auto-recover` command. When jobs fail, the system:
+
+1. Diagnoses the failure cause (OOM, timeout, or unknown)
+2. Applies heuristics to adjust resource requirements
+3. Resets failed jobs and submits new Slurm allocations
+4. Resumes monitoring until completion or max retries
+
+This deterministic approach handles the majority of HPC failures without human intervention.
+
+## Design Principles
+
+### Why Deterministic Recovery?
+
+Most HPC job failures fall into predictable categories:
+
+| Failure Type | Frequency | Solution |
+|--------------|-----------|----------|
+| Out of Memory | ~60% | Increase memory allocation |
+| Timeout | ~25% | Increase runtime limit |
+| Transient errors | ~10% | Simple retry |
+| Code bugs | ~5% | Manual intervention |
+
+For 85-90% of failures, the solution is mechanical: increase resources and retry. This doesn't require AI judgment—simple heuristics work well.
+
+### Recovery Architecture
+
+```mermaid
+flowchart LR
+    A[torc watch<br/>polling] --> B{Workflow<br/>complete?}
+    B -->|No| A
+    B -->|Yes, with failures| C[Diagnose failures<br/>check resources]
+    C --> D[Apply heuristics<br/>adjust resources]
+    D --> E[Submit new<br/>allocations]
+    E --> A
+    B -->|Yes, success| F[Exit 0]
+```
+
+### Failure Detection
+
+Torc tracks resource usage during job execution:
+- Memory usage (RSS and peak)
+- CPU utilization
+- Execution time
+
+This data is analyzed to determine failure causes:
+
+**OOM Detection:**
+- Peak memory exceeds specified limit
+- Exit code 137 (SIGKILL from OOM killer)
+- Flag: `likely_oom: true`
+
+**Timeout Detection:**
+- Execution time within 10% of runtime limit
+- Job was killed (not graceful exit)
+- Flag: `likely_timeout: true`
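+
+A minimal sketch of these two checks, mirroring the logic in the `check-resource-utilization`
+report (peak memory above the requested limit, execution time above 90% of the requested runtime).
+The `FailedResult` struct and helper names below are illustrative, not Torc's actual models:
+
+```rust
+/// Illustrative stand-in for the per-job result data Torc records.
+struct FailedResult {
+    peak_memory_bytes: Option<u64>,
+    exec_time_seconds: f64,
+}
+
+/// Likely OOM when peak memory exceeded the requested limit.
+/// (The report also treats exit code 137 as an OOM signal.)
+fn likely_oom(result: &FailedResult, requested_memory_bytes: u64) -> bool {
+    result
+        .peak_memory_bytes
+        .map(|peak| peak > requested_memory_bytes)
+        .unwrap_or(false)
+}
+
+/// Likely timeout when the job ran for more than 90% of its requested runtime.
+fn likely_timeout(result: &FailedResult, requested_runtime_seconds: f64) -> bool {
+    result.exec_time_seconds > requested_runtime_seconds * 0.9
+}
+
+fn main() {
+    // A job that requested 8g and a 1-hour runtime, peaked at 10g, and died after 50 minutes.
+    let result = FailedResult {
+        peak_memory_bytes: Some(10 * 1024 * 1024 * 1024),
+        exec_time_seconds: 50.0 * 60.0,
+    };
+    let oom = likely_oom(&result, 8 * 1024 * 1024 * 1024);
+    let timeout = likely_timeout(&result, 3600.0);
+    println!("likely_oom={oom}, likely_timeout={timeout}"); // likely_oom=true, likely_timeout=false
+}
+```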
+
+### Recovery Heuristics
+
+Default multipliers applied to failed jobs:
+
+| Failure | Default Multiplier | Configurable |
+|---------|-------------------|--------------|
+| OOM | 1.5x memory | `--memory-multiplier` |
+| Timeout | 1.5x runtime | `--runtime-multiplier` |
+
+Example: A job with 8g memory that fails with OOM gets 12g on retry.
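+
+A sketch of that adjustment (parse the memory string, apply the multiplier, format it back).
+`parse_gb` and `format_gb` are simplified illustrations of the helpers `torc watch` uses and handle
+only whole-gigabyte values:
+
+```rust
+/// Simplified parser for this example: accepts only "<N>g".
+/// The real helper also understands m/k/gb/mb/kb suffixes.
+fn parse_gb(mem: &str) -> Option<u64> {
+    let n: u64 = mem.strip_suffix('g')?.parse().ok()?;
+    Some(n * 1024 * 1024 * 1024)
+}
+
+/// Format a byte count back to a whole-gigabyte string such as "12g".
+fn format_gb(bytes: u64) -> String {
+    format!("{}g", bytes / (1024 * 1024 * 1024))
+}
+
+fn main() {
+    let memory_multiplier = 1.5;
+    let current = parse_gb("8g").unwrap();
+    let adjusted = (current as f64 * memory_multiplier) as u64;
+    println!("8g -> {}", format_gb(adjusted)); // prints "8g -> 12g"
+}
+```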
+
+### Slurm Scheduler Regeneration
+
+After adjusting resources, the system regenerates Slurm schedulers:
+
+1. Finds all pending jobs (uninitialized, ready, blocked)
+2. Groups by resource requirements
+3. Calculates minimum allocations needed
+4. Creates new schedulers with appropriate walltimes
+5. Submits allocations to Slurm
+
+This is handled by `torc slurm regenerate --submit`.
+
+## Configuration
+
+### Command-Line Options
+
+```bash
+torc watch <WORKFLOW_ID> \
+ --auto-recover \ # Enable automatic recovery
+ --max-retries 3 \ # Maximum recovery attempts
+ --memory-multiplier 1.5 \ # Memory increase factor for OOM
+ --runtime-multiplier 1.5 \ # Runtime increase factor for timeout
+ --poll-interval 60 \ # Seconds between status checks
+ --output-dir output \ # Directory for job output files
+ --show-job-counts # Display job counts during polling (optional)
+```
+
+### Retry Limits
+
+The `--max-retries` option prevents infinite retry loops. After exceeding this limit, the system exits with an error, indicating manual intervention is needed.
+
+Default: 3 retries
+
+## When to Use Manual Recovery
+
+Automatic recovery works well for resource-related failures, but some situations require manual intervention:
+
+### Use Manual Recovery When:
+
+1. **Jobs keep failing after max retries**
+ - The heuristics aren't solving the problem
+ - Need to investigate root cause
+
+2. **Unknown failure modes**
+ - Exit codes that don't indicate OOM/timeout
+ - Application-specific errors
+
+3. **Code bugs**
+ - Jobs fail consistently with same error
+ - No resource issue detected
+
+4. **Cost optimization**
+ - Want to analyze actual usage before increasing
+ - Need to decide whether job is worth more resources
+
+### MCP Server for Manual Recovery
+
+The Torc MCP server provides tools for AI-assisted investigation:
+
+| Tool | Purpose |
+|------|---------|
+| `get_workflow_status` | Get overall workflow status |
+| `list_failed_jobs` | List failed jobs with error info |
+| `get_job_logs` | Read stdout/stderr logs |
+| `check_resource_utilization` | Detailed resource analysis |
+| `update_job_resources` | Manually adjust resources |
+| `restart_jobs` | Reset and restart jobs |
+| `resubmit_workflow` | Regenerate Slurm schedulers |
+
+## Comparison
+
+| Feature | Automatic | Manual/AI-Assisted |
+|---------|-----------|-------------------|
+| Human involvement | None | Interactive |
+| Speed | Fast | Depends on human |
+| Handles OOM/timeout | Yes | Yes |
+| Handles unknown errors | Retry only | Full investigation |
+| Cost optimization | Basic | Can be sophisticated |
+| Use case | Production workflows | Debugging, optimization |
+
+## Implementation Details
+
+### The Watch Command
+
+```bash
+torc watch --auto-recover
+```
+
+Main loop:
+1. Poll `is_workflow_complete` API
+2. Print status updates
+3. On completion, check for failures
+4. If failures and auto-recover enabled:
+ - Run `torc reports check-resource-utilization --include-failed`
+ - Parse results for `likely_oom` and `likely_timeout` flags
+ - Update resource requirements via API
+ - Run `torc workflows reset-status --failed-only --restart`
+ - Run `torc slurm regenerate --submit`
+ - Increment retry counter
+ - Resume polling
+5. Exit 0 on success, exit 1 on max retries exceeded
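+
+A condensed sketch of that loop is shown below. The recovery step shells out to the same `torc`
+subcommands listed above; `workflow_is_complete` and `has_failures` are placeholder stubs standing
+in for the API calls, and some CLI arguments (output directory, workflow selection for
+`reset-status`) are omitted:
+
+```rust
+use std::process::Command;
+use std::{thread, time::Duration};
+
+// Placeholder stubs for the is_workflow_complete API call and the failure check.
+fn workflow_is_complete(_workflow_id: i64) -> bool { true }
+fn has_failures(_workflow_id: i64) -> bool { false }
+
+/// Run a torc subcommand and report whether it succeeded.
+fn run(args: &[&str]) -> bool {
+    Command::new("torc")
+        .args(args)
+        .status()
+        .map(|s| s.success())
+        .unwrap_or(false)
+}
+
+fn watch(workflow_id: i64, poll_interval: u64, max_retries: u32) -> i32 {
+    let id = workflow_id.to_string();
+    let mut retries = 0;
+    loop {
+        // 1-2: poll until the workflow reports completion.
+        while !workflow_is_complete(workflow_id) {
+            thread::sleep(Duration::from_secs(poll_interval));
+        }
+        // 3: check for failures.
+        if !has_failures(workflow_id) {
+            return 0; // success
+        }
+        if retries >= max_retries {
+            return 1; // max retries exceeded; manual intervention required
+        }
+        // 4: diagnose, adjust resources (omitted), reset failed jobs, and resubmit.
+        run(&["-f", "json", "reports", "check-resource-utilization", &id, "--include-failed"]);
+        run(&["workflows", "reset-status", "--failed-only", "--restart"]);
+        run(&["slurm", "regenerate", &id, "--submit"]);
+        retries += 1;
+    }
+}
+
+fn main() {
+    std::process::exit(watch(42, 60, 3));
+}
+```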
+
+### The Regenerate Command
+
+```bash
+torc slurm regenerate --submit
+```
+
+1. Query jobs with status uninitialized/ready/blocked
+2. Group by resource requirements
+3. For each group:
+ - Find best partition using HPC profile
+ - Calculate jobs per node
+ - Determine number of allocations needed
+ - Create scheduler config
+4. Update jobs with new scheduler reference
+5. Submit allocations via sbatch
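+
+The packing arithmetic behind steps 2-4 looks roughly like the sketch below (jobs per node is
+bounded by the scarcer of CPUs and memory, with a GPU bound added when GPUs are requested, and the
+node count is rounded up); the `Partition` struct and the example numbers are illustrative:
+
+```rust
+/// Illustrative per-node capacity of the selected partition.
+struct Partition {
+    cpus_per_node: u32,
+    memory_mb_per_node: u64,
+}
+
+/// How many jobs fit on one node, limited by CPU and memory (GPU bound omitted here).
+fn jobs_per_node(p: &Partition, job_cpus: u32, job_memory_mb: u64) -> u32 {
+    let by_cpu = p.cpus_per_node / job_cpus;
+    let by_mem = (p.memory_mb_per_node / job_memory_mb) as u32;
+    by_cpu.min(by_mem).max(1)
+}
+
+/// Total nodes needed for a group of identical jobs, rounding up.
+fn nodes_needed(num_jobs: u32, per_node: u32, nodes_per_job: u32) -> u32 {
+    ((num_jobs + per_node - 1) / per_node) * nodes_per_job
+}
+
+fn main() {
+    let partition = Partition { cpus_per_node: 104, memory_mb_per_node: 240_000 };
+    // Five retried jobs, each needing 26 CPUs and 12 GB after recovery.
+    let per_node = jobs_per_node(&partition, 26, 12_000); // min(4, 20) = 4
+    let nodes = nodes_needed(5, per_node, 1);             // ceil(5 / 4) = 2
+    println!("{per_node} job(s) per node, {nodes} node(s) needed");
+}
+```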
+
+## See Also
+
+- [Automatic Failure Recovery Tutorial](../tutorials/automatic-recovery.md) - Step-by-step guide
+- [MCP Server Tutorial](../tutorials/mcp-server.md) - Setting up AI-assisted tools
+- [Resource Monitoring](../how-to/resource-monitoring.md) - Understanding resource tracking
diff --git a/docs/src/tutorials/README.md b/docs/src/tutorials/README.md
index 4d835900..4fdc73e8 100644
--- a/docs/src/tutorials/README.md
+++ b/docs/src/tutorials/README.md
@@ -16,6 +16,8 @@ This section contains learning-oriented lessons to help you get started with Tor
10. [Map Python Functions](./map_python_function_across_workers.md) - Distribute Python functions across workers
11. [Filtering CLI Output with Nushell](./filtering-with-nushell.md) - Filter jobs, results, and user data with readable queries
12. [Custom HPC Profile](./custom-hpc-profile.md) - Create an HPC profile for unsupported clusters
+13. [MCP Server with Claude Code](./mcp-server.md) - Enable Claude to interact with your workflows
+14. [Automatic Failure Recovery](./automatic-recovery.md) - Autonomous workflow monitoring with `torc watch`
Start with the Configuration Files tutorial to set up your environment, then try the Dashboard Deployment tutorial if you want to use the web interface.
diff --git a/docs/src/tutorials/ai-failure-recovery.md b/docs/src/tutorials/ai-failure-recovery.md
new file mode 100644
index 00000000..3bd9d1dd
--- /dev/null
+++ b/docs/src/tutorials/ai-failure-recovery.md
@@ -0,0 +1 @@
+# Automatic Failure Recovery
diff --git a/docs/src/tutorials/automatic-recovery.md b/docs/src/tutorials/automatic-recovery.md
new file mode 100644
index 00000000..0653fc12
--- /dev/null
+++ b/docs/src/tutorials/automatic-recovery.md
@@ -0,0 +1,264 @@
+# Tutorial: Automatic Failure Recovery
+
+This tutorial shows how to use `torc watch` with automatic recovery to handle workflow failures without manual intervention.
+
+## Learning Objectives
+
+By the end of this tutorial, you will:
+
+- Understand automatic vs manual recovery options
+- Know how to configure automatic recovery heuristics
+- Monitor workflows with automatic failure handling
+
+## Prerequisites
+
+- Torc installed with the client feature
+- A running Torc server
+- Workflows submitted to Slurm
+
+## Automatic Recovery
+
+The `torc watch` command can automatically recover from common failures:
+
+```bash
+torc watch 42 --auto-recover
+```
+
+This will:
+1. Poll the workflow until completion
+2. On failure, diagnose the cause (OOM, timeout, etc.)
+3. Adjust resource requirements based on heuristics
+4. Reset failed jobs and submit new Slurm allocations
+5. Resume monitoring
+6. Repeat until success or max retries exceeded
+
+### Recovery Heuristics
+
+| Failure Type | Detection | Default Action |
+|--------------|-----------|----------------|
+| Out of Memory | Peak memory > limit, exit code 137 | Increase memory by 1.5x |
+| Timeout | Execution time near limit | Increase runtime by 1.5x |
+| Unknown | Other exit codes | Retry without changes |
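+
+For example, with runtimes expressed as ISO 8601 durations (the format `torc watch` writes back,
+e.g. `PT1H`), the timeout adjustment looks like the following sketch; the parsing helper is
+simplified for illustration and handles only the `PT<H>H<M>M` form:
+
+```rust
+/// Simplified ISO 8601 duration parser for this example ("PT<H>H<M>M" only).
+fn parse_runtime_secs(runtime: &str) -> Option<u64> {
+    let s = runtime.strip_prefix("PT")?;
+    let (hours, rest) = match s.split_once('H') {
+        Some((h, rest)) => (h.parse::<u64>().ok()?, rest),
+        None => (0, s),
+    };
+    let minutes = match rest.strip_suffix('M') {
+        Some(m) => m.parse::<u64>().ok()?,
+        None => 0,
+    };
+    Some(hours * 3600 + minutes * 60)
+}
+
+/// Format seconds back to an ISO 8601 duration such as "PT1H30M".
+fn format_runtime(secs: u64) -> String {
+    let (hours, mins) = (secs / 3600, (secs % 3600) / 60);
+    match (hours, mins) {
+        (0, m) => format!("PT{}M", m.max(1)),
+        (h, 0) => format!("PT{}H", h),
+        (h, m) => format!("PT{}H{}M", h, m),
+    }
+}
+
+fn main() {
+    let runtime_multiplier = 1.5;
+    let current = parse_runtime_secs("PT1H").unwrap();
+    let adjusted = (current as f64 * runtime_multiplier) as u64;
+    println!("PT1H -> {}", format_runtime(adjusted)); // prints "PT1H -> PT1H30M"
+}
+```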
+
+### Configuration Options
+
+```bash
+torc watch 42 --auto-recover \
+ --max-retries 5 \ # Maximum recovery attempts (default: 3)
+ --memory-multiplier 2.0 \ # Memory increase factor (default: 1.5)
+ --runtime-multiplier 2.0 \ # Runtime increase factor (default: 1.5)
+ --poll-interval 120 \ # Seconds between status checks (default: 60)
+ --output-dir /scratch/output \
+ --show-job-counts # Display per-status job counts (optional, adds server load)
+```
+
+## Example: Complete Workflow
+
+### 1. Submit a Workflow
+
+```bash
+torc submit-slurm --account myproject workflow.yaml
+```
+
+Output:
+```
+Created workflow 42 with 100 jobs
+Submitted to Slurm with 10 allocations
+```
+
+### 2. Start Watching with Auto-Recovery
+
+```bash
+torc watch 42 --auto-recover --max-retries 3 --show-job-counts
+```
+
+> **Note:** The `--show-job-counts` flag is optional. Without it, the command polls
+> silently until completion, which reduces server load for large workflows.
+
+Output:
+```
+Watching workflow 42 (poll interval: 60s, auto-recover enabled, max retries: 3, job counts enabled)
+ completed=0, running=10, pending=0, failed=0, blocked=90
+ completed=25, running=10, pending=0, failed=0, blocked=65
+ ...
+ completed=95, running=0, pending=0, failed=5, blocked=0
+Workflow 42 is complete
+
+Workflow completed with failures:
+ - Failed: 5
+ - Canceled: 0
+ - Terminated: 0
+ - Completed: 95
+
+Attempting automatic recovery (attempt 1/3)
+
+Diagnosing failures...
+Applying recovery heuristics...
+ Job 107 (train_model_7): OOM detected, increasing memory 8g -> 12g
+ Job 112 (train_model_12): OOM detected, increasing memory 8g -> 12g
+ Job 123 (train_model_23): OOM detected, increasing memory 8g -> 12g
+ Job 131 (train_model_31): OOM detected, increasing memory 8g -> 12g
+ Job 145 (train_model_45): OOM detected, increasing memory 8g -> 12g
+ Applied fixes: 5 OOM, 0 timeout
+
+Resetting failed jobs...
+Regenerating Slurm schedulers and submitting...
+
+Recovery initiated. Resuming monitoring...
+
+Watching workflow 42 (poll interval: 60s, auto-recover enabled, max retries: 3, job counts enabled)
+ completed=95, running=5, pending=0, failed=0, blocked=0
+ ...
+Workflow 42 is complete
+
+✓ Workflow completed successfully (100 jobs)
+```
+
+### 3. If Max Retries Exceeded
+
+If failures persist after max retries:
+
+```
+Max retries (3) exceeded. Manual intervention required.
+Use the Torc MCP server with your AI assistant to investigate.
+```
+
+At this point, you can use the MCP server with an AI assistant to investigate the root cause.
+
+## Manual Recovery (Without --auto-recover)
+
+Without the `--auto-recover` flag, `torc watch` simply monitors and reports:
+
+```bash
+torc watch 42
+```
+
+On failure, it exits with instructions:
+
+```
+Workflow completed with failures:
+ - Failed: 5
+ - Completed: 95
+
+Auto-recovery disabled. To enable, use --auto-recover flag.
+Or use the Torc MCP server with your AI assistant for manual recovery.
+```
+
+## When to Use Each Approach
+
+### Use Automatic Recovery (`--auto-recover`) when:
+- Running standard compute jobs with predictable failure modes
+- You want hands-off operation
+- OOM and timeout are the main failure types
+- You have HPC allocation budget for retries
+
+### Use Manual/AI-Assisted Recovery when:
+- Failures have complex or unknown causes
+- You need to investigate before retrying
+- Resource increases aren't solving the problem
+- You want to understand why jobs are failing
+
+## Best Practices
+
+### 1. Start with Conservative Resources
+
+Set initial resource requests lower and let auto-recovery increase them:
+- Jobs that succeed keep their original allocation
+- Only failing jobs get increased resources
+- Avoids wasting HPC resources on over-provisioned jobs
+
+### 2. Set Reasonable Max Retries
+
+```bash
+--max-retries 3 # Good for most workflows
+```
+
+Too many retries can waste allocation time on jobs that will never succeed.
+
+### 3. Use Appropriate Multipliers
+
+For memory-bound jobs:
+```bash
+--memory-multiplier 2.0 # Double on OOM
+```
+
+For time-sensitive jobs where you want larger increases:
+```bash
+--runtime-multiplier 2.0 # Double runtime on timeout
+```
+
+### 4. Monitor Long-Running Workflows
+
+**Always run `torc watch` inside tmux or screen** for long-running workflows. HPC workflows can run for hours or days, and you don't want to lose your monitoring session if:
+
+- Your SSH connection drops
+- Your laptop goes to sleep
+- You need to disconnect and reconnect later
+
+Using [tmux](https://github.com/tmux/tmux/wiki) (recommended):
+
+```bash
+# Start a new tmux session
+tmux new -s torc-watch
+
+# Run the watch command
+torc watch 42 --auto-recover --poll-interval 300 --show-job-counts
+
+# Detach from session: press Ctrl+b, then d
+# Reattach later: tmux attach -t torc-watch
+```
+
+Using screen:
+```bash
+screen -S torc-watch
+torc watch 42 --auto-recover --poll-interval 300 --show-job-counts
+# Detach: Ctrl+a, then d
+# Reattach: screen -r torc-watch
+```
+
+For very large workflows, omit `--show-job-counts` to reduce server load.
+
+### 5. Check Resource Utilization Afterward
+
+After completion, review actual usage:
+```bash
+torc reports check-resource-utilization 42
+```
+
+This helps tune future job specifications.
+
+## Troubleshooting
+
+### Jobs Keep Failing After Recovery
+
+If jobs fail repeatedly with the same error:
+1. Check if the error is resource-related (OOM/timeout)
+2. Review job logs: `torc jobs logs <job_id>`
+3. Check if there's a code bug
+4. Use MCP server with AI assistant to investigate
+
+### No Slurm Schedulers Generated
+
+If `slurm regenerate` fails:
+1. Ensure workflow was created with `--account` option
+2. Check HPC profile is detected: `torc hpc detect`
+3. Specify profile explicitly: `--profile kestrel`
+
+### Resource Limits Too High
+
+If jobs are requesting more resources than partitions allow:
+1. Check partition limits: `torc hpc partitions <profile>`
+2. Use smaller multipliers
+3. Consider splitting jobs into smaller pieces
+
+## Summary
+
+The `torc watch --auto-recover` command provides:
+
+- **Automatic OOM handling**: Detects memory issues and increases allocations
+- **Automatic timeout handling**: Detects slow jobs and increases runtime
+- **Configurable heuristics**: Adjust multipliers for your workload
+- **Retry limits**: Prevent infinite retry loops
+- **Graceful degradation**: Falls back to manual recovery when needed
+
+For most HPC workflows, automatic recovery handles 80-90% of failures without human intervention.
diff --git a/docs/src/tutorials/mcp-server.md b/docs/src/tutorials/mcp-server.md
new file mode 100644
index 00000000..9f2543a6
--- /dev/null
+++ b/docs/src/tutorials/mcp-server.md
@@ -0,0 +1,358 @@
+# Tutorial: Using the MCP Server
+
+This tutorial shows how to use the Torc MCP (Model Context Protocol) server to enable AI assistants to interact with your Torc workflows directly.
+
+## Learning Objectives
+
+By the end of this tutorial, you will:
+
+- Understand what the MCP server provides
+- Know how to configure your AI assistant to use the Torc MCP server
+- Be able to inspect and manage your workflows using natural language
+
+## Prerequisites
+
+- Torc installed
+- Torc server running
+- One of the following AI assistants:
+ - [Claude Code](https://claude.ai/code) (terminal)
+ - [VS Code](https://code.visualstudio.com/) with GitHub Copilot (IDE)
+
+## What is the MCP Server?
+
+The Model Context Protocol (MCP) is an open standard for connecting AI assistants to external tools and data sources. The `torc-mcp-server` binary exposes Torc's workflow management capabilities as MCP tools.
+
+**Available Tools:**
+
+| Tool | Description |
+|------|-------------|
+| `get_workflow_status` | Get workflow info with job counts by status |
+| `get_job_details` | Get detailed job info including resource requirements |
+| `get_job_logs` | Read stdout/stderr from job log files |
+| `list_failed_jobs` | List all failed jobs in a workflow |
+| `list_jobs_by_status` | Filter jobs by status |
+| `check_resource_utilization` | Analyze resource usage and detect OOM/timeout issues |
+| `update_job_resources` | Modify job resource requirements |
+| `restart_jobs` | Reset and restart failed jobs |
+| `resubmit_workflow` | Regenerate Slurm schedulers and submit new allocations |
+| `cancel_jobs` | Cancel specific jobs |
+| `create_workflow_from_spec` | Create a workflow from JSON specification |
+
+## Configuration
+
+Choose the setup that matches your environment:
+
+- **[Claude Code](#claude-code)** - Terminal-based AI assistant
+- **[VS Code + Copilot](#vs-code--github-copilot)** - IDE with GitHub Copilot Chat
+- **[VS Code + Copilot on HPC](#vs-code-remote-ssh-for-hpc)** - Remote development on HPC clusters
+
+---
+
+## Claude Code
+
+Claude Code supports MCP configuration at three scopes:
+
+| Scope | File | Use Case |
+|-------|------|----------|
+| **Project** | `.mcp.json` in project root | Team-shared configuration (commit to git) |
+| **Local** | `.mcp.json` with `--scope local` | Personal project settings (gitignored) |
+| **User** | `~/.claude.json` | Cross-project personal tools |
+
+### Using the CLI (Recommended)
+
+```bash
+# Add the Torc MCP server to your project
+claude mcp add torc \
+ --scope project \
+ -e TORC_API_URL=http://localhost:8080/torc-service/v1 \
+ -e TORC_OUTPUT_DIR=/path/to/your/output \
+ -- /path/to/torc-mcp-server
+```
+
+### Manual Configuration
+
+Create or edit `.mcp.json` in your project root:
+
+```json
+{
+ "mcpServers": {
+ "torc": {
+ "command": "/path/to/torc-mcp-server",
+ "env": {
+ "TORC_API_URL": "http://localhost:8080/torc-service/v1",
+ "TORC_OUTPUT_DIR": "/path/to/your/output"
+ }
+ }
+ }
+}
+```
+
+Replace `/path/to/torc-mcp-server` with the actual path to your built binary.
+
+### Environment Variables
+
+| Variable | Description | Default |
+|----------|-------------|---------|
+| `TORC_API_URL` | Torc server URL | `http://localhost:8080/torc-service/v1` |
+| `TORC_OUTPUT_DIR` | Directory containing job logs | `output` |
+| `TORC_USERNAME` | Username for authentication (optional) | — |
+| `TORC_PASSWORD` | Password for authentication (optional) | — |
+
+You can also use environment variable expansion in `.mcp.json`:
+
+```json
+{
+ "mcpServers": {
+ "torc": {
+ "command": "/path/to/torc-mcp-server",
+ "env": {
+ "TORC_API_URL": "${TORC_API_URL:-http://localhost:8080/torc-service/v1}",
+ "TORC_OUTPUT_DIR": "${TORC_OUTPUT_DIR:-./output}"
+ }
+ }
+ }
+}
+```
+
+### Verify Configuration
+
+Check that the MCP server is configured correctly:
+
+```bash
+# List configured MCP servers
+claude mcp list
+
+# Get details about the torc server
+claude mcp get torc
+```
+
+Start a new Claude Code session in your project directory. Claude will automatically have access to the Torc tools.
+
+### Example Conversation
+
+```
+You: What's the status of workflow 15?
+
+Claude: I'll check the workflow status for you.
+
+Workflow 15 "hyperparameter_sweep" has 18 jobs:
+- Completed: 12
+- Running: 3
+- Failed: 2
+- Blocked: 1
+
+Would you like me to investigate the failed jobs?
+
+You: Yes, show me what went wrong
+
+Claude: Looking at the failed jobs...
+
+Job 7 "train_lr0.01_batch32" failed with:
+- Return code: 137 (killed by OOM)
+- Peak memory: 15.8 GB
+- Requested memory: 8 GB
+
+Job 12 "train_lr0.001_batch64" failed with:
+- Return code: 1
+- Error: FileNotFoundError: checkpoint.pt not found
+
+Recommendations:
+1. Job 7 needs more memory - I can restart it with 20 GB
+2. Job 12 has a missing dependency - check if upstream jobs completed
+
+You: Restart job 7 with 20GB memory
+
+Claude: Done. I've:
+1. Updated job 7's memory requirement from 8g to 20g
+2. Reset failed jobs and restarted the workflow
+
+Job 7 is now ready to run again.
+```
+
+### Troubleshooting Claude Code
+
+**Claude doesn't see the tools:**
+- Verify the MCP server is configured: `claude mcp list`
+- Check the config file is valid JSON: `cat .mcp.json | jq .`
+- Check that the path to `torc-mcp-server` is correct and the binary exists
+- Start a new Claude Code session (MCP servers are loaded at startup)
+
+**Remove the MCP server:**
+```bash
+claude mcp remove torc
+```
+
+---
+
+## VS Code + GitHub Copilot
+
+VS Code with GitHub Copilot Chat supports MCP servers for enhanced AI-assisted workflow management.
+
+### Prerequisites
+
+- VS Code 1.99 or later
+- GitHub Copilot extension installed
+- GitHub Copilot subscription (Business, Enterprise, Pro, or Pro+)
+
+### Configuration
+
+Create `.vscode/mcp.json` in your project root:
+
+```json
+{
+ "servers": {
+ "torc": {
+ "command": "/path/to/torc-mcp-server",
+ "env": {
+ "TORC_API_URL": "http://localhost:8080/torc-service/v1",
+ "TORC_OUTPUT_DIR": "./output"
+ }
+ }
+ }
+}
+```
+
+### Verify Setup
+
+1. Open the Command Palette (`Ctrl+Shift+P` / `Cmd+Shift+P`)
+2. Run "MCP: List Servers"
+3. Verify "torc" appears in the list
+
+### Usage
+
+In Copilot Chat, use **Agent Mode** (`@workspace` or the agent icon) to access MCP tools:
+
+> "What's the status of workflow 42?"
+
+> "Show me the failed jobs and their error logs"
+
+---
+
+## VS Code Remote SSH for HPC
+
+For users running Torc on HPC clusters, VS Code's Remote SSH extension allows you to use Copilot Chat with the MCP server running directly on the cluster.
+
+### Architecture
+
+```
+┌─────────────────────┐         ┌─────────────────────────────────────┐
+│ Local Machine       │   SSH   │ HPC Cluster                         │
+│                     │◄───────►│                                     │
+│ VS Code             │         │  torc-mcp-server ◄──► torc-server   │
+│ (Copilot Chat)      │         │         ▲                           │
+│                     │         │         │                           │
+└─────────────────────┘         │  .vscode/mcp.json                   │
+                                └─────────────────────────────────────┘
+```
+
+The MCP server runs on the HPC, communicates with the Torc server on the HPC, and VS Code proxies requests through SSH. No ports need to be exposed to your local machine.
+
+### Step 1: Build `torc-mcp-server` on the HPC
+
+```bash
+# On the HPC (via SSH or login node)
+cd /path/to/torc
+cargo build --release -p torc-mcp-server
+```
+
+### Step 2: Configure MCP in your project
+
+Create `.vscode/mcp.json` in your project directory **on the HPC**:
+
+```json
+{
+ "servers": {
+ "torc": {
+ "command": "/path/on/hpc/torc/target/release/torc-mcp-server",
+ "env": {
+ "TORC_API_URL": "http://localhost:8080/torc-service/v1",
+ "TORC_OUTPUT_DIR": "./output"
+ }
+ }
+ }
+}
+```
+
+> **Important:** MCP servers configured in workspace settings (`.vscode/mcp.json`) run on the remote host, not your local machine.
+
+### Step 3: Connect and use
+
+1. Install the [Remote - SSH](https://marketplace.visualstudio.com/items?itemName=ms-vscode-remote.remote-ssh) extension
+2. Connect to the HPC: `Remote-SSH: Connect to Host...`
+3. Open your project folder on the HPC
+4. Open Copilot Chat and use Agent Mode
+
+### HPC-Specific Tips
+
+- **Module systems:** If your HPC uses modules, you may need to set `PATH` in the env to include required dependencies
+- **Shared filesystems:** Place `.vscode/mcp.json` in a project directory on a shared filesystem accessible from compute nodes
+- **Firewalls:** The MCP server only needs to reach the Torc server on the HPC's internal network
+
+---
+
+## Interact with Workflows
+
+Once configured, you can ask your AI assistant to help manage workflows using natural language:
+
+**Check workflow status:**
+> "What's the status of workflow 42?"
+
+**Investigate failures:**
+> "List all failed jobs in workflow 42 and show me the error logs"
+
+**Take action:**
+> "Restart the failed jobs in workflow 42 with doubled memory"
+
+**Create workflows:**
+> "Create a workflow with 10 parallel jobs that each run `python process.py index`"
+
+---
+
+## How It Works
+
+The MCP server:
+
+1. **Receives tool calls** from the AI assistant via stdio
+2. **Translates them** to Torc REST API calls
+3. **Returns results** in a format the assistant can understand
+
+The server is stateless—it simply proxies requests to your running Torc server. All workflow state remains in Torc's database.
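+
+As a toy illustration of that translation, a tool name plus JSON arguments maps onto an HTTP
+request against `TORC_API_URL`. The endpoint paths and dispatch shape below are purely
+illustrative; the real server is built on the `rmcp` crate and Torc's generated API client:
+
+```rust
+use std::env;
+
+use serde_json::{json, Value};
+
+/// Map an MCP tool call onto a Torc REST request.
+/// The paths here are hypothetical placeholders, not Torc's actual routes.
+fn route_tool_call(tool: &str, args: &Value) -> Option<(&'static str, String)> {
+    let base = env::var("TORC_API_URL")
+        .unwrap_or_else(|_| "http://localhost:8080/torc-service/v1".to_string());
+    let workflow_id = args.get("workflow_id")?.as_i64()?;
+    match tool {
+        "get_workflow_status" => Some(("GET", format!("{base}/workflows/{workflow_id}"))),
+        "list_failed_jobs" => Some((
+            "GET",
+            format!("{base}/workflows/{workflow_id}/jobs?status=failed"),
+        )),
+        _ => None,
+    }
+}
+
+fn main() {
+    // A tool call as it might arrive from the assistant over stdio.
+    let args = json!({ "workflow_id": 42 });
+    if let Some((method, url)) = route_tool_call("get_workflow_status", &args) {
+        println!("{method} {url}");
+    }
+}
+```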
+
+## Security Considerations
+
+- The MCP server has full access to your Torc server
+- Consider using authentication (`TORC_USERNAME`/`TORC_PASSWORD`) if your Torc server is exposed
+- The server can modify workflows (restart, cancel, update resources)
+- Review proposed actions before they execute
+
+## Troubleshooting
+
+### "Failed to connect to server"
+- Ensure your Torc server is running
+- Check that `TORC_API_URL` is correct
+- Verify network connectivity
+
+### "Permission denied" or "Authentication failed"
+- Set `TORC_USERNAME` and `TORC_PASSWORD` if your server requires auth
+- Check that the credentials are correct
+
+### Logs not found
+- Ensure `TORC_OUTPUT_DIR` points to your job output directory
+- Check that jobs have actually run (logs are created at runtime)
+
+## What You Learned
+
+In this tutorial, you learned:
+
+- ✅ What the Torc MCP server provides
+- ✅ How to configure Claude Code to use it
+- ✅ How to configure VS Code + GitHub Copilot to use it
+- ✅ How to set up MCP on HPC clusters via Remote SSH
+- ✅ How to interact with workflows using natural language
+- ✅ Security considerations for production use
+
+## Next Steps
+
+- [Automatic Failure Recovery](./automatic-recovery.md) - Use `torc watch` for automatic failure recovery
+- [Automatic Recovery Explained](../explanation/automatic-recovery.md) - Understand the recovery architecture
+- [Configuration Files](./configuration.md) - Set up Torc configuration
diff --git a/src/cli.rs b/src/cli.rs
index 562daa7d..0f1ef5e2 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -142,6 +142,54 @@ pub enum Commands {
#[arg(long, default_value = "false")]
skip_checks: bool,
},
+ /// Watch a workflow and automatically recover from failures
+ ///
+ /// Monitors a workflow until completion. With --auto-recover, automatically
+ /// diagnoses failures, adjusts resource requirements, and resubmits jobs.
+ ///
+ /// Recovery heuristics:
+ /// - OOM (out of memory): Increase memory by --memory-multiplier (default 1.5x)
+ /// - Timeout: Increase runtime by --runtime-multiplier (default 1.5x)
+ /// - Other failures: Retry without changes (transient errors)
+ ///
+ /// Without --auto-recover, reports failures and exits for manual intervention
+ /// or AI-assisted recovery via the MCP server.
+ Watch {
+ /// Workflow ID to watch
+ #[arg()]
+ workflow_id: i64,
+
+ /// Poll interval in seconds
+ #[arg(short, long, default_value = "60")]
+ poll_interval: u64,
+
+ /// Enable automatic failure recovery
+ #[arg(long)]
+ auto_recover: bool,
+
+ /// Maximum number of recovery attempts (default: 3)
+ #[arg(long, default_value = "3")]
+ max_retries: u32,
+
+ /// Memory multiplier for OOM failures (default: 1.5 = 50% increase)
+ #[arg(long, default_value = "1.5")]
+ memory_multiplier: f64,
+
+ /// Runtime multiplier for timeout failures (default: 1.5 = 50% increase)
+ #[arg(long, default_value = "1.5")]
+ runtime_multiplier: f64,
+
+ /// Output directory for job files
+ #[arg(short, long, default_value = "output")]
+ output_dir: PathBuf,
+
+ /// Show job counts by status during polling
+ ///
+ /// WARNING: This option queries all jobs on each poll, which can cause high
+ /// server load for large workflows. Only use for debugging or small workflows.
+ #[arg(long)]
+ show_job_counts: bool,
+ },
/// Workflow management commands
Workflows {
#[command(subcommand)]
diff --git a/src/client.rs b/src/client.rs
index ae53e318..2c8e3bdc 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -18,6 +18,7 @@ pub mod log_paths;
pub mod parameter_expansion;
pub mod resource_monitor;
pub mod utils;
+pub mod watch;
pub mod workflow_graph;
pub mod workflow_manager;
pub mod workflow_spec;
diff --git a/src/client/commands.rs b/src/client/commands.rs
index 26d90cce..057ba15f 100644
--- a/src/client/commands.rs
+++ b/src/client/commands.rs
@@ -13,6 +13,7 @@ pub mod scheduled_compute_nodes;
pub mod slurm;
pub mod table_format;
pub mod user_data;
+pub mod watch;
pub mod workflows;
use std::env;
diff --git a/src/client/commands/reports.rs b/src/client/commands/reports.rs
index 582651f5..751069eb 100644
--- a/src/client/commands/reports.rs
+++ b/src/client/commands/reports.rs
@@ -68,6 +68,9 @@ pub enum ReportCommands {
/// Show all jobs (default: only show jobs that exceeded requirements)
#[arg(short, long)]
all: bool,
+ /// Include failed jobs in the analysis (for recovery diagnostics)
+ #[arg(long)]
+ include_failed: bool,
},
/// Generate a comprehensive JSON report of job results including all log file paths
Results {
@@ -89,8 +92,16 @@ pub fn handle_report_commands(config: &Configuration, command: &ReportCommands,
workflow_id,
run_id,
all,
+ include_failed,
} => {
- check_resource_utilization(config, *workflow_id, *run_id, *all, format);
+ check_resource_utilization(
+ config,
+ *workflow_id,
+ *run_id,
+ *all,
+ *include_failed,
+ format,
+ );
}
ReportCommands::Results {
workflow_id,
@@ -107,6 +118,7 @@ fn check_resource_utilization(
workflow_id: Option<i64>,
run_id: Option<i64>,
show_all: bool,
+ include_failed: bool,
format: &str,
) {
// Get or select workflow ID
@@ -122,21 +134,51 @@ fn check_resource_utilization(
},
};
- // Fetch results for the workflow using pagination
+ // Fetch completed results for the workflow using pagination
let mut params = pagination::ResultListParams::new().with_status(models::JobStatus::Completed);
if let Some(rid) = run_id {
params = params.with_run_id(rid);
}
- let results = match pagination::paginate_results(config, wf_id, params) {
+ let completed_results = match pagination::paginate_results(config, wf_id, params) {
Ok(results) => results,
Err(e) => {
- print_error("fetching results", &e);
+ print_error("fetching completed results", &e);
std::process::exit(1);
}
};
+ // Fetch failed results if requested
+ let failed_results = if include_failed {
+ let mut failed_params =
+ pagination::ResultListParams::new().with_status(models::JobStatus::Failed);
+ if let Some(rid) = run_id {
+ failed_params = failed_params.with_run_id(rid);
+ }
+ match pagination::paginate_results(config, wf_id, failed_params) {
+ Ok(results) => results,
+ Err(e) => {
+ print_error("fetching failed results", &e);
+ std::process::exit(1);
+ }
+ }
+ } else {
+ Vec::new()
+ };
+
+ // Combine results
+ let mut results = completed_results;
+ results.extend(failed_results);
+
if results.is_empty() {
- println!("No completed job results found for workflow {}", wf_id);
+ let msg = if include_failed {
+ format!(
+ "No completed or failed job results found for workflow {}",
+ wf_id
+ )
+ } else {
+ format!("No completed job results found for workflow {}", wf_id)
+ };
+ println!("{}", msg);
std::process::exit(0);
}
@@ -175,9 +217,11 @@ fn check_resource_utilization(
// Analyze each result
let mut rows = Vec::new();
let mut over_util_count = 0;
+ let mut failed_jobs_info: Vec<serde_json::Value> = Vec::new();
for result in &results {
let job_id = result.job_id;
+ let is_failed = result.status == models::JobStatus::Failed;
// Get job and its resource requirements
let job = match job_map.get(&job_id) {
@@ -209,6 +253,52 @@ fn check_resource_utilization(
let job_name = job.name.clone();
+ // Track failed jobs separately with their resource info
+ if is_failed {
+ let mut failed_info = serde_json::json!({
+ "job_id": job_id,
+ "job_name": job_name.clone(),
+ "return_code": result.return_code,
+ "exec_time_minutes": result.exec_time_minutes,
+ "configured_memory": resource_req.memory.clone(),
+ "configured_runtime": resource_req.runtime.clone(),
+ "configured_cpus": resource_req.num_cpus,
+ });
+
+ // Add resource usage if available
+ if let Some(peak_mem) = result.peak_memory_bytes {
+ failed_info["peak_memory_bytes"] = serde_json::json!(peak_mem);
+ failed_info["peak_memory_formatted"] =
+ serde_json::json!(format_memory_bytes(peak_mem));
+
+ // Check if it's an OOM issue
+ let specified_memory_bytes = parse_memory_string(&resource_req.memory);
+ if peak_mem > specified_memory_bytes {
+ failed_info["likely_oom"] = serde_json::json!(true);
+ let over_pct =
+ ((peak_mem as f64 / specified_memory_bytes as f64) - 1.0) * 100.0;
+ failed_info["memory_over_utilization"] =
+ serde_json::json!(format!("+{:.1}%", over_pct));
+ }
+ }
+
+ // Check if runtime exceeded
+ let exec_time_seconds = result.exec_time_minutes * 60.0;
+ if let Ok(specified_runtime_seconds) = duration_string_to_seconds(&resource_req.runtime)
+ {
+ let specified_runtime_seconds = specified_runtime_seconds as f64;
+ if exec_time_seconds > specified_runtime_seconds * 0.9 {
+ // If job ran for > 90% of its runtime, it might be a timeout
+ failed_info["likely_timeout"] = serde_json::json!(true);
+ let pct_of_runtime = (exec_time_seconds / specified_runtime_seconds) * 100.0;
+ failed_info["runtime_utilization"] =
+ serde_json::json!(format!("{:.1}%", pct_of_runtime));
+ }
+ }
+
+ failed_jobs_info.push(failed_info);
+ }
+
// Check memory over-utilization
if let Some(peak_memory_bytes) = result.peak_memory_bytes {
let specified_memory_bytes = parse_memory_string(&resource_req.memory);
@@ -305,7 +395,7 @@ fn check_resource_utilization(
// Output results
match format {
"json" => {
- let json_output = serde_json::json!({
+ let mut json_output = serde_json::json!({
"workflow_id": wf_id,
"run_id": run_id,
"total_results": results.len(),
@@ -321,6 +411,13 @@ fn check_resource_utilization(
})
}).collect::<Vec<_>>(),
});
+
+ // Add failed jobs section if there are any
+ if !failed_jobs_info.is_empty() {
+ json_output["failed_jobs_count"] = serde_json::json!(failed_jobs_info.len());
+ json_output["failed_jobs"] = serde_json::json!(failed_jobs_info);
+ }
+
println!("{}", serde_json::to_string_pretty(&json_output).unwrap());
}
"table" | _ => {
diff --git a/src/client/commands/slurm.rs b/src/client/commands/slurm.rs
index 9708d8ab..3f942239 100644
--- a/src/client/commands/slurm.rs
+++ b/src/client/commands/slurm.rs
@@ -317,6 +317,43 @@ pub enum SlurmCommands {
#[arg(long)]
force: bool,
},
+ /// Regenerate Slurm schedulers for an existing workflow based on pending jobs
+ ///
+ /// Analyzes jobs that are uninitialized, ready, or blocked and generates new
+ /// Slurm schedulers to run them. Uses existing scheduler configurations as
+ /// defaults for account, partition, and other settings.
+ ///
+ /// This is useful for recovery after job failures: update job resources,
+ /// reset failed jobs, then regenerate schedulers to submit new allocations.
+ Regenerate {
+ /// Workflow ID
+ #[arg()]
+ workflow_id: i64,
+
+ /// Slurm account to use (defaults to account from existing schedulers)
+ #[arg(long)]
+ account: Option<String>,
+
+ /// HPC profile to use (if not specified, tries to detect current system)
+ #[arg(long)]
+ profile: Option<String>,
+
+ /// Bundle all nodes into a single Slurm allocation per scheduler
+ #[arg(long)]
+ single_allocation: bool,
+
+ /// Submit the generated allocations immediately
+ #[arg(long)]
+ submit: bool,
+
+ /// Output directory for job output files (used when submitting)
+ #[arg(short, long, default_value = "output")]
+ output_dir: PathBuf,
+
+ /// Poll interval in seconds (used when submitting)
+ #[arg(short, long, default_value = "60")]
+ poll_interval: i32,
+ },
}
/// Convert seconds to Slurm walltime format (HH:MM:SS or D-HH:MM:SS)
@@ -1080,6 +1117,27 @@ pub fn handle_slurm_commands(config: &Configuration, command: &SlurmCommands, fo
format,
);
}
+ SlurmCommands::Regenerate {
+ workflow_id,
+ account,
+ profile: profile_name,
+ single_allocation,
+ submit,
+ output_dir,
+ poll_interval,
+ } => {
+ handle_regenerate(
+ config,
+ *workflow_id,
+ account.as_deref(),
+ profile_name.as_deref(),
+ *single_allocation,
+ *submit,
+ output_dir,
+ *poll_interval,
+ format,
+ );
+ }
}
}
@@ -2442,3 +2500,469 @@ fn handle_generate(
}
}
}
+
+/// Result of regenerating schedulers for an existing workflow
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RegenerateResult {
+ pub workflow_id: i64,
+ pub pending_jobs: usize,
+ pub schedulers_created: Vec<SchedulerInfo>,
+ pub total_allocations: i64,
+ pub warnings: Vec<String>,
+ pub submitted: bool,
+}
+
+/// Information about a created scheduler
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SchedulerInfo {
+ pub id: i64,
+ pub name: String,
+ pub account: String,
+ pub partition: Option<String>,
+ pub walltime: String,
+ pub nodes: i64,
+ pub num_allocations: i64,
+ pub job_count: usize,
+}
+
+/// Handle the regenerate command - regenerates Slurm schedulers for pending jobs
+fn handle_regenerate(
+ config: &Configuration,
+ workflow_id: i64,
+ account: Option<&str>,
+ profile_name: Option<&str>,
+ single_allocation: bool,
+ submit: bool,
+ output_dir: &PathBuf,
+ poll_interval: i32,
+ format: &str,
+) {
+ // Load HPC config and registry
+ let torc_config = TorcConfig::load().unwrap_or_default();
+ let registry = create_registry_with_config_public(&torc_config.client.hpc);
+
+ // Get the HPC profile
+ let profile = if let Some(n) = profile_name {
+ registry.get(n)
+ } else {
+ registry.detect()
+ };
+
+ let profile = match profile {
+ Some(p) => p,
+ None => {
+ if profile_name.is_some() {
+ eprintln!("Unknown HPC profile: {}", profile_name.unwrap());
+ } else {
+ eprintln!("No HPC profile specified and no system detected.");
+ eprintln!("Use --profile or run on an HPC system.");
+ }
+ std::process::exit(1);
+ }
+ };
+
+ // Fetch pending jobs (uninitialized, ready, blocked)
+ let pending_statuses = [
+ models::JobStatus::Uninitialized,
+ models::JobStatus::Ready,
+ models::JobStatus::Blocked,
+ ];
+ let mut pending_jobs: Vec<_> = Vec::new();
+
+ for status in &pending_statuses {
+ match default_api::list_jobs(
+ config,
+ workflow_id,
+ Some(status.clone()),
+ None, // needs_file_id
+ None, // upstream_job_id
+ Some(0),
+ Some(10000),
+ None,
+ None,
+ None,
+ ) {
+ Ok(response) => {
+ pending_jobs.extend(response.items.unwrap_or_default());
+ }
+ Err(e) => {
+ print_error(&format!("listing {:?} jobs", status), &e);
+ std::process::exit(1);
+ }
+ }
+ }
+
+ if pending_jobs.is_empty() {
+ if format == "json" {
+ println!(
+ "{}",
+ serde_json::to_string_pretty(&RegenerateResult {
+ workflow_id,
+ pending_jobs: 0,
+ schedulers_created: Vec::new(),
+ total_allocations: 0,
+ warnings: vec!["No pending jobs found".to_string()],
+ submitted: false,
+ })
+ .unwrap()
+ );
+ } else {
+ println!(
+ "No pending jobs (uninitialized, ready, or blocked) found in workflow {}",
+ workflow_id
+ );
+ }
+ return;
+ }
+
+ // Fetch all resource requirements for the workflow
+ let resource_requirements = match default_api::list_resource_requirements(
+ config,
+ workflow_id,
+ None, // job_id
+ Some(0),
+ Some(10000),
+ None, // sort_by
+ None, // reverse_sort
+ None, // name
+ None, // memory
+ None, // num_cpus
+ None, // num_gpus
+ None, // num_nodes
+ None, // runtime
+ ) {
+ Ok(response) => response.items.unwrap_or_default(),
+ Err(e) => {
+ print_error("listing resource requirements", &e);
+ std::process::exit(1);
+ }
+ };
+
+ // Build a map of resource requirement ID -> model
+ let rr_map: HashMap<_, _> = resource_requirements
+ .iter()
+ .filter_map(|rr| rr.id.map(|id| (id, rr)))
+ .collect();
+
+ // Get existing schedulers to use as defaults
+ let existing_schedulers = match default_api::list_slurm_schedulers(
+ config,
+ workflow_id,
+ Some(0),
+ Some(100),
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ ) {
+ Ok(response) => response.items.unwrap_or_default(),
+ Err(e) => {
+ print_error("listing existing schedulers", &e);
+ std::process::exit(1);
+ }
+ };
+
+ // Determine account to use
+ let account_to_use = account
+ .map(|s| s.to_string())
+ .or_else(|| existing_schedulers.first().map(|s| s.account.clone()))
+ .unwrap_or_else(|| {
+ eprintln!("No account specified and no existing schedulers found.");
+ eprintln!("Use --account to specify a Slurm account.");
+ std::process::exit(1);
+ });
+
+ // Group jobs by resource requirements
+ let mut jobs_by_rr: HashMap<i64, Vec<_>> = HashMap::new();
+ let mut warnings: Vec<String> = Vec::new();
+
+ for job in &pending_jobs {
+ if let Some(rr_id) = job.resource_requirements_id {
+ jobs_by_rr.entry(rr_id).or_default().push(job);
+ } else {
+ warnings.push(format!(
+ "Job '{}' (ID: {}) has no resource requirements, skipping",
+ job.name,
+ job.id.unwrap_or(-1)
+ ));
+ }
+ }
+
+ if jobs_by_rr.is_empty() {
+ if format == "json" {
+ println!(
+ "{}",
+ serde_json::to_string_pretty(&RegenerateResult {
+ workflow_id,
+ pending_jobs: pending_jobs.len(),
+ schedulers_created: Vec::new(),
+ total_allocations: 0,
+ warnings,
+ submitted: false,
+ })
+ .unwrap()
+ );
+ } else {
+ println!("No pending jobs with resource requirements found");
+ for warning in &warnings {
+ println!(" Warning: {}", warning);
+ }
+ }
+ return;
+ }
+
+ // Generate schedulers for each resource requirement group
+ let mut schedulers_created: Vec<SchedulerInfo> = Vec::new();
+ let mut total_allocations: i64 = 0;
+ let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
+
+ for (rr_id, jobs) in &jobs_by_rr {
+ let rr = match rr_map.get(rr_id) {
+ Some(rr) => *rr,
+ None => {
+ warnings.push(format!(
+ "Resource requirements ID {} not found, skipping {} job(s)",
+ rr_id,
+ jobs.len()
+ ));
+ continue;
+ }
+ };
+
+ // Parse resource requirements
+ let memory_mb = match parse_memory_mb(&rr.memory) {
+ Ok(m) => m,
+ Err(e) => {
+ warnings.push(format!(
+ "Failed to parse memory '{}' for RR {}: {}",
+ rr.memory, rr_id, e
+ ));
+ continue;
+ }
+ };
+
+ let runtime_secs = match duration_string_to_seconds(&rr.runtime) {
+ Ok(s) => s as u64,
+ Err(e) => {
+ warnings.push(format!(
+ "Failed to parse runtime '{}' for RR {}: {}",
+ rr.runtime, rr_id, e
+ ));
+ continue;
+ }
+ };
+
+ let gpus = if rr.num_gpus > 0 {
+ Some(rr.num_gpus as u32)
+ } else {
+ None
+ };
+
+ // Find best partition
+ let partition = match profile.find_best_partition(
+ rr.num_cpus as u32,
+ memory_mb,
+ runtime_secs,
+ gpus,
+ ) {
+ Some(p) => p,
+ None => {
+ warnings.push(format!(
+ "No partition found for resource requirements '{}' (CPUs: {}, Memory: {}, Runtime: {}, GPUs: {:?})",
+ rr.name, rr.num_cpus, rr.memory, rr.runtime, gpus
+ ));
+ continue;
+ }
+ };
+
+ // Calculate jobs per node and total nodes needed
+ let jobs_per_node_by_cpu = partition.cpus_per_node / rr.num_cpus as u32;
+ let jobs_per_node_by_mem = (partition.memory_mb / memory_mb) as u32;
+ let jobs_per_node_by_gpu = match (gpus, partition.gpus_per_node) {
+ (Some(job_gpus), Some(node_gpus)) if job_gpus > 0 => node_gpus / job_gpus,
+ _ => u32::MAX,
+ };
+ let jobs_per_node = std::cmp::max(
+ 1,
+ std::cmp::min(
+ jobs_per_node_by_cpu,
+ std::cmp::min(jobs_per_node_by_mem, jobs_per_node_by_gpu),
+ ),
+ );
+
+ let nodes_per_job = rr.num_nodes as u32;
+ let total_nodes_needed =
+ ((jobs.len() as u32 + jobs_per_node - 1) / jobs_per_node) * nodes_per_job;
+ let total_nodes_needed = std::cmp::max(1, total_nodes_needed) as i64;
+
+ // Allocation strategy
+ let (nodes_per_alloc, num_allocations) = if single_allocation {
+ (total_nodes_needed, 1i64)
+ } else {
+ (1i64, total_nodes_needed)
+ };
+
+ // Create scheduler name with timestamp to avoid conflicts
+ let scheduler_name = format!("{}_regen_{}", rr.name, timestamp);
+
+ // Create the scheduler in the database
+ let scheduler = models::SlurmSchedulerModel {
+ id: None,
+ workflow_id,
+ name: Some(scheduler_name.clone()),
+ account: account_to_use.clone(),
+ partition: if partition.requires_explicit_request {
+ Some(partition.name.clone())
+ } else {
+ None
+ },
+ mem: Some(rr.memory.clone()),
+ walltime: secs_to_walltime(partition.max_walltime_secs),
+ nodes: nodes_per_alloc,
+ gres: gpus.map(|g| format!("gpu:{}", g)),
+ ntasks_per_node: None,
+ qos: partition.default_qos.clone(),
+ tmp: None,
+ extra: None,
+ };
+
+ let created_scheduler = match default_api::create_slurm_scheduler(config, scheduler) {
+ Ok(s) => s,
+ Err(e) => {
+ print_error("creating scheduler", &e);
+ std::process::exit(1);
+ }
+ };
+
+ let scheduler_id = created_scheduler.id.unwrap_or(-1);
+
+ schedulers_created.push(SchedulerInfo {
+ id: scheduler_id,
+ name: scheduler_name.clone(),
+ account: account_to_use.clone(),
+ partition: created_scheduler.partition.clone(),
+ walltime: created_scheduler.walltime.clone(),
+ nodes: nodes_per_alloc,
+ num_allocations,
+ job_count: jobs.len(),
+ });
+
+ total_allocations += num_allocations;
+
+ // Update jobs to reference this scheduler
+ for job in jobs {
+ if let Some(job_id) = job.id {
+ let mut updated_job = (*job).clone();
+ updated_job.scheduler_id = Some(scheduler_id);
+ if let Err(e) = default_api::update_job(config, job_id, updated_job) {
+ warnings.push(format!(
+ "Failed to update job {} with scheduler: {}",
+ job_id, e
+ ));
+ }
+ }
+ }
+ }
+
+ // Submit allocations if requested
+ let mut submitted = false;
+ if submit && !schedulers_created.is_empty() {
+ // Create output directory
+ if let Err(e) = std::fs::create_dir_all(output_dir) {
+ eprintln!("Error creating output directory: {}", e);
+ std::process::exit(1);
+ }
+
+ for scheduler_info in &schedulers_created {
+ let start_one_worker_per_node = scheduler_info.nodes > 1;
+
+ match schedule_slurm_nodes(
+ config,
+ workflow_id,
+ scheduler_info.id,
+ scheduler_info.num_allocations as i32,
+ "worker",
+ output_dir.to_str().unwrap_or("output"),
+ poll_interval,
+ None, // max_parallel_jobs
+ start_one_worker_per_node,
+ false, // keep_submission_scripts
+ ) {
+ Ok(()) => {
+ info!(
+ "Submitted {} allocation(s) for scheduler '{}'",
+ scheduler_info.num_allocations, scheduler_info.name
+ );
+ }
+ Err(e) => {
+ eprintln!(
+ "Error submitting allocations for scheduler '{}': {}",
+ scheduler_info.name, e
+ );
+ std::process::exit(1);
+ }
+ }
+ }
+ submitted = true;
+ }
+
+ // Output results
+ let result = RegenerateResult {
+ workflow_id,
+ pending_jobs: pending_jobs.len(),
+ schedulers_created,
+ total_allocations,
+ warnings,
+ submitted,
+ };
+
+ if format == "json" {
+ println!("{}", serde_json::to_string_pretty(&result).unwrap());
+ } else {
+ println!("Regenerated Slurm schedulers for workflow {}", workflow_id);
+ println!();
+ println!("Summary:");
+ println!(" Pending jobs: {}", result.pending_jobs);
+ println!(" Schedulers created: {}", result.schedulers_created.len());
+ println!(" Total allocations: {}", result.total_allocations);
+ println!(
+ " Profile used: {} ({})",
+ profile.display_name, profile.name
+ );
+
+ if !result.schedulers_created.is_empty() {
+ println!();
+ println!("Schedulers:");
+ for sched in &result.schedulers_created {
+ println!(
+ " - {} (ID: {}): {} job(s), {} allocation(s) × {} node(s)",
+ sched.name, sched.id, sched.job_count, sched.num_allocations, sched.nodes
+ );
+ }
+ }
+
+ if !result.warnings.is_empty() {
+ println!();
+ println!("Warnings:");
+ for warning in &result.warnings {
+ println!(" - {}", warning);
+ }
+ }
+
+ if result.submitted {
+ println!();
+ println!("Allocations submitted successfully.");
+ } else if !result.schedulers_created.is_empty() {
+ println!();
+ println!("To submit the allocations, run:");
+ println!(" torc slurm regenerate {} --submit", workflow_id);
+ }
+ }
+}
diff --git a/src/client/commands/watch.rs b/src/client/commands/watch.rs
new file mode 100644
index 00000000..ac6fb105
--- /dev/null
+++ b/src/client/commands/watch.rs
@@ -0,0 +1,615 @@
+//! Watch command for monitoring workflows with automatic failure recovery
+
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::process::Command;
+use std::time::Duration;
+
+use crate::client::apis::configuration::Configuration;
+use crate::client::apis::default_api;
+use crate::time_utils::duration_string_to_seconds;
+
+/// Arguments for the watch command
+pub struct WatchArgs {
+ pub workflow_id: i64,
+ pub poll_interval: u64,
+ pub auto_recover: bool,
+ pub max_retries: u32,
+ pub memory_multiplier: f64,
+ pub runtime_multiplier: f64,
+ pub output_dir: PathBuf,
+ pub show_job_counts: bool,
+}
+
+/// Parse memory string (e.g., "8g", "512m", "1024k") to bytes
+pub fn parse_memory_bytes(mem: &str) -> Option<u64> {
+ let mem = mem.trim().to_lowercase();
+ let (num_str, multiplier) = if mem.ends_with("gb") {
+ (mem.trim_end_matches("gb"), 1024u64 * 1024 * 1024)
+ } else if mem.ends_with("g") {
+ (mem.trim_end_matches("g"), 1024u64 * 1024 * 1024)
+ } else if mem.ends_with("mb") {
+ (mem.trim_end_matches("mb"), 1024u64 * 1024)
+ } else if mem.ends_with("m") {
+ (mem.trim_end_matches("m"), 1024u64 * 1024)
+ } else if mem.ends_with("kb") {
+ (mem.trim_end_matches("kb"), 1024u64)
+ } else if mem.ends_with("k") {
+ (mem.trim_end_matches("k"), 1024u64)
+ } else {
+ (mem.as_str(), 1u64)
+ };
+ num_str
+ .parse::<f64>()
+ .ok()
+ .map(|n| (n * multiplier as f64) as u64)
+}
+
+/// Format bytes to memory string (e.g., "12g", "512m")
+pub fn format_memory_bytes_short(bytes: u64) -> String {
+ if bytes >= 1024 * 1024 * 1024 {
+ format!("{}g", bytes / (1024 * 1024 * 1024))
+ } else if bytes >= 1024 * 1024 {
+ format!("{}m", bytes / (1024 * 1024))
+ } else if bytes >= 1024 {
+ format!("{}k", bytes / 1024)
+ } else {
+ format!("{}b", bytes)
+ }
+}
+
+/// Format seconds to ISO8601 duration (e.g., "PT2H30M")
+pub fn format_duration_iso8601(secs: u64) -> String {
+ let hours = secs / 3600;
+ let mins = (secs % 3600) / 60;
+ if hours > 0 && mins > 0 {
+ format!("PT{}H{}M", hours, mins)
+ } else if hours > 0 {
+ format!("PT{}H", hours)
+ } else {
+ format!("PT{}M", mins.max(1))
+ }
+}
+
+/// Get job counts by status for a workflow
+fn get_job_counts(
+ config: &Configuration,
+ workflow_id: i64,
+) -> Result<HashMap<String, usize>, String> {
+ let jobs_response = default_api::list_jobs(
+ config,
+ workflow_id,
+ None, // status filter
+ None, // needs_file_id
+ None, // upstream_job_id
+ None, // offset
+ Some(10000), // limit
+ None, // sort_by
+ None, // reverse_sort
+ None, // include_relationships
+ )
+ .map_err(|e| format!("Failed to list jobs: {}", e))?;
+
+ let jobs = jobs_response.items.unwrap_or_default();
+ let mut counts = HashMap::new();
+
+ for job in &jobs {
+ if let Some(status) = &job.status {
+ let status_str = format!("{:?}", status);
+ *counts.entry(status_str).or_insert(0) += 1;
+ }
+ }
+
+ Ok(counts)
+}
+
+/// Poll until workflow is complete, optionally printing status updates
+fn poll_until_complete(
+ config: &Configuration,
+ workflow_id: i64,
+ poll_interval: u64,
+ show_job_counts: bool,
+) -> Result<HashMap<String, usize>, String> {
+ loop {
+ // Check if workflow is complete
+ match default_api::is_workflow_complete(config, workflow_id) {
+ Ok(response) => {
+ if response.is_complete {
+ eprintln!("Workflow {} is complete", workflow_id);
+ break;
+ }
+ }
+ Err(e) => {
+ return Err(format!("Error checking workflow status: {}", e));
+ }
+ }
+
+ // Print current status if requested
+ if show_job_counts {
+ match get_job_counts(config, workflow_id) {
+ Ok(counts) => {
+ let completed = counts.get("Completed").unwrap_or(&0);
+ let running = counts.get("Running").unwrap_or(&0);
+ let pending = counts.get("Pending").unwrap_or(&0);
+ let failed = counts.get("Failed").unwrap_or(&0);
+ let blocked = counts.get("Blocked").unwrap_or(&0);
+ eprintln!(
+ " completed={}, running={}, pending={}, failed={}, blocked={}",
+ completed, running, pending, failed, blocked
+ );
+ }
+ Err(e) => {
+ eprintln!("Error getting job counts: {}", e);
+ }
+ }
+ }
+
+ std::thread::sleep(Duration::from_secs(poll_interval));
+ }
+
+ get_job_counts(config, workflow_id)
+}
+
+/// Diagnose failures and return job IDs that need resource adjustments
+fn diagnose_failures(workflow_id: i64, output_dir: &PathBuf) -> Result<serde_json::Value, String> {
+ // Run check-resource-utilization command
+ let output = Command::new("torc")
+ .args([
+ "-f",
+ "json",
+ "reports",
+ "check-resource-utilization",
+ &workflow_id.to_string(),
+ "--include-failed",
+ "-o",
+ output_dir.to_str().unwrap_or("output"),
+ ])
+ .output()
+ .map_err(|e| format!("Failed to run check-resource-utilization: {}", e))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(format!("check-resource-utilization failed: {}", stderr));
+ }
+
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ serde_json::from_str(&stdout)
+ .map_err(|e| format!("Failed to parse resource utilization output: {}", e))
+}
+
+/// Get Slurm log information for failed jobs
+fn get_slurm_log_info(workflow_id: i64, output_dir: &PathBuf) -> Result<serde_json::Value, String> {
+ // Run reports results command to get log paths
+ let output = Command::new("torc")
+ .args([
+ "-f",
+ "json",
+ "reports",
+ "results",
+ &workflow_id.to_string(),
+ "-o",
+ output_dir.to_str().unwrap_or("output"),
+ ])
+ .output()
+ .map_err(|e| format!("Failed to run reports results: {}", e))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(format!("reports results failed: {}", stderr));
+ }
+
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ serde_json::from_str(&stdout)
+ .map_err(|e| format!("Failed to parse reports results output: {}", e))
+}
+
+/// Correlate failed jobs with their Slurm allocation logs
+fn correlate_slurm_logs(
+ diagnosis: &serde_json::Value,
+ slurm_info: &serde_json::Value,
+) -> HashMap<i64, SlurmLogInfo> {
+ let mut log_map = HashMap::new();
+
+ // Build map from job_id to slurm log paths
+ if let Some(jobs) = slurm_info.get("jobs").and_then(|v| v.as_array()) {
+ for job in jobs {
+ if let Some(job_id) = job.get("job_id").and_then(|v| v.as_i64()) {
+ let slurm_stdout = job
+ .get("slurm_stdout")
+ .and_then(|v| v.as_str())
+ .map(String::from);
+ let slurm_stderr = job
+ .get("slurm_stderr")
+ .and_then(|v| v.as_str())
+ .map(String::from);
+ let slurm_job_id = job
+ .get("slurm_job_id")
+ .and_then(|v| v.as_str())
+ .map(String::from);
+
+ if slurm_stdout.is_some() || slurm_stderr.is_some() {
+ log_map.insert(
+ job_id,
+ SlurmLogInfo {
+ slurm_job_id,
+ slurm_stdout,
+ slurm_stderr,
+ },
+ );
+ }
+ }
+ }
+ }
+
+ // Filter to only failed jobs
+ let mut failed_log_map = HashMap::new();
+ if let Some(failed_jobs) = diagnosis.get("failed_jobs").and_then(|v| v.as_array()) {
+ for job_info in failed_jobs {
+ if let Some(job_id) = job_info.get("job_id").and_then(|v| v.as_i64()) {
+ if let Some(log_info) = log_map.remove(&job_id) {
+ failed_log_map.insert(job_id, log_info);
+ }
+ }
+ }
+ }
+
+ failed_log_map
+}
+
+/// Information about Slurm logs for a job
+#[derive(Debug)]
+pub struct SlurmLogInfo {
+ pub slurm_job_id: Option<String>,
+ pub slurm_stdout: Option<String>,
+ pub slurm_stderr: Option<String>,
+}
+
+/// Apply recovery heuristics and update job resources
+fn apply_recovery_heuristics(
+ config: &Configuration,
+ workflow_id: i64,
+ diagnosis: &serde_json::Value,
+ memory_multiplier: f64,
+ runtime_multiplier: f64,
+ output_dir: &PathBuf,
+) -> Result<(usize, usize, usize), String> {
+ let mut oom_fixed = 0;
+ let mut timeout_fixed = 0;
+ let mut other_failures = 0;
+
+ // Try to get Slurm log info for correlation
+ let slurm_log_map = match get_slurm_log_info(workflow_id, output_dir) {
+ Ok(slurm_info) => {
+ let log_map = correlate_slurm_logs(diagnosis, &slurm_info);
+ if !log_map.is_empty() {
+ eprintln!(" Found Slurm logs for {} failed job(s)", log_map.len());
+ }
+ log_map
+ }
+ Err(e) => {
+ log::debug!("Could not get Slurm log info: {}", e);
+ HashMap::new()
+ }
+ };
+
+ // Get failed jobs info from diagnosis
+ let failed_jobs = diagnosis
+ .get("failed_jobs")
+ .and_then(|v| v.as_array())
+ .cloned()
+ .unwrap_or_default();
+
+ for job_info in &failed_jobs {
+ let job_id = job_info.get("job_id").and_then(|v| v.as_i64()).unwrap_or(0);
+ let likely_oom = job_info
+ .get("likely_oom")
+ .and_then(|v| v.as_bool())
+ .unwrap_or(false);
+ let likely_timeout = job_info
+ .get("likely_timeout")
+ .and_then(|v| v.as_bool())
+ .unwrap_or(false);
+
+ if job_id == 0 {
+ continue;
+ }
+
+ // Log Slurm info if available
+ if let Some(slurm_info) = slurm_log_map.get(&job_id) {
+ if let Some(slurm_job_id) = &slurm_info.slurm_job_id {
+ log::debug!(" Job {} ran in Slurm allocation {}", job_id, slurm_job_id);
+ }
+ }
+
+ // Get current job to find resource requirements
+ let job = match default_api::get_job(config, job_id) {
+ Ok(j) => j,
+ Err(e) => {
+ eprintln!(" Warning: couldn't get job {}: {}", job_id, e);
+ continue;
+ }
+ };
+
+ let rr_id = match job.resource_requirements_id {
+ Some(id) => id,
+ None => {
+ eprintln!(" Warning: job {} has no resource requirements", job_id);
+ other_failures += 1;
+ continue;
+ }
+ };
+
+ // Get current resource requirements
+ let rr = match default_api::get_resource_requirements(config, rr_id) {
+ Ok(r) => r,
+ Err(e) => {
+ eprintln!(
+ " Warning: couldn't get resource requirements {}: {}",
+ rr_id, e
+ );
+ continue;
+ }
+ };
+
+ let mut updated = false;
+ let mut new_rr = rr.clone();
+
+ // Apply OOM heuristic
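+ // e.g. a memory multiplier of 2.0 would retry a job configured with "8g" at "16g" (illustrative values).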
+ if likely_oom {
+ if let Some(current_bytes) = parse_memory_bytes(&rr.memory) {
+ let new_bytes = (current_bytes as f64 * memory_multiplier) as u64;
+ let new_memory = format_memory_bytes_short(new_bytes);
+ eprintln!(
+ " Job {} ({}): OOM detected, increasing memory {} -> {}",
+ job_id, job.name, rr.memory, new_memory
+ );
+ new_rr.memory = new_memory;
+ updated = true;
+ oom_fixed += 1;
+ }
+ }
+
+ // Apply timeout heuristic
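+ // e.g. a runtime multiplier of 2.0 would retry a job configured with "PT1H" at "PT2H" (illustrative values).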
+ if likely_timeout {
+ // Use duration_string_to_seconds from time_utils
+ if let Ok(current_secs) = duration_string_to_seconds(&rr.runtime) {
+ let new_secs = (current_secs as f64 * runtime_multiplier) as u64;
+ let new_runtime = format_duration_iso8601(new_secs);
+ eprintln!(
+ " Job {} ({}): Timeout detected, increasing runtime {} -> {}",
+ job_id, job.name, rr.runtime, new_runtime
+ );
+ new_rr.runtime = new_runtime;
+ updated = true;
+ timeout_fixed += 1;
+ }
+ }
+
+ // Update resource requirements if changed
+ if updated {
+ if let Err(e) = default_api::update_resource_requirements(config, rr_id, new_rr) {
+ eprintln!(
+ " Warning: failed to update resource requirements for job {}: {}",
+ job_id, e
+ );
+ }
+ } else if !likely_oom && !likely_timeout {
+ // Unknown failure - will retry without changes
+ other_failures += 1;
+ }
+ }
+
+ Ok((oom_fixed, timeout_fixed, other_failures))
+}
+
+/// Reset failed jobs and restart workflow
+fn reset_failed_jobs(workflow_id: i64) -> Result<(), String> {
+ let output = Command::new("torc")
+ .args([
+ "workflows",
+ "reset-status",
+ &workflow_id.to_string(),
+ "--failed-only",
+ "--restart",
+ "--no-prompts",
+ ])
+ .output()
+ .map_err(|e| format!("Failed to run reset-status: {}", e))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(format!("reset-status failed: {}", stderr));
+ }
+
+ Ok(())
+}
+
+/// Regenerate Slurm schedulers and submit allocations
+fn regenerate_and_submit(workflow_id: i64, output_dir: &PathBuf) -> Result<(), String> {
+ let output = Command::new("torc")
+ .args([
+ "slurm",
+ "regenerate",
+ &workflow_id.to_string(),
+ "--submit",
+ "-o",
+ output_dir.to_str().unwrap_or("output"),
+ ])
+ .output()
+ .map_err(|e| format!("Failed to run slurm regenerate: {}", e))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(format!("slurm regenerate failed: {}", stderr));
+ }
+
+ Ok(())
+}
+
+/// Run the watch command
+pub fn run_watch(config: &Configuration, args: &WatchArgs) {
+ let mut retry_count = 0u32;
+
+ eprintln!(
+ "Watching workflow {} (poll interval: {}s{}{})",
+ args.workflow_id,
+ args.poll_interval,
+ if args.auto_recover {
+ format!(", auto-recover enabled, max retries: {}", args.max_retries)
+ } else {
+ String::new()
+ },
+ if args.show_job_counts {
+ ", job counts enabled"
+ } else {
+ ""
+ }
+ );
+
+ if !args.show_job_counts {
+ eprintln!(" (use --show-job-counts to display per-status counts during polling)");
+ }
+
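+ // Each iteration is one watch-and-recover cycle: poll until the workflow completes, then either
+ // finish or attempt recovery and resume monitoring.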
+ loop {
+ // Poll until workflow is complete
+ let counts = match poll_until_complete(
+ config,
+ args.workflow_id,
+ args.poll_interval,
+ args.show_job_counts,
+ ) {
+ Ok(c) => c,
+ Err(e) => {
+ eprintln!("Error: {}", e);
+ std::process::exit(1);
+ }
+ };
+
+ let completed = *counts.get("Completed").unwrap_or(&0);
+ let failed = *counts.get("Failed").unwrap_or(&0);
+ let canceled = *counts.get("Canceled").unwrap_or(&0);
+ let terminated = *counts.get("Terminated").unwrap_or(&0);
+
+ let needs_recovery = failed > 0 || canceled > 0 || terminated > 0;
+
+ if !needs_recovery {
+ eprintln!("\n✓ Workflow completed successfully ({} jobs)", completed);
+ break;
+ }
+
+ eprintln!("\nWorkflow completed with failures:");
+ eprintln!(" - Failed: {}", failed);
+ eprintln!(" - Canceled: {}", canceled);
+ eprintln!(" - Terminated: {}", terminated);
+ eprintln!(" - Completed: {}", completed);
+
+ // Check if we should attempt recovery
+ if !args.auto_recover {
+ eprintln!("\nAuto-recovery disabled. To enable, use --auto-recover flag.");
+ eprintln!("Or use the Torc MCP server with your AI assistant for manual recovery.");
+ std::process::exit(1);
+ }
+
+ if retry_count >= args.max_retries {
+ eprintln!(
+ "\nMax retries ({}) exceeded. Manual intervention required.",
+ args.max_retries
+ );
+ eprintln!("Use the Torc MCP server with your AI assistant to investigate.");
+ std::process::exit(1);
+ }
+
+ retry_count += 1;
+ eprintln!(
+ "\nAttempting automatic recovery (attempt {}/{})",
+ retry_count, args.max_retries
+ );
+
+ // Step 1: Diagnose failures
+ eprintln!("\nDiagnosing failures...");
+ let diagnosis = match diagnose_failures(args.workflow_id, &args.output_dir) {
+ Ok(d) => d,
+ Err(e) => {
+ eprintln!("Warning: Could not diagnose failures: {}", e);
+ eprintln!("Attempting retry without resource adjustments...");
+ serde_json::json!({"failed_jobs": []})
+ }
+ };
+
+ // Step 2: Apply heuristics to adjust resources
+ eprintln!("\nApplying recovery heuristics...");
+ match apply_recovery_heuristics(
+ config,
+ args.workflow_id,
+ &diagnosis,
+ args.memory_multiplier,
+ args.runtime_multiplier,
+ &args.output_dir,
+ ) {
+ Ok((oom, timeout, other)) => {
+ if oom > 0 || timeout > 0 {
+ eprintln!(" Applied fixes: {} OOM, {} timeout", oom, timeout);
+ }
+ if other > 0 {
+ eprintln!(" {} job(s) with unknown failure cause (will retry)", other);
+ }
+ }
+ Err(e) => {
+ eprintln!("Warning: Error applying heuristics: {}", e);
+ }
+ }
+
+ // Step 3: Reset failed jobs
+ eprintln!("\nResetting failed jobs...");
+ if let Err(e) = reset_failed_jobs(args.workflow_id) {
+ eprintln!("Error resetting jobs: {}", e);
+ std::process::exit(1);
+ }
+
+ // Step 4: Regenerate Slurm schedulers and submit
+ eprintln!("Regenerating Slurm schedulers and submitting...");
+ if let Err(e) = regenerate_and_submit(args.workflow_id, &args.output_dir) {
+ eprintln!("Error regenerating schedulers: {}", e);
+ std::process::exit(1);
+ }
+
+ eprintln!("\nRecovery initiated. Resuming monitoring...\n");
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_memory_bytes() {
+ assert_eq!(parse_memory_bytes("1g"), Some(1024 * 1024 * 1024));
+ assert_eq!(parse_memory_bytes("2gb"), Some(2 * 1024 * 1024 * 1024));
+ assert_eq!(parse_memory_bytes("512m"), Some(512 * 1024 * 1024));
+ assert_eq!(parse_memory_bytes("512mb"), Some(512 * 1024 * 1024));
+ assert_eq!(parse_memory_bytes("1024k"), Some(1024 * 1024));
+ assert_eq!(parse_memory_bytes("1024kb"), Some(1024 * 1024));
+ assert_eq!(parse_memory_bytes("1024"), Some(1024));
+ assert_eq!(parse_memory_bytes("invalid"), None);
+ }
+
+ #[test]
+ fn test_format_memory_bytes_short() {
+ assert_eq!(format_memory_bytes_short(1024 * 1024 * 1024), "1g");
+ assert_eq!(format_memory_bytes_short(2 * 1024 * 1024 * 1024), "2g");
+ assert_eq!(format_memory_bytes_short(512 * 1024 * 1024), "512m");
+ assert_eq!(format_memory_bytes_short(1024 * 1024), "1m");
+ assert_eq!(format_memory_bytes_short(1024), "1k");
+ assert_eq!(format_memory_bytes_short(512), "512b");
+ }
+
+ #[test]
+ fn test_format_duration_iso8601() {
+ assert_eq!(format_duration_iso8601(3600), "PT1H");
+ assert_eq!(format_duration_iso8601(7200), "PT2H");
+ assert_eq!(format_duration_iso8601(5400), "PT1H30M");
+ assert_eq!(format_duration_iso8601(1800), "PT30M");
+ assert_eq!(format_duration_iso8601(60), "PT1M");
+ assert_eq!(format_duration_iso8601(30), "PT1M"); // rounds up to minimum 1 minute
+ }
+}
diff --git a/src/client/watch/audit.rs b/src/client/watch/audit.rs
new file mode 100644
index 00000000..855c1d2a
--- /dev/null
+++ b/src/client/watch/audit.rs
@@ -0,0 +1,150 @@
+//! Audit logging for watch command actions.
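+//!
+//! Entries are written one JSON object per line. An illustrative diagnosis entry (field values are
+//! examples only) looks roughly like:
+//! `{"timestamp":"2024-01-01T00:00:00+00:00","event_type":"diagnosis","job_id":42,"job_name":"preprocess","summary":"...","confidence":0.9}`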
+
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Write};
+use std::path::Path;
+
+use chrono::Utc;
+use log::warn;
+use serde::Serialize;
+
+use super::claude_client::Diagnosis;
+use super::recovery::RecoveryAction;
+
+/// Audit logger for recording all watch command actions.
+pub struct AuditLogger {
+ writer: BufWriter<File>,
+}
+
+impl AuditLogger {
+ /// Create a new audit logger writing to the specified file.
+ pub fn new(path: &Path) -> Result<Self, String> {
+ // Create parent directories if needed
+ if let Some(parent) = path.parent() {
+ std::fs::create_dir_all(parent)
+ .map_err(|e| format!("Failed to create audit log directory: {}", e))?;
+ }
+
+ let file = OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(path)
+ .map_err(|e| format!("Failed to open audit log: {}", e))?;
+
+ Ok(Self {
+ writer: BufWriter::new(file),
+ })
+ }
+
+ /// Log a diagnosis event.
+ pub fn log_diagnosis(&mut self, job_id: i64, job_name: &str, diagnosis: &Diagnosis) {
+ let entry = AuditEntry {
+ timestamp: Utc::now().to_rfc3339(),
+ event_type: "diagnosis".to_string(),
+ job_id,
+ job_name: job_name.to_string(),
+ summary: Some(diagnosis.summary.clone()),
+ root_cause: diagnosis.root_cause.clone(),
+ recommended_action: diagnosis.recommended_action.clone(),
+ confidence: Some(diagnosis.confidence),
+ action_taken: None,
+ action_success: None,
+ notes: diagnosis.notes.clone(),
+ };
+
+ self.write_entry(&entry);
+ }
+
+ /// Log a recovery action event.
+ pub fn log_recovery(
+ &mut self,
+ job_id: i64,
+ job_name: &str,
+ action: &RecoveryAction,
+ success: bool,
+ ) {
+ let entry = AuditEntry {
+ timestamp: Utc::now().to_rfc3339(),
+ event_type: "recovery".to_string(),
+ job_id,
+ job_name: job_name.to_string(),
+ summary: None,
+ root_cause: None,
+ recommended_action: None,
+ confidence: None,
+ action_taken: Some(action.clone()),
+ action_success: Some(success),
+ notes: None,
+ };
+
+ self.write_entry(&entry);
+ }
+
+ /// Log an arbitrary event.
+ #[allow(dead_code)]
+ pub fn log_event(
+ &mut self,
+ event_type: &str,
+ job_id: i64,
+ job_name: &str,
+ notes: Option<String>,
+ ) {
+ let entry = AuditEntry {
+ timestamp: Utc::now().to_rfc3339(),
+ event_type: event_type.to_string(),
+ job_id,
+ job_name: job_name.to_string(),
+ summary: None,
+ root_cause: None,
+ recommended_action: None,
+ confidence: None,
+ action_taken: None,
+ action_success: None,
+ notes,
+ };
+
+ self.write_entry(&entry);
+ }
+
+ fn write_entry(&mut self, entry: &AuditEntry) {
+ let json = match serde_json::to_string(entry) {
+ Ok(j) => j,
+ Err(e) => {
+ warn!("Failed to serialize audit entry: {}", e);
+ return;
+ }
+ };
+
+ if let Err(e) = writeln!(self.writer, "{}", json) {
+ warn!("Failed to write audit entry: {}", e);
+ }
+
+ // Flush after each entry to ensure logs are written
+ if let Err(e) = self.writer.flush() {
+ warn!("Failed to flush audit log: {}", e);
+ }
+ }
+}
+
+/// A single audit log entry (JSON lines format).
+#[derive(Debug, Serialize)]
+struct AuditEntry {
+ timestamp: String,
+ event_type: String,
+ job_id: i64,
+ job_name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ summary: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ root_cause: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ recommended_action: Option<RecoveryAction>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ confidence: Option<f64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ action_taken: Option<RecoveryAction>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ action_success: Option<bool>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ notes: Option<String>,
+}
diff --git a/src/client/watch/claude_client.rs b/src/client/watch/claude_client.rs
new file mode 100644
index 00000000..6eb50357
--- /dev/null
+++ b/src/client/watch/claude_client.rs
@@ -0,0 +1,320 @@
+//! Claude API client for failure diagnosis.
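+//!
+//! The client asks the model to report its findings through a `diagnose_failure` tool call so the
+//! response can be parsed as structured JSON rather than free-form text.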
+
+use log::debug;
+use serde::{Deserialize, Serialize};
+
+use crate::client::apis::configuration::Configuration;
+use crate::models::JobModel;
+
+use super::recovery::RecoveryAction;
+
+const CLAUDE_API_URL: &str = "https://api.anthropic.com/v1/messages";
+const ANTHROPIC_VERSION: &str = "2023-06-01";
+
+/// Diagnosis result from Claude.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Diagnosis {
+ /// Summary of the failure
+ pub summary: String,
+ /// Root cause analysis
+ pub root_cause: Option<String>,
+ /// Recommended recovery action
+ pub recommended_action: Option<RecoveryAction>,
+ /// Confidence level (0.0 - 1.0)
+ pub confidence: f64,
+ /// Additional notes or suggestions
+ pub notes: Option<String>,
+}
+
+/// Claude API client for diagnosing job failures.
+pub struct ClaudeClient {
+ api_key: String,
+ model: String,
+ client: reqwest::blocking::Client,
+}
+
+impl ClaudeClient {
+ /// Create a new Claude client.
+ pub fn new(api_key: String, model: String) -> Self {
+ Self {
+ api_key,
+ model,
+ client: reqwest::blocking::Client::new(),
+ }
+ }
+
+ /// Diagnose a job failure using Claude.
+ pub fn diagnose_failure(
+ &self,
+ config: &Configuration,
+ workflow_id: i64,
+ job: &JobModel,
+ stdout: &str,
+ stderr: &str,
+ ) -> Result<Diagnosis, String> {
+ let job_id = job.id.ok_or("Job has no ID")?;
+ let job_name = job.name.clone();
+ let command = job.command.clone();
+
+ // Build the prompt
+ let prompt =
+ self.build_diagnosis_prompt(workflow_id, job_id, &job_name, &command, stdout, stderr);
+
+ // Get tools definition
+ let tools = self.get_tools_definition();
+
+ // Make API request
+ let request_body = serde_json::json!({
+ "model": self.model,
+ "max_tokens": 4096,
+ "tools": tools,
+ "messages": [
+ {
+ "role": "user",
+ "content": prompt
+ }
+ ],
+ "system": self.get_system_prompt()
+ });
+
+ debug!("Sending request to Claude API");
+ let response = self
+ .client
+ .post(CLAUDE_API_URL)
+ .header("Content-Type", "application/json")
+ .header("x-api-key", &self.api_key)
+ .header("anthropic-version", ANTHROPIC_VERSION)
+ .json(&request_body)
+ .send()
+ .map_err(|e| format!("Failed to send request to Claude API: {}", e))?;
+
+ if !response.status().is_success() {
+ let status = response.status();
+ let body = response
+ .text()
+ .unwrap_or_else(|_| "unknown error".to_string());
+ return Err(format!("Claude API error ({}): {}", status, body));
+ }
+
+ let response_body: ClaudeResponse = response
+ .json()
+ .map_err(|e| format!("Failed to parse Claude API response: {}", e))?;
+
+ // Parse the response
+ self.parse_response(&response_body, config, workflow_id)
+ }
+
+ fn get_system_prompt(&self) -> &'static str {
+ r#"You are an expert HPC workflow failure diagnostician. Your job is to analyze job failures from workflow orchestration systems and recommend recovery actions.
+
+When analyzing a failure:
+1. Look for common error patterns (OOM, timeout, missing files, permission errors, CUDA errors, etc.)
+2. Consider the job's resource requirements vs actual usage
+3. Recommend specific, actionable recovery steps
+
+Available recovery actions:
+- restart: Restart the job with no changes (for transient failures)
+- restart_with_resources: Restart with modified resource requirements (memory, CPUs, runtime)
+- cancel: Cancel the job and its dependents (for unrecoverable failures)
+- skip: Mark as completed and continue (for optional jobs)
+
+Always provide:
+1. A clear summary of what went wrong
+2. Root cause analysis when possible
+3. A specific recovery action with parameters
+4. Confidence level in your diagnosis
+
+Use the diagnose_failure tool to report your findings."#
+ }
+
+ fn build_diagnosis_prompt(
+ &self,
+ workflow_id: i64,
+ job_id: i64,
+ job_name: &str,
+ command: &str,
+ stdout: &str,
+ stderr: &str,
+ ) -> String {
+ format!(
+ r#"Please diagnose the following job failure and recommend a recovery action.
+
+## Job Information
+- Workflow ID: {}
+- Job ID: {}
+- Job Name: {}
+- Command: {}
+
+## Standard Output (last 10KB)
+```
+{}
+```
+
+## Standard Error (last 10KB)
+```
+{}
+```
+
+Analyze this failure and use the diagnose_failure tool to report your findings."#,
+ workflow_id, job_id, job_name, command, stdout, stderr
+ )
+ }
+
+ fn get_tools_definition(&self) -> serde_json::Value {
+ serde_json::json!([
+ {
+ "name": "diagnose_failure",
+ "description": "Report the diagnosis of a job failure with recommended recovery action",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "summary": {
+ "type": "string",
+ "description": "Brief summary of what went wrong (1-2 sentences)"
+ },
+ "root_cause": {
+ "type": "string",
+ "description": "Detailed root cause analysis"
+ },
+ "action_type": {
+ "type": "string",
+ "enum": ["restart", "restart_with_resources", "cancel", "skip", "none"],
+ "description": "Type of recovery action to take"
+ },
+ "new_memory": {
+ "type": "string",
+ "description": "New memory requirement (e.g., '8g', '16g') for restart_with_resources"
+ },
+ "new_runtime": {
+ "type": "string",
+ "description": "New runtime limit (e.g., 'PT2H', 'PT4H') for restart_with_resources"
+ },
+ "new_num_cpus": {
+ "type": "integer",
+ "description": "New CPU count for restart_with_resources"
+ },
+ "confidence": {
+ "type": "number",
+ "description": "Confidence in the diagnosis (0.0 to 1.0)"
+ },
+ "notes": {
+ "type": "string",
+ "description": "Additional notes or suggestions"
+ }
+ },
+ "required": ["summary", "action_type", "confidence"]
+ }
+ }
+ ])
+ }
+
+ fn parse_response(
+ &self,
+ response: &ClaudeResponse,
+ _config: &Configuration,
+ _workflow_id: i64,
+ ) -> Result {
+ // Find tool use in response
+ for content in &response.content {
+ if content.content_type == "tool_use"
+ && content.name.as_deref() == Some("diagnose_failure")
+ {
+ let input = content.input.as_ref().ok_or("Tool use has no input")?;
+
+ let summary = input
+ .get("summary")
+ .and_then(|v| v.as_str())
+ .ok_or("Missing summary in diagnosis")?
+ .to_string();
+
+ let root_cause = input
+ .get("root_cause")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+
+ let action_type = input
+ .get("action_type")
+ .and_then(|v| v.as_str())
+ .unwrap_or("none");
+
+ let confidence = input
+ .get("confidence")
+ .and_then(|v| v.as_f64())
+ .unwrap_or(0.5);
+
+ let notes = input
+ .get("notes")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+
+ let recommended_action = match action_type {
+ "restart" => Some(RecoveryAction::Restart),
+ "restart_with_resources" => {
+ let new_memory = input
+ .get("new_memory")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let new_runtime = input
+ .get("new_runtime")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+ let new_num_cpus = input.get("new_num_cpus").and_then(|v| v.as_i64());
+
+ Some(RecoveryAction::RestartWithResources {
+ memory: new_memory,
+ runtime: new_runtime,
+ num_cpus: new_num_cpus,
+ })
+ }
+ "cancel" => Some(RecoveryAction::Cancel),
+ "skip" => Some(RecoveryAction::Skip),
+ _ => None,
+ };
+
+ return Ok(Diagnosis {
+ summary,
+ root_cause,
+ recommended_action,
+ confidence,
+ notes,
+ });
+ }
+ }
+
+ // If no tool use found, try to extract from text
+ for content in &response.content {
+ if content.content_type == "text" {
+ if let Some(text) = &content.text {
+ return Ok(Diagnosis {
+ summary: text.chars().take(200).collect(),
+ root_cause: None,
+ recommended_action: None,
+ confidence: 0.3,
+ notes: Some("Could not parse structured response from Claude".to_string()),
+ });
+ }
+ }
+ }
+
+ Err("No diagnosis found in Claude response".to_string())
+ }
+}
+
+/// Claude API response structure.
+#[derive(Debug, Deserialize)]
+struct ClaudeResponse {
+ content: Vec<ContentBlock>,
+ #[allow(dead_code)]
+ model: String,
+ #[allow(dead_code)]
+ stop_reason: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct ContentBlock {
+ #[serde(rename = "type")]
+ content_type: String,
+ text: Option<String>,
+ name: Option<String>,
+ input: Option<serde_json::Value>,
+}
diff --git a/src/client/watch/failure_cache.rs b/src/client/watch/failure_cache.rs
new file mode 100644
index 00000000..59342674
--- /dev/null
+++ b/src/client/watch/failure_cache.rs
@@ -0,0 +1,320 @@
+//! Failure pattern cache using SQLite.
+//!
+//! Caches failure diagnoses to avoid repeated API calls for similar failures.
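+//!
+//! Patterns are keyed by (job_name_pattern, error_signature), so repeated failures that normalize
+//! to the same error signature can reuse an earlier diagnosis instead of making another API call.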
+
+use std::path::Path;
+
+use log::{debug, warn};
+use rusqlite::{Connection, params};
+use sha2::{Digest, Sha256};
+
+use super::claude_client::Diagnosis;
+
+/// Cache for storing failure patterns and their diagnoses.
+pub struct FailureCache {
+ conn: Connection,
+}
+
+impl FailureCache {
+ /// Open or create a failure cache database.
+ pub fn open(path: &Path) -> Result<Self, String> {
+ let conn =
+ Connection::open(path).map_err(|e| format!("Failed to open cache database: {}", e))?;
+
+ // Create tables if they don't exist
+ conn.execute(
+ r#"
+ CREATE TABLE IF NOT EXISTS failure_patterns (
+ id INTEGER PRIMARY KEY,
+ job_name_pattern TEXT NOT NULL,
+ error_signature TEXT NOT NULL,
+ diagnosis_json TEXT NOT NULL,
+ success_count INTEGER DEFAULT 0,
+ failure_count INTEGER DEFAULT 0,
+ created_at TEXT NOT NULL DEFAULT (datetime('now')),
+ last_used_at TEXT NOT NULL DEFAULT (datetime('now')),
+ UNIQUE(job_name_pattern, error_signature)
+ )
+ "#,
+ [],
+ )
+ .map_err(|e| format!("Failed to create cache table: {}", e))?;
+
+ // Create index for faster lookups
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_failure_patterns_lookup ON failure_patterns(job_name_pattern, error_signature)",
+ [],
+ )
+ .map_err(|e| format!("Failed to create cache index: {}", e))?;
+
+ Ok(Self { conn })
+ }
+
+ /// Compute an error signature from stderr content.
+ ///
+ /// This normalizes the error output by:
+ /// 1. Extracting lines containing error keywords
+ /// 2. Removing timestamps and PIDs
+ /// 3. Hashing the result
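+ ///
+ /// Two failures that differ only in volatile details such as timestamps or process IDs should
+ /// therefore produce the same signature and hit the same cache entry.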
+ pub fn compute_signature(stderr: &str) -> String {
+ let error_keywords = [
+ "error",
+ "Error",
+ "ERROR",
+ "exception",
+ "Exception",
+ "EXCEPTION",
+ "failed",
+ "Failed",
+ "FAILED",
+ "oom",
+ "OOM",
+ "Out of memory",
+ "killed",
+ "Killed",
+ "KILLED",
+ "timeout",
+ "Timeout",
+ "TIMEOUT",
+ "cuda",
+ "CUDA",
+ "segfault",
+ "Segmentation fault",
+ "permission denied",
+ "Permission denied",
+ "not found",
+ "No such file",
+ ];
+
+ let mut error_lines: Vec<String> = Vec::new();
+
+ for line in stderr.lines() {
+ let line_lower = line.to_lowercase();
+ if error_keywords
+ .iter()
+ .any(|kw| line_lower.contains(&kw.to_lowercase()))
+ {
+ // Normalize the line: remove timestamps, PIDs, paths
+ let normalized = normalize_error_line(line);
+ if !normalized.is_empty() {
+ error_lines.push(normalized);
+ }
+ }
+ }
+
+ // If no error lines found, hash the last 20 lines
+ if error_lines.is_empty() {
+ error_lines = stderr
+ .lines()
+ .rev()
+ .take(20)
+ .map(|l| normalize_error_line(l))
+ .filter(|l| !l.is_empty())
+ .collect();
+ error_lines.reverse();
+ }
+
+ // Compute hash
+ let mut hasher = Sha256::new();
+ for line in &error_lines {
+ hasher.update(line.as_bytes());
+ hasher.update(b"\n");
+ }
+ let result = hasher.finalize();
+ format!("{:x}", result)
+ }
+
+ /// Look up a cached diagnosis for a failure pattern.
+ pub fn lookup(
+ &self,
+ job_name: &str,
+ error_signature: &str,
+ ) -> Result