Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ interface GatewayStreamEvent extends Omit<AgentStreamEvent, "type"> {
}
import { useQueryClient } from "@tanstack/react-query"
import type { FileUIPart } from "ai"
import { useCallback, useRef, useState } from "react"
import { useCallback, useEffect, useRef, useState } from "react"
import { toast } from "sonner"
import type {
ChatAttachment,
Expand All @@ -39,6 +39,8 @@ type UseChatStreamReturn = {
streamingsegments: MessageSegment[]
streamingreasoning: string
streamingartifacts: FileArtifact[]
/** Messages queued as follow-ups during an active stream. Cleared when the turn ends. */
queuedmessages: string[]
handlesubmit: (message: SubmitMessage) => Promise<void>
handlecancel: () => void
}
Expand All @@ -54,18 +56,51 @@ const useChatStream = (

/** Ref to the active WebSocket so handlecancel can reach it across renders. */
const wsRef = useRef<WebSocket | null>(null)
/**
* Segments accumulated during the active turn (text, tool, files, steer).
* Kept as a ref so the steer path (a separate handlesubmit invocation) can
* append to the same array that the streaming closure uses, preserving timeline order.
*/
const segmentsRef = useRef<MessageSegment[]>([])
/** Messages queued during an active stream, drained one-at-a-time as fresh turns after the stream ends. */
const queuedRef = useRef<string[]>([])

const [streaming, setStreaming] = useState(false)
const [streamingcontent, setStreamingcontent] = useState("")
const [streamingtoolcalls, setStreamingtoolcalls] = useState<StreamToolCall[]>([])
const [streamingsegments, setStreamingsegments] = useState<MessageSegment[]>([])
const [streamingreasoning, setStreamingreasoning] = useState("")
const [streamingartifacts, setStreamingartifacts] = useState<FileArtifact[]>([])
const [queuedmessages, setQueuedmessages] = useState<string[]>([])

const handlesubmit = useCallback(
async (message: SubmitMessage) => {
if (!message.text.trim() && message.files.length === 0) return

// During an active stream, route to steer or followUp on the existing WS.
// /btw <text> → steer (interrupt mid-turn)
// Any other text → followUp (queue for after the current turn)
if (streaming) {
const ws = wsRef.current
if (!ws || ws.readyState !== WebSocket.OPEN) return
const text = message.text.trim()
if (!text) return
if (text.startsWith("/btw ")) {
const content = text.slice(5).trim()
if (!content) return
const sentat = new Date().toISOString()
ws.send(JSON.stringify({ type: "steer", sessionId: conversationid, content, userId: session?.user?.id }))
// Push into the shared segments ref so it's saved with the agent message
segmentsRef.current.push({ type: "steer", content, sentat })
setStreamingsegments([...segmentsRef.current])
} else {
// Queue locally; auto-submitted as a fresh turn after the current stream ends
queuedRef.current = [...queuedRef.current, text]
setQueuedmessages([...queuedRef.current])
}
return
}

const userid = conversation?.createdby || "unknown"

// Optimistically insert the user message into the query cache
Expand Down Expand Up @@ -138,6 +173,7 @@ const useChatStream = (
setStreamingsegments([])
setStreamingreasoning("")
setStreamingartifacts([])
segmentsRef.current = []

try {
const wsurl = `${GATEWAY_URL.replace(/^http/, "ws")}/ws`
Expand All @@ -146,15 +182,14 @@ const useChatStream = (
let fullcontent = ""
let fullreasoning = ""
const toolcalls = new Map<string, StreamToolCall>()
const segments: MessageSegment[] = []
const allartifacts: FileArtifact[] = []

const updatetoolcalls = () => {
setStreamingtoolcalls(Array.from(toolcalls.values()))
}

const updatesegments = () => {
setStreamingsegments([...segments])
setStreamingsegments([...segmentsRef.current])
}

await new Promise<void>((resolve, reject) => {
Expand Down Expand Up @@ -195,11 +230,11 @@ const useChatStream = (
if (evt.text) {
fullcontent += evt.text
setStreamingcontent(fullcontent)
const last = segments[segments.length - 1]
const last = segmentsRef.current[segmentsRef.current.length - 1]
if (last?.type === "text") {
last.content += evt.text
} else {
segments.push({ type: "text", content: evt.text })
segmentsRef.current.push({ type: "text", content: evt.text })
}
updatesegments()
}
Expand All @@ -222,7 +257,7 @@ const useChatStream = (
args: parsedargs,
state: "calling",
})
segments.push({ type: "tool", toolcallid: toolCallId })
segmentsRef.current.push({ type: "tool", toolcallid: toolCallId })
updatetoolcalls()
updatesegments()
}
Expand Down Expand Up @@ -256,7 +291,7 @@ const useChatStream = (
result: toolResult,
iserror: isToolError,
})
segments.push({ type: "tool", toolcallid: toolCallId })
segmentsRef.current.push({ type: "tool", toolcallid: toolCallId })
updatesegments()
}
updatetoolcalls()
Expand All @@ -268,7 +303,7 @@ const useChatStream = (
allartifacts.push(...evt.artifacts)
setStreamingartifacts([...allartifacts])
// Add a files segment to the chat stream
segments.push({ type: "files", artifacts: [...evt.artifacts] })
segmentsRef.current.push({ type: "files", artifacts: [...evt.artifacts] })
updatesegments()
}
break
Expand Down Expand Up @@ -313,18 +348,17 @@ const useChatStream = (
}))

try {
const savedSegments = segmentsRef.current
await saveMessage({
conversationid,
payload: {
sendertype: "agent",
senderid: agentparticipant.participantid,
content: fullcontent,
metadata:
toolcalls.size > 0
? { toolcalls: Array.from(toolcalls.values()), segments }
: segments.length > 0
? { segments }
: {},
metadata: {
...(toolcalls.size > 0 ? { toolcalls: Array.from(toolcalls.values()) } : {}),
...(savedSegments.length > 0 ? { segments: savedSegments } : {}),
},
attachments: artifactattachments.length > 0 ? artifactattachments : undefined,
},
})
Expand All @@ -348,23 +382,35 @@ const useChatStream = (
setStreamingreasoning("")
setStreamingartifacts([])
},
[conversationid, conversation, participants, queryClient, saveMessage, session],
[conversationid, conversation, participants, queryClient, saveMessage, session, streaming],
)

/** Cancel the active turn: tell the gateway, close the socket, and drop any queued follow-ups. */
const handlecancel = useCallback(() => {
  const socket = wsRef.current
  // Only act on a live connection; a closed/absent socket means nothing to cancel.
  if (socket?.readyState !== WebSocket.OPEN) return
  socket.send(JSON.stringify({ type: "cancel", sessionId: conversationid }))
  socket.close()
  // Cancelling abandons the turn, so pending follow-ups are discarded too.
  queuedRef.current = []
  setQueuedmessages([])
}, [conversationid])

/** After each stream ends, pop the oldest queued message and submit it as a fresh turn. */
useEffect(() => {
  if (streaming) return
  const queue = queuedRef.current
  if (queue.length === 0) return
  const next = queue[0]
  const remaining = queue.slice(1)
  // Keep the ref and the rendered state in sync before kicking off the next turn.
  queuedRef.current = remaining
  setQueuedmessages(remaining)
  handlesubmit({ text: next, files: [] })
}, [streaming, handlesubmit])

return {
streaming,
streamingcontent,
streamingtoolcalls,
streamingsegments,
streamingreasoning,
streamingartifacts,
queuedmessages,
handlesubmit,
handlecancel,
}
Expand Down
14 changes: 14 additions & 0 deletions apps/web/src/components/organisms/chat-view/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const ChatView = () => {
streamingsegments,
streamingreasoning,
streamingartifacts,
queuedmessages,
handlesubmit,
handlecancel,
} = useChatStream(conversationid, conversation, participants)
Expand Down Expand Up @@ -143,6 +144,19 @@ const ChatView = () => {
onPreviewArtifact={handlePreviewArtifact}
/>
)}

{queuedmessages.length > 0 && (
<div className="flex flex-col gap-1 pl-10">
{queuedmessages.map((msg, i) => (
<div
key={i}
className="self-end max-w-[80%] rounded-2xl rounded-br-sm bg-muted px-4 py-2 text-sm text-muted-foreground opacity-60"
>
{msg}
</div>
))}
</div>
)}
</ConversationContent>
<ConversationScrollButton />
</Conversation>
Expand Down
16 changes: 13 additions & 3 deletions apps/web/src/components/organisms/chat-view/prompt-input.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ const PromptInput = ({
<PromptInputComponent onSubmit={wrappedHandleSubmit} className="rounded-2xl border shadow-lg">
<PromptInputAttachments>{(file) => <PromptInputAttachment data={file} />}</PromptInputAttachments>
<PromptInputTextarea
placeholder={hasmessages ? "Type a message..." : "Ask anything..."}
placeholder={
streaming
? "Type /btw to redirect, or queue a follow-up..."
: hasmessages
? "Type a message..."
: "Ask anything..."
}
ref={textarearef}
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
Expand Down Expand Up @@ -143,8 +149,12 @@ const PromptInput = ({
onClick={
streaming && handlecancel
? (e) => {
e.preventDefault()
handlecancel()
// Empty input while streaming → cancel the stream
// Non-empty input while streaming → submit as steer/followUp
if (!inputValue.trim()) {
e.preventDefault()
handlecancel()
}
}
: undefined
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ const RenderAgentContent = ({ message, onPreviewArtifact }: RenderAgentContentPr
if (seg.type === "files") {
return <ArtifactCardList key={`files-${i}`} artifacts={seg.artifacts} onPreview={onPreviewArtifact} />
}
if (seg.type === "steer") {
return (
<div key={i} className="flex items-center gap-1.5 text-xs text-muted-foreground italic py-0.5">
<span className="font-medium not-italic">↩</span>
{seg.content}
</div>
)
}
const tool = toolmap.get(seg.toolcallid)
if (!tool) return null
return <ToolActivityPill key={seg.toolcallid} tool={tool} />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ const StreamingResponse = ({
if (seg.type === "files") {
return <ArtifactCardList key={`files-${i}`} artifacts={seg.artifacts} onPreview={onPreviewArtifact} />
}
if (seg.type === "steer") {
return (
<div key={i} className="flex items-center gap-1.5 text-xs text-muted-foreground italic py-0.5">
<span className="font-medium not-italic">↩</span>
{seg.content}
</div>
)
}
const tool = toolmap.get(seg.toolcallid)
if (!tool) return null
return <ToolActivityPill key={seg.toolcallid} tool={tool} />
Expand Down
1 change: 1 addition & 0 deletions apps/web/src/components/organisms/chat-view/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ export type MessageSegment =
| { type: "text"; content: string }
| { type: "tool"; toolcallid: string }
| { type: "files"; artifacts: FileArtifact[] }
| { type: "steer"; content: string; sentat: string }
10 changes: 10 additions & 0 deletions packages/agents/src/pi.agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ class PiAgentSession implements AgentSession {
}
}

async steer(content: string): Promise<void> {
const session = await this.sessionPromise
await session.steer(content)
}

async followUp(content: string): Promise<void> {
const session = await this.sessionPromise
await session.followUp(content)
}

getMessages(): AgentMessage[] {
return this.messages
}
Expand Down
11 changes: 11 additions & 0 deletions packages/agents/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ export interface AgentSession {
sendMessage(content: string, signal?: AbortSignal): AsyncGenerator<AgentStreamEvent>
/** Get all messages in this session. */
getMessages(): AgentMessage[]
/**
* Queue a steering message while the agent is running.
* Delivered after the current tool calls finish, before the next LLM call.
* Use to redirect the agent mid-turn without waiting for it to finish.
*/
steer(content: string): Promise<void>
/**
* Queue a follow-up message to be processed after the agent fully finishes.
* Use to chain the next task naturally after the current turn concludes.
*/
followUp(content: string): Promise<void>
}

/** Provider that creates agent sessions. */
Expand Down
2 changes: 2 additions & 0 deletions packages/gateway/src/session-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class StubSession implements AgentSession {
getMessages() {
return []
}
async steer(_content: string): Promise<void> {}
async followUp(_content: string): Promise<void> {}
}

/** Stub provider that captures createSession calls for inspection. */
Expand Down
33 changes: 33 additions & 0 deletions packages/gateway/src/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,39 @@ export class SessionManager {
return this.sessions.get(id)?.workspaceDir
}

/**
* Deliver a steering message to an active session turn.
*
* Interrupts the agent mid-turn (after current tool calls, before next LLM call).
* No-op if the session does not exist or no turn is active.
*/
async steer(sessionId: string, content: string, userId?: string): Promise<void> {
if (this.orchestrator) {
if (!userId) return
await this.orchestrator.steer(sessionId, userId, content)
return
}
const state = this.sessions.get(sessionId)
if (!state) return
await state.agentSession.steer(content)
}

/**
* Queue a follow-up message to be processed after the current agent turn ends.
*
* No-op if the session does not exist.
*/
async followUp(sessionId: string, content: string, userId?: string): Promise<void> {
if (this.orchestrator) {
if (!userId) return
await this.orchestrator.followUp(sessionId, userId, content)
return
}
const state = this.sessions.get(sessionId)
if (!state) return
await state.agentSession.followUp(content)
}

/**
* Cancel the active turn for a session.
*
Expand Down
2 changes: 2 additions & 0 deletions packages/gateway/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export type WsClientMessage =
attachments?: WsAttachment[]
}
| { type: "cancel"; sessionId: string }
| { type: "steer"; sessionId: string; content: string; userId?: string }
| { type: "followUp"; sessionId: string; content: string; userId?: string }
| { type: "ping" }

/** A file attachment included in a WebSocket message. */
Expand Down
Loading
Loading