Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b60eab0
merge
carlos-irreverentlabs Jan 16, 2026
644927f
Merge remote-tracking branch 'upstream/main'
carlosgjs Jan 22, 2026
218f7aa
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 3, 2026
90da389
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 10, 2026
8618d3c
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 13, 2026
bd1be5f
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 17, 2026
b102ae1
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 19, 2026
bc908aa
fix: PSv2 follow-up fixes from integration tests (#1135)
mihow Feb 21, 2026
4c3802a
PSv2: Improve task fetching & web worker concurrency configuration (#…
carlosgjs Feb 21, 2026
b717e80
fix: include pipeline_slug in MinimalJobSerializer (#1148)
mihow Feb 21, 2026
883c4f8
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 24, 2026
8df89be
Avoid redis based locking by using atomic updates
carlosgjs Feb 24, 2026
e26f3c6
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 24, 2026
1096fd9
Merge branch 'main' into carlosg/redisatomic
carlosgjs Feb 24, 2026
30c8db3
Test concurrency
carlosgjs Feb 25, 2026
deea095
Increase max ack pending
carlosgjs Feb 25, 2026
20c0fbd
update comment
carlosgjs Feb 25, 2026
e84421e
CR feedback
carlosgjs Feb 25, 2026
cbb2d7f
Cancel jobs if Redis state is missing
carlosgjs Feb 25, 2026
3861190
Add chaos monkey
carlosgjs Feb 25, 2026
d591bd6
CR feedback
carlosgjs Feb 25, 2026
4720bb6
CR 2
carlosgjs Feb 26, 2026
f0cd403
fix: OrderedEnum comparisons now override str MRO in subclasses
mihow Feb 26, 2026
e3134a1
fix: correct misleading error log about NATS redelivery
mihow Feb 26, 2026
41b1232
Merge branch 'carlosg/redisatomic' of github.com:uw-ssec/antenna into…
carlosgjs Feb 26, 2026
94e1bbb
Use job.logger
carlosgjs Feb 26, 2026
dcf57fe
Use job.logger
carlosgjs Feb 26, 2026
4a25e54
Integrate cancellation support
carlosgjs Feb 26, 2026
654593b
Merge branch 'carlosg/redisatomic' into carlos/redisfail
carlosgjs Feb 26, 2026
5d38d67
merge, update tests
carlosgjs Feb 26, 2026
ac90c2f
Remove pause support in monkey
carlosgjs Feb 26, 2026
4eb763a
fix: cancel async jobs by cleaning up NATS/Redis and stopping task de…
mihow Feb 27, 2026
8671214
fix(ui): hide Retry button while job is in CANCELING state
mihow Feb 27, 2026
b1146cc
fix: downgrade Redis-missing log to warning for canceled jobs
mihow Feb 27, 2026
dccaceb
docs: add async job monitoring reference
mihow Feb 27, 2026
d63be48
fix: update tests for active_states() guard on /tasks endpoint
mihow Feb 27, 2026
4ef7a24
Merge branch 'RolnickLab:main' into main
mihow Feb 27, 2026
c389e90
Merge remote-tracking branch 'upstream/main'
carlosgjs Feb 27, 2026
33a6425
Merge branch 'main' of github.com:uw-ssec/antenna
carlosgjs Feb 27, 2026
934db1d
Merge branch 'main' into carlos/redisfail
carlosgjs Feb 27, 2026
f4d88ff
fix: improve job cancel ordering, fail status sync, and log handler s…
mihow Feb 27, 2026
20e4ec2
fix: restore timeout on _stream_exists and use settings for NATS_URL
mihow Feb 27, 2026
cf18987
fix(ui): block retry button while job is in RETRY state
mihow Feb 27, 2026
f1bed5e
docs: clarify _stream_exists timeout propagation design
mihow Feb 28, 2026
a16fc05
docs: add language tag to fenced code block in monitoring guide
mihow Feb 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .agents/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,16 @@ images = SourceImage.objects.annotate(det_count=Count('detections'))
- Use `@shared_task` decorator for all tasks
- Check Flower UI for debugging: http://localhost:5555

### E2E Testing & Monitoring Async Jobs

Run an end-to-end ML job test:
```bash
docker compose run --rm django python manage.py test_ml_job_e2e \
--project 18 --dispatch-mode async_api --collection 142 --pipeline "global_moths_2024"
```

For monitoring running jobs (Django ORM, REST API, NATS consumer state, Redis counters, worker logs, etc.), see `docs/claude/reference/monitoring-async-jobs.md`.

### Running a Single Test

```bash
Expand Down
97 changes: 97 additions & 0 deletions ami/jobs/management/commands/chaos_monkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
Fault injection utility for manual chaos testing of ML async jobs.

Use alongside `test_ml_job_e2e` to verify job behaviour when Redis or NATS
becomes unavailable or loses state mid-processing.

Usage examples:

# Flush all Redis state immediately (simulates FLUSHDB mid-job)
python manage.py chaos_monkey flush redis

# Flush all NATS JetStream streams (simulates broker state loss)
python manage.py chaos_monkey flush nats
"""

from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django_redis import get_redis_connection

NATS_URL = getattr(settings, "NATS_URL", "nats://nats:4222")


class Command(BaseCommand):
    """Fault-injection command: wipes Redis or NATS JetStream state for chaos testing."""

    help = "Inject faults into Redis or NATS for chaos/resilience testing"

    def add_arguments(self, parser):
        parser.add_argument(
            "action",
            choices=["flush"],
            help="flush: wipe all state.",
        )
        parser.add_argument(
            "service",
            choices=["redis", "nats"],
            help="Target service to fault.",
        )

    def handle(self, *args, **options):
        # argparse `choices` guarantees (action, service) is one of these pairs.
        dispatch = {
            ("flush", "redis"): self._flush_redis,
            ("flush", "nats"): self._flush_nats,
        }
        selected = dispatch.get((options["action"], options["service"]))
        if selected is not None:
            selected()

    # ------------------------------------------------------------------
    # Redis
    # ------------------------------------------------------------------

    def _flush_redis(self):
        """Run FLUSHDB on the default Redis connection, simulating total state loss."""
        self.stdout.write("Flushing Redis database (FLUSHDB)...")
        try:
            connection = get_redis_connection("default")
            connection.flushdb()
            self.stdout.write(self.style.SUCCESS("Redis flushed."))
        except Exception as e:
            raise CommandError(f"Failed to flush Redis: {e}") from e

    # ------------------------------------------------------------------
    # NATS
    # ------------------------------------------------------------------

    def _flush_nats(self):
        """Delete all JetStream streams via the NATS Python client."""
        self.stdout.write("Flushing all NATS JetStream streams...")

        async def _wipe_streams():
            # Imported lazily so the command can at least parse args without nats-py.
            import nats

            client = await nats.connect(NATS_URL, connect_timeout=5, allow_reconnect=False)
            jetstream = client.jetstream()
            try:
                removed = []
                for info in (await jetstream.streams_info()) or []:
                    stream_name = info.config.name
                    await jetstream.delete_stream(stream_name)
                    removed.append(stream_name)
                return removed
            finally:
                # Always close the connection, even if a delete fails mid-loop.
                await client.close()

        try:
            deleted = async_to_sync(_wipe_streams)()
        except Exception as e:
            raise CommandError(f"Failed to flush NATS: {e}") from e

        if deleted:
            for name in deleted:
                self.stdout.write(f" Deleted stream: {name}")
            self.stdout.write(self.style.SUCCESS(f"Deleted {len(deleted)} stream(s)."))
        else:
            self.stdout.write("No streams found — NATS already empty.")
6 changes: 5 additions & 1 deletion ami/jobs/management/commands/test_ml_job_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@


class Command(BaseCommand):
help = "Run end-to-end test of ML job processing"
help = (
"Run end-to-end test of ML job processing.\n\n"
"For monitoring and debugging running jobs, see:\n"
" docs/claude/reference/monitoring-async-jobs.md"
)

def add_arguments(self, parser):
parser.add_argument("--project", type=int, required=True, help="Project ID")
Expand Down
59 changes: 40 additions & 19 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from ami.base.models import BaseModel
from ami.base.schemas import ConfigurableStage, ConfigurableStageParam
from ami.jobs.tasks import run_job
from ami.jobs.tasks import cleanup_async_job_if_needed, run_job
from ami.main.models import Deployment, Project, SourceImage, SourceImageCollection
from ami.ml.models import Pipeline
from ami.ml.post_processing.registry import get_postprocessing_task
Expand Down Expand Up @@ -88,6 +88,11 @@ def final_states(cls):
def failed_states(cls):
return [cls.FAILURE, cls.REVOKED, cls.UNKNOWN]

@classmethod
def active_states(cls):
"""States where a job is actively processing and should serve tasks to workers."""
return [cls.STARTED, cls.RETRY]


def get_status_label(status: JobState, progress: float) -> str:
"""
Expand Down Expand Up @@ -331,26 +336,29 @@ def emit(self, record: logging.LogRecord):
# Log to the current app logger
logger.log(record.levelno, self.format(record))

# Write to the logs field on the job instance
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
if msg not in self.job.logs.stdout:
self.job.logs.stdout.insert(0, msg)
# Write to the logs field on the job instance.
# Refresh from DB first to reduce the window for concurrent overwrites — each
# worker holds its own stale in-memory copy of `logs`, so without a refresh the
# last writer always wins and earlier entries are silently dropped.
# @TODO consider saving logs to the database periodically rather than on every log
try:
self.job.refresh_from_db(fields=["logs"])
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
if msg not in self.job.logs.stdout:
self.job.logs.stdout.insert(0, msg)

# Write a simpler copy of any errors to the errors field
if record.levelno >= logging.ERROR:
if record.message not in self.job.logs.stderr:
self.job.logs.stderr.insert(0, record.message)
# Write a simpler copy of any errors to the errors field
if record.levelno >= logging.ERROR:
if record.message not in self.job.logs.stderr:
self.job.logs.stderr.insert(0, record.message)

if len(self.job.logs.stdout) > self.max_log_length:
self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
if len(self.job.logs.stdout) > self.max_log_length:
self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]

# @TODO consider saving logs to the database periodically rather than on every log
try:
self.job.save(update_fields=["logs"], update_progress=False)
except Exception as e:
logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")
pass


@dataclass
Expand Down Expand Up @@ -966,19 +974,28 @@ def retry(self, async_task=True):

def cancel(self):
"""
Terminate the celery task.
Cancel a job. For async_api jobs, clean up NATS/Redis resources
and transition through CANCELING → REVOKED. For other jobs,
revoke the Celery task.
"""
self.status = JobState.CANCELING
self.save()

if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
if self.dispatch_mode == JobDispatchMode.ASYNC_API:
# For async jobs we need to set the status to revoked here since the task already
# finished (it only queues the images).
self.status = JobState.REVOKED
self.save()
else:
self.status = JobState.REVOKED
self.save()

cleanup_async_job_if_needed(self)

def update_status(self, status=None, save=True):
"""
Update the status of the job based on the status of the celery task.
Expand Down Expand Up @@ -1084,11 +1101,15 @@ def get_default_progress(cls) -> JobProgress:
def logger(self) -> logging.Logger:
_logger = logging.getLogger(f"ami.jobs.{self.pk}")

# Only add JobLogHandler if not already present
if not any(isinstance(h, JobLogHandler) for h in _logger.handlers):
# Also log output to a field on thie model instance
# Update or add JobLogHandler, always pointing to the current instance.
# The logger is a process-level singleton so its handler may reference a stale
# job instance from a previous task execution in this worker process.
handler = next((h for h in _logger.handlers if isinstance(h, JobLogHandler)), None)
if handler is None:
logger.info("Adding JobLogHandler to logger for job %s", self.pk)
_logger.addHandler(JobLogHandler(self))
else:
handler.job = self
_logger.propagate = False
return _logger

Expand Down
36 changes: 27 additions & 9 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,9 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub

progress_info = state_manager.update_state(processed_image_ids, stage="process", failed_image_ids=failed_image_ids)
if not progress_info:
logger.error(f"Redis state missing for job {job_id} — job may have been cleaned up prematurely.")
# Acknowledge the task to prevent retries, since we don't know the state
_ack_task_via_nats(reply_subject, logger)
# TODO: cancel the job to fail fast once PR #1144 is merged
_fail_job(job_id, "Redis state missing for job")
return

try:
Expand Down Expand Up @@ -153,8 +152,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
)

if not progress_info:
job.logger.error(f"Redis state missing for job {job_id} — job may have been cleaned up prematurely.")
# TODO: cancel the job to fail fast once PR #1144 is merged
_fail_job(job_id, "Redis state missing for job")
return

# update complete state based on latest progress info after saving results
Expand All @@ -180,6 +178,26 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
job.logger.error(error)


def _fail_job(job_id: int, reason: str) -> None:
    """Mark a job as FAILURE and tear down its async (NATS/Redis) resources.

    No-op when the job is already CANCELING or in a final state, so a late
    failure signal cannot clobber a terminal status.

    Args:
        job_id: Primary key of the job to fail.
        reason: Human-readable explanation logged against the job.
    """
    # Imported here to avoid circular imports (tasks <-> models).
    from ami.jobs.models import Job, JobState
    from ami.ml.orchestration.jobs import cleanup_async_job_resources

    try:
        with transaction.atomic():
            # Row lock so concurrent status transitions serialize against this one.
            job = Job.objects.select_for_update().get(pk=job_id)
            if job.status in (JobState.CANCELING, *JobState.final_states()):
                return
            job.update_status(JobState.FAILURE, save=False)
            job.finished_at = datetime.datetime.now()
            job.save(update_fields=["status", "progress", "finished_at"])

        # Log and clean up after the transaction commits, outside the row lock.
        job.logger.error(f"Job {job_id} marked as FAILURE: {reason}")
        cleanup_async_job_resources(job.pk, job.logger)
    except Job.DoesNotExist:
        # Job row is gone; still attempt resource cleanup using the module logger.
        logger.error(f"Cannot fail job {job_id}: not found")
        cleanup_async_job_resources(job_id, logger)


def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None:
try:

Expand Down Expand Up @@ -295,10 +313,10 @@ def _update_job_progress(
# Clean up async resources for completed jobs that use NATS/Redis
if job.progress.is_complete():
job = Job.objects.get(pk=job_id) # Re-fetch outside transaction
_cleanup_job_if_needed(job)
cleanup_async_job_if_needed(job)


def _cleanup_job_if_needed(job) -> None:
def cleanup_async_job_if_needed(job) -> None:
"""
Clean up async resources (NATS/Redis) if this job uses them.

Expand All @@ -314,7 +332,7 @@ def _cleanup_job_if_needed(job) -> None:
# import here to avoid circular imports
from ami.ml.orchestration.jobs import cleanup_async_job_resources

cleanup_async_job_resources(job)
cleanup_async_job_resources(job.pk, job.logger)


@task_prerun.connect(sender=run_job)
Expand Down Expand Up @@ -353,7 +371,7 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs):

# Clean up async resources for revoked jobs
if state == JobState.REVOKED:
_cleanup_job_if_needed(job)
cleanup_async_job_if_needed(job)


@task_failure.connect(sender=run_job, retry=False)
Expand All @@ -368,7 +386,7 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs):
job.save()

# Clean up async resources for failed jobs
_cleanup_job_if_needed(job)
cleanup_async_job_if_needed(job)


def log_time(start: float = 0, msg: str | None = None) -> tuple[float, Callable]:
Expand Down
4 changes: 3 additions & 1 deletion ami/jobs/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ def _task_batch_helper(self, value: Any, expected_status: int):
pipeline = self._create_pipeline()
job = self._create_ml_job("Job for batch test", pipeline)
job.dispatch_mode = JobDispatchMode.ASYNC_API
job.save(update_fields=["dispatch_mode"])
job.status = JobState.STARTED
job.save(update_fields=["dispatch_mode", "status"])
images = [
SourceImage.objects.create(
path=f"image_{i}.jpg",
Expand Down Expand Up @@ -487,6 +488,7 @@ def test_tasks_endpoint_without_pipeline(self):
name="Job without pipeline",
source_image_collection=self.source_image_collection,
dispatch_mode=JobDispatchMode.ASYNC_API,
status=JobState.STARTED,
)

self.client.force_authenticate(user=self.user)
Expand Down
4 changes: 2 additions & 2 deletions ami/jobs/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ def tasks(self, request, pk=None):
if job.dispatch_mode != JobDispatchMode.ASYNC_API:
raise ValidationError("Only async_api jobs have fetchable tasks")

# Don't fetch tasks from completed/failed/revoked jobs
if job.status in JobState.final_states():
# Only serve tasks for actively processing jobs
if job.status not in JobState.active_states():
return Response({"tasks": []})

# Validate that the job has a pipeline
Expand Down
Loading