diff --git a/docs/api-reference/error-codes.mdx b/docs/api-reference/error-codes.mdx new file mode 100644 index 0000000000..4069920c36 --- /dev/null +++ b/docs/api-reference/error-codes.mdx @@ -0,0 +1,141 @@ +--- +title: "Error Codes" +description: "Stable engine error codes returned in invocation failure bodies. The wire ABI for SDK callers reacting to specific failure modes." +--- + +## Error body shape + +When a function invocation fails, the engine returns an `ErrorBody` with three fields. The `code` is the stable identifier you should match on; the `message` is human-readable and may evolve. + +| Field | Type | Description | +|---|---|---| +| `code` | `string` | Stable error code (e.g. `invocation_stopped`, `function_not_found`). The wire ABI — match on this for targeted recovery. | +| `message` | `string` | Human-readable explanation. Often includes the offending function ID, payload size, or limit value. | +| `stacktrace` | `string \| null` | Optional worker-side stacktrace, when the failure originated in handler code. | + +SDKs surface these as language-native exceptions: + +- **Node** — `Error` with the engine `code` / `message` / `stacktrace` propagated through. +- **Python** — `IIIRemoteError` carrying the `code` and `message`. +- **Rust** — `IIIError::Remote { code, message, stacktrace }`. + +Producer-side guards (e.g. [`IIIPayloadTooLarge`](#producer-side-errors)) raise *before* the WebSocket round-trip and have their own SDK-specific exception types. + +## Codes + +### Invocation lifecycle + +| Code | Emitted when | What to do | +|---|---|---| +| `invocation_stopped` | An in-flight invocation was halted by a clean worker disconnect, engine shutdown, or EOF on the WebSocket. The legacy generic stop code. | Usually transient. Retry idempotent calls; if the worker is gone for good, re-route or fail the upstream caller. | +| `invocation_failed_payload_too_large` | The engine closed the WebSocket because an inbound message from the worker exceeded `iii-worker-manager.max_message_size` (default 16 MiB). Any in-flight invocation on that connection resolves with this code. | Shrink the payload, raise `max_message_size` on both the engine config and the SDK [`InitOptions`](/api-reference/sdk-node#initoptions), or move binary data to [channels](/how-to/use-channels). | +| `function_not_found` | A `trigger()` referenced a function ID that is not registered with the engine. | Check the function ID for typos; verify the worker that owns it is connected. | +| `invocation_error` | The engine could not deliver the invocation to the target worker (channel send failed, worker dropped mid-route). | Retry; if persistent, inspect engine logs for the underlying transport error. | +| `serialization_error` | The engine failed to serialize or deserialize an invocation payload, response, or error envelope. | The payload contains a value that does not round-trip through JSON. Inspect the offending field. | +| `registration_failed` | A worker's `register_function` / `register_trigger` message was rejected (duplicate ID, malformed format, or invalid trigger config). | Check the registration message against the [SDK reference](/api-reference/sdk-node) and the engine logs for the rejection reason. | +| `timeout` | The engine's per-invocation deadline expired before the worker returned a result. | Raise `invocation_timeout_ms` on [`InitOptions`](/api-reference/sdk-node#initoptions) or `timeout_ms` on the trigger request, or split the work. | + +### Trigger condition evaluation + +| Code | Emitted when | What to do | +|---|---|---| +| `fail` | A trigger condition explicitly evaluated to fail (`{"fail": "..."}` in a condition expression). | The handler chose to short-circuit. Inspect the condition definition to confirm the intent. | + +### Authentication & secrets (`auth`) + +| Code | Emitted when | What to do | +|---|---|---| +| `missing_env_var` | A function configured with bearer / HMAC / API-key auth references an environment variable that is not set on the engine process. | Set the referenced env var before starting the engine. | +| `secret_not_found` | Bearer-auth secret env var is not present at invocation time. | Set the env var. | +| `token_not_found` | HMAC-auth token env var is not present at invocation time. | Set the env var. | +| `api_key_not_found` | API-key auth env var is not present at invocation time. | Set the env var. | + +### HTTP external invocation + +These codes are emitted when the engine invokes a function over HTTP (e.g. AWS Lambda, Cloudflare Worker) instead of an SDK-connected worker. + +| Code | Emitted when | What to do | +|---|---|---| +| `http_error` | The remote endpoint returned a non-2xx status that did not parse as a structured error body. | Check the remote handler logs; verify the URL and auth config. | +| `http_request_failed` | The HTTP request itself failed (connection refused, DNS error, TLS error). | Confirm reachability from the engine; check the URL. | +| `http_response_failed` | The connection succeeded but reading the response body failed (truncated, timeout). | Retry; raise the per-invocation timeout if the remote is slow. | +| `url_validation_failed` | The configured invocation URL is not a valid absolute HTTP(S) URL. | Fix the URL in the function registration. | +| `serialization_error` | The engine failed to serialize the request body before sending. | Inspect the payload for non-JSON-serializable values. | +| `timestamp_error` | HMAC auth failed to produce a signing timestamp. | System clock issue on the engine host — confirm time is set. | +| `invalid_response` | The remote returned a body that did not match the declared response schema. | Align the remote handler's output with the registered `response_format`. | + +### Worker connection transport + +| Code | Emitted when | What to do | +|---|---|---| +| `channel_send_failed` | The engine could not enqueue an outbound message onto a worker's WebSocket send channel (worker dropped or backed-up beyond capacity). | Usually transient; the worker will reconnect. Persistent failures indicate the worker is overloaded. | + +### Queue worker (`iii-queue`) + +| Code | Emitted when | What to do | +|---|---|---| +| `topic_not_set` | A queue trigger or call omitted the required `topic` field. | Provide `topic` in the trigger config or call payload. | +| `topic_required` | A queue management call (stats, DLQ listing) was made without specifying a topic. | Pass the target topic. | +| `queue_not_set` | An admin call referenced an unknown queue. | Verify the queue name; create it via the queue config if missing. | +| `message_id_not_set` | A redrive / discard call did not specify the target `message_id`. | Include the message ID. | +| `redrive_failed` | The adapter failed to redrive a topic from its DLQ. | Check the adapter logs (Redis, RabbitMQ, builtin). | +| `redrive_message_failed` | The adapter failed to redrive a single message. | Inspect adapter logs. | +| `discard_message_failed` | The adapter failed to permanently delete a DLQ message. | Inspect adapter logs. | +| `list_topics_failed` | The queue adapter failed to enumerate topics. | Adapter / storage backend issue. | +| `topic_stats_failed` | The queue adapter failed to read topic stats. | Adapter / storage backend issue. | +| `dlq_topics_failed` | The queue adapter failed to enumerate DLQ topics. | Adapter / storage backend issue. | +| `dlq_messages_failed` | The queue adapter failed to read DLQ messages for a topic. | Adapter / storage backend issue. | + +### Pub/sub worker (`iii-pubsub`) + +| Code | Emitted when | What to do | +|---|---|---| +| `topic_not_set` | A pub/sub publish or subscribe call omitted the required `topic` field. | Provide `topic` in the call payload. | + +### Bridge worker (`iii-bridge`) + +| Code | Emitted when | What to do | +|---|---|---| +| `bridge_error` | A bridge invocation against a remote engine failed at the transport layer. | Check the remote engine reachability and bridge config. | +| `deserialization_error` | A bridge response could not be parsed. | Likely a version mismatch between bridged engines; align versions. | + +### Observability worker (`iii-observability`) + +| Code | Emitted when | What to do | +|---|---|---| +| `memory_exporter_not_enabled` | A spans/metrics read call was made but the in-memory exporter was not enabled in the OTel config. | Set `otel.in_memory_exporter: true` in the worker config. | + +## Producer-side errors + +Some failures never reach the engine. SDKs include a producer-side guard that runs *before* the WebSocket send, so oversized payloads fail fast with a local exception instead of triggering a server-side disconnect: + +| SDK | Exception | Trigger | +|---|---|---| +| Python | `IIIPayloadTooLarge` (subclass of `ValueError`) carrying `payload_bytes` / `limit_bytes`. | Serialized message would exceed `InitOptions.max_message_size`. | +| Node | `IIIPayloadTooLarge` carrying `payloadBytes` / `limitBytes`. | Serialized message would exceed `InitOptions.maxMessageSize`. | +| Rust | [`IIIError::PayloadTooLarge { actual, limit }`](/api-reference/sdk-rust#iiierror). | Serialized message would exceed `InitOptions::resolved_max_message_size()`. | + +The message wording is identical across all three SDKs: + +``` +Payload {n} bytes exceeds invocation limit {limit} bytes. For binary blobs use channels: https://iii.dev/docs/how-to/use-channels +``` + +If you raise the SDK limit above the engine's `max_message_size`, you skip the local guard but then trip [`invocation_failed_payload_too_large`](#invocation_failed_payload_too_large) on the server side. Keep the two values aligned. + +## Next steps + + + + Stream binary data instead of cramming it into a single trigger payload + + + Tune `iii-worker-manager.max_message_size` and other engine settings + + + `maxMessageSize` and other `InitOptions` fields + + + `max_message_size` and other `InitOptions` fields + + diff --git a/docs/api-reference/sdk-node.mdx b/docs/api-reference/sdk-node.mdx index 2e78b21e2b..2fd539cf53 100644 --- a/docs/api-reference/sdk-node.mdx +++ b/docs/api-reference/sdk-node.mdx @@ -524,6 +524,7 @@ Configuration options passed to registerWorker. | `enableMetricsReporting` | `boolean` | No | Enable worker metrics via OpenTelemetry. Defaults to `true`. | | `headers` | `Record` | No | Custom HTTP headers sent during the WebSocket handshake. | | `invocationTimeoutMs` | `number` | No | Default timeout for `trigger()` in milliseconds. Defaults to `30000`. | +| `maxMessageSize` | `number` | No | Maximum size in bytes for a single outbound WebSocket message. Defaults to `16777216` (16 MiB), matching the engine. The producer-side guard throws `IIIPayloadTooLarge` before sending if a serialized message would exceed this limit; for streamable payloads see [Use Channels](/how-to/use-channels) and the [`invocation_failed_payload_too_large`](/api-reference/error-codes#invocation_failed_payload_too_large) error code. | | `otel` | Omit<[`OtelConfig`](#otelconfig), "engineWsUrl"> | No | OpenTelemetry configuration. OTel is initialized automatically by default.
Set `{ enabled: false }` or env `OTEL_ENABLED=false/0/no/off` to disable.
The `engineWsUrl` is set automatically from the III address. | | `reconnectionConfig` | Partial<[`IIIReconnectionConfig`](#iiireconnectionconfig)> | No | WebSocket reconnection behavior. | | `workerName` | `string` | No | Display name for this worker. Defaults to `hostname:pid`. | diff --git a/docs/api-reference/sdk-python.mdx b/docs/api-reference/sdk-python.mdx index 242bde18ae..9e9a91a5d5 100644 --- a/docs/api-reference/sdk-python.mdx +++ b/docs/api-reference/sdk-python.mdx @@ -540,6 +540,7 @@ Options for configuring the III SDK. | `enable_metrics_reporting` | `bool` | No | Enable worker metrics via OpenTelemetry. Default ``True``. | | `headers` | `dict[str, str] \| None` | No | - | | `invocation_timeout_ms` | `int` | No | Default timeout for ``trigger()`` in milliseconds. Default ``30000``. | +| `max_message_size` | `int` | No | Maximum size in bytes for a single outbound WebSocket message. Default ``16777216`` (16 MiB), matching the engine. The producer-side guard raises ``IIIPayloadTooLarge`` (subclass of ``ValueError``) before sending if a serialized message would exceed this limit; for streamable payloads see [Use Channels](/how-to/use-channels) and the [`invocation_failed_payload_too_large`](/api-reference/error-codes#invocation_failed_payload_too_large) error code. | | `otel` | [`OtelConfig`](#otelconfig) \| dict[str, Any] \| None | No | OpenTelemetry configuration. Enabled by default. Set ``\{'enabled': False\}`` or env ``OTEL_ENABLED=false`` to disable. | | `reconnection_config` | [`ReconnectionConfig`](#reconnectionconfig) \| None | No | WebSocket reconnection behavior. | | `telemetry` | [`TelemetryOptions`](#telemetryoptions) \| None | No | Internal telemetry metadata. | diff --git a/docs/api-reference/sdk-rust.mdx b/docs/api-reference/sdk-rust.mdx index 32d1258fba..a1a5a23e71 100644 --- a/docs/api-reference/sdk-rust.mdx +++ b/docs/api-reference/sdk-rust.mdx @@ -429,6 +429,7 @@ Configuration options passed to [`register_worker`]. | --- | --- | --- | --- | | `metadata` | Option<[`WorkerMetadata`](#workermetadata)> | No | Custom worker metadata. Auto-detected if `None`. | | `headers` | `Option>` | No | Custom HTTP headers sent during the WebSocket handshake. | +| `max_message_size` | `Option` | No | Maximum size in bytes for a single outbound WebSocket message. When `None`, falls back to `DEFAULT_MAX_MESSAGE_SIZE` (16 MiB), matching the engine. Use `InitOptions::resolved_max_message_size()` to read the effective value. The producer-side guard returns `IIIError::PayloadTooLarge { actual, limit }` before sending if a serialized message would exceed this limit; for streamable payloads see [Use Channels](/how-to/use-channels) and the [`invocation_failed_payload_too_large`](/api-reference/error-codes#invocation_failed_payload_too_large) error code. | | `otel` | Option<[`OtelConfig`](#otelconfig)> | No | OpenTelemetry configuration. Requires the `otel` feature. | ### IIIError @@ -444,6 +445,7 @@ Errors returned by the III SDK. | `Handler` | `(String)` | Yes | - | | `Serde` | `(String)` | Yes | - | | `WebSocket` | `(String)` | Yes | - | +| `PayloadTooLarge` | `{ actual: usize, limit: usize }` | Yes | Raised by the producer guard before sending when a serialized message would exceed [`InitOptions::max_message_size`](#initoptions). See [`invocation_failed_payload_too_large`](/api-reference/error-codes#invocation_failed_payload_too_large) for the matching engine-side code. | ### IIIConnectionState diff --git a/docs/changelog/0-11-0/payload-size-limit.mdx b/docs/changelog/0-11-0/payload-size-limit.mdx new file mode 100644 index 0000000000..d41a249ed8 --- /dev/null +++ b/docs/changelog/0-11-0/payload-size-limit.mdx @@ -0,0 +1,144 @@ +--- +title: 'Configurable WebSocket Payload Limit' +description: 'A consistent 16 MiB invocation payload ceiling across the engine and every SDK, with a typed error code and a producer-side guard that fails fast.' +--- + +## Why this change + +Before this release, oversized `trigger()` payloads failed in inconsistent and confusing ways. The Node and Rust SDKs used `tokio-tungstenite` defaults that quietly accepted large frames; the Python SDK silently inherited the [`websockets`](https://websockets.readthedocs.io) library's 1 MiB default and snapped at exactly that boundary. The engine had no explicit limit and surfaced any oversize-induced disconnect as the generic `invocation_stopped`, which gave callers no way to tell "worker crashed" apart from "your payload was too big." + +The result: Python users hit a silent 1 MiB cliff that Node and Rust users never saw, and every language's failure mode looked like a transient connection drop instead of a programming error. + +This release lands a single, configurable ceiling end-to-end: + +- The engine enforces it. +- Every SDK defaults to the same value. +- Producers refuse to send oversized messages locally instead of triggering a server-side disconnect. +- A dedicated error code (`invocation_failed_payload_too_large`) lets callers branch on it. + +The default is **16 MiB**, large enough for almost every realistic JSON payload. For larger or streamable data, use [channels](/how-to/use-channels). + +--- + +## What changed + +### Engine + +A new `max_message_size` field on the `iii-worker-manager` config sets the inbound WebSocket message ceiling. It is applied to both `WebSocketUpgrade::max_message_size` and `max_frame_size` on every worker connection. + +```yaml +workers: + - name: iii-worker-manager + config: + # max_message_size: 16777216 # bytes; default is 16 MiB +``` + +When a worker exceeds the limit, the engine closes the socket and any in-flight invocation on that connection resolves with the new error code: + +| Code | Meaning | +|---|---| +| `invocation_failed_payload_too_large` | The worker sent a WebSocket message larger than `max_message_size`. Emitted by `cleanup_worker` on the failed connection. | +| `invocation_stopped` | Unchanged — still emitted for clean disconnects, shutdown, and EOF. | + +WebSocket `recv` errors are now logged at `WARN` with peer / worker_id / error context, so operators can see oversize disconnects without enabling debug logging. + +See [Error Codes](/api-reference/error-codes) for the full table. + +### Python SDK + +```python +from iii import register_worker, InitOptions, IIIPayloadTooLarge + +iii = register_worker( + "ws://localhost:49134", + InitOptions(max_message_size=16 * 1024 * 1024), +) + +try: + iii.trigger({"function_id": "files::ingest", "payload": large_blob}) +except IIIPayloadTooLarge as e: + print(f"too big: {e.payload_bytes} > {e.limit_bytes}") +``` + +- New `InitOptions.max_message_size: int = 16 * 1024 * 1024`. +- New `IIIPayloadTooLarge` exception (subclass of `ValueError`) carrying `payload_bytes` and `limit_bytes`, exported from the package root. +- Producer guard runs before every WS send. + +### Node SDK + +```typescript +import { registerWorker, IIIPayloadTooLarge } from 'iii-sdk' + +const iii = registerWorker(process.env.III_URL!, { + maxMessageSize: 16 * 1024 * 1024, +}) + +try { + await iii.trigger({ function_id: 'files::ingest', payload: largeBlob }) +} catch (e) { + if (e instanceof IIIPayloadTooLarge) { + console.error(`too big: ${e.payloadBytes} > ${e.limitBytes}`) + } +} +``` + +- New `InitOptions.maxMessageSize?: number` (defaults to 16 MiB). +- New `IIIPayloadTooLarge` exported class with `payloadBytes` / `limitBytes`. +- Producer guard runs before every WS send. + +### Rust SDK + +```rust +use iii_sdk::{register_worker, InitOptions, IIIError, DEFAULT_MAX_MESSAGE_SIZE}; + +let iii = register_worker( + "ws://localhost:49134", + InitOptions { + max_message_size: Some(DEFAULT_MAX_MESSAGE_SIZE), + ..Default::default() + }, +); + +match iii.trigger(req).await { + Err(IIIError::PayloadTooLarge { actual, limit }) => { + eprintln!("too big: {actual} > {limit}"); + } + other => { /* ... */ } +} +``` + +- New `InitOptions::max_message_size: Option` plus `InitOptions::resolved_max_message_size()`. +- `DEFAULT_MAX_MESSAGE_SIZE` constant exported from the crate root (16 MiB). +- New `IIIError::PayloadTooLarge { actual, limit }` variant. +- Producer guard runs before every WS send. + +--- + +## Behavior change worth flagging + +**Python users get a 15× larger default.** Before this release, the Python SDK silently inherited the `websockets` library's 1 MiB max-message-size default, while Node and Rust effectively had no ceiling. Python callers occasionally saw mysterious `invocation_stopped` errors that turned out to be that hidden 1 MiB cap firing. + +After this release, the Python default is **16 MiB**, matching Node, Rust, and the engine. Payloads between 1 MiB and 16 MiB that previously failed with `invocation_stopped` will now succeed. + +If you were relying on the implicit 1 MiB ceiling as a safety check, set `InitOptions(max_message_size=1024 * 1024)` explicitly to preserve the old behavior — but consider switching to [channels](/how-to/use-channels) for any payload that genuinely needs to be that large. + +--- + +## Sizing notes + +The 16 MiB default is the *serialized* WebSocket message size, not your raw application data: + +- **Base64** inflates binary content by ~33%. +- The **JSON envelope** (function ID, invocation ID, headers, etc.) adds another ~10%. + +A 12 MiB raw blob, base64-encoded inside a JSON envelope, lands close to the 16 MiB ceiling. For anything bigger or streamable, use [channels](/how-to/use-channels) instead — they chunk over the same WebSocket without the per-message limit. + +--- + +## Migration checklist + +- [ ] No action required for most users — the new defaults are backward-compatible with payloads under 1 MiB. +- [ ] Python users with payloads in the 1–16 MiB range: previously failing calls will now succeed. +- [ ] If you raise the SDK limit above 16 MiB, also raise `iii-worker-manager.max_message_size` on the engine — otherwise the local guard passes and the engine disconnects with `invocation_failed_payload_too_large`. +- [ ] Update error handling to match on `invocation_failed_payload_too_large` (engine-side) and `IIIPayloadTooLarge` / `IIIError::PayloadTooLarge` (producer-side) where the distinction matters. +- [ ] For payloads that consistently exceed 16 MiB, migrate to [channels](/how-to/use-channels). diff --git a/docs/docs.json b/docs/docs.json index 9ed282690d..65819fb419 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -201,6 +201,7 @@ "api-reference/sdk-python", "api-reference/sdk-rust", "api-reference/sandbox", + "api-reference/error-codes", "api-reference/disable-telemetry" ] } @@ -217,6 +218,7 @@ "group": "0.11.0", "pages": [ "changelog/0-11-0/everything-is-a-worker", + "changelog/0-11-0/payload-size-limit", "changelog/0-11-0/migrating-from-motia-js", "changelog/0-11-0/migrating-from-motia-py", "changelog/0-11-0/migrated-examples" diff --git a/docs/how-to/use-channels.mdx b/docs/how-to/use-channels.mdx index 7f4ae5780f..11db055417 100644 --- a/docs/how-to/use-channels.mdx +++ b/docs/how-to/use-channels.mdx @@ -66,7 +66,9 @@ Channels solve all of these by giving each side a real stream backed by the engi | Passing a config object to another function | No | Put it in the `trigger()` payload directly | | Fire-and-forget notifications | No | Use `TriggerAction.Void()` or a queue instead | -**Rule of thumb:** if your data is small enough to fit comfortably in a JSON payload (< 1 MB) and you don't need incremental delivery, use a regular `trigger()` call. Use channels when you need streaming, binary data, or when the payload is too large to serialize at once. +**Rule of thumb:** direct `trigger()` payloads ride a single WebSocket message. The default ceiling is **16 MiB** (engine-enforced; SDKs default to the same value). Remember that base64 inflates binary by ~33% and the JSON envelope adds another ~10%, so a 12 MiB raw blob lands close to the limit. For larger or streamable payloads, use channels. + +If you cross the limit you get [`invocation_failed_payload_too_large`](/api-reference/error-codes#invocation_failed_payload_too_large) from the engine, or `IIIPayloadTooLarge` raised client-side before the round-trip. ## Steps diff --git a/engine/config.yaml b/engine/config.yaml index 98a3467cc3..4cf192ed39 100644 --- a/engine/config.yaml +++ b/engine/config.yaml @@ -176,6 +176,18 @@ workers: # remote_function: state::get # timeout_ms: 5000 + # The worker manager (iii-worker-manager) is loaded automatically and + # accepts WebSocket connections from SDK workers. Override the inbound + # message ceiling here when you need to lift it for very large direct + # invocations; for streamable or > 16 MiB payloads, prefer channels. + # - name: iii-worker-manager + # config: + # # 16 MiB is the default; increase if you have a use case for + # # larger single-message invocations. Workers exceeding this + # # limit are disconnected and any in-flight invocations on that + # # connection resolve with `invocation_failed_payload_too_large`. + # max_message_size: 16777216 + # Ephemeral sandboxes (VMs booted on demand from OCI rootfs images). # Run `iii worker add iii-sandbox` to append the block below automatically, # or uncomment and restart the engine. The daemon ships inside the diff --git a/engine/src/engine/mod.rs b/engine/src/engine/mod.rs index d6396a8955..4e5292c5bf 100644 --- a/engine/src/engine/mod.rs +++ b/engine/src/engine/mod.rs @@ -30,7 +30,9 @@ use crate::{ inject_baggage_from_context, inject_traceparent_from_context, }, trigger::{Trigger, TriggerRegistry, TriggerType}, - worker_connections::{RuntimeWorkerInfo, WorkerConnection, WorkerConnectionRegistry}, + worker_connections::{ + DisconnectReason, RuntimeWorkerInfo, WorkerConnection, WorkerConnectionRegistry, + }, workers::worker::rbac_session::Session, workers::{ engine_fn::TRIGGER_WORKERS_AVAILABLE, @@ -39,6 +41,26 @@ use crate::{ }, }; +/// Classifies a `axum::Error` raised by the WS recv stream so cleanup +/// can pick the right error code for in-flight invocations. axum wraps +/// `tungstenite::Error` opaquely; we match on its formatted Display +/// because `axum::Error::into_inner` yields a `Box` that +/// requires a downcast against tungstenite's exact version, which would +/// couple the engine to axum's transitive dep. +/// +/// `tungstenite::Error::Capacity` renders as `"Space limit exceeded: ..."` +/// (see tungstenite 0.29 `error.rs`). That is the only variant that +/// fires when our configured `max_message_size` is exceeded by an +/// inbound frame. +fn classify_recv_error(err: &axum::Error) -> DisconnectReason { + let text = err.to_string(); + if text.contains("Space limit exceeded") || text.contains("Message too long") { + DisconnectReason::PayloadTooLarge + } else { + DisconnectReason::Other + } +} + /// Abstraction for enqueuing messages to named queues. /// /// This trait decouples the Engine from the concrete QueueWorker @@ -1419,7 +1441,19 @@ impl Engine { let _ = tx.send(Outbound::Raw(WsMessage::Pong(payload))).await; } Some(Ok(WsMessage::Pong(_))) => {} - Some(Err(_)) | None => { + Some(Err(err)) => { + let reason = classify_recv_error(&err); + tracing::warn!( + peer = %peer, + worker_id = %worker.id, + error = %err, + reason = ?reason, + "Worker WS recv error" + ); + worker.set_disconnect_reason(reason).await; + break; + } + None => { break; } } @@ -1587,9 +1621,22 @@ impl Engine { let worker_invocations = worker.invocations.read().await; tracing::debug!(worker_id = %worker.id, invocations = ?worker_invocations, "Worker invocations"); + let (halt_code, halt_message) = match worker.disconnect_reason().await { + Some(DisconnectReason::PayloadTooLarge) => ( + "invocation_failed_payload_too_large", + "Worker disconnected: WS message exceeded the configured size limit. \ + For larger or streamable payloads, use channels.", + ), + Some(DisconnectReason::Other) | None => ("invocation_stopped", "Invocation stopped"), + }; for invocation_id in worker_invocations.iter() { - tracing::debug!(invocation_id = %invocation_id, "Halting invocation"); - self.invocations.halt_invocation(invocation_id); + tracing::debug!( + invocation_id = %invocation_id, + error_code = halt_code, + "Halting invocation" + ); + self.invocations + .halt_invocation_with_reason(invocation_id, halt_code, halt_message); } self.trigger_registry.unregister_worker(&worker.id).await; diff --git a/engine/src/invocation/mod.rs b/engine/src/invocation/mod.rs index c0c4547421..927e9e75e9 100644 --- a/engine/src/invocation/mod.rs +++ b/engine/src/invocation/mod.rs @@ -60,12 +60,18 @@ impl InvocationHandler { } pub fn halt_invocation(&self, invocation_id: &Uuid) { - let invocation = self.remove(invocation_id); + self.halt_invocation_with_reason(invocation_id, "invocation_stopped", "Invocation stopped"); + } - if let Some(invocation) = invocation { + /// Halts an invocation with a specific error code/message instead of + /// the legacy `invocation_stopped`. Used by `cleanup_worker` to + /// distinguish payload-too-large disconnects from generic worker + /// teardowns so SDK callers can react to the specific failure mode. + pub fn halt_invocation_with_reason(&self, invocation_id: &Uuid, code: &str, message: &str) { + if let Some(invocation) = self.remove(invocation_id) { let _ = invocation.sender.send(Err(ErrorBody { - code: "invocation_stopped".into(), - message: "Invocation stopped".into(), + code: code.into(), + message: message.into(), stacktrace: None, })); } @@ -359,6 +365,38 @@ mod tests { handler.halt_invocation(&id); } + #[test] + fn test_halt_invocation_with_reason_uses_specific_code() { + let handler = InvocationHandler::new(); + let id = Uuid::new_v4(); + let (sender, mut receiver) = oneshot::channel(); + + let invocation = Invocation { + id, + function_id: "test_fn".to_string(), + worker_id: None, + sender, + traceparent: None, + baggage: None, + }; + handler.invocations.insert(id, invocation); + + handler.halt_invocation_with_reason( + &id, + "invocation_failed_payload_too_large", + "Worker disconnected: WS message exceeded the configured size limit", + ); + + assert!(handler.invocations.is_empty()); + let inner = receiver.try_recv().expect("oneshot delivered"); + let error = inner.expect_err("invocation should be halted with an error"); + assert_eq!(error.code, "invocation_failed_payload_too_large"); + assert_eq!( + error.message, + "Worker disconnected: WS message exceeded the configured size limit" + ); + } + #[test] fn test_invocation_handler_multiple_invocations() { let handler = InvocationHandler::new(); diff --git a/engine/src/worker_connections/mod.rs b/engine/src/worker_connections/mod.rs index 85cadb79f9..957ab69bec 100644 --- a/engine/src/worker_connections/mod.rs +++ b/engine/src/worker_connections/mod.rs @@ -174,6 +174,22 @@ impl WorkerConnectionRegistry { } } +/// Why the engine's worker recv loop tore down a worker connection. +/// +/// Captured on the `WorkerConnection` when the engine observes a recv +/// error so that `cleanup_worker` can pick a specific error code for the +/// halted in-flight invocations. See `Engine::handle_worker`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DisconnectReason { + /// The worker sent a frame that exceeded the configured + /// `max_message_size`. Surfaces as `invocation_failed_payload_too_large`. + PayloadTooLarge, + /// Any other recv-time tungstenite error (Io, Protocol, etc.). + /// Surfaces as the generic `invocation_stopped` so existing operator + /// alerts on that code still work for non-size disconnects. + Other, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)] pub enum WorkerConnectionStatus { #[default] @@ -224,6 +240,11 @@ pub struct WorkerConnection { pub pid: Option, pub isolation: Option, pub session: Option>, + /// Set by `Engine::handle_worker` when a recv-side error tears down + /// the connection. Read by `cleanup_worker` to pick the error code + /// for in-flight invocations. `None` means the socket ended cleanly + /// (Close frame, shutdown signal, or end-of-stream). + pub disconnect_reason: Arc>>, } impl WorkerConnection { @@ -245,6 +266,7 @@ impl WorkerConnection { pid: None, isolation: None, session: None, + disconnect_reason: Arc::new(RwLock::new(None)), } } @@ -266,9 +288,18 @@ impl WorkerConnection { pid: None, isolation: None, session: Some(Arc::new(session)), + disconnect_reason: Arc::new(RwLock::new(None)), } } + pub async fn set_disconnect_reason(&self, reason: DisconnectReason) { + *self.disconnect_reason.write().await = Some(reason); + } + + pub async fn disconnect_reason(&self) -> Option { + *self.disconnect_reason.read().await + } + pub async fn function_count(&self) -> usize { let regular = self.function_ids.read().await.len(); let external = self.external_function_ids.read().await.len(); diff --git a/engine/src/workers/worker/mod.rs b/engine/src/workers/worker/mod.rs index dd8345e838..5ecc42b224 100644 --- a/engine/src/workers/worker/mod.rs +++ b/engine/src/workers/worker/mod.rs @@ -32,6 +32,16 @@ use crate::{ pub const DEFAULT_PORT: u16 = 49134; +/// Default ceiling for inbound WebSocket messages from workers, in bytes. +/// +/// 16 MiB is large enough that "small blob" use cases (single images, +/// JSON-with-embedded-data) ride the direct invocation path without +/// users needing to switch to channels, but small enough that a runaway +/// producer can't pin engine memory. Anything larger should use +/// channels. The engine is the source of truth; SDKs default to the +/// same value so they don't quietly underflow. +pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; + #[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)] pub struct CreateChannelInput { #[serde(default)] @@ -54,6 +64,13 @@ pub struct WorkerManagerConfig { #[serde(default)] pub middleware_function_id: Option, pub rbac: Option, + /// Maximum size in bytes for a single inbound WebSocket message from + /// a worker. Workers exceeding this limit are disconnected and any + /// in-flight invocation on that connection resolves with the + /// `invocation_failed_payload_too_large` error code. Defaults to 16 + /// MiB; for larger or streamable payloads, use channels. + #[serde(default = "default_max_message_size")] + pub max_message_size: usize, } fn default_port() -> u16 { @@ -64,6 +81,10 @@ fn default_host() -> String { "0.0.0.0".to_string() } +fn default_max_message_size() -> usize { + DEFAULT_MAX_MESSAGE_SIZE +} + impl Default for WorkerManagerConfig { fn default() -> Self { Self { @@ -71,6 +92,7 @@ impl Default for WorkerManagerConfig { host: default_host(), middleware_function_id: None, rbac: None, + max_message_size: default_max_message_size(), } } } @@ -178,6 +200,19 @@ async fn shutdown_signal() -> anyhow::Result<()> { Ok(()) } +/// Applies the configured per-message ceiling to a `WebSocketUpgrade`. +/// +/// Both `ws_handler` and `otel_ws_handler` pass through here so the worker +/// path and the OTEL path share the same limit and there is one place to +/// adjust the policy. +fn apply_message_size_limit( + ws: WebSocketUpgrade, + config: &WorkerManagerConfig, +) -> WebSocketUpgrade { + ws.max_message_size(config.max_message_size) + .max_frame_size(config.max_message_size) +} + async fn ws_handler( State(state): State, ws: WebSocketUpgrade, @@ -187,6 +222,7 @@ async fn ws_handler( ) -> impl IntoResponse { let engine = state.engine.clone(); let config = state.config.clone(); + let ws = apply_message_size_limit(ws, &config); ws.on_upgrade(move |socket| async move { if let Err(err) = engine @@ -211,6 +247,7 @@ async fn otel_ws_handler( ) -> impl IntoResponse { let engine = state.engine.clone(); let config = state.config.clone(); + let ws = apply_message_size_limit(ws, &config); ws.on_upgrade(move |socket| async move { if let Err(err) = engine @@ -247,4 +284,17 @@ mod tests { assert!(config.middleware_function_id.is_none()); assert!(config.rbac.is_none()); } + + #[test] + fn worker_config_default_max_message_size_is_16_mib() { + let config: WorkerManagerConfig = serde_json::from_str("{}").unwrap(); + assert_eq!(config.max_message_size, 16 * 1024 * 1024); + } + + #[test] + fn worker_config_max_message_size_override() { + let config: WorkerManagerConfig = + serde_json::from_str(r#"{"max_message_size": 32}"#).unwrap(); + assert_eq!(config.max_message_size, 32); + } } diff --git a/engine/tests/rbac_infrastructure_e2e.rs b/engine/tests/rbac_infrastructure_e2e.rs index c3c2d9e443..ee094ea4d9 100644 --- a/engine/tests/rbac_infrastructure_e2e.rs +++ b/engine/tests/rbac_infrastructure_e2e.rs @@ -66,6 +66,7 @@ fn session_with( host: "127.0.0.1".to_string(), middleware_function_id: middleware, rbac: Some(rbac), + ..Default::default() }), ip_address: "127.0.0.1".to_string(), session_id: Uuid::new_v4(), diff --git a/engine/tests/ws_payload_limit_e2e.rs b/engine/tests/ws_payload_limit_e2e.rs new file mode 100644 index 0000000000..86c9dc8279 --- /dev/null +++ b/engine/tests/ws_payload_limit_e2e.rs @@ -0,0 +1,306 @@ +//! End-to-end tests for the WebSocket payload-size limit and the +//! `invocation_failed_payload_too_large` error code contract. +//! +//! Background: the Python SDK was tearing the WS connection at ~1 MiB +//! because the `websockets` library defaults `max_size` to 2^20. The +//! engine's recv loop caught the resulting tungstenite error, dropped it +//! silently, and `cleanup_worker` halted every in-flight invocation on +//! that worker with the generic `invocation_stopped` code. Operators +//! had no way to tell oversize disconnects apart from idle disconnects, +//! shutdowns, or panics. +//! +//! These tests pin the new contract: +//! * The engine logs WS recv errors at WARN with peer/worker_id/error. +//! * Oversize disconnects surface as `invocation_failed_payload_too_large` +//! on every in-flight invocation, not the legacy `invocation_stopped`. +//! * Clean disconnects (Close frame) keep emitting `invocation_stopped`. +//! * `WorkerManagerConfig::max_message_size` defaults to 16 MiB and is +//! honored end-to-end via axum's `WebSocketUpgrade::max_message_size`. + +use std::sync::Arc; +use std::time::Duration; + +use futures_util::{SinkExt, StreamExt}; +use iii::engine::Engine; +use iii::workers::traits::Worker; +use iii::workers::worker::WorkerManager; +use serde_json::{Value, json}; +use tokio::net::TcpListener; +use tokio_tungstenite::tungstenite::Message as WsMessage; +use uuid::Uuid; + +/// Boots a `WorkerManager` on a random port with the supplied JSON config. +/// Returns (port, engine handle). +async fn spawn_engine_with_config(extra: Value) -> (u16, Arc) { + iii::workers::observability::metrics::ensure_default_meter(); + + let probe = TcpListener::bind("127.0.0.1:0").await.expect("bind probe"); + let port = probe.local_addr().expect("local_addr").port(); + drop(probe); + + let engine = Arc::new(Engine::new()); + let mut config = json!({ "port": port, "host": "127.0.0.1" }); + if let Value::Object(extra_obj) = extra { + let merged = config.as_object_mut().unwrap(); + for (k, v) in extra_obj { + merged.insert(k, v); + } + } + let worker = WorkerManager::create(engine.clone(), Some(config)) + .await + .expect("create WorkerManager"); + + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + worker + .start_background_tasks(shutdown_rx, shutdown_tx) + .await + .expect("start WorkerManager"); + + tokio::time::sleep(Duration::from_millis(150)).await; + (port, engine) +} + +async fn spawn_engine_with_max_message_size(max_message_size: usize) -> (u16, Arc) { + spawn_engine_with_config(json!({ "max_message_size": max_message_size })).await +} + +/// Connects a fake worker, registers a single deferred function, returns +/// the (open) socket and the function id once the engine has acknowledged +/// the registration via `WorkerRegistered` and the registration round-trip +/// has settled. +async fn connect_and_register( + port: u16, + function_id: &str, +) -> tokio_tungstenite::WebSocketStream> { + let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/", port)) + .await + .expect("connect to /"); + + // Drain `WorkerRegistered` so we know the registry insert happened. + tokio::time::timeout(Duration::from_millis(500), async { + loop { + match ws.next().await { + Some(Ok(WsMessage::Text(text))) => { + if text.contains("workerregistered") { + break; + } + } + Some(Ok(_)) => {} + Some(Err(e)) => panic!("ws error before WorkerRegistered: {e:?}"), + None => panic!("connection closed before WorkerRegistered"), + } + } + }) + .await + .expect("WorkerRegistered within 500ms"); + + let register = json!({ + "type": "registerfunction", + "id": function_id, + "request_format": null, + "response_format": null, + }); + ws.send(WsMessage::Text(register.to_string().into())) + .await + .expect("send RegisterFunction"); + + // Give the engine a moment to register the function. + tokio::time::sleep(Duration::from_millis(100)).await; + + ws +} + +#[tokio::test] +async fn test_oversize_message_emits_payload_too_large_error_code() { + let (port, engine) = spawn_engine_with_max_message_size(64 * 1024).await; + let mut ws = connect_and_register(port, "noop").await; + + // Spawn the invocation as a background task so we can drive the WS + // side independently and observe the resolved error code. + let engine_clone = engine.clone(); + let invocation_handle = tokio::spawn(async move { + use iii::engine::EngineTrait; + engine_clone.call("noop", json!({})).await + }); + + // Wait until the engine has dispatched InvokeFunction to the worker. + let invoked = tokio::time::timeout(Duration::from_secs(2), async { + loop { + match ws.next().await { + Some(Ok(WsMessage::Text(text))) => { + if text.contains("invokefunction") { + return text.to_string(); + } + } + Some(Ok(_)) => continue, + Some(Err(e)) => panic!("ws error before InvokeFunction: {e:?}"), + None => panic!("ws closed before InvokeFunction"), + } + } + }) + .await + .expect("InvokeFunction within 2s"); + assert!(invoked.contains("noop"), "should target noop function"); + + // Send an oversized text frame from the client to the engine. The + // engine has `max_message_size = 64 * 1024`, so 200 KiB blows past it. + let oversized = "x".repeat(200 * 1024); + let _ = ws.send(WsMessage::Text(oversized.into())).await; + + let result = tokio::time::timeout(Duration::from_secs(5), invocation_handle) + .await + .expect("invocation resolves within 5s") + .expect("invocation task didn't panic"); + + let err = result.expect_err("oversize disconnect must fail the invocation"); + assert_eq!( + err.code, "invocation_failed_payload_too_large", + "expected payload-too-large code, got {err:?}" + ); + let _ = engine.worker_registry.list_workers(); +} + +#[tokio::test] +async fn test_normal_disconnect_still_emits_invocation_stopped() { + let (port, engine) = spawn_engine_with_max_message_size(64 * 1024).await; + let mut ws = connect_and_register(port, "noop_close").await; + + let engine_clone = engine.clone(); + let invocation_handle = tokio::spawn(async move { + use iii::engine::EngineTrait; + engine_clone.call("noop_close", json!({})).await + }); + + // Drain InvokeFunction. + tokio::time::timeout(Duration::from_secs(2), async { + loop { + match ws.next().await { + Some(Ok(WsMessage::Text(text))) => { + if text.contains("invokefunction") { + return; + } + } + Some(Ok(_)) => continue, + Some(Err(_)) | None => return, + } + } + }) + .await + .expect("InvokeFunction within 2s"); + + // Clean close — engine must keep emitting the legacy + // `invocation_stopped` code so existing operators don't see code + // churn for graceful shutdowns. + let _ = ws.close(None).await; + + let result = tokio::time::timeout(Duration::from_secs(5), invocation_handle) + .await + .expect("invocation resolves within 5s") + .expect("invocation task didn't panic"); + + let err = result.expect_err("clean close still halts the invocation"); + assert_eq!( + err.code, "invocation_stopped", + "graceful close must keep emitting invocation_stopped, got {err:?}" + ); + let _ = engine.worker_registry.list_workers(); +} + +#[tokio::test] +async fn test_engine_rejects_message_above_configured_limit() { + // Phase 2 contract: the limit comes from `WorkerManagerConfig::max_message_size`. + let (port, engine) = spawn_engine_with_max_message_size(1024).await; + let mut ws = connect_and_register(port, "tiny_limit").await; + + let engine_clone = engine.clone(); + let invocation_handle = tokio::spawn(async move { + use iii::engine::EngineTrait; + engine_clone.call("tiny_limit", json!({})).await + }); + + tokio::time::timeout(Duration::from_secs(2), async { + loop { + match ws.next().await { + Some(Ok(WsMessage::Text(text))) => { + if text.contains("invokefunction") { + return; + } + } + Some(Ok(_)) => continue, + Some(Err(_)) | None => return, + } + } + }) + .await + .expect("InvokeFunction within 2s"); + + // 2 KiB > 1 KiB limit. + let oversized = "y".repeat(2048); + let _ = ws.send(WsMessage::Text(oversized.into())).await; + + let result = tokio::time::timeout(Duration::from_secs(5), invocation_handle) + .await + .expect("invocation resolves within 5s") + .expect("invocation task didn't panic"); + + let err = result.expect_err("oversize disconnect must fail the invocation"); + assert_eq!(err.code, "invocation_failed_payload_too_large"); + let _ = engine.worker_registry.list_workers(); +} + +#[tokio::test] +async fn test_engine_accepts_message_at_limit() { + // 1 MiB limit, 1 MiB - 1 KiB payload — must round-trip cleanly with + // no disconnect. We send an `invocation_result` for an invocation we + // first trigger via `engine.call`, and assert the call returns Ok. + let (port, engine) = spawn_engine_with_max_message_size(1024 * 1024).await; + let mut ws = connect_and_register(port, "echo").await; + + let engine_clone = engine.clone(); + let invocation_handle = tokio::spawn(async move { + use iii::engine::EngineTrait; + engine_clone.call("echo", json!({})).await + }); + + let invocation_id: Uuid = tokio::time::timeout(Duration::from_secs(2), async { + loop { + if let Some(Ok(WsMessage::Text(text))) = ws.next().await { + if let Ok(parsed) = serde_json::from_str::(&text) { + if parsed.get("type").and_then(|v| v.as_str()) == Some("invokefunction") { + let id = parsed + .get("invocation_id") + .and_then(|v| v.as_str()) + .expect("invoke_function should carry invocation_id"); + return Uuid::parse_str(id).expect("uuid"); + } + } + } + } + }) + .await + .expect("InvokeFunction within 2s"); + + // Build a payload that sits just under the limit. Reserve ~256 bytes + // for the JSON envelope. + let big_blob = "z".repeat(1024 * 1024 - 256); + let result_msg = json!({ + "type": "invocationresult", + "invocation_id": invocation_id.to_string(), + "function_id": "echo", + "result": { "blob": big_blob }, + }); + ws.send(WsMessage::Text(result_msg.to_string().into())) + .await + .expect("send result"); + + let result = tokio::time::timeout(Duration::from_secs(5), invocation_handle) + .await + .expect("invocation resolves within 5s") + .expect("invocation task didn't panic") + .expect("at-limit payload must succeed"); + + assert!( + result.is_some(), + "at-limit payload should produce a result body" + ); +} diff --git a/sdk/packages/node/iii/src/errors.ts b/sdk/packages/node/iii/src/errors.ts index 14a0fcdc5e..a0fce5fd4b 100644 --- a/sdk/packages/node/iii/src/errors.ts +++ b/sdk/packages/node/iii/src/errors.ts @@ -32,6 +32,42 @@ export class IIIInvocationError extends Error { } } +/** + * Producer-side guard: thrown synchronously from `trigger()` (or any other + * SDK send path) when the serialized invocation envelope would exceed + * `maxMessageSize`. The error never leaves the client — the WebSocket frame + * is not sent — so callers can distinguish "I asked for too much" from + * "the engine rejected my message" (`invocation_failed_payload_too_large`). + * + * The message format mirrors the Python SDK so cross-language tooling can + * grep for one canonical wording. + */ +export class IIIPayloadTooLarge extends Error { + public readonly payloadBytes: number + public readonly limitBytes: number + + constructor(payloadBytes: number, limitBytes: number) { + super( + `Payload ${payloadBytes} bytes exceeds invocation limit ${limitBytes} bytes. ` + + `For binary blobs use channels: https://iii.dev/docs/how-to/use-channels`, + ) + this.name = 'IIIPayloadTooLarge' + this.payloadBytes = payloadBytes + this.limitBytes = limitBytes + } +} + +/** + * Throws {@link IIIPayloadTooLarge} when a serialized message would exceed + * the configured limit. Centralised so every send path (`trigger`, `invoke`, + * future producers) uses the same check and error wording. + */ +export function assertWithinLimit(payloadBytes: number, limitBytes: number): void { + if (payloadBytes > limitBytes) { + throw new IIIPayloadTooLarge(payloadBytes, limitBytes) + } +} + /** * True when `value` looks like the wire `ErrorBody` the engine sends in * `InvocationResult.error`: `{ code: string, message: string, stacktrace?: string }`. diff --git a/sdk/packages/node/iii/src/iii.ts b/sdk/packages/node/iii/src/iii.ts index d878b11ea8..a1163883df 100644 --- a/sdk/packages/node/iii/src/iii.ts +++ b/sdk/packages/node/iii/src/iii.ts @@ -3,7 +3,7 @@ import { createRequire } from 'node:module' import * as os from 'node:os' import { type Data, WebSocket } from 'ws' import { ChannelReader, ChannelWriter } from './channels' -import { IIIInvocationError, isErrorBody } from './errors' +import { assertWithinLimit, IIIInvocationError, isErrorBody } from './errors' import { DEFAULT_BRIDGE_RECONNECTION_CONFIG, DEFAULT_INVOCATION_TIMEOUT_MS, @@ -60,6 +60,13 @@ import { isChannelRef } from './utils' const require = createRequire(import.meta.url) const { version: SDK_VERSION } = require('../package.json') +/** + * Default ceiling for a single WebSocket invocation message. Matches the + * engine default and the other SDKs (Python, Rust). Anything larger should + * ride a channel — see https://iii.dev/docs/how-to/use-channels. + */ +const DEFAULT_MAX_MESSAGE_SIZE = 16 * 1024 * 1024 + function getOsInfo(): string { return `${os.platform()} ${os.release()} (${os.arch()})` } @@ -109,6 +116,14 @@ export type InitOptions = { otel?: Omit /** Custom HTTP headers sent during the WebSocket handshake. */ headers?: Record + /** + * Maximum size, in bytes, of a single WebSocket message — both the + * envelope the SDK sends and what it accepts back. Defaults to 16 MiB, + * matching the engine default. The producer-side guard rejects oversize + * payloads with {@link IIIPayloadTooLarge} before the WS write so the + * caller gets a typed error instead of a generic disconnect. + */ + maxMessageSize?: number /** @internal */ telemetry?: TelemetryOptions } @@ -126,6 +141,7 @@ class Sdk implements ISdk { private reconnectTimeout?: NodeJS.Timeout private metricsReportingEnabled: boolean private invocationTimeoutMs: number + private maxMessageSize: number private reconnectionConfig: IIIReconnectionConfig private reconnectAttempt = 0 private connectionState: IIIConnectionState = 'disconnected' @@ -138,6 +154,7 @@ class Sdk implements ISdk { this.workerName = options?.workerName ?? getDefaultWorkerName() this.metricsReportingEnabled = options?.enableMetricsReporting ?? true this.invocationTimeoutMs = options?.invocationTimeoutMs ?? DEFAULT_INVOCATION_TIMEOUT_MS + this.maxMessageSize = options?.maxMessageSize ?? DEFAULT_MAX_MESSAGE_SIZE this.reconnectionConfig = { ...DEFAULT_BRIDGE_RECONNECTION_CONFIG, ...options?.reconnectionConfig, @@ -423,6 +440,12 @@ class Sdk implements ISdk { const { function_id, payload, action, timeoutMs } = request const effectiveTimeout = timeoutMs ?? this.invocationTimeoutMs + // Producer-side guard: refuse to put an oversize envelope on the WS. + // Without this the engine would just drop the connection (or `ws` would + // raise a generic "max payload exceeded"), which is harder to debug than + // a typed error pointing at channels. + assertWithinLimit(Buffer.byteLength(JSON.stringify(payload ?? null), 'utf8'), this.maxMessageSize) + // Void is fire-and-forget — no invocation_id, no response if (action?.type === 'void') { const traceparent = injectTraceparent() @@ -594,7 +617,10 @@ class Sdk implements ISdk { } this.setConnectionState('connecting') - this.ws = new WebSocket(this.address, { headers: this.options?.headers }) + this.ws = new WebSocket(this.address, { + headers: this.options?.headers, + maxPayload: this.maxMessageSize, + }) this.ws.on('open', this.onSocketOpen.bind(this)) this.ws.on('close', this.onSocketClose.bind(this)) this.ws.on('error', this.onSocketError.bind(this)) diff --git a/sdk/packages/node/iii/src/index.ts b/sdk/packages/node/iii/src/index.ts index b385a8fdf7..636fde3ff6 100644 --- a/sdk/packages/node/iii/src/index.ts +++ b/sdk/packages/node/iii/src/index.ts @@ -1,6 +1,6 @@ export { ChannelReader, ChannelWriter } from './channels' -export { IIIInvocationError, type IIIInvocationErrorInit } from './errors' +export { IIIInvocationError, type IIIInvocationErrorInit, IIIPayloadTooLarge } from './errors' export { type InitOptions, registerWorker, TriggerAction } from './iii' diff --git a/sdk/packages/node/iii/tests/payload-limits.test.ts b/sdk/packages/node/iii/tests/payload-limits.test.ts new file mode 100644 index 0000000000..a1ccdf4847 --- /dev/null +++ b/sdk/packages/node/iii/tests/payload-limits.test.ts @@ -0,0 +1,179 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +// Capture all `new WebSocket(...)` constructor calls so tests can assert on +// the options the SDK plumbs through. `vi.hoisted` lets the spy survive the +// hoisting of `vi.mock` and stay reachable from inside the factory. +const wsConstructorSpy = vi.hoisted(() => vi.fn()) + +vi.mock('ws', () => { + // Minimal WebSocket-like stand-in. We only need the API surface the SDK + // touches before the connection is "open": `on`, `removeAllListeners`, + // `terminate`, `send`, `close`, plus the static `OPEN` constant. + class FakeWebSocket { + public readyState = 0 + public listeners: Record void>> = {} + static OPEN = 1 + static CONNECTING = 0 + static CLOSING = 2 + static CLOSED = 3 + + constructor(address: string, options?: unknown) { + wsConstructorSpy(address, options) + } + + on(event: string, fn: (...args: unknown[]) => void): this { + this.listeners[event] ??= [] + this.listeners[event].push(fn) + return this + } + + removeAllListeners(): this { + this.listeners = {} + return this + } + + terminate(): void {} + close(): void {} + send(_data: string, cb?: (err?: Error) => void): void { + cb?.() + } + } + + return { WebSocket: FakeWebSocket, default: FakeWebSocket } +}) + +// Stub the OTel system so importing the SDK doesn't try to wire up a real +// exporter — `initOtel` is called from the Sdk constructor. +vi.mock('../src/telemetry-system', () => { + return { + extractContext: () => ({}), + getLogger: () => undefined, + getMeter: () => undefined, + getTracer: () => undefined, + initOtel: () => {}, + injectBaggage: () => undefined, + injectTraceparent: () => undefined, + SeverityNumber: { ERROR: 17 }, + shutdownOtel: async () => {}, + SpanKind: { SERVER: 1 }, + withSpan: async (_name: string, _opts: unknown, fn: () => unknown) => await fn(), + } +}) + +const DEFAULT_LIMIT = 16 * 1024 * 1024 +const ADDRESS = 'ws://localhost:65535' + +// Dynamic import so we pick up the SDK with the mocked `ws` module — the +// shared `setupFiles` already loaded the real SDK once, so we have to reset +// modules before re-importing. +async function loadSdk(): Promise { + vi.resetModules() + const iii = await import('../src/iii') + const errors = await import('../src/errors') + return { ...iii, ...errors } +} + +describe('Node SDK payload limits', () => { + beforeEach(() => { + wsConstructorSpy.mockClear() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('plumbs init option maxMessageSize to ws maxPayload', async () => { + const { registerWorker } = await loadSdk() + const limit = 8 * 1024 * 1024 + const sdk = registerWorker(ADDRESS, { maxMessageSize: limit }) + + expect(wsConstructorSpy).toHaveBeenCalledTimes(1) + const [address, options] = wsConstructorSpy.mock.calls[0] + expect(address).toBe(ADDRESS) + expect((options as { maxPayload?: number }).maxPayload).toBe(limit) + + await sdk.shutdown() + }) + + it('defaults maxMessageSize to 16 MiB when not supplied', async () => { + const { registerWorker } = await loadSdk() + const sdk = registerWorker(ADDRESS) + + expect(wsConstructorSpy).toHaveBeenCalledTimes(1) + const [, options] = wsConstructorSpy.mock.calls[0] + expect((options as { maxPayload?: number }).maxPayload).toBe(DEFAULT_LIMIT) + + await sdk.shutdown() + }) + + it('throws IIIPayloadTooLarge when trigger payload exceeds the limit', async () => { + const { registerWorker, IIIPayloadTooLarge } = await loadSdk() + const limit = 4 * 1024 + const sdk = registerWorker(ADDRESS, { maxMessageSize: limit }) + + // Build a payload that obviously exceeds the limit once JSON-encoded. + const oversize = 'A'.repeat(limit * 2) + + await expect( + sdk.trigger({ function_id: 'noop', payload: { blob: oversize } }), + ).rejects.toBeInstanceOf(IIIPayloadTooLarge) + + try { + await sdk.trigger({ function_id: 'noop', payload: { blob: oversize } }) + expect.fail('expected oversize trigger to throw') + } catch (err) { + expect(err).toBeInstanceOf(IIIPayloadTooLarge) + const message = (err as Error).message + expect(message).toContain(`limit ${limit}`) + expect(message).toContain('channels') + expect(message).toContain('https://iii.dev/docs/how-to/use-channels') + } + + await sdk.shutdown() + }) + + it('does not throw when the serialized payload is within the limit', async () => { + const { registerWorker, IIIPayloadTooLarge } = await loadSdk() + const limit = 1024 * 1024 + const sdk = registerWorker(ADDRESS, { + maxMessageSize: limit, + // Avoid waiting the full default timeout when the WS never opens. + invocationTimeoutMs: 50, + }) + + // Trigger should not throw IIIPayloadTooLarge synchronously. The promise + // will reject later with TIMEOUT (no real WS), which is fine — we only + // care that the producer guard let it through. + const promise = sdk.trigger({ function_id: 'noop', payload: { hello: 'world' } }) + + await expect(promise).rejects.not.toBeInstanceOf(IIIPayloadTooLarge) + + await sdk.shutdown() + }) +}) + +describe.skipIf(!process.env.III_URL)('Node SDK payload limits — integration', () => { + it('engine returns invocation_failed_payload_too_large for oversize trigger', async () => { + // Use the real SDK from the shared setup (skipping our mock) by importing + // through `vi.importActual` — the engine on `III_URL` will close the WS + // when it sees the oversize message and we want a real round-trip. + const { registerWorker } = await vi.importActual('../src/index') + const sdk = registerWorker(process.env.III_URL as string, { + maxMessageSize: 32 * 1024 * 1024, + }) + + // 20 MiB raw blob — should hit the engine ceiling and come back with the + // specific error code (Phase 1 contract). + const payload = { blob: 'A'.repeat(20 * 1024 * 1024) } + + try { + await sdk.trigger({ function_id: 'noop', payload }) + expect.fail('expected oversize trigger to reject') + } catch (err) { + const code = (err as { code?: string }).code + expect(code).toBe('invocation_failed_payload_too_large') + } finally { + await sdk.shutdown() + } + }) +}) diff --git a/sdk/packages/python/iii/pyproject.toml b/sdk/packages/python/iii/pyproject.toml index 72b34fbb81..0170117825 100644 --- a/sdk/packages/python/iii/pyproject.toml +++ b/sdk/packages/python/iii/pyproject.toml @@ -59,3 +59,6 @@ strict = true [tool.pytest.ini_options] addopts = "--cov=src/iii --cov-branch --cov-report=term-missing" testpaths = ["tests"] +markers = [ + "integration: marks tests that require a running engine (deselect with -m 'not integration')", +] diff --git a/sdk/packages/python/iii/src/iii/__init__.py b/sdk/packages/python/iii/src/iii/__init__.py index d5894ba699..7bfe59f7af 100644 --- a/sdk/packages/python/iii/src/iii/__init__.py +++ b/sdk/packages/python/iii/src/iii/__init__.py @@ -1,7 +1,12 @@ """III SDK for Python.""" from .channels import ChannelReader, ChannelWriter -from .errors import IIIForbiddenError, IIIInvocationError, IIITimeoutError +from .errors import ( + IIIForbiddenError, + IIIInvocationError, + IIIPayloadTooLarge, + IIITimeoutError, +) from .format_utils import extract_request_format, extract_response_format, python_type_to_format from .iii import TriggerAction, register_worker from .iii_constants import FunctionRef, InitOptions, ReconnectionConfig, TelemetryOptions @@ -65,6 +70,7 @@ # Errors "IIIForbiddenError", "IIIInvocationError", + "IIIPayloadTooLarge", "IIITimeoutError", # Core "FunctionRef", diff --git a/sdk/packages/python/iii/src/iii/errors.py b/sdk/packages/python/iii/src/iii/errors.py index 529095adc0..b5753e4667 100644 --- a/sdk/packages/python/iii/src/iii/errors.py +++ b/sdk/packages/python/iii/src/iii/errors.py @@ -44,6 +44,27 @@ class IIITimeoutError(IIIInvocationError): """Raised when an invocation exceeds its timeout. ``code == 'TIMEOUT'``.""" +class IIIPayloadTooLarge(ValueError): + """Raised client-side when an invocation payload exceeds the configured limit. + + The SDK rejects oversize payloads before they reach the WebSocket so a + single oversize message cannot tear the connection and stop unrelated + in-flight invocations on the same worker. Subclasses ``ValueError`` so + callers that already guard argument size catch it without changes. + + For binary blobs use channels (see https://iii.dev/docs/how-to/use-channels). + """ + + def __init__(self, payload_bytes: int, limit_bytes: int) -> None: + super().__init__( + f"Payload {payload_bytes} bytes exceeds invocation limit " + f"{limit_bytes} bytes. For binary blobs use channels: " + "https://iii.dev/docs/how-to/use-channels" + ) + self.payload_bytes = payload_bytes + self.limit_bytes = limit_bytes + + def _wrap_wire_error( error: Any, *, diff --git a/sdk/packages/python/iii/src/iii/iii.py b/sdk/packages/python/iii/src/iii/iii.py index 12fcc81a5a..db88a39bce 100644 --- a/sdk/packages/python/iii/src/iii/iii.py +++ b/sdk/packages/python/iii/src/iii/iii.py @@ -17,7 +17,12 @@ from websockets.asyncio.client import ClientConnection from .channels import ChannelReader, ChannelWriter -from .errors import IIIInvocationError, IIITimeoutError, _wrap_wire_error +from .errors import ( + IIIInvocationError, + IIIPayloadTooLarge, + IIITimeoutError, + _wrap_wire_error, +) from .format_utils import extract_request_format, extract_response_format from .iii_constants import ( DEFAULT_RECONNECTION_CONFIG, @@ -261,6 +266,7 @@ async def _do_connect(self) -> None: self._ws = await websockets.connect( self._address, additional_headers=self._options.headers, + max_size=self._options.max_message_size, ) log.info(f"Connected to {self._address}") await self._on_connected() @@ -351,6 +357,19 @@ def _to_dict(self, msg: Any) -> dict[str, Any]: return data return {"data": msg} + def _assert_within_limit(self, msg: Any) -> None: + """Reject oversize invocation envelopes before they reach the WS. + + Raises IIIPayloadTooLarge if the serialized message exceeds + ``InitOptions.max_message_size``. Pre-flight rejection prevents one + oversize message from tearing the WS connection and halting every + in-flight invocation on the worker. + """ + limit = self._options.max_message_size + encoded = json.dumps(self._to_dict(msg)).encode("utf-8") + if len(encoded) > limit: + raise IIIPayloadTooLarge(payload_bytes=len(encoded), limit_bytes=limit) + async def _send(self, msg: Any) -> None: data = self._to_dict(msg) if self._ws and self._ws.state.name == "OPEN": @@ -1042,39 +1061,40 @@ async def trigger_async(self, request: "dict[str, Any] | TriggerRequest") -> Any # Void: fire-and-forget, no response expected if isinstance(action, TriggerActionVoid): - await self._send( - InvokeFunctionMessage( - function_id=function_id, - data=payload, - traceparent=self._inject_traceparent(), - baggage=self._inject_baggage(), - action=action, - ) + msg = InvokeFunctionMessage( + function_id=function_id, + data=payload, + traceparent=self._inject_traceparent(), + baggage=self._inject_baggage(), + action=action, ) + self._assert_within_limit(msg) + await self._send(msg) return None # Enqueue and default: send invocation_id, await response invocation_id = str(uuid.uuid4()) future: asyncio.Future[Any] = self._loop.create_future() - self._pending[invocation_id] = _PendingInvocation( - future=future, function_id=function_id - ) - enqueue_action: TriggerActionEnqueue | None = ( action if isinstance(action, TriggerActionEnqueue) else None ) - await self._send( - InvokeFunctionMessage( - function_id=function_id, - data=payload, - invocation_id=invocation_id, - traceparent=self._inject_traceparent(), - baggage=self._inject_baggage(), - action=enqueue_action, - ) + msg = InvokeFunctionMessage( + function_id=function_id, + data=payload, + invocation_id=invocation_id, + traceparent=self._inject_traceparent(), + baggage=self._inject_baggage(), + action=enqueue_action, ) + self._assert_within_limit(msg) + + self._pending[invocation_id] = _PendingInvocation( + future=future, function_id=function_id + ) + + await self._send(msg) try: return await asyncio.wait_for(future, timeout=timeout_secs) diff --git a/sdk/packages/python/iii/src/iii/iii_constants.py b/sdk/packages/python/iii/src/iii/iii_constants.py index 439a7922b9..fc3f986b53 100644 --- a/sdk/packages/python/iii/src/iii/iii_constants.py +++ b/sdk/packages/python/iii/src/iii/iii_constants.py @@ -11,6 +11,7 @@ DEFAULT_INVOCATION_TIMEOUT_MS = 30000 MAX_QUEUE_SIZE = 1000 +DEFAULT_MAX_MESSAGE_SIZE = 16 * 1024 * 1024 @dataclass @@ -72,6 +73,11 @@ class InitOptions: otel: OpenTelemetry configuration. Enabled by default. Set ``{'enabled': False}`` or env ``OTEL_ENABLED=false`` to disable. telemetry: Internal telemetry metadata. + max_message_size: Maximum size in bytes of a single WebSocket message + sent to or received from the engine. Default ``16 MiB``. Direct + ``trigger()`` payloads ride a single message; for larger or + streamable payloads use channels. The engine enforces its own + ceiling — keep this in sync with the engine config. """ worker_name: str | None = None @@ -81,3 +87,4 @@ class InitOptions: otel: OtelConfig | dict[str, Any] | None = None headers: dict[str, str] | None = None telemetry: TelemetryOptions | None = None + max_message_size: int = DEFAULT_MAX_MESSAGE_SIZE diff --git a/sdk/packages/python/iii/tests/test_payload_limits.py b/sdk/packages/python/iii/tests/test_payload_limits.py new file mode 100644 index 0000000000..9772dead99 --- /dev/null +++ b/sdk/packages/python/iii/tests/test_payload_limits.py @@ -0,0 +1,224 @@ +"""Tests for the WebSocket invocation payload-size limit (Phase 3).""" + +from __future__ import annotations + +import asyncio +import os +from typing import Any + +import pytest + +from iii import InitOptions, TriggerAction +from iii.errors import IIIPayloadTooLarge +from iii.iii import III +from iii.iii_constants import DEFAULT_MAX_MESSAGE_SIZE + + +class _FakeWebSocket: + """Minimal stand-in for `websockets.asyncio.client.ClientConnection`. + + Captures sent frames and exposes the `state.name == "OPEN"` shape the SDK + checks before sending. + """ + + def __init__(self) -> None: + self.sent: list[str] = [] + + class _State: + name = "OPEN" + + self.state = _State() + + async def send(self, payload: str) -> None: + self.sent.append(payload) + + async def close(self) -> None: + self.state.name = "CLOSED" + + def __aiter__(self) -> "_FakeWebSocket": + return self + + async def __anext__(self) -> str: + await asyncio.sleep(3600) + raise StopAsyncIteration + + +@pytest.fixture +def captured_connect(monkeypatch: pytest.MonkeyPatch) -> dict[str, Any]: + """Patch `websockets.connect` so tests can observe its kwargs without I/O.""" + captured: dict[str, Any] = {} + + async def fake_connect(*args: Any, **kwargs: Any) -> _FakeWebSocket: + captured["args"] = args + captured["kwargs"] = kwargs + return _FakeWebSocket() + + import iii.iii as iii_mod + + monkeypatch.setattr(iii_mod.websockets, "connect", fake_connect) + return captured + + +def _make_client(options: InitOptions | None = None) -> III: + client = III("ws://localhost:1", options=options) + client._wait_until_connected() + return client + + +def test_default_max_message_size_constant_is_16_mib() -> None: + assert DEFAULT_MAX_MESSAGE_SIZE == 16 * 1024 * 1024 + + +def test_init_options_max_message_size_default_is_16_mib() -> None: + assert InitOptions().max_message_size == 16 * 1024 * 1024 + + +def test_init_options_max_message_size_override() -> None: + opts = InitOptions(max_message_size=8 * 1024 * 1024) + assert opts.max_message_size == 8 * 1024 * 1024 + + +def test_init_options_max_message_size_is_plumbed_to_websockets( + captured_connect: dict[str, Any], +) -> None: + client = _make_client(InitOptions(max_message_size=8 * 1024 * 1024)) + try: + assert captured_connect["kwargs"].get("max_size") == 8 * 1024 * 1024 + finally: + client.shutdown() + + +def test_init_options_default_max_message_size_is_plumbed_to_websockets( + captured_connect: dict[str, Any], +) -> None: + client = _make_client() + try: + assert captured_connect["kwargs"].get("max_size") == 16 * 1024 * 1024 + finally: + client.shutdown() + + +def test_trigger_with_oversize_payload_raises_specific_error( + captured_connect: dict[str, Any], +) -> None: + client = _make_client(InitOptions(max_message_size=1 * 1024 * 1024)) + try: + with pytest.raises(IIIPayloadTooLarge) as excinfo: + client.trigger( + { + "function_id": "noop", + "payload": {"data_b64": "A" * (4 * 1024 * 1024)}, + } + ) + msg = str(excinfo.value) + assert "channels" in msg + assert "https://iii.dev/docs/how-to/use-channels" in msg + assert "exceeds invocation limit" in msg + finally: + client.shutdown() + + +def test_trigger_void_with_oversize_payload_raises_specific_error( + captured_connect: dict[str, Any], +) -> None: + client = _make_client(InitOptions(max_message_size=1 * 1024 * 1024)) + try: + with pytest.raises(IIIPayloadTooLarge): + client.trigger( + { + "function_id": "noop", + "payload": {"data_b64": "A" * (4 * 1024 * 1024)}, + "action": TriggerAction.Void(), + } + ) + finally: + client.shutdown() + + +def test_trigger_below_limit_does_not_raise( + captured_connect: dict[str, Any], +) -> None: + """A payload comfortably below the limit must reach the WS send path.""" + client = _make_client(InitOptions(max_message_size=1 * 1024 * 1024)) + try: + # Replace _send so we can confirm it was reached without awaiting + # an engine response (default trigger waits for invocation_result). + async def fake_send(msg: Any) -> None: + return None + + client._send = fake_send # type: ignore[assignment] + + async def call() -> None: + await asyncio.wait_for( + client.trigger_async( + { + "function_id": "noop", + "payload": {"data_b64": "A" * 1024}, + "action": TriggerAction.Void(), + } + ), + timeout=2, + ) + + asyncio.run_coroutine_threadsafe(call(), client._loop).result(timeout=5) + finally: + client.shutdown() + + +def test_payload_too_large_error_message_format( + captured_connect: dict[str, Any], +) -> None: + """Cross-SDK consistent format: exact wording plus payload bytes & limit.""" + limit = 1024 + client = _make_client(InitOptions(max_message_size=limit)) + try: + oversize_payload = {"data": "x" * (limit * 4)} + with pytest.raises(IIIPayloadTooLarge) as excinfo: + client.trigger({"function_id": "noop", "payload": oversize_payload}) + msg = str(excinfo.value) + assert f"limit {limit} bytes" in msg + assert "Payload " in msg + assert " bytes exceeds invocation limit " in msg + assert "For binary blobs use channels" in msg + finally: + client.shutdown() + + +def test_payload_too_large_subclasses_value_error() -> None: + """The new exception should be catchable as a generic argument-size error.""" + err = IIIPayloadTooLarge(payload_bytes=2048, limit_bytes=1024) + assert isinstance(err, ValueError) + assert err.payload_bytes == 2048 + assert err.limit_bytes == 1024 + + +@pytest.mark.integration +@pytest.mark.skipif( + "III_URL" not in os.environ, + reason="requires a live III engine; set III_URL to run", +) +def test_oversize_invocation_returns_payload_too_large_code() -> None: + """End-to-end: engine rejects oversize WS message with the new error code. + + Engine default ceiling is 16 MiB. The SDK client is constructed with a + larger ``max_message_size`` so the producer-side guard does not preempt + the engine — we want to confirm the engine's `invocation_failed_payload_too_large` + code surfaces back to the caller via the wire `ErrorBody`. + """ + from iii.errors import IIIInvocationError + + engine_ws_url = os.environ["III_URL"] + # 64 MiB SDK ceiling so the producer guard lets a 24 MiB payload through; + # 24 MiB raw exceeds the engine's 16 MiB recv limit. + client = III( + engine_ws_url, + options=InitOptions(max_message_size=64 * 1024 * 1024), + ) + client._wait_until_connected() + try: + payload = {"data_b64": "A" * (24 * 1024 * 1024)} + with pytest.raises(IIIInvocationError) as excinfo: + client.trigger({"function_id": "noop", "payload": payload}) + assert excinfo.value.code == "invocation_failed_payload_too_large" + finally: + client.shutdown() diff --git a/sdk/packages/python/iii/uv.lock b/sdk/packages/python/iii/uv.lock index 6166e9a8a3..a20aa05f99 100644 --- a/sdk/packages/python/iii/uv.lock +++ b/sdk/packages/python/iii/uv.lock @@ -500,7 +500,7 @@ wheels = [ [[package]] name = "iii-sdk" -version = "0.11.4.dev2" +version = "0.11.5" source = { editable = "." } dependencies = [ { name = "opentelemetry-api" }, diff --git a/sdk/packages/rust/iii/src/channels.rs b/sdk/packages/rust/iii/src/channels.rs index 1262cfc289..76ae5c542a 100644 --- a/sdk/packages/rust/iii/src/channels.rs +++ b/sdk/packages/rust/iii/src/channels.rs @@ -4,9 +4,10 @@ use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::Mutex; -use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +use tokio_tungstenite::{connect_async_with_config, tungstenite::Message as WsMessage}; use crate::error::IIIError; +use crate::iii::build_ws_config; #[derive(Debug, Clone, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] @@ -90,7 +91,12 @@ impl ChannelWriter { if guard.is_some() { return Ok(()); } - let (stream, _) = connect_async(&self.url).await?; + let (stream, _) = connect_async_with_config( + &self.url, + Some(build_ws_config(crate::DEFAULT_MAX_MESSAGE_SIZE)), + false, + ) + .await?; let (writer, _reader) = stream.split(); *guard = Some(writer); Ok(()) @@ -159,7 +165,12 @@ impl ChannelReader { if guard.is_some() { return Ok(()); } - let (stream, _) = connect_async(&self.url).await?; + let (stream, _) = connect_async_with_config( + &self.url, + Some(build_ws_config(crate::DEFAULT_MAX_MESSAGE_SIZE)), + false, + ) + .await?; let (_writer, reader) = stream.split(); *guard = Some(reader); Ok(()) diff --git a/sdk/packages/rust/iii/src/error.rs b/sdk/packages/rust/iii/src/error.rs index f45f593120..0901a59b8b 100644 --- a/sdk/packages/rust/iii/src/error.rs +++ b/sdk/packages/rust/iii/src/error.rs @@ -23,6 +23,10 @@ pub enum IIIError { Serde(String), #[error("websocket error: {0}")] WebSocket(String), + #[error( + "Payload {actual} bytes exceeds invocation limit {limit} bytes. For binary blobs use channels: https://iii.dev/docs/how-to/use-channels" + )] + PayloadTooLarge { actual: usize, limit: usize }, } impl From for IIIError { diff --git a/sdk/packages/rust/iii/src/iii.rs b/sdk/packages/rust/iii/src/iii.rs index bc579d409f..be0b1dbfcc 100644 --- a/sdk/packages/rust/iii/src/iii.rs +++ b/sdk/packages/rust/iii/src/iii.rs @@ -2,7 +2,7 @@ use std::{ collections::{HashMap, HashSet}, sync::{ Arc, Mutex, MutexGuard, - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Duration, }; @@ -26,7 +26,10 @@ use tokio::{ sync::{mpsc, oneshot}, time::sleep, }; -use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +use tokio_tungstenite::{ + connect_async_with_config, + tungstenite::{Message as WsMessage, protocol::WebSocketConfig}, +}; use uuid::Uuid; const SDK_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -48,6 +51,16 @@ use crate::telemetry::types::OtelConfig; const DEFAULT_TIMEOUT_MS: u64 = 30_000; +/// Build a tungstenite [`WebSocketConfig`] with both the message and frame +/// ceilings set to `max_message_size`. Used at every `connect_async_with_config` +/// call site so the producer-side guard, the engine, and the underlying +/// tungstenite limits all agree on one number. +pub(crate) fn build_ws_config(max_message_size: usize) -> WebSocketConfig { + WebSocketConfig::default() + .max_message_size(Some(max_message_size)) + .max_frame_size(Some(max_message_size)) +} + /// Worker information returned by `engine::workers::list` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerInfo { @@ -665,6 +678,7 @@ struct IIIInner { connection_thread: Mutex>>, headers: Mutex>>, otel_config: Mutex>, + max_message_size: AtomicUsize, } /// WebSocket client for communication with the III Engine. @@ -700,6 +714,7 @@ impl III { connection_thread: Mutex::new(None), headers: Mutex::new(None), otel_config: Mutex::new(None), + max_message_size: AtomicUsize::new(crate::DEFAULT_MAX_MESSAGE_SIZE), }; Self { inner: Arc::new(inner), @@ -726,6 +741,21 @@ impl III { *self.inner.otel_config.lock_or_recover() = Some(config); } + /// Configure the maximum size in bytes of a single WebSocket invocation + /// message. Producer-side `trigger()` calls raise + /// [`IIIError::PayloadTooLarge`] when an encoded envelope exceeds this + /// value, and the underlying tungstenite connection is built with the + /// same ceiling so oversize *incoming* messages also fail loudly instead + /// of tearing the connection. + pub fn set_max_message_size(&self, size: usize) { + self.inner.max_message_size.store(size, Ordering::SeqCst); + } + + /// Current invocation-message ceiling (bytes). + pub fn max_message_size(&self) -> usize { + self.inner.max_message_size.load(Ordering::SeqCst) + } + pub(crate) fn connect(&self) { if self.inner.started.swap(true, Ordering::SeqCst) { return; @@ -1082,14 +1112,16 @@ impl III { // Void is fire-and-forget — no invocation_id, no response if matches!(req.action, Some(TriggerAction::Void)) { - self.send_message(Message::InvokeFunction { + let message = Message::InvokeFunction { invocation_id: None, function_id: req.function_id, data: req.payload, traceparent: tp, baggage: bg, action: req.action, - })?; + }; + self.assert_within_limit(&message)?; + self.send_message(message)?; return Ok(Value::Null); } @@ -1098,19 +1130,22 @@ impl III { let invocation_id = Uuid::new_v4(); let (tx, rx) = oneshot::channel(); - self.inner - .pending - .lock_or_recover() - .insert(invocation_id, tx); - - self.send_message(Message::InvokeFunction { + let message = Message::InvokeFunction { invocation_id: Some(invocation_id), function_id: req.function_id, data: req.payload, traceparent: tp, baggage: bg, action: req.action, - })?; + }; + self.assert_within_limit(&message)?; + + self.inner + .pending + .lock_or_recover() + .insert(invocation_id, tx); + + self.send_message(message)?; match tokio::time::timeout(timeout, rx).await { Ok(Ok(result)) => result, @@ -1222,6 +1257,7 @@ impl III { let custom_headers = self.inner.headers.lock_or_recover().clone(); + let ws_config = Some(build_ws_config(self.max_message_size())); let connect_result = if let Some(ref h) = custom_headers { use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::http; @@ -1239,9 +1275,9 @@ impl III { request.headers_mut().insert(name, val); } } - connect_async(request).await + connect_async_with_config(request, ws_config, false).await } else { - connect_async(&self.inner.address).await + connect_async_with_config(self.inner.address.as_str(), ws_config, false).await }; match connect_result { @@ -1384,6 +1420,21 @@ impl III { Ok(()) } + /// Producer-side guard: refuse to send an invocation message whose + /// JSON-encoded form exceeds [`Self::max_message_size`]. Mirrors the + /// Python and Node SDK guards so a payload that would tear the WS + /// connection or trip the engine's `invocation_failed_payload_too_large` + /// gets a fast, deterministic error before the round-trip. + fn assert_within_limit(&self, message: &Message) -> Result<(), IIIError> { + let limit = self.max_message_size(); + let encoded = serde_json::to_vec(message)?; + let actual = encoded.len(); + if actual > limit { + return Err(IIIError::PayloadTooLarge { actual, limit }); + } + Ok(()) + } + async fn send_ws(&self, ws_tx: &mut WsTx, message: &Message) -> Result<(), IIIError> { let payload = serde_json::to_string(message)?; ws_tx.send(WsMessage::Text(payload.into())).await?; diff --git a/sdk/packages/rust/iii/src/lib.rs b/sdk/packages/rust/iii/src/lib.rs index 5fe2cd5675..6e597a1f2a 100644 --- a/sdk/packages/rust/iii/src/lib.rs +++ b/sdk/packages/rust/iii/src/lib.rs @@ -54,6 +54,11 @@ pub use serde_json::Value; /// /// let iii = register_worker("ws://localhost:49134", InitOptions::default()); /// ``` +/// Default invocation message ceiling — 16 MiB. Matches the engine and the +/// other SDKs (Python, Node) so a payload that succeeds in one language +/// succeeds in all of them. +pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; + #[derive(Debug, Clone, Default)] pub struct InitOptions { /// Custom worker metadata. Auto-detected if `None`. @@ -62,6 +67,19 @@ pub struct InitOptions { pub headers: Option>, /// OpenTelemetry configuration. pub otel: Option, + /// Maximum size in bytes of a single WebSocket invocation message. + /// `None` resolves to [`DEFAULT_MAX_MESSAGE_SIZE`] (16 MiB). + /// Producer-side `trigger()` calls raise [`IIIError::PayloadTooLarge`] + /// before sending if the encoded envelope exceeds this value. + pub max_message_size: Option, +} + +impl InitOptions { + /// Resolve [`Self::max_message_size`] to a concrete byte count, falling + /// back to [`DEFAULT_MAX_MESSAGE_SIZE`] when unset. + pub fn resolved_max_message_size(&self) -> usize { + self.max_message_size.unwrap_or(DEFAULT_MAX_MESSAGE_SIZE) + } } /// Create and return a connected SDK instance. The WebSocket connection is @@ -86,10 +104,12 @@ pub struct InitOptions { /// iii.shutdown(); // cleanly stops the connection thread /// ``` pub fn register_worker(address: &str, options: InitOptions) -> III { + let max_message_size = options.resolved_max_message_size(); let InitOptions { metadata, headers, otel, + max_message_size: _, } = options; let iii = if let Some(metadata) = metadata { @@ -98,6 +118,8 @@ pub fn register_worker(address: &str, options: InitOptions) -> III { III::new(address) }; + iii.set_max_message_size(max_message_size); + if let Some(h) = headers { iii.set_headers(h); } diff --git a/sdk/packages/rust/iii/tests/payload_limits.rs b/sdk/packages/rust/iii/tests/payload_limits.rs new file mode 100644 index 0000000000..04e1449aff --- /dev/null +++ b/sdk/packages/rust/iii/tests/payload_limits.rs @@ -0,0 +1,148 @@ +//! Tests for the payload size limit / `max_message_size` option (Phase 5). +//! +//! TDD red → green: +//! 1. Default on `InitOptions` resolves to 16 MiB. +//! 2. `InitOptions` builder accepts an override. +//! 3. `trigger()` raises `IIIError::PayloadTooLarge` before the WS round-trip +//! when the encoded envelope exceeds `max_message_size`. +//! 4. Integration test (gated on engine URL env, `#[ignore]` until Phase 1 +//! lands the `invocation_failed_payload_too_large` error code). + +use iii_sdk::{IIIError, InitOptions, register_worker}; +use serde_json::json; + +/// 16 MiB — the cross-SDK default ceiling. +const DEFAULT_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; + +#[test] +fn max_message_size_default_is_16_mib() { + let opts = InitOptions::default(); + assert_eq!(opts.max_message_size, None); + assert_eq!(opts.resolved_max_message_size(), DEFAULT_MAX_MESSAGE_SIZE); +} + +#[test] +fn init_options_accepts_max_message_size_override() { + let opts = InitOptions { + max_message_size: Some(8 * 1024 * 1024), + ..InitOptions::default() + }; + assert_eq!(opts.resolved_max_message_size(), 8 * 1024 * 1024); +} + +#[tokio::test] +async fn trigger_with_oversize_payload_returns_payload_too_large_error() { + // Configure a tiny ceiling so we don't have to allocate megabytes. + let limit: usize = 1024; + let iii = register_worker( + "ws://127.0.0.1:1", // never actually connected — guard fires first + InitOptions { + max_message_size: Some(limit), + ..InitOptions::default() + }, + ); + + // Build a payload whose JSON encoding will clearly exceed the limit. + let blob = "A".repeat(limit * 4); + let request = iii_sdk::TriggerRequest { + function_id: "noop".to_string(), + payload: json!({ "data": blob }), + action: None, + timeout_ms: Some(50), + }; + + let result = iii.trigger(request).await; + iii.shutdown(); + + match result { + Err(IIIError::PayloadTooLarge { actual, limit: l }) => { + assert_eq!(l, limit, "limit should match the configured value"); + assert!( + actual > limit, + "actual ({actual}) should exceed limit ({l})" + ); + } + other => panic!("expected IIIError::PayloadTooLarge, got: {other:?}"), + } +} + +#[test] +fn payload_too_large_display_matches_cross_sdk_format() { + // The cross-SDK message format must be stable and mention "channels" + // plus a docs URL anchor (see plan §3.2). + let err = IIIError::PayloadTooLarge { + actual: 20_000_000, + limit: 16 * 1024 * 1024, + }; + let s = err.to_string(); + assert!( + s.contains("20000000"), + "display should contain actual size: {s}" + ); + assert!( + s.contains(&format!("{}", 16 * 1024 * 1024)), + "display should contain limit: {s}" + ); + assert!( + s.to_lowercase().contains("channels"), + "display should reference channels: {s}" + ); + assert!( + s.contains("https://"), + "display should include a docs URL: {s}" + ); +} + +// Integration test: requires a live engine that emits the +// `invocation_failed_payload_too_large` error code (Phase 1 of the +// ws-payload-size-limit work). Skips silently when `III_URL` is unset so +// `cargo test` stays green in unit-only environments. +// +// Run with: +// III_URL=ws://127.0.0.1:49134 cargo test --test payload_limits integration -- --include-ignored +#[tokio::test] +#[ignore = "requires a running engine reachable at III_URL"] +async fn integration_oversize_invocation_returns_payload_too_large_code() { + let url = match std::env::var("III_URL") { + Ok(u) => u, + Err(_) => { + eprintln!("III_URL not set; skipping"); + return; + } + }; + + // Use an engine-side enforced cap. The producer guard would otherwise + // intercept any payload sized to test the engine. We disable the guard + // by setting `max_message_size` higher than the engine limit and rely + // on the engine to reject the oversize message. + let iii = register_worker( + &url, + InitOptions { + max_message_size: Some(64 * 1024 * 1024), + ..InitOptions::default() + }, + ); + + // Wait briefly for connect. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let blob = "A".repeat(20 * 1024 * 1024); + let request = iii_sdk::TriggerRequest { + function_id: "noop".to_string(), + payload: json!({ "data": blob }), + action: None, + timeout_ms: Some(5_000), + }; + + let result = iii.trigger(request).await; + iii.shutdown(); + + match result { + Err(IIIError::Remote { code, .. }) => { + assert_eq!(code, "invocation_failed_payload_too_large"); + } + other => { + panic!("expected IIIError::Remote(invocation_failed_payload_too_large), got: {other:?}") + } + } +}