diff --git a/package.json b/package.json index 49d6dcda..1ed261d4 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ }, "homepage": "https://warden.sentry.dev", "dependencies": { + "@agentclientprotocol/sdk": "^0.21.0", "@anthropic-ai/claude-agent-sdk": "^0.2.22", "@anthropic-ai/sdk": "^0.72.1", "@inquirer/select": "^5.0.4", diff --git a/packages/docs/src/pages/config.astro b/packages/docs/src/pages/config.astro index 1ada9f98..ad12f13f 100644 --- a/packages/docs/src/pages/config.astro +++ b/packages/docs/src/pages/config.astro @@ -223,6 +223,27 @@ model = "claude-opus-4-5"`}

Model precedence: trigger > skill > defaults > CLI flag (-m) > env var (WARDEN_MODEL). Most specific wins.

Synthesis fallback: defaults.synthesis.model falls back to defaults.auxiliary.model when not set.

+

Agent Client Protocol runtime

+ +

Set defaults.runtime to acp to run hunk analysis through an ACP-compatible agent instead of Claude Code SDK. Use a custom command or a registry agent id.

+ + + + + +

Registry agents are resolved from https://cdn.agentclientprotocol.com/registry/v1/latest/registry.json by default. Analysis, extraction repair, consolidation, and fix-quality helper calls use the selected ACP agent.

+

Chunking

Control how files are split for analysis. By default, Warden analyzes each hunk separately.

diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index dd29fdde..a70889cd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8,6 +8,9 @@ importers: .: dependencies: + '@agentclientprotocol/sdk': + specifier: ^0.21.0 + version: 0.21.0(zod@4.3.6) '@anthropic-ai/claude-agent-sdk': specifier: ^0.2.22 version: 0.2.22(zod@4.3.6) @@ -127,6 +130,11 @@ importers: packages: + '@agentclientprotocol/sdk@0.21.0': + resolution: {integrity: sha512-ONj+Q8qOdNQp5XbH5jnMwzT9IKZJsSN0p0lkceS4GtUtNOPVLpNzSS8gqQdGMKfBvA0ESbkL8BTaSN1Rc9miEw==} + peerDependencies: + zod: ^3.25.0 || ^4.0.0 + '@alcalzone/ansi-tokenize@0.1.3': resolution: {integrity: sha512-3yWxPTq3UQ/FY9p1ErPxIyfT64elWaMvM9lIHnaqpyft63tkxodF5aUElYHrdisWve5cETkh1+KBw1yJuW0aRw==} engines: {node: '>=14.13.1'} @@ -3395,6 +3403,10 @@ packages: snapshots: + '@agentclientprotocol/sdk@0.21.0(zod@4.3.6)': + dependencies: + zod: 4.3.6 + '@alcalzone/ansi-tokenize@0.1.3': dependencies: ansi-styles: 6.2.3 diff --git a/src/action/triggers/executor.ts b/src/action/triggers/executor.ts index 5c3bc89b..b6077866 100644 --- a/src/action/triggers/executor.ts +++ b/src/action/triggers/executor.ts @@ -136,6 +136,7 @@ export async function executeTrigger( apiKey: anthropicApiKey, model: trigger.model, runtime: trigger.runtime, + acp: trigger.acp, auxiliaryModel: trigger.auxiliaryModel, synthesisModel: trigger.synthesisModel, maxTurns: trigger.maxTurns, diff --git a/src/action/workflow/schedule.ts b/src/action/workflow/schedule.ts index cd75de63..a0ad7e33 100644 --- a/src/action/workflow/schedule.ts +++ b/src/action/workflow/schedule.ts @@ -176,6 +176,7 @@ export async function runScheduleWorkflow( apiKey: inputs.anthropicApiKey, model: resolved.model, runtime: resolved.runtime, + acp: resolved.acp, auxiliaryModel: resolved.auxiliaryModel, synthesisModel: resolved.synthesisModel, maxTurns: resolved.maxTurns, diff --git a/src/cli/main.ts b/src/cli/main.ts index 287660a2..88e38f2e 100644 --- a/src/cli/main.ts +++ b/src/cli/main.ts @@ -482,6 +482,7 @@ interface SkillToRun { model?: string; maxTurns?: number; runtime?: SkillRunnerOptions['runtime']; + acp?: SkillRunnerOptions['acp']; auxiliaryModel?: string; synthesisModel?: string; auxiliaryMaxRetries?: number; @@ -507,7 +508,7 @@ interface ProcessedResults { type SkillRunnerOptionOverrides = Pick< SkillRunnerOptions, - 'model' | 'maxTurns' | 'runtime' | 'auxiliaryModel' | 'synthesisModel' | 'auxiliaryMaxRetries' + 'model' | 'maxTurns' | 'runtime' | 'acp' | 'auxiliaryModel' | 'synthesisModel' | 'auxiliaryMaxRetries' >; /** Apply per-skill runner overrides on top of the shared execution defaults. */ @@ -520,6 +521,7 @@ export function mergeSkillRunnerOptions( if (overrides.model !== undefined) merged.model = overrides.model; if (overrides.maxTurns !== undefined) merged.maxTurns = overrides.maxTurns; if (overrides.runtime !== undefined) merged.runtime = overrides.runtime; + if (overrides.acp !== undefined) merged.acp = overrides.acp; if (overrides.auxiliaryModel !== undefined) merged.auxiliaryModel = overrides.auxiliaryModel; if (overrides.synthesisModel !== undefined) merged.synthesisModel = overrides.synthesisModel; if (overrides.auxiliaryMaxRetries !== undefined) { @@ -819,20 +821,6 @@ export async function runSkills( // Not in a git repo - that's fine for file mode } - // Pre-flight: verify auth will work before starting analysis - try { - verifyAuth({ apiKey }); - } catch (error: unknown) { - const message = (error as WardenAuthenticationError).message; - reporter.error(message); - emitEmptyRunLog(repoPath ?? cwd, options, { - code: 'auth_failed', - message, - timestamp: new Date().toISOString(), - }); - return 1; - } - // Resolve config path let configPath: string | null = null; if (options.config) { @@ -849,6 +837,22 @@ export async function runSkills( const defaultAuxiliaryModel = resolveCliDefaultAuxiliaryModel(config); const defaultSynthesisModel = resolveCliDefaultSynthesisModel(config); + // Pre-flight: verify Claude auth only when a Claude-backed run may execute. + try { + if ((config?.defaults?.runtime ?? 'claude') === 'claude') { + verifyAuth({ apiKey }); + } + } catch (error: unknown) { + const message = (error as WardenAuthenticationError).message; + reporter.error(message); + emitEmptyRunLog(repoPath ?? cwd, options, { + code: 'auth_failed', + message, + timestamp: new Date().toISOString(), + }); + return 1; + } + // Determine which triggers/skills to run let skillsToRun: SkillToRun[]; if (options.skill) { @@ -868,6 +872,7 @@ export async function runSkills( model: match?.model ?? defaultModel, maxTurns: match?.maxTurns ?? config?.defaults?.agent?.maxTurns ?? config?.defaults?.maxTurns, runtime: match?.runtime ?? config?.defaults?.runtime ?? 'claude', + acp: match?.acp ?? config?.defaults?.agent?.acp, auxiliaryModel: match?.auxiliaryModel ?? defaultAuxiliaryModel, synthesisModel: match?.synthesisModel ?? defaultSynthesisModel, auxiliaryMaxRetries: @@ -894,6 +899,7 @@ export async function runSkills( model: t.model, maxTurns: t.maxTurns, runtime: t.runtime, + acp: t.acp, auxiliaryModel: t.auxiliaryModel, synthesisModel: t.synthesisModel, auxiliaryMaxRetries: t.auxiliaryMaxRetries, @@ -926,6 +932,7 @@ export async function runSkills( apiKey, model: sdkModel, runtime: config?.defaults?.runtime ?? 'claude', + acp: config?.defaults?.agent?.acp, auxiliaryModel: defaultAuxiliaryModel, synthesisModel: defaultSynthesisModel, abortController, @@ -1231,7 +1238,9 @@ async function runConfigMode(options: CLIOptions, reporter: Reporter): Promise { expect(resolved?.auxiliaryMaxRetries).toBe(2); }); + it('resolves ACP agent runtime options', () => { + const config: WardenConfig = { + ...baseConfig, + defaults: { + runtime: 'acp', + agent: { + acp: { command: 'atlas alta agent run' }, + }, + }, + }; + + const [resolved] = resolveSkillConfigs(config); + + expect(resolved?.runtime).toBe('acp'); + expect(resolved?.acp).toEqual({ command: 'atlas alta agent run' }); + }); + it('falls back to auxiliary model when synthesis model is unset', () => { const config: WardenConfig = { ...baseConfig, @@ -853,6 +870,29 @@ describe('maxTurns config', () => { expect(result.data?.defaults?.synthesis?.model).toBe('claude-opus-4-5'); }); + it('accepts ACP custom command and registry defaults', () => { + const customCommand = WardenConfigSchema.safeParse({ + version: 1, + defaults: { + runtime: 'acp', + agent: { acp: { command: 'atlas alta agent run' } }, + }, + skills: [], + }); + const registry = WardenConfigSchema.safeParse({ + version: 1, + defaults: { + runtime: 'acp', + agent: { acp: { registryId: 'amp-acp', registryUrl: 'https://example.com/registry.json' } }, + }, + skills: [], + }); + + expect(customCommand.success).toBe(true); + expect(registry.success).toBe(true); + expect(registry.data?.defaults?.agent?.acp?.registryUrl).toBe('https://example.com/registry.json'); + }); + it('rejects unknown runtimes', () => { const config = { version: 1, diff --git a/src/config/loader.ts b/src/config/loader.ts index d0fb7009..f124f1d6 100644 --- a/src/config/loader.ts +++ b/src/config/loader.ts @@ -13,6 +13,7 @@ import { type RunnerConfig, type LogsConfig, type RuntimeName, + type AcpAgentRuntimeConfig, } from './schema.js'; import type { SeverityThreshold, ConfidenceThreshold } from '../types/index.js'; @@ -292,6 +293,8 @@ export interface ResolvedTrigger { maxTurns?: number; /** Runtime backend for all model-backed execution. */ runtime?: RuntimeName; + /** Agent Client Protocol runtime options. */ + acp?: AcpAgentRuntimeConfig; /** Model for auxiliary structured model calls. */ auxiliaryModel?: string; /** Model for post-analysis synthesis/consolidation. */ @@ -340,6 +343,7 @@ export function resolveSkillConfigs( const envModel = emptyToUndefined(process.env['WARDEN_MODEL']); const result: ResolvedTrigger[] = []; const runtime = defaults?.runtime ?? 'claude'; + const acp = defaults?.agent?.acp; const auxiliaryModel = emptyToUndefined(defaults?.auxiliary?.model); const synthesisModel = emptyToUndefined(defaults?.synthesis?.model) ?? @@ -386,6 +390,7 @@ export function resolveSkillConfigs( model: baseModel, maxTurns: baseMaxTurns, runtime, + acp, auxiliaryModel, synthesisModel, auxiliaryMaxRetries, @@ -413,6 +418,7 @@ export function resolveSkillConfigs( model: emptyToUndefined(trigger.model) ?? baseModel, maxTurns: trigger.maxTurns ?? baseMaxTurns, runtime, + acp, auxiliaryModel, synthesisModel, auxiliaryMaxRetries, diff --git a/src/config/schema.ts b/src/config/schema.ts index 603e866a..a5a50abc 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -51,11 +51,27 @@ export type TriggerType = z.infer; export { RuntimeNameSchema }; export type { RuntimeName }; +export const AcpAgentRuntimeConfigSchema = z.object({ + /** Custom ACP agent command, e.g. "atlas alta agent run". */ + command: z.string().min(1).optional(), + /** Additional arguments appended to the custom ACP command. */ + args: z.array(z.string()).optional(), + /** ACP registry agent id resolved from the public registry. */ + registryId: z.string().min(1).optional(), + /** ACP registry URL. Defaults to the public latest registry. */ + registryUrl: z.string().url().optional(), + /** Extra environment variables for the ACP agent process. */ + env: z.record(z.string(), z.string()).optional(), +}).strict(); +export type AcpAgentRuntimeConfig = z.infer; + export const AgentRuntimeConfigSchema = z.object({ /** Model for repo-aware skill execution. Overrides legacy defaults.model. */ model: z.string().optional(), /** Maximum agentic turns for repo-aware skill execution. Overrides legacy defaults.maxTurns. */ maxTurns: z.number().int().positive().optional(), + /** Agent Client Protocol runtime options. Used when defaults.runtime = "acp". */ + acp: AcpAgentRuntimeConfigSchema.optional(), }).strict(); export type AgentRuntimeConfig = z.infer; diff --git a/src/sdk/analyze.ts b/src/sdk/analyze.ts index beac7c46..e41fe9de 100644 --- a/src/sdk/analyze.ts +++ b/src/sdk/analyze.ts @@ -66,7 +66,8 @@ function isAbortRequested(error: unknown, abortController?: AbortController): bo async function parseHunkOutput( result: SkillRunResult, filename: string, - options: SkillRunnerOptions + options: SkillRunnerOptions, + repoPath: string, ): Promise { if (result.status !== 'success') { // SDK error - not an extraction failure, just no findings @@ -86,6 +87,9 @@ async function parseHunkOutput( runtime: options.runtime, model: options.auxiliaryModel, maxRetries: options.auxiliaryMaxRetries, + repoPath, + providerOptions: options.runtime === 'acp' ? options.acp : undefined, + abortController: options.abortController, }); if (fallback.success) { @@ -200,9 +204,10 @@ async function analyzeHunk( try { const runtimeName = options.runtime ?? 'claude'; const runtime = getRuntime(runtimeName); - const providerOptions = - runtimeName === 'claude' - ? { pathToClaudeCodeExecutable: options.pathToClaudeCodeExecutable } + const providerOptions = runtimeName === 'claude' + ? { pathToClaudeCodeExecutable: options.pathToClaudeCodeExecutable } + : runtimeName === 'acp' + ? options.acp : undefined; const { result: resultMessage, authError } = await runtime.runSkill({ systemPrompt, @@ -271,7 +276,7 @@ async function analyzeHunk( }; } - const parseResult = await parseHunkOutput(resultMessage, hunkCtx.filename, options); + const parseResult = await parseHunkOutput(resultMessage, hunkCtx.filename, options, repoPath); // Filter findings outside hunk line range (defense-in-depth) const hunkRange = getHunkLineRange(hunkCtx.hunk); @@ -803,6 +808,8 @@ export async function runSkill( runtime: options.runtime, model: options.synthesisModel, maxRetries: options.auxiliaryMaxRetries, + providerOptions: options.runtime === 'acp' ? options.acp : undefined, + abortController: options.abortController, }); let mergedFindings = mergeResult.findings; if (mergeResult.usage) { @@ -814,6 +821,8 @@ export async function runSkill( runtime: options.runtime, model: options.auxiliaryModel, maxRetries: options.auxiliaryMaxRetries, + providerOptions: options.runtime === 'acp' ? options.acp : undefined, + abortController: options.abortController, }); mergedFindings = sanitized.findings; if (sanitized.usage) { diff --git a/src/sdk/extract.ts b/src/sdk/extract.ts index 64bb962c..d7561888 100644 --- a/src/sdk/extract.ts +++ b/src/sdk/extract.ts @@ -22,6 +22,9 @@ export interface AuxiliaryCallOptions { runtime?: RuntimeName; model?: string; maxRetries?: number; + repoPath?: string; + providerOptions?: unknown; + abortController?: AbortController; } /** @@ -181,7 +184,7 @@ export async function extractFindingsWithLLM( : { apiKey: apiKeyOrOptions, maxRetries }; const { apiKey, runtime, model } = options; - if (!apiKey) { + if (!apiKey && runtime !== 'acp') { return { success: false, error: 'no_api_key_for_fallback', @@ -217,6 +220,9 @@ ${truncatedText}`; maxTokens: LLM_FALLBACK_MAX_TOKENS, timeout: LLM_FALLBACK_TIMEOUT_MS, maxRetries: options.maxRetries, + repoPath: options.repoPath, + providerOptions: options.providerOptions, + abortController: options.abortController, }); if (!result.success) { @@ -457,11 +463,12 @@ export async function mergeCrossLocationFindings( options?: AuxiliaryCallOptions & { repoPath?: string } ): Promise { const apiKey = options?.apiKey; + const runtime = options?.runtime; const repoPath = options?.repoPath ?? '.'; // Early exit: need at least 2 located findings to merge const withLocations = findings.filter((f) => f.location); - if (withLocations.length < 2 || !apiKey) { + if (withLocations.length < 2 || (!apiKey && runtime !== 'acp')) { return { findings, mergedCount: 0 }; } @@ -483,7 +490,7 @@ ${findingDescriptions.join('\n')} Return a JSON array of arrays, where each inner array contains the 1-based indices of findings about the same issue. Singletons should not appear. Return [] if no findings describe the same issue.`; - const result = await getRuntime(options?.runtime).runSynthesis({ + const result = await getRuntime(runtime).runSynthesis({ task: 'consolidation', apiKey, prompt, @@ -491,6 +498,9 @@ Singletons should not appear. Return [] if no findings describe the same issue.` model: options?.model, maxTokens: 512, maxRetries: options?.maxRetries, + repoPath, + providerOptions: options?.providerOptions, + abortController: options?.abortController, }); if (!result.success) { diff --git a/src/sdk/fix-quality.ts b/src/sdk/fix-quality.ts index 2cc06117..24579f59 100644 --- a/src/sdk/fix-quality.ts +++ b/src/sdk/fix-quality.ts @@ -27,6 +27,8 @@ interface SanitizeSuggestedFixesOptions { runtime?: RuntimeName; model?: string; maxRetries?: number; + providerOptions?: unknown; + abortController?: AbortController; } const SEMANTIC_PROMPT_MAX_CHARS = 4000; @@ -133,7 +135,7 @@ async function runSemanticGate( options: SanitizeSuggestedFixesOptions ): Promise<{ verdict: 'pass' | 'fail' | 'unavailable'; usage?: UsageStats }> { const { apiKey, runtime, model, maxRetries } = options; - if (!apiKey) { + if (!apiKey && runtime !== 'acp') { return { verdict: 'unavailable' }; } const originalForPrompt = fileContent.slice(0, SEMANTIC_PROMPT_MAX_CHARS); @@ -166,6 +168,9 @@ async function runSemanticGate( maxTokens: 220, timeout: 8000, maxRetries: maxRetries ?? 1, + repoPath: options.repoPath, + providerOptions: options.providerOptions, + abortController: options.abortController, }); if (!result.success) { diff --git a/src/sdk/json-output.ts b/src/sdk/json-output.ts index e1a7b83f..ef7be995 100644 --- a/src/sdk/json-output.ts +++ b/src/sdk/json-output.ts @@ -19,6 +19,9 @@ export interface JsonOutputRepairOptions { maxRetries?: number; maxTokens?: number; timeout?: number; + repoPath?: string; + providerOptions?: unknown; + abortController?: AbortController; } export interface ParseJsonFromOutputOptions { @@ -66,7 +69,7 @@ async function repairJsonOutput( reason: string, repair: JsonOutputRepairOptions, ): Promise> { - if (!repair.apiKey) { + if (!repair.apiKey && repair.runtimeName !== 'acp') { return { success: false, error: `${reason}; repair_skipped: missing_api_key`, @@ -81,6 +84,9 @@ async function repairJsonOutput( maxRetries: repair.maxRetries, maxTokens: repair.maxTokens ?? JSON_REPAIR_MAX_TOKENS, timeout: repair.timeout ?? JSON_REPAIR_TIMEOUT_MS, + repoPath: repair.repoPath, + providerOptions: repair.providerOptions, + abortController: repair.abortController, schema, prompt: `Extract and repair the JSON value from this model output. diff --git a/src/sdk/runner.ts b/src/sdk/runner.ts index 21f90848..44ea2776 100644 --- a/src/sdk/runner.ts +++ b/src/sdk/runner.ts @@ -62,6 +62,7 @@ export { analyzeFile, runSkill, generateSummary } from './analyze.js'; // Re-export runtime registry and adapter contracts export { + acpRuntime, claudeRuntime, getRuntime, } from './runtimes/index.js'; diff --git a/src/sdk/runtimes/acp.test.ts b/src/sdk/runtimes/acp.test.ts new file mode 100644 index 00000000..23d61cf3 --- /dev/null +++ b/src/sdk/runtimes/acp.test.ts @@ -0,0 +1,267 @@ +import { existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { z } from 'zod'; +import { afterEach, describe, expect, it } from 'vitest'; +import { acpRuntime } from './acp.js'; + +const tempDirs: string[] = []; + +function makeTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), 'warden-acp-runtime-')); + tempDirs.push(dir); + return dir; +} + +afterEach(() => { + while (tempDirs.length > 0) { + const dir = tempDirs.pop(); + if (dir) rmSync(dir, { recursive: true, force: true }); + } +}); + +function writeFakeAcpAgent(dir: string, responseText: string, promptUsage?: object, usageUpdate?: object): string { + const scriptPath = join(dir, 'fake-acp-agent.mjs'); + const usageNotification = usageUpdate + ? `send({ jsonrpc: '2.0', method: 'session/update', params: { + sessionId: 'session-123', + update: ${JSON.stringify(usageUpdate)} + }});` + : ''; + writeFileSync(scriptPath, ` +import readline from 'node:readline'; +const rl = readline.createInterface({ input: process.stdin }); +function send(message) { process.stdout.write(JSON.stringify(message) + '\\n'); } +rl.on('line', (line) => { + const message = JSON.parse(line); + if (message.method === 'initialize') { + send({ jsonrpc: '2.0', id: message.id, result: { + protocolVersion: 1, + agentCapabilities: {}, + agentInfo: { name: 'fake-acp-agent', version: '1.0.0' }, + authMethods: [] + }}); + return; + } + if (message.method === 'session/new') { + send({ jsonrpc: '2.0', id: message.id, result: { sessionId: 'session-123' } }); + return; + } + if (message.method === 'session/prompt') { + send({ jsonrpc: '2.0', method: 'session/update', params: { + sessionId: 'session-123', + update: { + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: ${JSON.stringify(responseText)} } + } + }}); + ${usageNotification} + send({ jsonrpc: '2.0', id: message.id, result: { + stopReason: 'end_turn', + usage: ${promptUsage ? JSON.stringify(promptUsage) : 'undefined'} + } }); + } +}); +`); + return scriptPath; +} + +function writeAbortAwareAgent(dir: string, logPath: string, mode: 'before-session' | 'during-prompt'): string { + const scriptPath = join(dir, `abort-aware-${mode}.mjs`); + writeFileSync(scriptPath, ` +import { appendFileSync } from 'node:fs'; +import readline from 'node:readline'; +const rl = readline.createInterface({ input: process.stdin }); +function log(value) { appendFileSync(${JSON.stringify(logPath)}, JSON.stringify(value) + '\\n'); } +function send(message) { process.stdout.write(JSON.stringify(message) + '\\n'); } +rl.on('line', (line) => { + const message = JSON.parse(line); + log(message); + if (message.method === 'initialize') { + send({ jsonrpc: '2.0', id: message.id, result: { protocolVersion: 1, agentCapabilities: {}, agentInfo: { name: 'abort-aware' } } }); + ${mode === 'before-session' ? 'setTimeout(() => {}, 10000);' : ''} + return; + } + if (message.method === 'session/new') { + ${mode === 'before-session' ? 'setTimeout(() => {}, 10000); return;' : ''} + send({ jsonrpc: '2.0', id: message.id, result: { sessionId: 'session-actual' } }); + return; + } + if (message.method === 'session/prompt') { + setTimeout(() => {}, 10000); + } +}); +`); + return scriptPath; +} + +function readJsonLines(filePath: string): { method?: string; params?: { sessionId?: string } }[] { + if (!existsSync(filePath)) return []; + const content = readFileSync(filePath, 'utf8').trim(); + if (!content) return []; + return content.split('\n').filter(Boolean).map((line) => JSON.parse(line) as { method?: string; params?: { sessionId?: string } }); +} + +async function waitForMessage(filePath: string, method: string): Promise { + const deadline = Date.now() + 1000; + while (Date.now() < deadline) { + if (readJsonLines(filePath).some((message) => message.method === method)) return; + await new Promise((resolveWait) => setTimeout(resolveWait, 10)); + } + throw new Error(`Timed out waiting for ${method}`); +} + +describe('acpRuntime', () => { + it('runs a skill through an ACP stdio agent and collects streamed text and usage', async () => { + const dir = makeTempDir(); + const scriptPath = writeFakeAcpAgent(dir, '{"findings":[]}', { + inputTokens: 120, + outputTokens: 34, + cachedReadTokens: 56, + cachedWriteTokens: 78, + totalTokens: 288, + }, { + sessionUpdate: 'usage_update', + size: 200000, + used: 1500, + cost: { amount: 0.0123, currency: 'USD' }, + }); + + const response = await acpRuntime.runSkill({ + systemPrompt: 'system prompt', + userPrompt: 'user prompt', + repoPath: dir, + skillName: 'test-skill', + options: {}, + providerOptions: { + command: process.execPath, + args: [scriptPath], + }, + }); + + expect(response.result).toMatchObject({ + status: 'success', + text: '{"findings":[]}', + sessionId: 'session-123', + usage: { + inputTokens: 120, + outputTokens: 34, + cacheReadInputTokens: 56, + cacheCreationInputTokens: 78, + cacheCreation5mInputTokens: 78, + cacheCreation1hInputTokens: 0, + webSearchRequests: 0, + costUSD: 0.0123, + }, + }); + }); + + it('cancels the active ACP session id when aborted after session creation', async () => { + const dir = makeTempDir(); + const logPath = join(dir, 'messages.jsonl'); + const scriptPath = writeAbortAwareAgent(dir, logPath, 'during-prompt'); + const abortController = new AbortController(); + + const run = acpRuntime.runSkill({ + systemPrompt: 'system prompt', + userPrompt: 'user prompt', + repoPath: dir, + skillName: 'test-skill', + options: { abortController }, + providerOptions: { + command: process.execPath, + args: [scriptPath], + }, + }); + await waitForMessage(logPath, 'session/prompt'); + abortController.abort(); + const response = await run; + + const cancel = readJsonLines(logPath).find((message) => message.method === 'session/cancel'); + expect(response.result?.status).toBe('aborted'); + expect(cancel?.params?.sessionId).toBe('session-actual'); + }); + + it('does not send an empty ACP cancel when aborted before session creation', async () => { + const dir = makeTempDir(); + const logPath = join(dir, 'messages.jsonl'); + const scriptPath = writeAbortAwareAgent(dir, logPath, 'before-session'); + const abortController = new AbortController(); + + const run = acpRuntime.runSkill({ + systemPrompt: 'system prompt', + userPrompt: 'user prompt', + repoPath: dir, + skillName: 'test-skill', + options: { abortController }, + providerOptions: { + command: process.execPath, + args: [scriptPath], + }, + }); + await waitForMessage(logPath, 'session/new'); + abortController.abort(); + const response = await run; + + const cancel = readJsonLines(logPath).find((message) => message.method === 'session/cancel'); + expect(response.result?.status).toBe('aborted'); + expect(cancel).toBeUndefined(); + }); + + it('rejects cleanly when the ACP command cannot be spawned', async () => { + const dir = makeTempDir(); + + await expect(acpRuntime.runSkill({ + systemPrompt: 'system prompt', + userPrompt: 'user prompt', + repoPath: dir, + skillName: 'test-skill', + options: {}, + providerOptions: { + command: 'tmp-rovodev-missing-acp-command', + }, + })).rejects.toThrow(/tmp-rovodev-missing-acp-command|ENOENT|spawn/); + }); + + it('runs auxiliary structured calls through ACP', async () => { + const dir = makeTempDir(); + const scriptPath = writeFakeAcpAgent(dir, '{"findings":[{"title":"ok"}]}'); + + const result = await acpRuntime.runAuxiliary({ + task: 'extraction', + prompt: 'Extract findings', + schema: z.object({ findings: z.array(z.object({ title: z.string() })) }), + repoPath: dir, + providerOptions: { + command: process.execPath, + args: [scriptPath], + }, + }); + + expect(result).toMatchObject({ + success: true, + data: { findings: [{ title: 'ok' }] }, + }); + }); + + it('runs synthesis structured calls through ACP', async () => { + const dir = makeTempDir(); + const scriptPath = writeFakeAcpAgent(dir, '[[1,2]]'); + + const result = await acpRuntime.runSynthesis({ + task: 'consolidation', + prompt: 'Group duplicate findings', + schema: z.array(z.array(z.number())), + repoPath: dir, + providerOptions: { + command: process.execPath, + args: [scriptPath], + }, + }); + + expect(result).toMatchObject({ + success: true, + data: [[1, 2]], + }); + }); +}); diff --git a/src/sdk/runtimes/acp.ts b/src/sdk/runtimes/acp.ts new file mode 100644 index 00000000..4950ee8b --- /dev/null +++ b/src/sdk/runtimes/acp.ts @@ -0,0 +1,473 @@ +import { ClientSideConnection, PROTOCOL_VERSION, type Client } from '@agentclientprotocol/sdk'; +import type { + ReadTextFileRequest, + ReadTextFileResponse, + PermissionOption, + RequestPermissionRequest, + RequestPermissionResponse, + SessionNotification, + Usage, + UsageUpdate, +} from '@agentclientprotocol/sdk/dist/schema/index.js'; +import type { UsageStats } from '../../types/index.js'; +import { ndJsonStream } from '@agentclientprotocol/sdk/dist/stream.js'; +import { spawn } from 'node:child_process'; +import { readFile } from 'node:fs/promises'; +import { isAbsolute, relative, resolve, sep } from 'node:path'; +import { Readable, Writable } from 'node:stream'; +import { Sentry } from '../../sentry.js'; +import { extractJson } from '../haiku.js'; +import { emptyUsage } from '../usage.js'; +import type { + AuxiliaryRunRequest, + AuxiliaryRunResult, + Runtime, + SkillRunRequest, + SkillRunResponse, + SynthesisRunRequest, +} from './types.js'; + +const DEFAULT_REGISTRY_URL = 'https://cdn.agentclientprotocol.com/registry/v1/latest/registry.json'; + +interface AcpProviderOptions { + command?: string; + args?: string[]; + registryId?: string; + registryUrl?: string; + env?: Record; +} + +interface CommandSpec { + command: string; + args: string[]; + env?: Record; +} + +interface AcpUsageSnapshot { + usages: Usage[]; + usageUpdate?: UsageUpdate; +} + +function firstCommandPart(parts: string[], message: string): string { + const command = parts[0]; + if (!command) throw new Error(message); + return command; +} + +function parseCommand(command: string): string[] { + const parts: string[] = []; + let current = ''; + let quote: '"' | "'" | undefined; + let escaping = false; + + for (const char of command) { + if (escaping) { + current += char; + escaping = false; + continue; + } + if (char === '\\' && quote !== "'") { + escaping = true; + continue; + } + if ((char === '"' || char === "'") && (!quote || quote === char)) { + quote = quote ? undefined : char; + continue; + } + if (/\s/.test(char) && !quote) { + if (current) { + parts.push(current); + current = ''; + } + continue; + } + current += char; + } + + if (escaping) current += '\\'; + if (quote) throw new Error(`Unclosed quote in ACP command: ${command}`); + if (current) parts.push(current); + return parts; +} + +function getAcpProviderOptions(providerOptions: unknown): AcpProviderOptions { + if (!providerOptions || typeof providerOptions !== 'object') return {}; + const options = providerOptions as Record; + return { + command: typeof options['command'] === 'string' ? options['command'] : undefined, + args: Array.isArray(options['args']) && options['args'].every((arg) => typeof arg === 'string') + ? options['args'] as string[] + : undefined, + registryId: typeof options['registryId'] === 'string' ? options['registryId'] : undefined, + registryUrl: typeof options['registryUrl'] === 'string' ? options['registryUrl'] : undefined, + env: options['env'] && typeof options['env'] === 'object' && !Array.isArray(options['env']) + ? Object.fromEntries( + Object.entries(options['env'] as Record) + .filter((entry): entry is [string, string] => typeof entry[1] === 'string') + ) + : undefined, + }; +} + +async function resolveRegistryCommand(registryId: string, registryUrl = DEFAULT_REGISTRY_URL): Promise { + const response = await fetch(registryUrl); + if (!response.ok) { + throw new Error(`Failed to fetch ACP registry ${registryUrl}: ${response.status} ${response.statusText}`); + } + + const registry = await response.json() as { agents?: unknown[] }; + const agent = registry.agents?.find((entry): entry is Record => ( + Boolean(entry) && typeof entry === 'object' && (entry as Record)['id'] === registryId + )); + if (!agent) { + throw new Error(`ACP registry agent not found: ${registryId}`); + } + + const distribution = agent['distribution']; + if (!distribution || typeof distribution !== 'object') { + throw new Error(`ACP registry agent ${registryId} has no distribution metadata`); + } + + const npx = (distribution as Record)['npx']; + if (npx && typeof npx === 'object') { + const npxConfig = npx as Record; + const packageName = npxConfig['package']; + const args = npxConfig['args']; + if (typeof packageName !== 'string') { + throw new Error(`ACP registry agent ${registryId} has invalid npx package metadata`); + } + return { + command: 'npx', + args: ['-y', packageName, ...(Array.isArray(args) && args.every((arg) => typeof arg === 'string') ? args : [])], + }; + } + + const binary = (distribution as Record)['binary']; + if (binary && typeof binary === 'object') { + const binaryConfig = binary as Record; + const command = binaryConfig['command']; + const args = binaryConfig['args']; + if (typeof command === 'string') { + return { + command, + args: Array.isArray(args) && args.every((arg) => typeof arg === 'string') ? args : [], + }; + } + } + + const command = (distribution as Record)['command']; + if (typeof command === 'string') { + const parsed = parseCommand(command); + return { + command: firstCommandPart(parsed, `ACP registry agent ${registryId} has an empty command`), + args: parsed.slice(1), + }; + } + + throw new Error(`ACP registry agent ${registryId} does not expose a supported distribution`); +} + +async function resolveCommand(options: AcpProviderOptions): Promise { + if (options.command) { + const parsed = parseCommand(options.command); + return { + command: firstCommandPart(parsed, 'ACP command must not be empty'), + args: [...parsed.slice(1), ...(options.args ?? [])], + env: options.env, + }; + } + + if (options.registryId) { + const command = await resolveRegistryCommand(options.registryId, options.registryUrl); + return { ...command, env: options.env }; + } + + const envCommand = process.env['WARDEN_ACP_COMMAND']; + if (envCommand) { + const parsed = parseCommand(envCommand); + return { + command: firstCommandPart(parsed, 'WARDEN_ACP_COMMAND must not be empty'), + args: parsed.slice(1), + env: options.env, + }; + } + + throw new Error('ACP runtime requires defaults.agent.acp.command, defaults.agent.acp.registryId, or WARDEN_ACP_COMMAND'); +} + +function acpUsageToStats(snapshot: AcpUsageSnapshot | undefined): UsageStats { + const usages = snapshot?.usages ?? []; + const inputTokens = usages.reduce((total, usage) => total + usage.inputTokens, 0); + const outputTokens = usages.reduce((total, usage) => total + usage.outputTokens, 0); + const cacheReadInputTokens = usages.reduce((total, usage) => total + (usage.cachedReadTokens ?? 0), 0); + const cacheCreationInputTokens = usages.reduce((total, usage) => total + (usage.cachedWriteTokens ?? 0), 0); + const update = snapshot?.usageUpdate; + return { + inputTokens, + outputTokens, + cacheReadInputTokens, + cacheCreationInputTokens, + cacheCreation5mInputTokens: cacheCreationInputTokens, + cacheCreation1hInputTokens: 0, + webSearchRequests: 0, + costUSD: update?.cost?.currency === 'USD' ? update.cost.amount : 0, + }; +} + +class WardenAcpClient implements Client { + constructor( + private readonly repoPath: string, + private readonly textChunks: string[], + private readonly usageSnapshot: AcpUsageSnapshot, + ) {} + + async requestPermission(params: RequestPermissionRequest): Promise { + const allow = params.options.find((option: PermissionOption) => option.kind === 'allow_once' || option.kind === 'allow_always') ?? params.options[0]; + if (!allow) return { outcome: { outcome: 'cancelled' } }; + return { + outcome: { + outcome: 'selected', + optionId: allow.optionId, + }, + }; + } + + async sessionUpdate(params: SessionNotification): Promise { + const update = params.update; + if (update.sessionUpdate === 'agent_message_chunk' && update.content.type === 'text') { + this.textChunks.push(update.content.text); + return; + } + if (update.sessionUpdate === 'usage_update') { + this.usageSnapshot.usageUpdate = update; + } + } + + async readTextFile(params: ReadTextFileRequest): Promise { + const resolvedPath = resolve(this.repoPath, params.path); + const relativePath = relative(this.repoPath, resolvedPath); + if (relativePath === '..' || relativePath.startsWith(`..${sep}`) || isAbsolute(relativePath)) { + throw new Error('ACP file reads are restricted to the repository'); + } + return { content: await readFile(resolvedPath, 'utf8') }; + } +} + +function parseStructuredAcpOutput(text: string, schema: AuxiliaryRunRequest['schema']): AuxiliaryRunResult { + const json = extractJson(text); + if (!json) { + return { success: false, error: 'No JSON found in ACP response', usage: emptyUsage() }; + } + + let parsed: unknown; + try { + parsed = JSON.parse(json); + } catch (error) { + return { + success: false, + error: `Invalid JSON from ACP response: ${error instanceof Error ? error.message : String(error)}`, + usage: emptyUsage(), + }; + } + + const validated = schema.safeParse(parsed); + if (!validated.success) { + return { success: false, error: `Validation failed: ${validated.error.message}`, usage: emptyUsage() }; + } + + return { success: true, data: validated.data, usage: emptyUsage() }; +} + +function structuredPrompt(task: AuxiliaryRunRequest['task'] | SynthesisRunRequest['task'], prompt: string): string { + return [ + `You are running Warden helper task: ${task}.`, + 'Return only valid JSON accepted by the requested schema. Do not wrap it in markdown.', + 'Do not include prose before or after the JSON.', + '', + prompt, + ].join('\n'); +} + +export const acpRuntime: Runtime = { + name: 'acp', + + async runSkill(request: SkillRunRequest): Promise { + const { repoPath, skillName, systemPrompt, userPrompt, options, providerOptions } = request; + const acpOptions = getAcpProviderOptions(providerOptions); + const command = await resolveCommand(acpOptions); + const startedAt = Date.now(); + const textChunks: string[] = []; + const usageSnapshot: AcpUsageSnapshot = { usages: [] }; + + return Sentry.startSpan( + { + op: 'gen_ai.invoke_agent', + name: `invoke_acp_agent ${skillName}`, + attributes: { + 'gen_ai.operation.name': 'invoke_agent', + 'gen_ai.provider.name': 'acp', + 'gen_ai.agent.name': skillName, + 'gen_ai.request.model': options.model ?? 'default', + 'warden.request.max_turns': options.maxTurns ?? 0, + 'acp.command': [command.command, ...command.args].join(' '), + }, + }, + async (span) => { + const child = spawn(command.command, command.args, { + cwd: repoPath, + env: { ...process.env, ...command.env }, + stdio: ['pipe', 'pipe', 'pipe'], + }); + const childError = new Promise((_resolve, reject) => { + child.once('error', reject); + }); + void childError.catch(() => undefined); + const withChildError = async (operation: Promise): Promise => await Promise.race([operation, childError]); + const stderrChunks: string[] = []; + let abort: (() => void) | undefined; + let abortCleanup: Promise | undefined; + let sessionId: string | undefined; + + try { + child.stderr.setEncoding('utf8'); + child.stderr.on('data', (chunk: string) => stderrChunks.push(chunk)); + + const stream = ndJsonStream( + Writable.toWeb(child.stdin), + Readable.toWeb(child.stdout) as ReadableStream, + ); + const connection = new ClientSideConnection( + () => new WardenAcpClient(repoPath, textChunks, usageSnapshot), + stream, + ); + abort = (): void => { + const killChild = (): void => { + if (!child.killed) child.kill('SIGTERM'); + }; + if (sessionId) { + try { + const fallback = setTimeout(killChild, 250); + fallback.unref?.(); + abortCleanup = connection.cancel({ sessionId }) + .catch(() => undefined) + .finally(async () => { + clearTimeout(fallback); + await new Promise((resolveGrace) => setTimeout(resolveGrace, 50)); + killChild(); + }); + return; + } catch { + // Abort must never throw from the event listener; process kill is the fallback. + } + } + killChild(); + }; + options.abortController?.signal.addEventListener('abort', abort, { once: true }); + + await withChildError(connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: { + fs: { readTextFile: true, writeTextFile: false }, + terminal: false, + }, + clientInfo: { name: 'warden', title: 'Warden', version: '1.0.0' }, + })); + + const session = await withChildError(connection.newSession({ + cwd: repoPath, + mcpServers: [], + })); + sessionId = session.sessionId; + + const promptResult = await withChildError(connection.prompt({ + sessionId, + prompt: [{ + type: 'text', + text: `${systemPrompt}\n\n${userPrompt}`, + }], + })); + if (promptResult.usage) { + usageSnapshot.usages.push(promptResult.usage); + } + + const text = textChunks.join(''); + span.setAttribute('gen_ai.response.text', JSON.stringify([text])); + return { + result: { + status: 'success', + text, + errors: [], + usage: acpUsageToStats(usageSnapshot), + responseModel: options.model, + sessionId, + durationMs: Date.now() - startedAt, + }, + stderr: stderrChunks.join('').trim() || undefined, + }; + } catch (error) { + if (options.abortController?.signal.aborted) { + await abortCleanup; + return { + result: { + status: 'aborted', + text: textChunks.join(''), + errors: ['ACP run aborted'], + usage: acpUsageToStats(usageSnapshot), + durationMs: Date.now() - startedAt, + }, + stderr: stderrChunks.join('').trim() || undefined, + }; + } + throw error; + } finally { + if (abort) { + options.abortController?.signal.removeEventListener('abort', abort); + } + if (abortCleanup) { + await abortCleanup; + } else { + child.kill('SIGTERM'); + } + } + } + ); + }, + + async runAuxiliary(request: AuxiliaryRunRequest): Promise> { + const response = await acpRuntime.runSkill({ + systemPrompt: 'You are Warden\'s structured helper runtime.', + userPrompt: structuredPrompt(request.task, request.prompt), + repoPath: request.repoPath ?? process.cwd(), + skillName: `auxiliary:${request.task}`, + options: { + model: request.model, + abortController: request.abortController, + }, + providerOptions: request.providerOptions, + }); + if (!response.result) { + return { success: false, error: response.stderr ?? 'ACP helper returned no result', usage: emptyUsage() }; + } + const parsed = parseStructuredAcpOutput(response.result.text, request.schema); + return { ...parsed, usage: response.result.usage }; + }, + + async runSynthesis(request: SynthesisRunRequest): Promise> { + const response = await acpRuntime.runSkill({ + systemPrompt: 'You are Warden\'s structured synthesis runtime.', + userPrompt: structuredPrompt(request.task, request.prompt), + repoPath: request.repoPath ?? process.cwd(), + skillName: `synthesis:${request.task}`, + options: { + model: request.model, + abortController: request.abortController, + }, + providerOptions: request.providerOptions, + }); + if (!response.result) { + return { success: false, error: response.stderr ?? 'ACP synthesis returned no result', usage: emptyUsage() }; + } + const parsed = parseStructuredAcpOutput(response.result.text, request.schema); + return { ...parsed, usage: response.result.usage }; + }, +}; diff --git a/src/sdk/runtimes/index.test.ts b/src/sdk/runtimes/index.test.ts index f23a2e6e..ab62be32 100644 --- a/src/sdk/runtimes/index.test.ts +++ b/src/sdk/runtimes/index.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest'; import { z } from 'zod'; import { + acpRuntime, claudeRuntime, getRuntime, } from './index.js'; @@ -16,6 +17,16 @@ describe('runtimes', () => { expect(runtime.runSynthesis).toBeTypeOf('function'); }); + it('exposes ACP as an alternative runtime provider', () => { + const runtime = getRuntime('acp'); + + expect(runtime).toBe(acpRuntime); + expect(runtime.name).toBe('acp'); + expect(runtime.runSkill).toBeTypeOf('function'); + expect(runtime.runAuxiliary).toBeTypeOf('function'); + expect(runtime.runSynthesis).toBeTypeOf('function'); + }); + it('rejects unsupported runtimes explicitly', () => { expect(() => getRuntime('pi' as never)).toThrow('Unsupported runtime: pi'); }); diff --git a/src/sdk/runtimes/index.ts b/src/sdk/runtimes/index.ts index 3dfcabdd..34b65cfb 100644 --- a/src/sdk/runtimes/index.ts +++ b/src/sdk/runtimes/index.ts @@ -1,10 +1,13 @@ +import { acpRuntime } from './acp.js'; import { claudeRuntime } from './claude.js'; import type { Runtime, RuntimeName } from './types.js'; const RUNTIMES: Partial> = { + acp: acpRuntime, claude: claudeRuntime, }; +export { acpRuntime } from './acp.js'; export { claudeRuntime } from './claude.js'; export type { AuxiliaryRunRequest, diff --git a/src/sdk/runtimes/types.ts b/src/sdk/runtimes/types.ts index 936a6f43..0710f14f 100644 --- a/src/sdk/runtimes/types.ts +++ b/src/sdk/runtimes/types.ts @@ -16,7 +16,7 @@ import { z } from 'zod'; import type { ToolConfig } from '../../config/schema.js'; import type { UsageStats } from '../../types/index.js'; -export const RuntimeNameSchema = z.enum(['claude']); +export const RuntimeNameSchema = z.enum(['claude', 'acp']); export type RuntimeName = z.infer; export type SkillRunStatus = @@ -100,6 +100,9 @@ interface AuxiliaryRunRequestBase { maxTokens?: number; timeout?: number; maxRetries?: number; + repoPath?: string; + providerOptions?: unknown; + abortController?: AbortController; } interface AuxiliaryRunRequestWithoutTools extends AuxiliaryRunRequestBase { @@ -125,6 +128,9 @@ export interface SynthesisRunRequest { maxTokens?: number; timeout?: number; maxRetries?: number; + repoPath?: string; + providerOptions?: unknown; + abortController?: AbortController; } export interface Runtime { diff --git a/src/sdk/types.ts b/src/sdk/types.ts index cbee288e..bba0c8a6 100644 --- a/src/sdk/types.ts +++ b/src/sdk/types.ts @@ -1,6 +1,6 @@ import type { Finding, UsageStats, SkippedFile, RetryConfig, ErrorCode, HunkFailure } from '../types/index.js'; import type { HunkWithContext } from '../diff/index.js'; -import type { ChunkingConfig } from '../config/schema.js'; +import type { AcpAgentRuntimeConfig, ChunkingConfig } from '../config/schema.js'; import type { RuntimeName } from './runtimes/index.js'; /** A single auxiliary usage entry, keyed by agent name (e.g. 'extraction', 'dedup'). */ @@ -95,6 +95,8 @@ export interface SkillRunnerOptions { model?: string; /** Runtime backend for all model-backed execution. Defaults to Claude. */ runtime?: RuntimeName; + /** Agent Client Protocol runtime options. Used when runtime is acp. */ + acp?: AcpAgentRuntimeConfig; /** Model to use for auxiliary structured model calls. Uses runtime default if not specified. */ auxiliaryModel?: string; /** Model to use for post-analysis synthesis/consolidation. Falls back to auxiliaryModel when not specified. */