diff --git a/Cargo.lock b/Cargo.lock index 17b5320e9..0df790a33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2789,6 +2789,7 @@ dependencies = [ "tokio", "tokio-tungstenite 0.28.0", "tracing", + "tracing-test", "uuid", ] @@ -6487,6 +6488,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d" +dependencies = [ + "quote", + "syn 2.0.117", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/docs/understanding-iii/triggers.mdx b/docs/understanding-iii/triggers.mdx index 7bc2d968a..d19564246 100644 --- a/docs/understanding-iii/triggers.mdx +++ b/docs/understanding-iii/triggers.mdx @@ -82,3 +82,34 @@ Trigger fires, the Engine invokes the condition function with the same payload t receive. If the condition returns a truthy value, the handler runs; if not, the invocation is skipped. Use this when the same event source should sometimes fire the Function and sometimes not, without splitting the Trigger into separate event-source registrations. + +## Registration errors + +When the Engine cannot register a Trigger — most commonly because the Trigger type's Worker is not +active in the project — it sends a `TriggerRegistrationResult` with an `error` body back to the +Worker that initiated the request. The SDK logs the error at `ERROR` level so the user sees it +during development. + +For built-in Trigger types — `http`, `cron`, `subscribe`, `state`, `durable:subscriber`, `stream`, +`stream:join`, `stream:leave`, and `log` — the error message includes the install command for the +missing Worker (the exact name reported matches what the Worker registers, e.g. `stream:join` +rather than the generic `stream`). For unknown Trigger types that are not built-in, the error +message points to the workers directory at . + +Example log line when `iii-http` is not active: + +``` +[iii] Trigger registration failed for "1c1b…" (http): Trigger type "http" not found — worker iii-http is missing. Run: iii worker add iii-http +``` + +Example log line for an unknown non-built-in Trigger type: + +``` +[iii] Trigger registration failed for "1c1b…" (my-custom-trigger): Trigger type "my-custom-trigger" not found. Search for a worker that provides this trigger type at https://workers.iii.dev/ +``` + +Logging targets per SDK: + +- **Node** — `console.error` +- **Python** — the `iii` logger, level `ERROR` +- **Rust** — `tracing::error!` diff --git a/engine/src/engine/mod.rs b/engine/src/engine/mod.rs index c562ea451..434b055c0 100644 --- a/engine/src/engine/mod.rs +++ b/engine/src/engine/mod.rs @@ -606,6 +606,69 @@ impl Engine { error, } => { tracing::debug!(id = %id, trigger_type = %trigger_type, function_id = %function_id, error = ?error, "TriggerRegistrationResult"); + + let Some(trigger_entry) = self.trigger_registry.triggers.get(id) else { + tracing::debug!( + trigger_id = %id, + "TriggerRegistrationResult for unknown trigger; ignoring" + ); + return Ok(()); + }; + let stored_trigger_type = trigger_entry.trigger_type.clone(); + let stored_function_id = trigger_entry.function_id.clone(); + let originator_id = trigger_entry.worker_id; + drop(trigger_entry); + + // Only the registrator worker that owns this trigger_type may + // report its result. Otherwise any connected worker could spoof + // a failure for somebody else's trigger and tear it out of the + // registry. + let registrator_worker_id = self + .trigger_registry + .trigger_types + .get(&stored_trigger_type) + .and_then(|tt| tt.worker_id); + if registrator_worker_id != Some(worker.id) { + tracing::warn!( + trigger_id = %id, + trigger_type = %stored_trigger_type, + sender = %worker.id, + registrator = ?registrator_worker_id, + "TriggerRegistrationResult from non-registrator worker; ignoring" + ); + return Ok(()); + } + + if error.is_none() { + return Ok(()); + } + + self.trigger_registry.triggers.remove(id); + + let Some(originator_id) = originator_id else { + tracing::debug!( + trigger_id = %id, + "TriggerRegistrationResult for trigger without originator; ignoring" + ); + return Ok(()); + }; + + let Some(originator) = self.worker_registry.get_worker(&originator_id) else { + tracing::debug!( + trigger_id = %id, + originator = %originator_id, + "TriggerRegistrationResult originator no longer connected; dropping" + ); + return Ok(()); + }; + + let forward = Message::TriggerRegistrationResult { + id: id.clone(), + trigger_type: stored_trigger_type, + function_id: stored_function_id, + error: error.clone(), + }; + let _ = self.send_msg(&originator, forward).await; Ok(()) } Message::RegisterTriggerType { @@ -770,18 +833,46 @@ impl Engine { reg_function_id = format!("{prefix}::{reg_function_id}"); } - let _ = self + match self .trigger_registry .register_trigger(Trigger { - id: reg_trigger_id, - trigger_type: reg_trigger_type, - function_id: reg_function_id, + id: reg_trigger_id.clone(), + trigger_type: reg_trigger_type.clone(), + function_id: reg_function_id.clone(), config: reg_config, worker_id: Some(worker.id), metadata: metadata.clone(), }) - .await; - crate::workers::telemetry::collector::track_trigger_registered(); + .await + { + Ok(()) => { + crate::workers::telemetry::collector::track_trigger_registered(); + } + Err(err) => { + let error_body = match &err { + crate::trigger::RegisterTriggerError::UnknownBuiltin { .. } + | crate::trigger::RegisterTriggerError::Unknown { .. } => { + crate::protocol::ErrorBody::new( + "trigger_type_not_found", + err.to_string(), + ) + } + crate::trigger::RegisterTriggerError::Other(_) => { + crate::protocol::ErrorBody::new( + "trigger_registration_failed", + err.to_string(), + ) + } + }; + let result_msg = Message::TriggerRegistrationResult { + id: reg_trigger_id, + trigger_type: reg_trigger_type, + function_id: reg_function_id, + error: Some(error_body), + }; + let _ = self.send_msg(worker, result_msg).await; + } + } Ok(()) } @@ -3220,55 +3311,287 @@ mod tests { ); } + fn insert_trigger_type_for(engine: &Engine, type_id: &str, registrator: &WorkerConnection) { + engine.trigger_registry.trigger_types.insert( + type_id.to_string(), + crate::trigger::TriggerType::new( + type_id, + "test trigger type", + Box::new(registrator.clone()), + Some(registrator.id), + ), + ); + } + #[tokio::test] - async fn test_router_msg_trigger_registration_result_is_noop() { + async fn test_trigger_registration_result_forwards_error_to_originator() { ensure_default_meter(); let engine = Engine::new(); - let (tx, mut rx) = mpsc::channel::(8); - let worker = WorkerConnection::new(tx); + + let (user_tx, mut user_rx) = mpsc::channel::(8); + let user = WorkerConnection::new(user_tx); + engine.worker_registry.register_worker(user.clone()); + + let (registrator_tx, _registrator_rx) = mpsc::channel::(8); + let registrator = WorkerConnection::new(registrator_tx); + + insert_trigger_type_for(&engine, "http", ®istrator); + + engine.trigger_registry.triggers.insert( + "trig-1".to_string(), + crate::trigger::Trigger { + id: "trig-1".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-1".to_string(), + config: serde_json::json!({}), + worker_id: Some(user.id), + metadata: None, + }, + ); + + let msg = Message::TriggerRegistrationResult { + id: "trig-1".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-1".to_string(), + error: Some(crate::protocol::ErrorBody::new( + "invalid_config", + "api_path is required", + )), + }; + + engine + .router_msg(®istrator, &msg) + .await + .expect("router_msg should succeed"); + + let outbound = user_rx + .try_recv() + .expect("originator should receive forwarded TriggerRegistrationResult"); + let Outbound::Protocol(Message::TriggerRegistrationResult { + id, + trigger_type, + function_id, + error, + }) = outbound + else { + panic!("expected TriggerRegistrationResult, got {:?}", outbound); + }; + assert_eq!(id, "trig-1"); + assert_eq!(trigger_type, "http"); + assert_eq!(function_id, "fn-1"); + let err = error.expect("error should be populated"); + assert_eq!(err.code, "invalid_config"); + assert_eq!(err.message, "api_path is required"); + + assert!( + engine.trigger_registry.triggers.get("trig-1").is_none(), + "failed trigger should be removed from registry" + ); + } + + #[tokio::test] + async fn test_trigger_registration_result_success_does_not_forward_or_remove() { + ensure_default_meter(); + let engine = Engine::new(); + + let (user_tx, mut user_rx) = mpsc::channel::(8); + let user = WorkerConnection::new(user_tx); + engine.worker_registry.register_worker(user.clone()); + + let (registrator_tx, _registrator_rx) = mpsc::channel::(8); + let registrator = WorkerConnection::new(registrator_tx); + + insert_trigger_type_for(&engine, "http", ®istrator); + + engine.trigger_registry.triggers.insert( + "trig-2".to_string(), + crate::trigger::Trigger { + id: "trig-2".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-2".to_string(), + config: serde_json::json!({}), + worker_id: Some(user.id), + metadata: None, + }, + ); let msg = Message::TriggerRegistrationResult { - id: "trigger-1".to_string(), - trigger_type: "my-type".to_string(), - function_id: "my-func".to_string(), + id: "trig-2".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-2".to_string(), error: None, }; engine - .router_msg(&worker, &msg) + .router_msg(®istrator, &msg) .await - .expect("TriggerRegistrationResult should succeed"); + .expect("router_msg should succeed"); - // Should not produce any response assert!( - rx.try_recv().is_err(), - "TriggerRegistrationResult should not produce any outbound message" + user_rx.try_recv().is_err(), + "success result should not be forwarded" + ); + + assert!( + engine.trigger_registry.triggers.get("trig-2").is_some(), + "successful trigger should remain in registry" ); } #[tokio::test] - async fn test_router_msg_trigger_registration_result_with_error() { + async fn test_trigger_registration_result_unknown_trigger_id_is_noop() { ensure_default_meter(); let engine = Engine::new(); - let (tx, _rx) = mpsc::channel::(8); - let worker = WorkerConnection::new(tx); + + let (registrator_tx, _registrator_rx) = mpsc::channel::(8); + let registrator = WorkerConnection::new(registrator_tx); let msg = Message::TriggerRegistrationResult { - id: "trigger-1".to_string(), - trigger_type: "my-type".to_string(), - function_id: "my-func".to_string(), - error: Some(crate::protocol::ErrorBody { - code: "registration_failed".to_string(), - message: "registration failed".to_string(), - stacktrace: None, - }), + id: "ghost".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-x".to_string(), + error: Some(crate::protocol::ErrorBody::new("x", "y")), + }; + + engine + .router_msg(®istrator, &msg) + .await + .expect("router_msg should succeed even when the trigger is unknown"); + } + + #[tokio::test] + async fn test_trigger_registration_result_from_non_registrator_is_ignored() { + ensure_default_meter(); + let engine = Engine::new(); + + let (user_tx, mut user_rx) = mpsc::channel::(8); + let user = WorkerConnection::new(user_tx); + engine.worker_registry.register_worker(user.clone()); + + // Registered registrator for "http". + let (registrator_tx, _registrator_rx) = mpsc::channel::(8); + let registrator = WorkerConnection::new(registrator_tx); + insert_trigger_type_for(&engine, "http", ®istrator); + + engine.trigger_registry.triggers.insert( + "trig-3".to_string(), + crate::trigger::Trigger { + id: "trig-3".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-3".to_string(), + config: serde_json::json!({}), + worker_id: Some(user.id), + metadata: None, + }, + ); + + // Some OTHER worker tries to report a failure for trig-3. + let (spoofer_tx, _spoofer_rx) = mpsc::channel::(8); + let spoofer = WorkerConnection::new(spoofer_tx); + + let msg = Message::TriggerRegistrationResult { + id: "trig-3".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-3".to_string(), + error: Some(crate::protocol::ErrorBody::new("spoofed", "boom")), + }; + + engine + .router_msg(&spoofer, &msg) + .await + .expect("router_msg should succeed"); + + assert!( + user_rx.try_recv().is_err(), + "non-registrator result must not be forwarded" + ); + assert!( + engine.trigger_registry.triggers.get("trig-3").is_some(), + "non-registrator result must not remove the trigger" + ); + } + + #[tokio::test] + async fn test_register_trigger_unknown_builtin_sends_install_hint() { + ensure_default_meter(); + let engine = Engine::new(); + let (tx, mut rx) = mpsc::channel::(8); + let worker = WorkerConnection::new(tx); + + let msg = Message::RegisterTrigger { + id: "trig-1".to_string(), + trigger_type: "http".to_string(), + function_id: "fn-1".to_string(), + config: serde_json::json!({}), + metadata: None, + }; + + engine + .router_msg(&worker, &msg) + .await + .expect("RegisterTrigger should succeed at protocol level"); + + let outbound = rx + .try_recv() + .expect("engine should emit TriggerRegistrationResult on failure"); + let Outbound::Protocol(Message::TriggerRegistrationResult { + id, + trigger_type, + function_id, + error, + }) = outbound + else { + panic!("expected TriggerRegistrationResult, got {:?}", outbound); + }; + assert_eq!(id, "trig-1"); + assert_eq!(trigger_type, "http"); + assert_eq!(function_id, "fn-1"); + let err = error.expect("error should be populated"); + assert_eq!(err.code, "trigger_type_not_found"); + assert!(err.message.contains("iii-http"), "msg: {}", err.message); + assert!( + err.message.contains("iii worker add"), + "msg: {}", + err.message + ); + } + + #[tokio::test] + async fn test_register_trigger_unknown_type_recommends_workers_directory() { + ensure_default_meter(); + let engine = Engine::new(); + let (tx, mut rx) = mpsc::channel::(8); + let worker = WorkerConnection::new(tx); + + let msg = Message::RegisterTrigger { + id: "trig-2".to_string(), + trigger_type: "totally-made-up".to_string(), + function_id: "fn-2".to_string(), + config: serde_json::json!({}), + metadata: None, }; - // Should still succeed (just logs the error) engine .router_msg(&worker, &msg) .await - .expect("TriggerRegistrationResult with error should succeed"); + .expect("RegisterTrigger should succeed at protocol level"); + + let outbound = rx.try_recv().expect("engine should emit a result"); + let Outbound::Protocol(Message::TriggerRegistrationResult { error, .. }) = outbound else { + panic!("expected TriggerRegistrationResult"); + }; + let err = error.expect("error should be populated"); + assert_eq!(err.code, "trigger_type_not_found"); + assert!( + err.message.contains("totally-made-up"), + "msg should name the missing type: {}", + err.message + ); + assert!( + err.message.contains("https://workers.iii.dev/"), + "msg should recommend the workers directory: {}", + err.message + ); } // ========================================================================= diff --git a/engine/src/trigger.rs b/engine/src/trigger.rs index ac5c5c882..5e54c9f91 100644 --- a/engine/src/trigger.rs +++ b/engine/src/trigger.rs @@ -32,6 +32,23 @@ fn worker_name_for_trigger_type(trigger_type_id: &str) -> Option<&'static str> { .map(|(_, worker)| *worker) } +#[derive(Debug, thiserror::Error)] +pub enum RegisterTriggerError { + #[error( + "Trigger type \"{trigger_type}\" not found — worker {worker} is missing. Run: iii worker add {worker}" + )] + UnknownBuiltin { + trigger_type: String, + worker: &'static str, + }, + #[error( + "Trigger type \"{trigger_type}\" not found. Search for a worker that provides this trigger type at https://workers.iii.dev/" + )] + Unknown { trigger_type: String }, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + pub struct TriggerType { pub id: String, pub _description: String, @@ -231,7 +248,7 @@ impl TriggerRegistry { Ok(()) } - pub async fn register_trigger(&self, trigger: Trigger) -> Result<(), anyhow::Error> { + pub async fn register_trigger(&self, trigger: Trigger) -> Result<(), RegisterTriggerError> { let trigger_type_id = trigger.trigger_type.clone(); let Some(trigger_type) = self.trigger_types.get(&trigger_type_id) else { if let Some(worker_name) = worker_name_for_trigger_type(&trigger_type_id) { @@ -241,28 +258,29 @@ impl TriggerRegistry { worker_name.cyan().bold(), format!("iii worker add {}", worker_name).green().bold() ); - return Err(anyhow::anyhow!( - "Trigger type \"{}\" not found — worker {} is missing. Run: iii worker add {}", - trigger_type_id, - worker_name, - worker_name - )); + return Err(RegisterTriggerError::UnknownBuiltin { + trigger_type: trigger_type_id, + worker: worker_name, + }); } - tracing::error!("Trigger type {} not found", trigger_type_id.purple()); - return Err(anyhow::anyhow!("Trigger type not found")); + tracing::error!( + "Trigger type {} not found. Search for a worker that provides this trigger type at {}", + trigger_type_id.purple().bold(), + "https://workers.iii.dev/".cyan().bold() + ); + return Err(RegisterTriggerError::Unknown { + trigger_type: trigger_type_id, + }); }; - match trigger_type + if let Err(err) = trigger_type .registrator .register_trigger(trigger.clone()) .await { - Ok(_) => {} - Err(err) => { - tracing::error!(error = %err, "Error registering trigger"); - return Err(err); - } + tracing::error!(error = %err, "Error registering trigger"); + return Err(RegisterTriggerError::Other(err)); } drop(trigger_type); @@ -475,7 +493,15 @@ mod tests { let trigger = make_trigger("t1", "nonexistent"); let result = registry.register_trigger(trigger).await; assert!(result.is_err()); - assert_eq!(result.unwrap_err().to_string(), "Trigger type not found"); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("\"nonexistent\" not found"), + "Expected unknown-type message, got: {err_msg}" + ); + assert!( + err_msg.contains("https://workers.iii.dev/"), + "Expected workers directory recommendation, got: {err_msg}" + ); assert!(registry.triggers.is_empty()); } diff --git a/sdk/packages/node/iii/src/iii-types.ts b/sdk/packages/node/iii/src/iii-types.ts index 0998bf6e4..59e1d22d1 100644 --- a/sdk/packages/node/iii/src/iii-types.ts +++ b/sdk/packages/node/iii/src/iii-types.ts @@ -28,13 +28,18 @@ export type UnregisterTriggerMessage = { type?: string } +export type ErrorBody = { + code: string + message: string + stacktrace?: string +} + export type TriggerRegistrationResultMessage = { message_type: MessageType.TriggerRegistrationResult id: string type: string function_id: string - result?: unknown - error?: unknown + error?: ErrorBody } export type RegisterTriggerMessage = { diff --git a/sdk/packages/node/iii/src/iii.ts b/sdk/packages/node/iii/src/iii.ts index 912c2ae39..8cf6ea569 100644 --- a/sdk/packages/node/iii/src/iii.ts +++ b/sdk/packages/node/iii/src/iii.ts @@ -970,6 +970,16 @@ class Sdk implements ISdk { } } + private onTriggerRegistrationResult( + message: { id: string; trigger_type?: string; type?: string; function_id: string; error?: { code: string; message: string; stacktrace?: string } }, + ): void { + if (!message.error) return + const triggerType = message.trigger_type ?? message.type ?? '' + console.error( + `[iii] Trigger registration failed for "${message.id}" (${triggerType}): ${message.error.message}`, + ) + } + private onMessage(socketMessage: Data): void { let msgType: MessageType let message: Record @@ -992,6 +1002,10 @@ class Sdk implements ISdk { this.onInvokeFunction(invocation_id, function_id, data, traceparent, baggage) } else if (msgType === MessageType.RegisterTrigger) { this.onRegisterTrigger(message as { trigger_type: string; id: string; function_id: string; config: unknown; metadata?: Record }) + } else if (msgType === MessageType.TriggerRegistrationResult) { + this.onTriggerRegistrationResult( + message as { id: string; trigger_type?: string; type?: string; function_id: string; error?: { code: string; message: string; stacktrace?: string } }, + ) } else if (msgType === MessageType.WorkerRegistered) { const { worker_id } = message as WorkerRegisteredMessage this.workerId = worker_id diff --git a/sdk/packages/node/iii/tests/trigger-registration-error.test.ts b/sdk/packages/node/iii/tests/trigger-registration-error.test.ts new file mode 100644 index 000000000..36655859c --- /dev/null +++ b/sdk/packages/node/iii/tests/trigger-registration-error.test.ts @@ -0,0 +1,81 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import { WebSocketServer, type WebSocket } from 'ws' +import { registerWorker } from '../src/iii' +import type { ISdk } from '../src/types' + +describe('trigger registration error surfacing', () => { + let wss: WebSocketServer + let url: string + let sdk: ISdk | undefined + let serverSocket: WebSocket | undefined + + beforeEach(async () => { + wss = new WebSocketServer({ port: 0 }) + await new Promise((resolve) => wss.once('listening', () => resolve())) + const address = wss.address() as { port: number } + url = `ws://127.0.0.1:${address.port}` + serverSocket = undefined + wss.on('connection', (ws) => { + serverSocket = ws + ws.send(JSON.stringify({ type: 'workerregistered', worker_id: 'test-worker' })) + }) + }) + + afterEach(async () => { + if (sdk) { + await sdk.shutdown() + } + vi.restoreAllMocks() + await new Promise((resolve) => wss.close(() => resolve())) + }) + + it('logs to console.error on TriggerRegistrationResult with error', async () => { + const spy = vi.spyOn(console, 'error').mockImplementation(() => {}) + sdk = registerWorker(url) + await new Promise((r) => setTimeout(r, 50)) + + serverSocket!.send( + JSON.stringify({ + type: 'triggerregistrationresult', + id: 'trig-1', + trigger_type: 'http', + function_id: 'fn-1', + error: { + code: 'trigger_type_not_found', + message: + 'Trigger type "http" not found — worker iii-http is missing. Run: iii worker add iii-http', + }, + }), + ) + + await new Promise((r) => setTimeout(r, 20)) + expect(spy).toHaveBeenCalled() + const formatted = spy.mock.calls.map((args) => args.join(' ')).join('\n') + expect(formatted).toContain('trig-1') + expect(formatted).toContain('http') + expect(formatted).toContain('iii worker add iii-http') + spy.mockRestore() + }) + + it('does not log on TriggerRegistrationResult success (no error field)', async () => { + const spy = vi.spyOn(console, 'error').mockImplementation(() => {}) + sdk = registerWorker(url) + await new Promise((r) => setTimeout(r, 50)) + + serverSocket!.send( + JSON.stringify({ + type: 'triggerregistrationresult', + id: 'trig-2', + trigger_type: 'http', + function_id: 'fn-2', + }), + ) + + await new Promise((r) => setTimeout(r, 20)) + const registrationLogs = spy.mock.calls + .map((args) => args.join(' ')) + .filter((msg) => msg.includes('Trigger registration')) + expect(registrationLogs).toEqual([]) + spy.mockRestore() + }) +}) diff --git a/sdk/packages/python/iii/src/iii/iii.py b/sdk/packages/python/iii/src/iii/iii.py index e7943779e..95f8297de 100644 --- a/sdk/packages/python/iii/src/iii/iii.py +++ b/sdk/packages/python/iii/src/iii/iii.py @@ -438,6 +438,8 @@ async def _handle_message(self, raw: str | bytes) -> None: ) elif msg_type == MessageType.REGISTER_TRIGGER.value: asyncio.create_task(self._handle_trigger_registration(data)) + elif msg_type == MessageType.TRIGGER_REGISTRATION_RESULT.value: + self._handle_trigger_registration_result(data) elif msg_type == MessageType.WORKER_REGISTERED.value: worker_id = data.get("worker_id", "") self._worker_id = worker_id @@ -701,6 +703,21 @@ async def _handle_trigger_registration(self, data: dict[str, Any]) -> None: } ) + def _handle_trigger_registration_result(self, data: dict[str, Any]) -> None: + error = data.get("error") + if not error: + return + + trigger_id = data.get("id", "") + trigger_type = data.get("trigger_type", "") + message = error.get("message", "") + log.error( + "[iii] Trigger registration failed for %r (%s): %s", + trigger_id, + trigger_type, + message, + ) + # Connection state management def _set_connection_state(self, state: IIIConnectionState) -> None: diff --git a/sdk/packages/python/iii/tests/test_trigger_registration_error.py b/sdk/packages/python/iii/tests/test_trigger_registration_error.py new file mode 100644 index 000000000..bc591b57b --- /dev/null +++ b/sdk/packages/python/iii/tests/test_trigger_registration_error.py @@ -0,0 +1,57 @@ +"""Tests for engine-reported trigger registration errors.""" + +import json + +from unittest.mock import AsyncMock, patch + +from iii.iii import III, InitOptions + + +def _send_message(client: III, payload: dict) -> None: + with patch.object(client, "_send", new_callable=AsyncMock): + client._run_on_loop(client._handle_message(json.dumps(payload))) + + +def test_trigger_registration_result_error_is_logged(caplog): + client = III(address="ws://localhost:9999", options=InitOptions(worker_name="test")) + caplog.set_level("ERROR", logger="iii") + + _send_message( + client, + { + "type": "triggerregistrationresult", + "id": "trig-1", + "trigger_type": "http", + "function_id": "fn-1", + "error": { + "code": "trigger_type_not_found", + "message": 'Trigger type "http" not found — worker iii-http is missing. Run: iii worker add iii-http', + }, + }, + ) + + messages = [record.getMessage() for record in caplog.records] + assert any("iii worker add iii-http" in m for m in messages), messages + assert any("trig-1" in m for m in messages), messages + + client.shutdown() + + +def test_trigger_registration_result_success_does_not_log(caplog): + client = III(address="ws://localhost:9999", options=InitOptions(worker_name="test")) + caplog.set_level("ERROR", logger="iii") + + _send_message( + client, + { + "type": "triggerregistrationresult", + "id": "trig-2", + "trigger_type": "http", + "function_id": "fn-2", + }, + ) + + messages = [record.getMessage() for record in caplog.records] + assert not any("Trigger registration" in m for m in messages), messages + + client.shutdown() diff --git a/sdk/packages/rust/iii/Cargo.toml b/sdk/packages/rust/iii/Cargo.toml index 0e1444c3d..8a507446c 100644 --- a/sdk/packages/rust/iii/Cargo.toml +++ b/sdk/packages/rust/iii/Cargo.toml @@ -46,3 +46,4 @@ opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "trace", "testin reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "multipart", "stream"] } serial_test = "3" tokio = { version = "1", features = ["test-util"] } +tracing-test = "0.2" diff --git a/sdk/packages/rust/iii/src/iii.rs b/sdk/packages/rust/iii/src/iii.rs index fc42c7950..bf786de58 100644 --- a/sdk/packages/rust/iii/src/iii.rs +++ b/sdk/packages/rust/iii/src/iii.rs @@ -1503,6 +1503,21 @@ impl III { Message::WorkerRegistered { worker_id } => { tracing::debug!(worker_id = %worker_id, "Worker registered"); } + Message::TriggerRegistrationResult { + id, + trigger_type, + function_id: _, + error: Some(err), + } => { + tracing::error!( + trigger_id = %id, + trigger_type = %trigger_type, + code = %err.code, + "[iii] Trigger registration failed for {:?}: {}", + id, + err.message + ); + } _ => {} } @@ -2160,4 +2175,43 @@ mod tests { assert!(!shutdown); assert!(queue.is_empty()); } + + #[tokio::test] + #[tracing_test::traced_test] + async fn trigger_registration_result_error_is_logged() { + let iii = register_worker("ws://localhost:1234", InitOptions::default()); + let payload = serde_json::json!({ + "type": "triggerregistrationresult", + "id": "trig-1", + "trigger_type": "http", + "function_id": "fn-1", + "error": { + "code": "trigger_type_not_found", + "message": "Trigger type \"http\" not found — worker iii-http is missing. Run: iii worker add iii-http", + }, + }) + .to_string(); + + iii.handle_message(&payload).unwrap(); + + assert!(logs_contain("iii worker add iii-http")); + assert!(logs_contain("trig-1")); + } + + #[tokio::test] + #[tracing_test::traced_test] + async fn trigger_registration_result_success_does_not_log_error() { + let iii = register_worker("ws://localhost:1234", InitOptions::default()); + let payload = serde_json::json!({ + "type": "triggerregistrationresult", + "id": "trig-2", + "trigger_type": "http", + "function_id": "fn-2", + }) + .to_string(); + + iii.handle_message(&payload).unwrap(); + + assert!(!logs_contain("Trigger registration failed")); + } }