feat: add worker custom trigger type to iii-worker-ops #1683
📝 Walkthrough
This PR adds a custom worker trigger system that emits typed lifecycle events for worker operations, integrates it into the daemon, enhances all worker operations with granular stage and failure events, and validates the end-to-end flow with an in-process integration test. It also includes comprehensive documentation for worker and sandbox operations.
Changes
- Worker Trigger System
- Worker and Sandbox Documentation
🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/iii-worker/src/cli/worker_trigger.rs`:
- Around line 341-403: The event_to_request mapping currently hardcodes
source/version/status to None for all WorkerOpEvent branches, which drops
terminal add/update metadata; update event_to_request (the match arms that
construct WorkerCallRequest for WorkerOpEvent::Started, ::Stage, ::PullProgress,
::Done, ::Failed, etc.) to extract and forward the event's source, version, and
status fields (when present on the incoming WorkerOpEvent variants) into
WorkerCallRequest.source, .version, and .status instead of setting them to None;
keep existing conversions (op_from_str, stage_from_str) and the error mapping
for WorkerErrorPayload unchanged.
In `@engine/config.yaml`:
- Around line 196-204: The config currently uses a wildcard in
cors.allowed_origins which is insecure; change cors.allowed_origins to be an
explicit, env-configurable allowlist (e.g., read CORS_ALLOWED_ORIGINS and parse
as CSV) and remove the default '*' so production defaults to an empty or
specific host list (localhost only for dev). Update the loader that reads
cors.allowed_origins and the related consumers (keys: cors.allowed_origins,
cors.allowed_methods) to accept the env override and to only allow wildcard when
an explicit DEV flag or environment (e.g., NODE_ENV=development or
CORS_ALLOW_WILDCARD=true) is set; keep allowed_methods explicit but consider
narrowing defaults if needed. Ensure validation fails or warns if
allowed_origins is empty in production mode so deployers must configure a safe
origin list.
In `@engine/tests/worker_trigger_e2e.rs`:
- Around line 416-433: The fixed 250ms sleep before draining events
(tokio::time::sleep(Duration::from_millis(250)).await) makes the checks flaky;
replace it with bounded timeout-based receives on the subscribers' channels: use
tokio::time::timeout(...) around downloaded_subscriber.rx.recv() to await the
expected event and assert its contents (stage/operation/worker) when
Some(Ok/Value) is returned, and use tokio::time::timeout(...) around
remove_subscriber.rx.recv() and assert it returns Err(elapsed) or None within
the timeout to confirm no removal event arrived; update or remove usages of
drain_immediate to use these timeout recv checks so CI variance won’t cause
flakes.
In `@new_skills/sandbox/skills/sandbox/create.md`:
- Line 97: Replace the CLI-style guidance text that currently says "iii worker
add <image-ref>" with the canonical primitive identifier "worker::add" so the
S101 message uses documented primitive IDs; locate the S101 string ("rootfs
missing on disk — run iii worker add <image-ref> first.") in the create.md
content and update it to read something like "rootfs missing on disk — run
worker::add first." to conform to the primitives documentation.
In `@new_skills/worker/skills/worker/events.md`:
- Around line 194-196: The docs incorrectly label a malformed
WorkerTriggerConfig as W101; remove the W101 mapping in the paragraph
referencing WorkerTriggerConfig and replace it with the correct
outcome/identifier `trigger_registration_failed`, update the example sentence to
state that the trigger registration is rejected with
`trigger_registration_failed` (and nothing is stored), and ensure the text
references the WorkerTriggerConfig primitive exactly as defined in the docs so
SDK consumers see the correct contract.
In `@new_skills/worker/skills/worker/schema.md`:
- Around line 41-42: The inline comment for the "function_id" example
incorrectly calls "worker::add" a "dotted op id"; update the comment to
reference the engine convention and say something like "::-separated function
id" or "`::`-separated function id" so the example matches naming guidance
(e.g., refer to "worker::add" and the :: separator in the comment near the
"function_id" field).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 1fd665f3-45ef-4036-a55a-7deb4841d4a8
⛔ Files ignored due to path filters (2)
- Cargo.lock is excluded by !**/*.lock
- engine/iii.lock is excluded by !**/*.lock
📒 Files selected for processing (29)
- crates/iii-worker/src/cli/mod.rs
- crates/iii-worker/src/cli/stderr_sink.rs
- crates/iii-worker/src/cli/worker_manager_daemon.rs
- crates/iii-worker/src/cli/worker_trigger.rs
- crates/iii-worker/src/core/add.rs
- crates/iii-worker/src/core/clear.rs
- crates/iii-worker/src/core/events.rs
- crates/iii-worker/src/core/remove.rs
- crates/iii-worker/src/core/start.rs
- crates/iii-worker/src/core/stop.rs
- crates/iii-worker/src/core/update.rs
- engine/Cargo.toml
- engine/config.yaml
- engine/tests/worker_trigger_e2e.rs
- new_skills/sandbox/index.md
- new_skills/sandbox/skills/sandbox/create.md
- new_skills/sandbox/skills/sandbox/exec.md
- new_skills/sandbox/skills/sandbox/list.md
- new_skills/sandbox/skills/sandbox/stop.md
- new_skills/worker/index.md
- new_skills/worker/skills/worker/add.md
- new_skills/worker/skills/worker/clear.md
- new_skills/worker/skills/worker/events.md
- new_skills/worker/skills/worker/list.md
- new_skills/worker/skills/worker/remove.md
- new_skills/worker/skills/worker/schema.md
- new_skills/worker/skills/worker/start.md
- new_skills/worker/skills/worker/stop.md
- new_skills/worker/skills/worker/update.md
    WorkerOpEvent::Started { op, worker } => Some(WorkerCallRequest {
        operation: op_from_str(op)?,
        worker,
        stage: WorkerStage::Started,
        timestamp_ms,
        source: None,
        version: None,
        status: None,
        caller_mode,
        progress: None,
        error: None,
    }),
    WorkerOpEvent::Stage { op, stage, worker } => Some(WorkerCallRequest {
        operation: op_from_str(op)?,
        worker,
        stage: stage_from_str(stage)?,
        timestamp_ms,
        source: None,
        version: None,
        status: None,
        caller_mode,
        progress: None,
        error: None,
    }),
    WorkerOpEvent::PullProgress { worker, fraction } => Some(WorkerCallRequest {
        operation: WorkerOperation::Add,
        worker,
        stage: WorkerStage::Downloading,
        timestamp_ms,
        source: None,
        version: None,
        status: None,
        caller_mode,
        progress: Some(fraction),
        error: None,
    }),
    WorkerOpEvent::Done { op, worker } => Some(WorkerCallRequest {
        operation: op_from_str(op)?,
        worker,
        stage: WorkerStage::Done,
        timestamp_ms,
        source: None,
        version: None,
        status: None,
        caller_mode,
        progress: None,
        error: None,
    }),
    WorkerOpEvent::Failed { op, worker, error } => Some(WorkerCallRequest {
        operation: op_from_str(op)?,
        worker,
        stage: WorkerStage::Failed,
        timestamp_ms,
        source: None,
        version: None,
        status: None,
        caller_mode,
        progress: None,
        error: Some(WorkerErrorPayload {
            code: "W900".to_string(),
            message: error,
        }),
    }),
event_to_request drops documented terminal metadata for add/update flows.
All branches currently hardcode source, version, and status to None. This prevents subscribers from receiving required terminal add/update payload fields (status and version when available), even though WorkerCallRequest models them.
As per coding guidelines, new_skills/worker/skills/worker/add.md and new_skills/worker/skills/worker/events.md require terminal add payloads to surface status and version (when available) under the typed event contract.
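One way to read this finding is sketched below in Rust. The types are trimmed-down, hypothetical stand-ins, not the PR's real definitions: `TerminalMeta` and the `meta` field on `Done` are assumptions made for illustration, and stages are plain strings rather than the `WorkerStage` enum. The point is only that a terminal arm can forward `source`, `version`, and `status` when the event carries them, while non-terminal arms keep `None`.

```rust
// Hypothetical stand-ins for the PR's types. `TerminalMeta` and the `meta`
// field on `Done` are assumptions, not part of the reviewed code.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TerminalMeta {
    pub source: Option<String>,
    pub version: Option<String>,
    pub status: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum WorkerOpEvent {
    Started { op: &'static str, worker: String },
    // Assumed extension: the terminal event carries optional metadata.
    Done { op: &'static str, worker: String, meta: Option<TerminalMeta> },
}

#[derive(Debug, Clone, PartialEq)]
pub struct WorkerCallRequest {
    pub operation: String,
    pub worker: String,
    pub stage: &'static str,
    pub source: Option<String>,
    pub version: Option<String>,
    pub status: Option<String>,
}

pub fn event_to_request(event: WorkerOpEvent) -> WorkerCallRequest {
    match event {
        // Non-terminal events have no metadata to forward.
        WorkerOpEvent::Started { op, worker } => WorkerCallRequest {
            operation: op.to_string(),
            worker,
            stage: "started",
            source: None,
            version: None,
            status: None,
        },
        // Terminal event: forward whatever metadata is present.
        WorkerOpEvent::Done { op, worker, meta } => {
            let meta = meta.unwrap_or_default();
            WorkerCallRequest {
                operation: op.to_string(),
                worker,
                stage: "done",
                source: meta.source,
                version: meta.version,
                status: meta.status,
            }
        }
    }
}
```

Grouping the three fields in one optional struct keeps the non-terminal variants unchanged; whether the real `WorkerOpEvent` should grow fields this way is a design decision for the PR author.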
    /// Terminal failure event. Emitted by orchestrators before
    /// returning an `Err` from the host shim so subscribers see the
    /// lifecycle terminate cleanly alongside the typed error envelope.
    Failed {
        op: &'static str,
        worker: String,
        error: String,
    },
Preserve structured WorkerOpError data in WorkerOpEvent::Failed.
WorkerOpEvent::Failed currently stores only error: String, which forces downstream payload mapping to lose concrete Wxxx codes and emit generic failures. This breaks the typed failure contract expected by subscribers.
As per coding guidelines, new_skills/worker/index.md defines the WorkerOpError envelope with {"type":"WorkerOpError","code":"Wxxx",...} and expected W codes.
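A minimal Rust sketch of the idea follows. Only `WorkerErrorPayload { code, message }` and the `"W900"` fallback appear in the quoted code; the `WorkerOpError` struct and the `FailureDetail` enum here are hypothetical shapes chosen for illustration. It shows a failure that keeps its concrete `Wxxx` code when one is known and falls back to the generic code only for untyped errors.

```rust
// Hypothetical minimal structured error; the real WorkerOpError envelope
// may carry more fields than shown here.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkerOpError {
    pub code: String,
    pub message: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WorkerErrorPayload {
    pub code: String,
    pub message: String,
}

// Assumed replacement for `error: String` on the Failed variant.
pub enum FailureDetail {
    Typed(WorkerOpError),
    Untyped(String),
}

pub fn to_payload(detail: FailureDetail) -> WorkerErrorPayload {
    match detail {
        // A typed error keeps its concrete code all the way to subscribers.
        FailureDetail::Typed(err) => WorkerErrorPayload {
            code: err.code,
            message: err.message,
        },
        // Only untyped failures fall back to the generic code.
        FailureDetail::Untyped(message) => WorkerErrorPayload {
            code: "W900".to_string(),
            message,
        },
    }
}
```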
    cors:
      allowed_origins:
        - '*'
      allowed_methods:
        - GET
        - POST
        - PUT
        - DELETE
        - OPTIONS
Avoid wildcard CORS origin as the default in engine config.
Allowing '*' with broad HTTP methods weakens the default security baseline. Please default to an explicit allowlist (env-configurable), and keep wildcard-only for local/dev overrides.
Suggested hardening
     cors:
       allowed_origins:
    -    - '*'
    +    - ${III_HTTP_ALLOWED_ORIGIN:http://localhost:3000}
       allowed_methods:
         - GET
         - POST
         - PUT
         - DELETE
         - OPTIONS
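The allowlist behaviour this comment asks for could look roughly like the Rust sketch below. It is not the engine's actual config loader: the function names are hypothetical, and the environment variable names `CORS_ALLOWED_ORIGINS` and `CORS_ALLOW_WILDCARD` are taken from the review prompt as examples (the suggested diff uses `III_HTTP_ALLOWED_ORIGIN` instead). The sketch parses a comma-separated list, rejects an empty result, and accepts `*` only when wildcards are explicitly enabled.

```rust
/// Parse a comma-separated origin allowlist.
/// Rejects an empty list, and rejects "*" unless `allow_wildcard` is set.
pub fn parse_allowed_origins(raw: &str, allow_wildcard: bool) -> Result<Vec<String>, String> {
    let origins: Vec<String> = raw
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect();

    if origins.is_empty() {
        return Err("allowed_origins is empty; configure at least one origin".to_string());
    }
    if !allow_wildcard && origins.iter().any(|o| o == "*") {
        return Err("wildcard origin '*' requires an explicit dev override".to_string());
    }
    Ok(origins)
}

/// Thin wrapper reading the (assumed) environment variables.
pub fn load_allowed_origins() -> Result<Vec<String>, String> {
    // A missing variable is treated as an empty list, which fails validation.
    let raw = std::env::var("CORS_ALLOWED_ORIGINS").unwrap_or_default();
    let allow_wildcard = std::env::var("CORS_ALLOW_WILDCARD")
        .map(|v| v == "true")
        .unwrap_or(false);
    parse_allowed_origins(&raw, allow_wildcard)
}
```

Keeping the parsing in a pure function makes the policy easy to unit-test without touching process environment state.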
    tokio::time::sleep(Duration::from_millis(250)).await;

    let downloaded_events = drain_immediate(&mut downloaded_subscriber.rx);
    assert_eq!(
        downloaded_events.len(),
        1,
        "stages:[downloaded] subscriber should receive exactly one event, got {n}: {downloaded_events:#?}",
        n = downloaded_events.len()
    );
    assert_eq!(downloaded_events[0]["stage"], "downloaded");
    assert_eq!(downloaded_events[0]["operation"], "add");
    assert_eq!(downloaded_events[0]["worker"], "iii-http");

    let remove_events = drain_immediate(&mut remove_subscriber.rx);
    assert!(
        remove_events.is_empty(),
        "operations:[remove] subscriber should receive zero events from an `add`, got: {remove_events:#?}"
    );
Replace fixed-delay fan-out checks with timeout-based receives.
The 250ms sleep on Line 416 makes this assertion block timing-sensitive and can intermittently fail on slower CI executors. Use bounded timeout(..., rx.recv()) checks so delivery latency variance doesn’t create flakes.
Suggested direction
    -    tokio::time::sleep(Duration::from_millis(250)).await;
    -
    -    let downloaded_events = drain_immediate(&mut downloaded_subscriber.rx);
    +    let first_downloaded = tokio::time::timeout(
    +        Duration::from_secs(2),
    +        downloaded_subscriber.rx.recv(),
    +    )
    +    .await
    +    .expect("timed out waiting for downloaded event")
    +    .expect("downloaded subscriber channel closed unexpectedly");
    +    let mut downloaded_events = vec![first_downloaded];
    +    downloaded_events.extend(drain_immediate(&mut downloaded_subscriber.rx));
    @@
    -    let remove_events = drain_immediate(&mut remove_subscriber.rx);
    -    assert!(
    -        remove_events.is_empty(),
    -        "operations:[remove] subscriber should receive zero events from an `add`, got: {remove_events:#?}"
    -    );
    +    let remove_probe = tokio::time::timeout(Duration::from_millis(500), remove_subscriber.rx.recv()).await;
    +    assert!(
    +        matches!(remove_probe, Err(_) | Ok(None)),
    +        "operations:[remove] subscriber should receive zero events from an `add`, got: {remove_probe:?}"
    +    );
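The same pattern can be tried without an async runtime. The Rust sketch below is a standard-library analogue, not the test's actual code: it swaps `tokio::time::timeout` around `rx.recv()` for `std::sync::mpsc::Receiver::recv_timeout`, and the helper names `expect_event`, `expect_silence`, and `demo` are hypothetical. A positive check waits up to a bound for the event, and a negative check treats a timeout or a closed channel as "nothing arrived".

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Wait up to `wait` for one event; panic with a clear message otherwise.
pub fn expect_event<T>(rx: &Receiver<T>, wait: Duration) -> T {
    rx.recv_timeout(wait)
        .expect("timed out (or channel closed) waiting for event")
}

/// Return true if no event arrives within `wait`.
pub fn expect_silence<T>(rx: &Receiver<T>, wait: Duration) -> bool {
    match rx.recv_timeout(wait) {
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => true,
        Ok(_) => false,
    }
}

/// Simulate a producer with delivery latency and run both checks.
pub fn demo() -> (String, bool) {
    let (downloaded_tx, downloaded_rx) = mpsc::channel::<String>();
    let (remove_tx, remove_rx) = mpsc::channel::<String>();

    let producer = thread::spawn(move || {
        // Delivery latency that a fixed sleep in the test would have to guess.
        thread::sleep(Duration::from_millis(50));
        downloaded_tx.send("downloaded".to_string()).unwrap();
        // The remove channel never receives an event; it is just closed.
        drop(remove_tx);
    });

    let event = expect_event(&downloaded_rx, Duration::from_secs(2));
    let silent = expect_silence(&remove_rx, Duration::from_millis(200));
    producer.join().unwrap();
    (event, silent)
}
```

The positive check returns as soon as the event lands, so the generous upper bound only costs time when the test is about to fail anyway.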
    - **S001** invalid request (malformed `image`, missing required field).
    - **S100** image not in catalog. Add it to the catalog or use a preset.
    - **S101** rootfs missing on disk — run `iii worker add <image-ref>` first.
Use canonical primitive ID instead of CLI syntax in error guidance.
Line 97 should reference worker::add (primitive) rather than iii worker add <image-ref> to stay consistent with documented primitives.
Suggested edit
    -- **S101** rootfs missing on disk — run `iii worker add <image-ref>` first.
    +- **S101** rootfs missing on disk — run [`worker::add`](iii://worker/add) for that image first.
As per coding guidelines: "**/*.md*: Ensure all references to primitives reflect those defined in docs/".
    - **W101** the `WorkerTriggerConfig` payload was malformed (e.g.
      `operations: "add"` instead of `["add"]`). The trigger registration
      is rejected with `trigger_registration_failed`; nothing is stored.
Malformed WorkerTriggerConfig should not be documented as W101.
This error path is a trigger-registration failure (trigger_registration_failed), not a WorkerOpError code. Please remove W101 mapping here to avoid contract drift for SDK consumers.
As per coding guidelines: "**/*.md*: Ensure all references to primitives reflect those defined in docs/".
    "function_id": "worker::add", // dotted op id
    "description": "Install a worker from registry name or OCI ref", // one-line description matching the op's how-to title
Terminology mismatch: worker::add is not a “dotted” id.
The example value is correct (worker::add), but the inline description should say ::-separated function id to match engine naming conventions.
As per coding guidelines: "Use :: separator for function IDs: orders::validate, reports::daily-summary".
# Add `worker` custom trigger type to `iii-worker-ops`

Register a new SDK-callable trigger type, `worker`, so any worker can subscribe to the lifecycle of every `worker::*` op (`add`, `remove`, `update`, `start`, `stop`, `clear`) without polling `worker::list`. Subscribers narrow what they receive via filter fields on `WorkerTriggerConfig` (`operations` / `stages` / `workers`), and the daemon fans typed `WorkerCallRequest` payloads out fire-and-forget so a slow subscriber can never stall an op.

## Summary

- `worker` registered by `iii_worker::cli::worker_manager_daemon`. `trigger_request_format` = `WorkerTriggerConfig` (subscriber filters), `call_request_format` = `WorkerCallRequest` (event payload).
- `core/{add,remove,update,start,stop,clear}.rs` now emit `Started → Stage(verb-ing) → Stage(verb-ed)|Done` on success and `Started → Stage(verb-ing) → Failed { error }` on shim error.
- `WorkerOpEvent::Failed` new variant so terminal failures flow through the same `EventSink` as `Done`.
- `IIIEventSink` bridges `WorkerOpEvent` → `iii.trigger(fn_id, payload, Void)` for every matching subscriber. Single dispatcher task per sink (mpsc-serialized) so wire order matches emit order; this fixes a race where multi-thread runtimes delivered `done` before `downloaded`.
- `new_skills/worker/skills/worker/events.md` how-to plus per-op cross-links (`add`, `remove`, `update`, `start`, `stop`, `clear`) and a new "trigger type" section in `index.md`.
- `EngineBuilder::serve()` + daemon spawned as a tokio task + three subscribers exercising the filter matrix + `iii.trigger("worker::add", iii-http)`.

## Filter semantics

`WorkerTriggerConfig` has three optional `Vec<…>` fields:

- `operations`: `add` / `remove` / `update` / `start` / `stop` / `clear`
- `stages`: `started` / `downloading` / `downloaded` / `removing` / `updating` / `starting` / `stopping` / `clearing` / `done` / `failed`
- `workers`

`None` and `Some(vec![])` are both wildcards.

The lead use case, "notify when `iii-http` finishes downloading":

    { "operations": ["add"], "stages": ["downloaded"] }

## Summary by CodeRabbit

## Release Notes

* **New Features**
  * Added `worker` custom trigger type for subscribing to worker lifecycle events with filtering by operation, stage, and worker name.
  * Enhanced worker operations with granular lifecycle event emission, including stage progression and failure reporting.
* **Documentation**
  * Added comprehensive documentation for all worker operations and lifecycle events.
  * Added complete documentation for sandbox operations and usage examples.
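
The filter semantics described above can be illustrated with a short Rust sketch. This is not the PR's implementation: the struct below uses plain strings where the real `WorkerTriggerConfig` presumably uses typed enums, and it assumes that the three fields combine with logical AND and that `workers` matches exact names. Only the rule that `None` and an empty list are both wildcards comes from the description.

```rust
// Simplified stand-in for the PR's WorkerTriggerConfig (string-typed here
// as an assumption; the real type likely uses enums for operations/stages).
#[derive(Debug, Default, Clone)]
pub struct WorkerTriggerConfig {
    pub operations: Option<Vec<String>>,
    pub stages: Option<Vec<String>>,
    pub workers: Option<Vec<String>>,
}

/// `None` and `Some(vec![])` are both wildcards; otherwise require membership.
fn field_matches(filter: &Option<Vec<String>>, value: &str) -> bool {
    match filter {
        None => true,
        Some(list) if list.is_empty() => true,
        Some(list) => list.iter().any(|item| item == value),
    }
}

impl WorkerTriggerConfig {
    /// Assumed semantics: an event is delivered only if every field matches.
    pub fn matches(&self, operation: &str, stage: &str, worker: &str) -> bool {
        field_matches(&self.operations, operation)
            && field_matches(&self.stages, stage)
            && field_matches(&self.workers, worker)
    }
}
```

With the lead use case's config (`operations: ["add"]`, `stages: ["downloaded"]`, no `workers`), `matches("add", "downloaded", "iii-http")` holds under these assumptions, while a `done` stage or a `remove` operation is filtered out.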