Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 17 additions & 95 deletions convex/ai/resilience.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@ import { COACH_MAX_STEPS } from "./coach";
import { type ProviderId } from "./providers";
import { type AttemptOutcome, runWithPrimaryCircuitBreaker } from "./resilienceCircuitBreaker";
import { type AccumulatorInit, RunAccumulator } from "./runTelemetry";
import { buildByokErrorMessage, classifyByokError } from "./byokErrors";
import { classifyByokError } from "./byokErrors";
import { runInRunSpan } from "./otel";
import { isQuotaError, isTransientError } from "./transientErrors";
import {
buildProviderTransientMessage,
classifyTransientError,
isQuotaError,
isTransientError,
} from "./transientErrors";
getFinalizeCodeForError,
safeFinalizePending,
safeReportError,
safeTryReportByok,
} from "./resilienceReporting";

// Re-export for backwards compatibility with existing callers/tests.
export { buildByokErrorMessage, classifyByokError, withByokErrorSanitization } from "./byokErrors";
export type { ByokErrorCode } from "./byokErrors";
export { isTransientError } from "./transientErrors";
export { getFinalizeCodeForError } from "./resilienceReporting";

const AI_ERROR_MESSAGE = "I'm having trouble right now. Please try again in a moment.";
const BUDGET_CAP_MESSAGE =
"This is getting expensive on your API key, so I'm simplifying here. Ask a narrower follow-up if you want me to keep going.";
const MAX_OUTPUT_TOKENS = 4096;
Expand Down Expand Up @@ -262,6 +263,15 @@ async function attemptStream({
// Telemetry must never fail the LLM turn.
}
},
onError: async ({ error }: { error: unknown }) => {
// @convex-dev/[email protected] does not catch stream error events in its
// internal finalizeMessage mutation — the raw provider error propagates
// as an unhandled exception that Convex reports to Sentry. Pre-empting
// with a sanitized finalize code here prevents the agent library from
// encountering the error-event delta when its mutation runs, because
// a message that is already "failed" skips further delta processing.
await safeFinalizePending(ctx, threadId, getFinalizeCodeForError(error));
},
},
STREAM_OPTIONS,
);
Expand Down Expand Up @@ -310,91 +320,3 @@ function buildTelemetryConfig(telemetry: TelemetryArgs): TelemetrySettings {
metadata,
};
}

interface ErrorReport {
threadId: string;
userId: string;
error: unknown;
isByok: boolean;
provider: ProviderId;
}

// streamText's abortSignal handler finalizes on clean aborts; provider errors
// thrown from result.text bypass that path and leave a stranded pending row.
async function finalizePendingMessages(
ctx: ActionCtx,
threadId: string,
reason: string,
): Promise<void> {
const result = await ctx.runQuery(components.agent.messages.listMessagesByThreadId, {
threadId,
paginationOpts: { cursor: null, numItems: 50 },
order: "desc",
});
for (const message of result.page) {
if (message.status !== "pending") continue;
await ctx.runMutation(components.agent.messages.finalizeMessage, {
messageId: message._id,
result: { status: "failed", error: reason },
});
}
}

// Best-effort wrappers must not leave users with stuck pending messages.
const safeFinalizePending = (ctx: ActionCtx, threadId: string, reason: string) =>
finalizePendingMessages(ctx, threadId, reason).catch(() => undefined);
const safeReportError = (ctx: ActionCtx, report: ErrorReport) =>
reportError(ctx, report).catch(() => undefined);
const safeTryReportByok = (ctx: ActionCtx, report: ErrorReport) =>
tryReportByok(ctx, report).catch(() => false);

export function getFinalizeCodeForError(error: unknown): string {
const transientKind = classifyTransientError(error);
return transientKind ?? (error instanceof Error ? error.name : "unknown_error");
}

async function tryReportByok(ctx: ActionCtx, report: ErrorReport): Promise<boolean> {
if (!report.isByok) return false;
const code = classifyByokError(report.error);
if (code === null) return false;
// Provider bodies can include the decrypted key, so the finalize reason is the code only.
await finalizePendingMessages(ctx, report.threadId, code);
await saveMessage(ctx, components.agent, {
threadId: report.threadId,
userId: report.userId,
message: { role: "assistant", content: buildByokErrorMessage(code, report.provider) },
});
await ctx.runAction(internal.discord.notifyError, {
source: "streamWithRetry",
message: `${code} on ${report.provider} (${report.error instanceof Error ? report.error.name : "Unknown"})`,
userId: report.userId,
});
return true;
}

async function reportError(ctx: ActionCtx, report: ErrorReport): Promise<void> {
const transientKind = classifyTransientError(report.error);
const content = transientKind
? buildProviderTransientMessage(transientKind, report.provider, report.isByok)
: AI_ERROR_MESSAGE;

// Keep provider text out of the agent component's failed-message field.
await finalizePendingMessages(ctx, report.threadId, getFinalizeCodeForError(report.error));

await saveMessage(ctx, components.agent, {
threadId: report.threadId,
userId: report.userId,
message: { role: "assistant", content },
});

// Upstream provider outages already surface to the user with an attributed
// message; paging Discord on every Gemini/Claude capacity blip is noise.
if (transientKind) return;

const reason = report.error instanceof Error ? report.error.message : String(report.error);
await ctx.runAction(internal.discord.notifyError, {
source: "streamWithRetry",
message: reason,
userId: report.userId,
});
}
96 changes: 96 additions & 0 deletions convex/ai/resilienceReporting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import type { ActionCtx } from "../_generated/server";
import { components, internal } from "../_generated/api";
import { saveMessage } from "@convex-dev/agent";
import { buildByokErrorMessage, classifyByokError } from "./byokErrors";
import type { ProviderId } from "./providers";
import { buildProviderTransientMessage, classifyTransientError } from "./transientErrors";

const AI_ERROR_MESSAGE = "I'm having trouble right now. Please try again in a moment.";

export interface ErrorReport {
threadId: string;
userId: string;
error: unknown;
isByok: boolean;
provider: ProviderId;
}

// streamText's abortSignal handler finalizes on clean aborts; provider errors
// thrown from result.text bypass that path and leave a stranded pending row.
export async function finalizePendingMessages(
ctx: ActionCtx,
threadId: string,
reason: string,
): Promise<void> {
const result = await ctx.runQuery(components.agent.messages.listMessagesByThreadId, {
threadId,
paginationOpts: { cursor: null, numItems: 50 },
order: "desc",
});
for (const message of result.page) {
if (message.status !== "pending") continue;
await ctx.runMutation(components.agent.messages.finalizeMessage, {
messageId: message._id,
result: { status: "failed", error: reason },
});
}
}

export function getFinalizeCodeForError(error: unknown): string {
const transientKind = classifyTransientError(error);
return transientKind ?? (error instanceof Error ? error.name : "unknown_error");
}

// Best-effort wrappers must not leave users with stuck pending messages.
export const safeFinalizePending = (ctx: ActionCtx, threadId: string, reason: string) =>
finalizePendingMessages(ctx, threadId, reason).catch(() => undefined);
export const safeReportError = (ctx: ActionCtx, report: ErrorReport) =>
reportError(ctx, report).catch(() => undefined);
export const safeTryReportByok = (ctx: ActionCtx, report: ErrorReport) =>
tryReportByok(ctx, report).catch(() => false);

async function tryReportByok(ctx: ActionCtx, report: ErrorReport): Promise<boolean> {
if (!report.isByok) return false;
const code = classifyByokError(report.error);
if (code === null) return false;
// Provider bodies can include the decrypted key, so the finalize reason is the code only.
await finalizePendingMessages(ctx, report.threadId, code);
await saveMessage(ctx, components.agent, {
threadId: report.threadId,
userId: report.userId,
message: { role: "assistant", content: buildByokErrorMessage(code, report.provider) },
});
await ctx.runAction(internal.discord.notifyError, {
source: "streamWithRetry",
message: `${code} on ${report.provider} (${report.error instanceof Error ? report.error.name : "Unknown"})`,
userId: report.userId,
});
return true;
}

async function reportError(ctx: ActionCtx, report: ErrorReport): Promise<void> {
const transientKind = classifyTransientError(report.error);
const content = transientKind
? buildProviderTransientMessage(transientKind, report.provider, report.isByok)
: AI_ERROR_MESSAGE;

// Keep provider text out of the agent component's failed-message field.
await finalizePendingMessages(ctx, report.threadId, getFinalizeCodeForError(report.error));

await saveMessage(ctx, components.agent, {
threadId: report.threadId,
userId: report.userId,
message: { role: "assistant", content },
});

// Upstream provider outages already surface to the user with an attributed
// message; paging Discord on every Gemini/Claude capacity blip is noise.
if (transientKind) return;

const reason = report.error instanceof Error ? report.error.message : String(report.error);
await ctx.runAction(internal.discord.notifyError, {
source: "streamWithRetry",
message: reason,
userId: report.userId,
});
}
Loading
Loading