Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions packages/runtime/src/agent-definition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const AGENT_PROFILE_FIELDS = new Set([
'subagents',
'thinkingLevel',
'compaction',
'modelRetries',
]);

const AGENT_RUNTIME_FIELDS = new Set([
Expand Down Expand Up @@ -48,6 +49,7 @@ const AgentProfileSchema = v.looseObject({
subagents: v.optional(v.array(v.unknown())),
thinkingLevel: v.optional(v.string()),
compaction: v.optional(v.union([v.literal(false), v.looseObject({})])),
modelRetries: v.optional(v.union([v.literal(false), v.looseObject({})])),
});

/**
Expand Down Expand Up @@ -105,6 +107,7 @@ export function resolveAgentProfile(options: AgentRuntimeConfig | undefined): Ag
? options?.thinkingLevel
: profile?.thinkingLevel,
compaction: hasOwn(options, 'compaction') ? options?.compaction : profile?.compaction,
modelRetries: hasOwn(options, 'modelRetries') ? options?.modelRetries : profile?.modelRetries,
};
}

Expand Down Expand Up @@ -173,6 +176,7 @@ function assertAgentProfile(
assertNonEmptyString(definition.description, `${label} description`);
assertThinkingLevel(definition.thinkingLevel, label);
assertCompaction(definition.compaction, label);
assertModelRetries(definition.modelRetries, label);
assertTools(definition.tools, label);
assertSkills(definition.skills, label);
assertSubagents(definition.subagents, label, activeDefinitions);
Expand Down Expand Up @@ -216,6 +220,45 @@ function assertCompaction(definition: AgentProfile['compaction'], label: string)
}
}

/**
 * Validates the `modelRetries` setting of an agent profile.
 *
 * `undefined` and `false` are accepted as-is. Any other value may only carry
 * the keys `maxRetries`, `initialDelayMs`, `maxDelayMs` and `backoffFactor`;
 * each of those is range-checked when present.
 *
 * @param definition - The `modelRetries` value to validate.
 * @param label - Prefix placed in error messages to identify the profile.
 * @throws Error when an unknown key is present or a field is out of range.
 */
function assertModelRetries(definition: AgentProfile['modelRetries'], label: string): void {
  if (definition === undefined || definition === false) return;

  // Report the first key (in enumeration order) that is not a known field.
  const knownFields = ['maxRetries', 'initialDelayMs', 'maxDelayMs', 'backoffFactor'];
  const unknownField = Object.keys(definition).find((field) => !knownFields.includes(field));
  if (unknownField !== undefined) {
    throw new Error(`[flue] ${label} modelRetries received unknown field "${unknownField}".`);
  }

  assertRetryCount(definition.maxRetries, `${label} modelRetries.maxRetries`);
  assertRetryDelay(definition.initialDelayMs, `${label} modelRetries.initialDelayMs`);
  assertRetryDelay(definition.maxDelayMs, `${label} modelRetries.maxDelayMs`);

  // A factor below 1 would shrink the delay between attempts, so it is rejected.
  const factor = definition.backoffFactor;
  const factorIsValid = factor === undefined || (Number.isFinite(factor) && factor >= 1);
  if (!factorIsValid) {
    throw new Error(`[flue] ${label} modelRetries.backoffFactor must be a finite number greater than or equal to 1.`);
  }
}

/**
 * Ensures an optional retry count is a whole number that is zero or greater.
 *
 * @param value - The count to check; `undefined` is accepted.
 * @param label - Name of the field, used as the subject of the error message.
 * @throws Error when `value` is defined but not a non-negative integer.
 */
function assertRetryCount(value: number | undefined, label: string): void {
  const acceptable = value === undefined || (Number.isInteger(value) && value >= 0);
  if (!acceptable) {
    throw new Error(`[flue] ${label} must be a non-negative integer.`);
  }
}

/**
 * Ensures an optional retry delay is a finite number that is zero or greater.
 *
 * @param value - The delay to check; `undefined` is accepted.
 * @param label - Name of the field, used as the subject of the error message.
 * @throws Error when `value` is defined but not a finite non-negative number.
 */
function assertRetryDelay(value: number | undefined, label: string): void {
  const acceptable = value === undefined || (Number.isFinite(value) && value >= 0);
  if (!acceptable) {
    throw new Error(`[flue] ${label} must be a finite non-negative number.`);
  }
}

function assertTokenCount(value: number | undefined, label: string): void {
if (value === undefined) {
return;
Expand Down
5 changes: 3 additions & 2 deletions packages/runtime/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import { dispatchGlobalEvent } from './runtime/events.ts';
import { bashFactoryToSessionEnv, createCwdSessionEnv, isBashLike } from './sandbox.ts';
import type {
AgentConfig,
AgentHarnessOptions,
AgentProfile,
AgentRuntimeConfig,
AgentHarnessOptions,
CreatedAgent,
BashFactory,
CreatedAgent,
FlueContext,
FlueEvent,
FlueEventCallback,
Expand Down Expand Up @@ -185,6 +185,7 @@ export function createFlueContext(config: FlueContextConfig): FlueContextInterna
model: agentModel,
thinkingLevel: definition.thinkingLevel ?? config.agentConfig.thinkingLevel,
compaction: definition.compaction ?? config.agentConfig.compaction,
modelRetries: definition.modelRetries ?? config.agentConfig.modelRetries,
};

return new Harness(
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/src/harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ export class Harness implements FlueHarness {
: this.config.model,
thinkingLevel: taskAgent?.thinkingLevel ?? this.config.thinkingLevel,
compaction: taskAgent?.compaction ?? this.config.compaction,
modelRetries: taskAgent?.modelRetries ?? this.config.modelRetries,
};
const storageKey = createSessionStorageKey(this.instanceId, this.name, sessionName);
const affinityKey = createSessionAffinityKey(this.instanceId, this.name, sessionName);
Expand Down
186 changes: 161 additions & 25 deletions packages/runtime/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import type { AgentMessage, AgentTool, AgentToolResult, StreamFn } from '@earendil-works/pi-agent-core';
import { Agent } from '@earendil-works/pi-agent-core';
import { streamSimple } from '@earendil-works/pi-ai';
import type {
AssistantMessage,
ImageContent,
Expand All @@ -11,6 +10,7 @@ import type {
ToolResultMessage,
UserMessage,
} from '@earendil-works/pi-ai';
import { streamSimple } from '@earendil-works/pi-ai';
import type * as v from 'valibot';
import { abortErrorFor, createCallHandle } from './abort.ts';
import {
Expand Down Expand Up @@ -42,10 +42,10 @@ import {
type ResultToolBundle,
ResultUnavailableError,
} from './result.ts';
import type { DispatchInput } from './runtime/dispatch-queue.ts';
import { generateOperationId, generateTurnId } from './runtime/ids.ts';
import { getProviderConfiguration, getRegisteredApiKey } from './runtime/providers.ts';
import { createFlueFs } from './sandbox.ts';

import type {
AgentConfig,
AgentProfile,
Expand All @@ -58,6 +58,7 @@ import type {
FlueFs,
FlueSession,
MessageEntry,
PackagedSkillDirectory,
PromptModel,
PromptOptions,
PromptResponse,
Expand All @@ -70,17 +71,21 @@ import type {
SessionToolFactory,
ShellOptions,
ShellResult,
PackagedSkillDirectory,
SkillReference,
SkillOptions,
SkillReference,
TaskOptions,
ThinkingLevel,
ToolDefinition,
} from './types.ts';
import type { DispatchInput } from './runtime/dispatch-queue.ts';
import { addUsage, emptyUsage, fromProviderUsage } from './usage.ts';

const MAX_TASK_DEPTH = 4;
/**
 * Fallback retry settings. `resolveModelRetrySettings` uses each value when
 * the corresponding field is absent from the session's `modelRetries` config.
 */
const DEFAULT_MODEL_RETRY_SETTINGS = {
// Number of retries allowed after the first attempt.
maxRetries: 2,
// Delay before the first retry, in milliseconds.
initialDelayMs: 500,
// Upper bound on any single retry delay, in milliseconds.
maxDelayMs: 8000,
// Multiplier applied to the delay after each retry.
backoffFactor: 2,
};

type TurnInputMessage = Extract<FlueEvent, { type: 'turn_request' }>['input']['messages'][number];
type TurnInputTool = NonNullable<Extract<FlueEvent, { type: 'turn_request' }>['input']['tools']>[number];
Expand Down Expand Up @@ -196,6 +201,22 @@ interface InternalTaskResult<T> {
cwd?: string;
}

/**
 * Retry settings with every field populated, as produced by
 * `resolveModelRetrySettings` after defaults have been applied.
 */
interface ModelRetrySettings {
/** Number of retries allowed after the first attempt. */
maxRetries: number;
/** Delay before the first retry, in milliseconds. */
initialDelayMs: number;
/** Upper bound on any single retry delay, in milliseconds. */
maxDelayMs: number;
/** Multiplier applied to the delay after each retry. */
backoffFactor: number;
}

/** Arguments accepted by `runModelTurnWithRetries`. */
interface ModelTurnRetryOptions {
/** Name of the operation, used in the retry log line and the final error message. */
errorLabel: string;
/** Source passed through when syncing new harness messages. */
source: MessageSource;
/** Harness message index from which the first attempt's messages are synced. */
cursor: number;
/** Optional abort signal, checked before each attempt and during backoff waits. */
signal?: AbortSignal;
/** Dispatch metadata; forwarded only when syncing the first attempt. */
dispatch?: DispatchMessageMetadata;
/** Starts the first attempt. Later attempts call `harness.continue()` instead. */
start: () => Promise<void>;
}

/**
* Read the per-call result schema option, accepting both the canonical
* `result` field and the deprecated `schema` alias.
Expand Down Expand Up @@ -1757,11 +1778,71 @@ export class Session implements FlueSession {
this.emit({ type: 'log', level, message, attributes: normalizeLogAttributes(attributes) });
}

private throwIfError(context: string): void {
const errorMsg = this.harness.state.errorMessage;
if (errorMsg) {
throw new Error(`[flue] ${context} failed: ${errorMsg}`);
/**
 * Builds the effective retry settings for this session.
 *
 * @returns `undefined` when `modelRetries` is configured as `false`;
 * otherwise the configured values, with `DEFAULT_MODEL_RETRY_SETTINGS`
 * filling in every field that is missing.
 */
private resolveModelRetrySettings(): ModelRetrySettings | undefined {
  const overrides = this.config.modelRetries;
  if (overrides === false) return undefined;

  const fallback = DEFAULT_MODEL_RETRY_SETTINGS;
  const resolved: ModelRetrySettings = {
    maxRetries: overrides?.maxRetries ?? fallback.maxRetries,
    initialDelayMs: overrides?.initialDelayMs ?? fallback.initialDelayMs,
    maxDelayMs: overrides?.maxDelayMs ?? fallback.maxDelayMs,
    backoffFactor: overrides?.backoffFactor ?? fallback.backoffFactor,
  };
  return resolved;
}

/**
 * Runs one model turn and re-runs it after retryable provider errors.
 *
 * The first attempt calls `options.start()`; every later attempt calls
 * `this.harness.continue()`. After each attempt the new harness messages are
 * synced, and the attempt counts as successful when
 * `this.harness.state.errorMessage` is empty.
 *
 * @param options - Error label, message source, starting cursor, optional
 * abort signal and dispatch metadata, and the `start` callback.
 * @returns The harness message count after the successful attempt.
 * @throws The error built by `abortErrorFor` when `options.signal` is aborted
 * at the start of an attempt or during a backoff wait.
 * @throws Error `[flue] <errorLabel> failed: <message>` when retries are
 * disabled, exhausted, or the error is not classified as retryable.
 */
private async runModelTurnWithRetries(options: ModelTurnRetryOptions): Promise<number> {
const retry = this.resolveModelRetrySettings();
let cursor = options.cursor;
// Falls back to 0 when retries are disabled; the loop breaks before any wait in that case.
let delayMs = retry?.initialDelayMs ?? 0;
let lastError: string | undefined;

for (let attempt = 0; ; attempt++) {
if (options.signal?.aborted) throw abortErrorFor(options.signal);
if (attempt === 0) {
await options.start();
} else {
await this.harness.continue();
}
await this.harness.waitForIdle();
// Dispatch metadata is forwarded only for the first attempt's messages.
await this.syncHarnessMessagesSince(cursor, options.source, attempt === 0 ? options.dispatch : undefined);
cursor = this.harness.state.messages.length;
await this.checkLatestAssistantForCompaction();

const errorMsg = this.harness.state.errorMessage;
if (!errorMsg) {
return cursor;
}

lastError = errorMsg;
// Give up when retries are off, the budget is spent, or the error is not retryable.
if (!retry || attempt >= retry.maxRetries || !isRetryableModelError(errorMsg)) {
break;
}

const retryNumber = attempt + 1;
// Drop the failed assistant message and clear the error state before the next attempt,
// then move the cursor back to the shortened message list and persist.
this.removeLatestAssistantErrorTurn();
cursor = this.harness.state.messages.length;
await this.save();
this.internalLog('warn', `[flue:model-retry] Retrying ${options.errorLabel} after transient provider error (${retryNumber}/${retry.maxRetries}).`, {
attempt: retryNumber,
maxRetries: retry.maxRetries,
error: errorMsg,
});
await waitForRetryDelay(Math.min(delayMs, retry.maxDelayMs), options.signal);
// Grow the delay for the next retry: Math.max keeps it from shrinking, Math.min caps it at maxDelayMs.
delayMs = Math.min(Math.max(delayMs * retry.backoffFactor, delayMs), retry.maxDelayMs);
}

throw new Error(`[flue] ${options.errorLabel} failed: ${lastError}`);
}

/**
 * Discards a trailing failed assistant message and resets the harness error.
 *
 * When the last harness message is an assistant message whose `stopReason`
 * is `'error'`, it is removed from the harness message list and from the
 * session history. The harness `errorMessage` is cleared in every case.
 */
private removeLatestAssistantErrorTurn(): void {
  const transcript = this.harness.state.messages;
  const tail = transcript[transcript.length - 1];
  const tailIsFailedAssistant =
    tail?.role === 'assistant' && (tail as AssistantMessage).stopReason === 'error';

  if (tailIsFailedAssistant) {
    this.harness.state.messages = transcript.slice(0, -1);
    this.history.removeLeafMessage(tail as AgentMessage);
  }

  // Reset the error even when no message was removed.
  (this.harness.state as { errorMessage?: string }).errorMessage = undefined;
}

/**
Expand Down Expand Up @@ -1869,11 +1950,12 @@ export class Session implements FlueSession {
);
if (!persistedAssistant) {
const beforeLength = this.harness.state.messages.length;
await this.harness.continue();
await this.harness.waitForIdle();
await this.syncHarnessMessagesSince(beforeLength, options.outputSource);
await this.checkLatestAssistantForCompaction();
this.throwIfError(options.errorLabel);
await this.runModelTurnWithRetries({
errorLabel: options.errorLabel,
source: options.outputSource,
cursor: beforeLength,
start: () => this.harness.continue(),
});
} else {
const assistant = persistedAssistant.message as AssistantMessage;
if (assistant.stopReason === 'error' || assistant.stopReason === 'aborted') {
Expand Down Expand Up @@ -1946,11 +2028,14 @@ export class Session implements FlueSession {
};
}

await this.harness.prompt(args.promptText, args.images);
await this.harness.waitForIdle();
await this.syncHarnessMessagesSince(beforeLength, args.source, args.dispatch);
await this.checkLatestAssistantForCompaction();
this.throwIfError(args.errorLabel);
await this.runModelTurnWithRetries({
errorLabel: args.errorLabel,
source: args.source,
cursor: beforeLength,
signal: args.signal,
dispatch: args.dispatch,
start: () => this.harness.prompt(args.promptText, args.images),
});

return {
text: this.getAssistantText(),
Expand Down Expand Up @@ -1991,12 +2076,13 @@ export class Session implements FlueSession {
if (signal.aborted) throw abortErrorFor(signal);
// Images attach only on the first turn — retry follow-ups carry text
// only, so we don't re-bill image bytes on every result-tool retry.
await this.harness.prompt(nextPrompt, attempt === 0 ? initialImages : undefined);
await this.harness.waitForIdle();
await this.syncHarnessMessagesSince(cursor, source);
cursor = this.harness.state.messages.length;
await this.checkLatestAssistantForCompaction();
this.throwIfError(errorLabel);
cursor = await this.runModelTurnWithRetries({
errorLabel,
source,
cursor,
signal,
start: () => this.harness.prompt(nextPrompt, attempt === 0 ? initialImages : undefined),
});

const outcome = bundle.getOutcome();
if (outcome.type === 'finished') {
Expand Down Expand Up @@ -2032,6 +2118,56 @@ export function normalizePath(p: string): string {
return `/${result.join('/')}`;
}

/**
 * Classifies a provider error message as transient (worth retrying) or not.
 *
 * Matching is case-insensitive and substring-based. Messages containing any
 * permanent-failure marker are never retried, even if they also contain a
 * transient marker. Otherwise the message is retryable when it contains a
 * transient marker or one of the standalone numbers 429, 500, 502, 503, 504.
 *
 * @param message - The error text reported for the failed model turn.
 * @returns `true` when the error looks transient, `false` otherwise.
 */
function isRetryableModelError(message: string): boolean {
  const haystack = message.toLowerCase();
  const mentions = (needle: string): boolean => haystack.includes(needle);

  // Checked first so that permanent failures win over transient markers.
  const permanentMarkers = [
    'usage limits',
    'invalid_request_error',
    'insufficient_quota',
    'authentication',
    'api key',
    'permission',
    'content filter',
    'aborted',
    'request was aborted',
  ];
  if (permanentMarkers.some((marker) => mentions(marker))) {
    return false;
  }

  const transientMarkers = [
    'overloaded_error',
    'overloaded',
    'rate limit',
    'temporarily unavailable',
    'timeout',
    'timed out',
    'econnreset',
    'etimedout',
    'socket hang up',
  ];
  if (transientMarkers.some((marker) => mentions(marker))) {
    return true;
  }

  // Word boundaries keep longer numbers such as "5000" from matching.
  return /\b(?:429|500|502|503|504)\b/.test(haystack);
}

/**
 * Waits for a retry backoff delay, ending early if the signal is aborted.
 *
 * @param delayMs - Time to wait in milliseconds; zero or less returns immediately.
 * @param signal - Optional abort signal that cancels the wait.
 * @returns A promise that resolves once the delay has elapsed.
 * @throws The error built by `abortErrorFor` when the signal is already
 * aborted or becomes aborted before the delay elapses.
 */
async function waitForRetryDelay(delayMs: number, signal: AbortSignal | undefined): Promise<void> {
  if (delayMs <= 0) {
    return;
  }
  await new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(abortErrorFor(signal));
      return;
    }
    const onAbort = (): void => {
      clearTimeout(timeout);
      reject(abortErrorFor(signal));
    };
    const timeout = setTimeout(() => {
      // Detach the listener once the wait completes normally; otherwise every
      // finished backoff would leave a listener on a possibly long-lived signal.
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, delayMs);
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

export async function deleteSessionTree(
store: SessionStore,
storageKey: string,
Expand Down
Loading
Loading