Summary
When an external ML worker pulls tasks from a NATS-backed async job but fails to post results back, the tasks silently die in NATS after exhausting max_deliver retries. Django is never notified, no error is logged on the job record, and the job remains in STARTED status indefinitely.
Reproduction
- Create an async_api job with N images
- Have a worker pull tasks via GET /jobs/{id}/tasks/
- Worker fails to process or crashes, so it never calls POST /jobs/{id}/result/
- Wait for NATS ack_wait (30s default) × max_deliver (5) = ~2.5 minutes per message
- All messages become dead in NATS
- Job stays STARTED forever with 0 errors logged
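The ~2.5 minute figure follows directly from the JetStream redelivery arithmetic; a minimal sketch using the values from this issue:

```python
# Each delivery attempt waits ack_wait for an acknowledgement before
# redelivering; NATS drops the message after max_deliver attempts.
# Values below are the ones observed in this issue.
ACK_WAIT_SECONDS = 30
MAX_DELIVER = 5

def seconds_until_dead(ack_wait: int = ACK_WAIT_SECONDS,
                       max_deliver: int = MAX_DELIVER) -> int:
    """Worst-case time before a never-acked message is dropped."""
    return ack_wait * max_deliver

print(seconds_until_dead())  # 150 seconds, i.e. ~2.5 minutes per message
```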
Observed Behavior
- NATS consumer state: num_pending=0, num_ack_pending=0, num_redelivered=756
- Job progress: 168/925 processed, 757 remaining, 0 failed
- Job logs: empty (no errors, no warnings)
- Job status: STARTED (never transitions to FAILURE)
Root Cause
The result-processing path (process_nats_pipeline_result) is the only place that updates job progress and logs errors. When a worker pulls a task but never posts results, this code path is never invoked. NATS handles retries internally and eventually drops the message — but there is no callback, webhook, or polling mechanism for Django to detect that messages have been permanently dropped.
The _fail_job() function added in #1162 only triggers when Redis state is missing during result processing. It does not cover the case where result processing never happens at all.
Proposed Solution
Add a stale consumer detection mechanism. Two possible approaches:
Option A: Check inside the /tasks/ endpoint
When reserve_tasks() returns an empty list, check the NATS consumer state. If num_pending == 0 and num_ack_pending == 0 but the job still has remaining images (from Redis or the Job progress), mark the job as FAILURE with a descriptive error message.
Pros: Runs naturally as workers poll, no extra infrastructure.
Cons: Requires a worker to keep polling; if all workers stop, the check never runs.
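A rough sketch of the Option A check follows. The helper names (`consumer_exhausted`, `job.fail`, the shape of `consumer_info`) are assumptions for illustration, not the project's actual API; only the num_pending/num_ack_pending fields mirror what NATS reports:

```python
# Hypothetical sketch of the Option A check inside the /tasks/ endpoint.
# `consumer_info` mirrors the NATS consumer state fields; `remaining`
# would come from Redis or the Job progress row.

def consumer_exhausted(consumer_info: dict, remaining: int) -> bool:
    """True when NATS has nothing left to deliver but the job is incomplete."""
    return (
        consumer_info.get("num_pending", 0) == 0
        and consumer_info.get("num_ack_pending", 0) == 0
        and remaining > 0
    )

def tasks_endpoint_check(job, consumer_info: dict, remaining: int) -> None:
    # Intended to run only when reserve_tasks() returned an empty list.
    if consumer_exhausted(consumer_info, remaining):
        job.fail(  # hypothetical helper that sets FAILURE + logs the error
            f"{remaining} task(s) were dropped by NATS after exhausting "
            "max_deliver retries; no results were posted by any worker."
        )
```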
Option B: Periodic Celery beat task
Add a beat task that runs every few minutes, queries all STARTED async_api jobs, checks their NATS consumer state, and fails any job where the consumer is exhausted but progress is incomplete.
Pros: Catches stalled jobs even if no workers are polling. Can also detect jobs where the NATS stream was deleted (e.g., container restart with ephemeral storage).
Cons: Adds a periodic task and requires NATS connectivity from the beat worker.
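Option B could look roughly like the following, keeping the decision logic as a pure function so it can be unit-tested without Django or NATS. All names here (`should_fail`, `detect_stalled_async_jobs`, the job/consumer shapes) are illustrative assumptions:

```python
# Hypothetical sketch of the Option B beat task. In the real codebase
# detect_stalled_async_jobs would be decorated with @app.task and would
# query STARTED async_api jobs via the ORM; sketched here without Celery.

def should_fail(status: str, num_pending: int, num_ack_pending: int,
                remaining: int) -> bool:
    """Fail a STARTED job whose NATS consumer is drained but incomplete."""
    return (
        status == "STARTED"
        and num_pending == 0
        and num_ack_pending == 0
        and remaining > 0
    )

def detect_stalled_async_jobs(jobs, get_consumer_info):
    """Return the subset of `jobs` that should be marked FAILURE."""
    stalled = []
    for job in jobs:
        info = get_consumer_info(job)
        if info is None:
            # Stream/consumer missing entirely (e.g. NATS restart with
            # ephemeral storage): treat the job as stalled as well.
            stalled.append(job)
        elif should_fail(job["status"], info["num_pending"],
                         info["num_ack_pending"], job["remaining"]):
            stalled.append(job)
    return stalled
```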
Option C: Both
Use Option A for fast detection during active polling, and Option B as a safety net.
Additional Context
- Related PR: #1162 (Support ML async job cancellation, fail jobs on Redis errors)
- NATS consumer config: max_deliver=5, ack_wait=30s (configurable via NATS_TASK_TTR)
- The job's dispatch_mode is async_api and pipeline is set
- JetStream storage is ephemeral (/tmp/nats/jetstream), so a NATS container restart also causes silent data loss; the beat task (Option B) would catch this too
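If Option B is adopted, wiring it into Celery beat is a one-entry schedule addition. The task path and interval below are illustrative assumptions, not taken from the project settings:

```python
# Hypothetical beat schedule entry for the Option B stale-job check.
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    "detect-stalled-async-jobs": {
        # Task path is an assumption; point it at wherever the
        # detection task actually lives.
        "task": "jobs.tasks.detect_stalled_async_jobs",
        "schedule": timedelta(minutes=5),
    },
}
```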
Acceptance Criteria
- A job whose NATS tasks have all been exhausted (dead) is detected and marked as FAILURE
- An error message is logged on the job record explaining that tasks were dropped
- The detection works even if no external workers are actively polling
- Existing tests pass; new test covers the dead-message scenario