
Support ML async job cancellation, fail jobs on redis errors#1162

Merged
mihow merged 45 commits into RolnickLab:main from uw-ssec:carlos/redisfail
Feb 28, 2026

Conversation

@carlosgjs
Collaborator

@carlosgjs carlosgjs commented Feb 27, 2026

Summary

This pull request builds on #1150 and is based off the carlos/redisatomic branch.

It introduces a new chaos-testing management command for fault injection and refactors the async job cleanup logic to improve reliability and resilience. The most important changes are a manual chaos-testing utility, improved job log handling to prevent lost logs, and more robust cleanup of async resources (Redis and NATS). The cleanup logic is now consistent and reliable, especially in failure and cancellation scenarios.

  • Added chaos_monkey.py management command for manual fault injection of Redis and NATS, allowing developers to flush or pause these services to simulate outages and test job resilience.
  • Refactored cleanup_async_job_resources to accept job ID and logger instead of a Job instance, ensuring cleanup can occur even if the Job object is unavailable and improving logging consistency.
  • Introduced _fail_job helper to mark jobs as failed and trigger async resource cleanup when Redis state is missing, improving failure handling in NATS pipeline results.
  • Updated job cancellation logic to always trigger async cleanup and correctly set status to REVOKED for async jobs.
  • Improved job log handler to refresh logs from the database before writing, reducing lost logs due to concurrent updates.
  • Ensured logger handler always references the current job instance, preventing stale log writes in worker processes.
  • Added _stream_exists check in NATS queue orchestration to avoid unnecessary stream creation and improve error handling when reserving tasks.

How to Test the Changes

Start a job with e.g.:

docker compose run --rm django python manage.py test_ml_job_e2e --collection "ami-1000" --pipeline quebec_vermont_moths_2023 --dispatch-mode async_api --project 1

Then either cancel it in the UI, or flush/stop Redis:

docker compose run --rm django python manage.py chaos_monkey flush redis # must run in container
docker compose down redis
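The CLI surface of the chaos command can be sketched like this. Note this is a shape-only sketch using plain `argparse`: the real `chaos_monkey` is a Django management command subclassing `BaseCommand`, and the return strings here stand in for actual Redis/NATS operations.

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # Mirrors `manage.py chaos_monkey <action> <service>`.
    parser = argparse.ArgumentParser(prog="chaos_monkey")
    parser.add_argument("action", choices=["flush", "pause"])
    parser.add_argument("service", choices=["redis", "nats"])
    return parser

def handle(action: str, service: str) -> str:
    if action == "flush" and service == "redis":
        return "flushed redis"  # real command would issue FLUSHALL
    if action == "flush" and service == "nats":
        return "deleted all JetStream streams"  # real command deletes streams
    # Defensive fallback so new actions/services can't become silent no-ops.
    raise ValueError(f"Unsupported combination: {action} {service}")
```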

Screenshots


Known Issues

Occasionally the error logs get overwritten by another worker, so the error won't be displayed; this is a known issue with the job logger.

Deployment Notes

Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • New chaos testing command to inject faults by flushing Redis or message-stream state.
    • Added comprehensive monitoring and E2E testing guide for async job workflows.
  • Bug Fixes

    • Corrected task eligibility to use active job states.
    • Avoided implicit creation of message streams when none exist.
    • Prevented retry action during canceling/retry states.
    • Improved async job cleanup during cancellation and failure paths.
  • Documentation

    • Expanded ML job test command help with monitoring references.

carlos-irreverentlabs and others added 30 commits January 16, 2026 11:25
* fix: prevent NATS connection flooding and stale job task fetching

- Add connect_timeout=5, allow_reconnect=False to NATS connections to
  prevent leaked reconnection loops from blocking Django's event loop
- Guard /tasks endpoint against terminal-status jobs (return empty tasks
  instead of attempting NATS reserve)
- IncompleteJobFilter now excludes jobs by top-level status in addition
  to progress JSON stages
- Add stale worker cleanup to integration test script

Found during PSv2 integration testing where stale ADC workers with
default DataLoader parallelism overwhelmed the single uvicorn worker
thread by flooding /tasks with concurrent NATS reserve requests.

Co-Authored-By: Claude <[email protected]>

* docs: PSv2 integration test session notes and NATS flooding findings

Session notes from 2026-02-16 integration test including root cause
analysis of stale worker task competition and NATS connection issues.
Findings doc tracks applied fixes and remaining TODOs with priorities.

Co-Authored-By: Claude <[email protected]>

* docs: update session notes with successful test run #3

PSv2 integration test passed end-to-end (job 1380, 20/20 images).
Identified ack_wait=300s as cause of ~5min idle time when GPU
processes race for NATS tasks.

Co-Authored-By: Claude <[email protected]>

* fix: batch NATS task fetch to prevent HTTP timeouts

Replace N×1 reserve_task() calls with single reserve_tasks() batch
fetch. The previous implementation created a new pull subscription per
message (320 NATS round trips for batch=64), causing the /tasks endpoint
to exceed HTTP client timeouts. The new approach uses one psub.fetch()
call for the entire batch.

Co-Authored-By: Claude <[email protected]>
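The round-trip arithmetic in this commit (320 NATS round trips for a batch of 64 vs. one `psub.fetch()` call) can be illustrated with a counting stand-in. This is not the real nats-py code, just a model of why per-message fetching times out: each `fetch()` here counts as one network round trip.

```python
import asyncio

class FakePullSubscription:
    """Stand-in that counts round trips; the real code uses a nats-py
    JetStream pull subscription, where each fetch() is a network call."""
    def __init__(self, pending):
        self.pending = list(pending)
        self.round_trips = 0

    async def fetch(self, batch=1, timeout=0.5):
        self.round_trips += 1
        taken, self.pending = self.pending[:batch], self.pending[batch:]
        return taken

async def reserve_one_by_one(psub, n):
    # Old behavior: one fetch per message -> n round trips.
    msgs = []
    for _ in range(n):
        msgs += await psub.fetch(batch=1)
    return msgs

async def reserve_batch(psub, n):
    # New behavior: a single fetch for the whole batch.
    return await psub.fetch(batch=n)
```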

* docs: add next session prompt

* feat: add pipeline__slug__in filter for multi-pipeline job queries

Workers that handle multiple pipelines can now fetch jobs for all of them
in a single request: ?pipeline__slug__in=slug1,slug2

Co-Authored-By: Claude <[email protected]>

* chore: remove local-only docs and scripts from branch

These files are session notes, planning docs, and test scripts that
should stay local rather than be part of the PR.

Co-Authored-By: Claude <[email protected]>

* feat: set job dispatch_mode at creation time based on project feature flags

ML jobs with a pipeline now get dispatch_mode set during setup() instead
of waiting until run() is called by the Celery worker. This lets the UI
show the correct mode immediately after job creation.

Co-Authored-By: Claude <[email protected]>

* fix: add timeouts to all JetStream operations and restore reconnect policy

Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations
via asyncio.wait_for() so a hung NATS connection fails fast instead of
blocking the caller's thread indefinitely. Also restore the intended
reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.

Co-Authored-By: Claude <[email protected]>
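The fail-fast pattern this commit describes, wrapping each JetStream metadata call in `asyncio.wait_for()`, looks roughly like the sketch below. The hung call is simulated with a long sleep; the timeout constant and function names are assumptions standing in for the real `NATS_JETSTREAM_TIMEOUT` setting and stream helpers.

```python
import asyncio

NATS_JETSTREAM_TIMEOUT = 10  # seconds; the real value comes from Django settings

async def hung_metadata_call():
    # Stand-in for a JetStream metadata call over a hung NATS connection.
    await asyncio.sleep(3600)

async def ensure_stream(timeout: float = NATS_JETSTREAM_TIMEOUT) -> str:
    # Fail fast instead of blocking the caller's thread indefinitely.
    try:
        await asyncio.wait_for(hung_metadata_call(), timeout=timeout)
        return "ok"
    except asyncio.TimeoutError:
        return "timed out"
```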

* fix: propagate NATS timeouts as 503 instead of swallowing them

asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was
caught by the broad `except Exception` in reserve_tasks(), silently
returning [] and making NATS outages indistinguishable from empty queues.
Workers would then poll immediately, recreating the flooding problem.

- Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
- Catch TimeoutError and OSError in the /tasks view, return 503
- Restore allow_reconnect=False (fail-fast on connection issues)
- Add return type annotation to get_connection()

Co-Authored-By: Claude <[email protected]>
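The distinction this commit draws — a NATS outage must surface as a 503, not an empty queue — can be sketched as follows. The view and fetch callables are hypothetical stand-ins (the real view also catches `nats.errors.Error`, per a later commit); the structure shows the re-raise escaping the broad `except Exception`.

```python
import asyncio

async def reserve_tasks(fetch):
    try:
        return await fetch()
    except asyncio.TimeoutError:
        raise  # NATS outage: let the view report it instead of faking "empty"
    except Exception:
        return []  # other errors still degrade to an empty reservation

async def tasks_view(fetch):
    """Stand-in for the /tasks endpoint, returning (status_code, body)."""
    try:
        return 200, await reserve_tasks(fetch)
    except (asyncio.TimeoutError, OSError):
        return 503, []
```

With the swallowing behavior, workers saw `[]`, assumed an empty queue, and polled again immediately, recreating the flooding problem; a 503 lets them back off instead.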

* fix: address review comments (log level, fetch timeout, docstring)

- Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid
  log spam from frequent polling)
- Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker
  for 5s on empty queues
- Fix docstring examples using string 'job123' for int-typed job_id

Co-Authored-By: Claude <[email protected]>

* fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses

NoServersError, ConnectionClosedError, and other NATS exceptions inherit
from nats.errors.Error (not OSError), so they escaped the handler and
returned 500 instead of 503.

Co-Authored-By: Claude <[email protected]>

---------

Co-authored-by: Claude <[email protected]>
…olnickLab#1142)

* feat: configurable NATS tuning and gunicorn worker management

Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to
this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production
  parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8)
  when not explicitly set in the environment

Co-Authored-By: Claude <[email protected]>

* fix: address PR review comments

- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-Authored-By: Claude <[email protected]>
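The falsy-zero guard fixed above is a classic Python pitfall worth spelling out: `value or default` silently discards an explicit `0`. A minimal sketch (the default value and function names are illustrative, not the real setting):

```python
DEFAULT_MAX_ACK_PENDING = 100  # illustrative default

def resolve_buggy(max_ack_pending=None):
    # `or` treats an explicit 0 as falsy and silently replaces it.
    return max_ack_pending or DEFAULT_MAX_ACK_PENDING

def resolve_fixed(max_ack_pending=None):
    # Only fall back when the caller passed nothing at all.
    return max_ack_pending if max_ack_pending is not None else DEFAULT_MAX_ACK_PENDING
```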

---------

Co-authored-by: Michael Bunsen <[email protected]>
Co-authored-by: Claude <[email protected]>
* fix: include pipeline_slug in MinimalJobSerializer (ids_only response)

The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in
the response to know which pipeline to run. Without it, Pydantic
validation fails and the worker skips the job.

Co-Authored-By: Claude <[email protected]>

* Update ami/jobs/serializers.py

Co-authored-by: Copilot <[email protected]>

---------

Co-authored-by: Claude <[email protected]>
Co-authored-by: Copilot <[email protected]>
JobState(str, OrderedEnum) was using str's lexicographic __gt__
instead of OrderedEnum's definition-order __gt__, because str
comes first in the MRO. This caused max(FAILURE, SUCCESS) to
return SUCCESS, silently discarding failure state in concurrent
job progress updates.

Fix: __init_subclass__ injects comparison methods directly onto
each subclass so they take MRO priority over data-type mixins.

Also preserve FAILURE status through the progress ternary when
progress < 1.0, so early failure detection isn't overwritten.

Co-Authored-By: Claude <[email protected]>
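The MRO problem and the `__init_subclass__` fix described in this commit can be reproduced with a small self-contained sketch. The class names mirror the description, but this is a simplified model, not the code in `ami/jobs/models.py`:

```python
from enum import Enum

class OrderedEnum(Enum):
    """Enum whose ordering follows definition order, not value."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        def _order(member):
            # Position of the member in definition order.
            return list(type(member)).index(member)

        # Inject comparisons directly onto the subclass so they take MRO
        # priority over data-type mixins like str.
        cls.__gt__ = lambda self, other: _order(self) > _order(other)
        cls.__lt__ = lambda self, other: _order(self) < _order(other)
        cls.__ge__ = lambda self, other: _order(self) >= _order(other)
        cls.__le__ = lambda self, other: _order(self) <= _order(other)

class JobState(str, OrderedEnum):
    SUCCESS = "SUCCESS"  # defined first -> ranks lower
    FAILURE = "FAILURE"  # defined later -> ranks higher

# Without the injection, str comes first in JobState's MRO, so str.__gt__
# wins and max(FAILURE, SUCCESS) returns SUCCESS lexicographically,
# silently discarding the failure state.
```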
The NATS message is ACK'd at line 145, before update_state() and
_update_job_progress(). If either of those raises, the except
block was logging "NATS will redeliver" when it won't.

Co-Authored-By: Claude <[email protected]>
mihow and others added 7 commits February 26, 2026 17:57
When a job is canceled, NATS/Redis cleanup runs before in-flight results
finish processing. The resulting "Redis state missing" message is expected,
not an error.

Co-Authored-By: Claude <[email protected]>
Covers all monitoring points for NATS async jobs: Django ORM, REST API,
tasks endpoint, NATS consumer state, Redis counters, Docker logs, and
AMI worker logs. Linked from CLAUDE.md and the test_ml_job_e2e command.

Co-Authored-By: Claude <[email protected]>
Tests need to set job status to STARTED since the /tasks endpoint
now only serves tasks for jobs in active_states() (STARTED, RETRY).

Co-Authored-By: Claude <[email protected]>
Copilot AI review requested due to automatic review settings February 27, 2026 16:22
@netlify

netlify bot commented Feb 27, 2026

Deploy Preview for antenna-ssec ready!

Name Link
🔨 Latest commit a16fc05
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/69a24679ad545d0008040f50
😎 Deploy Preview https://deploy-preview-1162--antenna-ssec.netlify.app

@netlify

netlify bot commented Feb 27, 2026

Deploy Preview for antenna-preview ready!

Name Link
🔨 Latest commit a16fc05
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69a2467930b64c0008b6d173
😎 Deploy Preview https://deploy-preview-1162--antenna-preview.netlify.app

Lighthouse
1 paths audited
Performance: 75 (🟢 up 9 from production)
Accessibility: 89 (🟢 up 9 from production)
Best Practices: 92 (🔴 down 8 from production)
SEO: 100 (🟢 up 8 from production)
PWA: 80 (no change from production)


@coderabbitai
Contributor

coderabbitai bot commented Feb 27, 2026

📝 Walkthrough


Adds operational tooling and robustness to async job workflows: a chaos-monkey management command, monitoring docs and help text, stronger job cancellation and cleanup semantics, NATS stream-existence checks, renames/introduces cleanup helpers, small UI and test adjustments, and logging/emit changes to refresh DB state before writing logs.

Changes

Cohort / File(s) Summary
Documentation & Help Text
.agents/AGENTS.md, ami/jobs/management/commands/test_ml_job_e2e.py, docs/claude/reference/monitoring-async-jobs.md
Adds E2E testing/monitoring docs, references monitoring guide from the ML job e2e command, and introduces an extensive monitoring/debugging reference for NATS-backed async jobs.
Chaos Testing Command
ami/jobs/management/commands/chaos_monkey.py
New Django management command supporting flush action for redis and nats to inject faults and delete JetStream streams.
Job State, Logging & Cancellation
ami/jobs/models.py
Adds JobState.active_states, refreshes DB in JobLogHandler.emit, ensures handler references current job, and calls cleanup_async_job_if_needed during Job.cancel with adjusted REVOKED/CANCELED transitions for ASYNC_API.
Task Processing & Cleanup Helpers
ami/jobs/tasks.py, ami/ml/orchestration/jobs.py
Renames _cleanup_job_if_needed → cleanup_async_job_if_needed, adds _fail_job(job_id, reason), updates callers to use the new cleanup, and changes the cleanup_async_job_resources signature to (job_id: int, _logger: logging.Logger).
NATS Stream & Reservation Logic
ami/ml/orchestration/nats_queue.py, ami/ml/orchestration/tests/test_nats_queue.py
Adds _stream_exists(job_id) to avoid blind stream creation, updates _ensure_stream and reserve_tasks to skip creating streams when missing, and adjusts tests to use nats.js.errors.NotFoundError.
API, UI & Tests
ami/jobs/views.py, ami/jobs/tests.py, ui/src/data-services/models/job.ts
Tasks endpoint now uses active_states to determine eligibility; tests update job initialization to STARTED and persist status; UI canRetry excludes CANCELING and RETRY states.
Small imports/usage changes
ami/jobs/models.py, ami/jobs/tasks.py
Adds import/usage of cleanup_async_job_if_needed and updates log/cleanup call sites to the new helper name.

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant JobModel as Job Model
    participant Celery as Celery/Worker
    participant TaskQueue as NATS JetStream
    participant Redis as Redis Progress Store
    participant Cleanup as Cleanup Logic

    Client->>JobModel: cancel(job_id)
    JobModel->>JobModel: evaluate dispatch_mode
    alt ASYNC_API
        JobModel->>Celery: revoke associated task(s)
        Celery-->>JobModel: revoke ack
        JobModel->>Cleanup: cleanup_async_job_if_needed(job)
        Cleanup->>TaskQueue: TaskQueueManager.cleanup_job_resources(job_id)
        TaskQueue-->>Cleanup: delete/cleanup streams & consumers
        Cleanup->>Redis: AsyncJobStateManager.clean_state(job_id)
        Redis-->>Cleanup: state cleared
        Cleanup-->>JobModel: cleanup result
        JobModel->>JobModel: set status -> REVOKED
    else sync dispatch / other
        JobModel->>JobModel: set status -> CANCELED
    end
    JobModel-->>Client: cancellation completed

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

backend, ml

Poem

🐰 I hopped through streams and flushed a cache,
Poked at jobs and made the workers dash.
States now tidy, streams now checked,
Logs refreshed before being decked.
Hooray — chaos pranced, then neatly crashed!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage — ⚠️ Warning: docstring coverage is 51.61%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
Title check — ✅ Passed: the title clearly summarizes the main changes (async job cancellation support and failure handling when Redis errors occur), which are core features throughout the PR.
Description check — ✅ Passed: the description covers all required template sections (Summary, List of Changes, How to Test, Screenshots, Deployment Notes, Checklist), with clear context, detailed changes, testing instructions, and known issues.



Contributor

Copilot AI left a comment


Pull request overview

This pull request enhances ML async job resilience by adding support for job cancellation, improving error handling when Redis state is lost, and introducing chaos testing utilities. The changes build on PR #1150's atomic Redis operations by ensuring proper cleanup of NATS and Redis resources in various failure and cancellation scenarios.

Changes:

  • Refactored async resource cleanup to accept job_id and logger instead of Job instance, improving reliability when the Job object is unavailable
  • Introduced _fail_job helper that marks jobs as failed and triggers cleanup when Redis state is missing
  • Updated job cancellation to always clean up async resources and correctly transition CANCELING → REVOKED for async jobs
  • Improved job logging with DB refresh before writes and handler instance updates to reduce lost logs
  • Added chaos_monkey management command for fault injection testing
  • Created comprehensive monitoring documentation and updated job state checks to use active_states()

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
ami/jobs/models.py Added active_states() method, updated cancel logic to cleanup async resources and handle ASYNC_API jobs, improved logger handler to always reference current instance and refresh logs from DB
ami/jobs/tasks.py Added _fail_job helper for failing jobs on Redis errors, renamed cleanup function to cleanup_async_job_if_needed for consistency, updated all cleanup call sites
ami/jobs/views.py Changed tasks endpoint to use active_states() instead of checking final_states, preventing task serving during cancellation
ami/jobs/tests.py Updated test jobs to STARTED status to match new active_states requirement
ami/ml/orchestration/jobs.py Refactored cleanup_async_job_resources signature to accept job_id and logger
ami/ml/orchestration/nats_queue.py Added _stream_exists check, refactored _ensure_stream to avoid unnecessary creation, updated test to return empty list when stream doesn't exist
ami/ml/orchestration/tests/test_nats_queue.py Updated to use specific nats.js.errors.NotFoundError instead of generic Exception
ami/jobs/management/commands/chaos_monkey.py New management command for Redis and NATS fault injection testing
ami/jobs/management/commands/test_ml_job_e2e.py Updated help text to reference monitoring documentation
docs/claude/reference/monitoring-async-jobs.md New comprehensive monitoring guide for async jobs
ui/src/data-services/models/job.ts Added CANCELING to non-retryable states
.agents/AGENTS.md Added reference to E2E testing and monitoring documentation


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ui/src/data-services/models/job.ts (1)

64-71: ⚠️ Potential issue | 🟠 Major

Block retry while a job is already in RETRY.

canRetry now blocks PENDING/CANCELING, but it still allows retries when status is RETRY (an active processing state), which can permit duplicate retry actions.

Proposed fix
   get canRetry(): boolean {
     return (
       this._job.user_permissions.includes(UserPermission.Run) &&
       this.status.code !== 'CREATED' &&
       this.status.code !== 'STARTED' &&
       this.status.code !== 'PENDING' &&
-      this.status.code !== 'CANCELING'
+      this.status.code !== 'CANCELING' &&
+      this.status.code !== 'RETRY'
     )
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/data-services/models/job.ts` around lines 64 - 71, The canRetry getter
currently allows retry when a job's status.code === 'RETRY', which permits
duplicate retry actions; update the boolean conditions in the canRetry method
(getter) to also return false when this.status.code === 'RETRY' by adding that
check alongside the existing PENDING/CANCELING/CREATED/STARTED checks so retries
are blocked while a job is already in RETRY.
ami/jobs/models.py (1)

339-360: ⚠️ Potential issue | 🟠 Major

Protect the full DB log-write sequence from exceptions.

Line 343 executes refresh_from_db outside the existing try/catch. If that DB call fails, logging can raise into caller code paths and disrupt job execution.

Suggested fix
-        self.job.refresh_from_db(fields=["logs"])
-        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
-        if msg not in self.job.logs.stdout:
-            self.job.logs.stdout.insert(0, msg)
-
-        # Write a simpler copy of any errors to the errors field
-        if record.levelno >= logging.ERROR:
-            if record.message not in self.job.logs.stderr:
-                self.job.logs.stderr.insert(0, record.message)
-
-        if len(self.job.logs.stdout) > self.max_log_length:
-            self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
-
-        # `@TODO` consider saving logs to the database periodically rather than on every log
         try:
+            self.job.refresh_from_db(fields=["logs"])
+            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
+            if msg not in self.job.logs.stdout:
+                self.job.logs.stdout.insert(0, msg)
+
+            if record.levelno >= logging.ERROR:
+                if record.message not in self.job.logs.stderr:
+                    self.job.logs.stderr.insert(0, record.message)
+
+            if len(self.job.logs.stdout) > self.max_log_length:
+                self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
+
             self.job.save(update_fields=["logs"], update_progress=False)
         except Exception as e:
             logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/models.py` around lines 339 - 360, The refresh_from_db call can
raise and currently sits outside the try/except around self.job.save, allowing
logging to crash callers; wrap the entire DB-write sequence (including
self.job.refresh_from_db(fields=["logs"]), the stdout/stderr modifications,
trimming, and the self.job.save(update_fields=["logs"], update_progress=False)
call) in a try/except that catches Exception (as the existing save handler does)
so refresh failures are swallowed/logged locally and do not propagate; retain
existing behavior of not re-raising and ensure any logging of the exception uses
the same local error handling path as the save exception handling.
🧹 Nitpick comments (3)
ami/jobs/management/commands/chaos_monkey.py (3)

78-81: Add timeout to individual stream deletions for consistency.

The existing delete_stream method in nats_queue.py wraps the deletion call with asyncio.wait_for(..., timeout=NATS_JETSTREAM_TIMEOUT). Without a timeout here, this loop could hang indefinitely if NATS becomes unresponsive mid-deletion—which is ironic for a chaos testing tool.

⏱️ Proposed fix: Add timeout to each deletion
+import asyncio
+
+NATS_JETSTREAM_TIMEOUT = 5  # or import from nats_queue if accessible
+
         async def _delete_all_streams():
             import nats

             nc = await nats.connect(NATS_URL, connect_timeout=5, allow_reconnect=False)
             js = nc.jetstream()
             try:
                 streams = await js.streams_info()
                 if not streams:
                     return []
                 deleted = []
                 for stream in streams:
                     name = stream.config.name
-                    await js.delete_stream(name)
+                    await asyncio.wait_for(js.delete_stream(name), timeout=NATS_JETSTREAM_TIMEOUT)
                     deleted.append(name)
                 return deleted
             finally:
                 await nc.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/management/commands/chaos_monkey.py` around lines 78 - 81, The loop
in chaos_monkey.py calls js.delete_stream(name) without a timeout, so wrap each
individual deletion in asyncio.wait_for(..., timeout=NATS_JETSTREAM_TIMEOUT) to
match the timeout behavior in nats_queue.py; update the for loop that calls
js.delete_stream to await asyncio.wait_for(js.delete_stream(name),
timeout=NATS_JETSTREAM_TIMEOUT) and ensure asyncio and the
NATS_JETSTREAM_TIMEOUT constant are imported/available in the file so individual
stream deletions cannot hang indefinitely.

38-45: Consider adding an else clause for defensive coding.

While argparse constraints currently prevent invalid combinations, adding new actions or services could result in silent no-ops if handle() isn't updated accordingly.

🛡️ Optional: Add defensive else clause
         if action == "flush" and service == "redis":
             self._flush_redis()
         elif action == "flush" and service == "nats":
             self._flush_nats()
+        else:
+            raise CommandError(f"Unsupported action/service combination: {action} {service}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/management/commands/chaos_monkey.py` around lines 38 - 45, The
handle method currently only handles the two valid combos and silently does
nothing for others; add a defensive else branch at the end of handle() that
detects unsupported action/service combinations (when neither the "flush redis"
nor "flush nats" branches match) and fails fast by raising a
django.core.management.base.CommandError or logging a clear error message;
update the handle function (referencing handle, _flush_redis, and _flush_nats)
to validate inputs and raise the CommandError with a message like "Unsupported
action/service combination: {action}/{service}" so future changes don't
accidentally create silent no-ops.

20-20: Consider reading NATS URL from settings instead of hardcoding.

Hardcoding nats://ami_local_nats:4222 limits this command to a specific environment. If the project already has NATS configuration in Django settings (similar to how Redis uses get_redis_connection("default")), consider reusing it for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/management/commands/chaos_monkey.py` at line 20, Replace the
hardcoded NATS_URL in chaos_monkey.py with a value read from Django settings:
import django.conf.settings and use the project's NATS configuration (e.g.,
settings.NATS_URL or settings.NATS.get('URL') — whichever convention your
project uses), falling back to "nats://ami_local_nats:4222" if the setting is
missing; update the NATS_URL symbol assignment accordingly so the command
respects environment-specific configuration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/models.py`:
- Around line 984-989: Move the Celery revoke call to run before running
cleanup_async_job_if_needed to prevent a worker from recreating resources after
cleanup; specifically, obtain the AsyncResult via
run_job.AsyncResult(self.task_id), call task.revoke(terminate=True) (and any
needed task state checks) and self.save() immediately, then call
cleanup_async_job_if_needed(self). Update the logic in the method that currently
calls cleanup_async_job_if_needed(self) first so revoke + save happen first to
eliminate the cancellation race window.

In `@ami/jobs/tasks.py`:
- Around line 190-193: The _fail_job function currently sets job.status =
JobState.FAILURE and saves only ["status","finished_at"], which can leave
job.progress.summary.status out of sync; update job.progress.summary.status =
JobState.FAILURE (or the equivalent enum/value used elsewhere) before saving and
include the progress field in the save call (e.g., add "progress" to
update_fields) so both job.status and progress.summary.status are persisted
together and remain consistent with JobState.FAILURE.

In `@ami/ml/orchestration/nats_queue.py`:
- Around line 98-108: The _stream_exists method calls
self.js.stream_info(stream_name) without timeout protection; wrap that call in
asyncio.wait_for using the same timeout variable used elsewhere (e.g.,
self._nats_timeout) so the JetStream metadata call cannot hang indefinitely,
keeping the existing try/except NotFoundError behavior and returning True/False
as before.

In `@docs/claude/reference/monitoring-async-jobs.md`:
- Around line 167-173: The fenced ASCII lifecycle block (the triple-backtick
block showing "CREATED → PENDING → STARTED → [processing] → SUCCESS ...
FAILURE") is missing a language identifier and triggers markdownlint MD040;
update that code fence to include a language tag (e.g., change the opening ```
to ```text) so the block becomes a fenced-code block with a language identifier
while keeping the ASCII diagram content unchanged.

---

Outside diff comments:
In `@ami/jobs/models.py`:
- Around line 339-360: The refresh_from_db call can raise and currently sits
outside the try/except around self.job.save, allowing logging to crash callers;
wrap the entire DB-write sequence (including
self.job.refresh_from_db(fields=["logs"]), the stdout/stderr modifications,
trimming, and the self.job.save(update_fields=["logs"], update_progress=False)
call) in a try/except that catches Exception (as the existing save handler does)
so refresh failures are swallowed/logged locally and do not propagate; retain
existing behavior of not re-raising and ensure any logging of the exception uses
the same local error handling path as the save exception handling.

In `@ui/src/data-services/models/job.ts`:
- Around line 64-71: The canRetry getter currently allows retry when a job's
status.code === 'RETRY', which permits duplicate retry actions; update the
boolean conditions in the canRetry method (getter) to also return false when
this.status.code === 'RETRY' by adding that check alongside the existing
PENDING/CANCELING/CREATED/STARTED checks so retries are blocked while a job is
already in RETRY.

---

Nitpick comments:
In `@ami/jobs/management/commands/chaos_monkey.py`:
- Around line 78-81: The loop in chaos_monkey.py calls js.delete_stream(name)
without a timeout, so wrap each individual deletion in asyncio.wait_for(...,
timeout=NATS_JETSTREAM_TIMEOUT) to match the timeout behavior in nats_queue.py;
update the for loop that calls js.delete_stream to await
asyncio.wait_for(js.delete_stream(name), timeout=NATS_JETSTREAM_TIMEOUT) and
ensure asyncio and the NATS_JETSTREAM_TIMEOUT constant are imported/available in
the file so individual stream deletions cannot hang indefinitely.
- Around line 38-45: The handle method currently only handles the two valid
combos and silently does nothing for others; add a defensive else branch at the
end of handle() that detects unsupported action/service combinations (when
neither the "flush redis" nor "flush nats" branches match) and fails fast by
raising a django.core.management.base.CommandError or logging a clear error
message; update the handle function (referencing handle, _flush_redis, and
_flush_nats) to validate inputs and raise the CommandError with a message like
"Unsupported action/service combination: {action}/{service}" so future changes
don't accidentally create silent no-ops.
- Line 20: Replace the hardcoded NATS_URL in chaos_monkey.py with a value read
from Django settings: import django.conf.settings and use the project's NATS
configuration (e.g., settings.NATS_URL or settings.NATS.get('URL') — whichever
convention your project uses), falling back to "nats://ami_local_nats:4222" if
the setting is missing; update the NATS_URL symbol assignment accordingly so the
command respects environment-specific configuration.
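The settings-with-fallback pattern this bullet suggests might look like the following. `SimpleNamespace` stands in for `django.conf.settings` so the sketch is self-contained; in the real command you would use `from django.conf import settings`:

```python
from types import SimpleNamespace

# Stand-in for `from django.conf import settings`, so the sketch runs alone.
settings = SimpleNamespace(NATS_URL="nats://prod-nats:4222")

DEFAULT_NATS_URL = "nats://ami_local_nats:4222"

# Prefer environment-specific configuration; fall back to the local default.
NATS_URL = getattr(settings, "NATS_URL", DEFAULT_NATS_URL)

print(NATS_URL)  # nats://prod-nats:4222
```

`getattr` with a default keeps the command working in environments where the setting is not defined, without a `try/except AttributeError`.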

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 12a3c70 and 934db1d.

📒 Files selected for processing (12)
  • .agents/AGENTS.md
  • ami/jobs/management/commands/chaos_monkey.py
  • ami/jobs/management/commands/test_ml_job_e2e.py
  • ami/jobs/models.py
  • ami/jobs/tasks.py
  • ami/jobs/tests.py
  • ami/jobs/views.py
  • ami/ml/orchestration/jobs.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py
  • docs/claude/reference/monitoring-async-jobs.md
  • ui/src/data-services/models/job.ts

@carlosgjs carlosgjs requested a review from mihow February 27, 2026 20:05
@mihow
Collaborator

mihow commented Feb 27, 2026

Claude says — responding to CodeRabbit nitpick comments on chaos_monkey.py:

Timeout on stream deletions (L78-81): This is a manual chaos testing tool — if NATS hangs during deletion, the developer will Ctrl+C. Adding asyncio.wait_for wrappers here adds complexity to a deliberately simple utility. Skipping.

Defensive else clause (L38-45): argparse choices=["flush"] and choices=["redis", "nats"] already make an invalid combination impossible at the argument parsing level. An else clause would be unreachable dead code. Skipping.

NATS_URL from settings (L20): Already addressed — will update to read from settings.NATS_URL (same as Copilot's comment).
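The argparse point above can be illustrated with a minimal sketch. `handle` is a simplified stand-in for the management command's method, and `ValueError` stands in for Django's `CommandError`; the final `raise` is the defensive branch CodeRabbit suggested, which is indeed unreachable today because `choices` rejects anything else at parse time:

```python
import argparse

parser = argparse.ArgumentParser(prog="chaos_monkey")
parser.add_argument("action", choices=["flush"])
parser.add_argument("service", choices=["redis", "nats"])


def handle(action, service):
    if action == "flush" and service == "redis":
        return "flushed redis"
    elif action == "flush" and service == "nats":
        return "flushed nats"
    # Defensive guard: unreachable today because argparse `choices` rejects
    # anything else, but fails loudly if options are added without a branch.
    raise ValueError(f"Unsupported action/service combination: {action}/{service}")


args = parser.parse_args(["flush", "redis"])
print(handle(args.action, args.service))  # flushed redis
```

Whether the guard is dead code or cheap insurance is a judgment call; the sketch shows both layers of validation side by side.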

@mihow
Collaborator

mihow commented Feb 27, 2026

Claude says — responding to CodeRabbit "outside diff" comments:

Protect full DB log-write sequence (models.py:339-360): Good catch. The refresh_from_db() at line 343 is unprotected — if the DB is unreachable, this would crash the caller. Will wrap the entire sequence (refresh + mutation + save) in the existing try/except.

Block retry while in RETRY status (job.ts:64-71): Valid. RETRY is an active processing state (the job is currently being re-executed), so allowing another retry click would cause duplicate execution. Will add the check.

mihow and others added 5 commits February 27, 2026 15:49
…afety

- Reorder cancel(): revoke Celery task before cleaning up async resources
  to prevent a theoretical race where a worker recreates state after cleanup
- Remove redundant self.save() after task.revoke() (no fields changed)
- Use update_status() in _fail_job() to keep progress.summary.status in
  sync with job.status
- Wrap entire log handler emit() DB sequence (refresh_from_db + mutations +
  save) in try/except so a DB failure during logging cannot crash callers

Co-Authored-By: Claude <[email protected]>
- Add asyncio.wait_for() wrapper to _stream_exists() stream_info call,
  accidentally dropped during refactor from _ensure_stream
- Read NATS_URL from Django settings in chaos_monkey command instead of
  hardcoding, consistent with TaskQueueManager

Co-Authored-By: Claude <[email protected]>
RETRY is an active processing state; allowing another retry while one
is already running could cause duplicate execution.

Co-Authored-By: Claude <[email protected]>
Add docstring explaining that TimeoutError is deliberately not caught —
an unreachable NATS server should be a hard failure, not a "stream
missing" false negative. Multiple reviewers questioned this behavior.

Co-Authored-By: Claude <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
ami/ml/orchestration/nats_queue.py (1)

120-123: Consider using logger.debug instead of logger.warning for expected stream creation.

When publish_task is called for a new job, the stream won't exist yet—that's normal operation. Logging at WARNING level here may create noise in production logs for every new job.

💡 Suggested change
         if not await self._stream_exists(job_id):
             stream_name = self._get_stream_name(job_id)
             subject = self._get_subject(job_id)
-            logger.warning(f"Stream {stream_name} does not exist")
+            logger.debug(f"Stream {stream_name} does not exist, creating...")
             # Stream doesn't exist, create it
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` around lines 120 - 123, The warning log
in publish_task when a stream doesn't exist is expected behavior and should be
changed to a lower-severity message; replace logger.warning(...) in the block
that calls self._stream_exists(job_id) with logger.debug(...) (optionally
include job_id or stream_name via self._get_stream_name(job_id) to keep
context), so use the same check around _stream_exists and the same
stream/subject values from _get_stream_name/_get_subject but log at debug level
instead of warning.
ui/src/data-services/models/job.ts (1)

64-72: Consider centralizing retry-blocked statuses to reduce future drift.

This logic is correct, but a small refactor (constant set/array for blocked statuses) will make future status additions less error-prone.

♻️ Optional refactor
+const NON_RETRYABLE_STATUSES: ServerJobStatusCode[] = [
+  'CREATED',
+  'STARTED',
+  'PENDING',
+  'CANCELING',
+  'RETRY',
+]
+
   get canRetry(): boolean {
     return (
       this._job.user_permissions.includes(UserPermission.Run) &&
-      this.status.code !== 'CREATED' &&
-      this.status.code !== 'STARTED' &&
-      this.status.code !== 'PENDING' &&
-      this.status.code !== 'CANCELING' &&
-      this.status.code !== 'RETRY'
+      !NON_RETRYABLE_STATUSES.includes(this.status.code)
     )
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/data-services/models/job.ts` around lines 64 - 72, Refactor the
canRetry getter to use a centralized constant (e.g., BLOCKED_RETRY_STATUSES or
RETRY_BLOCKLIST) containing the status codes currently checked
('CREATED','STARTED','PENDING','CANCELING','RETRY') and then check membership
against that set; update the logic in canRetry (which references
this._job.user_permissions.includes(UserPermission.Run) and this.status.code) to
use the new constant so future status changes only require editing the
blocklist.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/management/commands/chaos_monkey.py`:
- Around line 65-97: The _flush_nats helper uses nc.jetstream() but calls
JetStreamManager APIs; update the inner async function _delete_all_streams to
use nc.jsm() instead of nc.jetstream() and change all subsequent calls from js.*
to jsm.*, e.g. call jsm.streams_info() and jsm.delete_stream(name) (leave
connection/close logic and the surrounding sync/try/except intact).


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 934db1d and a16fc05.

📒 Files selected for processing (6)
  • ami/jobs/management/commands/chaos_monkey.py
  • ami/jobs/models.py
  • ami/jobs/tasks.py
  • ami/ml/orchestration/nats_queue.py
  • docs/claude/reference/monitoring-async-jobs.md
  • ui/src/data-services/models/job.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • docs/claude/reference/monitoring-async-jobs.md

@mihow
Collaborator

mihow commented Feb 28, 2026

Claude says — responding to CodeRabbit's second review round:

nc.jsm() vs nc.jetstream() (Critical): Invalid. JetStreamContext inherits from JetStreamManager — both expose streams_info() and delete_stream(). Verified at runtime. Already replied inline.

logger.warning → logger.debug for stream creation (Nitpick): Reasonable but this is existing code from before the PR, not introduced by these changes. The warning is arguably useful during early adoption to flag when streams are being created unexpectedly. Skipping for now.

Centralize retry-blocked statuses (Nitpick): Over-engineering for 5 status checks in a getter. The inline list is clear and easy to read. Adding a separate constant adds indirection without meaningful benefit. Skipping.

Collaborator

@mihow mihow left a comment


Tested before the most recent merge.

@mihow mihow merged commit 05ad870 into RolnickLab:main Feb 28, 2026
9 of 11 checks passed
@mihow
Collaborator

mihow commented Mar 3, 2026

Claude says: while troubleshooting a stalled job with @mihow, we discovered a failure mode that isn't covered by the fixes in this PR.

What happened: Job 2318 (async_api, 925 images) processed 168 images successfully but the remaining 757 were never completed. The NATS consumer shows all 925 messages were delivered (4381 total delivery attempts, 756 redelivered) and the consumer is now idle with num_pending=0, num_ack_pending=0. The messages exhausted max_deliver=5 and became dead — but Django has zero error logs on the job record, and the job is still sitting in STARTED status.

The gap: When an external worker pulls tasks via GET /jobs/{id}/tasks/ but never posts results back to /result/, NATS redelivers up to max_deliver times and then silently drops the message. Since process_nats_pipeline_result is never called, none of the error handling paths in this PR (or the existing code) fire. Django never learns that the tasks are dead.

Possible fix: A stale consumer check — either as a periodic beat task or inside the /tasks/ endpoint — that detects when the NATS consumer has no pending or in-flight messages but the job still has remaining images, and marks the job as FAILURE.
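A rough sketch of that stale-consumer predicate, assuming the `num_pending`/`num_ack_pending` fields reported by NATS `consumer_info()` (modeled here with a small dataclass) and a hypothetical `images_remaining` count taken from the job record:

```python
from dataclasses import dataclass


@dataclass
class ConsumerStats:
    """Stand-in for the relevant fields of NATS consumer_info()."""

    num_pending: int      # messages not yet delivered to any worker
    num_ack_pending: int  # delivered but not yet acked (in flight)


def job_is_stalled(stats: ConsumerStats, images_remaining: int) -> bool:
    """The consumer has nothing left to deliver or await, yet the job still
    has unprocessed images: the remaining messages exhausted max_deliver and
    were dropped, so the job should be marked FAILURE."""
    return stats.num_pending == 0 and stats.num_ack_pending == 0 and images_remaining > 0


# Job 2318 scenario: consumer idle, 757 images never completed.
print(job_is_stalled(ConsumerStats(num_pending=0, num_ack_pending=0), 757))  # True
```

Run either from a periodic beat task or inside the `/tasks/` endpoint, this check would have flagged job 2318 instead of leaving it in STARTED indefinitely.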
