feat: event listeners on Worker, Queue, and Job #147
Merged
Adds an EventEmitter-style listener surface across the Node shim:
- Worker emits ready / active / completed / failed / error / closing /
closed / drained / paused / resumed. The drained subscriber is
lazily wired the first time someone calls worker.on('drained', ...)
and torn down on close(), so workers that never subscribe pay no
extra Redis connections.
- QueueEvents now fans broadcast events onto per-id channels
(active:<jobId>, completed:<jobId>, failed:<jobId>) so targeted
subscribers (like Job.waitUntilFinished) avoid the O(N-listeners)
broadcast dispatch cost.
- Job.waitUntilFinished(queueEvents, ttl?) — event-driven completion
wait that subscribes to the per-id channels and resolves with the
stored result (when storeResults: true) or undefined, or rejects
with the engine failedReason. WaitUntilFinishedTimeoutError is a
new public error type.
- progress / stalled listener names are accepted but currently no-op
(engine doesn't emit those transitions yet). Listed in the docstring
so subscriber code keeps type-checking.
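The lazy wiring in the first bullet can be pictured with a small language-neutral sketch, written here in Python for brevity. It is not the shim's implementation: LazyDrainedWorker, FakeSubscriber, and the open_subscriber factory are hypothetical names standing in for the worker and its embedded subscriber connection.

```python
class FakeSubscriber:
    """Hypothetical stand-in for an embedded events subscriber connection."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class LazyDrainedWorker:
    """Toy worker that opens its 'drained' subscriber only when first needed."""

    def __init__(self, open_subscriber):
        # open_subscriber is an assumed zero-argument factory that returns
        # an object with a close() method.
        self._open_subscriber = open_subscriber
        self._subscriber = None
        self._listeners = {}

    def on(self, event, callback):
        self._listeners.setdefault(event, []).append(callback)
        # Only the first 'drained' listener triggers the connection.
        if event == "drained" and self._subscriber is None:
            self._subscriber = self._open_subscriber()
        return self

    def close(self):
        # Tear the subscriber down if it was ever created.
        if self._subscriber is not None:
            self._subscriber.close()
            self._subscriber = None
```

In this sketch a worker that only registers 'completed' or 'failed' listeners never calls the factory, which is the "pay no extra Redis connections" property described above.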
Includes an integration test suite gated on REDIS_URL covering
drained, paused/resumed, per-id channels, waitUntilFinished
(completed / undefined / failed / timeout / queue mismatch).
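As a rough illustration of the per-id fan-out covered by these tests, the sketch below dispatches each event to both the broadcast channel and a per-id channel. It is written in Python for brevity and assumes nothing about the shim internals: ChannelDispatcher and its methods are hypothetical names.

```python
from collections import defaultdict


class ChannelDispatcher:
    """Toy dispatcher: one broadcast channel per event plus per-id channels."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def on(self, channel, callback):
        self._listeners[channel].append(callback)

    def dispatch(self, event, job_id, payload=None):
        # Broadcast channel: every listener registered for the event runs.
        for cb in list(self._listeners.get(event, ())):
            cb(job_id, payload)
        # Per-id channel: only listeners for this exact job run, so a
        # targeted waiter is not invoked for unrelated jobs.
        for cb in list(self._listeners.get(f"{event}:{job_id}", ())):
            cb(job_id, payload)
```

A listener on "completed:42" is called once for job 42 and never for other jobs, while a listener on "completed" is still called for every job.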
Mirrors the Node shim's listener surface in Python:
- Worker emits ready / active / completed / failed / error / closing /
closed / drained / paused / resumed. Sync and async def callbacks
both work — async callbacks are scheduled on the running loop. The
drained subscriber is lazily wired on the first on('drained', ...)
call and torn down on close().
- QueueEvents gains an EventEmitter-style listener API alongside the
existing async iterator (the two are mutually exclusive — they
share the Redis connection but only one XREAD consumer at a time).
Broadcast events plus per-id channels (active:<jobId> /
completed:<jobId> / failed:<jobId>). wait_until_ready() lets
callers deterministically gate on the subscriber's first XREAD
BLOCK landing.
- Job.wait_until_finished(queue_events, timeout=...) — event-driven
completion wait that returns the stored result (when
store_results=True) or None, or raises with the engine
failedReason. WaitUntilFinishedTimeoutError is a new public error.
- progress / stalled listener names are accepted but currently no-op
(engine doesn't emit those transitions yet).
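One way to support both sync and async def callbacks, as the first bullet describes, is to check the callback type at emit time and schedule coroutines on the running loop. This is a minimal sketch under that assumption, not the shim's code; Emitter, drain, and demo are hypothetical names.

```python
import asyncio
import inspect


class Emitter:
    """Toy emitter accepting plain functions and async def callbacks."""

    def __init__(self):
        self._listeners = {}
        self._tasks = set()

    def on(self, event, callback):
        self._listeners.setdefault(event, []).append(callback)

    def emit(self, event, *args):
        for cb in list(self._listeners.get(event, [])):
            if inspect.iscoroutinefunction(cb):
                # get_running_loop() raises RuntimeError outside a loop.
                loop = asyncio.get_running_loop()
                task = loop.create_task(cb(*args))
                # Keep a reference so the task is not garbage collected.
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)
            else:
                cb(*args)

    async def drain(self):
        # Wait for async callbacks scheduled so far.
        await asyncio.gather(*self._tasks)


async def demo():
    emitter = Emitter()
    seen = []

    async def on_completed_async(job_id):
        await asyncio.sleep(0)
        seen.append(("async", job_id))

    emitter.on("completed", lambda job_id: seen.append(("sync", job_id)))
    emitter.on("completed", on_completed_async)
    emitter.emit("completed", "42")
    await emitter.drain()
    return seen
```

Running asyncio.run(demo()) exercises both paths: the sync callback runs inline during emit, and the async one runs as a task on the loop.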
Includes an integration test suite gated on REDIS_URL covering
drained, paused/resumed, per-id channels, async/sync callbacks,
wait_until_finished (completed / None / failed / timeout /
queue mismatch), and CancelledError handling.
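The event-driven wait can be sketched with a future that per-id listeners resolve or reject. The code below is an illustration under stated assumptions, not the shim's implementation: Channels, WaitTimeout (a stand-in for WaitUntilFinishedTimeoutError), and the fetch_result coroutine are hypothetical, and RuntimeError is used only as a placeholder for the error carrying the failed reason.

```python
import asyncio
from collections import defaultdict


class WaitTimeout(Exception):
    """Hypothetical stand-in for WaitUntilFinishedTimeoutError."""


class Channels:
    """Toy per-id channel registry standing in for a listener API."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def on(self, channel, callback):
        self._listeners[channel].append(callback)

    def off(self, channel, callback):
        self._listeners[channel].remove(callback)

    def dispatch(self, event, job_id, payload=None):
        for cb in list(self._listeners.get(f"{event}:{job_id}", ())):
            cb(job_id, payload)


async def wait_until_finished(channels, job_id, fetch_result, timeout=None):
    done = asyncio.get_running_loop().create_future()

    def on_completed(_job_id, _payload):
        if not done.done():
            done.set_result(None)

    def on_failed(_job_id, reason):
        if not done.done():
            done.set_exception(RuntimeError(reason))

    channels.on(f"completed:{job_id}", on_completed)
    channels.on(f"failed:{job_id}", on_failed)
    try:
        # timeout=None waits indefinitely.
        await asyncio.wait_for(done, timeout)
    except asyncio.TimeoutError:
        raise WaitTimeout(f"job {job_id} did not finish in time") from None
    finally:
        # Always unsubscribe, whatever the outcome.
        channels.off(f"completed:{job_id}", on_completed)
        channels.off(f"failed:{job_id}", on_failed)
    # The event carries no return value; fetch it separately.
    # fetch_result is assumed to return None when no result is stored.
    return await fetch_result(job_id)
```

The separate fetch step mirrors the event-plus-lookup composition the slice uses, since the completed event itself does not carry the return value.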
User-visible documentation for the event listener slice:
- README: adds event listeners and waitUntilFinished to the feature
comparison table.
- Node + Python shim READMEs: new "Subscribing to events" and
"Awaiting a single job's completion" sections, mirrored across
shims (same headings, language-specific examples).
- Starlight site:
  - new concepts/events-and-listeners.md explaining the two layers
    (in-process Worker EventEmitter vs cross-process QueueEvents),
    per-id channels, the return-value choice, the two
    completion-wait helpers, the lazy-subscriber lifecycle, and the
    lost-event race.
  - reference/node-api.md and reference/python-api.md gain the new
    Worker event table, the QueueEvents listener API, per-id
    channels, and waitUntilFinished / wait_until_finished.
  - registered in astro.config.mjs sidebar under Concepts.
- docs/history.md: slice entry covering the engine surface and the
cross-shim API.
Summary
Close the BullMQ-parity gap for event-driven consumption. The engine already emits a cross-process events stream covering waiting / active / completed / failed / retry-scheduled / delayed / dlq / drained; this slice finishes the high-level shim surfaces on top so application code mirrors BullMQ exactly. Zero engine changes (empty git diff main -- chasquimq/src/).
Three new user-visible surfaces, mirrored across both shims:
- worker.on('drained' | 'paused' | 'resumed' | ...) on both shims. Node adds the three new events to the existing EventEmitter; Python gains a full .on() / .off() / .once() listener API alongside the existing native handler. 'drained' lazily spawns an embedded QueueEvents subscriber on the first listener attach (zero-cost when unused); 'paused' / 'resumed' fire from the local pause() / resume() methods. 'progress' / 'stalled' are documented as accepted-but-no-op for parity (engine doesn't emit those transitions yet).
- QueueEvents per-id channels (active:<jobId> / completed:<jobId> / failed:<jobId>) emitted alongside the existing broadcast channels so targeted subscribers can listen on a single job without paying the O(N-listeners) broadcast dispatch cost. Python gains the .on() / .off() / .once() listener API alongside the existing async iterator; sync and async callbacks are both supported.
- Job.waitUntilFinished(queueEvents, ttl?) (Node) and job.wait_until_finished(queue_events, *, timeout=None) (Python). Event-driven (no polling): resolves with the stored handler result via Queue.getJobResult when storeResults=true, rejects with new Error(failedReason) on failed, throws new WaitUntilFinishedTimeoutError on ttl elapse.
Design notes
- completed events don't carry the return value. BullMQ's Job.waitUntilFinished exposes the value because BullMQ JSON-encodes it onto the events stream. Re-implementing that on ChasquiMQ would either regress the msgpack wire-format invariant or force every subscriber to depend on msgpack. Composing event + Queue.getJobResult keeps the events stream small (1 MB result × 50 dashboards = 50 MB/job otherwise), keeps subscribers msgpack-free, and works without the result backend for "just detect completion" cases. Documented as a deliberate cross-shim contract.
- Lost-event race: events emitted before the subscriber's first XREAD BLOCK lands are missed. Mitigations: Node Worker.run() awaits a ready promise on the embedded drained subscriber before kicking the engine; Python QueueEvents exposes await events.wait_until_ready() so callers can gate on subscriber-is-listening.
- Subscribers can see completed events before the per-entry JOB_OK_SCRIPT writes the result key (events emit lives off the ack hot path). waitUntilFinished handles this with a short retry loop (10× 50ms) on the getJobResult fetch; it falls through to undefined for workers running without storeResults=true.
- Workers that never subscribe to 'drained' open zero extra Redis connections. The embedded subscriber uses blockingTimeout: 1000 (vs the QueueEvents 10s default) for snappy shutdown.
- Python Worker does not fire 'failed' or 'error' on asyncio.CancelledError: cancellation is a control-flow signal (test teardown, shutdown), not a handler failure. Matches BullMQ.
Test plan
- cargo test --workspace -- --include-ignored against live Redis 8.6.2 → 356 passed (38 suites).
- npm test (vitest) → 194 passed | 1 skipped | 1 todo (185 before + 9 new in __test__/event-listeners.test.ts).
- pytest → 160 passed (149 before + 11 new in tests/test_event_listeners.py).
- cargo fmt --all -- --check clean; cargo clippy --all-targets --workspace -- -D warnings clean (only pre-existing non-root profile warnings).
- queue-add-bulk 185.7k → 186.9k (+0.7%, flat); worker-concurrent retried run shows branch 116.8k vs main 109.8k (+6.3%, contention-driven). Per CLAUDE.md host-load gate, an empty engine diff defends any small worker-concurrent movement against host noise.
- waitUntilFinished; cancellation-vs-failure distinction; sync .on() outside coroutine surfaces clear RuntimeError; lazy subscriber error-forwarding skips close-time races.
Docs updated symmetrically on both shim READMEs (Subscribing to events + Awaiting a single job's completion), Node + Python API reference, new concepts/events-and-listeners.md registered in site/astro.config.mjs, root README feature table, and docs/history.md.
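The short retry loop from the design notes (10× 50ms on the result fetch, falling through when nothing is stored) might look roughly like the sketch below. It is an illustration only: fetch_result_with_retry and the get_job_result coroutine are hypothetical names, and the sketch assumes the lookup returns None while the result key is absent.

```python
import asyncio


async def fetch_result_with_retry(get_job_result, job_id, attempts=10, delay=0.05):
    """Poll briefly for a result that may be written just after the event."""
    for attempt in range(attempts):
        result = await get_job_result(job_id)
        if result is not None:
            return result
        # Sleep between attempts, but not after the final one.
        if attempt < attempts - 1:
            await asyncio.sleep(delay)
    # Nothing stored (for example, results are not being kept).
    return None
```

One limitation of this shape is that a stored result that is legitimately None cannot be told apart from a missing key, so such a job would take the full retry budget before returning.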