diff --git a/apps/web/app/api/chat/chat.test.ts b/apps/web/app/api/chat/chat.test.ts index 46a1b374ff07..4cdeeedd1c4d 100644 --- a/apps/web/app/api/chat/chat.test.ts +++ b/apps/web/app/api/chat/chat.test.ts @@ -44,6 +44,10 @@ vi.mock("@/app/api/sessions/shared", () => ({ getAgentSession: vi.fn(() => undefined), })); +vi.mock("@/lib/dench-cloud-settings", () => ({ + readConfiguredSelectedDenchModel: vi.fn(() => null), +})); + describe("Chat API routes", () => { beforeEach(() => { vi.resetModules(); @@ -85,6 +89,9 @@ describe("Chat API routes", () => { vi.mock("@/app/api/sessions/shared", () => ({ getAgentSession: vi.fn(() => undefined), })); + vi.mock("@/lib/dench-cloud-settings", () => ({ + readConfiguredSelectedDenchModel: vi.fn(() => null), + })); }); afterEach(() => { @@ -179,6 +186,36 @@ describe("Chat API routes", () => { ); }); + it("passes the configured selected model into startRun when no override is provided", async () => { + const { startRun, hasActiveRun, subscribeToRun } = await import("@/lib/active-runs"); + const { readConfiguredSelectedDenchModel } = await import("@/lib/dench-cloud-settings"); + vi.mocked(hasActiveRun).mockReturnValue(false); + vi.mocked(subscribeToRun).mockReturnValue(() => {}); + vi.mocked(readConfiguredSelectedDenchModel).mockReturnValue("anthropic.claude-sonnet-4-6-v1"); + + const { POST } = await import("./route.js"); + const req = new Request("http://localhost/api/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + messages: [ + { id: "m1", role: "user", parts: [{ type: "text", text: "hello" }] }, + ], + sessionId: "s1", + }), + }); + + await POST(req); + + expect(startRun).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "s1", + sessionModel: "anthropic.claude-sonnet-4-6-v1", + modelOverride: undefined, + }), + ); + }); + it("returns JSON when an unsafe OpenAI switch needs acknowledgement", async () => { const { getAgentSession } = await import("@/app/api/sessions/shared"); const { startRun } = await import("@/lib/active-runs"); diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index ae8506714e0a..e1c9322a08b9 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -31,6 +31,7 @@ import { isLikelyOpenAiModelId, needsOpenAiSwitchAcknowledgement, } from "@/lib/chat-models"; +import { readConfiguredSelectedDenchModel } from "@/lib/dench-cloud-settings"; export const runtime = "nodejs"; @@ -259,6 +260,8 @@ export async function POST(req: Request) { const gatewayThreadId = sessionMeta?.gatewaySessionId ?? sessionId; const imageAttachments = extractImageAttachmentsFromMessage(agentMessage); + const sessionModel = + normalizedModelOverride ?? readConfiguredSelectedDenchModel() ?? undefined; try { startRun({ @@ -267,6 +270,7 @@ export async function POST(req: Request) { agentSessionId: gatewayThreadId, overrideAgentId: effectiveAgentId, modelOverride: normalizedModelOverride, + sessionModel, imageAttachments: imageAttachments.length > 0 ? imageAttachments : undefined, diff --git a/apps/web/app/api/chat/stop/route.test.ts b/apps/web/app/api/chat/stop/route.test.ts index b1dbd85e5646..d40732bb3719 100644 --- a/apps/web/app/api/chat/stop/route.test.ts +++ b/apps/web/app/api/chat/stop/route.test.ts @@ -110,4 +110,33 @@ describe("POST /api/chat/stop", () => { expect(listSubagentsForRequesterSession).not.toHaveBeenCalled(); expect(json).toEqual({ aborted: true, abortedChildren: 0 }); }); + + it("stops a gateway-backed session when a non-subagent sessionKey is provided", async () => { + const { abortRun, getActiveRun } = await import("@/lib/active-runs"); + const { listSubagentsForRequesterSession } = await import("@/lib/subagent-registry"); + + vi.mocked(getActiveRun).mockImplementation(((runKey: string) => { + if (runKey === "agent:main:telegram:channel-1") { + return { status: "running" }; + } + return undefined; + }) as never); + vi.mocked(abortRun).mockReturnValue(true); + + const { POST } = await import("./route.js"); + const req = new Request("http://localhost/api/chat/stop", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + sessionKey: "agent:main:telegram:channel-1", + }), + }); + + const res = await POST(req); + const json = await res.json(); + + expect(abortRun).toHaveBeenCalledWith("agent:main:telegram:channel-1"); + expect(listSubagentsForRequesterSession).not.toHaveBeenCalled(); + expect(json).toEqual({ aborted: true, abortedChildren: 0 }); + }); }); diff --git a/apps/web/app/api/chat/stop/route.ts b/apps/web/app/api/chat/stop/route.ts index a19327445d68..756e37621bfa 100644 --- a/apps/web/app/api/chat/stop/route.ts +++ b/apps/web/app/api/chat/stop/route.ts @@ -2,7 +2,7 @@ * POST /api/chat/stop * * Abort an active agent run. Called by the Stop button. - * Works for both parent sessions (by sessionId) and subagent sessions (by sessionKey). + * Works for parent sessions (by sessionId) and any session-key backed run. */ import { abortRun, getActiveRun } from "@/lib/active-runs"; import { listSubagentsForRequesterSession } from "@/lib/subagent-registry"; @@ -17,11 +17,15 @@ export async function POST(req: Request) { .json() .catch(() => ({})); - const isSubagentSession = typeof body.sessionKey === "string" && body.sessionKey.includes(":subagent:"); - const runKey = isSubagentSession && body.sessionKey ? body.sessionKey : body.sessionId; + const sessionKey = + typeof body.sessionKey === "string" && body.sessionKey.trim() + ? body.sessionKey.trim() + : undefined; + const isSubagentSession = Boolean(sessionKey?.includes(":subagent:")); + const runKey = sessionKey ?? body.sessionId; if (!runKey) { - return new Response("sessionId or subagent sessionKey required", { status: 400 }); + return new Response("sessionId or sessionKey required", { status: 400 }); } const run = getActiveRun(runKey); @@ -30,7 +34,7 @@ export async function POST(req: Request) { const aborted = canAbort ? abortRun(runKey) : false; let abortedChildren = 0; - if (!isSubagentSession && body.sessionId && body.cascadeChildren) { + if (!sessionKey && body.sessionId && body.cascadeChildren) { const fallbackAgentId = resolveActiveAgentId(); const requesterSessionKey = resolveSessionKey(body.sessionId, fallbackAgentId); for (const subagent of listSubagentsForRequesterSession(requesterSessionKey)) { diff --git a/apps/web/app/components/chat-panel.tsx b/apps/web/app/components/chat-panel.tsx index 2f5bbc4cd08c..a029421dd5ad 100644 --- a/apps/web/app/components/chat-panel.tsx +++ b/apps/web/app/components/chat-panel.tsx @@ -27,6 +27,7 @@ import type { ChatPanelRuntimeState } from "@/lib/chat-session-registry"; import { getStreamActivityLabel, getIncompleteAssistantReplyReason, + hasAssistantRunningTool, hasAssistantPostToolText, hasAssistantText, hasAssistantToolActivity, @@ -737,6 +738,84 @@ export function createStreamParser() { }; } +const STREAM_ACTIVITY_TIMEOUT_MS = 90_000; +const STREAM_TIMEOUT_MESSAGE = + "Request timed out — no response from agent. Try again or check the gateway."; +const TOOL_RUNNING_TIMEOUT_MESSAGE = + "Tool is still running. Waiting for the gateway to finish..."; + +function roughJsonLength(value: unknown): number { + try { + const serialized = JSON.stringify(value); + return typeof serialized === "string" ? serialized.length : 0; + } catch { + return 0; + } +} + +function getAssistantActivityMarker(message: UIMessage | null): string { + if (!message || message.role !== "assistant") { + return ""; + } + const parts = message.parts.map((part, index) => { + if (part.type === "text") { + return `text:${index}:${part.text.length}`; + } + if (part.type === "reasoning") { + const text = typeof part.text === "string" ? part.text : ""; + const state = typeof part.state === "string" ? part.state : ""; + return `reasoning:${index}:${text.length}:${state}`; + } + if (part.type === "dynamic-tool") { + const dynamicToolPart = part as { + toolCallId: string; + toolName?: string; + state?: string; + preliminary?: boolean; + input?: unknown; + output?: unknown; + }; + return [ + "dynamic-tool", + index, + dynamicToolPart.toolCallId, + dynamicToolPart.toolName ?? "", + dynamicToolPart.state ?? "", + dynamicToolPart.preliminary === true ? "preliminary" : "final", + roughJsonLength(dynamicToolPart.input), + roughJsonLength(dynamicToolPart.output), + ].join(":"); + } + if (part.type === "tool-invocation") { + const toolInvocationPart = part as { + toolCallId: string; + toolName?: string; + args?: unknown; + result?: unknown; + errorText?: unknown; + }; + return [ + "tool-invocation", + index, + toolInvocationPart.toolCallId, + toolInvocationPart.toolName ?? "", + roughJsonLength(toolInvocationPart.args), + roughJsonLength(toolInvocationPart.result), + typeof toolInvocationPart.errorText === "string" + ? toolInvocationPart.errorText.length + : 0, + ].join(":"); + } + return `${part.type}:${index}:${roughJsonLength(part)}`; + }); + return `${message.id}:${parts.join("|")}`; +} + +type ActiveRunTarget = { + sessionId: string | null; + sessionKey: string | null; +}; + /** Imperative handle for parent-driven session control (main page). */ export type ChatPanelHandle = { loadSession: (sessionId: string) => Promise; @@ -895,6 +974,12 @@ export const ChatPanel = forwardRef( // ── Stream-level error (empty response detection) ── const [streamError, setStreamError] = useState(null); + const activeRunTargetRef = useRef({ + sessionId: null, + sessionKey: null, + }); + const lastAssistantActivityAtRef = useRef(null); + const lastAssistantActivityMarkerRef = useRef(""); // Track persisted messages to avoid double-saves const savedMessageIdsRef = useRef>(new Set()); @@ -971,6 +1056,50 @@ export const ChatPanel = forwardRef( status === "submitted" || isReconnecting; + const stopActiveRun = useCallback( + async (options?: { + abortServer?: boolean; + cascadeChildren?: boolean; + target?: ActiveRunTarget; + }) => { + reconnectAbortRef.current?.abort(); + reconnectAbortRef.current = null; + setIsReconnecting(false); + void stop(); + + const target = options?.target ?? activeRunTargetRef.current; + if (options?.abortServer && (target.sessionKey || target.sessionId)) { + try { + const body = target.sessionKey + ? { sessionKey: target.sessionKey } + : { + sessionId: target.sessionId, + ...(options?.cascadeChildren ? { cascadeChildren: true } : {}), + }; + await fetch("/api/chat/stop", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(body), + }); + } catch { + // Best-effort only; local stream teardown still proceeds. + } + } + }, + [stop], + ); + + useEffect(() => { + if (!isSubagentMode && !isGatewayMode) { + activeRunTargetRef.current = { + sessionId: currentSessionId, + sessionKey: null, + }; + } + }, [currentSessionId, isGatewayMode, isSubagentMode]); + // Keep cloud catalog + primary model in sync (hero, session switches, and after // completed turns — agent tools may change agents.defaults.model.primary). useEffect(() => { @@ -1031,30 +1160,66 @@ export const ChatPanel = forwardRef( loadingSession, ]); - // Stream stall detection: if we stay in "submitted" (no first - // token received) for too long, surface an error and reset. - const stallTimerRef = useRef | null>(null); + // Reset the watchdog whenever the assistant visibly changes: text, + // reasoning, or tool input/output updates all count as activity. useEffect(() => { - if (stallTimerRef.current) { - clearTimeout(stallTimerRef.current); - stallTimerRef.current = null; + if (!isStreaming) { + lastAssistantActivityAtRef.current = null; + lastAssistantActivityMarkerRef.current = ""; + return; } - if (status === "submitted") { - stallTimerRef.current = setTimeout(() => { - stallTimerRef.current = null; - if (status === "submitted") { - setStreamError("Request timed out — no response from agent. Try again or check the gateway."); - void stop(); - } - }, 90_000); + + const lastMessage = messages[messages.length - 1] ?? null; + const activityMarker = getAssistantActivityMarker(lastMessage); + if ( + activityMarker && + activityMarker !== lastAssistantActivityMarkerRef.current + ) { + lastAssistantActivityMarkerRef.current = activityMarker; + lastAssistantActivityAtRef.current = Date.now(); + if ( + streamError === STREAM_TIMEOUT_MESSAGE || + streamError === TOOL_RUNNING_TIMEOUT_MESSAGE + ) { + setStreamError(null); + } } - return () => { - if (stallTimerRef.current) { - clearTimeout(stallTimerRef.current); - stallTimerRef.current = null; + + if (lastAssistantActivityAtRef.current === null) { + lastAssistantActivityAtRef.current = Date.now(); + if (streamError) { + setStreamError(null); + } + } + + const intervalId = setInterval(() => { + const lastActivityAt = lastAssistantActivityAtRef.current; + if (lastActivityAt === null) { + return; + } + if (Date.now() - lastActivityAt < STREAM_ACTIVITY_TIMEOUT_MS) { + return; } + if (hasAssistantRunningTool(lastMessage)) { + if (streamError !== TOOL_RUNNING_TIMEOUT_MESSAGE) { + setStreamError(TOOL_RUNNING_TIMEOUT_MESSAGE); + } + lastAssistantActivityAtRef.current = Date.now(); + return; + } + if (streamError !== STREAM_TIMEOUT_MESSAGE) { + setStreamError(STREAM_TIMEOUT_MESSAGE); + } + void stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); + }, 1_000); + + return () => { + clearInterval(intervalId); }; - }, [status, stop]); + }, [isStreaming, messages, stopActiveRun, streamError]); // Auto-scroll to bottom on new messages, but only when the user // is already near the bottom. If the user scrolls up during @@ -1281,6 +1446,11 @@ export const ChatPanel = forwardRef( } let cancelled = false; + void stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); + activeRunTargetRef.current = { sessionId: null, sessionKey: null }; sessionIdRef.current = null; setCurrentSessionId(null); onActiveSessionChange?.(null); @@ -1302,6 +1472,10 @@ export const ChatPanel = forwardRef( : undefined) ?? sessions[0]; setCurrentSessionId(target.id); sessionIdRef.current = target.id; + activeRunTargetRef.current = { + sessionId: target.id, + sessionKey: null, + }; onActiveSessionChange?.(target.id); isFirstFileMessageRef.current = false; @@ -1372,7 +1546,7 @@ export const ChatPanel = forwardRef( cancelled = true; }; // eslint-disable-next-line react-hooks/exhaustive-deps -- stable setters - }, [filePath, attemptReconnect]); + }, [filePath, attemptReconnect, isSubagentMode, onActiveSessionChange, setMessages, stopActiveRun]); // ── Non-file panel: auto-restore session on mount or URL change ── const initialSessionHandled = useRef(false); @@ -1395,8 +1569,14 @@ export const ChatPanel = forwardRef( if (!subagentSessionKey || !subagentTask) {return;} let cancelled = false; - reconnectAbortRef.current?.abort(); - void stop(); + void stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); + activeRunTargetRef.current = { + sessionId: null, + sessionKey: subagentSessionKey, + }; savedMessageIdsRef.current.clear(); setQueuedMessages([]); @@ -1467,15 +1647,21 @@ export const ChatPanel = forwardRef( reconnectAbortRef.current?.abort(); }; // eslint-disable-next-line react-hooks/exhaustive-deps -- stable setters - }, [subagentSessionKey, subagentTask, attemptReconnect]); + }, [subagentSessionKey, subagentTask, attemptReconnect, stopActiveRun]); // ── Gateway session mode: load transcript + reconnect to active stream ── useEffect(() => { if (!gatewaySessionKey || !gatewaySessionId) return; let cancelled = false; - reconnectAbortRef.current?.abort(); - void stop(); + void stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); + activeRunTargetRef.current = { + sessionId: null, + sessionKey: gatewaySessionKey, + }; savedMessageIdsRef.current.clear(); setQueuedMessages([]); setLoadingSession(true); @@ -1518,7 +1704,7 @@ export const ChatPanel = forwardRef( reconnectAbortRef.current?.abort(); }; // eslint-disable-next-line react-hooks/exhaustive-deps - }, [gatewaySessionKey, gatewaySessionId, attemptReconnect]); + }, [gatewaySessionKey, gatewaySessionId, attemptReconnect, stopActiveRun]); // ── Poll for subagent spawns during active streaming ── const [hasRunningSubagents, setHasRunningSubagents] = useState(false); @@ -1652,9 +1838,6 @@ export const ChatPanel = forwardRef( } }, 50); } - if (status === "submitted") { - setStreamError(null); - } return () => { if (emptyStreamTimerRef.current) { clearTimeout(emptyStreamTimerRef.current); @@ -1745,6 +1928,10 @@ export const ChatPanel = forwardRef( sessionId = await createSession(title); setCurrentSessionId(sessionId); sessionIdRef.current = sessionId; + activeRunTargetRef.current = { + sessionId, + sessionKey: null, + }; onActiveSessionChange?.(sessionId); onSessionsChange?.(); @@ -1790,7 +1977,12 @@ export const ChatPanel = forwardRef( role: "user" as const, parts: [{ type: "text" as const, text: messageText }] as UIMessage["parts"], }; - setMessages((prev) => [...prev, userMsg]); + const nextMessages = [...messages, userMsg]; + activeRunTargetRef.current = { + sessionId: null, + sessionKey: gatewaySessionKey, + }; + setMessages(nextMessages); try { const res = await fetch("/api/gateway/chat", { @@ -1799,7 +1991,9 @@ export const ChatPanel = forwardRef( body: JSON.stringify({ sessionKey: gatewaySessionKey, message: messageText }), }); if (res.ok && res.body) { - await attemptReconnect(gatewaySessionKey, [], { sessionKey: gatewaySessionKey }); + await attemptReconnect(gatewaySessionKey, nextMessages, { + sessionKey: gatewaySessionKey, + }); } } catch { /* ignore */ } } else { @@ -1819,6 +2013,7 @@ export const ChatPanel = forwardRef( onSessionsChange, filePath, fileContext, + messages, sendMessage, gatewaySessionKey, attemptReconnect, @@ -1857,13 +2052,19 @@ export const ChatPanel = forwardRef( return; } - // Stop any active stream/reconnection for the old session. - reconnectAbortRef.current?.abort(); - void stop(); + // Stop any active stream/reconnection for the old session. + await stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); setLoadingSession(true); setCurrentSessionId(sessionId); sessionIdRef.current = sessionId; + activeRunTargetRef.current = { + sessionId, + sessionKey: null, + }; onActiveSessionChange?.(sessionId); savedMessageIdsRef.current.clear(); isFirstFileMessageRef.current = false; @@ -1944,15 +2145,17 @@ export const ChatPanel = forwardRef( currentSessionId, setMessages, onActiveSessionChange, - stop, + stopActiveRun, attemptReconnect, ], ); const handleNewSession = useCallback(() => { - reconnectAbortRef.current?.abort(); - void stop(); - setIsReconnecting(false); + void stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); + activeRunTargetRef.current = { sessionId: null, sessionKey: null }; setCurrentSessionId(null); sessionIdRef.current = null; onActiveSessionChange?.(null); @@ -1965,7 +2168,7 @@ export const ChatPanel = forwardRef( requestAnimationFrame(() => { editorRef.current?.focus(); }); - }, [setMessages, onActiveSessionChange, stop]); + }, [onActiveSessionChange, setMessages, stopActiveRun]); // Keep the ref in sync so handleEditorSubmit can call it handleNewSessionRef.current = handleNewSession; @@ -1983,6 +2186,10 @@ export const ChatPanel = forwardRef( const sessionId = await createSession(title); setCurrentSessionId(sessionId); sessionIdRef.current = sessionId; + activeRunTargetRef.current = { + sessionId, + sessionKey: null, + }; onActiveSessionChange?.(sessionId); onSessionsChange?.(); userScrolledAwayRef.current = false; @@ -1997,35 +2204,11 @@ export const ChatPanel = forwardRef( // ── Stop handler (aborts server-side run + client-side stream) ── const handleStop = useCallback(async () => { - // Abort reconnection stream if active (immediate visual feedback). - reconnectAbortRef.current?.abort(); - setIsReconnecting(false); - - // Read from refs to avoid stale closures — sessionIdRef is updated - // synchronously in handleEditorSubmit, so it's always current even - // if React hasn't re-rendered with the new state yet. - const sk = subagentSessionKeyRef.current; - const sid = sessionIdRef.current; - const stopKey = sk || sid; - if (stopKey) { - try { - await fetch("/api/chat/stop", { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify( - sk - ? { sessionKey: sk } - : { sessionId: sid }, - ), - }); - } catch { /* ignore */ } - } - - // Stop the useChat transport stream (transitions status → "ready"). - void stop(); - }, [stop]); + await stopActiveRun({ + abortServer: true, + cascadeChildren: true, + }); + }, [stopActiveRun]); // ── Queue handlers ── @@ -2659,13 +2842,13 @@ export const ChatPanel = forwardRef( {/* Scroll to bottom button */} {showScrollButton && !showHeroState && ( -
+