diff --git a/database/unified_db/__init__.py b/database/unified_db/__init__.py index 3ab455b3..aa7f7688 100644 --- a/database/unified_db/__init__.py +++ b/database/unified_db/__init__.py @@ -64,6 +64,14 @@ upload_job_and_trial_records, upload_traces_to_hf, register_benchmark_and_tasks_from_job, + # Utility functions + calculate_standard_error, + # Pending Job Status functions + create_job_entry_pending, + update_job_status_to_started, + get_job_by_model_benchmark, + get_latest_job_for_model_benchmark, + create_job_entry_started, ) from .models import ( DatasetModel, @@ -149,4 +157,11 @@ "upload_job_and_trial_records", "upload_traces_to_hf", "register_benchmark_and_tasks_from_job", + "calculate_standard_error", + # Pending Job Status exports + "create_job_entry_pending", + "update_job_status_to_started", + "get_job_by_model_benchmark", + "get_latest_job_for_model_benchmark", + "create_job_entry_started", ] \ No newline at end of file diff --git a/database/unified_db/complete_schema.sql b/database/unified_db/complete_schema.sql index 45b91bdd..a9edeec9 100644 --- a/database/unified_db/complete_schema.sql +++ b/database/unified_db/complete_schema.sql @@ -1,4 +1,5 @@ -- Complete Schema for OT-Agents Registration System +-- Merged with DC-Agents additions (duplicate_of support) -- Run this file to set up all required tables -- Enable UUID extension @@ -53,6 +54,7 @@ CREATE TABLE IF NOT EXISTS models ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), name TEXT NOT NULL, base_model_id UUID REFERENCES models(id), + duplicate_of UUID REFERENCES models(id) ON DELETE RESTRICT, created_by TEXT NOT NULL, creation_location TEXT NOT NULL, creation_time TIMESTAMP WITH TIME ZONE DEFAULT NOW(), @@ -70,7 +72,10 @@ CREATE TABLE IF NOT EXISTS models ( agent_id UUID REFERENCES agents(id) NOT NULL, training_type TEXT CHECK (training_type IN ('SFT', 'RL')), traces_location_s3 TEXT, - description TEXT + description TEXT, + + -- Prevent self-reference for duplicate_of + CONSTRAINT 
models_no_self_duplicate CHECK (duplicate_of IS NULL OR duplicate_of != id) ); -- Indexes for models @@ -79,6 +84,7 @@ CREATE INDEX idx_models_created_by ON models(created_by); CREATE INDEX idx_models_agent_id ON models(agent_id); CREATE INDEX idx_models_dataset_id ON models(dataset_id); CREATE INDEX idx_models_base_model_id ON models(base_model_id); +CREATE INDEX idx_models_duplicate_of ON models(duplicate_of); CREATE INDEX idx_models_training_type ON models(training_type); CREATE INDEX idx_models_creation_time ON models(creation_time DESC); CREATE INDEX idx_models_training_start ON models(training_start DESC); @@ -90,15 +96,20 @@ CREATE TABLE IF NOT EXISTS benchmarks ( name TEXT NOT NULL, benchmark_version_hash CHAR(64), is_external BOOLEAN NOT NULL DEFAULT false, + duplicate_of UUID REFERENCES benchmarks(id) ON DELETE RESTRICT, external_link TEXT, description TEXT, - updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + + -- Prevent self-reference for duplicate_of + CONSTRAINT benchmarks_no_self_duplicate CHECK (duplicate_of IS NULL OR duplicate_of != id) ); -- Indexes for benchmarks CREATE INDEX idx_benchmarks_name ON benchmarks(name); CREATE INDEX idx_benchmarks_benchmark_version_hash ON benchmarks(benchmark_version_hash); CREATE INDEX idx_benchmarks_is_external ON benchmarks(is_external); +CREATE INDEX idx_benchmarks_duplicate_of ON benchmarks(duplicate_of); CREATE INDEX idx_benchmarks_updated_at ON benchmarks(updated_at DESC); -- ==================== UPDATE TRIGGERS ==================== @@ -182,12 +193,14 @@ COMMENT ON TABLE models IS 'Table storing ML model metadata and training informa COMMENT ON COLUMN models.training_type IS 'Type of training: SFT (Supervised Fine-Tuning) or RL (Reinforcement Learning)'; COMMENT ON COLUMN models.is_external IS 'Whether this model is external (e.g., from HuggingFace)'; COMMENT ON COLUMN models.training_parameters IS 'JSON containing all training hyperparameters and 
configuration'; +COMMENT ON COLUMN models.duplicate_of IS 'UUID of the canonical model this entry is a duplicate of (for deduplication tracking)'; -- Benchmarks table COMMENT ON TABLE benchmarks IS 'Table storing evaluation benchmark metadata'; COMMENT ON COLUMN benchmarks.name IS 'Name of the benchmark'; COMMENT ON COLUMN benchmarks.benchmark_version_hash IS 'SHA-256 hash of the benchmark version (64 characters)'; COMMENT ON COLUMN benchmarks.is_external IS 'Whether this benchmark is external (not hosted internally)'; +COMMENT ON COLUMN benchmarks.duplicate_of IS 'UUID of the canonical benchmark this entry is a duplicate of (for deduplication tracking)'; COMMENT ON COLUMN benchmarks.external_link IS 'Link to external benchmark if applicable'; COMMENT ON COLUMN benchmarks.description IS 'Description of the benchmark and its purpose'; COMMENT ON COLUMN benchmarks.updated_at IS 'Timestamp when the benchmark was last updated'; diff --git a/database/unified_db/config.py b/database/unified_db/config.py index 414c6fc0..43228955 100644 --- a/database/unified_db/config.py +++ b/database/unified_db/config.py @@ -86,8 +86,13 @@ def create_supabase_client(use_admin: bool = False) -> Client: print("⚠️ Admin access requested but no service role key found") print(" Some operations may fail due to RLS policies") - # Create client (v2 API doesn't use ClientOptions the same way) - return create_client(supabase_config.supabase_url, key) + # Create client with timeout options + options = ClientOptions( + postgrest_client_timeout=30, + storage_client_timeout=30 + ) + + return create_client(supabase_config.supabase_url, key, options) def get_default_client() -> Client: diff --git a/database/unified_db/models.py b/database/unified_db/models.py index f0992781..19679752 100644 --- a/database/unified_db/models.py +++ b/database/unified_db/models.py @@ -98,6 +98,7 @@ class ModelModel(BaseModel): id: Optional[UUID] = Field(default_factory=uuid4) name: str base_model_id: Optional[UUID] = None 
+ duplicate_of: Optional[UUID] = None # Reference to canonical model this is a duplicate of created_by: str creation_location: str creation_time: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc)) @@ -133,6 +134,10 @@ def serialize_id(self, value: Optional[UUID]) -> Optional[str]: def serialize_base_model_id(self, value: Optional[UUID]) -> Optional[str]: return str(value) if value else None + @field_serializer('duplicate_of') + def serialize_duplicate_of(self, value: Optional[UUID]) -> Optional[str]: + return str(value) if value else None + @field_serializer('dataset_id') def serialize_dataset_id(self, value: Optional[UUID]) -> Optional[str]: return str(value) if value else None @@ -171,6 +176,7 @@ def clean_model_metadata(model_data: Dict[str, Any]) -> Dict[str, Any]: 'id': str(model_data.get('id')) if model_data.get('id') else None, 'name': model_data.get('name'), 'base_model_id': str(model_data.get('base_model_id')) if model_data.get('base_model_id') else None, + 'duplicate_of': str(model_data.get('duplicate_of')) if model_data.get('duplicate_of') else None, 'created_by': model_data.get('created_by'), 'creation_location': model_data.get('creation_location'), 'creation_time': model_data.get('creation_time'), @@ -241,6 +247,7 @@ class BenchmarkModel(BaseModel): name: str benchmark_version_hash: Optional[str] = Field(None, max_length=64) is_external: bool = False + duplicate_of: Optional[UUID] = None # Reference to canonical benchmark this is a duplicate of external_link: Optional[str] = None description: Optional[str] = None updated_at: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc)) @@ -256,6 +263,10 @@ def validate_benchmark_version_hash(cls, v: Optional[str]) -> Optional[str]: def serialize_id(self, value: Optional[UUID]) -> Optional[str]: return str(value) if value else None + @field_serializer('duplicate_of') + def serialize_duplicate_of(self, value: Optional[UUID]) -> Optional[str]: + return 
str(value) if value else None + @field_serializer('updated_at') def serialize_updated_at(self, value: Optional[datetime]) -> Optional[str]: return value.isoformat() if value else None @@ -271,6 +282,7 @@ def clean_benchmark_metadata(benchmark_data: Dict[str, Any]) -> Dict[str, Any]: 'name': benchmark_data.get('name'), 'benchmark_version_hash': benchmark_data.get('benchmark_version_hash'), 'is_external': benchmark_data.get('is_external'), + 'duplicate_of': str(benchmark_data.get('duplicate_of')) if benchmark_data.get('duplicate_of') else None, 'external_link': benchmark_data.get('external_link'), 'description': benchmark_data.get('description'), 'updated_at': benchmark_data.get('updated_at') @@ -363,18 +375,20 @@ class SandboxJobModel(BaseModel): username: str started_at: Optional[datetime] = None ended_at: Optional[datetime] = None + submitted_at: Optional[datetime] = None # When submitted to SLURM queue + slurm_job_id: Optional[str] = None # SLURM job ID for tracking git_commit_id: Optional[str] = None package_version: Optional[str] = None - n_trials: int - config: Dict[str, Any] + n_trials: Optional[int] = None # Made optional for Pending jobs + config: Optional[Dict[str, Any]] = None # Made optional for Pending jobs metrics: Optional[Dict[str, Any]] = None stats: Optional[Dict[str, Any]] = None agent_id: UUID model_id: UUID benchmark_id: UUID - n_rep_eval: int + n_rep_eval: Optional[int] = None # Made optional for Pending jobs hf_traces_link: Optional[str] = None - job_status: Optional[str] = None + job_status: Optional[str] = None # "Pending", "Started", "Finished" @field_validator('git_commit_id', 'package_version') @classmethod @@ -410,6 +424,10 @@ def serialize_started_at(self, value: Optional[datetime]) -> Optional[str]: def serialize_ended_at(self, value: Optional[datetime]) -> Optional[str]: return value.isoformat() if value else None + @field_serializer('submitted_at') + def serialize_submitted_at(self, value: Optional[datetime]) -> Optional[str]: + 
return value.isoformat() if value else None + def clean_sandbox_job_metadata(job_data: Dict[str, Any]) -> Dict[str, Any]: """Clean sandbox job metadata for API responses.""" @@ -423,6 +441,8 @@ def clean_sandbox_job_metadata(job_data: Dict[str, Any]) -> Dict[str, Any]: 'username': job_data.get('username'), 'started_at': job_data.get('started_at'), 'ended_at': job_data.get('ended_at'), + 'submitted_at': job_data.get('submitted_at'), + 'slurm_job_id': job_data.get('slurm_job_id'), 'git_commit_id': job_data.get('git_commit_id'), 'package_version': job_data.get('package_version'), 'n_trials': job_data.get('n_trials'), diff --git a/database/unified_db/requirements.txt b/database/unified_db/requirements.txt index e28bf5c8..598977c7 100644 --- a/database/unified_db/requirements.txt +++ b/database/unified_db/requirements.txt @@ -1,7 +1,7 @@ # Essential dependencies for DC-Agents Dataset Registration # Database and API -supabase==2.22.3 +supabase>=2.0.0,<3.0.0 # Data processing pandas>=2.0.0 diff --git a/database/unified_db/utils.py b/database/unified_db/utils.py index 20c8a106..dda7c334 100644 --- a/database/unified_db/utils.py +++ b/database/unified_db/utils.py @@ -6,15 +6,19 @@ with support for both HuggingFace and local parquet file datasets. 
""" +import io import json import logging import os +import tarfile +import tempfile import time +import warnings +from dataclasses import dataclass from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from pathlib import Path, PurePosixPath +from typing import Any, Dict, Iterator, List, Optional, Sequence, Union from uuid import UUID -import warnings from harbor.utils.traces_utils import export_traces from supabase import Client @@ -490,6 +494,37 @@ def create_model(model_data: Dict[str, Any]) -> Dict[str, Any]: raise ValueError("Failed to create model") return clean_model_metadata(response.data[0]) + except ValueError as e: + # Check if the error is specifically the JSON 'inf' compliant error + if "Out of range float values" in str(e): + logger.warning("Detected non-JSON compliant floats (inf/nan). Applying patch...") + + import math + def sanitize(obj): + if isinstance(obj, dict): + return {k: sanitize(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [sanitize(v) for v in obj] + elif isinstance(obj, float): + if math.isinf(obj): + logger.info("Replacing 'inf' float with string 'inf'") + return "inf" + if math.isnan(obj): + logger.info("Replacing 'nan' float with string 'nan'") + return "nan" + return obj + + # Sanitize and retry + model_data = sanitize(model_data) + client = get_supabase_client(use_admin=True) + response = client.table('models').insert(model_data).execute() + if not response.data: + raise ValueError("Failed to create model") + + return clean_model_metadata(response.data[0]) + else: + # Re-raise if it's a different ValueError + raise e except Exception as e: logger.error(f"Error creating model: {e}") raise @@ -519,6 +554,7 @@ def register_hf_model( name: Optional[str] = None, created_by: Optional[str] = None, base_model_id: Optional[str] = None, + duplicate_of: Optional[str] = None, dataset_id: Optional[str] = None, dataset_names: Optional[str] = None, 
training_end: Optional[datetime] = None, @@ -580,6 +616,15 @@ def register_hf_model( logger.info(f"Model {model_name} already exists") return {"success": True, "model": existing['id'], "exists": True} + # Validate duplicate_of if provided + if duplicate_of: + client = get_supabase_client() + target = client.table('models').select('id, duplicate_of').eq('id', duplicate_of).execute() + if not target.data: + return {"success": False, "error": f"duplicate_of target model {duplicate_of} not found"} + if target.data[0].get('duplicate_of'): + return {"success": False, "error": f"duplicate_of target {duplicate_of} is itself a duplicate. Only point to canonical entries."} + # Handle multi-dataset validation final_dataset_id = None final_dataset_names = None @@ -638,7 +683,7 @@ def register_hf_model( # Extract description from model card if not provided if not description: - description = getattr(hf_info, 'cardData', {}).get('description') or getattr(hf_info, 'description', '') + description = (getattr(hf_info, 'cardData', None) or {}).get('description') or getattr(hf_info, 'description', '') # Prepare auto-filled training parameters auto_params = { @@ -684,6 +729,7 @@ def register_hf_model( # Optional user fields "base_model_id": base_model_id, + "duplicate_of": duplicate_of, "dataset_id": final_dataset_id, "dataset_names": final_dataset_names, "training_end": training_end.isoformat() if training_end else now.isoformat(), @@ -721,6 +767,7 @@ def register_local_model( agent_id: str, training_start: datetime, base_model_id: Optional[str] = None, + duplicate_of: Optional[str] = None, dataset_id: Optional[str] = None, dataset_names: Optional[str] = None, training_end: Optional[datetime] = None, @@ -784,6 +831,15 @@ def register_local_model( logger.info(f"Model {name} already exists") return {"success": True, "model": existing, "exists": True} + # Validate duplicate_of if provided + if duplicate_of: + client = get_supabase_client() + target = client.table('models').select('id, 
duplicate_of').eq('id', duplicate_of).execute() + if not target.data: + return {"success": False, "error": f"duplicate_of target model {duplicate_of} not found"} + if target.data[0].get('duplicate_of'): + return {"success": False, "error": f"duplicate_of target {duplicate_of} is itself a duplicate. Only point to canonical entries."} + # Handle multi-dataset validation final_dataset_id = None final_dataset_names = None @@ -900,6 +956,7 @@ def register_local_model( # Optional user fields "base_model_id": base_model_id, + "duplicate_of": duplicate_of, "dataset_id": final_dataset_id, "dataset_names": final_dataset_names, "training_end": training_end.isoformat() if training_end else None, @@ -1358,6 +1415,7 @@ def register_benchmark( name: str, benchmark_version_hash: Optional[str] = None, is_external: bool = False, + duplicate_of: Optional[str] = None, external_link: Optional[str] = None, description: Optional[str] = None, forced_update: bool = True @@ -1392,6 +1450,15 @@ def register_benchmark( logger.info(f"Benchmark {name} already exists") return {"success": True, "benchmark": existing, "exists": True} + # Validate duplicate_of if provided + if duplicate_of: + client = get_supabase_client() + target = client.table('benchmarks').select('id, duplicate_of').eq('id', duplicate_of).execute() + if not target.data: + return {"success": False, "error": f"duplicate_of target benchmark {duplicate_of} not found"} + if target.data[0].get('duplicate_of'): + return {"success": False, "error": f"duplicate_of target {duplicate_of} is itself a duplicate. 
Only point to canonical entries."} + # Build benchmark data - only auto-fill system fields now = datetime.now(timezone.utc) benchmark_data = { @@ -1402,6 +1469,7 @@ def register_benchmark( "name": name, "benchmark_version_hash": benchmark_version_hash, "is_external": is_external, + "duplicate_of": duplicate_of, "external_link": external_link, "description": description } @@ -1584,6 +1652,12 @@ def delete_model_by_id(model_id: str) -> Dict[str, Any]: dependent_names = [model['name'] for model in models_using_base.data] return {"success": False, "error": f"Cannot delete model '{model_name}': used as base model by: {', '.join(dependent_names)}"} + # Check for models marking this as canonical (duplicate_of) + models_duplicates = client.table('models').select('id, name').eq('duplicate_of', model_id).execute() + if models_duplicates.data: + duplicate_names = [m['name'] for m in models_duplicates.data] + return {"success": False, "error": f"Cannot delete model '{model_name}': is canonical for duplicates: {', '.join(duplicate_names)}"} + # Delete the model response = client.table('models').delete().eq('id', model_id).execute() @@ -1644,8 +1718,14 @@ def delete_benchmark_by_id(benchmark_id: str) -> Dict[str, Any]: benchmark_name = existing.data[0]['name'] - # Benchmarks currently have no foreign key constraints, so we can delete directly - # Note: If benchmark results tables are added later, check those constraints here + # Check for benchmarks marking this as canonical (duplicate_of) + benchmark_duplicates = client.table('benchmarks').select('id, name').eq('duplicate_of', benchmark_id).execute() + if benchmark_duplicates.data: + duplicate_names = [b['name'] for b in benchmark_duplicates.data] + return {"success": False, "error": f"Cannot delete benchmark '{benchmark_name}': is canonical for duplicates: {', '.join(duplicate_names)}"} + + # Note: sandbox_jobs and sandbox_benchmark_tasks also reference benchmarks + # Those FK constraints are handled by the database # Delete 
the benchmark
         response = client.table('benchmarks').delete().eq('id', benchmark_id).execute()
@@ -4436,3 +4516,336 @@ def register_base_model(
     except Exception as e:
         logger.error(f"Failed to register base model {base_model_name}: {e}")
         return {"success": False, "error": str(e)}
+
+
+# ==================== PENDING JOB STATUS UTILITIES ====================
+
+JOB_STATUS_PENDING = "Pending"
+JOB_STATUS_STARTED = "Started"
+JOB_STATUS_FINISHED = "Finished"
+
+
+def _benchmark_name_from_repo(repo: str) -> str:
+    """Return the trailing repo name of an ``org/name`` HuggingFace repo id.
+
+    A bare name is returned unchanged. A trailing slash is ignored so that
+    ``"org/name/"`` resolves to ``"name"`` instead of an empty string.
+    """
+    return repo.rstrip("/").rsplit("/", 1)[-1]
+
+
+def get_job_by_model_benchmark(model_id: str, benchmark_id: str) -> Optional[Dict[str, Any]]:
+    """
+    Get the most recent job for a given model and benchmark.
+
+    Args:
+        model_id: UUID of the model
+        benchmark_id: UUID of the benchmark
+
+    Returns:
+        Job dict if found; None when no job matches or the lookup fails
+        (lookup errors are logged, not raised)
+    """
+    try:
+        client = get_supabase_client()
+        response = (
+            client.table('sandbox_jobs')
+            .select('*')
+            .eq('model_id', model_id)
+            .eq('benchmark_id', benchmark_id)
+            .order('created_at', desc=True)
+            .limit(1)
+            .execute()
+        )
+
+        if not response.data:
+            return None
+
+        return clean_sandbox_job_metadata(response.data[0])
+    except Exception as e:
+        logger.error(f"Error getting job for model={model_id}, benchmark={benchmark_id}: {e}")
+        return None
+
+
+def get_latest_job_for_model_benchmark(model_hf: str, dataset_hf: str) -> Optional[Dict[str, Any]]:
+    """
+    Get the most recent job for a given model and dataset by HF names.
+
+    Args:
+        model_hf: HuggingFace model name (e.g., "org/model-name")
+        dataset_hf: HuggingFace dataset repo (e.g., "DCAgent/dev_set_v2")
+
+    Returns:
+        Job dict if found, None otherwise
+    """
+    try:
+        # First resolve model_id and benchmark_id from HF names
+        model = get_model_by_name(model_hf)
+        if not model:
+            logger.warning(f"Model not found: {model_hf}")
+            return None
+
+        # Benchmarks are looked up by repo name (e.g., "DCAgent/dev_set_v2" -> "dev_set_v2")
+        dataset_name = _benchmark_name_from_repo(dataset_hf)
+        benchmark = get_benchmark_by_name(dataset_name)
+        if not benchmark:
+            logger.warning(f"Benchmark not found: {dataset_name}")
+            return None
+
+        return get_job_by_model_benchmark(model['id'], benchmark['id'])
+    except Exception as e:
+        logger.error(f"Error getting latest job for model={model_hf}, dataset={dataset_hf}: {e}")
+        return None
+
+
+def create_job_entry_pending(
+    job_name: str,
+    model_hf: str,
+    benchmark_hf: str,
+    agent_name: str,
+    slurm_job_id: str,
+    username: Optional[str] = None,
+    config: Optional[Dict[str, Any]] = None
+) -> Dict[str, Any]:
+    """
+    Create a job entry with status="Pending" at SLURM submit time.
+
+    This is called by the listener immediately after sbatch returns successfully.
+    It creates a minimal job entry to prevent duplicate submissions while the job
+    waits in the SLURM queue.
+
+    Args:
+        job_name: Unique job name (RUN_TAG)
+        model_hf: HuggingFace model name
+        benchmark_hf: HuggingFace dataset/benchmark repo
+        agent_name: Name of the agent (e.g., "terminus-2")
+        slurm_job_id: SLURM job ID from sbatch output
+        username: Username for the job
+        config: Optional config dict
+
+    Returns:
+        {"success": bool, "job": dict, "error": str}
+    """
+    try:
+        logger.info(f"Creating pending job entry: {job_name}")
+
+        # Resolve model
+        model = get_model_by_name(model_hf)
+        if not model:
+            return {"success": False, "error": f"Model not found: {model_hf}"}
+        model_id = model['id']
+
+        # Resolve benchmark (extract repo name from HF format)
+        benchmark_name = _benchmark_name_from_repo(benchmark_hf)
+        benchmark = get_benchmark_by_name(benchmark_name)
+        if not benchmark:
+            return {"success": False, "error": f"Benchmark not found: {benchmark_name}"}
+        benchmark_id = benchmark['id']
+
+        # Check for existing job (any status) - prevent duplicates
+        existing = get_job_by_model_benchmark(model_id, benchmark_id)
+        if existing:
+            status = existing.get('job_status')
+            if status == JOB_STATUS_FINISHED:
+                return {"success": False, "error": "Job already finished", "job": existing}
+            if status in (JOB_STATUS_PENDING, JOB_STATUS_STARTED):
+                return {"success": True, "job": existing, "exists": True}
+            # Any other status value falls through and a new entry is created.
+
+        # Resolve or create agent
+        agent_res = register_agent(name=agent_name)
+        if not agent_res.get('success'):
+            return {"success": False, "error": f"Failed to register agent: {agent_res.get('error')}"}
+        agent_id = agent_res['agent']['id']
+
+        # Build minimal job entry for Pending status
+        now = datetime.now(timezone.utc)
+        job_data = {
+            "job_name": job_name,
+            "username": username or "listener",
+            "agent_id": agent_id,
+            "model_id": model_id,
+            "benchmark_id": benchmark_id,
+            "job_status": JOB_STATUS_PENDING,
+            "submitted_at": now.isoformat(),
+            "slurm_job_id": slurm_job_id,
+            "created_at": now.isoformat(),
+            # Placeholder values while Pending; update_job_status_to_started overwrites them
+            "config": config or {},
+            "n_trials": 0,
+            "n_rep_eval": 0,
+        }
+
+        # Create the job
+        result = create_sandbox_job(job_data)
+        logger.info(f"Created pending job entry: {job_name} (id={result.get('id')})")
+        return {"success": True, "job": result}
+
+    except Exception as e:
+        logger.error(f"Failed to create pending job entry {job_name}: {e}")
+        return {"success": False, "error": str(e)}
+
+
+def update_job_status_to_started(
+    job_name: str,
+    n_trials: int,
+    n_rep_eval: int,
+    config: Dict[str, Any],
+    harbor_package_version: Optional[str] = None
+) -> Dict[str, Any]:
+    """
+    Update a Pending job to Started status when sbatch actually runs.
+
+    This is called by the sbatch script when it starts executing.
+    It updates the job entry with full details now that we know them.
+
+    Args:
+        job_name: Job name (RUN_TAG) to find the job
+        n_trials: Number of concurrent trials (n_concurrent)
+        n_rep_eval: Number of attempts per task (n_attempts)
+        config: Full config dict
+        harbor_package_version: Harbor package version
+
+    Returns:
+        {"success": bool, "job": dict, "error": str}
+    """
+    try:
+        logger.info(f"Updating job to Started: {job_name}")
+
+        # Look up job by name
+        existing = get_sandbox_job_by_name(job_name)
+        if not existing:
+            return {"success": False, "error": f"Job not found: {job_name}"}
+
+        job_id = existing['id']
+        current_status = existing.get('job_status')
+
+        # Validate state transition
+        if current_status == JOB_STATUS_FINISHED:
+            return {"success": False, "error": "Job already finished, cannot restart"}
+        if current_status == JOB_STATUS_STARTED:
+            # Already started, just return success (idempotent)
+            logger.info(f"Job {job_name} already in Started status")
+            return {"success": True, "job": existing, "already_started": True}
+
+        # Update to Started
+        now = datetime.now(timezone.utc)
+        update_data = {
+            "job_status": JOB_STATUS_STARTED,
+            "started_at": now.isoformat(),
+            "n_trials": n_trials,
+            "n_rep_eval": n_rep_eval,
+            "config": config,
+            "package_version": harbor_package_version,
+        }
+
+        result = update_sandbox_job(job_id, update_data)
+        logger.info(f"Updated job to Started: {job_name}")
+        return {"success": True, "job": result}
+
+    except Exception as e:
+        logger.error(f"Failed to update job to Started {job_name}: {e}")
+        return {"success": False, "error": str(e)}
+
+
+def create_job_entry_started(
+    model_hf_name: str,
+    benchmark_hf_name: str,
+    job_name: str,
+    username: str,
+    slurm_job_id: str,
+    harbor_package_version: Optional[str],
+    agent_name: str,
+    config: Dict[str, Any],
+    n_trials: int,
+    n_rep_eval: int
+) -> Dict[str, Any]:
+    """
+    Create a job entry with status="Started" directly.
+
+    This is the original behavior - creates a fully populated job entry
+    when the sbatch script starts running. Use this for backward compatibility
+    or when the listener doesn't create a Pending entry first.
+
+    Args:
+        model_hf_name: HuggingFace model name
+        benchmark_hf_name: HuggingFace dataset/benchmark repo
+        job_name: Unique job name (RUN_TAG)
+        username: Username for the job
+        slurm_job_id: SLURM job ID
+        harbor_package_version: Harbor package version
+        agent_name: Name of the agent
+        config: Config dict
+        n_trials: Number of concurrent trials
+        n_rep_eval: Number of attempts per task
+
+    Returns:
+        {"success": bool, "job": dict, "error": str}
+    """
+    try:
+        logger.info(f"Creating started job entry: {job_name}")
+
+        # First, check if a Pending entry exists and upgrade it
+        existing = get_sandbox_job_by_name(job_name)
+        if existing:
+            status = existing.get('job_status')
+            if status == JOB_STATUS_PENDING:
+                # Upgrade Pending -> Started
+                return update_job_status_to_started(
+                    job_name=job_name,
+                    n_trials=n_trials,
+                    n_rep_eval=n_rep_eval,
+                    config=config,
+                    harbor_package_version=harbor_package_version
+                )
+            elif status == JOB_STATUS_STARTED:
+                logger.info(f"Job {job_name} already Started")
+                return {"success": True, "job": existing, "exists": True}
+            elif status == JOB_STATUS_FINISHED:
+                return {"success": False, "error": "Job already finished"}
+
+        # Resolve model
+        model = get_model_by_name(model_hf_name)
+        if not model:
+            return {"success": False, "error": f"Model not found: {model_hf_name}"}
+        model_id = model['id']
+
+        # Resolve benchmark
+        benchmark_name = _benchmark_name_from_repo(benchmark_hf_name)
+        benchmark = get_benchmark_by_name(benchmark_name)
+        if not benchmark:
+            return {"success": False, "error": f"Benchmark not found: {benchmark_name}"}
+        benchmark_id = benchmark['id']
+
+        # Resolve or create agent
+        agent_res = register_agent(name=agent_name)
+        if not agent_res.get('success'):
+            return {"success": False, "error": f"Failed to register agent: {agent_res.get('error')}"}
+        agent_id = agent_res['agent']['id']
+
+        # Build full job entry
+        now = datetime.now(timezone.utc)
+        job_data = {
+            "job_name": job_name,
+            "username": username,
+            "agent_id": agent_id,
+            "model_id": model_id,
+            "benchmark_id": benchmark_id,
+            "job_status": JOB_STATUS_STARTED,
+            "started_at": now.isoformat(),
+            "submitted_at": now.isoformat(),
+            "slurm_job_id": slurm_job_id,
+            "created_at": now.isoformat(),
+            "config": config,
+            "n_trials": n_trials,
+            "n_rep_eval": n_rep_eval,
+            "package_version": harbor_package_version,
+        }
+
+        result = create_sandbox_job(job_data)
+        logger.info(f"Created started job entry: {job_name} (id={result.get('id')})")
+        return {"success": True, "job": result}
+
+    except Exception as e:
+        logger.error(f"Failed to create started job entry {job_name}: {e}")
+        return {"success": False, "error": str(e)}
diff --git a/eval/tacc/unified_eval_harbor.sbatch b/eval/tacc/unified_eval_harbor.sbatch
new file mode 100644
index 00000000..d2d2559a
--- /dev/null
+++ b/eval/tacc/unified_eval_harbor.sbatch
@@ -0,0 +1,493 @@
+#!/bin/bash
+#SBATCH -p gh
+#SBATCH --time=24:00:00
+#SBATCH --nodes 1
+#SBATCH --ntasks-per-node 1
+#SBATCH --cpus-per-task=72
+#SBATCH
--exclude=c610-021,c611-011,c640-041,c611-041,c611-122,c637-082,c637-091,c610-111 +#SBATCH --account CCR24067 +#SBATCH --output=experiments/logs/%x_%j.out +#SBATCH --job-name=eval + +# ============================================================================== +# Unified Eval Harbor Script +# ============================================================================== +# This script replaces all individual sbatch scripts with one parameterized version. +# Parameters are passed via environment variables (set by unified_eval_listener.py). +# +# Positional Args: +# $1 = MODEL (HuggingFace model name, e.g., "org/model-name") +# $2 = REPO_ID (HuggingFace dataset repo, e.g., "DCAgent/dev_set_v2") +# $3 = BENCHMARK_ID (optional, DB benchmark UUID) +# $4 = RUN_TAG (optional, job name - if provided, updates Pending job to Started) +# +# Environment Variables (with defaults): +# EVAL_N_CONCURRENT Harbor --n-concurrent (default: 64) +# EVAL_N_ATTEMPTS Harbor --n-attempts (default: 3) +# EVAL_GPU_MEMORY_UTIL VLLM --gpu-memory-utilization (default: 0.9) +# EVAL_DAYTONA_THRESHOLD Max DaytonaErrors before abort (default: 3) +# EVAL_VLLM_MAX_RETRIES VLLM startup retries (default: 5) +# EVAL_AGENT_PARSER Agent parser type (default: "", use "xml" for swebench) +# EVAL_SLURM_TIME SLURM time limit (read at submit time) +# EVAL_ENABLE_THINKING Enable thinking blocks (default: false) +# EVAL_AGENT_NAME Agent name for harbor and DB entries (default: "terminus-2") +# ============================================================================== + +# === CONFIGURABLE PARAMETERS (with defaults) === +N_CONCURRENT="${EVAL_N_CONCURRENT:-64}" +N_ATTEMPTS="${EVAL_N_ATTEMPTS:-3}" +GPU_MEMORY_UTIL="${EVAL_GPU_MEMORY_UTIL:-0.9}" +DAYTONA_ERROR_THRESHOLD="${EVAL_DAYTONA_THRESHOLD:-3}" +VLLM_MAX_RETRIES="${EVAL_VLLM_MAX_RETRIES:-5}" +AGENT_PARSER="${EVAL_AGENT_PARSER:-}" +ENABLE_THINKING="${EVAL_ENABLE_THINKING:-false}" +AGENT_NAME="${EVAL_AGENT_NAME:-terminus-2}" + +# === POSITIONAL ARGS === 
+MODEL="${1:-mlfoundations-dev/claude_3_7_20250219_tbench_traces_sharegptv1}" +REPO_ID="${2:-DCAgent/dev_set_v2}" +BENCHMARK_ID="${3:-}" +RUN_TAG_ARG="${4:-}" # Optional: job name from listener (if provided, Pending entry exists) + +# Create timestamp and safe names +TIMESTAMP=$(date +'%Y%m%d_%H%M%S') +SAFE_MODEL=$(echo "$MODEL" | tr '/:' '_') +SAFE_REPO=$(echo "$REPO_ID" | tr '/:' '_') + +echo "==============================================" +echo "Unified Eval Harbor - Starting job" +echo "==============================================" +echo "Model: $MODEL" +echo "Repository: $REPO_ID" +echo "Benchmark ID: ${BENCHMARK_ID:-}" +echo "" +echo "Parameters:" +echo " N_CONCURRENT: $N_CONCURRENT" +echo " N_ATTEMPTS: $N_ATTEMPTS" +echo " GPU_MEMORY_UTIL: $GPU_MEMORY_UTIL" +echo " DAYTONA_ERROR_THRESHOLD: $DAYTONA_ERROR_THRESHOLD" +echo " VLLM_MAX_RETRIES: $VLLM_MAX_RETRIES" +echo " AGENT_PARSER: ${AGENT_PARSER:-}" +echo " ENABLE_THINKING: $ENABLE_THINKING" +echo " AGENT_NAME: $AGENT_NAME" +echo "==============================================" + +module purge +module load gcc/15.1.0 +module load cuda/12.8 +module load tacc-apptainer + +# Set up environment variables +export VLLM_USE_V1=1 +export RAY_RUNTIME_ENV_HOOK=ray._private.runtime_env.uv_runtime_env_hook.hook +export RAY_ADDRESS=${RAY_ADDRESS:-} +export VLLM_CACHE_ROOT=/scratch/10000/eguha3/vllm_cache +export VLLM_CONFIG_ROOT=/scratch/10000/eguha3/vllm_config +export TRITON_DUMP_DIR=/scratch/10000/eguha3/triton_dump_dir +export TRITON_OVERRIDE_DIR=/scratch/10000/eguha3/triton_override_dir +export TRITON_CACHE_DIR=/scratch/10000/eguha3/triton_cache_dir +export FLASHINFER_WORKSPACE_BASE=/scratch/08002/gsmyrnis/flashinfer_cache +export UV_CACHE_DIR=/scratch/10000/eguha3/uv_cache_dir +export HYDRA_FULL_ERROR=1 +export HF_CACHE_DIR=/scratch/08134/negin/dc-agent-shared/.hf_cache +export HF_HUB_CACHE=$SCRATCH/hub +export LITELLM_LOCAL_MODEL_COST_MAP=True + +# DB/API secrets etc. 
+source /scratch/08134/negin/dc-agent-shared/dc-agent/eval/tacc/secret.env + +# Toolchain fixes +ln -sf /home1/apps/gcc/15.1.0/lib64/libstdc++.so.6 /scratch/08134/negin/dc-agent-shared/SkyRL/envs/tacc_rl_v5/lib/libstdc++.so.6 +export LD_LIBRARY_PATH=/home1/apps/gcc/15.1.0/lib64:/scratch/08134/negin/dc-agent-shared/SkyRL/envs/tacc_rl_v5/lib/python3.12/site-packages/torch/lib:$LD_LIBRARY_PATH + +# Conda env +source /scratch/08002/gsmyrnis/miniconda3/etc/profile.d/conda.sh +conda activate /scratch/08134/negin/dc-agent-shared/SkyRL/envs/tacc_rl_v5 + +# Start VLLM server +mkdir -p experiments/logs +vllm serve "$MODEL" \ + --host 0.0.0.0 --port 8000 \ + --served-model-name "$MODEL" \ + --enforce-eager \ + --gpu-memory-utilization "$GPU_MEMORY_UTIL" \ + > "experiments/logs/vllm_${SLURM_JOB_ID}.log" 2>&1 & +VLLM_PID=$! + +cleanup() { + echo "Cleaning up..." + kill $VLLM_PID 2>/dev/null || true + conda deactivate || true +} +trap cleanup EXIT + +# Wait for VLLM server to start with healthcheck +RETRY_INTERVAL=100 +for i in $(seq 1 $VLLM_MAX_RETRIES); do + if curl -s http://localhost:8000/v1/models > /dev/null; then + echo "VLLM server is ready" + break + fi + echo "Waiting for VLLM server to start (attempt $i/$VLLM_MAX_RETRIES)..." 
+ sleep $RETRY_INTERVAL + if [ $i -eq $VLLM_MAX_RETRIES ]; then + echo "VLLM server failed to start" + exit 1 + fi +done + +# Get the dataset path using the specified repo_id +echo "Downloading/locating dataset: $REPO_ID" +DATASET_PATH=$(python "./snapshot_download.py" "$REPO_ID" | grep DATASET_PATH | tail -n 1 | cut -d'=' -f2) +if [ -z "${DATASET_PATH:-}" ]; then + echo "Failed to get dataset path" + exit 1 +fi +echo "Using dataset path: $DATASET_PATH" + +# Construct run dir (harbor honors --job-name in jobs/) +# Use provided RUN_TAG if available (from listener), otherwise generate one +if [ -n "$RUN_TAG_ARG" ]; then + RUN_TAG="$RUN_TAG_ARG" + echo "Using RUN_TAG from listener: $RUN_TAG" +else + RUN_TAG="${SAFE_REPO}_${SAFE_MODEL}_${TIMESTAMP}" + echo "Generated new RUN_TAG: $RUN_TAG" +fi +RUN_DIR="jobs/${RUN_TAG}" +echo "Job dir: ${RUN_DIR}" + +# Update or create DB row with status='Started' BEFORE running eval +echo "Updating/Creating DB job entry..." +export MODEL="$MODEL" +export REPO_ID="$REPO_ID" +export RUN_TAG="$RUN_TAG" +export RUN_TAG_ARG="$RUN_TAG_ARG" +export SLURM_JOB_ID="$SLURM_JOB_ID" +export N_CONCURRENT="$N_CONCURRENT" +export N_ATTEMPTS="$N_ATTEMPTS" +export AGENT_NAME="$AGENT_NAME" + +# Get harbor package version +HARBOR_VERSION=$(python -c "import harbor; print(harbor.__version__)" 2>/dev/null || echo "unknown") +export HARBOR_VERSION + +python - <<'PY' +import os, sys, json +sys.path.insert(0, "/scratch/08134/negin/dc-agent-shared/dcagents-leaderboard") +from unified_db.utils import create_job_entry_started, update_job_status_to_started + +model_hf = os.environ["MODEL"] +dataset_hf = os.environ["REPO_ID"] +run_tag = os.environ["RUN_TAG"] +run_tag_arg = os.environ.get("RUN_TAG_ARG", "") +slurm_job_id = os.environ["SLURM_JOB_ID"] +harbor_version = os.environ.get("HARBOR_VERSION", "unknown") +n_concurrent = int(os.environ.get("N_CONCURRENT", "64")) +n_attempts = int(os.environ.get("N_ATTEMPTS", "3")) +agent_name = os.environ.get("AGENT_NAME", 
"terminus-2") + +# If RUN_TAG was provided from listener, a Pending entry should exist - update it +# Otherwise, create a new Started entry (backward compatibility) +if run_tag_arg: + print(f"Updating Pending job to Started: {run_tag}") + result = update_job_status_to_started( + job_name=run_tag, + n_trials=n_concurrent, + n_rep_eval=n_attempts, + config={"agent": agent_name, "env": "daytona"}, + harbor_package_version=harbor_version + ) +else: + print(f"Creating new Started job entry: {run_tag}") + result = create_job_entry_started( + model_hf_name=model_hf, + benchmark_hf_name=dataset_hf, + job_name=run_tag, + username=os.environ.get("UPLOAD_USERNAME", os.environ.get("USER", "unknown")), + slurm_job_id=slurm_job_id, + harbor_package_version=harbor_version, + agent_name=agent_name, + config={"agent": agent_name, "env": "daytona"}, + n_trials=n_concurrent, + n_rep_eval=n_attempts + ) + +if not result.get("success"): + print(f"ERROR: {result.get('error')}", file=sys.stderr) + sys.exit(1) + +# IMPORTANT: Save the DB job_id for later update +db_job_id = result["job"]["id"] +print(f"DB job ready with ID: {db_job_id}") +PY + +if [ $? 
-ne 0 ]; then + echo "Failed to create/update DB job entry" + exit 1 +fi + +# Extract the job_id from Python output and save it +DB_JOB_ID=$(python - <<'PY' +import os, sys +sys.path.insert(0, "/scratch/08134/negin/dc-agent-shared/dcagents-leaderboard") +from unified_db.utils import get_latest_job_for_model_benchmark + +model_hf = os.environ["MODEL"] +dataset_hf = os.environ["REPO_ID"] + +result = get_latest_job_for_model_benchmark(model_hf, dataset_hf) +if result and result.get("id"): + print(result["id"]) +else: + sys.exit(1) +PY +) + +if [ -z "$DB_JOB_ID" ]; then + echo "Failed to get DB job ID" + exit 1 +fi + +echo "DB job entry ready: $DB_JOB_ID" + +export HARBOR_JOBS_DIR="/tmp/harbor_jobs" +mkdir -p "$HARBOR_JOBS_DIR" + +# Let Terminus2 auto-resolve model_info for hosted_vllm (includes cost fields) +export TERMINUS_MODEL_MAX_TOKENS=32768 +export TERMINUS_MODEL_MAX_OUTPUT_TOKENS=16384 + +# Build harbor command with configurable params +HARBOR_CMD="harbor jobs start" +HARBOR_CMD="$HARBOR_CMD -p \"$DATASET_PATH\"" +HARBOR_CMD="$HARBOR_CMD --n-concurrent $N_CONCURRENT" +HARBOR_CMD="$HARBOR_CMD --agent \"$AGENT_NAME\"" +HARBOR_CMD="$HARBOR_CMD --model \"hosted_vllm/$MODEL\"" +HARBOR_CMD="$HARBOR_CMD --env \"daytona\"" +HARBOR_CMD="$HARBOR_CMD --agent-kwarg \"api_base=http://localhost:8000/v1\"" +HARBOR_CMD="$HARBOR_CMD --agent-kwarg \"key=fake_key\"" +HARBOR_CMD="$HARBOR_CMD --agent-kwarg \"max_tokens=16384\"" +HARBOR_CMD="$HARBOR_CMD --n-attempts $N_ATTEMPTS" +HARBOR_CMD="$HARBOR_CMD --export-traces" +HARBOR_CMD="$HARBOR_CMD --job-name \"$RUN_TAG\"" +HARBOR_CMD="$HARBOR_CMD --config \"dcagent_eval_config.yaml\"" + +# Add parser arg if specified (e.g., for swebench) +if [ -n "$AGENT_PARSER" ]; then + HARBOR_CMD="$HARBOR_CMD --agent-kwarg \"parser=$AGENT_PARSER\"" +fi + +# Add extra_body for thinking control (only if enable_thinking is true) +if [ "$ENABLE_THINKING" = "true" ]; then + HARBOR_CMD="$HARBOR_CMD --agent-kwarg 
'extra_body={\"chat_template_kwargs\":{\"enable_thinking\":true}}'" + echo "Thinking blocks enabled via extra_body" +fi + +echo "Running: $HARBOR_CMD" + +# Run sandbox evaluation +set +e +eval $HARBOR_CMD +SB_EXIT=$? +set -e + +# Save originals for exact round-trip later +mkdir -p "$RUN_DIR" +{ + echo "MODEL=$MODEL" + echo "REPO_ID=$REPO_ID" + echo "TIMESTAMP=$TIMESTAMP" + echo "SLURM_JOB_ID=$SLURM_JOB_ID" + echo "DB_JOB_ID=$DB_JOB_ID" + echo "RUN_TAG=$RUN_TAG" + echo "RUN_TAG_ARG=$RUN_TAG_ARG" + echo "N_CONCURRENT=$N_CONCURRENT" + echo "N_ATTEMPTS=$N_ATTEMPTS" + echo "GPU_MEMORY_UTIL=$GPU_MEMORY_UTIL" + echo "DAYTONA_ERROR_THRESHOLD=$DAYTONA_ERROR_THRESHOLD" + echo "AGENT_PARSER=$AGENT_PARSER" + echo "ENABLE_THINKING=$ENABLE_THINKING" +} > "$RUN_DIR/meta.env" + +# If eval failed, don't attempt upload +if [ ${SB_EXIT:-0} -ne 0 ]; then + echo "harbor run exited with non-zero status: ${SB_EXIT}. Skipping upload." + exit ${SB_EXIT} +fi + +# Ensure run dir exists; no fallback +if [ ! -d "$RUN_DIR" ]; then + echo "Expected run directory not found: $RUN_DIR" + exit 2 +fi + + +# ---- Check for DaytonaErrors before upload ---- +RESULT_FILE="$RUN_DIR/result.json" +ERROR_LOG="jobs/daytona_errors.log" + +if [ -f "$RESULT_FILE" ]; then + echo "Checking for DaytonaErrors in $RESULT_FILE..." 
+ + # Count DaytonaErrors using Python + DAYTONA_COUNT=$(python3 -c " +import json +import sys + +try: + with open('$RESULT_FILE', 'r') as f: + data = json.load(f) + + total_errors = 0 + error_ids = [] + + if 'stats' in data and 'evals' in data['stats']: + for eval_key, eval_data in data['stats']['evals'].items(): + if 'exception_stats' in eval_data and 'DaytonaError' in eval_data['exception_stats']: + daytona_errors = eval_data['exception_stats']['DaytonaError'] + if isinstance(daytona_errors, list): + total_errors += len(daytona_errors) + error_ids.extend(daytona_errors) + + print(total_errors) + + # Also write detailed info to stderr for logging + if total_errors > 0: + print(f'Found {total_errors} DaytonaError(s): {error_ids[:5]}...', file=sys.stderr) + +except Exception as e: + print(f'Error parsing result.json: {e}', file=sys.stderr) + print('0') +" 2>&1 | tail -n 1) + + echo "DaytonaError count: ${DAYTONA_COUNT}" + + + # If too many DaytonaErrors, log and skip upload + if [ "${DAYTONA_COUNT:-0}" -gt "$DAYTONA_ERROR_THRESHOLD" ]; then + echo "Job has ${DAYTONA_COUNT} DaytonaErrors (> ${DAYTONA_ERROR_THRESHOLD}), skipping upload" + + # Log to error file + { + echo "===============================================" + echo "Timestamp: $(date)" + echo "Job: ${RUN_TAG}" + echo "SLURM_JOB_ID: ${SLURM_JOB_ID}" + echo "Model: ${MODEL}" + echo "Repo: ${REPO_ID}" + echo "DaytonaErrors: ${DAYTONA_COUNT}" + echo "Threshold: ${DAYTONA_ERROR_THRESHOLD}" + echo "Result file: ${RESULT_FILE}" + + # Extract error details + python3 -c " +import json +with open('$RESULT_FILE', 'r') as f: + data = json.load(f) +if 'stats' in data and 'evals' in data['stats']: + for eval_key, eval_data in data['stats']['evals'].items(): + if 'exception_stats' in eval_data and 'DaytonaError' in eval_data['exception_stats']: + errors = eval_data['exception_stats']['DaytonaError'] + if errors: + print(f'Eval: {eval_key}') + for i, error_id in enumerate(errors[:10], 1): + print(f' {i}. 
{error_id}') + if len(errors) > 10: + print(f' ... and {len(errors) - 10} more') +" + echo "===============================================" + } >> "$ERROR_LOG" + + echo "Error details logged to: $ERROR_LOG" + echo "Job completed but not uploaded due to excessive DaytonaErrors" + exit 0 # Exit successfully but skip upload + fi +else + echo "Warning: result.json not found, continuing with upload" +fi + + + +# ---- Upload results to DB ---- + +# Point PYTHONPATH at your uploader package +export PYTHONPATH="/scratch/08134/negin/dc-agent-shared/dcagents-leaderboard:${PYTHONPATH:-}" + +export RUN_DIR="$RUN_DIR" +export UPLOAD_USERNAME="${EVAL_UPLOAD_USERNAME:-$USER}" +export UPLOAD_MODE="${UPLOAD_MODE:-skip_on_error}" +export RUN_TAG="$RUN_TAG" +UPLOAD_LOG="experiments/logs/upload_${SLURM_JOB_ID}.log" +mkdir -p "$(dirname "$UPLOAD_LOG")" + +echo "Uploading results from: $RUN_DIR" | tee -a "$UPLOAD_LOG" +echo "Using username=${UPLOAD_USERNAME}, mode=${UPLOAD_MODE}" | tee -a "$UPLOAD_LOG" + +# Run the uploader (from dcagents-leaderboard) +python - <<'PY' 2>&1 | tee -a "$UPLOAD_LOG" +import os, sys +from unified_db.utils import upload_eval_results +import re +import hashlib + + +def sanitize_hf_repo_id(repo_id: str, max_length: int = 96) -> str: + """ + Sanitize a Hugging Face repo_id to comply with naming rules. + Keeps org prefix (e.g. 'mlfoundations-dev/') and cleans up the rest. + No extra '-' before hash suffix. 
+ """ + def collapse(s: str) -> str: + prev = None + while s != prev: + prev = s + s = s.replace("--", "-").replace("..", ".") + return s + + org, name = repo_id.split("/", 1) if "/" in repo_id else (None, repo_id) + + name = re.sub(r"[^A-Za-z0-9._-]", "-", name) + name = collapse(name).strip("-.") + + if not name: + name = "repo" + + limit = max_length - (len(org) + 1 if org else 0) + if len(name) > limit: + digest = hashlib.sha1(name.encode()).hexdigest()[:8] + keep = max(1, limit - len(digest)) + base = name[:keep].rstrip("-.") + if not base: + base = "r" + name = f"{base}{digest}" # no '-' before hash + name = collapse(name).strip("-.") + + # final cleanup + name = collapse(name).strip("-.") + if name[0] in "-.": + name = "r" + name[1:] + if name[-1] in "-.": + name = name[:-1] + "0" + + return f"{org}/{name}" if org else name + + +run_dir = os.environ["RUN_DIR"] +run_tag = os.environ["RUN_TAG"] +username = os.environ.get("UPLOAD_USERNAME", "negin") +error_mode= os.environ.get("UPLOAD_MODE", "skip_on_error") +hf_repo_id = sanitize_hf_repo_id(f"DCAgent2/{run_tag}") +hf_token = os.environ["HF_TOKEN"] +print(f"[uploader] upload_eval_results(path={run_dir!r}, username={username!r}, error_mode={error_mode!r}, hf_repo_id={hf_repo_id!r})") +upload_eval_results(run_dir, username=username, error_mode=error_mode, hf_token=hf_token, hf_repo_id=hf_repo_id, register_benchmark=True) +print("[uploader] done.") +PY +UPLOAD_EXIT=${PIPESTATUS[0]} + +if [ $UPLOAD_EXIT -ne 0 ]; then + echo "Upload failed with exit code: $UPLOAD_EXIT" + exit $UPLOAD_EXIT +fi + +echo "Eval and upload finished successfully." diff --git a/eval/tacc/unified_eval_listener.py b/eval/tacc/unified_eval_listener.py new file mode 100644 index 00000000..ec7c962c --- /dev/null +++ b/eval/tacc/unified_eval_listener.py @@ -0,0 +1,1227 @@ +#!/usr/bin/env python3 +""" +Unified Eval Listener - Consolidates all eval listeners and sbatch scripts. 
+ +This script replaces: + - aider_eval_listener.py, bfcl_eval_listener.py, swebench_eval_listener.py + - v2_eval_listener.py, v2_eval_listener_prio.py, tb2_eval_listener.py + - dev_eval_listener.py + - Uses unified_eval_harbor.sbatch (replaces all individual sbatch scripts) + +Features: + - Preset configurations for each benchmark (aider, bfcl, swebench, v2, tb2) + - Priority file hot-reload (changes take effect without restart) + - Configurable sbatch parameters (n-concurrent, daytona-threshold, etc.) + +Usage Examples: + # Use a preset (replaces individual listener scripts) + python unified_eval_listener.py --preset bfcl + python unified_eval_listener.py --preset swebench + python unified_eval_listener.py --preset v2 --priority-file priority_models.txt + + # Custom configuration with sbatch params + python unified_eval_listener.py \\ + --datasets "DCAgent/dev_set_v2" \\ + --n-concurrent 128 \\ + --daytona-threshold 5 + + # Override preset concurrency + python unified_eval_listener.py --preset v2 --n-concurrent 64 + + # Dry run to preview + python unified_eval_listener.py --preset v2 --dry-run --once + +Environment Variables (all optional, CLI args take precedence): + EVAL_LISTENER_LOOKBACK_DAYS Days to look back for models (default: 100) + EVAL_LISTENER_CHECK_HOURS Hours between iterations (default: 12.0) + EVAL_LISTENER_SBATCH SBATCH script to use + EVAL_LISTENER_LOG_DIR Log directory (default: experiments/listener_logs) + EVAL_LISTENER_DATASETS Comma/space/newline list of HF dataset repos + EVAL_LISTENER_PRIORITY_FILE Path to priority models file (hot-reloaded) + EVAL_LISTENER_DRY_RUN "1" or "true" to enable dry run mode + EVAL_LISTENER_REQUIRE_PRIORITY_LIST "1" or "true" to require priority list + EVAL_LISTENER_CHECK_HF_EXISTS "1" or "true" to validate HF model existence +""" + +import argparse +import getpass +import json +import os +import re +import subprocess +import sys +import time +from dataclasses import dataclass, field +from datetime import datetime, 
timedelta, timezone +from pathlib import Path +from typing import Dict, List, Optional, Set, Tuple + +# Add leaderboard utilities to path +sys.path.insert(0, "/scratch/08134/negin/dc-agent-shared/dcagents-leaderboard") + +from unified_db.utils import get_supabase_client + +# ---------- Preset Definitions ---------- +# Each preset can configure: +# - datasets: list of HF dataset repos +# - sbatch_script: sbatch script to use (default: unified_eval_harbor.sbatch) +# - log_suffix: suffix for log file +# - check_hf_exists: validate model exists on HuggingFace +# - n_concurrent: Harbor --n-concurrent (default: 64) +# - n_attempts: Harbor --n-attempts (default: 3) +# - gpu_memory_util: VLLM --gpu-memory-utilization (default: 0.9) +# - daytona_threshold: Max DaytonaErrors before abort (default: 3) +# - vllm_max_retries: VLLM startup retries (default: 5) +# - agent_parser: Agent parser type (default: "", use "xml" for swebench) +# - slurm_time: SLURM time limit (default: "24:00:00") +PRESETS: Dict[str, Dict] = { + "aider": { + "datasets": ["DCAgent2/aider_polyglot"], + "log_suffix": "aider", + "n_concurrent": 32, + "daytona_threshold": 3, + }, + "bfcl": { + "datasets": ["DCAgent/dev_set_v2"], + "log_suffix": "v2", + "n_concurrent": 32, + "daytona_threshold": 10, + "vllm_max_retries": 20, + "enable_thinking": True, + }, + "swebench": { + "datasets": ["DCAgent2/swebench-verified-random-100-folders"], + "check_hf_exists": True, + "log_suffix": "swebench", + "n_concurrent": 64, + "daytona_threshold": 15, + "agent_parser": "xml", + "gpu_memory_util": 0.95, + "vllm_max_retries": 20, + }, + "v2": { + "datasets": ["DCAgent/dev_set_v2"], + "log_suffix": "v2", + "n_concurrent": 32, + "daytona_threshold": 10, + "vllm_max_retries": 20, + "enable_thinking": True, + }, + "tb2": { + "datasets": ["DCAgent2/terminal_bench_2"], + "log_suffix": "tb2", + "n_concurrent": 32, + "daytona_threshold": 10, + "slurm_time": "48:00:00", + "gpu_memory_util": 0.95, + }, + "dev": { + "datasets": [], # 
Must provide via args/env + "log_suffix": "dev", + }, +} + +# ---------- Constants ---------- +HF_URL_RE = re.compile(r'https?://(?:www\.)?huggingface\.co/([^/\s]+)/([^/\s#?]+)') +JOB_STATUS_PENDING = "Pending" +JOB_STATUS_STARTED = "Started" +JOB_STATUS_FINISHED = "Finished" +DEFAULT_STALE_JOB_HOURS = 24 +DEFAULT_STALE_PENDING_HOURS = 168 +DEFAULT_LOOKBACK_DAYS = 100 +DEFAULT_CHECK_HOURS = 12.0 +DEFAULT_LOG_DIR = "experiments/listener_logs" + +# Sbatch parameter defaults +DEFAULT_N_CONCURRENT = 64 +DEFAULT_N_ATTEMPTS = 3 +DEFAULT_GPU_MEMORY_UTIL = 0.9 +DEFAULT_DAYTONA_THRESHOLD = 3 +DEFAULT_VLLM_MAX_RETRIES = 5 +DEFAULT_AGENT_PARSER = "" +DEFAULT_SLURM_TIME = "24:00:00" +DEFAULT_AGENT_NAME = "terminus-2" +DEFAULT_SLURM_PARTITION = "gh" +DEFAULT_ENABLE_THINKING = False +DEFAULT_SBATCH_SCRIPT = "unified_eval_harbor.sbatch" + + +# ---------- Configuration ---------- +@dataclass +class ListenerConfig: + """Configuration for the eval listener.""" + datasets: List[str] + sbatch_script: str + log_file: Optional[Path] + lookback_days: int + check_interval_hours: float + stale_job_hours: int + stale_pending_hours: int + priority_file: Optional[str] + require_priority_list: bool + priority_models: Set[str] + check_hf_exists: bool + dry_run: bool + run_once: bool + verbose: bool + # Sbatch parameters (passed to sbatch via env vars) + n_concurrent: int = DEFAULT_N_CONCURRENT + n_attempts: int = DEFAULT_N_ATTEMPTS + gpu_memory_util: float = DEFAULT_GPU_MEMORY_UTIL + daytona_threshold: int = DEFAULT_DAYTONA_THRESHOLD + vllm_max_retries: int = DEFAULT_VLLM_MAX_RETRIES + agent_parser: str = DEFAULT_AGENT_PARSER + slurm_time: str = DEFAULT_SLURM_TIME + enable_thinking: bool = DEFAULT_ENABLE_THINKING + agent_name: str = DEFAULT_AGENT_NAME + slurm_partition: str = DEFAULT_SLURM_PARTITION + upload_username: str = "" + log_prefix: str = "[unified-eval-listener]" + + @property + def check_interval_seconds(self) -> int: + return int(self.check_interval_hours * 60 * 60) + + +# ---------- 
Logging ---------- +_LOG_FILE: Optional[Path] = None + + +def set_log_file(path: Optional[Path]) -> None: + global _LOG_FILE + _LOG_FILE = path + + +def log(msg: str, prefix: str = "[unified-eval-listener]") -> None: + """Log a message to stdout and optionally to file.""" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"{prefix} {ts} {msg}" + print(line, flush=True) + if _LOG_FILE: + try: + with _LOG_FILE.open("a") as f: + f.write(line + "\n") + except Exception: + pass + + +# ---------- Priority Models Loading ---------- +def load_priority_models(filepath: Optional[str]) -> Set[str]: + """ + Load priority models from a text file. + + File format: + - One model per line (HuggingFace format: org/model) + - Lines starting with # are comments + - Blank lines are ignored + + Returns: + Set of model names (exact match). Empty set if file is missing or empty. + """ + if not filepath: + return set() + + path = Path(filepath) + if not path.exists(): + log(f"Priority file not found: {filepath}") + return set() + + models: Set[str] = set() + try: + with path.open("r") as f: + for line in f: + line = line.strip() + # Skip empty lines and comments + if not line or line.startswith("#"): + continue + models.add(line) + log(f"Loaded {len(models)} model(s) from priority file: {filepath}") + return models + except Exception as e: + log(f"ERROR reading priority file {filepath}: {e}") + return set() + + +# ---------- HuggingFace Utilities ---------- +def check_hf_model_exists(model_name: str) -> bool: + """ + Check if a model exists on HuggingFace Hub. 
+ + Args: + model_name: HF model name (e.g., "org/model-name") + + Returns: + True if model exists and is accessible, False otherwise + """ + if not model_name or not isinstance(model_name, str): + return False + + try: + from huggingface_hub import model_info + model_info(model_name) + return True + except Exception as e: + log(f"HF check failed for {model_name}: {e}") + return False + + +def _parse_hf_from_str(val: Optional[str]) -> Optional[str]: + """Parse HuggingFace model name from a string (URL or org/repo).""" + if not isinstance(val, str): + return None + m = HF_URL_RE.search(val) + if m: + return f"{m.group(1)}/{m.group(2)}" + return None + + +def resolve_hf_model_name(model_row: Dict) -> Optional[str]: + """ + Resolve HF model name from a database model row. + + Checks multiple fields in order of priority. + """ + # Check name field first + v = model_row.get("name") + if isinstance(v, str) and "/" in v and not v.startswith("hosted_vllm/"): + return v + + # Check other URL fields + for field in ("weights_location", "training_parameters", "url", "hf_url"): + vv = model_row.get(field) + if isinstance(vv, str): + name = _parse_hf_from_str(vv) + if name: + return name + + # Check training_parameters as JSON + vv = model_row.get("training_parameters") + if isinstance(vv, str): + try: + obj = json.loads(vv) + except Exception: + obj = None + else: + obj = vv + + if isinstance(obj, dict): + for sval in obj.values(): + if isinstance(sval, str): + name = _parse_hf_from_str(sval) + if name: + return name + + return None + + +# ---------- Dataset Parsing ---------- +def parse_datasets(s: str) -> List[str]: + """ + Parse dataset list from string. + + Supports comma, space, or newline separated values. + Normalizes HF URLs to org/repo format. 
+ """ + parts = [p.strip() for p in re.split(r"[,\s]+", s) if p.strip()] + out = [] + for p in parts: + m = HF_URL_RE.search(p) + out.append(f"{m.group(1)}/{m.group(2)}" if m else p) + + # Dedup while preserving order + seen: Set[str] = set() + uniq: List[str] = [] + for d in out: + if d not in seen: + seen.add(d) + uniq.append(d) + return uniq + + +def dataset_repo_name(dataset_hf: str) -> str: + """Convert 'org/repo' or HF URL to 'repo' (just the repo name).""" + if not dataset_hf: + return dataset_hf + m = HF_URL_RE.search(dataset_hf) + if m: + return m.group(2) + if "/" in dataset_hf: + return dataset_hf.rsplit("/", 1)[-1] + return dataset_hf + + +# ---------- Database Operations ---------- +_BENCH_CACHE: Dict[str, Optional[str]] = {} + + +def _iso(dt: datetime) -> str: + """Convert datetime to ISO format string.""" + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc).isoformat() + + +def _time_filters(q, since_iso: str): + """Apply time filter to Supabase query (handles both column names).""" + try: + return q.gte('creation_time', since_iso) + except Exception: + return q.gte('created_at', since_iso) + + +def fetch_recent_models(days: int) -> List[Dict]: + """Fetch recent models from Supabase within the lookback window.""" + client = get_supabase_client() + since = _iso(datetime.now(timezone.utc) - timedelta(days=days)) + try: + resp = _time_filters(client.table('models').select('*'), since).execute() + rows = list(resp.data or []) + except Exception as e: + log(f"ERROR: failed querying models by time: {e}") + return [] + + # Filter out precomputed models + out: List[Dict] = [] + for r in rows: + if r.get("created_by") == "precomputed_hf": + continue + out.append(r) + return out + + +def resolve_benchmark_id(dataset_hf: str) -> Optional[str]: + """ + Look up benchmark ID from database for a given dataset. + + Caches results for performance. 
+ """ + repo_name = dataset_repo_name(dataset_hf) + if repo_name in _BENCH_CACHE: + return _BENCH_CACHE[repo_name] + + try: + client = get_supabase_client() + resp = ( + client.table('benchmarks') + .select('id,name') + .eq('name', repo_name) + .limit(1) + .execute() + ) + rows = resp.data or [] + bench_id = rows[0]['id'] if rows else None + _BENCH_CACHE[repo_name] = bench_id + if not bench_id: + log(f"No benchmark row found for dataset '{dataset_hf}' (wanted name='{repo_name}').") + return bench_id + except Exception as e: + log(f"ERROR resolving benchmark id for dataset '{dataset_hf}': {e}") + return None + + +def check_job_status( + model_id: str, benchmark_id: Optional[str] +) -> Tuple[bool, Optional[str], Optional[datetime], Optional[datetime], Optional[str]]: + """ + Check if a job exists for (model_id, benchmark_id) and its status. + + Returns: + (job_exists, job_status, started_at, submitted_at, slurm_job_id) + """ + if not benchmark_id: + return (False, None, None, None, None) + + try: + client = get_supabase_client() + q = ( + client.table('sandbox_jobs') + .select('id,job_status,started_at,submitted_at,slurm_job_id') + .eq('model_id', model_id) + .eq('benchmark_id', benchmark_id) + .order('created_at', desc=True) + .limit(1) + ) + data = (q.execute().data) or [] + + if not data: + return (False, None, None, None, None) + + job = data[0] + job_status = job.get('job_status') + started_at_str = job.get('started_at') + submitted_at_str = job.get('submitted_at') + slurm_job_id = job.get('slurm_job_id') + + started_at = None + if started_at_str: + try: + started_at = datetime.fromisoformat(started_at_str.replace('Z', '+00:00')) + except Exception: + pass + + submitted_at = None + if submitted_at_str: + try: + submitted_at = datetime.fromisoformat(submitted_at_str.replace('Z', '+00:00')) + except Exception: + pass + + return (True, job_status, started_at, submitted_at, slurm_job_id) + + except Exception as e: + log(f"WARNING: sandbox_jobs check failed for 
model_id={model_id}, benchmark_id={benchmark_id}: {e}") + return (False, None, None, None, None) # fail-open + + +def is_job_stale(started_at: Optional[datetime], hours: int = DEFAULT_STALE_JOB_HOURS) -> bool: + """Check if a job started more than the specified hours ago.""" + if not started_at: + # If started_at is null but job exists with status='Started', treat as stale + return True + now = datetime.now(timezone.utc) + if started_at.tzinfo is None: + started_at = started_at.replace(tzinfo=timezone.utc) + age = now - started_at + return age > timedelta(hours=hours) + + +def should_start_job( + model_id: str, + benchmark_id: Optional[str], + stale_hours: int = DEFAULT_STALE_JOB_HOURS, + stale_pending_hours: int = DEFAULT_STALE_PENDING_HOURS, +) -> Tuple[bool, str, Optional[str]]: + """ + Determine if a job should be started based on DB status. + + Returns: + (should_start, reason, slurm_job_id) + slurm_job_id is provided so the caller can scancel stale jobs. + """ + job_exists, job_status, started_at, submitted_at, slurm_job_id = check_job_status( + model_id, benchmark_id + ) + + if not job_exists: + return (True, "no existing job", None) + + if job_status == JOB_STATUS_FINISHED: + return (False, "job finished", slurm_job_id) + + if job_status == JOB_STATUS_PENDING: + # Job submitted but not yet running - check if stale using separate pending threshold + if is_job_stale(submitted_at, stale_pending_hours): + submitted_str = submitted_at.isoformat() if submitted_at else "null" + return (True, f"stale pending job (submitted_at={submitted_str})", slurm_job_id) + else: + submitted_str = submitted_at.isoformat() if submitted_at else "null" + return (False, f"job pending in SLURM queue (submitted_at={submitted_str})", slurm_job_id) + + if job_status == JOB_STATUS_STARTED: + if is_job_stale(started_at, stale_hours): + started_str = started_at.isoformat() if started_at else "null" + return (True, f"stale job (started_at={started_str})", slurm_job_id) + else: + started_str 
= started_at.isoformat() if started_at else "null" + return (False, f"job in progress (started_at={started_str})", slurm_job_id) + + # Unknown status - start job to be safe + return (True, f"unknown job status: {job_status}", slurm_job_id) + + +# ---------- Job Submission ---------- +@dataclass +class SbatchParams: + """Parameters passed to sbatch via environment variables.""" + n_concurrent: int = DEFAULT_N_CONCURRENT + n_attempts: int = DEFAULT_N_ATTEMPTS + gpu_memory_util: float = DEFAULT_GPU_MEMORY_UTIL + daytona_threshold: int = DEFAULT_DAYTONA_THRESHOLD + vllm_max_retries: int = DEFAULT_VLLM_MAX_RETRIES + agent_parser: str = DEFAULT_AGENT_PARSER + slurm_time: str = DEFAULT_SLURM_TIME + enable_thinking: bool = DEFAULT_ENABLE_THINKING + agent_name: str = DEFAULT_AGENT_NAME + slurm_partition: str = DEFAULT_SLURM_PARTITION + upload_username: str = "" + + def to_env(self) -> Dict[str, str]: + """Convert to environment variables for sbatch.""" + env = { + "EVAL_N_CONCURRENT": str(self.n_concurrent), + "EVAL_N_ATTEMPTS": str(self.n_attempts), + "EVAL_GPU_MEMORY_UTIL": str(self.gpu_memory_util), + "EVAL_DAYTONA_THRESHOLD": str(self.daytona_threshold), + "EVAL_VLLM_MAX_RETRIES": str(self.vllm_max_retries), + "EVAL_AGENT_PARSER": self.agent_parser, + "EVAL_SLURM_TIME": self.slurm_time, + "EVAL_ENABLE_THINKING": "true" if self.enable_thinking else "false", + "EVAL_AGENT_NAME": self.agent_name, + } + if self.upload_username: + env["EVAL_UPLOAD_USERNAME"] = self.upload_username + return env + + def __str__(self) -> str: + """String representation for logging.""" + parts = [ + f"n_concurrent={self.n_concurrent}", + f"n_attempts={self.n_attempts}", + f"gpu_memory_util={self.gpu_memory_util}", + f"daytona_threshold={self.daytona_threshold}", + f"vllm_max_retries={self.vllm_max_retries}", + ] + if self.agent_parser: + parts.append(f"agent_parser={self.agent_parser}") + if self.slurm_time != DEFAULT_SLURM_TIME: + parts.append(f"slurm_time={self.slurm_time}") + if 
self.enable_thinking: + parts.append("enable_thinking=True") + if self.agent_name != DEFAULT_AGENT_NAME: + parts.append(f"agent_name={self.agent_name}") + if self.slurm_partition != DEFAULT_SLURM_PARTITION: + parts.append(f"slurm_partition={self.slurm_partition}") + if self.upload_username: + parts.append(f"upload_username={self.upload_username}") + return ", ".join(parts) + + +def _run(cmd: List[str], env: Optional[Dict[str, str]] = None) -> Tuple[int, str]: + """Run a command and return exit code and output.""" + # Merge with current environment if extra env vars provided + run_env = None + if env: + run_env = os.environ.copy() + run_env.update(env) + + proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=run_env + ) + out_lines = [] + assert proc.stdout is not None + for line in proc.stdout: + out_lines.append(line.rstrip()) + code = proc.wait() + return code, "\n".join(out_lines) + + +def generate_run_tag(dataset_hf: str, model_hf: str) -> str: + """ + Generate a unique RUN_TAG for the job. + + Format: {safe_repo}_{safe_model}_{timestamp} + """ + safe_repo = dataset_repo_name(dataset_hf).replace("-", "_").replace(".", "_") + safe_model = model_hf.split("/")[-1].replace("-", "_").replace(".", "_") + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"{safe_repo}_{safe_model}_{timestamp}" + + +def cancel_slurm_job(slurm_job_id: str, dry_run: bool = False) -> bool: + """Cancel a SLURM job via scancel. 
Returns True if successful.""" + if dry_run: + log(f"[DRY RUN] Would cancel SLURM job {slurm_job_id}") + return True + code, out = _run(["scancel", slurm_job_id]) + if code == 0: + log(f"Cancelled SLURM job {slurm_job_id}") + return True + else: + log(f"WARNING: scancel failed for job {slurm_job_id}: {out}") + return False + + +def submit_eval( + hf_model_name: str, + dataset_hf: str, + benchmark_id: Optional[str], + sbatch_script: str, + sbatch_params: Optional[SbatchParams] = None, + dry_run: bool = False, + upload_username: str = "", +) -> Tuple[Optional[str], Optional[str]]: + """ + Submit sbatch job and create a Pending DB entry. + + sbatch args: + $1 = model HF name + $2 = dataset HF repo (org/repo) + $3 = benchmark_id (uuid) [optional] + $4 = job_name (RUN_TAG) + + Environment variables (from sbatch_params): + EVAL_N_CONCURRENT, EVAL_N_ATTEMPTS, EVAL_GPU_MEMORY_UTIL, + EVAL_DAYTONA_THRESHOLD, EVAL_VLLM_MAX_RETRIES, EVAL_AGENT_PARSER, + EVAL_SLURM_TIME, EVAL_AGENT_NAME + + Returns: + (slurm_job_id, job_name) if successful, ("DRY_RUN", job_name) if dry run, (None, None) on failure + """ + # Generate unique job name + job_name = generate_run_tag(dataset_hf, hf_model_name) + + cmd = ["sbatch"] + if sbatch_params: + cmd.extend(["--time", sbatch_params.slurm_time]) + cmd.extend(["--partition", sbatch_params.slurm_partition]) + cmd.append(sbatch_script) + cmd.extend([hf_model_name, dataset_hf]) + if benchmark_id: + cmd.append(str(benchmark_id)) + cmd.append(job_name) # 4th arg: job_name (RUN_TAG) + + # Get env vars from params + env_vars = sbatch_params.to_env() if sbatch_params else {} + + if dry_run: + log(f"[DRY RUN] Would execute: {' '.join(cmd)}") + if sbatch_params: + log(f"[DRY RUN] With params: {sbatch_params}") + return ("DRY_RUN", job_name) + + code, out = _run(cmd, env=env_vars) + log(f"sbatch: {' '.join(cmd)}\n{out}") + + if code != 0: + return (None, None) + + m = re.search(r"Submitted batch job (\d+)", out) + slurm_job_id = m.group(1) if m else None + 
class EvalListener:
    """Unified eval listener that handles all benchmark configurations.

    Each iteration fetches recent models, filters them (priority list,
    optional HuggingFace existence check, per-dataset job status) and submits
    one sbatch eval per eligible (model, dataset) pair.
    """

    def __init__(self, config: ListenerConfig):
        """Store *config* and pass ``config.log_file`` to ``set_log_file``."""
        self.config = config
        set_log_file(config.log_file)

    def _build_sbatch_params(self) -> SbatchParams:
        """Build the SbatchParams for this listener from the current config.

        Shared by ``run`` (for logging) and ``run_iteration`` (for submission)
        so the logged parameters are exactly the ones that get submitted.
        """
        cfg = self.config
        return SbatchParams(
            n_concurrent=cfg.n_concurrent,
            n_attempts=cfg.n_attempts,
            gpu_memory_util=cfg.gpu_memory_util,
            daytona_threshold=cfg.daytona_threshold,
            vllm_max_retries=cfg.vllm_max_retries,
            agent_parser=cfg.agent_parser,
            slurm_time=cfg.slurm_time,
            enable_thinking=cfg.enable_thinking,
            agent_name=cfg.agent_name,
            slurm_partition=cfg.slurm_partition,
            upload_username=cfg.upload_username,
        )

    def run_iteration(self) -> int:
        """
        Run one check iteration.

        Returns:
            Number of jobs submitted (or would submit in dry-run mode)
        """
        # Hot-reload priority models from file (enables editing during long runs)
        if self.config.priority_file:
            new_priority = load_priority_models(self.config.priority_file)
            if new_priority != self.config.priority_models:
                log(f"Priority list reloaded: {len(new_priority)} model(s)")
                self.config.priority_models = new_priority

        log("Checking for new models...")
        models = fetch_recent_models(self.config.lookback_days)
        log(f"Found {len(models)} model(s) in window. Filtering...")

        # Check if we should skip all models due to require_priority_list
        if not self.config.priority_models and self.config.require_priority_list:
            log("No priority list configured and --require-priority-list is set. Skipping all models.")
            return 0

        # (model_id, hf_model_name, dataset_hf, benchmark_id, reason, slurm_job_id)
        submissions: List[Tuple[str, str, str, Optional[str], str, Optional[str]]] = []

        # Track stats
        skipped_not_in_priority = 0
        skipped_hf_not_exists = 0

        # Resolve all benchmarks up front (once per loop)
        dataset_to_bench: Dict[str, Optional[str]] = {
            ds: resolve_benchmark_id(ds) for ds in self.config.datasets
        }

        for m in models:
            # Validate the raw id BEFORE stringifying: str(None) is the truthy
            # string "None", which would otherwise slip past the guard and be
            # used as a model id for the status lookup and the submission.
            raw_id = m.get("id")
            if raw_id is None or str(raw_id) == "":
                continue
            model_id = str(raw_id)

            hf_model = resolve_hf_model_name(m)
            if not hf_model:
                if self.config.verbose:
                    log(f"Skip: cannot resolve HF model for id={model_id}, name={m.get('name')}")
                continue

            # Priority filtering (exact match)
            if self.config.priority_models and hf_model not in self.config.priority_models:
                skipped_not_in_priority += 1
                continue

            # HuggingFace existence check
            if self.config.check_hf_exists and not check_hf_model_exists(hf_model):
                log(f"Skip: model not found on HuggingFace: {hf_model} (model_id={model_id})")
                skipped_hf_not_exists += 1
                continue

            for dataset_hf in self.config.datasets:
                bench_id = dataset_to_bench.get(dataset_hf)

                # Check DB status to decide if we should start
                should_start, reason, old_slurm_job_id = should_start_job(
                    model_id, bench_id, self.config.stale_job_hours,
                    stale_pending_hours=self.config.stale_pending_hours,
                )

                if should_start:
                    submissions.append(
                        (model_id, hf_model, dataset_hf, bench_id, reason, old_slurm_job_id)
                    )
                elif self.config.verbose:
                    log(f"Skip: model={hf_model}, dataset={dataset_hf}, reason={reason}")

        # Log filtering stats
        if self.config.priority_models and skipped_not_in_priority > 0:
            log(f"Skipped {skipped_not_in_priority} model(s) not in priority list")
        if self.config.check_hf_exists and skipped_hf_not_exists > 0:
            log(f"Skipped {skipped_hf_not_exists} model(s) not found on HuggingFace")

        if not submissions:
            log("No eligible (model, dataset) pairs to submit.")
            return 0

        prefix = "[DRY RUN] Would submit" if self.config.dry_run else "Submitting"
        log(f"{prefix} {len(submissions)} eval(s)...")

        # Create sbatch params from config
        sbatch_params = self._build_sbatch_params()

        submitted = 0
        for _model_id, hf_model, dataset_hf, bench_id, reason, old_slurm_job_id in submissions:
            dry_prefix = "[DRY RUN] " if self.config.dry_run else ""
            log(f"{dry_prefix}Submitting: model={hf_model}, dataset={dataset_hf}, reason={reason}")

            # Cancel stale Pending SLURM job before resubmission
            if reason.startswith("stale pending") and old_slurm_job_id:
                cancel_slurm_job(old_slurm_job_id, dry_run=self.config.dry_run)

            slurm_job_id, job_name = submit_eval(
                hf_model,
                dataset_hf,
                bench_id,
                self.config.sbatch_script,
                sbatch_params=sbatch_params,
                dry_run=self.config.dry_run,
                upload_username=self.config.upload_username,
            )

            if slurm_job_id:
                if self.config.dry_run:
                    log(f" -> Would submit as SLURM job (job_name={job_name})")
                else:
                    log(f" -> Submitted as SLURM job {slurm_job_id} (job_name={job_name})")
                submitted += 1
            else:
                log(" -> Submission failed")

            # Pause between real submissions; nothing is submitted in dry-run mode.
            if not self.config.dry_run:
                time.sleep(1)

        return submitted

    def run(self) -> None:
        """Main event loop.

        Logs the effective configuration, then calls ``run_iteration``
        repeatedly, sleeping ``check_interval_seconds`` between iterations.
        Stops after the first completed iteration when ``run_once`` or
        ``dry_run`` is set. Any ``Exception`` raised inside the loop is logged
        and followed by a 30 second back-off before the loop continues.
        Ctrl-C at any point in the loop (including the back-off sleep) logs a
        message and exits with status 0.
        """
        # Log configuration
        hdr = (
            f"lookback={self.config.lookback_days}d, "
            f"every {self.config.check_interval_hours}h, "
            f"sbatch={self.config.sbatch_script}"
        )
        log(f"Starting listener for datasets={self.config.datasets}: {hdr}")
        log(
            f"Job logic: restart if 'Started' and started_at > {self.config.stale_job_hours}h ago, "
            f"restart+scancel if 'Pending' and submitted_at > {self.config.stale_pending_hours}h ago, "
            f"skip if 'Finished'"
        )
        log(f"Dry run mode: {self.config.dry_run}")
        log(f"Run once mode: {self.config.run_once}")
        log(f"Check HF exists: {self.config.check_hf_exists}")
        log(f"Require priority list: {self.config.require_priority_list}")

        if self.config.priority_models:
            log(f"Priority filtering: {len(self.config.priority_models)} model(s) in list")
            if self.config.priority_file:
                log(f"Priority file: {self.config.priority_file} (hot-reloaded each iteration)")
            if self.config.verbose:
                for m in sorted(self.config.priority_models):
                    log(f" - {m}")
        else:
            log("Priority filtering: disabled (no priority file or empty)")

        # Log sbatch parameters (same builder as run_iteration, so the log
        # reflects what is actually submitted, including upload_username)
        log(f"Sbatch params: {self._build_sbatch_params()}")

        # KeyboardInterrupt is not an Exception subclass, so it passes through
        # the inner handler; catching it around the whole loop also covers a
        # Ctrl-C that arrives during the back-off sleep.
        try:
            while True:
                try:
                    self.run_iteration()

                    # Exit after one iteration if requested
                    if self.config.run_once or self.config.dry_run:
                        mode = "DRY RUN" if self.config.dry_run else "ONCE"
                        log(f"[{mode}] Complete. Exiting after one iteration.")
                        break

                    hours = self.config.check_interval_hours
                    log(f"Sleeping for {hours} hours...\n")
                    time.sleep(self.config.check_interval_seconds)
                except Exception as e:
                    log(f"ERROR in main loop: {e}. Backing off 30s.")
                    time.sleep(30)
        except KeyboardInterrupt:
            log("Interrupted by user. Exiting.")
            sys.exit(0)
parser.add_argument( + "--stale-hours", + type=int, + help=f"Hours before 'Started' job is stale (default: {DEFAULT_STALE_JOB_HOURS})", + ) + parser.add_argument( + "--stale-pending-hours", + type=int, + help=f"Hours before 'Pending' job is stale (default: {DEFAULT_STALE_PENDING_HOURS})", + ) + + # Priority filtering + parser.add_argument( + "--priority-file", + help="Path to priority models file (one model per line)", + ) + parser.add_argument( + "--require-priority-list", + action="store_true", + help="Skip all models when priority list is empty/missing", + ) + + # Validation options + parser.add_argument( + "--check-hf-exists", + action="store_true", + help="Validate model exists on HuggingFace before submit", + ) + + # Eval parameters (passed to sbatch via env vars) + parser.add_argument( + "--n-concurrent", + type=int, + help=f"Harbor concurrent jobs (default: {DEFAULT_N_CONCURRENT}, preset overrides)", + ) + parser.add_argument( + "--n-attempts", + type=int, + help=f"Retry attempts per task (default: {DEFAULT_N_ATTEMPTS})", + ) + parser.add_argument( + "--gpu-memory-util", + type=float, + help=f"VLLM GPU memory fraction (default: {DEFAULT_GPU_MEMORY_UTIL})", + ) + parser.add_argument( + "--daytona-threshold", + type=int, + help=f"Max DaytonaErrors before abort (default: {DEFAULT_DAYTONA_THRESHOLD})", + ) + parser.add_argument( + "--vllm-max-retries", + type=int, + help=f"VLLM startup retries (default: {DEFAULT_VLLM_MAX_RETRIES})", + ) + parser.add_argument( + "--agent-parser", + help=f"Agent parser type (default: \"{DEFAULT_AGENT_PARSER}\", use \"xml\" for swebench)", + ) + parser.add_argument( + "--slurm-time", + help=f"SLURM time limit (default: \"{DEFAULT_SLURM_TIME}\")", + ) + parser.add_argument( + "--agent-name", + help=f"Agent name for harbor and DB entries (default: \"{DEFAULT_AGENT_NAME}\")", + ) + parser.add_argument( + "--slurm-partition", + help=f"SLURM partition (default: \"{DEFAULT_SLURM_PARTITION}\")", + ) + parser.add_argument( + 
def _env_bool(name: str) -> bool:
    """Report whether environment variable *name* holds a truthy flag.

    The value is compared case-insensitively against "1", "true" and "yes".
    An unset variable, or any other value, yields False.
    """
    raw_value = os.environ.get(name)
    if raw_value is None:
        return False
    lowered = raw_value.lower()
    return lowered == "1" or lowered == "true" or lowered == "yes"
Use --datasets, EVAL_LISTENER_DATASETS, or --preset") + sys.exit(2) + + # Resolve sbatch script: CLI > ENV > Preset > Default + sbatch_script = ( + args.sbatch_script + or os.getenv("EVAL_LISTENER_SBATCH") + or preset_config.get("sbatch_script") + or DEFAULT_SBATCH_SCRIPT + ) + + # Resolve timing: CLI > ENV > Default + lookback_days = ( + args.lookback_days + if args.lookback_days is not None + else int(os.getenv("EVAL_LISTENER_LOOKBACK_DAYS", str(DEFAULT_LOOKBACK_DAYS))) + ) + check_hours = ( + args.check_hours + if args.check_hours is not None + else float(os.getenv("EVAL_LISTENER_CHECK_HOURS", str(DEFAULT_CHECK_HOURS))) + ) + stale_hours = args.stale_hours if args.stale_hours is not None else DEFAULT_STALE_JOB_HOURS + stale_pending_hours = args.stale_pending_hours if args.stale_pending_hours is not None else DEFAULT_STALE_PENDING_HOURS + + # Resolve log file + log_dir = Path(os.getenv("EVAL_LISTENER_LOG_DIR", DEFAULT_LOG_DIR)) + log_dir.mkdir(parents=True, exist_ok=True) + + if args.log_file: + log_file = Path(args.log_file) + else: + suffix = preset_config.get("log_suffix", "unified") + current_time = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = log_dir / f"{suffix}_eval_listener_{current_time}.log" + + # Resolve priority file: CLI > ENV + priority_file = args.priority_file or os.getenv("EVAL_LISTENER_PRIORITY_FILE") + priority_models = load_priority_models(priority_file) + + # Resolve boolean flags: CLI > ENV > Preset + require_priority = args.require_priority_list or _env_bool("EVAL_LISTENER_REQUIRE_PRIORITY_LIST") + dry_run = args.dry_run or _env_bool("EVAL_LISTENER_DRY_RUN") + check_hf_exists = ( + args.check_hf_exists + or _env_bool("EVAL_LISTENER_CHECK_HF_EXISTS") + or preset_config.get("check_hf_exists", False) + ) + + # Resolve sbatch parameters: CLI > Preset > Default + # Helper to get value with priority: CLI arg > Preset > Default + def _resolve(cli_val, preset_key: str, default): + if cli_val is not None: + return cli_val + return 
preset_config.get(preset_key, default) + + n_concurrent = _resolve(args.n_concurrent, "n_concurrent", DEFAULT_N_CONCURRENT) + n_attempts = _resolve(args.n_attempts, "n_attempts", DEFAULT_N_ATTEMPTS) + gpu_memory_util = _resolve(args.gpu_memory_util, "gpu_memory_util", DEFAULT_GPU_MEMORY_UTIL) + daytona_threshold = _resolve(args.daytona_threshold, "daytona_threshold", DEFAULT_DAYTONA_THRESHOLD) + vllm_max_retries = _resolve(args.vllm_max_retries, "vllm_max_retries", DEFAULT_VLLM_MAX_RETRIES) + agent_parser = _resolve(args.agent_parser, "agent_parser", DEFAULT_AGENT_PARSER) + slurm_time = _resolve(args.slurm_time, "slurm_time", DEFAULT_SLURM_TIME) + agent_name = _resolve(args.agent_name, "agent_name", DEFAULT_AGENT_NAME) + slurm_partition = _resolve(args.slurm_partition, "slurm_partition", DEFAULT_SLURM_PARTITION) + # enable_thinking: CLI flag > Preset > Default (CLI is action="store_true" so check explicitly) + enable_thinking = args.enable_thinking or preset_config.get("enable_thinking", DEFAULT_ENABLE_THINKING) + + # Resolve upload_username: CLI > ENV > current OS user + upload_username = ( + args.upload_username + or os.getenv("EVAL_UPLOAD_USERNAME") + or getpass.getuser() + ) + + return ListenerConfig( + datasets=datasets, + sbatch_script=sbatch_script, + log_file=log_file, + lookback_days=lookback_days, + check_interval_hours=check_hours, + stale_job_hours=stale_hours, + stale_pending_hours=stale_pending_hours, + priority_file=priority_file, + require_priority_list=require_priority, + priority_models=priority_models, + check_hf_exists=check_hf_exists, + dry_run=dry_run, + run_once=args.once, + verbose=args.verbose, + # Sbatch parameters + n_concurrent=n_concurrent, + n_attempts=n_attempts, + gpu_memory_util=gpu_memory_util, + daytona_threshold=daytona_threshold, + vllm_max_retries=vllm_max_retries, + agent_parser=agent_parser, + slurm_time=slurm_time, + enable_thinking=enable_thinking, + agent_name=agent_name, + slurm_partition=slurm_partition, + 
def main() -> None:
    """Script entry point: parse CLI args, build the config, run the listener."""
    listener_config = build_config(parse_args())
    EvalListener(listener_config).run()