diff --git a/.agents/AGENTS.md b/.agents/AGENTS.md index 8acceeb4d..1b6ac558e 100644 --- a/.agents/AGENTS.md +++ b/.agents/AGENTS.md @@ -650,6 +650,16 @@ images = SourceImage.objects.annotate(det_count=Count('detections')) - Use `@shared_task` decorator for all tasks - Check Flower UI for debugging: http://localhost:5555 +### E2E Testing & Monitoring Async Jobs + +Run an end-to-end ML job test: +```bash +docker compose run --rm django python manage.py test_ml_job_e2e \ + --project 18 --dispatch-mode async_api --collection 142 --pipeline "global_moths_2024" +``` + +For monitoring running jobs (Django ORM, REST API, NATS consumer state, Redis counters, worker logs, etc.), see `docs/claude/reference/monitoring-async-jobs.md`. + ### Running a Single Test ```bash diff --git a/ami/jobs/management/commands/chaos_monkey.py b/ami/jobs/management/commands/chaos_monkey.py new file mode 100644 index 000000000..50ad3c6ab --- /dev/null +++ b/ami/jobs/management/commands/chaos_monkey.py @@ -0,0 +1,97 @@ +""" +Fault injection utility for manual chaos testing of ML async jobs. + +Use alongside `test_ml_job_e2e` to verify job behaviour when Redis or NATS +becomes unavailable or loses state mid-processing. + +Usage examples: + + # Flush all Redis state immediately (simulates FLUSHDB mid-job) + python manage.py chaos_monkey flush redis + + # Flush all NATS JetStream streams (simulates broker state loss) + python manage.py chaos_monkey flush nats +""" + +from asgiref.sync import async_to_sync +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django_redis import get_redis_connection + +NATS_URL = getattr(settings, "NATS_URL", "nats://nats:4222") + + +class Command(BaseCommand): + help = "Inject faults into Redis or NATS for chaos/resilience testing" + + def add_arguments(self, parser): + parser.add_argument( + "action", + choices=["flush"], + help="flush: wipe all state.", + ) + parser.add_argument( + "service", + choices=["redis", "nats"], + help="Target service to fault.", + ) + + def handle(self, *args, **options): + action = options["action"] + service = options["service"] + + if action == "flush" and service == "redis": + self._flush_redis() + elif action == "flush" and service == "nats": + self._flush_nats() + + # ------------------------------------------------------------------ + # Redis + # ------------------------------------------------------------------ + + def _flush_redis(self): + self.stdout.write("Flushing Redis database (FLUSHDB)...") + try: + redis = get_redis_connection("default") + redis.flushdb() + self.stdout.write(self.style.SUCCESS("Redis flushed.")) + except Exception as e: + raise CommandError(f"Failed to flush Redis: {e}") from e + + # ------------------------------------------------------------------ + # NATS + # ------------------------------------------------------------------ + + def _flush_nats(self): + """Delete all JetStream streams via the NATS Python client.""" + self.stdout.write("Flushing all NATS JetStream streams...") + + async def _delete_all_streams(): + import nats + + nc = await nats.connect(NATS_URL, connect_timeout=5, allow_reconnect=False) + js = nc.jetstream() + try: + streams = await js.streams_info() + if not streams: + return [] + deleted = [] + for stream in streams: + name = stream.config.name + await js.delete_stream(name) + deleted.append(name) + return deleted + finally: + await nc.close() + + try: + deleted = async_to_sync(_delete_all_streams)() + except Exception as e: + raise CommandError(f"Failed to flush NATS: {e}") from e + + if deleted: + for name in deleted: + self.stdout.write(f" Deleted stream: {name}") + self.stdout.write(self.style.SUCCESS(f"Deleted {len(deleted)} stream(s).")) + else: + self.stdout.write("No streams found — NATS already empty.") diff --git a/ami/jobs/management/commands/test_ml_job_e2e.py b/ami/jobs/management/commands/test_ml_job_e2e.py index 2f613e39c..f79c54fbb 100644 --- a/ami/jobs/management/commands/test_ml_job_e2e.py +++ b/ami/jobs/management/commands/test_ml_job_e2e.py @@ -10,7 +10,11 @@ class Command(BaseCommand): - help = "Run end-to-end test of ML job processing" + help = ( + "Run end-to-end test of ML job processing.\n\n" + "For monitoring and debugging running jobs, see:\n" + " docs/claude/reference/monitoring-async-jobs.md" + ) def add_arguments(self, parser): parser.add_argument("--project", type=int, required=True, help="Project ID") diff --git a/ami/jobs/models.py b/ami/jobs/models.py index be797dd4f..89be29312 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -15,7 +15,7 @@ from ami.base.models import BaseModel from ami.base.schemas import ConfigurableStage, ConfigurableStageParam -from ami.jobs.tasks import run_job +from ami.jobs.tasks import cleanup_async_job_if_needed, run_job from ami.main.models import Deployment, Project, SourceImage, SourceImageCollection from ami.ml.models import Pipeline from ami.ml.post_processing.registry import get_postprocessing_task @@ -88,6 +88,11 @@ def final_states(cls): def failed_states(cls): return [cls.FAILURE, cls.REVOKED, cls.UNKNOWN] + @classmethod + def active_states(cls): + """States where a job is actively processing and should serve tasks to workers.""" + return [cls.STARTED, cls.RETRY] + def get_status_label(status: JobState, progress: float) -> str: """ @@ -331,26 +336,29 @@ def emit(self, record: logging.LogRecord): # Log to the current app logger logger.log(record.levelno, self.format(record)) - # Write to the logs field on the job instance - timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - msg = f"[{timestamp}] {record.levelname} {self.format(record)}" - if msg not in self.job.logs.stdout: - self.job.logs.stdout.insert(0, msg) + # Write to the logs field on the job instance. + # Refresh from DB first to reduce the window for concurrent overwrites — each + # worker holds its own stale in-memory copy of `logs`, so without a refresh the + # last writer always wins and earlier entries are silently dropped. + # @TODO consider saving logs to the database periodically rather than on every log + try: + self.job.refresh_from_db(fields=["logs"]) + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + msg = f"[{timestamp}] {record.levelname} {self.format(record)}" + if msg not in self.job.logs.stdout: + self.job.logs.stdout.insert(0, msg) - # Write a simpler copy of any errors to the errors field - if record.levelno >= logging.ERROR: - if record.message not in self.job.logs.stderr: - self.job.logs.stderr.insert(0, record.message) + # Write a simpler copy of any errors to the errors field + if record.levelno >= logging.ERROR: + if record.message not in self.job.logs.stderr: + self.job.logs.stderr.insert(0, record.message) - if len(self.job.logs.stdout) > self.max_log_length: - self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length] + if len(self.job.logs.stdout) > self.max_log_length: + self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length] - # @TODO consider saving logs to the database periodically rather than on every log - try: self.job.save(update_fields=["logs"], update_progress=False) except Exception as e: logger.error(f"Failed to save logs for job #{self.job.pk}: {e}") - pass @dataclass @@ -966,19 +974,28 @@ def retry(self, async_task=True): def cancel(self): """ - Terminate the celery task. + Cancel a job. For async_api jobs, clean up NATS/Redis resources + and transition through CANCELING → REVOKED. For other jobs, + revoke the Celery task. """ self.status = JobState.CANCELING self.save() + if self.task_id: task = run_job.AsyncResult(self.task_id) if task: task.revoke(terminate=True) + if self.dispatch_mode == JobDispatchMode.ASYNC_API: + # For async jobs we need to set the status to revoked here since the task already + # finished (it only queues the images). + self.status = JobState.REVOKED self.save() else: self.status = JobState.REVOKED self.save() + cleanup_async_job_if_needed(self) + def update_status(self, status=None, save=True): """ Update the status of the job based on the status of the celery task. @@ -1084,11 +1101,15 @@ def get_default_progress(cls) -> JobProgress: def logger(self) -> logging.Logger: _logger = logging.getLogger(f"ami.jobs.{self.pk}") - # Only add JobLogHandler if not already present - if not any(isinstance(h, JobLogHandler) for h in _logger.handlers): - # Also log output to a field on thie model instance + # Update or add JobLogHandler, always pointing to the current instance. + # The logger is a process-level singleton so its handler may reference a stale + # job instance from a previous task execution in this worker process. + handler = next((h for h in _logger.handlers if isinstance(h, JobLogHandler)), None) + if handler is None: logger.info("Adding JobLogHandler to logger for job %s", self.pk) _logger.addHandler(JobLogHandler(self)) + else: + handler.job = self _logger.propagate = False return _logger diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 5a3ba6d34..917608be0 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -86,10 +86,9 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub progress_info = state_manager.update_state(processed_image_ids, stage="process", failed_image_ids=failed_image_ids) if not progress_info: - logger.error(f"Redis state missing for job {job_id} — job may have been cleaned up prematurely.") # Acknowledge the task to prevent retries, since we don't know the state _ack_task_via_nats(reply_subject, logger) - # TODO: cancel the job to fail fast once PR #1144 is merged + _fail_job(job_id, "Redis state missing for job") return try: @@ -153,8 +152,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub ) if not progress_info: - job.logger.error(f"Redis state missing for job {job_id} — job may have been cleaned up prematurely.") - # TODO: cancel the job to fail fast once PR #1144 is merged + _fail_job(job_id, "Redis state missing for job") return # update complete state based on latest progress info after saving results @@ -180,6 +178,26 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub job.logger.error(error) +def _fail_job(job_id: int, reason: str) -> None: + from ami.jobs.models import Job, JobState + from ami.ml.orchestration.jobs import cleanup_async_job_resources + + try: + with transaction.atomic(): + job = Job.objects.select_for_update().get(pk=job_id) + if job.status in (JobState.CANCELING, *JobState.final_states()): + return + job.update_status(JobState.FAILURE, save=False) + job.finished_at = datetime.datetime.now() + job.save(update_fields=["status", "progress", "finished_at"]) + + job.logger.error(f"Job {job_id} marked as FAILURE: {reason}") + cleanup_async_job_resources(job.pk, job.logger) + except Job.DoesNotExist: + logger.error(f"Cannot fail job {job_id}: not found") + cleanup_async_job_resources(job_id, logger) + + def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None: try: @@ -295,10 +313,10 @@ def _update_job_progress( # Clean up async resources for completed jobs that use NATS/Redis if job.progress.is_complete(): job = Job.objects.get(pk=job_id) # Re-fetch outside transaction - _cleanup_job_if_needed(job) + cleanup_async_job_if_needed(job) -def _cleanup_job_if_needed(job) -> None: +def cleanup_async_job_if_needed(job) -> None: """ Clean up async resources (NATS/Redis) if this job uses them. @@ -314,7 +332,7 @@ def _cleanup_job_if_needed(job) -> None: # import here to avoid circular imports from ami.ml.orchestration.jobs import cleanup_async_job_resources - cleanup_async_job_resources(job) + cleanup_async_job_resources(job.pk, job.logger) @task_prerun.connect(sender=run_job) @@ -353,7 +371,7 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs): # Clean up async resources for revoked jobs if state == JobState.REVOKED: - _cleanup_job_if_needed(job) + cleanup_async_job_if_needed(job) @task_failure.connect(sender=run_job, retry=False) @@ -368,7 +386,7 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): job.save() # Clean up async resources for failed jobs - _cleanup_job_if_needed(job) + cleanup_async_job_if_needed(job) def log_time(start: float = 0, msg: str | None = None) -> tuple[float, Callable]: diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index 033a08b5c..65bf1e6f1 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -445,7 +445,8 @@ def _task_batch_helper(self, value: Any, expected_status: int): pipeline = self._create_pipeline() job = self._create_ml_job("Job for batch test", pipeline) job.dispatch_mode = JobDispatchMode.ASYNC_API - job.save(update_fields=["dispatch_mode"]) + job.status = JobState.STARTED + job.save(update_fields=["dispatch_mode", "status"]) images = [ SourceImage.objects.create( path=f"image_{i}.jpg", @@ -487,6 +488,7 @@ def test_tasks_endpoint_without_pipeline(self): name="Job without pipeline", source_image_collection=self.source_image_collection, dispatch_mode=JobDispatchMode.ASYNC_API, + status=JobState.STARTED, ) self.client.force_authenticate(user=self.user) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index ddc1e57a7..6d0626f23 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -237,8 +237,8 @@ def tasks(self, request, pk=None): if job.dispatch_mode != JobDispatchMode.ASYNC_API: raise ValidationError("Only async_api jobs have fetchable tasks") - # Don't fetch tasks from completed/failed/revoked jobs - if job.status in JobState.final_states(): + # Only serve tasks for actively processing jobs + if job.status not in JobState.active_states(): return Response({"tasks": []}) # Validate that the job has a pipeline diff --git a/ami/ml/orchestration/jobs.py b/ami/ml/orchestration/jobs.py index ce54ecd1c..95c763b1b 100644 --- a/ami/ml/orchestration/jobs.py +++ b/ami/ml/orchestration/jobs.py @@ -11,7 +11,7 @@ logger = logging.getLogger(__name__) -def cleanup_async_job_resources(job: "Job") -> bool: +def cleanup_async_job_resources(job_id: int, _logger: logging.Logger) -> bool: """ Clean up NATS JetStream and Redis resources for a completed job. @@ -22,7 +22,8 @@ def cleanup_async_job_resources(job: "Job") -> bool: Cleanup failures are logged but don't fail the job - data is already saved. Args: - job: The Job instance + job_id: The Job ID (integer primary key) + _logger: Logger to use for logging cleanup results Returns: bool: True if both cleanups succeeded, False otherwise """ @@ -31,26 +32,26 @@ def cleanup_async_job_resources(job: "Job") -> bool: # Cleanup Redis state try: - state_manager = AsyncJobStateManager(job.pk) + state_manager = AsyncJobStateManager(job_id) state_manager.cleanup() - job.logger.info(f"Cleaned up Redis state for job {job.pk}") + _logger.info(f"Cleaned up Redis state for job {job_id}") redis_success = True except Exception as e: - job.logger.error(f"Error cleaning up Redis state for job {job.pk}: {e}") + _logger.error(f"Error cleaning up Redis state for job {job_id}: {e}") # Cleanup NATS resources async def cleanup(): async with TaskQueueManager() as manager: - return await manager.cleanup_job_resources(job.pk) + return await manager.cleanup_job_resources(job_id) try: nats_success = async_to_sync(cleanup)() if nats_success: - job.logger.info(f"Cleaned up NATS resources for job {job.pk}") + _logger.info(f"Cleaned up NATS resources for job {job_id}") else: - job.logger.warning(f"Failed to clean up NATS resources for job {job.pk}") + _logger.warning(f"Failed to clean up NATS resources for job {job_id}") except Exception as e: - job.logger.error(f"Error cleaning up NATS resources for job {job.pk}: {e}") + _logger.error(f"Error cleaning up NATS resources for job {job_id}: {e}") return redis_success and nats_success diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index a23d28ac8..884676637 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -95,21 +95,32 @@ def _get_consumer_name(self, job_id: int) -> str: """Get consumer name from job_id.""" return f"job-{job_id}-consumer" - async def _ensure_stream(self, job_id: int): - """Ensure stream exists for the given job.""" + async def _stream_exists(self, job_id: int) -> bool: + """Check if stream exists for the given job. + + Only catches NotFoundError (→ False). TimeoutError propagates deliberately + so callers treat an unreachable NATS server as a hard failure rather than + a missing stream. + """ if self.js is None: raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.") stream_name = self._get_stream_name(job_id) - subject = self._get_subject(job_id) - try: await asyncio.wait_for(self.js.stream_info(stream_name), timeout=NATS_JETSTREAM_TIMEOUT) - logger.debug(f"Stream {stream_name} already exists") - except asyncio.TimeoutError: - raise # NATS unreachable — let caller handle it rather than creating a stream blindly - except Exception as e: - logger.warning(f"Stream {stream_name} does not exist: {e}") + return True + except nats.js.errors.NotFoundError: + return False + + async def _ensure_stream(self, job_id: int): + """Ensure stream exists for the given job.""" + if self.js is None: + raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.") + + if not await self._stream_exists(job_id): + stream_name = self._get_stream_name(job_id) + subject = self._get_subject(job_id) + logger.warning(f"Stream {stream_name} does not exist") # Stream doesn't exist, create it await asyncio.wait_for( self.js.add_stream( @@ -207,7 +218,10 @@ async def reserve_tasks(self, job_id: int, count: int, timeout: float = 5) -> li raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.") try: - await self._ensure_stream(job_id) + if not await self._stream_exists(job_id): + logger.debug(f"Stream for job '{job_id}' does not exist when reserving task") + return [] + await self._ensure_consumer(job_id) consumer_name = self._get_consumer_name(job_id) diff --git a/ami/ml/orchestration/tests/test_nats_queue.py b/ami/ml/orchestration/tests/test_nats_queue.py index a7bd91b68..cf3514bce 100644 --- a/ami/ml/orchestration/tests/test_nats_queue.py +++ b/ami/ml/orchestration/tests/test_nats_queue.py @@ -3,6 +3,8 @@ import unittest from unittest.mock import AsyncMock, MagicMock, patch +import nats + from ami.ml.orchestration.nats_queue import TaskQueueManager from ami.ml.schemas import PipelineProcessingTask @@ -51,8 +53,8 @@ async def test_publish_task_creates_stream_and_consumer(self): """Test that publish_task ensures stream and consumer exist.""" nc, js = self._create_mock_nats_connection() sample_task = self._create_sample_task() - js.stream_info.side_effect = Exception("Not found") - js.consumer_info.side_effect = Exception("Not found") + js.stream_info.side_effect = nats.js.errors.NotFoundError() + js.consumer_info.side_effect = nats.js.errors.NotFoundError() with patch("ami.ml.orchestration.nats_queue.get_connection", AsyncMock(return_value=(nc, js))): async with TaskQueueManager() as manager: diff --git a/docs/claude/reference/monitoring-async-jobs.md b/docs/claude/reference/monitoring-async-jobs.md new file mode 100644 index 000000000..946ddadaa --- /dev/null +++ b/docs/claude/reference/monitoring-async-jobs.md @@ -0,0 +1,205 @@ +# Monitoring Async (NATS) Jobs + +Reference for monitoring and debugging async_api jobs that use NATS JetStream for task distribution to external workers (e.g., AMI Data Companion). + +## Starting a Test Job + +```bash +docker compose run --rm django python manage.py test_ml_job_e2e \ + --project 18 \ + --dispatch-mode async_api \ + --collection 142 \ + --pipeline "global_moths_2024" +``` + +Or create a job via the UI at http://localhost:4000/projects/18/jobs. + +## Monitoring Points + +### 1. Web UI + +**Job details page:** `http://localhost:4000/projects/{PROJECT_ID}/jobs/{JOB_ID}` + +Shows status bar, progress percentage, stage breakdown, and logs. Polls the API automatically. + +### 2. Jobs REST API + +```bash +# Get auth token +TOKEN=$(docker compose exec django python manage.py shell -c \ + "from rest_framework.authtoken.models import Token; print(Token.objects.first().key)" 2>/dev/null) + +# Job status & progress summary +curl -s http://localhost:8000/api/v2/jobs/{JOB_ID}/ \ + -H "Authorization: Token $TOKEN" | jq '{id, status, dispatch_mode, progress: .progress.summary}' + +# Full stage breakdown +curl -s http://localhost:8000/api/v2/jobs/{JOB_ID}/ \ + -H "Authorization: Token $TOKEN" | jq '.progress.stages[] | {key: .key, status: .status, progress: .progress}' +``` + +### 3. Tasks Endpoint (Worker-Facing) + +This is what the external worker polls to get batches of images to process. + +```bash +# See what the worker would get (fetches from NATS, reserves tasks) +curl -s "http://localhost:8000/api/v2/jobs/{JOB_ID}/tasks/?batch=8" \ + -H "Authorization: Token $TOKEN" | jq '.tasks | length' + +# Returns empty [] when job is not in active_states (STARTED, RETRY) +# i.e. returns empty for CANCELING, REVOKED, SUCCESS, FAILURE, etc. +``` + +### 4. Django ORM (Shell) + +```bash +docker compose exec django python manage.py shell -c " +from ami.jobs.models import Job +j = Job.objects.get(pk={JOB_ID}) +print(f'Status: {j.status}') +print(f'Dispatch mode: {j.dispatch_mode}') +print(f'Progress: {j.progress.summary.progress*100:.1f}%') +print(f'Started: {j.started_at}') +print(f'Finished: {j.finished_at}') +for s in j.progress.stages: + print(f' {s.key}: {s.status} {s.progress*100:.1f}%') +" +``` + +### 5. NATS JetStream Consumer State + +Shows the queue depth, in-flight tasks, and acknowledgment progress. + +```bash +docker compose exec django python manage.py shell -c " +from ami.ml.orchestration.nats_queue import TaskQueueManager +import asyncio +async def check(): + async with TaskQueueManager() as m: + info = await m.js.consumer_info('job_{JOB_ID}', 'job-{JOB_ID}-consumer') + print(f'num_pending: {info.num_pending}') # Tasks waiting in queue + print(f'num_ack_pending: {info.num_ack_pending}') # Tasks reserved but not yet ACKed + print(f'num_redelivered: {info.num_redelivered}') # Tasks redelivered after timeout + print(f'delivered.seq: {info.delivered.stream_seq}') # Last delivered sequence + print(f'ack_floor.seq: {info.ack_floor.stream_seq}') # Last contiguous ACK +asyncio.run(check()) +" +``` + +Key fields: +- `num_pending` = tasks still in queue, not yet reserved by any worker +- `num_ack_pending` = tasks reserved by worker, waiting for result POST + ACK +- `num_redelivered` = tasks that timed out (TTR=30s default) and were redelivered +- When `num_pending=0` and `num_ack_pending=0`, all tasks have been processed + +### 6. Redis State (Atomic Progress Counters) + +Tracks per-stage progress independently of the Job model. Updated atomically by Celery result tasks. + +```bash +docker compose exec django python manage.py shell -c " +from ami.ml.orchestration.async_job_state import AsyncJobStateManager +sm = AsyncJobStateManager({JOB_ID}) +for stage in sm.STAGES: + prog = sm.get_progress(stage) + print(f'{stage}: remaining={prog.remaining} processed={prog.processed}/{prog.total} failed={prog.failed} ({prog.percentage*100:.1f}%)') +" +``` + +### 7. Django Logs (Docker Compose) + +```bash +# All django logs (includes task reservations and result processing) +docker compose logs -f django + +# Filter for specific job +docker compose logs -f django 2>&1 | grep "1408" + +# Filter for task reservations +docker compose logs -f django 2>&1 | grep "Reserved" + +# Filter for result processing +docker compose logs -f django 2>&1 | grep "Queued pipeline result" +``` + +### 8. Celery Worker Logs + +```bash +# Celery worker logs (result saving, NATS ACKs, progress updates) +docker compose logs -f celeryworker + +# Filter for specific job +docker compose logs -f celeryworker 2>&1 | grep "job 1408" +``` + +### 9. AMI Worker Logs (External) + +The AMI Data Companion worker runs outside Docker. Check its terminal output for: +- Batch processing progress (e.g., "Finished batch 84. Total items: 672") +- Model inference times (detection + classification) +- Connection errors to Django API or NATS + +```bash +# If running via conda +conda activate ami-py311 +ami worker --pipeline global_moths_2024 + +# Worker registration (loads ML models, ~20s) +ami worker register "local-worker" --project 18 +``` + +## Continuous Monitoring (Watch Loop) + +```bash +# Poll job status every 5 seconds +watch -n 5 'docker compose exec django python manage.py shell -c " +from ami.jobs.models import Job +j = Job.objects.get(pk={JOB_ID}) +print(f\"Status: {j.status} | Progress: {j.progress.summary.progress*100:.1f}%\") +for s in j.progress.stages: + print(f\" {s.key}: {s.status} {s.progress*100:.1f}%\") +"' +``` + +## Job Lifecycle (async_api) + +```text +CREATED → PENDING → STARTED → [processing] → SUCCESS + ↓ + CANCELING → REVOKED (user cancels) + ↓ + FAILURE (error during processing) +``` + +1. **STARTED**: Celery task collects images, publishes to NATS stream, then returns +2. **Processing**: Worker polls `/tasks`, processes batches, POSTs to `/result/` +3. **SUCCESS**: All results received, progress reaches 100% +4. **CANCELING → REVOKED**: User cancels, NATS stream/consumer deleted, status set to REVOKED. In-flight results may still trickle in and are saved. + +## Key Configuration + +| Setting | Default | Source | +|---------|---------|--------| +| NATS task TTR (visibility timeout) | 30s | `NATS_TASK_TTR` env var | +| NATS max_ack_pending | 1000 | `NATS_MAX_ACK_PENDING` env var | +| NATS max_deliver (retries) | 5 | hardcoded in `nats_queue.py` | +| NATS stream retention | 24h | hardcoded in `nats_queue.py` | +| Worker batch size | varies | worker's `?batch=N` param | + +## Troubleshooting + +**Job stuck in STARTED with no progress:** +- Check if worker is running and connected +- Check NATS consumer state — if `num_pending > 0` but nothing is being delivered, worker may have lost connection +- Check `num_redelivered` — high count means tasks are timing out (worker too slow or crashing) + +**Job stuck in CANCELING:** +- Pre-fix: job was stuck because `/tasks` kept serving tasks and nothing transitioned to REVOKED +- Post-fix: `cancel()` cleans up NATS resources and sets REVOKED synchronously +- If still stuck, the periodic `check_incomplete_jobs` beat task (PR #1025) will catch it + +**Results not being saved:** +- Check celeryworker logs for errors in `process_nats_pipeline_result` +- Check Redis state — if `process` is ahead of `results`, Celery is backed up saving results +- Check NATS `num_ack_pending` — high count means results haven't been ACKed yet diff --git a/ui/src/data-services/models/job.ts b/ui/src/data-services/models/job.ts index 625db4ec7..66302d43e 100644 --- a/ui/src/data-services/models/job.ts +++ b/ui/src/data-services/models/job.ts @@ -66,7 +66,9 @@ export class Job { this._job.user_permissions.includes(UserPermission.Run) && this.status.code !== 'CREATED' && this.status.code !== 'STARTED' && - this.status.code !== 'PENDING' + this.status.code !== 'PENDING' && + this.status.code !== 'CANCELING' && + this.status.code !== 'RETRY' ) }