diff --git a/apps/web/src/components/organisms/chat-view/hooks/use-chat-stream.ts b/apps/web/src/components/organisms/chat-view/hooks/use-chat-stream.ts index bdb146c..dcf4aa9 100644 --- a/apps/web/src/components/organisms/chat-view/hooks/use-chat-stream.ts +++ b/apps/web/src/components/organisms/chat-view/hooks/use-chat-stream.ts @@ -15,7 +15,7 @@ interface GatewayStreamEvent extends Omit { } import { useQueryClient } from "@tanstack/react-query" import type { FileUIPart } from "ai" -import { useCallback, useState } from "react" +import { useCallback, useRef, useState } from "react" import { toast } from "sonner" import type { ChatAttachment, @@ -40,6 +40,7 @@ type UseChatStreamReturn = { streamingreasoning: string streamingartifacts: FileArtifact[] handlesubmit: (message: SubmitMessage) => Promise + handlecancel: () => void } const useChatStream = ( @@ -51,6 +52,9 @@ const useChatStream = ( const { mutateAsync: saveMessage } = useSaveMessage() const { data: session } = useSession() + /** Ref to the active WebSocket so handlecancel can reach it across renders. */ + const wsRef = useRef(null) + const [streaming, setStreaming] = useState(false) const [streamingcontent, setStreamingcontent] = useState("") const [streamingtoolcalls, setStreamingtoolcalls] = useState([]) @@ -138,6 +142,7 @@ const useChatStream = ( try { const wsurl = `${GATEWAY_URL.replace(/^http/, "ws")}/ws` const ws = new WebSocket(wsurl) + wsRef.current = ws let fullcontent = "" let fullreasoning = "" const toolcalls = new Map() @@ -293,6 +298,7 @@ const useChatStream = ( } ws.onclose = () => { + wsRef.current = null resolve() } }) @@ -345,6 +351,13 @@ const useChatStream = ( [conversationid, conversation, participants, queryClient, saveMessage, session], ) + const handlecancel = useCallback(() => { + const ws = wsRef.current + if (!ws || ws.readyState !== WebSocket.OPEN) return + ws.send(JSON.stringify({ type: "cancel", sessionId: conversationid })) + ws.close() + }, [conversationid]) + return { streaming, streamingcontent, @@ -353,6 +366,7 @@ const useChatStream = ( streamingreasoning, streamingartifacts, handlesubmit, + handlecancel, } } diff --git a/apps/web/src/components/organisms/chat-view/index.tsx b/apps/web/src/components/organisms/chat-view/index.tsx index b91c619..f2d55ef 100644 --- a/apps/web/src/components/organisms/chat-view/index.tsx +++ b/apps/web/src/components/organisms/chat-view/index.tsx @@ -29,6 +29,7 @@ const ChatView = () => { streamingreasoning, streamingartifacts, handlesubmit, + handlecancel, } = useChatStream(conversationid, conversation, participants) const { artifacts, hasfiles } = useSessionArtifacts(conversationid, streamingartifacts) @@ -94,6 +95,7 @@ const ChatView = () => {
} streaming={streaming} @@ -157,6 +159,7 @@ const ChatView = () => {
} streaming={streaming} diff --git a/apps/web/src/components/organisms/chat-view/prompt-input.tsx b/apps/web/src/components/organisms/chat-view/prompt-input.tsx index d925a04..4c14aae 100644 --- a/apps/web/src/components/organisms/chat-view/prompt-input.tsx +++ b/apps/web/src/components/organisms/chat-view/prompt-input.tsx @@ -27,11 +27,13 @@ import { useCallback, useRef, useState } from "react" const PromptInput = ({ handlesubmit, + handlecancel, hasmessages, textarearef, streaming, }: { handlesubmit: (msg: { text: string; files: FileUIPart[] }) => void + handlecancel?: () => void hasmessages: boolean textarearef: React.RefObject streaming: boolean @@ -136,7 +138,17 @@ const PromptInput = ({ - + { + e.preventDefault() + handlecancel() + } + : undefined + } + />
diff --git a/infra/openshell/Dockerfile b/infra/openshell/Dockerfile index a5d5022..22d0f06 100644 --- a/infra/openshell/Dockerfile +++ b/infra/openshell/Dockerfile @@ -78,7 +78,8 @@ RUN npm install -g \ chart.js \ chartjs-node-canvas \ d3 \ - exceljs + exceljs \ + pi-interactive-shell # --------------------------------------------------------------------------- # Stage 2: Runtime @@ -97,6 +98,7 @@ FROM node:22-slim # tini: PID 1 / signal handling. # git, curl, wget, jq, less, tree, unzip, zip: general-purpose CLI tools for the agent. RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ curl \ git \ iproute2 \ diff --git a/packages/gateway/src/app.ts b/packages/gateway/src/app.ts index 172d0e0..d3ce2b8 100644 --- a/packages/gateway/src/app.ts +++ b/packages/gateway/src/app.ts @@ -303,6 +303,15 @@ export const createApp = ( return c.json({ role: "assistant", content: text }) }) + app.post("/api/v1/sessions/:id/cancel", requirePermission("sessions", "write"), async (c) => { + const session = sessionManager.getSession(c.req.param("id")) + if (!session) { + return c.json({ error: "Session not found" }, 404) + } + const cancelled = await sessionManager.cancelSession(c.req.param("id")) + return c.json({ ok: true, cancelled }) + }) + app.get("/api/v1/sessions/:id/messages", requirePermission("sessions", "read"), (c) => { const session = sessionManager.getSession(c.req.param("id")) if (!session) { diff --git a/packages/gateway/src/session-manager.ts b/packages/gateway/src/session-manager.ts index 0bacf64..feb5764 100644 --- a/packages/gateway/src/session-manager.ts +++ b/packages/gateway/src/session-manager.ts @@ -249,6 +249,20 @@ export class SessionManager { return this.sessions.get(id)?.workspaceDir } + /** + * Cancel the active turn for a session. + * + * In orchestrator mode, delegates to the sandbox-server's cancel endpoint. + * In local mode, cancellation is handled via the AbortSignal passed to + * sendMessage (the WebSocket handler in ws.ts manages those controllers). + */ + async cancelSession(id: string): Promise { + if (this.orchestrator) { + return this.orchestrator.cancelSession(id) + } + return false + } + deleteSession(id: string): boolean { if (this.orchestrator) { const session = this.sessions.get(id) diff --git a/packages/sandbox-server/src/agent.ts b/packages/sandbox-server/src/agent.ts index 1e3a67d..6bbf331 100644 --- a/packages/sandbox-server/src/agent.ts +++ b/packages/sandbox-server/src/agent.ts @@ -98,6 +98,8 @@ export class SandboxAgentManager { private snapshots = new Map>() /** Human-readable folder name per session, derived from the first user message. */ private sessionLabels = new Map() + /** Abort controllers for active turns, keyed by sessionId. */ + private activeTurnControllers = new Map() /** * Create a new agent session. @@ -163,6 +165,19 @@ export class SandboxAgentManager { this.sessionLabels.set(sessionId, sanitizeFolderName(content, sessionId)) } + // Create an internal controller for this turn so cancelSession() can abort + // it independently of the SSE stream disconnect signal. + const turnController = new AbortController() + this.activeTurnControllers.set(sessionId, turnController) + + // Propagate the external abort signal (SSE client disconnect) into the turn controller. + const onExternalAbort = () => turnController.abort() + if (signal?.aborted) { + turnController.abort() + } else { + signal?.addEventListener("abort", onExternalAbort, { once: true }) + } + // Take initial snapshot for artifact detection let snapshot = this.snapshots.get(sessionId) ?? createSnapshot(WORKSPACE_DIR) @@ -170,48 +185,53 @@ export class SandboxAgentManager { let yieldCount = 0 log.info("[DIAG-AGM] starting for-await on session.sendMessage()", { sessionId }) - for await (const event of session.sendMessage(content, signal)) { - yieldCount++ - if (event.type !== "message_update" && event.type !== "thinking_update" && event.type !== "tool_call_update") { - log.info("[DIAG-AGM] received event from PiAgentSession", { - sessionId, - yieldCount, - type: event.type, - ms: Date.now() - t0, - }) - } - yield event - - // After a tool call ends, scan for new output files - if (event.type === "tool_call_end") { - const scanStart = Date.now() - const result = this.scanForArtifacts(sessionId, snapshot) - const scanMs = Date.now() - scanStart - if (scanMs > 100) { - log.warn("[DIAG-AGM] slow artifact scan", { sessionId, scanMs }) + try { + for await (const event of session.sendMessage(content, turnController.signal)) { + yieldCount++ + if (event.type !== "message_update" && event.type !== "thinking_update" && event.type !== "tool_call_update") { + log.info("[DIAG-AGM] received event from PiAgentSession", { + sessionId, + yieldCount, + type: event.type, + ms: Date.now() - t0, + }) } - if (result) { - snapshot = result.newSnapshot - yield { type: "file_output", artifacts: result.artifacts } + yield event + + // After a tool call ends, scan for new output files + if (event.type === "tool_call_end") { + const scanStart = Date.now() + const result = this.scanForArtifacts(sessionId, snapshot) + const scanMs = Date.now() - scanStart + if (scanMs > 100) { + log.warn("[DIAG-AGM] slow artifact scan", { sessionId, scanMs }) + } + if (result) { + snapshot = result.newSnapshot + yield { type: "file_output", artifacts: result.artifacts } + } } } - } - log.info("[DIAG-AGM] for-await loop completed", { - sessionId, - yieldCount, - durationMs: Date.now() - t0, - }) + log.info("[DIAG-AGM] for-await loop completed", { + sessionId, + yieldCount, + durationMs: Date.now() - t0, + }) + + // Final scan after the turn completes to catch stragglers + const finalResult = this.scanForArtifacts(sessionId, snapshot) + if (finalResult) { + snapshot = finalResult.newSnapshot + yield { type: "file_output", artifacts: finalResult.artifacts } + } - // Final scan after the turn completes to catch stragglers - const finalResult = this.scanForArtifacts(sessionId, snapshot) - if (finalResult) { - snapshot = finalResult.newSnapshot - yield { type: "file_output", artifacts: finalResult.artifacts } + // Persist snapshot for next turn + this.snapshots.set(sessionId, snapshot) + } finally { + signal?.removeEventListener("abort", onExternalAbort) + this.activeTurnControllers.delete(sessionId) } - - // Persist snapshot for next turn - this.snapshots.set(sessionId, snapshot) } /** @@ -237,6 +257,24 @@ export class SandboxAgentManager { return { newSnapshot, artifacts } } + /** + * Cancel the active turn for a session. + * + * Aborts the in-flight LLM call / tool execution without destroying the + * session or its message history. The session remains usable for further + * messages after cancellation. + * + * Returns true if there was an active turn to cancel, false if the session + * has no in-progress turn. + */ + cancelSession(sessionId: string): boolean { + const controller = this.activeTurnControllers.get(sessionId) + if (!controller) return false + controller.abort() + this.activeTurnControllers.delete(sessionId) + return true + } + /** * Check if a session exists. */ @@ -248,6 +286,7 @@ export class SandboxAgentManager { * Delete a session. */ deleteSession(sessionId: string): boolean { + this.cancelSession(sessionId) this.snapshots.delete(sessionId) this.sessionLabels.delete(sessionId) return this.sessions.delete(sessionId) diff --git a/packages/sandbox-server/src/server.ts b/packages/sandbox-server/src/server.ts index 52fb5d7..e0d49b5 100644 --- a/packages/sandbox-server/src/server.ts +++ b/packages/sandbox-server/src/server.ts @@ -414,20 +414,19 @@ export function createSandboxApp(): Hono { }) /** - * POST /sessions/:id/cancel -- cancel the current turn. + * POST /sessions/:id/cancel -- cancel the active turn for a session. * - * This is a placeholder. The actual cancellation happens when the client - * disconnects from the SSE stream (abort signal fires). + * Aborts any in-flight LLM call or tool execution for the session without + * destroying the session or its message history. The session remains usable + * for further messages after cancellation. */ app.post("/sessions/:id/cancel", (c) => { const sessionId = c.req.param("id") if (!agent.hasSession(sessionId)) { return c.json({ error: "Session not found" }, 404) } - // Cancellation is handled by the SSE abort signal in sendMessage. - // A dedicated cancel mechanism would require tracking active generators, - // which will be added if needed. - return c.json({ ok: true }) + const cancelled = agent.cancelSession(sessionId) + return c.json({ ok: true, cancelled }) }) // -----------------------------------------------------------------------