diff --git a/packages/runtime/src/agent-definition.ts b/packages/runtime/src/agent-definition.ts index 85d8816e..a892f16c 100644 --- a/packages/runtime/src/agent-definition.ts +++ b/packages/runtime/src/agent-definition.ts @@ -19,6 +19,7 @@ const AGENT_PROFILE_FIELDS = new Set([ 'subagents', 'thinkingLevel', 'compaction', + 'modelRetries', ]); const AGENT_RUNTIME_FIELDS = new Set([ @@ -48,6 +49,7 @@ const AgentProfileSchema = v.looseObject({ subagents: v.optional(v.array(v.unknown())), thinkingLevel: v.optional(v.string()), compaction: v.optional(v.union([v.literal(false), v.looseObject({})])), + modelRetries: v.optional(v.union([v.literal(false), v.looseObject({})])), }); /** @@ -105,6 +107,7 @@ export function resolveAgentProfile(options: AgentRuntimeConfig | undefined): Ag ? options?.thinkingLevel : profile?.thinkingLevel, compaction: hasOwn(options, 'compaction') ? options?.compaction : profile?.compaction, + modelRetries: hasOwn(options, 'modelRetries') ? options?.modelRetries : profile?.modelRetries, }; } @@ -173,6 +176,7 @@ function assertAgentProfile( assertNonEmptyString(definition.description, `${label} description`); assertThinkingLevel(definition.thinkingLevel, label); assertCompaction(definition.compaction, label); + assertModelRetries(definition.modelRetries, label); assertTools(definition.tools, label); assertSkills(definition.skills, label); assertSubagents(definition.subagents, label, activeDefinitions); @@ -216,6 +220,45 @@ function assertCompaction(definition: AgentProfile['compaction'], label: string) } } +function assertModelRetries(definition: AgentProfile['modelRetries'], label: string): void { + if (definition === undefined || definition === false) { + return; + } + + for (const key of Object.keys(definition)) { + if (key !== 'maxRetries' && key !== 'initialDelayMs' && key !== 'maxDelayMs' && key !== 'backoffFactor') { + throw new Error(`[flue] ${label} modelRetries received unknown field "${key}".`); + } + } + assertRetryCount(definition.maxRetries, `${label} modelRetries.maxRetries`); + assertRetryDelay(definition.initialDelayMs, `${label} modelRetries.initialDelayMs`); + assertRetryDelay(definition.maxDelayMs, `${label} modelRetries.maxDelayMs`); + if ( + definition.backoffFactor !== undefined && + (!Number.isFinite(definition.backoffFactor) || definition.backoffFactor < 1) + ) { + throw new Error(`[flue] ${label} modelRetries.backoffFactor must be a finite number greater than or equal to 1.`); + } +} + +function assertRetryCount(value: number | undefined, label: string): void { + if (value === undefined) { + return; + } + if (!Number.isInteger(value) || value < 0) { + throw new Error(`[flue] ${label} must be a non-negative integer.`); + } +} + +function assertRetryDelay(value: number | undefined, label: string): void { + if (value === undefined) { + return; + } + if (!Number.isFinite(value) || value < 0) { + throw new Error(`[flue] ${label} must be a finite non-negative number.`); + } +} + function assertTokenCount(value: number | undefined, label: string): void { if (value === undefined) { return; diff --git a/packages/runtime/src/client.ts b/packages/runtime/src/client.ts index 107d6594..d908dcbe 100644 --- a/packages/runtime/src/client.ts +++ b/packages/runtime/src/client.ts @@ -5,11 +5,11 @@ import { dispatchGlobalEvent } from './runtime/events.ts'; import { bashFactoryToSessionEnv, createCwdSessionEnv, isBashLike } from './sandbox.ts'; import type { AgentConfig, + AgentHarnessOptions, AgentProfile, AgentRuntimeConfig, - AgentHarnessOptions, - CreatedAgent, BashFactory, + CreatedAgent, FlueContext, FlueEvent, FlueEventCallback, @@ -185,6 +185,7 @@ export function createFlueContext(config: FlueContextConfig): FlueContextInterna model: agentModel, thinkingLevel: definition.thinkingLevel ?? config.agentConfig.thinkingLevel, compaction: definition.compaction ?? config.agentConfig.compaction, + modelRetries: definition.modelRetries ?? config.agentConfig.modelRetries, }; return new Harness( diff --git a/packages/runtime/src/harness.ts b/packages/runtime/src/harness.ts index 0e1c68db..e64ad990 100644 --- a/packages/runtime/src/harness.ts +++ b/packages/runtime/src/harness.ts @@ -182,6 +182,7 @@ export class Harness implements FlueHarness { : this.config.model, thinkingLevel: taskAgent?.thinkingLevel ?? this.config.thinkingLevel, compaction: taskAgent?.compaction ?? this.config.compaction, + modelRetries: taskAgent?.modelRetries ?? this.config.modelRetries, }; const storageKey = createSessionStorageKey(this.instanceId, this.name, sessionName); const affinityKey = createSessionAffinityKey(this.instanceId, this.name, sessionName); diff --git a/packages/runtime/src/session.ts b/packages/runtime/src/session.ts index 4bcf6fe0..a882fada 100644 --- a/packages/runtime/src/session.ts +++ b/packages/runtime/src/session.ts @@ -2,7 +2,6 @@ import type { AgentMessage, AgentTool, AgentToolResult, StreamFn } from '@earendil-works/pi-agent-core'; import { Agent } from '@earendil-works/pi-agent-core'; -import { streamSimple } from '@earendil-works/pi-ai'; import type { AssistantMessage, ImageContent, @@ -11,6 +10,7 @@ import type { ToolResultMessage, UserMessage, } from '@earendil-works/pi-ai'; +import { streamSimple } from '@earendil-works/pi-ai'; import type * as v from 'valibot'; import { abortErrorFor, createCallHandle } from './abort.ts'; import { @@ -42,10 +42,10 @@ import { type ResultToolBundle, ResultUnavailableError, } from './result.ts'; +import type { DispatchInput } from './runtime/dispatch-queue.ts'; import { generateOperationId, generateTurnId } from './runtime/ids.ts'; import { getProviderConfiguration, getRegisteredApiKey } from './runtime/providers.ts'; import { createFlueFs } from './sandbox.ts'; - import type { AgentConfig, AgentProfile, @@ -58,6 +58,7 @@ import type { FlueFs, FlueSession, MessageEntry, + PackagedSkillDirectory, PromptModel, PromptOptions, PromptResponse, @@ -70,17 +71,21 @@ import type { SessionToolFactory, ShellOptions, ShellResult, - PackagedSkillDirectory, - SkillReference, SkillOptions, + SkillReference, TaskOptions, ThinkingLevel, ToolDefinition, } from './types.ts'; -import type { DispatchInput } from './runtime/dispatch-queue.ts'; import { addUsage, emptyUsage, fromProviderUsage } from './usage.ts'; const MAX_TASK_DEPTH = 4; +const DEFAULT_MODEL_RETRY_SETTINGS = { + maxRetries: 2, + initialDelayMs: 500, + maxDelayMs: 8000, + backoffFactor: 2, +}; type TurnInputMessage = Extract['input']['messages'][number]; type TurnInputTool = NonNullable['input']['tools']>[number]; @@ -196,6 +201,22 @@ interface InternalTaskResult { cwd?: string; } +interface ModelRetrySettings { + maxRetries: number; + initialDelayMs: number; + maxDelayMs: number; + backoffFactor: number; +} + +interface ModelTurnRetryOptions { + errorLabel: string; + source: MessageSource; + cursor: number; + signal?: AbortSignal; + dispatch?: DispatchMessageMetadata; + start: () => Promise; +} + /** * Read the per-call result schema option, accepting both the canonical * `result` field and the deprecated `schema` alias. @@ -1757,11 +1778,71 @@ export class Session implements FlueSession { this.emit({ type: 'log', level, message, attributes: normalizeLogAttributes(attributes) }); } - private throwIfError(context: string): void { - const errorMsg = this.harness.state.errorMessage; - if (errorMsg) { - throw new Error(`[flue] ${context} failed: ${errorMsg}`); + private resolveModelRetrySettings(): ModelRetrySettings | undefined { + const config = this.config.modelRetries; + if (config === false) { + return undefined; } + return { + maxRetries: config?.maxRetries ?? DEFAULT_MODEL_RETRY_SETTINGS.maxRetries, + initialDelayMs: config?.initialDelayMs ?? DEFAULT_MODEL_RETRY_SETTINGS.initialDelayMs, + maxDelayMs: config?.maxDelayMs ?? DEFAULT_MODEL_RETRY_SETTINGS.maxDelayMs, + backoffFactor: config?.backoffFactor ?? DEFAULT_MODEL_RETRY_SETTINGS.backoffFactor, + }; + } + + private async runModelTurnWithRetries(options: ModelTurnRetryOptions): Promise { + const retry = this.resolveModelRetrySettings(); + let cursor = options.cursor; + let delayMs = retry?.initialDelayMs ?? 0; + let lastError: string | undefined; + + for (let attempt = 0; ; attempt++) { + if (options.signal?.aborted) throw abortErrorFor(options.signal); + if (attempt === 0) { + await options.start(); + } else { + await this.harness.continue(); + } + await this.harness.waitForIdle(); + await this.syncHarnessMessagesSince(cursor, options.source, attempt === 0 ? options.dispatch : undefined); + cursor = this.harness.state.messages.length; + await this.checkLatestAssistantForCompaction(); + + const errorMsg = this.harness.state.errorMessage; + if (!errorMsg) { + return cursor; + } + + lastError = errorMsg; + if (!retry || attempt >= retry.maxRetries || !isRetryableModelError(errorMsg)) { + break; + } + + const retryNumber = attempt + 1; + this.removeLatestAssistantErrorTurn(); + cursor = this.harness.state.messages.length; + await this.save(); + this.internalLog('warn', `[flue:model-retry] Retrying ${options.errorLabel} after transient provider error (${retryNumber}/${retry.maxRetries}).`, { + attempt: retryNumber, + maxRetries: retry.maxRetries, + error: errorMsg, + }); + await waitForRetryDelay(Math.min(delayMs, retry.maxDelayMs), options.signal); + delayMs = Math.min(Math.max(delayMs * retry.backoffFactor, delayMs), retry.maxDelayMs); + } + + throw new Error(`[flue] ${options.errorLabel} failed: ${lastError}`); + } + + private removeLatestAssistantErrorTurn(): void { + const messages = this.harness.state.messages; + const lastMsg = messages[messages.length - 1]; + if (lastMsg?.role === 'assistant' && (lastMsg as AssistantMessage).stopReason === 'error') { + this.harness.state.messages = messages.slice(0, -1); + this.history.removeLeafMessage(lastMsg as AgentMessage); + } + (this.harness.state as { errorMessage?: string }).errorMessage = undefined; } /** @@ -1869,11 +1950,12 @@ export class Session implements FlueSession { ); if (!persistedAssistant) { const beforeLength = this.harness.state.messages.length; - await this.harness.continue(); - await this.harness.waitForIdle(); - await this.syncHarnessMessagesSince(beforeLength, options.outputSource); - await this.checkLatestAssistantForCompaction(); - this.throwIfError(options.errorLabel); + await this.runModelTurnWithRetries({ + errorLabel: options.errorLabel, + source: options.outputSource, + cursor: beforeLength, + start: () => this.harness.continue(), + }); } else { const assistant = persistedAssistant.message as AssistantMessage; if (assistant.stopReason === 'error' || assistant.stopReason === 'aborted') { @@ -1946,11 +2028,14 @@ export class Session implements FlueSession { }; } - await this.harness.prompt(args.promptText, args.images); - await this.harness.waitForIdle(); - await this.syncHarnessMessagesSince(beforeLength, args.source, args.dispatch); - await this.checkLatestAssistantForCompaction(); - this.throwIfError(args.errorLabel); + await this.runModelTurnWithRetries({ + errorLabel: args.errorLabel, + source: args.source, + cursor: beforeLength, + signal: args.signal, + dispatch: args.dispatch, + start: () => this.harness.prompt(args.promptText, args.images), + }); return { text: this.getAssistantText(), @@ -1991,12 +2076,13 @@ export class Session implements FlueSession { if (signal.aborted) throw abortErrorFor(signal); // Images attach only on the first turn — retry follow-ups carry text // only, so we don't re-bill image bytes on every result-tool retry. - await this.harness.prompt(nextPrompt, attempt === 0 ? initialImages : undefined); - await this.harness.waitForIdle(); - await this.syncHarnessMessagesSince(cursor, source); - cursor = this.harness.state.messages.length; - await this.checkLatestAssistantForCompaction(); - this.throwIfError(errorLabel); + cursor = await this.runModelTurnWithRetries({ + errorLabel, + source, + cursor, + signal, + start: () => this.harness.prompt(nextPrompt, attempt === 0 ? initialImages : undefined), + }); const outcome = bundle.getOutcome(); if (outcome.type === 'finished') { @@ -2032,6 +2118,56 @@ export function normalizePath(p: string): string { return `/${result.join('/')}`; } +function isRetryableModelError(message: string): boolean { + const lower = message.toLowerCase(); + if ( + lower.includes('usage limits') || + lower.includes('invalid_request_error') || + lower.includes('insufficient_quota') || + lower.includes('authentication') || + lower.includes('api key') || + lower.includes('permission') || + lower.includes('content filter') || + lower.includes('aborted') || + lower.includes('request was aborted') + ) { + return false; + } + return ( + lower.includes('overloaded_error') || + lower.includes('overloaded') || + lower.includes('rate limit') || + lower.includes('temporarily unavailable') || + lower.includes('timeout') || + lower.includes('timed out') || + lower.includes('econnreset') || + lower.includes('etimedout') || + lower.includes('socket hang up') || + /\b(?:429|500|502|503|504)\b/.test(lower) + ); +} + +async function waitForRetryDelay(delayMs: number, signal: AbortSignal | undefined): Promise { + if (delayMs <= 0) { + return; + } + await new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(abortErrorFor(signal)); + return; + } + const timeout = setTimeout(resolve, delayMs); + signal?.addEventListener( + 'abort', + () => { + clearTimeout(timeout); + reject(abortErrorFor(signal)); + }, + { once: true }, + ); + }); +} + export async function deleteSessionTree( store: SessionStore, storageKey: string, diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index 4e697297..8d2a8f72 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -395,6 +395,19 @@ export interface CompactionConfig { model?: string; } +// ─── Model Retries ────────────────────────────────────────────────────────── + +export interface ModelRetryConfig { + /** Number of retry attempts after the initial failed provider turn. Defaults to 2. */ + maxRetries?: number; + /** Delay before the first retry. Defaults to 500ms. */ + initialDelayMs?: number; + /** Maximum delay between retries. Defaults to 8000ms. */ + maxDelayMs?: number; + /** Multiplier applied after each retry. Defaults to 2. */ + backoffFactor?: number; +} + // ─── Provider Runtime Settings ────────────────────────────────────────────── /** Per-provider transport settings configured from `@flue/runtime`. */ @@ -445,6 +458,11 @@ export interface AgentConfig { * uses defaults. */ compaction?: false | CompactionConfig; + /** + * Retry transient provider failures such as overloaded APIs, rate limits, + * temporary outages, and network timeouts. Set to `false` to disable. + */ + modelRetries?: false | ModelRetryConfig; } /** Model specifier, or `false` to require call-level model selection. */ @@ -475,6 +493,11 @@ export interface AgentProfile { * calls still compact when needed. */ compaction?: false | CompactionConfig; + /** + * Retry transient provider failures such as overloaded APIs, rate limits, + * temporary outages, and network timeouts. Set to `false` to disable. + */ + modelRetries?: false | ModelRetryConfig; } /** Configuration returned by a {@link createAgent} initializer. */ @@ -501,6 +524,11 @@ export interface AgentRuntimeConfig { * calls still compact when needed. */ compaction?: false | CompactionConfig; + /** + * Retry transient provider failures such as overloaded APIs, rate limits, + * temporary outages, and network timeouts. Set to `false` to disable. + */ + modelRetries?: false | ModelRetryConfig; /** Working directory inside the initialized sandbox. */ cwd?: string; /** Sandbox factory used to construct the initialized environment. */ diff --git a/packages/runtime/test/model-retries.test.ts b/packages/runtime/test/model-retries.test.ts new file mode 100644 index 00000000..dcb6a4ac --- /dev/null +++ b/packages/runtime/test/model-retries.test.ts @@ -0,0 +1,124 @@ +import { fauxAssistantMessage, fauxText, registerFauxProvider } from '@earendil-works/pi-ai'; +import { describe, expect, it } from 'vitest'; +import { createAgent, defineAgentProfile } from '../src/agent-definition.ts'; +import { createFlueContext, InMemorySessionStore } from '../src/internal.ts'; +import type { SessionEnv } from '../src/types.ts'; + +function createEnv(): SessionEnv { + return { + cwd: '/', + resolvePath: (path) => (path.startsWith('/') ? path : `/${path}`), + exec: async () => ({ stdout: '', stderr: '', exitCode: 0 }), + readFile: async () => '', + readFileBuffer: async () => new Uint8Array(), + writeFile: async () => {}, + stat: async () => ({ isFile: false, isDirectory: false, isSymbolicLink: false, size: 0, mtime: new Date(0) }), + readdir: async () => [], + exists: async () => false, + mkdir: async () => {}, + rm: async () => {}, + }; +} + +describe('modelRetries', () => { + it('retries transient provider errors from a prompt turn', async () => { + const provider = `retry-${crypto.randomUUID()}`; + const modelId = 'overloaded'; + const modelSpecifier = `${provider}/${modelId}`; + const registration = registerFauxProvider({ + provider, + models: [{ id: modelId }], + }); + registration.setResponses([ + fauxAssistantMessage(fauxText(''), { + stopReason: 'error', + errorMessage: '{"type":"overloaded_error","message":"Overloaded"}', + }), + fauxAssistantMessage('recovered'), + ]); + + try { + const ctx = createFlueContext({ + id: 'retry-transient', + runId: undefined, + payload: {}, + env: {}, + agentConfig: { + systemPrompt: '', + skills: {}, + model: undefined, + resolveModel: (model) => model === modelSpecifier ? registration.getModel(modelId) : undefined, + }, + createDefaultEnv: async () => createEnv(), + defaultStore: new InMemorySessionStore(), + }); + const agent = createAgent(() => ({ + model: modelSpecifier, + modelRetries: { maxRetries: 1, initialDelayMs: 0 }, + })); + const harness = await ctx.init(agent); + const session = await harness.session(); + + const response = await session.prompt('Try once more.'); + + expect(response.text).toBe('recovered'); + expect(registration.state.callCount).toBe(2); + } finally { + registration.unregister(); + } + }); + + it('does not retry quota and usage-limit failures', async () => { + const provider = `retry-${crypto.randomUUID()}`; + const modelId = 'quota'; + const modelSpecifier = `${provider}/${modelId}`; + const registration = registerFauxProvider({ + provider, + models: [{ id: modelId }], + }); + registration.setResponses([ + fauxAssistantMessage(fauxText(''), { + stopReason: 'error', + errorMessage: 'You have reached your specified API usage limits.', + }), + fauxAssistantMessage('should not be used'), + ]); + + try { + const ctx = createFlueContext({ + id: 'retry-quota', + runId: undefined, + payload: {}, + env: {}, + agentConfig: { + systemPrompt: '', + skills: {}, + model: undefined, + resolveModel: (model) => model === modelSpecifier ? registration.getModel(modelId) : undefined, + }, + createDefaultEnv: async () => createEnv(), + defaultStore: new InMemorySessionStore(), + }); + const agent = createAgent(() => ({ + model: modelSpecifier, + modelRetries: { maxRetries: 2, initialDelayMs: 0 }, + })); + const harness = await ctx.init(agent); + const session = await harness.session(); + + await expect(session.prompt('Try once more.')).rejects.toThrow('You have reached your specified API usage limits.'); + expect(registration.state.callCount).toBe(1); + } finally { + registration.unregister(); + } + }); + + it('validates retry config fields', () => { + expect(() => + defineAgentProfile({ + name: 'bad-retry', + modelRetries: { maxRetries: -1 }, + }), + ).toThrow('modelRetries.maxRetries must be a non-negative integer'); + }); +});