diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js index 780a4fee6..e82f20879 100644 --- a/open-sse/handlers/chatCore.js +++ b/open-sse/handlers/chatCore.js @@ -24,7 +24,7 @@ import { detectClientTool, isNativePassthrough } from "../utils/clientDetector.j * @param {object} options.credentials - Provider credentials * @param {string} options.sourceFormatOverride - Override detected source format (e.g. "openai-responses") */ -export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, ccFilterNaming, sourceFormatOverride, providerThinking }) { +export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, ccFilterNaming, sourceFormatOverride, providerThinking, ttftTimeoutMs = 0, ttftCooldownMs = 15000 }) { const { provider, model } = modelInfo; const requestStartTime = Date.now(); @@ -236,8 +236,89 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred } // Streaming response - const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx }); - return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete }); + const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; + const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx, streamDetailId }); + + // TTFT deadline race: if enabled, measure from full request start → first token + let responseToStream = providerResponse; + if (stream && ttftTimeoutMs > 0) { + const elapsedBeforeStreamMs = Date.now() - requestStartTime; + const remainingTtftMs = ttftTimeoutMs - elapsedBeforeStreamMs; + + if (remainingTtftMs <= 0) { + streamController.abort(); + trackPendingRequest(model, provider, connectionId, 
false, true); + appendRequestLog({ model, provider, connectionId, status: "TTFT_TIMEOUT" }).catch(() => {}); + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, + latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime }, + tokens: { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + response: { error: "ttft_timeout", message: "Timed out before first token; fell back to the next account.", thinking: null }, + status: "error" + }, { id: streamDetailId })).catch(() => {}); + console.log(`[TTFT] ${provider}/${model} exceeded ${ttftTimeoutMs}ms before first token`); + return createErrorResult(408, "ttft_timeout"); + } + + const ttftResult = await raceTtftDeadline(providerResponse, remainingTtftMs, streamController); + if (ttftResult.timedOut) { + trackPendingRequest(model, provider, connectionId, false, true); + appendRequestLog({ model, provider, connectionId, status: "TTFT_TIMEOUT" }).catch(() => {}); + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, + latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime }, + tokens: { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + response: { error: "ttft_timeout", message: "Timed out before first token; fell back to the next account.", thinking: null }, + status: "error" + }, { id: streamDetailId })).catch(() => {}); + console.log(`[TTFT] ${provider}/${model} exceeded ${ttftTimeoutMs}ms before first token`); + return createErrorResult(408, "ttft_timeout"); + } + responseToStream = ttftResult.response; + } + + return handleStreamingResponse({ ...sharedCtx, providerResponse: responseToStream, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete, streamDetailId }); +} + +async function 
raceTtftDeadline(providerResponse, ttftTimeoutMs, streamController) { + return new Promise((resolve) => { + const timer = setTimeout(() => { + streamController.abort(); + resolve({ timedOut: true }); + }, ttftTimeoutMs); + + const reader = providerResponse.body.getReader(); + reader.read().then(({ value, done }) => { + clearTimeout(timer); + const newBody = new ReadableStream({ + start(controller) { + if (!done && value) controller.enqueue(value); + if (done) { controller.close(); return; } + }, + async pull(controller) { + const { value: chunk, done: isDone } = await reader.read(); + if (isDone) { controller.close(); return; } + controller.enqueue(chunk); + }, + cancel() { reader.cancel(); } + }); + resolve({ + timedOut: false, + response: new Response(newBody, { + status: providerResponse.status, + statusText: providerResponse.statusText, + headers: providerResponse.headers + }) + }); + }).catch(() => { + clearTimeout(timer); + resolve({ timedOut: true }); + }); + }); } export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) { diff --git a/open-sse/handlers/chatCore/streamingHandler.js b/open-sse/handlers/chatCore/streamingHandler.js index 03d12c139..6e6813f42 100644 --- a/open-sse/handlers/chatCore/streamingHandler.js +++ b/open-sse/handlers/chatCore/streamingHandler.js @@ -39,13 +39,31 @@ function buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, /** * Handle streaming response — pipe provider SSE through transform stream to client. 
*/ -export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete }) { - if (onRequestSuccess) onRequestSuccess(); +export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete, streamDetailId }) { + let responseToStream = providerResponse; + if (onRequestSuccess) { + let firstChunkFired = false; + const original = providerResponse.body; + const wrapped = new TransformStream({ + transform(chunk, controller) { + if (!firstChunkFired) { + firstChunkFired = true; + onRequestSuccess(); + } + controller.enqueue(chunk); + } + }); + original.pipeTo(wrapped.writable).catch(() => {}); + responseToStream = new Response(wrapped.readable, { + status: providerResponse.status, + statusText: providerResponse.statusText, + headers: providerResponse.headers + }); + } const transformStream = buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey }); - const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController); + const transformedBody = pipeWithDisconnect(responseToStream, transformStream, streamController); - const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; saveRequestDetail(buildRequestDetail({ provider, model, connectionId, latency: { ttft: 0, total: Date.now() - requestStartTime }, @@ -68,8 +86,7 @@ export function handleStreamingResponse({ providerResponse, provider, model, sou /** * Build onStreamComplete callback for streaming usage tracking. 
*/ -export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest }) { - const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; +export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest, streamDetailId }) { const onStreamComplete = (contentObj, usage, ttftAt) => { const latency = { diff --git a/open-sse/services/accountFallback.js b/open-sse/services/accountFallback.js index 957655c32..fba0c1c1f 100644 --- a/open-sse/services/accountFallback.js +++ b/open-sse/services/accountFallback.js @@ -18,12 +18,16 @@ export function getQuotaCooldown(backoffLevel = 0) { * @param {number} backoffLevel - Current backoff level for exponential backoff * @returns {{ shouldFallback: boolean, cooldownMs: number, newBackoffLevel?: number }} */ -export function checkFallbackError(status, errorText, backoffLevel = 0) { +export function checkFallbackError(status, errorText, backoffLevel = 0, options = {}) { // Check error message FIRST - specific patterns take priority over status codes if (errorText) { const errorStr = typeof errorText === "string" ? errorText : JSON.stringify(errorText); const lowerError = errorStr.toLowerCase(); + if (lowerError.includes("ttft_timeout")) { + return { shouldFallback: true, cooldownMs: options?.ttftCooldownMs ?? 
15000 }; + + if (lowerError.includes("no credentials")) { return { shouldFallback: true, cooldownMs: COOLDOWN_MS.notFound }; } diff --git a/src/app/(dashboard)/dashboard/profile/page.js b/src/app/(dashboard)/dashboard/profile/page.js index 27310276c..1bd1b97eb 100644 --- a/src/app/(dashboard)/dashboard/profile/page.js +++ b/src/app/(dashboard)/dashboard/profile/page.js @@ -223,6 +223,36 @@ export default function ProfilePage() { } }; + const updateTtftTimeout = async (val) => { + const num = Number.parseInt(val, 10); + if (Number.isNaN(num) || num < 0) return; + try { + const res = await fetch("/api/settings", { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ ttftTimeoutMs: num }), + }); + if (res.ok) setSettings(prev => ({ ...prev, ttftTimeoutMs: num })); + } catch (err) { + console.error("Failed to update TTFT timeout:", err); + } + }; + + const updateTtftCooldown = async (val) => { + const num = Number.parseInt(val, 10); + if (Number.isNaN(num) || num < 1000) return; + try { + const res = await fetch("/api/settings", { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ ttftCooldownMs: num }), + }); + if (res.ok) setSettings(prev => ({ ...prev, ttftCooldownMs: num })); + } catch (err) { + console.error("Failed to update TTFT cooldown:", err); + } + }; + + const updateRequireLogin = async (requireLogin) => { try { const res = await fetch("/api/settings", { @@ -548,6 +578,44 @@ export default function ProfilePage() { /> + {/* TTFT Timeout */} +
TTFT Timeout (ms)
++ Maximum time to wait for first token (0 = disabled) +
+TTFT Cooldown (ms)
++ How long to lock slow providers after a timeout +
+{settings.fallbackStrategy === "round-robin" ? `Currently distributing requests across all available accounts with ${settings.stickyRoundRobinLimit || 3} calls per account.` diff --git a/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js b/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js index 4f5273610..2e7c0e717 100644 --- a/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js +++ b/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js @@ -285,6 +285,11 @@ export default function RequestDetailsTab() {