Skip to content
Open
87 changes: 84 additions & 3 deletions open-sse/handlers/chatCore.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { detectClientTool, isNativePassthrough } from "../utils/clientDetector.j
* @param {object} options.credentials - Provider credentials
* @param {string} options.sourceFormatOverride - Override detected source format (e.g. "openai-responses")
*/
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, ccFilterNaming, sourceFormatOverride, providerThinking }) {
export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, ccFilterNaming, sourceFormatOverride, providerThinking, ttftTimeoutMs = 0, ttftCooldownMs = 15000 }) {
const { provider, model } = modelInfo;
const requestStartTime = Date.now();

Expand Down Expand Up @@ -236,8 +236,89 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
}

// Streaming response
const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete });
const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx, streamDetailId });

// TTFT deadline race: if enabled, measure from full request start → first token
let responseToStream = providerResponse;
if (stream && ttftTimeoutMs > 0) {
const elapsedBeforeStreamMs = Date.now() - requestStartTime;
const remainingTtftMs = ttftTimeoutMs - elapsedBeforeStreamMs;

if (remainingTtftMs <= 0) {
streamController.abort();
trackPendingRequest(model, provider, connectionId, false, true);
appendRequestLog({ model, provider, connectionId, status: "TTFT_TIMEOUT" }).catch(() => {});
saveRequestDetail(buildRequestDetail({
provider, model, connectionId,
latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime },
tokens: { prompt_tokens: 0, completion_tokens: 0 },
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
response: { error: "ttft_timeout", message: "Timed out before first token; fell back to the next account.", thinking: null },
status: "error"
}, { id: streamDetailId })).catch(() => {});
console.log(`[TTFT] ${provider}/${model} exceeded ${ttftTimeoutMs}ms before first token`);
return createErrorResult(408, "ttft_timeout");
}

const ttftResult = await raceTtftDeadline(providerResponse, remainingTtftMs, streamController);
if (ttftResult.timedOut) {
trackPendingRequest(model, provider, connectionId, false, true);
appendRequestLog({ model, provider, connectionId, status: "TTFT_TIMEOUT" }).catch(() => {});
saveRequestDetail(buildRequestDetail({
provider, model, connectionId,
latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime },
tokens: { prompt_tokens: 0, completion_tokens: 0 },
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
response: { error: "ttft_timeout", message: "Timed out before first token; fell back to the next account.", thinking: null },
status: "error"
}, { id: streamDetailId })).catch(() => {});
console.log(`[TTFT] ${provider}/${model} exceeded ${ttftTimeoutMs}ms before first token`);
return createErrorResult(408, "ttft_timeout");
}
responseToStream = ttftResult.response;
}

return handleStreamingResponse({ ...sharedCtx, providerResponse: responseToStream, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete, streamDetailId });
}

/**
 * Race the provider response's first body chunk against a TTFT deadline.
 *
 * Resolves `{ timedOut: true }` if no chunk arrives within `ttftTimeoutMs`
 * (aborting the in-flight request via `streamController` and releasing the
 * body reader), or `{ timedOut: false, response }` where `response` is a new
 * Response whose body replays the first chunk and then streams the rest.
 *
 * @param {Response} providerResponse - Upstream streaming response; its body is consumed.
 * @param {number} ttftTimeoutMs - Remaining milliseconds allowed before the first token.
 * @param {{abort: Function}} streamController - Abort controller for the upstream request.
 * @returns {Promise<{timedOut: boolean, response?: Response}>} Never rejects.
 */
async function raceTtftDeadline(providerResponse, ttftTimeoutMs, streamController) {
  const reader = providerResponse.body.getReader();
  return new Promise((resolve) => {
    // Guard against settling twice (timer firing vs. read resolving/rejecting).
    let settled = false;

    const timer = setTimeout(() => {
      if (settled) return;
      settled = true;
      streamController.abort();
      // Release the reader lock so the upstream body can be torn down even if
      // the abort does not error the stream (prevents a leaked locked body).
      reader.cancel().catch(() => {});
      resolve({ timedOut: true });
    }, ttftTimeoutMs);

    reader.read().then(({ value, done }) => {
      clearTimeout(timer);
      if (settled) return; // timer won the race; result already delivered
      settled = true;
      // Rebuild a body that replays the already-consumed first chunk, then
      // pulls the remaining chunks straight from the original reader.
      const newBody = new ReadableStream({
        start(controller) {
          if (done) { controller.close(); return; }
          if (value) controller.enqueue(value);
        },
        async pull(controller) {
          const { value: chunk, done: isDone } = await reader.read();
          if (isDone) { controller.close(); return; }
          controller.enqueue(chunk);
        },
        cancel() { reader.cancel().catch(() => {}); }
      });
      resolve({
        timedOut: false,
        response: new Response(newBody, {
          status: providerResponse.status,
          statusText: providerResponse.statusText,
          headers: providerResponse.headers
        })
      });
    }).catch(() => {
      // Read failed (e.g. upstream aborted): treat as a timeout-equivalent
      // failure so the caller falls back to the next account.
      clearTimeout(timer);
      if (settled) return;
      settled = true;
      resolve({ timedOut: true });
    });
  });
}

export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) {
Expand Down
29 changes: 23 additions & 6 deletions open-sse/handlers/chatCore/streamingHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,31 @@ function buildTransformStream({ provider, sourceFormat, targetFormat, userAgent,
/**
* Handle streaming response — pipe provider SSE through transform stream to client.
*/
export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete }) {
if (onRequestSuccess) onRequestSuccess();
export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete, streamDetailId }) {
let responseToStream = providerResponse;
if (onRequestSuccess) {
let firstChunkFired = false;
const original = providerResponse.body;
const wrapped = new TransformStream({
transform(chunk, controller) {
if (!firstChunkFired) {
firstChunkFired = true;
onRequestSuccess();
}
controller.enqueue(chunk);
}
});
original.pipeTo(wrapped.writable).catch(() => {});
responseToStream = new Response(wrapped.readable, {
status: providerResponse.status,
statusText: providerResponse.statusText,
headers: providerResponse.headers
});
}

const transformStream = buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey });
const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController);
const transformedBody = pipeWithDisconnect(responseToStream, transformStream, streamController);

const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
saveRequestDetail(buildRequestDetail({
provider, model, connectionId,
latency: { ttft: 0, total: Date.now() - requestStartTime },
Expand All @@ -68,8 +86,7 @@ export function handleStreamingResponse({ providerResponse, provider, model, sou
/**
* Build onStreamComplete callback for streaming usage tracking.
*/
export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest }) {
const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest, streamDetailId }) {

const onStreamComplete = (contentObj, usage, ttftAt) => {
const latency = {
Expand Down
6 changes: 5 additions & 1 deletion open-sse/services/accountFallback.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ export function getQuotaCooldown(backoffLevel = 0) {
* @param {number} backoffLevel - Current backoff level for exponential backoff
* @returns {{ shouldFallback: boolean, cooldownMs: number, newBackoffLevel?: number }}
*/
export function checkFallbackError(status, errorText, backoffLevel = 0) {
export function checkFallbackError(status, errorText, backoffLevel = 0, options = {}) {
// Check error message FIRST - specific patterns take priority over status codes
if (errorText) {
const errorStr = typeof errorText === "string" ? errorText : JSON.stringify(errorText);
const lowerError = errorStr.toLowerCase();

if (lowerError.includes("ttft_timeout")) {
return { shouldFallback: true, cooldownMs: options?.ttftCooldownMs ?? 15000 };
}

if (lowerError.includes("no credentials")) {
return { shouldFallback: true, cooldownMs: COOLDOWN_MS.notFound };
}
Expand Down
68 changes: 68 additions & 0 deletions src/app/(dashboard)/dashboard/profile/page.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,36 @@ export default function ProfilePage() {
}
};

/**
 * Persist the TTFT (time-to-first-token) timeout setting, in milliseconds.
 * Best-effort: invalid input is ignored; network failures are logged only.
 *
 * @param {string} val - Raw value from the number input. Must parse to an
 *   integer >= 0 (0 disables the TTFT deadline entirely).
 */
const updateTtftTimeout = async (val) => {
  const num = Number.parseInt(val, 10); // always pass radix
  // Reject non-numeric or negative input without clearing the stored value.
  if (Number.isNaN(num) || num < 0) return;
  try {
    const res = await fetch("/api/settings", {
      method: "PATCH",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ ttftTimeoutMs: num }),
    });
    // Only reflect the change locally once the server accepted it.
    if (res.ok) setSettings(prev => ({ ...prev, ttftTimeoutMs: num }));
  } catch (err) {
    console.error("Failed to update TTFT timeout:", err);
  }
};

/**
 * Persist the TTFT cooldown setting — how long (ms) a provider account is
 * locked out after a TTFT timeout. Best-effort: invalid input is ignored;
 * network failures are logged only.
 *
 * @param {string} val - Raw value from the number input. Must parse to an
 *   integer >= 1000 (sub-second cooldowns are rejected).
 */
const updateTtftCooldown = async (val) => {
  const num = Number.parseInt(val, 10); // always pass radix
  // Enforce the 1s floor; reject non-numeric input without clearing state.
  if (Number.isNaN(num) || num < 1000) return;
  try {
    const res = await fetch("/api/settings", {
      method: "PATCH",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ ttftCooldownMs: num }),
    });
    // Only reflect the change locally once the server accepted it.
    if (res.ok) setSettings(prev => ({ ...prev, ttftCooldownMs: num }));
  } catch (err) {
    console.error("Failed to update TTFT cooldown:", err);
  }
};

const updateRequireLogin = async (requireLogin) => {
try {
const res = await fetch("/api/settings", {
Expand Down Expand Up @@ -548,6 +578,44 @@ export default function ProfilePage() {
/>
</div>

{/* TTFT Timeout */}
<div className="flex items-center justify-between pt-4 border-t border-border/50">
<div>
<p className="font-medium">TTFT Timeout (ms)</p>
<p className="text-sm text-text-muted">
Maximum time to wait for first token (0 = disabled)
</p>
</div>
<Input
type="number"
min="0"
step="500"
value={settings.ttftTimeoutMs ?? 0}
onChange={(e) => updateTtftTimeout(e.target.value)}
disabled={loading}
className="w-24 text-center"
/>
</div>

{/* TTFT Cooldown */}
<div className="flex items-center justify-between pt-2 border-t border-border/50">
<div>
<p className="font-medium">TTFT Cooldown (ms)</p>
<p className="text-sm text-text-muted">
How long to lock slow providers after a timeout
</p>
</div>
<Input
type="number"
min="1000"
step="1000"
value={settings.ttftCooldownMs ?? 15000}
onChange={(e) => updateTtftCooldown(e.target.value)}
disabled={loading}
className="w-24 text-center"
/>
</div>

<p className="text-xs text-text-muted italic pt-2 border-t border-border/50">
{settings.fallbackStrategy === "round-robin"
? `Currently distributing requests across all available accounts with ${settings.stickyRoundRobinLimit || 3} calls per account.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ export default function RequestDetailsTab() {
<div className="flex flex-col gap-0.5">
<div>TTFT: <span className="font-mono">{detail.latency?.ttft || 0}ms</span></div>
<div>Total: <span className="font-mono">{detail.latency?.total || 0}ms</span></div>
{detail.response?.error === "ttft_timeout" && (
<div className="text-[11px] text-amber-600 dark:text-amber-400">
Timed out before first token; fell back to next account.
</div>
)}
</div>
</td>
<td className="p-4 text-center">
Expand Down Expand Up @@ -397,6 +402,11 @@ export default function RequestDetailsTab() {
)}

<CollapsibleSection title="4. Client Response (Final)" defaultOpen={true} icon="output">
{selectedDetail.response?.error === "ttft_timeout" && (
<div className="mb-4 rounded-lg border border-amber-200 bg-amber-50 px-3 py-2 text-sm text-amber-800 dark:border-amber-800 dark:bg-amber-950/30 dark:text-amber-200">
Timed out before first token; fell back to the next account.
</div>
)}
{selectedDetail.response?.thinking && (
<div className="mb-4">
<h4 className="font-semibold text-text-main mb-2 flex items-center gap-2 text-xs uppercase tracking-wide opacity-70">
Expand Down
4 changes: 4 additions & 0 deletions src/lib/localDb.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ const defaultData = {
outboundProxyUrl: "",
outboundNoProxy: "",
mitmRouterBaseUrl: DEFAULT_MITM_ROUTER_BASE,
ttftTimeoutMs: 0,
ttftCooldownMs: 15000,
},
pricing: {} // NEW: pricing configuration
};
Expand Down Expand Up @@ -110,6 +112,8 @@ function cloneDefaultData() {
outboundProxyUrl: "",
outboundNoProxy: "",
mitmRouterBaseUrl: DEFAULT_MITM_ROUTER_BASE,
ttftTimeoutMs: 0,
ttftCooldownMs: 15000,
},
pricing: {},
};
Expand Down
4 changes: 3 additions & 1 deletion src/sse/handlers/chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ async function handleSingleModelChat(body, modelStr, clientRawRequest = null, re
providerThinking,
// Detect source format by endpoint + body
sourceFormatOverride: request?.url ? detectFormatByEndpoint(new URL(request.url).pathname, body) : null,
ttftTimeoutMs: chatSettings.ttftTimeoutMs || 0,
ttftCooldownMs: chatSettings.ttftCooldownMs || 15000,
onCredentialsRefreshed: async (newCreds) => {
await updateProviderCredentials(credentials.connectionId, {
accessToken: newCreds.accessToken,
Expand All @@ -219,7 +221,7 @@ async function handleSingleModelChat(body, modelStr, clientRawRequest = null, re
if (result.success) return result.response;

// Mark account unavailable (auto-calculates cooldown with exponential backoff)
const { shouldFallback } = await markAccountUnavailable(credentials.connectionId, result.status, result.error, provider, model);
const { shouldFallback } = await markAccountUnavailable(credentials.connectionId, result.status, result.error, provider, model, { ttftCooldownMs: chatSettings.ttftCooldownMs || 15000 });

if (shouldFallback) {
log.warn("AUTH", `Account ${credentials.connectionName} unavailable (${result.status}), trying fallback`);
Expand Down
4 changes: 2 additions & 2 deletions src/sse/services/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ export async function getProviderCredentials(provider, excludeConnectionIds = nu
* @param {string|null} model - The specific model that triggered the error
* @returns {{ shouldFallback: boolean, cooldownMs: number }}
*/
export async function markAccountUnavailable(connectionId, status, errorText, provider = null, model = null) {
export async function markAccountUnavailable(connectionId, status, errorText, provider = null, model = null, options = {}) {
if (!connectionId || connectionId === "noauth") return { shouldFallback: false, cooldownMs: 0 };
const connections = await getProviderConnections({ provider });
const conn = connections.find(c => c.id === connectionId);
const backoffLevel = conn?.backoffLevel || 0;

const { shouldFallback, cooldownMs, newBackoffLevel } = checkFallbackError(status, errorText, backoffLevel);
const { shouldFallback, cooldownMs, newBackoffLevel } = checkFallbackError(status, errorText, backoffLevel, options);
if (!shouldFallback) return { shouldFallback: false, cooldownMs: 0 };

const reason = typeof errorText === "string" ? errorText.slice(0, 100) : "Provider error";
Expand Down