From 866887fa9686c564446c7a2c25868381404e7255 Mon Sep 17 00:00:00 2001 From: kenny Date: Thu, 28 May 2026 15:59:53 -0600 Subject: [PATCH 1/3] feat: add openrouter harness support --- docs/pages/architecture.mdx | 3 ++ docs/pages/deploying-in-production.mdx | 12 +++++- docs/pages/index.mdx | 2 +- docs/pages/quickstart.mdx | 9 +++-- docs/pages/secrets/onepassword.mdx | 1 + docs/public/md/architecture.md | 3 ++ docs/public/md/deploying-in-production.md | 12 +++++- docs/public/md/index.md | 2 +- docs/public/md/quickstart.md | 14 ++++--- docs/public/md/secrets/onepassword.md | 1 + harness/codex/config.toml | 7 ++++ .../api-client/test/harness-events.test.ts | 40 +++++++++++++++++++ packages/harness-events/src/normalize.ts | 17 ++++++-- services/api/api/agent.py | 2 +- services/api/api/routers/agent.py | 1 + services/api/api/runtime_control.py | 7 ++++ services/api/api/sandbox/config.py | 3 +- services/api/api/sandbox/harness_protocol.py | 10 +++-- services/api/api/sandbox/kubernetes.py | 6 +++ services/api/api/sandbox/normalize.py | 7 ++-- services/api/api/tool_manager.py | 9 +++++ services/api/api/warm_pool.py | 4 +- services/api/api/workflow_engine.py | 2 +- .../api/api/workflows/slack_thread_turn.py | 2 +- .../api/tests/test_agent_control_plane.py | 15 +++++++ services/api/tests/test_codex_app_wrapper.py | 14 +++++++ services/api/tests/test_harness_protocol.py | 15 +++++++ services/api/tests/test_integration.py | 7 ++++ services/api/tests/test_normalize.py | 12 ++++++ services/api/tests/test_sandbox_entrypoint.py | 36 +++++++++++++++++ .../tests/test_sandbox_kubernetes_backend.py | 1 + services/api/tests/test_tool_manager.py | 9 +++++ services/api/tests/test_workflows.py | 2 + services/sandbox/codex-app-wrapper.py | 14 +++++++ services/sandbox/entrypoint.sh | 10 ++--- 35 files changed, 277 insertions(+), 34 deletions(-) create mode 100644 packages/api-client/test/harness-events.test.ts diff --git a/docs/pages/architecture.mdx b/docs/pages/architecture.mdx index dafa20f4d..f33ea95f3 100644 --- a/docs/pages/architecture.mdx +++ b/docs/pages/architecture.mdx @@ -74,6 +74,9 @@ mounting, and network policies all assume Kubernetes sandboxes. | Claude Code | Passes the Anthropic-shaped content through directly. | | Codex / pi-mono | Extracts text blocks for CLIs that accept a plain prompt. | +The OpenRouter selector uses the Codex adapter with an OpenRouter provider +configuration, so clients still see the same durable execution protocol. + The pod receives the prompt files, CLI command, internal API URL, proxy CA, and proxy settings. It does not need Kubernetes credentials or long-lived third-party API keys. diff --git a/docs/pages/deploying-in-production.mdx b/docs/pages/deploying-in-production.mdx index 38374abbb..87a2056cd 100644 --- a/docs/pages/deploying-in-production.mdx +++ b/docs/pages/deploying-in-production.mdx @@ -80,6 +80,7 @@ Store one secret per enabled harness credential: | Harness | API value | Slack selector | Credential to store | Upstream | |---------|-----------|----------------|---------------------|----------| | Codex default | `codex` | none or `--codex` | `OPENAI_API_KEY` | `api.openai.com` | +| OpenRouter via Codex | `openrouter` | `--openrouter` | `OPENROUTER_API_KEY` | `openrouter.ai` | | Amp | `amp` | `--amp` | `AMP_API_KEY` | `ampcode.com` | | Claude Code | `claude-code` | `--claude` | `ANTHROPIC_API_KEY` | `api.anthropic.com` | | pi-mono | `pi-mono` | `--pi` | `ANTHROPIC_API_KEY` | `api.anthropic.com` | @@ -91,7 +92,13 @@ headers the secret is bound to. When `ironProxy.secretSource` is `onepassword`, [iron-proxy](https://docs.iron.sh) resolves these values from `op://$OP_VAULT//credential`. For example, store the default -Codex credential in a 1Password item named `OPENAI_API_KEY`. +Codex credential in a 1Password item named `OPENAI_API_KEY`, and store the +OpenRouter credential in a 1Password item named `OPENROUTER_API_KEY` if you +enable the `--openrouter` selector. + +The OpenRouter selector runs the Codex harness against the configured +OpenRouter provider. It defaults to `openrouter/auto`; set `OPENROUTER_MODEL` +on the API deployment if you want a fixed OpenRouter model slug. Whatever source you pick, the vault is shared across the whole deployment, so any thread can use any configured credential. Per-user and per-channel @@ -323,7 +330,8 @@ reply with exactly PONG ``` Slack messages without a harness flag use Codex. Use `--amp`, `--claude`, -`--codex`, or `--pi` only when you want to select a specific harness. +`--codex`, `--openrouter`, or `--pi` only when you want to select a specific +harness. Inspect sandbox pods with the labels Centaur actually sets: diff --git a/docs/pages/index.mdx b/docs/pages/index.mdx index 205979b4f..a1b0827fb 100644 --- a/docs/pages/index.mdx +++ b/docs/pages/index.mdx @@ -73,7 +73,7 @@ Onboard me to Centaur locally. Use https://centaur.run/llms-full.txt and follow
  • Harness agnostic. - Use Amp, Codex, Claude Code, pi-mono, or your own CLI harness with the same durable execution model. + Use Amp, Codex, Codex through OpenRouter, Claude Code, pi-mono, or your own CLI harness with the same durable execution model.
  • diff --git a/docs/pages/quickstart.mdx b/docs/pages/quickstart.mdx index d1b241e30..8b2966ba0 100644 --- a/docs/pages/quickstart.mdx +++ b/docs/pages/quickstart.mdx @@ -72,14 +72,16 @@ enabled in `values.dev.yaml`; use a real token if you want to test Slack. Postgres on startup, so it must exist before `just up`. Application-level model and tool secrets, such as `OPENAI_API_KEY`, -`ANTHROPIC_API_KEY`, `AMP_API_KEY`, and `GITHUB_TOKEN`, should live in +`OPENROUTER_API_KEY`, `ANTHROPIC_API_KEY`, `AMP_API_KEY`, and `GITHUB_TOKEN`, +should live in 1Password or the configured [iron-proxy](https://docs.iron.sh) secret source. Sandboxes receive placeholder values and [iron-proxy](https://docs.iron.sh) injects the real credentials only on approved outbound requests. The default harness is `codex`, so `OPENAI_API_KEY` must exist in the configured secret source before Slack agent turns can complete. Use explicit harness -selectors only when you want a non-default harness such as Amp or Claude Code. +selectors only when you want a non-default harness such as Amp, Claude Code, or +OpenRouter. ## 3. Boot the stack @@ -156,7 +158,8 @@ Mention the bot in a test channel where the Slack app is installed: ``` Slack messages without a harness flag use Codex. Add a selector such as -`--amp`, `--claude`, or `--pi` only when you want to override the default. +`--amp`, `--claude`, `--openrouter`, or `--pi` only when you want to override +the default. If Slack receives the mention but no agent runs, inspect Slackbot logs: diff --git a/docs/pages/secrets/onepassword.mdx b/docs/pages/secrets/onepassword.mdx index caf91494b..1ad2e4156 100644 --- a/docs/pages/secrets/onepassword.mdx +++ b/docs/pages/secrets/onepassword.mdx @@ -110,6 +110,7 @@ Store enabled harness credentials the same way: | Credential | Used for | |------------|----------| | `OPENAI_API_KEY` | Codex default | +| `OPENROUTER_API_KEY` | OpenRouter via Codex | | `AMP_API_KEY` | Amp | | `ANTHROPIC_API_KEY` | Claude Code and pi-mono | diff --git a/docs/public/md/architecture.md b/docs/public/md/architecture.md index dafa20f4d..f33ea95f3 100644 --- a/docs/public/md/architecture.md +++ b/docs/public/md/architecture.md @@ -74,6 +74,9 @@ mounting, and network policies all assume Kubernetes sandboxes. | Claude Code | Passes the Anthropic-shaped content through directly. | | Codex / pi-mono | Extracts text blocks for CLIs that accept a plain prompt. | +The OpenRouter selector uses the Codex adapter with an OpenRouter provider +configuration, so clients still see the same durable execution protocol. + The pod receives the prompt files, CLI command, internal API URL, proxy CA, and proxy settings. It does not need Kubernetes credentials or long-lived third-party API keys. diff --git a/docs/public/md/deploying-in-production.md b/docs/public/md/deploying-in-production.md index f30d44b11..465c1e0f1 100644 --- a/docs/public/md/deploying-in-production.md +++ b/docs/public/md/deploying-in-production.md @@ -80,6 +80,7 @@ Store one secret per enabled harness credential: | Harness | API value | Slack selector | Credential to store | Upstream | |---------|-----------|----------------|---------------------|----------| | Codex default | `codex` | none or `--codex` | `OPENAI_API_KEY` | `api.openai.com` | +| OpenRouter via Codex | `openrouter` | `--openrouter` | `OPENROUTER_API_KEY` | `openrouter.ai` | | Amp | `amp` | `--amp` | `AMP_API_KEY` | `ampcode.com` | | Claude Code | `claude-code` | `--claude` | `ANTHROPIC_API_KEY` | `api.anthropic.com` | | pi-mono | `pi-mono` | `--pi` | `ANTHROPIC_API_KEY` | `api.anthropic.com` | @@ -91,7 +92,13 @@ headers the secret is bound to. When `ironProxy.secretSource` is `onepassword`, [iron-proxy](https://docs.iron.sh) resolves these values from `op://$OP_VAULT//credential`. For example, store the default -Codex credential in a 1Password item named `OPENAI_API_KEY`. +Codex credential in a 1Password item named `OPENAI_API_KEY`, and store the +OpenRouter credential in a 1Password item named `OPENROUTER_API_KEY` if you +enable the `--openrouter` selector. + +The OpenRouter selector runs the Codex harness against the configured +OpenRouter provider. It defaults to `openrouter/auto`; set `OPENROUTER_MODEL` +on the API deployment if you want a fixed OpenRouter model slug. Whatever source you pick, the vault is shared across the whole deployment, so any thread can use any configured credential. Per-user and per-channel @@ -233,7 +240,8 @@ reply with exactly PONG ``` Slack messages without a harness flag use Codex. Use `--amp`, `--claude`, -`--codex`, or `--pi` only when you want to select a specific harness. +`--codex`, `--openrouter`, or `--pi` only when you want to select a specific +harness. Inspect sandbox pods with the labels Centaur actually sets: diff --git a/docs/public/md/index.md b/docs/public/md/index.md index a2964fb34..1da847b03 100644 --- a/docs/public/md/index.md +++ b/docs/public/md/index.md @@ -73,7 +73,7 @@ Onboard me to Centaur locally. Use https://centaur.run/llms-full.txt and follow
  • Harness agnostic. - Use Amp, Codex, Claude Code, pi-mono, or your own CLI harness with the same durable execution model. + Use Amp, Codex, Codex through OpenRouter, Claude Code, pi-mono, or your own CLI harness with the same durable execution model.
  • diff --git a/docs/public/md/quickstart.md b/docs/public/md/quickstart.md index d1b241e30..4f73e5ce6 100644 --- a/docs/public/md/quickstart.md +++ b/docs/public/md/quickstart.md @@ -72,14 +72,15 @@ enabled in `values.dev.yaml`; use a real token if you want to test Slack. Postgres on startup, so it must exist before `just up`. Application-level model and tool secrets, such as `OPENAI_API_KEY`, -`ANTHROPIC_API_KEY`, `AMP_API_KEY`, and `GITHUB_TOKEN`, should live in -1Password or the configured [iron-proxy](https://docs.iron.sh) secret source. Sandboxes receive -placeholder values and [iron-proxy](https://docs.iron.sh) injects the real credentials only on approved -outbound requests. +`OPENROUTER_API_KEY`, `ANTHROPIC_API_KEY`, `AMP_API_KEY`, and `GITHUB_TOKEN`, +should live in 1Password or the configured [iron-proxy](https://docs.iron.sh) +secret source. Sandboxes receive placeholder values and [iron-proxy](https://docs.iron.sh) injects the +real credentials only on approved outbound requests. The default harness is `codex`, so `OPENAI_API_KEY` must exist in the configured secret source before Slack agent turns can complete. Use explicit harness -selectors only when you want a non-default harness such as Amp or Claude Code. +selectors only when you want a non-default harness such as Amp, Claude Code, or +OpenRouter. ## 3. Boot the stack @@ -156,7 +157,8 @@ Mention the bot in a test channel where the Slack app is installed: ``` Slack messages without a harness flag use Codex. Add a selector such as -`--amp`, `--claude`, or `--pi` only when you want to override the default. +`--amp`, `--claude`, `--openrouter`, or `--pi` only when you want to override +the default. If Slack receives the mention but no agent runs, inspect Slackbot logs: diff --git a/docs/public/md/secrets/onepassword.md b/docs/public/md/secrets/onepassword.md index caf91494b..1ad2e4156 100644 --- a/docs/public/md/secrets/onepassword.md +++ b/docs/public/md/secrets/onepassword.md @@ -110,6 +110,7 @@ Store enabled harness credentials the same way: | Credential | Used for | |------------|----------| | `OPENAI_API_KEY` | Codex default | +| `OPENROUTER_API_KEY` | OpenRouter via Codex | | `AMP_API_KEY` | Amp | | `ANTHROPIC_API_KEY` | Claude Code and pi-mono | diff --git a/harness/codex/config.toml b/harness/codex/config.toml index fd9622ef6..a8b709bd3 100644 --- a/harness/codex/config.toml +++ b/harness/codex/config.toml @@ -26,5 +26,12 @@ job_max_runtime_seconds = 1800 [tools] view_image = true +[model_providers.openrouter] +name = "OpenRouter" +base_url = "https://openrouter.ai/api/v1" +env_key = "OPENROUTER_API_KEY" +wire_api = "responses" +requires_openai_auth = false + [projects."/"] trust_level = "trusted" diff --git a/packages/api-client/test/harness-events.test.ts b/packages/api-client/test/harness-events.test.ts new file mode 100644 index 000000000..2b7e87f60 --- /dev/null +++ b/packages/api-client/test/harness-events.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; + +import { normalizeHarnessEvent } from "@centaur/harness-events"; + +describe("normalizeHarnessEvent", () => { + it("normalizes OpenRouter thread starts through the Codex event path", () => { + expect( + normalizeHarnessEvent("openrouter", { + type: "thread.started", + thread_id: "thread-or", + }), + ).toEqual([{ type: "system", subtype: "init", session_id: "thread-or" }]); + }); + + it("normalizes OpenRouter turn completion usage through the Codex event path", () => { + expect( + normalizeHarnessEvent("openrouter", { + type: "turn.completed", + model: "openai/gpt-4o-mini", + usage: { input_tokens: 3, output_tokens: 5 }, + }), + ).toEqual([ + { + type: "usage", + usage: { input_tokens: 3, output_tokens: 5 }, + model: "openai/gpt-4o-mini", + authoritative: true, + }, + ]); + }); + + it("passes OpenRouter Codex item events through instead of treating them as amp events", () => { + const event = { + type: "item.completed", + item: { type: "agent_message", text: "hello" }, + }; + + expect(normalizeHarnessEvent("openrouter", event)).toEqual([event]); + }); +}); diff --git a/packages/harness-events/src/normalize.ts b/packages/harness-events/src/normalize.ts index a3dc7374e..12de1cae5 100644 --- a/packages/harness-events/src/normalize.ts +++ b/packages/harness-events/src/normalize.ts @@ -12,6 +12,14 @@ import { asString, asRecord } from './parse-utils' import type { CanonicalEvent, SubagentActivity } from './types' +type NodeCrypto = { + createHash: (algorithm: string) => { + update: (input: string, encoding: string) => { digest: (encoding: string) => string } + } +} + +declare const require: ((name: string) => NodeCrypto) | undefined + // --------------------------------------------------------------------------- // Internal helpers // --------------------------------------------------------------------------- @@ -94,7 +102,10 @@ function stableSortedStringify(value: unknown): string { */ function sha1Hex(input: string): string { // In Next.js / Node.js environment, use the built-in crypto module - const crypto = require('crypto') as typeof import('crypto') + const crypto = require?.('crypto') + if (!crypto) { + throw new Error('SHA-1 hashing requires Node.js crypto') + } return crypto.createHash('sha1').update(input, 'utf-8').digest('hex') } @@ -639,7 +650,7 @@ function normalizeCodexEvent(event: Record): CanonicalEvent[] { } if (eventType === 'assistant') { - return [event] + return [event as CanonicalEvent] } if (eventType === 'error') { @@ -872,7 +883,7 @@ export function normalizeHarnessEvent( } } - if (normalizedHarness === 'codex') { + if (normalizedHarness === 'codex' || normalizedHarness === 'openrouter') { return normalizeCodexEvent(event) } if (normalizedHarness === 'pi-mono') { diff --git a/services/api/api/agent.py b/services/api/api/agent.py index 62cd99487..c5ccd6e52 100644 --- a/services/api/api/agent.py +++ b/services/api/api/agent.py @@ -98,7 +98,7 @@ } ) -_ENGINE_HARNESSES = {"amp", "claude-code", "codex", "pi-mono"} +_ENGINE_HARNESSES = {"amp", "claude-code", "codex", "openrouter", "pi-mono"} _REUSABLE_DB_STATES = {"running", "idle", "delivering", "error", "suspended"} IDLE_TTL_S = int(os.getenv("IDLE_TTL_S", "86400")) # 24 hours diff --git a/services/api/api/routers/agent.py b/services/api/api/routers/agent.py index 4d41f4856..166be5972 100644 --- a/services/api/api/routers/agent.py +++ b/services/api/api/routers/agent.py @@ -78,6 +78,7 @@ def _enforce_sandbox_thread_scope(request: Request, thread_key: str) -> None: "claude": "claude-code", "claude-code": "claude-code", "codex": "codex", + "openrouter": "openrouter", "pi": "pi-mono", "pi-mono": "pi-mono", } diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index 99838de9f..e6273b366 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -241,6 +241,11 @@ def _resolve_codex_model_label(model: str | None) -> str: return f"codex-{raw}" +def _resolve_openrouter_model_label(model: str | None) -> str: + raw = (model or os.getenv("OPENROUTER_MODEL") or "openrouter/auto").strip().lower() + return raw or "openrouter/auto" + + def _engine_model_label( *, engine: str | None, @@ -258,6 +263,8 @@ def _engine_model_label( return _resolve_claude_model_label(explicit or None) if engine_name == "codex": return _resolve_codex_model_label(explicit or None) + if engine_name == "openrouter": + return _resolve_openrouter_model_label(explicit or None) if engine_name == "amp": return f"amp-{explicit}" if explicit else "amp" if explicit: diff --git a/services/api/api/sandbox/config.py b/services/api/api/sandbox/config.py index 2ba107e15..5410a9f97 100644 --- a/services/api/api/sandbox/config.py +++ b/services/api/api/sandbox/config.py @@ -21,6 +21,7 @@ def image() -> str: _HARNESS_STUB_KEYS = ( "ANTHROPIC_API_KEY", "OPENAI_API_KEY", + "OPENROUTER_API_KEY", "AMP_API_KEY", "GITHUB_TOKEN", ) @@ -155,7 +156,7 @@ def build_harness_cmd(engine: str, model: str | None = None) -> list[str]: """Build the container CMD for a given harness engine.""" if engine == "amp": return ["amp-wrapper"] - if engine == "codex": + if engine in {"codex", "openrouter"}: return ["codex-app-wrapper"] if engine == "claude-code": return ["claude-app-wrapper"] diff --git a/services/api/api/sandbox/harness_protocol.py b/services/api/api/sandbox/harness_protocol.py index f3f96c1ad..ff3bd827d 100644 --- a/services/api/api/sandbox/harness_protocol.py +++ b/services/api/api/sandbox/harness_protocol.py @@ -21,6 +21,10 @@ def _extract_error_message(event: dict) -> str: return msg if isinstance(msg, str) else "" +def _is_codex_like(engine: str) -> bool: + return engine in ("codex", "openrouter") + + def is_turn_done(engine: str, event: dict) -> bool: """Return True when *event* signals the end of a main-agent turn. @@ -58,7 +62,7 @@ def is_turn_done(engine: str, event: dict) -> bool: return False return msg.get("stop_reason") == "end_turn" return False - if engine == "codex": + if _is_codex_like(engine): return t in ("turn.completed", "turn.failed") return t == "agent_end" # pi-mono @@ -91,7 +95,7 @@ def extract_result(engine: str, event: dict) -> str | None: if texts: return texts[-1] return None - if engine == "codex": + if _is_codex_like(engine): if t == "assistant": msg = event.get("message", {}) content = msg.get("content", []) @@ -120,7 +124,7 @@ def extract_thread_id(engine: str, event: dict) -> str | None: return event.get("session_id") or None if t == "assistant": return event.get("session_id") or None - elif engine == "codex": + elif _is_codex_like(engine): if t == "thread.started": return event.get("thread_id") or None elif engine == "pi-mono" and t == "session": diff --git a/services/api/api/sandbox/kubernetes.py b/services/api/api/sandbox/kubernetes.py index 1abfb8c7f..8c4ac6295 100644 --- a/services/api/api/sandbox/kubernetes.py +++ b/services/api/api/sandbox/kubernetes.py @@ -1406,6 +1406,12 @@ async def create( overlay_image = _overlay_image() if overlay_image: env.append(f"CENTAUR_OVERLAY_DIR={_SANDBOX_OVERLAY_DIR}") + if engine == "openrouter": + env.append("CODEX_MODEL_PROVIDER=openrouter") + openrouter_model = ( + model or os.getenv("OPENROUTER_MODEL") or "openrouter/auto" + ) + env.append(f"CODEX_MODEL={openrouter_model}") if engine == "claude-code" and model: env.append(f"CLAUDE_MODEL={model}") if engine == "claude-code" and resume_thread_id: diff --git a/services/api/api/sandbox/normalize.py b/services/api/api/sandbox/normalize.py index a5acc9d86..867c191f4 100644 --- a/services/api/api/sandbox/normalize.py +++ b/services/api/api/sandbox/normalize.py @@ -844,7 +844,7 @@ def _normalize_pi_event(event: dict) -> list[dict]: # Main dispatcher # --------------------------------------------------------------------------- -_ENGINE_HARNESSES = {"amp", "claude-code", "codex", "pi-mono"} +_ENGINE_HARNESSES = {"amp", "claude-code", "codex", "openrouter", "pi-mono"} def normalize_harness_event(engine: str, event: dict) -> list[dict]: @@ -853,7 +853,8 @@ def normalize_harness_event(engine: str, event: dict) -> list[dict]: Parameters ---------- engine: - The engine name (``amp``, ``claude-code``, ``codex``, ``pi-mono``). + The engine name (``amp``, ``claude-code``, ``codex``, ``openrouter``, + ``pi-mono``). Persona names (e.g. ``legal``, ``eng``) are treated as amp-like. event: A single raw JSON dict from harness stdout. @@ -888,7 +889,7 @@ def normalize_harness_event(engine: str, event: dict) -> list[dict]: else: normalized = "amp" - if normalized == "codex": + if normalized in {"codex", "openrouter"}: return _normalize_codex_event(event) if normalized == "pi-mono": return _normalize_pi_event(event) diff --git a/services/api/api/tool_manager.py b/services/api/api/tool_manager.py index 8b03ae886..a590684e7 100644 --- a/services/api/api/tool_manager.py +++ b/services/api/api/tool_manager.py @@ -1934,6 +1934,14 @@ def discover(self) -> list[LoadedTool]: inject_header="chatgpt-account-id", ), ), + ("openrouter", "api_key"): ( + HttpSecret( + name="OPENROUTER_API_KEY", + secret_ref="OPENROUTER_API_KEY", + hosts=("openrouter.ai", "*.openrouter.ai"), + match_headers=("Authorization",), + ), + ), } # Maps an engine to the env-var name (in ``sandbox.extraEnv``) that @@ -1942,6 +1950,7 @@ def discover(self) -> list[LoadedTool]: _HARNESS_AUTH_MODE_ENV: ClassVar[dict[str, str]] = { "claude-code": "CLAUDE_CODE_AUTH_MODE", "codex": "CODEX_AUTH_MODE", + "openrouter": "OPENROUTER_AUTH_MODE", } @classmethod diff --git a/services/api/api/warm_pool.py b/services/api/api/warm_pool.py index a81425650..e73a3c9fb 100644 --- a/services/api/api/warm_pool.py +++ b/services/api/api/warm_pool.py @@ -94,7 +94,9 @@ async def _spawn_warm_container() -> WarmContainer | None: if not backend.supports_warm_pool: return None engine = ( - POOL_HARNESS if POOL_HARNESS in {"amp", "claude-code", "codex"} else "codex" + POOL_HARNESS + if POOL_HARNESS in {"amp", "claude-code", "codex", "openrouter"} + else "codex" ) placeholder_key = f"warm-{int(time.time() * 1000)}-{id(asyncio.current_task())}" diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index fdb606f53..f5fe3916f 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -942,7 +942,7 @@ def __getattr__(self, tool: str) -> _ToolMethodProxy: # ── Agent-turn helper (domain logic, NOT on the context) ────────────── -_EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "pi-mono"}) +_EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "openrouter", "pi-mono"}) async def _compute_agent_session_title( diff --git a/services/api/api/workflows/slack_thread_turn.py b/services/api/api/workflows/slack_thread_turn.py index cddf4f412..7707f6fc4 100644 --- a/services/api/api/workflows/slack_thread_turn.py +++ b/services/api/api/workflows/slack_thread_turn.py @@ -12,7 +12,7 @@ WORKFLOW_NAME = "slack_thread_turn" -_EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "pi-mono"}) +_EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "openrouter", "pi-mono"}) _PROMPT_FLAG_ALIASES = { "claude": "claude-code", "pi": "pi-mono", diff --git a/services/api/tests/test_agent_control_plane.py b/services/api/tests/test_agent_control_plane.py index 553dd1f7d..609f83011 100644 --- a/services/api/tests/test_agent_control_plane.py +++ b/services/api/tests/test_agent_control_plane.py @@ -27,6 +27,21 @@ def test_agent_session_title_formats_base_and_persona_runs(): ) +def test_agent_session_header_formats_openrouter_default_model(monkeypatch): + from api.runtime_control import _agent_session_header + + monkeypatch.delenv("OPENROUTER_MODEL", raising=False) + + assert ( + _agent_session_header( + persona_id=None, + engine="openrouter", + harness="openrouter", + ) + == "base · openrouter/auto" + ) + + def test_slackbot_streamed_answer_chars_requires_positive_integer_offset(): from api.runtime_control import ( _slackbot_live_delivery_covers_result, diff --git a/services/api/tests/test_codex_app_wrapper.py b/services/api/tests/test_codex_app_wrapper.py index db814e162..816e8a6ee 100644 --- a/services/api/tests/test_codex_app_wrapper.py +++ b/services/api/tests/test_codex_app_wrapper.py @@ -344,6 +344,20 @@ def test_emit_notification_collects_agent_message_delta_output(monkeypatch) -> N assert emitted[0]["type"] == "item.agentMessage.delta" +def test_codex_config_overrides_from_environment(monkeypatch) -> None: + wrapper = _load_wrapper() + + monkeypatch.setenv("CODEX_MODEL_PROVIDER", "openrouter") + monkeypatch.setenv("CODEX_MODEL", "openrouter/auto") + + assert wrapper._codex_config_overrides() == [ + "-c", + 'model_provider="openrouter"', + "-c", + 'model="openrouter/auto"', + ] + + def test_main_lazy_starts_app_server_after_input(monkeypatch) -> None: wrapper = _load_wrapper() requests: list[tuple[str, dict]] = [] diff --git a/services/api/tests/test_harness_protocol.py b/services/api/tests/test_harness_protocol.py index 1c585411f..d3c5ea4fe 100644 --- a/services/api/tests/test_harness_protocol.py +++ b/services/api/tests/test_harness_protocol.py @@ -82,6 +82,9 @@ def test_claude_code_result_event(self): def test_codex_turn_completed(self): assert is_turn_done("codex", {"type": "turn.completed"}) is True + def test_openrouter_turn_completed(self): + assert is_turn_done("openrouter", {"type": "turn.completed"}) is True + def test_codex_turn_failed(self): assert is_turn_done("codex", {"type": "turn.failed"}) is True @@ -163,6 +166,13 @@ def test_codex_item_completed(self): } assert extract_result("codex", event) == "codex says" + def test_openrouter_item_completed(self): + event = { + "type": "item.completed", + "item": {"type": "agent_message", "text": "openrouter says"}, + } + assert extract_result("openrouter", event) == "openrouter says" + def test_codex_item_completed_camel_case(self): event = { "type": "item.completed", @@ -211,6 +221,10 @@ def test_codex_thread_started(self): event = {"type": "thread.started", "thread_id": "T-abc"} assert extract_thread_id("codex", event) == "T-abc" + def test_openrouter_thread_started(self): + event = {"type": "thread.started", "thread_id": "T-or"} + assert extract_thread_id("openrouter", event) == "T-or" + def test_pi_mono_session(self): event = {"type": "session", "id": "sess-42"} assert extract_thread_id("pi-mono", event) == "sess-42" @@ -218,6 +232,7 @@ def test_pi_mono_session(self): def test_unrelated_event_returns_none(self): assert extract_thread_id("amp", {"type": "assistant"}) is None assert extract_thread_id("codex", {"type": "item.completed"}) is None + assert extract_thread_id("openrouter", {"type": "item.completed"}) is None assert extract_thread_id("pi-mono", {"type": "message_end"}) is None diff --git a/services/api/tests/test_integration.py b/services/api/tests/test_integration.py index 74bec38f9..eb89da5b6 100644 --- a/services/api/tests/test_integration.py +++ b/services/api/tests/test_integration.py @@ -341,6 +341,12 @@ def test_codex(self): cmd = build_harness_cmd("codex") assert cmd == ["codex-app-wrapper"] + def test_openrouter_uses_codex_wrapper(self): + from api.sandbox.config import build_harness_cmd + + cmd = build_harness_cmd("openrouter") + assert cmd == ["codex-app-wrapper"] + def test_claude_code(self): from api.sandbox.config import build_harness_cmd @@ -389,6 +395,7 @@ def test_container_env_includes_claude_hardening( env = container_env("thread-key", "sandbox-id", "firewall.internal") assert "ANTHROPIC_API_KEY=ANTHROPIC_API_KEY" in env + assert "OPENROUTER_API_KEY=OPENROUTER_API_KEY" in env assert "CLAUDE_CODE_DISABLE_FEEDBACK_SURVEY=1" in env assert "CLAUDE_CODE_DISABLE_OFFICIAL_MARKETPLACE_AUTOINSTALL=1" in env assert "DISABLE_ERROR_REPORTING=1" in env diff --git a/services/api/tests/test_normalize.py b/services/api/tests/test_normalize.py index d31f01878..120fb12ba 100644 --- a/services/api/tests/test_normalize.py +++ b/services/api/tests/test_normalize.py @@ -127,6 +127,18 @@ def test_item_completed_message(self): ) assert result == [event] + def test_openrouter_uses_codex_normalizer(self): + result = normalize_harness_event( + "openrouter", + { + "type": "thread.started", + "thread_id": "thread-or", + }, + ) + assert result == [ + {"type": "system", "subtype": "init", "session_id": "thread-or"} + ] + def test_agent_message_delta_passthrough(self): event = { "type": "item.agentMessage.delta", diff --git a/services/api/tests/test_sandbox_entrypoint.py b/services/api/tests/test_sandbox_entrypoint.py index 0312e25ab..b42c924c2 100644 --- a/services/api/tests/test_sandbox_entrypoint.py +++ b/services/api/tests/test_sandbox_entrypoint.py @@ -135,3 +135,39 @@ def test_sandbox_entrypoint_installs_codex_harness_config(tmp_path: Path) -> Non assert result.returncode == 0, result.stderr or result.stdout assert result.stdout == (harness_dir / "codex" / "config.toml").read_text() + + +def test_sandbox_entrypoint_skips_codex_login_for_openrouter(tmp_path: Path) -> None: + home = tmp_path / "home" + harness_dir = _write_codex_harness_config(home) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + codex_calls = tmp_path / "codex-calls" + codex_bin = bin_dir / "codex" + codex_bin.write_text( + "#!/bin/sh\n" + f"printf '%s\\n' \"$*\" >> {codex_calls}\n" + "exit 0\n" + ) + codex_bin.chmod(0o755) + + result = subprocess.run( + [ + "bash", + str(ENTRYPOINT_SH), + "true", + ], + check=False, + capture_output=True, + text=True, + env={ + "HOME": str(home), + "PATH": f"{bin_dir}:{os.environ.get('PATH', '/usr/bin:/bin')}", + "CENTAUR_HARNESS_CONFIG_DIR": str(harness_dir), + "CODEX_MODEL_PROVIDER": "openrouter", + "OPENAI_API_KEY": "placeholder-openai-key", + }, + ) + + assert result.returncode == 0, result.stderr or result.stdout + assert not codex_calls.exists() diff --git a/services/api/tests/test_sandbox_kubernetes_backend.py b/services/api/tests/test_sandbox_kubernetes_backend.py index ce1f8c787..4869c40dd 100644 --- a/services/api/tests/test_sandbox_kubernetes_backend.py +++ b/services/api/tests/test_sandbox_kubernetes_backend.py @@ -271,6 +271,7 @@ def test_container_env_includes_firewall_host_for_secret_bootstrap( # iron-proxy rewrites the placeholder mid-flight. assert env_map["AMP_API_KEY"] == "AMP_API_KEY" assert env_map["OPENAI_API_KEY"] == "OPENAI_API_KEY" + assert env_map["OPENROUTER_API_KEY"] == "OPENROUTER_API_KEY" assert env_map["CENTAUR_TRACE_ID"] == "00000000-0000-0000-0000-000000000123" assert env_map["NO_PROXY"] == "localhost,127.0.0.1,firewall.internal,api.internal" assert env_map["no_proxy"] == env_map["NO_PROXY"] diff --git a/services/api/tests/test_tool_manager.py b/services/api/tests/test_tool_manager.py index f055960d5..6ed89efb2 100644 --- a/services/api/tests/test_tool_manager.py +++ b/services/api/tests/test_tool_manager.py @@ -711,6 +711,14 @@ def test_codex_access_token_swaps_to_brokered_with_account_id(self) -> None: assert "OPENAI_CODEX_ACCOUNT_ID" in names assert "OPENAI_API_KEY" not in names + def test_openrouter_includes_only_openrouter_provider_secret(self) -> None: + tm = ToolManager.__new__(ToolManager) + tm.tools = {} + names = self._names(tm.secrets_for_sandbox("openrouter", {})) + assert "OPENROUTER_API_KEY" in names + assert "OPENAI_API_KEY" not in names + assert "ANTHROPIC_API_KEY" not in names + def test_unset_auth_mode_defaults_to_api_key(self) -> None: tm = ToolManager.__new__(ToolManager) tm.tools = {} @@ -738,6 +746,7 @@ def test_collect_secrets_returns_union_of_all_harness_variants(self) -> None: names = self._names(tm.collect_secrets()) assert "ANTHROPIC_API_KEY" in names assert "OPENAI_API_KEY" in names + assert "OPENROUTER_API_KEY" in names assert "anthropic-claude" in names assert "openai-codex" in names assert "OPENAI_CODEX_ACCOUNT_ID" in names diff --git a/services/api/tests/test_workflows.py b/services/api/tests/test_workflows.py index a80a56387..de5c336cb 100644 --- a/services/api/tests/test_workflows.py +++ b/services/api/tests/test_workflows.py @@ -405,12 +405,14 @@ def test_recovery_command_paraphrases_are_recognized(): ("`--invest` hyperliquid miqs", None, "invest", "hyperliquid miqs"), ("`--invest hyperliquid miqs`", None, "invest", "hyperliquid miqs"), ("--claude review this", "claude-code", None, "review this"), + ("--openrouter review this", "openrouter", None, "review this"), ("--pi analyze this", "pi-mono", None, "analyze this"), # Persona + harness compose orthogonally. ("--invest --claude review this", "claude-code", "invest", "review this"), ("--claude --invest review this", "claude-code", "invest", "review this"), ("--invest --amp review this", "amp", "invest", "review this"), ("--invest --codex review this", "codex", "invest", "review this"), + ("--invest --openrouter review this", "openrouter", "invest", "review this"), ("please use --opus and review this", None, None, "please use and review this"), ("please use --model opus and review this", None, None, "please use and review this"), ("please use `--model opus` and review this", None, None, "please use and review this"), diff --git a/services/sandbox/codex-app-wrapper.py b/services/sandbox/codex-app-wrapper.py index aa79c34a3..fdd57a7fc 100755 --- a/services/sandbox/codex-app-wrapper.py +++ b/services/sandbox/codex-app-wrapper.py @@ -103,6 +103,19 @@ def notify(method: str, params: dict[str, Any] | None = None) -> None: send_raw({"method": method, "params": params or {}}) +def _codex_config_overrides() -> list[str]: + overrides: list[str] = [] + for env_key, config_key in ( + ("CODEX_MODEL_PROVIDER", "model_provider"), + ("CODEX_MODEL", "model"), + ("CODEX_SERVICE_TIER", "service_tier"), + ): + value = (os.environ.get(env_key) or "").strip() + if value: + overrides.extend(["-c", f"{config_key}={json.dumps(value)}"]) + return overrides + + def start_app_server() -> None: global APP, APP_INITIALIZED if APP is not None and APP.poll() is None and APP_INITIALIZED: @@ -114,6 +127,7 @@ def start_app_server() -> None: APP = subprocess.Popen( [ "codex", + *_codex_config_overrides(), "app-server", "--listen", "stdio://", diff --git a/services/sandbox/entrypoint.sh b/services/sandbox/entrypoint.sh index 851d323de..3f1482f15 100644 --- a/services/sandbox/entrypoint.sh +++ b/services/sandbox/entrypoint.sh @@ -231,11 +231,11 @@ fi # signaling readiness, otherwise warm pods can be claimed with no auth loaded. # Skipped under access_token mode — that path relies on the chatgpt auth.json # installed above plus iron-proxy injecting the real Bearer at request time. -if [ "$CODEX_AUTH_MODE" != "access_token" ]; then - CODEX_KEY="${CODEX_API_KEY:-${OPENAI_API_KEY:-}}" - if [ -n "$CODEX_KEY" ]; then - echo "$CODEX_KEY" | codex login --with-api-key 2>/dev/null || true - fi +CODEX_KEY="${CODEX_API_KEY:-${OPENAI_API_KEY:-}}" +if [ "$CODEX_AUTH_MODE" != "access_token" ] && + [ "${CODEX_MODEL_PROVIDER:-}" != "openrouter" ] && + [ -n "$CODEX_KEY" ]; then + echo "$CODEX_KEY" | codex login --with-api-key 2>/dev/null || true fi # Signal readiness From e3f3534e7279b8009dd0c502de9e0207965776eb Mon Sep 17 00:00:00 2001 From: kenny Date: Thu, 28 May 2026 20:19:29 -0600 Subject: [PATCH 2/3] feat: support runtime model selection --- docs/public/md/deploying-in-production.md | 9 +- docs/public/md/quickstart.md | 3 +- packages/api-client/src/client.ts | 26 ++- packages/api-client/test/client.test.ts | 40 ++++ services/api/api/agent.py | 87 +++++--- services/api/api/db.py | 2 + services/api/api/routers/agent.py | 38 +++- services/api/api/runtime_control.py | 109 ++++++++-- services/api/api/sandbox/base.py | 1 + services/api/api/sandbox/kubernetes.py | 13 +- services/api/api/workflow_engine.py | 7 +- .../api/api/workflows/slack_thread_turn.py | 89 ++++++-- .../db/migrations/037_add_runtime_model.sql | 15 ++ .../api/tests/test_agent_control_plane.py | 197 +++++++++++++++++- .../api/tests/test_db_schema_compatibility.py | 14 +- services/api/tests/test_workflows.py | 150 +++++++++++-- 16 files changed, 695 insertions(+), 105 deletions(-) create mode 100644 services/api/db/migrations/037_add_runtime_model.sql diff --git a/docs/public/md/deploying-in-production.md b/docs/public/md/deploying-in-production.md index 465c1e0f1..9078db668 100644 --- a/docs/public/md/deploying-in-production.md +++ b/docs/public/md/deploying-in-production.md @@ -98,7 +98,11 @@ enable the `--openrouter` selector. The OpenRouter selector runs the Codex harness against the configured OpenRouter provider. It defaults to `openrouter/auto`; set `OPENROUTER_MODEL` -on the API deployment if you want a fixed OpenRouter model slug. +on the API deployment if you want a fixed default slug, or choose a model for a +new runtime with `--openrouter --model anthropic/claude-sonnet-4.5` in Slack. +API clients can pass the same model slug as `model` on `/agent/spawn` or on the +single-call `/agent/execute` convenience path when no `assignment_generation` +is supplied. Whatever source you pick, the vault is shared across the whole deployment, so any thread can use any configured credential. Per-user and per-channel @@ -241,7 +245,8 @@ reply with exactly PONG Slack messages without a harness flag use Codex. Use `--amp`, `--claude`, `--codex`, `--openrouter`, or `--pi` only when you want to select a specific -harness. +harness. Add `--model ` with `--openrouter` to pin the new +runtime to a specific OpenRouter model. Inspect sandbox pods with the labels Centaur actually sets: diff --git a/docs/public/md/quickstart.md b/docs/public/md/quickstart.md index 4f73e5ce6..7792aefc2 100644 --- a/docs/public/md/quickstart.md +++ b/docs/public/md/quickstart.md @@ -158,7 +158,8 @@ Mention the bot in a test channel where the Slack app is installed: Slack messages without a harness flag use Codex. Add a selector such as `--amp`, `--claude`, `--openrouter`, or `--pi` only when you want to override -the default. +the default. For OpenRouter, add `--model ` when you want the +new runtime to use a specific model instead of `openrouter/auto`. If Slack receives the mention but no agent runs, inspect Slackbot logs: diff --git a/packages/api-client/src/client.ts b/packages/api-client/src/client.ts index bbee8389e..c412c8124 100644 --- a/packages/api-client/src/client.ts +++ b/packages/api-client/src/client.ts @@ -19,6 +19,7 @@ export interface SpawnOptions { spawnId?: string; harness?: string; engine?: string; + model?: string; personaId?: string; agentsMdOverride?: string; } @@ -30,6 +31,7 @@ export interface SpawnResult { trace_id?: string; assignment_state: string; assignment_generation: number; + model?: string | null; persona_id?: string | null; prompt_ref?: string | null; effective_agents_md_sha256?: string | null; @@ -46,9 +48,8 @@ export interface MessageOptions { metadata?: Record; } -export interface ExecuteOptions { +interface ExecuteBaseOptions { threadKey: string; - assignmentGeneration: number; executeId?: string; harness?: string; platform?: string; @@ -57,6 +58,22 @@ export interface ExecuteOptions { delivery?: Record; } +export type ExecuteOptions = + | (ExecuteBaseOptions & { + assignmentGeneration: number; + message?: never; + engine?: never; + model?: never; + personaId?: never; + }) + | (ExecuteBaseOptions & { + assignmentGeneration?: undefined; + message: string; + engine?: string; + model?: string; + personaId?: string; + }); + export interface ExecutionAccepted { ok: boolean; execution_id: string; @@ -143,6 +160,7 @@ export class CentaurClient { spawn_id: opts.spawnId, harness: opts.harness, engine: opts.engine, + model: opts.model, persona_id: opts.personaId, agents_md_override: opts.agentsMdOverride, }); @@ -175,6 +193,10 @@ export class CentaurClient { assignment_generation: opts.assignmentGeneration, execute_id: opts.executeId, harness: opts.harness, + message: opts.message, + engine: opts.engine, + model: opts.model, + persona_id: opts.personaId, platform: opts.platform, user_id: opts.userId, metadata: opts.metadata, diff --git a/packages/api-client/test/client.test.ts b/packages/api-client/test/client.test.ts index 76dea8cd4..8e618b78e 100644 --- a/packages/api-client/test/client.test.ts +++ b/packages/api-client/test/client.test.ts @@ -124,6 +124,46 @@ describe("CentaurClient", () => { ); }); + it("posts model selectors for spawn and one-shot execute", async () => { + const client = new CentaurClient({ + apiUrl: "http://api.local", + apiKey: "test-key", + }); + const postMock = vi.spyOn(client.http, "post").mockResolvedValue({ data: { ok: true } }); + + await client.spawn({ + threadKey: "thread-1", + harness: "openrouter", + model: "anthropic/claude-sonnet-4.5", + }); + await client.execute({ + threadKey: "thread-2", + harness: "openrouter", + model: "google/gemini-2.5-pro", + message: "review this", + }); + + expect(postMock).toHaveBeenNthCalledWith( + 1, + "/agent/spawn", + expect.objectContaining({ + thread_key: "thread-1", + harness: "openrouter", + model: "anthropic/claude-sonnet-4.5", + }), + ); + expect(postMock).toHaveBeenNthCalledWith( + 2, + "/agent/execute", + expect.objectContaining({ + thread_key: "thread-2", + harness: "openrouter", + model: "google/gemini-2.5-pro", + message: "review this", + }), + ); + }); + it("throws useful errors for non-OK event stream responses", async () => { vi.stubGlobal("fetch", vi.fn(async () => new Response( "upstream unavailable", diff --git a/services/api/api/agent.py b/services/api/api/agent.py index c5ccd6e52..78eed512e 100644 --- a/services/api/api/agent.py +++ b/services/api/api/agent.py @@ -124,6 +124,23 @@ def _drop_runtime(sandbox_id: str) -> None: _runtime.pop(sandbox_id, None) +def _normalize_model(value: str | None) -> str: + return (value or "").strip() + + +def _resolve_runtime_model(engine: str, model: str | None) -> str: + requested = _normalize_model(model) + if requested: + return requested + if engine == "openrouter": + return _normalize_model(os.getenv("OPENROUTER_MODEL")) or "openrouter/auto" + if engine == "codex": + return _normalize_model(os.getenv("CODEX_MODEL")) + if engine == "claude-code": + return _normalize_model(os.getenv("CLAUDE_MODEL")) + return "" + + def _elapsed_since(start_s: float) -> float: """Return a non-negative elapsed duration for logging. @@ -201,7 +218,7 @@ async def _db_get_session(thread_key: str) -> SandboxSession | None: row = await pool.fetchrow( "SELECT thread_key, sandbox_id, harness, engine, state, started_at, " "agent_thread_id, last_delivered_id, inflight_turn_id, inflight_turn_input, " - "inflight_attempts, last_result, trace_id " + "inflight_attempts, last_result, model, trace_id " "FROM sandbox_sessions WHERE thread_key = $1", thread_key, ) @@ -212,6 +229,7 @@ async def _db_get_session(thread_key: str) -> SandboxSession | None: thread_key=row["thread_key"], harness=row["harness"], engine=row["engine"], + model=row["model"] or "", started_at=row["started_at"].timestamp() if row["started_at"] else 0.0, backend_name="kubernetes", db_state=row["state"], @@ -254,18 +272,19 @@ async def _db_insert_session( session.trace_id = trace_id row = await pool.fetchrow( "INSERT INTO sandbox_sessions (" - "thread_key, sandbox_id, harness, engine, state, started_at, " + "thread_key, sandbox_id, harness, engine, model, state, started_at, " "agent_thread_id, last_delivered_id, inflight_turn_id, inflight_turn_input, " "inflight_started_at, inflight_attempts, last_result, last_result_at, trace_id" - ") VALUES ($1, $2, $3, $4, $5, NOW(), $6, $7, $8::text, $9::jsonb, " - "CASE WHEN $8::text IS NULL THEN NULL ELSE NOW() END, $10, $11, " - "CASE WHEN $11::text = '' THEN NULL ELSE NOW() END, $12::uuid) " + ") VALUES ($1, $2, $3, $4, $5, $6, NOW(), $7, $8, $9::text, $10::jsonb, " + "CASE WHEN $9::text IS NULL THEN NULL ELSE NOW() END, $11, $12, " + "CASE WHEN $12::text = '' THEN NULL ELSE NOW() END, $13::uuid) " "ON CONFLICT (thread_key) DO NOTHING " "RETURNING thread_key", session.thread_key, session.sandbox_id, harness, engine, + _normalize_model(session.model) or None, initial_state, agent_thread_id or None, last_delivered_id or None, @@ -820,6 +839,7 @@ async def get_or_spawn( harness: str | None = None, *, engine: str | None = None, + model: str | None = None, persona: str | None = None, ) -> SandboxSession: """Get existing session or spawn a new sandbox. @@ -835,15 +855,21 @@ async def get_or_spawn( old_last_result: str = "" old_trace_id: str = "" pool = _get_pool() + effective_harness = harness or default_harness() + resolved_engine, resolved_persona, repo = _resolve_harness_profile( + effective_harness, persona=persona, engine_override=engine + ) + resolved_model = _resolve_runtime_model(resolved_engine, model) session = await _db_get_session(thread_key) if session: if session.db_state in _REUSABLE_DB_STATES: backend = get_backend() st = await backend.status(session) - if st == "running": + model_matches = _normalize_model(session.model) == resolved_model + if st == "running" and model_matches: _get_runtime(session.sandbox_id) return session - if session.db_state == "suspended": + if session.db_state == "suspended" and model_matches: try: await backend.resume_by_id(session.sandbox_id) resumed_status = await backend.status(session) @@ -872,13 +898,17 @@ async def get_or_spawn( f"failed to resume suspended sandbox: {session.sandbox_id}" ) from exc # Container is gone — save agent_thread_id and cursor for resume, clean up row - old_agent_thread_id = session.agent_thread_id - old_last_delivered_id = session.last_delivered_id - old_inflight_turn_id = session.inflight_turn_id - old_inflight_turn_input = session.inflight_turn_input - old_inflight_attempts = session.inflight_attempts - old_last_result = session.last_result - old_trace_id = session.trace_id + if model_matches: + old_agent_thread_id = session.agent_thread_id + old_last_delivered_id = session.last_delivered_id + old_inflight_turn_id = session.inflight_turn_id + old_inflight_turn_input = session.inflight_turn_input + old_inflight_attempts = session.inflight_attempts + old_last_result = session.last_result + old_trace_id = session.trace_id + elif st == "running": + with contextlib.suppress(Exception): + await backend.stop(session) if session.db_state == "suspended": with contextlib.suppress(Exception): await backend.stop_by_id(session.sandbox_id) @@ -886,28 +916,23 @@ async def get_or_spawn( _drop_runtime(session.sandbox_id) else: # state is stopped/gone — clean up stale row - old_agent_thread_id = session.agent_thread_id - old_last_delivered_id = session.last_delivered_id - old_inflight_turn_id = session.inflight_turn_id - old_inflight_turn_input = session.inflight_turn_input - old_inflight_attempts = session.inflight_attempts - old_last_result = session.last_result - old_trace_id = session.trace_id + if _normalize_model(session.model) == resolved_model: + old_agent_thread_id = session.agent_thread_id + old_last_delivered_id = session.last_delivered_id + old_inflight_turn_id = session.inflight_turn_id + old_inflight_turn_input = session.inflight_turn_input + old_inflight_attempts = session.inflight_attempts + old_last_result = session.last_result + old_trace_id = session.trace_id await _db_delete_session(thread_key) _drop_runtime(session.sandbox_id) thread_trace_id = await get_or_create_thread_trace_id(pool, thread_key) - effective_harness = harness or default_harness() - - # Resolve harness profile (engine, persona, repo) once for both warm and cold paths - resolved_engine, resolved_persona, repo = _resolve_harness_profile( - effective_harness, persona=persona, engine_override=engine - ) - # Try warm pool first should_try_warm = ( not engine + and not resolved_model and not old_agent_thread_id and not old_inflight_turn_id and not (effective_harness == "amp" and resolved_engine == "codex") @@ -938,9 +963,6 @@ async def get_or_spawn( return claimed # Cold spawn - resolved_engine, resolved_persona, repo = _resolve_harness_profile( - effective_harness, persona=persona, engine_override=engine - ) backend = get_backend() await _evict_idle_sessions_for_capacity(backend) trace_id = old_trace_id or thread_trace_id or str(uuid.uuid4()) @@ -950,9 +972,11 @@ async def get_or_spawn( resolved_engine, persona=resolved_persona, repo=repo, + model=resolved_model or None, resume_thread_id=old_agent_thread_id or None, trace_id=trace_id, ) + session.model = resolved_model session.trace_id = trace_id if old_agent_thread_id: session.agent_thread_id = old_agent_thread_id @@ -2147,6 +2171,7 @@ async def get_status(thread_key: str) -> dict[str, Any]: "sandbox_id": session.sandbox_id[:12], "harness": session.harness, "engine": session.engine, + "model": session.model or None, "started_at": session.started_at, } if session.inflight_turn_id: diff --git a/services/api/api/db.py b/services/api/api/db.py index cc7270c29..ddae09a5f 100644 --- a/services/api/api/db.py +++ b/services/api/api/db.py @@ -38,6 +38,7 @@ "inflight_attempts", "last_result", "last_result_at", + "model", "trace_id", } ) @@ -52,6 +53,7 @@ "010", "011", "035", + "037", } ) diff --git a/services/api/api/routers/agent.py b/services/api/api/routers/agent.py index 166be5972..c72ccfe50 100644 --- a/services/api/api/routers/agent.py +++ b/services/api/api/routers/agent.py @@ -89,13 +89,14 @@ def _enforce_sandbox_thread_scope(request: Request, thread_key: str) -> None: } -def parse_harness_from_message(text: str) -> tuple[str | None, str, bool]: +def parse_harness_from_message(text: str) -> tuple[str | None, str | None, str, bool]: """Parse harness directives from message text. - Returns (harness_or_None, cleaned_text, harness_was_explicit). + Returns (harness_or_None, model_or_None, cleaned_text, harness_was_explicit). """ cleaned = text harness: str | None = None + model: str | None = None explicit = False # 1. key=value syntax: harness=X @@ -105,6 +106,11 @@ def parse_harness_from_message(text: str) -> tuple[str | None, str, bool]: explicit = True cleaned = (cleaned[: kv_match.start()] + cleaned[kv_match.end() :]).strip() + model_match = re.search(r"\bmodel\s*=\s*([^\s`]+)", cleaned, re.IGNORECASE) + if model_match: + model = model_match.group(1) + cleaned = (cleaned[: model_match.start()] + cleaned[model_match.end() :]).strip() + # 2. Known harness flags: --amp, --claude, etc. for flag, value in _HARNESS_FLAGS.items(): pattern = re.compile(r"(^|\s)--" + re.escape(flag) + r"(?=\s|$)", re.IGNORECASE) @@ -113,12 +119,21 @@ def parse_harness_from_message(text: str) -> tuple[str | None, str, bool]: explicit = True cleaned = pattern.sub(" ", cleaned) + model_flag = re.search( + r"(^|\s)--model(?:=|\s+)([^\s`]+)(?=\s|`|$)", + cleaned, + re.IGNORECASE, + ) + if model_flag: + model = model_flag.group(2) + cleaned = (cleaned[: model_flag.start()] + cleaned[model_flag.end() :]).strip() + # 3. Strip legacy engine/model flags (no harness effect) cleaned = re.sub( - r"(^|\s)--(engine|model)\s+[A-Za-z0-9._-]+(?=\s|$)", " ", cleaned, flags=re.IGNORECASE + r"(^|\s)--(engine|model)\s+[^\s`]+(?=\s|`|$)", " ", cleaned, flags=re.IGNORECASE ) cleaned = re.sub(r"(^|\s)--(opus|sonnet|haiku)(?=\s|$)", " ", cleaned, flags=re.IGNORECASE) - cleaned = re.sub(r"\bmodel\s*=\s*[A-Za-z0-9._-]+\b", "", cleaned, flags=re.IGNORECASE) + cleaned = re.sub(r"\bmodel\s*=\s*[^\s`]+", "", cleaned, flags=re.IGNORECASE) # 4. Generic --flag → persona/harness name (any unknown flag) generic_re = re.compile(r"(^|\s)--([a-z][a-z0-9-]*)(?=\s|$)", re.IGNORECASE) @@ -133,7 +148,7 @@ def parse_harness_from_message(text: str) -> tuple[str | None, str, bool]: # Normalise whitespace cleaned = re.sub(r"\s+", " ", cleaned).strip() - return harness, cleaned, explicit + return harness, model, cleaned, explicit class ExecuteRequest(BaseModel): @@ -149,6 +164,7 @@ class ExecuteRequest(BaseModel): # the server auto-orchestrates spawn → message → execute. message: str | None = None engine: str | None = None + model: str | None = None persona_id: str | None = None @@ -157,6 +173,7 @@ class SpawnRequest(BaseModel): spawn_id: str | None = None harness: str | None = None engine: str | None = None + model: str | None = None persona_id: str | None = None agents_md_override: str | None = None @@ -269,6 +286,13 @@ async def execute(request: Request): except ControlPlaneError as exc: return _json_error(exc.code, exc.message, exc.status_code) + if body.model: + return _json_error( + "MODEL_REQUIRES_SPAWN", + "model can only be set when spawning a runtime", + 422, + ) + execute_id = body.execute_id or f"exec-{uuid.uuid4().hex[:16]}" delivery = body.delivery or { "channel": "slack", @@ -311,6 +335,7 @@ async def _auto_execute(pool, body: ExecuteRequest) -> JSONResponse: spawn_id=f"{nonce}:spawn", harness=body.harness, engine=body.engine, + model=body.model, persona_id=body.persona_id, agents_md_override=None, ) @@ -370,6 +395,7 @@ async def spawn(req: SpawnRequest, request: Request): spawn_id=spawn_id, harness=req.harness, engine=req.engine, + model=req.model, persona_id=req.persona_id, agents_md_override=req.agents_md_override, ) @@ -567,6 +593,7 @@ async def status(request: Request, key: str): "assignment_generation": int(active["assignment_generation"]), "runtime_id": active["runtime_id"], "harness": active["harness"], + "model": active["model"], "persona_id": active["persona_id"], "prompt_ref": active["prompt_ref"], "effective_agents_md_sha256": active["effective_agents_md_sha256"], @@ -588,6 +615,7 @@ async def runtime(request: Request, key: str): "runtime_id": active["runtime_id"] if active else None, "harness": active["harness"] if active else None, "engine": active["engine"] if active else None, + "model": active["model"] if active else None, "persona_id": persona_id, "persona": _persona_payload(persona_id), "overlay": _overlay_runtime_payload(), diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index e6273b366..19dee2a5c 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -18,6 +18,7 @@ from api.agent import ( _get_runtime, + _resolve_harness_profile, _stream_stdout, get_or_spawn, inject_stdin, @@ -115,6 +116,7 @@ 0.0, ) WORKER_INSTANCE_ID = f"{os.getenv('HOSTNAME') or 'api'}:{uuid.uuid4().hex[:8]}" +_MODEL_CAPABLE_ENGINES = frozenset({"claude-code", "codex", "openrouter"}) _worker_tasks: list[asyncio.Task] = [] _worker_wake = asyncio.Event() @@ -246,6 +248,22 @@ def _resolve_openrouter_model_label(model: str | None) -> str: return raw or "openrouter/auto" +def _clean_model(model: str | None) -> str | None: + value = (model or "").strip() + return value or None + + +def _ensure_model_supported(*, engine: str | None, harness: str | None) -> None: + resolved = (engine or harness or "").strip() + if resolved in _MODEL_CAPABLE_ENGINES: + return + raise ControlPlaneError( + "MODEL_UNSUPPORTED_FOR_HARNESS", + f"model is not supported for harness {harness or engine or 'unknown'}", + 422, + ) + + def _engine_model_label( *, engine: str | None, @@ -500,7 +518,7 @@ async def _write_agents_override(runtime_id: str, agents_md_override: str) -> No async def get_active_assignment(pool, thread_key: str) -> dict[str, Any] | None: row = await pool.fetchrow( "SELECT thread_key, assignment_generation, runtime_id, harness, engine, persona_id, " - "prompt_ref, effective_agents_md_sha256, agents_md_override, state " + "prompt_ref, effective_agents_md_sha256, agents_md_override, model, state " "FROM agent_runtime_assignments " "WHERE thread_key = $1 AND state = 'active' " "ORDER BY assignment_generation DESC LIMIT 1", @@ -516,6 +534,7 @@ async def spawn_assignment( spawn_id: str, harness: str | None, engine: str | None, + model: str | None, persona_id: str | None, agents_md_override: str | None, ) -> dict[str, Any]: @@ -541,7 +560,7 @@ async def spawn_assignment( 422, ) - attach_active_assignment = ( + inherit_active_assignment = ( harness is None and engine is None and persona_id is None @@ -549,7 +568,7 @@ async def spawn_assignment( ) active_assignment = ( await get_active_assignment(pool, thread_key) - if attach_active_assignment + if inherit_active_assignment else None ) @@ -558,6 +577,9 @@ async def spawn_assignment( effective_engine = active_assignment.get("engine") effective_persona_id = active_assignment.get("persona_id") effective_agents_md_override = active_assignment.get("agents_md_override") + effective_model = _clean_model(model) or _clean_model( + active_assignment.get("model") + ) else: # Explicit harness wins; otherwise inherit from the persona's declared # engine; otherwise use the deployment default. @@ -570,12 +592,14 @@ async def spawn_assignment( effective_engine = engine effective_persona_id = persona_id effective_agents_md_override = agents_md_override + effective_model = _clean_model(model) payload = { "thread_key": thread_key, "spawn_id": spawn_id, "harness": effective_harness, "engine": effective_engine, + "model": effective_model, "persona_id": effective_persona_id, "agents_md_override": effective_agents_md_override, } @@ -596,24 +620,51 @@ async def spawn_assignment( ) return decode_jsonb(existing_idem["response_json"], {}) + resolved_engine, _, _ = _resolve_harness_profile( + str(effective_harness), + persona=str(effective_persona_id) if effective_persona_id else None, + engine_override=str(effective_engine) if effective_engine else None, + ) + if effective_model: + _ensure_model_supported(engine=resolved_engine, harness=str(effective_harness)) + + active_for_precheck = await get_active_assignment(pool, thread_key) + if active_for_precheck: + _, precheck_prompt_sha = prompt_identity( + harness=str(effective_harness), + persona_id=str(effective_persona_id) if effective_persona_id else None, + agents_md_override=effective_agents_md_override, + ) + same_prompt_identity = ( + active_for_precheck["effective_agents_md_sha256"] == precheck_prompt_sha + and active_for_precheck["harness"] == effective_harness + and active_for_precheck["engine"] == resolved_engine + ) + if not same_prompt_identity: + raise ControlPlaneError( + "ACTIVE_ASSIGNMENT_PROMPT_MISMATCH", + "active assignment exists with different prompt identity", + 409, + ) + with start_span( "centaur.agent.spawn", attributes={ "centaur.thread_key": thread_key, "centaur.harness": effective_harness, "centaur.engine": effective_engine, + "centaur.model": effective_model, "centaur.persona_id": effective_persona_id, "centaur.spawn_id": spawn_id, - "centaur.assignment.attach_active": attach_active_assignment, + "centaur.assignment.attach_active": inherit_active_assignment, }, ) as span: - spawn_kwargs: dict[str, Any] = {"engine": effective_engine} - if effective_persona_id is not None: - spawn_kwargs["persona"] = effective_persona_id session = await get_or_spawn( thread_key, effective_harness, - **spawn_kwargs, + engine=effective_engine, + model=effective_model, + persona=effective_persona_id, ) set_span_attributes( span, @@ -622,8 +673,10 @@ async def spawn_assignment( "centaur.trace_id": session.trace_id, "centaur.engine": session.engine, "centaur.harness": session.harness, + "centaur.model": session.model, }, ) + resolved_model = _clean_model(session.model) or effective_model if effective_agents_md_override is not None: await _write_agents_override(session.sandbox_id, effective_agents_md_override) @@ -654,13 +707,37 @@ async def spawn_assignment( active = await conn.fetchrow( "SELECT assignment_generation, runtime_id, harness, engine, persona_id, " - "prompt_ref, effective_agents_md_sha256, agents_md_override " + "prompt_ref, effective_agents_md_sha256, agents_md_override, model " "FROM agent_runtime_assignments " "WHERE thread_key = $1 AND state = 'active' " "ORDER BY assignment_generation DESC LIMIT 1", thread_key, ) + if active: + same_prompt_identity = ( + active["effective_agents_md_sha256"] == prompt_sha + and active["harness"] == session.harness + and active["engine"] == session.engine + ) + same_model = _clean_model(active["model"]) == resolved_model + if not same_prompt_identity or not same_model: + if same_prompt_identity and not same_model: + await conn.execute( + "UPDATE agent_runtime_assignments " + "SET state = 'released', released_at = NOW(), updated_at = NOW() " + "WHERE thread_key = $1 AND assignment_generation = $2", + thread_key, + active["assignment_generation"], + ) + active = None + else: + raise ControlPlaneError( + "ACTIVE_ASSIGNMENT_PROMPT_MISMATCH", + "active assignment exists with different prompt identity", + 409, + ) + if active: if ( active["effective_agents_md_sha256"] != prompt_sha @@ -686,6 +763,7 @@ async def spawn_assignment( resolved_persona = active["persona_id"] resolved_prompt_ref = active["prompt_ref"] resolved_prompt_sha = active["effective_agents_md_sha256"] + resolved_model = _clean_model(active["model"]) else: generation = ( int( @@ -700,8 +778,8 @@ async def spawn_assignment( await conn.execute( "INSERT INTO agent_runtime_assignments (" "thread_key, assignment_generation, runtime_id, harness, engine, " - "persona_id, prompt_ref, effective_agents_md_sha256, agents_md_override, state" - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'active')", + "persona_id, prompt_ref, effective_agents_md_sha256, agents_md_override, model, state" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, 'active')", thread_key, generation, session.sandbox_id, @@ -711,6 +789,7 @@ async def spawn_assignment( prompt_ref, prompt_sha, effective_agents_md_override, + resolved_model, ) runtime_id = session.sandbox_id assignment_state = "assigned_idle" @@ -725,6 +804,7 @@ async def spawn_assignment( "trace_id": session.trace_id, "assignment_state": assignment_state, "assignment_generation": generation, + "model": resolved_model, "persona_id": resolved_persona, "prompt_ref": resolved_prompt_ref, "effective_agents_md_sha256": resolved_prompt_sha, @@ -746,6 +826,7 @@ async def spawn_assignment( assignment_generation=generation, harness=session.harness, engine=session.engine, + model=resolved_model, persona_id=resolved_persona, prompt_ref=resolved_prompt_ref, prompt_sha=resolved_prompt_sha, @@ -2681,7 +2762,8 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: ) assignment = await pool.fetchrow( - "SELECT harness, engine, runtime_id, agents_md_override, persona_id, prompt_ref, effective_agents_md_sha256 " + "SELECT harness, engine, model, runtime_id, agents_md_override, persona_id, " + "prompt_ref, effective_agents_md_sha256 " "FROM agent_runtime_assignments " "WHERE thread_key = $1 AND assignment_generation = $2", thread_key, @@ -2701,6 +2783,7 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: harness = str(assignment["harness"]) engine = str(assignment["engine"] or harness) + model = _clean_model(assignment["model"]) persona_id = assignment["persona_id"] prompt_ref = assignment["prompt_ref"] prompt_sha = assignment["effective_agents_md_sha256"] @@ -2716,6 +2799,7 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: thread_key=thread_key, harness=harness, engine=engine, + model=model, persona_id=persona_id, prompt_ref=prompt_ref, prompt_sha=prompt_sha, @@ -2773,6 +2857,7 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: thread_key, assignment["harness"], engine=assignment["engine"], + model=model, persona=assignment["persona_id"], ) if session.sandbox_id != assignment["runtime_id"]: diff --git a/services/api/api/sandbox/base.py b/services/api/api/sandbox/base.py index 5ce95785c..1d385f72f 100644 --- a/services/api/api/sandbox/base.py +++ b/services/api/api/sandbox/base.py @@ -36,6 +36,7 @@ class SandboxSession: thread_key: str harness: str engine: str + model: str = "" started_at: float = 0.0 backend_name: str = "" # "kubernetes", etc. db_state: str = "" diff --git a/services/api/api/sandbox/kubernetes.py b/services/api/api/sandbox/kubernetes.py index 8c4ac6295..c7d384740 100644 --- a/services/api/api/sandbox/kubernetes.py +++ b/services/api/api/sandbox/kubernetes.py @@ -1403,17 +1403,19 @@ async def create( resume_thread_id=resume_thread_id, pg_dsns=sandbox_pg_dsns, ) + effective_model = (model or "").strip() overlay_image = _overlay_image() if overlay_image: env.append(f"CENTAUR_OVERLAY_DIR={_SANDBOX_OVERLAY_DIR}") if engine == "openrouter": env.append("CODEX_MODEL_PROVIDER=openrouter") - openrouter_model = ( - model or os.getenv("OPENROUTER_MODEL") or "openrouter/auto" + effective_model = effective_model or ( + os.getenv("OPENROUTER_MODEL") or "openrouter/auto" ) - env.append(f"CODEX_MODEL={openrouter_model}") - if engine == "claude-code" and model: - env.append(f"CLAUDE_MODEL={model}") + if engine in {"codex", "openrouter"} and effective_model: + env.append(f"CODEX_MODEL={effective_model}") + if engine == "claude-code" and effective_model: + env.append(f"CLAUDE_MODEL={effective_model}") if engine == "claude-code" and resume_thread_id: env.append(f"CLAUDE_CONTINUE_SESSION_ID={resume_thread_id}") if persona: @@ -1632,6 +1634,7 @@ async def create( thread_key=thread_key, harness=harness, engine=engine, + model=effective_model, started_at=time.time(), backend_name=self.name, trace_id=trace_id or "", diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index f5fe3916f..9a7ece2f1 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -993,6 +993,7 @@ async def _compute_agent_session_header( persona = selector.get("persona_id") harness = selector.get("harness") + model = selector.get("model") engine: str | None = None if not persona or not harness: active = await get_active_assignment(pool, thread_key) @@ -1000,12 +1001,14 @@ async def _compute_agent_session_header( persona = persona or _nonempty(active.get("persona_id")) harness = harness or _nonempty(active.get("harness")) engine = _nonempty(active.get("engine")) + model = model or _nonempty(active.get("model")) if persona and not engine: engine = _persona_default_engine(persona) return _agent_session_header( persona_id=persona, engine=engine, harness=harness, + model=model, ) @@ -1169,6 +1172,7 @@ async def do_agent_turn( metadata: dict[str, Any] | None = None, delivery: Delivery | dict[str, Any] | None = None, harness: str | None = None, + model: str | None = None, persona: str | None = None, agents_md_override: str | None = None, ) -> dict[str, Any]: @@ -1215,7 +1219,7 @@ async def _dispatch() -> dict[str, Any]: else: effective_delivery = dict(run_in.get("delivery") or {}) effective_history = history_messages or run_in.get("history_messages") or [] - selector = {"persona_id": persona, "harness": harness} + selector = {"persona_id": persona, "harness": harness, "model": model} slackbot_session_id: str | None = None try: @@ -1225,6 +1229,7 @@ async def _dispatch() -> dict[str, Any]: spawn_id=f"{step_id}:spawn", harness=harness, engine=None, + model=model, persona_id=persona, agents_md_override=agents_md_override, ) diff --git a/services/api/api/workflows/slack_thread_turn.py b/services/api/api/workflows/slack_thread_turn.py index 7707f6fc4..e10bdfb71 100644 --- a/services/api/api/workflows/slack_thread_turn.py +++ b/services/api/api/workflows/slack_thread_turn.py @@ -17,10 +17,14 @@ "claude": "claude-code", "pi": "pi-mono", } -_PROMPT_FLAG_SKIP = frozenset({"engine", "model", "opus", "sonnet", "haiku"}) -_PROMPT_FLAG_VALUE_SKIP = frozenset({"engine", "model"}) +_PROMPT_FLAG_SKIP = frozenset({"engine", "opus", "sonnet", "haiku"}) +_PROMPT_FLAG_VALUE_SKIP = frozenset({"engine"}) _PROMPT_FLAG_RE = re.compile( - r"(^|\s)(`?)(--|[\u2013\u2014])([a-z][a-z0-9-]*)(?=\s|`|$)", + r"(^|\s)(`?)(--|[\u2013\u2014])([a-z][a-z0-9-]*)(?:=([^\s`]+))?(?=\s|`|$)", + re.IGNORECASE, +) +_PROMPT_MODEL_ASSIGNMENT_RE = re.compile( + r"(^|\s)(`?)model\s*=\s*([^\s`]+)(`?)", re.IGNORECASE, ) _BARE_PERSONA_PROMPT = ( @@ -72,6 +76,7 @@ class PromptSelection: """ harness: str | None + model: str | None persona: str | None parts: list[dict[str, Any]] @@ -87,6 +92,7 @@ class Input: history_messages: list[dict[str, Any]] = field(default_factory=list) delivery: Delivery = field(default_factory=Delivery) harness: str | None = None + model: str | None = None persona: str | None = None agents_md_override: str | None = None @@ -141,22 +147,36 @@ def _extract_prompt_selection_from_text( text: str, *, personas: set[str], -) -> tuple[str | None, str | None, str]: - """Strip known flags and return ``(harness, persona, cleaned_text)``.""" +) -> tuple[str | None, str | None, str | None, str]: + """Strip known flags and return ``(harness, model, persona, cleaned_text)``.""" harness: str | None = None + model: str | None = None persona: str | None = None ranges: list[tuple[int, int]] = [] + + for match in _PROMPT_MODEL_ASSIGNMENT_RE.finditer(text): + leading = match.group(1) or "" + strip_start = match.start() + len(leading) + strip_end = match.end() + ranges.append((strip_start, strip_end)) + model = match.group(3) + for match in _PROMPT_FLAG_RE.finditer(text): leading = match.group(1) or "" opening_tick = match.group(2) or "" - marker = match.group(3) or "" flag = match.group(4).lower() + inline_value = match.group(5) flag_start = match.start() + len(leading) + len(opening_tick) - flag_end = flag_start + len(marker) + len(flag) strip_start = flag_start - len(opening_tick) if opening_tick else flag_start - strip_end = flag_end + 1 if flag_end < len(text) and text[flag_end] == "`" else flag_end + strip_end = match.end() + value = inline_value + if flag == "model" and value is None: + value_match = re.match(r"\s+([^\s`]+)", text[strip_end:]) + if value_match: + value = value_match.group(1) + strip_end += value_match.end() if flag in _PROMPT_FLAG_VALUE_SKIP: strip_end = _extend_value_skip(text, strip_end) closing_tick = -1 @@ -168,7 +188,11 @@ def _extract_prompt_selection_from_text( is_skip = flag in _PROMPT_FLAG_SKIP classified_harness, classified_persona = _classify_flag(flag, personas) - recognized = is_skip or classified_harness or classified_persona + if flag == "model" and value: + model = value + recognized = ( + is_skip or classified_harness or classified_persona or flag == "model" + ) if not recognized: continue @@ -181,13 +205,14 @@ def _extract_prompt_selection_from_text( persona = classified_persona cleaned = _strip_ranges(text, ranges) if ranges else text.strip() - return harness, persona, cleaned + return harness, model, persona, cleaned def _extract_prompt_selection( parts: list[dict[str, Any]], *, explicit_harness: str | None = None, + explicit_model: str | None = None, explicit_persona: str | None = None, personas: set[str] | None = None, ) -> PromptSelection: @@ -198,6 +223,7 @@ def _extract_prompt_selection( """ known_personas = personas if personas is not None else _known_personas() harness: str | None = None + model: str | None = None persona: str | None = None cleaned_parts: list[dict[str, Any]] = [] has_non_text_part = False @@ -208,18 +234,23 @@ def _extract_prompt_selection( has_non_text_part = True continue - part_harness, part_persona, cleaned_text = _extract_prompt_selection_from_text( - part["text"], - personas=known_personas, + part_harness, part_model, part_persona, cleaned_text = ( + _extract_prompt_selection_from_text( + part["text"], + personas=known_personas, + ) ) if part_harness: harness = part_harness + if part_model: + model = part_model if part_persona: persona = part_persona if cleaned_text: cleaned_parts.append({**part, "text": cleaned_text}) harness = (explicit_harness or harness or "").strip().lower() or None + model = (explicit_model or model or "").strip() or None persona = (explicit_persona or persona or "").strip().lower() or None if harness: harness = _PROMPT_FLAG_ALIASES.get(harness, harness) @@ -233,7 +264,12 @@ def _extract_prompt_selection( if not cleaned_parts: cleaned_parts = parts - return PromptSelection(harness=harness, persona=persona, parts=cleaned_parts) + return PromptSelection( + harness=harness, + model=model, + persona=persona, + parts=cleaned_parts, + ) def _with_prompt_switch_context_note( @@ -476,9 +512,31 @@ async def handler(inp: Input, ctx: WorkflowContext) -> dict[str, Any]: selection = _extract_prompt_selection( inp.effective_parts, explicit_harness=inp.harness, + explicit_model=inp.model, explicit_persona=inp.persona, ) - selection_changed = bool(selection.harness or selection.persona) + if selection.model and not selection.harness and not selection.persona: + from api.runtime_control import get_active_assignment + + active = await get_active_assignment(ctx._pool, thread_key) + if isinstance(active, dict): + selection = PromptSelection( + harness=active.get("harness"), + model=selection.model, + persona=active.get("persona_id"), + parts=selection.parts, + ) + if selection.model: + from api.agent import _resolve_harness_profile + from api.runtime_control import _ensure_model_supported + + resolved_engine, _, _ = _resolve_harness_profile( + selection.harness, + persona=selection.persona, + ) + _ensure_model_supported(engine=resolved_engine, harness=selection.harness) + + selection_changed = bool(selection.harness or selection.model or selection.persona) if selection_changed: await _release_for_prompt_switch( ctx, @@ -521,6 +579,7 @@ async def handler(inp: Input, ctx: WorkflowContext) -> dict[str, Any]: metadata=inp.metadata, delivery=inp.delivery, harness=selection.harness, + model=selection.model, persona=selection.persona, agents_md_override=inp.agents_md_override, ) diff --git a/services/api/db/migrations/037_add_runtime_model.sql b/services/api/db/migrations/037_add_runtime_model.sql new file mode 100644 index 000000000..aec58fd32 --- /dev/null +++ b/services/api/db/migrations/037_add_runtime_model.sql @@ -0,0 +1,15 @@ +-- migrate:up + +ALTER TABLE sandbox_sessions + ADD COLUMN IF NOT EXISTS model TEXT; + +ALTER TABLE agent_runtime_assignments + ADD COLUMN IF NOT EXISTS model TEXT; + +-- migrate:down + +ALTER TABLE agent_runtime_assignments + DROP COLUMN IF EXISTS model; + +ALTER TABLE sandbox_sessions + DROP COLUMN IF EXISTS model; diff --git a/services/api/tests/test_agent_control_plane.py b/services/api/tests/test_agent_control_plane.py index 609f83011..2232387b5 100644 --- a/services/api/tests/test_agent_control_plane.py +++ b/services/api/tests/test_agent_control_plane.py @@ -96,11 +96,18 @@ async def test_spawn_assignment_defaults_to_codex_when_no_selector(db_pool, monk spawn_id="spawn-default", harness=None, engine=None, + model=None, persona_id=None, agents_md_override=None, ) - get_or_spawn.assert_awaited_once_with(thread_key, "codex", engine=None) + get_or_spawn.assert_awaited_once_with( + thread_key, + "codex", + engine=None, + model=None, + persona=None, + ) assert result["persona_id"] is None assignment = await db_pool.fetchrow( "SELECT harness, engine, persona_id FROM agent_runtime_assignments WHERE thread_key = $1", @@ -182,12 +189,17 @@ async def test_spawn_assignment_treats_harness_persona_selector_as_persona(db_po spawn_id="spawn-legal", harness="legal", engine=None, + model=None, persona_id=None, agents_md_override=None, ) get_or_spawn.assert_awaited_once_with( - thread_key, "codex", engine=None, persona="legal" + thread_key, + "codex", + engine=None, + model=None, + persona="legal", ) assert result["persona_id"] == "legal" assert result["prompt_ref"] == "persona:legal" @@ -202,6 +214,165 @@ async def test_spawn_assignment_treats_harness_persona_selector_as_persona(db_po assert assignment["prompt_ref"] == "persona:legal" +@pytest.mark.asyncio +async def test_spawn_assignment_persists_requested_model(db_pool): + from api.runtime_control import spawn_assignment + + model = "anthropic/claude-sonnet-4.5" + thread_key = f"slack:C-test:{uuid.uuid4().hex}:openrouter-model" + session = SandboxSession( + sandbox_id=f"rt-{uuid.uuid4().hex[:8]}", + thread_key=thread_key, + harness="openrouter", + engine="openrouter", + model=model, + ) + get_or_spawn = AsyncMock(return_value=session) + + with patch("api.runtime_control.get_or_spawn", new=get_or_spawn): + result = await spawn_assignment( + db_pool, + thread_key=thread_key, + spawn_id="spawn-openrouter-model", + harness="openrouter", + engine=None, + model=model, + persona_id=None, + agents_md_override=None, + ) + + get_or_spawn.assert_awaited_once_with( + thread_key, + "openrouter", + engine=None, + model=model, + persona=None, + ) + assert result["model"] == model + assignment = await db_pool.fetchrow( + "SELECT harness, engine, model FROM agent_runtime_assignments WHERE thread_key = $1", + thread_key, + ) + assert assignment is not None + assert assignment["harness"] == "openrouter" + assert assignment["engine"] == "openrouter" + assert assignment["model"] == model + + +@pytest.mark.asyncio +async def test_spawn_assignment_model_change_creates_new_generation(db_pool): + from api.runtime_control import prompt_identity, spawn_assignment + + old_model = "openrouter/auto" + new_model = "google/gemini-2.5-pro" + thread_key = f"slack:C-test:{uuid.uuid4().hex}:openrouter-model-switch" + prompt_ref, prompt_sha = prompt_identity( + harness="openrouter", + persona_id=None, + agents_md_override=None, + ) + await db_pool.execute( + "INSERT INTO agent_runtime_assignments (" + "thread_key, assignment_generation, runtime_id, harness, engine, " + "persona_id, prompt_ref, effective_agents_md_sha256, model, state" + ") VALUES ($1, 1, 'rt-old', 'openrouter', 'openrouter', NULL, $2, $3, $4, 'active')", + thread_key, + prompt_ref, + prompt_sha, + old_model, + ) + session = SandboxSession( + sandbox_id=f"rt-{uuid.uuid4().hex[:8]}", + thread_key=thread_key, + harness="openrouter", + engine="openrouter", + model=new_model, + ) + + get_or_spawn = AsyncMock(return_value=session) + with patch("api.runtime_control.get_or_spawn", new=get_or_spawn): + result = await spawn_assignment( + db_pool, + thread_key=thread_key, + spawn_id="spawn-openrouter-model-switch", + harness=None, + engine=None, + model=new_model, + persona_id=None, + agents_md_override=None, + ) + + get_or_spawn.assert_awaited_once_with( + thread_key, + "openrouter", + engine="openrouter", + model=new_model, + persona=None, + ) + assert result["assignment_generation"] == 2 + assert result["model"] == new_model + rows = await db_pool.fetch( + "SELECT assignment_generation, runtime_id, model, state " + "FROM agent_runtime_assignments WHERE thread_key = $1 " + "ORDER BY assignment_generation", + thread_key, + ) + assert [ + (r["assignment_generation"], r["runtime_id"], r["model"], r["state"]) + for r in rows + ] == [ + (1, "rt-old", old_model, "released"), + (2, session.sandbox_id, new_model, "active"), + ] + + +@pytest.mark.asyncio +async def test_spawn_assignment_prompt_mismatch_does_not_spawn(db_pool): + from api.runtime_control import ControlPlaneError, prompt_identity, spawn_assignment + + thread_key = f"slack:C-test:{uuid.uuid4().hex}:openrouter-mismatch" + prompt_ref, prompt_sha = prompt_identity( + harness="amp", + persona_id=None, + agents_md_override=None, + ) + await db_pool.execute( + "INSERT INTO agent_runtime_assignments (" + "thread_key, assignment_generation, runtime_id, harness, engine, " + "persona_id, prompt_ref, effective_agents_md_sha256, state" + ") VALUES ($1, 1, 'rt-old', 'amp', 'amp', NULL, $2, $3, 'active')", + thread_key, + prompt_ref, + prompt_sha, + ) + get_or_spawn = AsyncMock() + + with ( + patch("api.runtime_control.get_or_spawn", new=get_or_spawn), + pytest.raises(ControlPlaneError) as exc_info, + ): + await spawn_assignment( + db_pool, + thread_key=thread_key, + spawn_id="spawn-openrouter-mismatch", + harness="openrouter", + engine=None, + model="google/gemini-2.5-pro", + persona_id=None, + agents_md_override=None, + ) + + assert exc_info.value.code == "ACTIVE_ASSIGNMENT_PROMPT_MISMATCH" + get_or_spawn.assert_not_awaited() + active = await db_pool.fetchrow( + "SELECT runtime_id, state FROM agent_runtime_assignments WHERE thread_key = $1", + thread_key, + ) + assert active is not None + assert active["runtime_id"] == "rt-old" + assert active["state"] == "active" + + @pytest.mark.asyncio async def test_db_insert_session_initial_state_tracks_inflight_turn(db_pool): from api.agent import _db_insert_session @@ -1485,14 +1656,17 @@ async def test_worker_marks_turn_done_error_as_failed_and_updates_runtime(db_poo execution_id = f"exe-{uuid.uuid4().hex[:12]}" prior_runtime_id = f"rt-old-{uuid.uuid4().hex[:8]}" resumed_runtime_id = f"rt-new-{uuid.uuid4().hex[:8]}" + model = "anthropic/claude-sonnet-4.5" await db_pool.execute( "INSERT INTO agent_runtime_assignments (" "thread_key, assignment_generation, runtime_id, harness, engine, " - "persona_id, prompt_ref, effective_agents_md_sha256, state" - ") VALUES ($1, 1, $2, 'amp', 'amp', NULL, 'harness:amp', 'sha', 'active')", + "persona_id, prompt_ref, effective_agents_md_sha256, model, state" + ") VALUES ($1, 1, $2, 'openrouter', 'openrouter', NULL, " + "'harness:openrouter', 'sha', $3, 'active')", thread_key, prior_runtime_id, + model, ) await db_pool.execute( "INSERT INTO agent_execution_requests (" @@ -1519,8 +1693,9 @@ async def test_worker_marks_turn_done_error_as_failed_and_updates_runtime(db_poo session = SandboxSession( sandbox_id=resumed_runtime_id, thread_key=thread_key, - harness="amp", - engine="amp", + harness="openrouter", + engine="openrouter", + model=model, ) async def _fake_stream(*_args, **_kwargs): @@ -1536,8 +1711,9 @@ async def _fake_stream(*_args, **_kwargs): } backend = SimpleNamespace(attach=AsyncMock(), close_streams=AsyncMock()) + get_or_spawn = AsyncMock(return_value=session) with ( - patch("api.runtime_control.get_or_spawn", new=AsyncMock(return_value=session)), + patch("api.runtime_control.get_or_spawn", new=get_or_spawn), patch( "api.runtime_control.inject_stdin", new=AsyncMock( @@ -1568,6 +1744,13 @@ async def _fake_stream(*_args, **_kwargs): ) assert assignment is not None assert assignment["runtime_id"] == resumed_runtime_id + get_or_spawn.assert_awaited_once_with( + thread_key, + "openrouter", + engine="openrouter", + model=model, + persona=None, + ) backend.close_streams.assert_awaited_once_with(session) diff --git a/services/api/tests/test_db_schema_compatibility.py b/services/api/tests/test_db_schema_compatibility.py index 76aca1ec0..b35412b86 100644 --- a/services/api/tests/test_db_schema_compatibility.py +++ b/services/api/tests/test_db_schema_compatibility.py @@ -31,6 +31,8 @@ async def test_schema_compatibility_ok() -> None: {"column_name": "inflight_attempts"}, {"column_name": "last_result"}, {"column_name": "last_result_at"}, + {"column_name": "model"}, + {"column_name": "trace_id"}, ], [ {"version": "005"}, @@ -40,7 +42,8 @@ async def test_schema_compatibility_ok() -> None: {"version": "009"}, {"version": "010"}, {"version": "011"}, - {"version": "016"}, + {"version": "035"}, + {"version": "037"}, ], ] ) @@ -80,10 +83,12 @@ async def test_schema_compatibility_detects_missing_state_column_and_migration() {"column_name": "inflight_attempts"}, {"column_name": "last_result"}, # last_result_at intentionally missing + # model intentionally missing + # trace_id intentionally missing ], [ {"version": "005"}, - # 006/007/008/009/010/011/016 intentionally missing + # 006/007/008/009/010/011/035/037 intentionally missing ], ] ) @@ -93,13 +98,16 @@ async def test_schema_compatibility_detects_missing_state_column_and_migration() assert report["compatible"] is False assert "suspended" in report["required_states_missing"] assert "last_result_at" in report["required_columns_missing"] + assert "model" in report["required_columns_missing"] + assert "trace_id" in report["required_columns_missing"] assert "006" in report["required_migrations_missing"] assert "007" in report["required_migrations_missing"] assert "008" in report["required_migrations_missing"] assert "009" in report["required_migrations_missing"] assert "010" in report["required_migrations_missing"] assert "011" in report["required_migrations_missing"] - assert "016" in report["required_migrations_missing"] + assert "035" in report["required_migrations_missing"] + assert "037" in report["required_migrations_missing"] @pytest.mark.asyncio diff --git a/services/api/tests/test_workflows.py b/services/api/tests/test_workflows.py index de5c336cb..58d37a601 100644 --- a/services/api/tests/test_workflows.py +++ b/services/api/tests/test_workflows.py @@ -396,37 +396,58 @@ def test_recovery_command_paraphrases_are_recognized(): @pytest.mark.parametrize( - ("text", "harness", "persona", "cleaned"), + ("text", "harness", "model", "persona", "cleaned"), [ - ("--invest hyperliquid miqs", None, "invest", "hyperliquid miqs"), - ("--INVEST hyperliquid miqs", None, "invest", "hyperliquid miqs"), - ("\u2014invest hyperliquid miqs", None, "invest", "hyperliquid miqs"), - ("\u2013invest hyperliquid miqs", None, "invest", "hyperliquid miqs"), - ("`--invest` hyperliquid miqs", None, "invest", "hyperliquid miqs"), - ("`--invest hyperliquid miqs`", None, "invest", "hyperliquid miqs"), - ("--claude review this", "claude-code", None, "review this"), - ("--openrouter review this", "openrouter", None, "review this"), - ("--pi analyze this", "pi-mono", None, "analyze this"), + ("--invest hyperliquid miqs", None, None, "invest", "hyperliquid miqs"), + ("--INVEST hyperliquid miqs", None, None, "invest", "hyperliquid miqs"), + ("\u2014invest hyperliquid miqs", None, None, "invest", "hyperliquid miqs"), + ("\u2013invest hyperliquid miqs", None, None, "invest", "hyperliquid miqs"), + ("`--invest` hyperliquid miqs", None, None, "invest", "hyperliquid miqs"), + ("`--invest hyperliquid miqs`", None, None, "invest", "hyperliquid miqs"), + ("--claude review this", "claude-code", None, None, "review this"), + ("--openrouter review this", "openrouter", None, None, "review this"), + ("--pi analyze this", "pi-mono", None, None, "analyze this"), # Persona + harness compose orthogonally. - ("--invest --claude review this", "claude-code", "invest", "review this"), - ("--claude --invest review this", "claude-code", "invest", "review this"), - ("--invest --amp review this", "amp", "invest", "review this"), - ("--invest --codex review this", "codex", "invest", "review this"), - ("--invest --openrouter review this", "openrouter", "invest", "review this"), - ("please use --opus and review this", None, None, "please use and review this"), - ("please use --model opus and review this", None, None, "please use and review this"), - ("please use `--model opus` and review this", None, None, "please use and review this"), + ("--invest --claude review this", "claude-code", None, "invest", "review this"), + ("--claude --invest review this", "claude-code", None, "invest", "review this"), + ("--invest --amp review this", "amp", None, "invest", "review this"), + ("--invest --codex review this", "codex", None, "invest", "review this"), + ("--invest --openrouter review this", "openrouter", None, "invest", "review this"), + ( + "--openrouter --model anthropic/claude-sonnet-4.5 review", + "openrouter", + "anthropic/claude-sonnet-4.5", + None, + "review", + ), + ( + "--openrouter --model=google/gemini-2.5-pro review", + "openrouter", + "google/gemini-2.5-pro", + None, + "review", + ), + ( + "--openrouter model=anthropic/claude-3.5-sonnet:beta review", + "openrouter", + "anthropic/claude-3.5-sonnet:beta", + None, + "review", + ), + ("please use --opus and review this", None, None, None, "please use and review this"), + ("please use --model opus and review this", None, "opus", None, "please use and review this"), + ("please use `--model opus` and review this", None, "opus", None, "please use and review this"), ], ) def test_prompt_selection_extraction_handles_slack_flag_shapes( - text, harness, persona, cleaned + text, harness, model, persona, cleaned ): from api.workflows.slack_thread_turn import _extract_prompt_selection_from_text assert _extract_prompt_selection_from_text( text, personas={"invest"}, - ) == (harness, persona, cleaned) + ) == (harness, model, persona, cleaned) def test_prompt_selection_extraction_preserves_unknown_flags(): @@ -436,7 +457,7 @@ def test_prompt_selection_extraction_preserves_unknown_flags(): assert _extract_prompt_selection_from_text( text, personas={"invest"}, - ) == (None, None, text) + ) == (None, None, None, text) def test_bare_persona_flag_gets_intro_prompt(): @@ -964,6 +985,93 @@ async def test_slack_thread_turn_without_flag_keeps_default_harness_path(db_pool ] +@pytest.mark.asyncio +async def test_slack_thread_turn_model_only_inherits_active_runtime(db_pool): + from api.workflow_engine import WorkflowContext + from api.workflows.slack_thread_turn import Input, handler + + run_id = f"wfr_{uuid.uuid4().hex[:16]}" + thread_key = f"slack:C-test:{uuid.uuid4().hex}" + ctx = WorkflowContext( + pool=db_pool, + run_id=run_id, + checkpoints={}, + lease_s=30.0, + worker_id="w1", + ) + do_agent_turn_mock = AsyncMock(return_value={"ok": True, "execution_id": "exe-1"}) + release_assignment_mock = AsyncMock(return_value={"ok": True, "released": True}) + active_assignment = { + "assignment_generation": 1, + "harness": "openrouter", + "engine": "openrouter", + "persona_id": None, + } + + with ( + patch("api.workflow_engine.do_agent_turn", new=do_agent_turn_mock), + patch( + "api.runtime_control.get_active_assignment", + new=AsyncMock(return_value=active_assignment), + ), + patch("api.runtime_control.release_assignment", new=release_assignment_mock), + ): + await handler( + Input( + thread_key=thread_key, + parts=[ + { + "type": "text", + "text": "--model google/gemini-2.5-pro summarize this thread", + } + ], + message_id="slack:current", + ), + ctx, + ) + + release_assignment_mock.assert_awaited_once() + assert do_agent_turn_mock.await_args.kwargs["harness"] == "openrouter" + assert do_agent_turn_mock.await_args.kwargs["model"] == "google/gemini-2.5-pro" + assert do_agent_turn_mock.await_args.kwargs["parts"] == [ + {"type": "text", "text": "summarize this thread"}, + ] + + +@pytest.mark.asyncio +async def test_slack_thread_turn_rejects_unsupported_model_before_release(db_pool): + from api.runtime_control import ControlPlaneError + from api.workflow_engine import WorkflowContext + from api.workflows.slack_thread_turn import Input, handler + + run_id = f"wfr_{uuid.uuid4().hex[:16]}" + thread_key = f"slack:C-test:{uuid.uuid4().hex}" + ctx = WorkflowContext( + pool=db_pool, + run_id=run_id, + checkpoints={}, + lease_s=30.0, + worker_id="w1", + ) + release_assignment_mock = AsyncMock(return_value={"ok": True, "released": True}) + + with ( + patch("api.runtime_control.release_assignment", new=release_assignment_mock), + pytest.raises(ControlPlaneError) as exc_info, + ): + await handler( + Input( + thread_key=thread_key, + parts=[{"type": "text", "text": "--amp --model opus summarize"}], + message_id="slack:current", + ), + ctx, + ) + + assert exc_info.value.code == "MODEL_UNSUPPORTED_FOR_HARNESS" + release_assignment_mock.assert_not_awaited() + + @pytest.mark.asyncio async def test_slack_thread_turn_without_flag_replays_history_only_for_new_assignment(db_pool): from api.workflow_engine import WorkflowContext From 72e5ef9abe1b09d7f214feadc53a217fa61c3cca Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Thu, 28 May 2026 21:56:52 -0700 Subject: [PATCH 3/3] fix: resolve openrouter rebase fallout --- services/api/api/agent.py | 28 ++++++++++------- services/api/api/runtime_control.py | 30 ++++++++++++++----- services/api/api/tool_manager.py | 24 +++++++++------ services/api/tests/test_sandbox_entrypoint.py | 6 +--- services/api/tests/test_tool_manager.py | 24 +++++++++++++-- 5 files changed, 77 insertions(+), 35 deletions(-) diff --git a/services/api/api/agent.py b/services/api/api/agent.py index 78eed512e..0d262f376 100644 --- a/services/api/api/agent.py +++ b/services/api/api/agent.py @@ -734,7 +734,9 @@ async def _insert_system_message( ) -> None: """Insert a static system message with platform formatting rules (idempotent).""" pool = _get_pool() - effective_platform = platform or ("slack" if thread_key.startswith("slack:") else None) + effective_platform = platform or ( + "slack" if thread_key.startswith("slack:") else None + ) msg_id = f"system-{thread_key}-{effective_platform or 'generic'}" effective_user_id = user_id or await _get_latest_thread_user_id(thread_key) requester_identity = await _resolve_requester_identity( @@ -799,9 +801,7 @@ def _resolve_harness_profile( if normalized_harness and normalized_harness not in _ENGINE_HARNESSES: raise ValueError(f"Unknown harness: {normalized_harness}") - persona_info = ( - get_tool_manager().get_persona(persona) if persona else None - ) + persona_info = get_tool_manager().get_persona(persona) if persona else None if persona and persona_info is None: raise ValueError(f"Unknown persona: {persona}") @@ -942,7 +942,11 @@ async def get_or_spawn( trace_id = old_trace_id or thread_trace_id or str(uuid.uuid4()) claimed = await claim_container( - thread_key, effective_harness, persona=resolved_persona, repo=repo, trace_id=trace_id + thread_key, + effective_harness, + persona=resolved_persona, + repo=repo, + trace_id=trace_id, ) if claimed: if old_agent_thread_id: @@ -1051,8 +1055,7 @@ def _build_session_context( github_login = github_handle.removeprefix("@") lines.extend( [ - "- GitHub handle from Slack profile: " - f"{github_handle}", + f"- GitHub handle from Slack profile: {github_handle}", "- GitHub handle source: " f"{requester_identity['github_handle_source']}", "- GitHub handle verified: yes", @@ -1062,8 +1065,7 @@ def _build_session_context( "- If you create a GitHub PR for this Slack request, " f"the PR body MUST contain this standalone line: `Prompted by: {github_handle}`", "- This is a GitHub PR body requirement, not a Slack response mention rule.", - "- Assign the PR to the requester when possible: " - f"`{github_login}`", + f"- Assign the PR to the requester when possible: `{github_login}`", ] ) else: @@ -1342,7 +1344,9 @@ async def stream_connect( """ rt = _get_runtime(session.sandbox_id) - effective_platform = platform or ("slack" if session.thread_key.startswith("slack:") else None) + effective_platform = platform or ( + "slack" if session.thread_key.startswith("slack:") else None + ) if effective_platform: await _insert_system_message( session.thread_key, @@ -1438,7 +1442,9 @@ async def inject_stdin( """ rt = _get_runtime(session.sandbox_id) - effective_platform = platform or ("slack" if session.thread_key.startswith("slack:") else None) + effective_platform = platform or ( + "slack" if session.thread_key.startswith("slack:") else None + ) if effective_platform: await _insert_system_message( session.thread_key, diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index 19dee2a5c..19ef40b15 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -1200,7 +1200,9 @@ def _slackbot_streamed_answer_chars(value: Any) -> int: return 0 -def _slackbot_live_delivery_covers_result(result_text: str, streamed_chars: int) -> bool: +def _slackbot_live_delivery_covers_result( + result_text: str, streamed_chars: int +) -> bool: text = result_text.strip() if not text: return True @@ -2197,7 +2199,11 @@ async def _mark_execution_terminal( if delivery_platform == "dev" or suppress_legacy_delivery: slackbot_agent_session_id = str(metadata.get("slackbot_agent_session_id") or "") result_size = payload_size_bytes(result_text) - if suppress_legacy_delivery and result_size > 0 and slackbot_streamed_answer_chars <= 0: + if ( + suppress_legacy_delivery + and result_size > 0 + and slackbot_streamed_answer_chars <= 0 + ): log.warning( "final_delivery_skipped_without_live_answer", execution_id=execution_id, @@ -2283,7 +2289,9 @@ async def _mark_execution_terminal( "result_text": result_text, **({"error_text": error_text} if error_text else {}), **( - {"slackbot_streamed_answer_chars": slackbot_streamed_answer_chars} + { + "slackbot_streamed_answer_chars": slackbot_streamed_answer_chars + } if slackbot_streamed_answer_chars else {} ), @@ -2672,7 +2680,9 @@ async def _process_execution(pool, row: dict[str, Any]) -> None: }, ) if status in {"failed_permanent", "cancelled"}: - mark_error(span, str(terminal_reason or terminal["error_text"] or status)) + mark_error( + span, str(terminal_reason or terminal["error_text"] or status) + ) async def _execution_input_text( @@ -2714,7 +2724,9 @@ async def _store_execution_span_context( "UPDATE agent_execution_requests " "SET metadata = metadata || $1::jsonb, updated_at = NOW() " "WHERE execution_id = $2", - canonical_json({_OTEL_METADATA_KEY: {_OTEL_EXECUTION_SPAN_CONTEXT_KEY: span_context}}), + canonical_json( + {_OTEL_METADATA_KEY: {_OTEL_EXECUTION_SPAN_CONTEXT_KEY: span_context}} + ), execution_id, ) @@ -2880,7 +2892,9 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: else: requester_user_id = None if isinstance(delivery, dict): - requester_user_id = delivery.get("recipient_user_id") or delivery.get("user_id") + requester_user_id = delivery.get("recipient_user_id") or delivery.get( + "user_id" + ) requester_user_id = requester_user_id or execution_metadata.get("user_id") trace_metadata = _execution_laminar_metadata( thread_key=thread_key, @@ -2913,7 +2927,9 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: inject_result = await inject_stdin( session, "", - platform=delivery.get("platform") if isinstance(delivery, dict) else None, + platform=delivery.get("platform") + if isinstance(delivery, dict) + else None, user_id=requester_user_id, trace_id=inject_span_context.get("trace_id"), traceparent=current_traceparent(span), diff --git a/services/api/api/tool_manager.py b/services/api/api/tool_manager.py index a590684e7..7154b4a32 100644 --- a/services/api/api/tool_manager.py +++ b/services/api/api/tool_manager.py @@ -396,9 +396,7 @@ def _parse_oauth_fields( f"oauth_token entry {secret_name!r} field {field_name!r} is not " f"valid for grant {grant!r}; allowed: {sorted(allowed)}" ) - parsed[field_name] = _parse_oauth_field_source( - secret_name, field_name, raw - ) + parsed[field_name] = _parse_oauth_field_source(secret_name, field_name, raw) missing = required - parsed.keys() if missing: raise ValueError( @@ -597,8 +595,7 @@ def _parse_hmac_credentials( """Parse ``credentials`` for an ``hmac_sign`` entry; require ``secret``.""" if not isinstance(raw, dict) or not raw: raise ValueError( - f"hmac_sign entry {secret_name!r} 'credentials' must be a non-empty " - f"table" + f"hmac_sign entry {secret_name!r} 'credentials' must be a non-empty table" ) parsed: dict[str, OAuthFieldSource] = {} for field_name, value in raw.items(): @@ -829,7 +826,9 @@ def _parse_secret(entry: Any, *, default_hosts: tuple[str, ...] = ()) -> SecretD replacer=entry, ) if not isinstance(entry, dict): - raise ValueError(f"secret entry must be a string or table, got {type(entry).__name__}") + raise ValueError( + f"secret entry must be a string or table, got {type(entry).__name__}" + ) name = entry.get("name") if not isinstance(name, str) or not name: raise ValueError(f"secret entry missing 'name': {entry!r}") @@ -1174,7 +1173,9 @@ async def _capture_live_slack_send( return None active_channel = parts[2] active_thread_ts = parts[3] - requested_channel = str(args.get("channel") or args.get("channel_id") or "").lstrip("#") + requested_channel = str(args.get("channel") or args.get("channel_id") or "").lstrip( + "#" + ) requested_thread_ts = str(args.get("thread_ts") or "") channel_is_id = bool(re.match(r"^[CDG][A-Z0-9]+$", requested_channel)) if channel_is_id and requested_channel != active_channel: @@ -1921,7 +1922,10 @@ def discover(self) -> list[LoadedTool]: name="openai-codex", hosts=("chatgpt.com",), fields=( - ("client_id", OAuthFieldSource(secret_ref="OPENAI_CODEX_CLIENT_ID")), + ( + "client_id", + OAuthFieldSource(secret_ref="OPENAI_CODEX_CLIENT_ID"), + ), ("refresh_token", OAuthFieldSource(secret_ref="OPENAI_CODEX_BLOB")), ), token_endpoint="https://auth.openai.com/oauth/token", @@ -2482,7 +2486,9 @@ async def call_tool( except (SystemExit, Exception) as e: duration_ms = round((time.monotonic() - t0) * 1000) if isinstance(e, asyncio.TimeoutError): - error_msg = f"Tool call timed out after {_timeout_label(lt.timeout_s)}" + error_msg = ( + f"Tool call timed out after {_timeout_label(lt.timeout_s)}" + ) elif isinstance(e, SystemExit): error_msg = f"sys.exit({e.code})" else: diff --git a/services/api/tests/test_sandbox_entrypoint.py b/services/api/tests/test_sandbox_entrypoint.py index b42c924c2..ba53e5782 100644 --- a/services/api/tests/test_sandbox_entrypoint.py +++ b/services/api/tests/test_sandbox_entrypoint.py @@ -144,11 +144,7 @@ def test_sandbox_entrypoint_skips_codex_login_for_openrouter(tmp_path: Path) -> bin_dir.mkdir() codex_calls = tmp_path / "codex-calls" codex_bin = bin_dir / "codex" - codex_bin.write_text( - "#!/bin/sh\n" - f"printf '%s\\n' \"$*\" >> {codex_calls}\n" - "exit 0\n" - ) + codex_bin.write_text(f"#!/bin/sh\nprintf '%s\\n' \"$*\" >> {codex_calls}\nexit 0\n") codex_bin.chmod(0o755) result = subprocess.run( diff --git a/services/api/tests/test_tool_manager.py b/services/api/tests/test_tool_manager.py index 6ed89efb2..1ea196e77 100644 --- a/services/api/tests/test_tool_manager.py +++ b/services/api/tests/test_tool_manager.py @@ -8,7 +8,7 @@ import httpx import pytest -from fastapi import FastAPI +from fastapi import FastAPI, Request sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) @@ -22,6 +22,8 @@ ToolManager, ToolMethod, ) +from api.api_keys import APIKeyInfo # noqa: E402 +from api.deps import verify_api_key # noqa: E402 class TestDescribeMethodDocstring: @@ -39,7 +41,10 @@ def test_empty_or_none_returns_empty_string(self): assert _describe_method_docstring(" \n \n") == "" def test_single_line_docstring_returned_verbatim(self): - assert _describe_method_docstring("Search Slack for messages.") == "Search Slack for messages." + assert ( + _describe_method_docstring("Search Slack for messages.") + == "Search Slack for messages." + ) def test_multi_paragraph_description_preserved(self): doc = """Hybrid research engine. @@ -611,6 +616,19 @@ async def test_tool_rest_router_lists_describes_and_invokes_tools( manager = ToolManager(tools_dir) manager.discover() app = FastAPI() + + async def allow_tools(request: Request) -> str: + request.state.api_key_info = APIKeyInfo( + id="test-key", + name="test", + key_prefix="test", + scopes=["tools:*"], + created_by="test", + source="test", + ) + return "key:test" + + app.dependency_overrides[verify_api_key] = allow_tools app.include_router(manager.create_rest_router()) transport = httpx.ASGITransport(app=app) @@ -657,7 +675,7 @@ async def test_tool_rest_router_lists_describes_and_invokes_tools( missing_response = await client.post("/tools/alpha/missing", json={}) assert missing_response.status_code == 200 assert missing_response.json()["result"] == ( - '{"error": "Method \'missing\' not found in tool \'alpha\'", ' + "{\"error\": \"Method 'missing' not found in tool 'alpha'\", " '"available_methods": ["async_echo", "secret_values", "sync_echo"]}' )