diff --git a/core/frontend/src/components/ChatPanel.tsx b/core/frontend/src/components/ChatPanel.tsx index 16a3189779..86ab1c750e 100644 --- a/core/frontend/src/components/ChatPanel.tsx +++ b/core/frontend/src/components/ChatPanel.tsx @@ -8,6 +8,7 @@ import { Loader2, Paperclip, X, + ChevronDown, } from "lucide-react"; export interface ImageContent { @@ -39,7 +40,6 @@ export interface ChatMessage { | "agent" | "user" | "tool_status" - | "worker_input_request" | "run_divider"; role?: "queen" | "worker"; /** Which worker thread this message belongs to (worker agent name) */ @@ -89,6 +89,10 @@ interface ChatPanelProps { queenPhase?: "planning" | "building" | "staging" | "running"; /** Context window usage for queen and workers */ contextUsage?: Record; + /** Epoch ms when the current worker run started — for elapsed timer */ + workerStartedAt?: number; + /** Duration in ms of the last completed worker run */ + workerDuration?: number; } const queenColor = "hsl(45,95%,58%)"; @@ -204,6 +208,148 @@ function ToolActivityRow({ content }: { content: string }) { ); } +function formatDuration(ms: number): string { + const totalSecs = Math.floor(ms / 1000); + const mins = Math.floor(totalSecs / 60); + const secs = totalSecs % 60; + return mins > 0 ? `${mins}m ${secs}s` : `${secs}s`; +} + +const WorkerGroupBubble = memo( + function WorkerGroupBubble({ + messages, + isActive, + workerStartedAt, + workerDuration, + }: { + workerName: string; + messages: ChatMessage[]; + isActive?: boolean; + workerStartedAt?: number; + workerDuration?: number; + }) { + const [collapsed, setCollapsed] = useState(true); + const [elapsed, setElapsed] = useState(0); + + // Auto-expand the latest group while worker is actively streaming + useEffect(() => { + if (isActive) setCollapsed(false); + }, [isActive]); + + useEffect(() => { + if (!isActive || workerStartedAt == null) { setElapsed(0); return; } + setElapsed(Math.floor((Date.now() - workerStartedAt) / 1000)); + const id = setInterval(() => { + setElapsed(Math.floor((Date.now() - workerStartedAt) / 1000)); + }, 1000); + return () => clearInterval(id); + }, [isActive, workerStartedAt]); + + const stepCount = messages.filter((m) => m.role === "worker").length; + const color = workerColor; + const lastAgent = [...messages].reverse().find((m) => m.type !== "tool_status")?.agent; + + const label = isActive + ? workerStartedAt != null + ? `Working for ${formatDuration(elapsed * 1000)}` + : "Working" + : workerDuration != null + ? `Worked for ${formatDuration(workerDuration)}` + : "Worked"; + + return ( +
+ + + + {!collapsed && ( +
+ {messages.map((m, idx) => { + if (m.type === "tool_status") return ; + const filtered = messages.slice(0, idx).filter((x) => x.type !== "tool_status"); + const prevContent = filtered[filtered.length - 1]; + const isContinuation = prevContent != null && prevContent.agent === m.agent; + return ( +
+
+ {isContinuation ? ( +
+ ) : ( +
+ +
+ )} +
+
+ {!isContinuation && ( +
+ {isActive && m.agent === lastAgent ? ( + {m.agent} + ) : ( + {m.agent} + )} + + Worker + +
+ )} +
+ +
+
+
+ ); + })} +
+ )} +
+ ); + }, + (prev, next) => + prev.workerName === next.workerName && + prev.isActive === next.isActive && + prev.workerStartedAt === next.workerStartedAt && + prev.workerDuration === next.workerDuration && + prev.messages.length === next.messages.length && + prev.messages[prev.messages.length - 1]?.content === + next.messages[next.messages.length - 1]?.content && + prev.messages.filter(m => m.type === "tool_status").map(m => m.content).join("\0") === + next.messages.filter(m => m.type === "tool_status").map(m => m.content).join("\0"), +); + + const MessageBubble = memo( function MessageBubble({ msg, @@ -345,6 +491,8 @@ export default function ChatPanel({ onQuestionDismiss, queenPhase, contextUsage, + workerStartedAt, + workerDuration, supportsImages = true, }: ChatPanelProps) { const [input, setInput] = useState(""); @@ -374,7 +522,8 @@ export default function ChatPanel({ // so interleaved queen/tool/system messages don't fragment the bubble. type RenderItem = | { kind: "message"; msg: ChatMessage } - | { kind: "parallel"; groupId: string; groups: SubagentGroup[] }; + | { kind: "parallel"; groupId: string; groups: SubagentGroup[] } + | { kind: "worker-group"; groupId: string; workerName: string; messages: ChatMessage[]; isLast: boolean; workerDuration?: number }; const renderItems = useMemo(() => { const items: RenderItem[] = []; @@ -382,6 +531,30 @@ export default function ChatPanel({ while (i < threadMessages.length) { const msg = threadMessages[i]; const isSubagent = msg.nodeId?.includes(":subagent:"); + + // Group consecutive worker messages (non-subagent) into a collapsible block + if (msg.role === "worker" && !isSubagent && !msg.type) { + const firstId = msg.id; + const workerName = msg.agent || msg.thread || "Worker"; + const groupMsgs: ChatMessage[] = []; + while (i < threadMessages.length) { + const m = threadMessages[i]; + if (m.nodeId?.includes(":subagent:")) break; + if (m.type === "user" || m.type === "run_divider") break; + if (m.role === "queen") break; + groupMsgs.push(m); + i++; + } + const firstCreatedAt = groupMsgs[0]?.createdAt; + const lastCreatedAt = groupMsgs[groupMsgs.length - 1]?.createdAt; + const inlineDuration = + firstCreatedAt != null && lastCreatedAt != null && lastCreatedAt > firstCreatedAt + ? lastCreatedAt - firstCreatedAt + : undefined; + items.push({ kind: "worker-group", groupId: `wg-${firstId}`, workerName, messages: groupMsgs, isLast: false, workerDuration: inlineDuration }); + continue; + } + if (!isSubagent) { items.push({ kind: "message", msg }); i++; @@ -444,6 +617,13 @@ export default function ChatPanel({ items.push({ kind: "parallel", groupId: `par-${firstId}`, groups }); } } + // Mark the last worker-group so it can auto-expand during live streaming + for (let j = items.length - 1; j >= 0; j--) { + if (items[j].kind === "worker-group") { + (items[j] as Extract).isLast = true; + break; + } + } return items; }, [threadMessages, contextUsage]); @@ -529,6 +709,16 @@ export default function ChatPanel({ groups={item.groups} />
+ ) : item.kind === "worker-group" ? ( +
+ +
) : (
@@ -696,13 +886,17 @@ export default function ChatPanel({ questions={pendingQuestions} onSubmit={onMultiQuestionSubmit} onDismiss={onQuestionDismiss} + source="queen" /> - ) : pendingQuestion && pendingOptions && onQuestionSubmit ? ( + ) : pendingQuestion && + pendingOptions && + onQuestionSubmit ? ( ) : (
diff --git a/core/frontend/src/components/MultiQuestionWidget.tsx b/core/frontend/src/components/MultiQuestionWidget.tsx index 077c1a3889..c14531fb4b 100644 --- a/core/frontend/src/components/MultiQuestionWidget.tsx +++ b/core/frontend/src/components/MultiQuestionWidget.tsx @@ -11,9 +11,11 @@ export interface MultiQuestionWidgetProps { questions: QuestionItem[]; onSubmit: (answers: Record) => void; onDismiss?: () => void; + /** Visual source: worker nodes render in indigo, queen renders in primary (yellow) */ + source?: "queen" | "worker"; } -export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }: MultiQuestionWidgetProps) { +export default function MultiQuestionWidget({ questions, onSubmit, onDismiss, source = "queen" }: MultiQuestionWidgetProps) { // Per-question state: selected index (null = nothing, options.length = "Other") const [selections, setSelections] = useState<(number | null)[]>( () => questions.map(() => null), @@ -24,6 +26,29 @@ export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }: const [submitted, setSubmitted] = useState(false); const containerRef = useRef(null); + const isWorker = source === "worker"; + const accent = isWorker + ? { + icon: "bg-[hsl(220,60%,55%)]/10 border-[hsl(220,60%,55%)]/20", + iconText: "text-[hsl(220,60%,55%)]", + selectedOption: "border-[hsl(220,60%,55%)] bg-[hsl(220,60%,55%)]/10", + hoverOption: "hover:border-[hsl(220,60%,55%)]/40", + selectedInput: "border-[hsl(220,60%,55%)] bg-[hsl(220,60%,55%)]/10", + hoverInput: "hover:border-[hsl(220,60%,55%)]/40", + focusInput: "focus:border-[hsl(220,60%,55%)]", + btn: "bg-[hsl(220,60%,55%)] hover:bg-[hsl(220,60%,55%)]/90 text-white", + } + : { + icon: "bg-primary/10 border-primary/20", + iconText: "text-primary", + selectedOption: "border-primary bg-primary/10", + hoverOption: "hover:border-primary/40", + selectedInput: "border-primary bg-primary/10", + hoverInput: "hover:border-primary/40", + focusInput: "focus:border-primary", + btn: "bg-primary hover:bg-primary/90 text-primary-foreground", + }; + // Scroll the first unanswered question into view when it changes useEffect(() => { containerRef.current?.scrollTo({ top: 0, behavior: "smooth" }); @@ -74,8 +99,8 @@ export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }:
{/* Header */}
-
- +
+

@@ -129,8 +154,8 @@ export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }: }} className={`w-full text-left px-4 py-2 rounded-lg border text-sm transition-colors ${ sel === oi - ? "border-primary bg-primary/10 text-foreground" - : "border-border/60 bg-muted/20 text-foreground hover:border-primary/40 hover:bg-muted/40" + ? `${accent.selectedOption} text-foreground` + : `border-border/60 bg-muted/20 text-foreground ${accent.hoverOption} hover:bg-muted/40` }`} > {opt} @@ -161,8 +186,8 @@ export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }: placeholder="Type a custom response..." className={`w-full px-4 py-2 rounded-lg border border-dashed text-sm transition-colors bg-transparent placeholder:text-muted-foreground focus:outline-none ${ isOtherSelected - ? "border-primary bg-primary/10 text-foreground" - : "border-border text-muted-foreground hover:border-primary/40" + ? `${accent.selectedInput} text-foreground` + : `border-border text-muted-foreground ${accent.hoverInput}` }`} /> @@ -190,7 +215,7 @@ export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }: }); }} placeholder="Type your answer..." - className="w-full px-4 py-2 rounded-lg border text-sm transition-colors bg-transparent placeholder:text-muted-foreground focus:outline-none border-border text-foreground hover:border-primary/40 focus:border-primary" + className={`w-full px-4 py-2 rounded-lg border text-sm transition-colors bg-transparent placeholder:text-muted-foreground focus:outline-none border-border text-foreground ${accent.hoverInput} ${accent.focusInput}`} /> )}

@@ -203,7 +228,7 @@ export default function MultiQuestionWidget({ questions, onSubmit, onDismiss }:
+
); } diff --git a/core/frontend/src/pages/workspace.tsx b/core/frontend/src/pages/workspace.tsx index 25d396228c..7f352d1446 100644 --- a/core/frontend/src/pages/workspace.tsx +++ b/core/frontend/src/pages/workspace.tsx @@ -256,6 +256,10 @@ type SessionRestoreResult = { flowchartMap: Record | null; /** Last original draft from events — used to restore flowchart overlay on cold resume. */ originalDraft: DraftGraphData | null; + /** Set if last worker run never completed (in-progress when session ended) */ + workerStartedAt?: number; + /** Set if last worker run completed */ + workerDuration?: number; }; /** @@ -274,7 +278,24 @@ async function restoreSessionMessages( let runningPhase: ChatMessage["phase"] = undefined; let flowchartMap: Record | null = null; let originalDraft: DraftGraphData | null = null; + let lastWorkerStartedAt: number | undefined; + let lastWorkerEndedAt: number | undefined; + + // Tool pill reconstruction accumulators + type PillGroup = { + toolsMap: Map; + createdAt: number; + agent: string; + role: ChatMessage["role"]; + nodeId?: string; + executionId?: string; + }; + const toolPillMap = new Map(); + const loopCounters = new Map(); // executionId → loop iteration count + for (const evt of events) { + const evtCreatedAt = evt.timestamp ? new Date(evt.timestamp).getTime() : Date.now(); + // Track phase transitions so each message gets the phase it was created in const p = evt.type === "queen_phase_changed" ? evt.data?.phase as string : evt.type === "node_loop_iteration" ? evt.data?.phase as string | undefined @@ -288,7 +309,77 @@ async function restoreSessionMessages( flowchartMap = mapData.map ?? null; originalDraft = mapData.original_draft ?? null; } - const msg = sseEventToChatMessage(evt, thread, agentDisplayName); + + // Track worker execution timing for timer reconstruction + if (evt.type === "execution_started" && evt.stream_id !== "queen") { + lastWorkerStartedAt = evtCreatedAt; + lastWorkerEndedAt = undefined; + } + if ( + (evt.type === "execution_completed" || evt.type === "execution_paused" || evt.type === "execution_failed") + && evt.stream_id !== "queen" + ) { + lastWorkerEndedAt = evtCreatedAt; + } + + // Track loop iterations to match live pill key format + if (evt.type === "node_loop_iteration" && evt.execution_id) { + loopCounters.set(evt.execution_id, (loopCounters.get(evt.execution_id) || 0) + 1); + } + + // Reconstruct tool pills inline (sseEventToChatMessage drops these events) + if ((evt.type === "tool_call_started" || evt.type === "tool_call_completed") && evt.node_id) { + const sid = evt.stream_id || ""; + const execId = evt.execution_id || "exec"; + const loopIter = loopCounters.get(execId) || 0; + const pillKey = `${sid}-${execId}-${loopIter}`; + const toolName = (evt.data?.tool_name as string) || "unknown"; + const toolUseId = (evt.data?.tool_use_id as string) || toolName; + const nodeLabel = formatAgentDisplayName(evt.node_id); + const msgRole: ChatMessage["role"] = evt.stream_id === "queen" ? "queen" : "worker"; + const msgId = `tool-pill-${pillKey}`; + + if (evt.type === "tool_call_started") { + const group: PillGroup = toolPillMap.get(pillKey) || { + toolsMap: new Map(), createdAt: evtCreatedAt, agent: nodeLabel, + role: msgRole, nodeId: evt.node_id, executionId: execId, + }; + group.toolsMap.set(toolUseId, { name: toolName, done: false }); + toolPillMap.set(pillKey, group); + + const tools = [...group.toolsMap.values()]; + const toolMsg: ChatMessage = { + id: msgId, agent: nodeLabel, agentColor: "", + content: JSON.stringify({ tools, allDone: false }), + timestamp: "", type: "tool_status", role: msgRole, thread, + createdAt: evtCreatedAt, nodeId: evt.node_id, executionId: execId, + }; + const existingIdx = messages.findIndex(m => m.id === msgId); + if (existingIdx >= 0) messages[existingIdx] = toolMsg; else messages.push(toolMsg); + } else { + const group = toolPillMap.get(pillKey); + if (group) { + const entry = group.toolsMap.get(toolUseId); + if (entry) entry.done = true; + const tools = [...group.toolsMap.values()]; + const allDone = tools.length > 0 && tools.every(t => t.done); + const toolMsg: ChatMessage = { + id: msgId, agent: group.agent, agentColor: "", + content: JSON.stringify({ tools, allDone }), + timestamp: "", type: "tool_status", role: group.role, thread, + createdAt: group.createdAt, nodeId: group.nodeId, executionId: group.executionId, + }; + const existingIdx = messages.findIndex(m => m.id === msgId); + if (existingIdx >= 0) messages[existingIdx] = toolMsg; else messages.push(toolMsg); + } + } + continue; // don't pass these to sseEventToChatMessage + } + + const nodeLabel = evt.stream_id !== "queen" && evt.node_id + ? formatAgentDisplayName(evt.node_id) + : (evt.stream_id === "queen" ? "Queen Bee" : agentDisplayName); + const msg = sseEventToChatMessage(evt, thread, nodeLabel); if (!msg) continue; if (evt.stream_id === "queen") { msg.role = "queen"; @@ -296,7 +387,20 @@ async function restoreSessionMessages( } messages.push(msg); } - return { messages, restoredPhase: runningPhase ?? null, flowchartMap, originalDraft }; + + // Compute timer result from tracking + let workerStartedAt: number | undefined; + let workerDuration: number | undefined; + if (lastWorkerStartedAt != null) { + if (lastWorkerEndedAt != null) { + workerDuration = lastWorkerEndedAt - lastWorkerStartedAt; + } else { + // Run was still in progress when the session ended + workerStartedAt = lastWorkerStartedAt; + } + } + + return { messages, restoredPhase: runningPhase ?? null, flowchartMap, originalDraft, workerStartedAt, workerDuration }; } } catch { // Event log not available — session will start fresh. @@ -356,6 +460,10 @@ interface AgentBackendState { contextUsage: Record; /** Whether the queen's LLM supports image content — false disables the attach button */ queenSupportsImages: boolean; + /** Epoch ms when the current worker execution started — drives elapsed timer */ + workerStartedAt?: number; + /** Duration in ms of the last completed worker run — shown as "Worked for X" */ + workerDuration?: number; } function defaultAgentState(): AgentBackendState { @@ -632,6 +740,9 @@ export default function Workspace() { // Using a ref avoids stale-closure bugs when multiple SSE events // arrive in the same React batch. const turnCounterRef = useRef>({}); + // Tracks active tool calls per agent synchronously — avoids stale agentStates closure + // in tool_call_completed (which fires before tool_call_started's setState commits). + const activeToolCallsRef = useRef>>({}); // Per-agent queen phase ref — used to stamp each message with the phase // it was created in (avoids stale-closure when phase change and message // events arrive in the same React batch). @@ -823,6 +934,8 @@ export default function Workspace() { let restoredPhase: "planning" | "building" | "staging" | "running" | null = null; let restoredFlowchartMap: Record | null = null; let restoredOriginalDraft: DraftGraphData | null = null; + let restoredWorkerStartedAt: number | undefined; + let restoredWorkerDuration: number | undefined; if (!liveSession) { // Fetch conversation history from disk BEFORE creating the new session. // SKIP if messages were already pre-populated by handleHistoryOpen. @@ -836,6 +949,8 @@ export default function Workspace() { restoredPhase = restored.restoredPhase; restoredFlowchartMap = restored.flowchartMap; restoredOriginalDraft = restored.originalDraft; + restoredWorkerStartedAt = restored.workerStartedAt; + restoredWorkerDuration = restored.workerDuration; } catch { // Not available — will start fresh } @@ -847,6 +962,8 @@ export default function Workspace() { restoredPhase = restored.restoredPhase; restoredFlowchartMap = restored.flowchartMap; restoredOriginalDraft = restored.originalDraft; + restoredWorkerStartedAt = restored.workerStartedAt; + restoredWorkerDuration = restored.workerDuration; } catch { // Not critical — UI will still show cached messages } @@ -930,6 +1047,8 @@ export default function Workspace() { // Restore flowchart overlay from persisted events ...(restoredFlowchartMap ? { flowchartMap: restoredFlowchartMap } : {}), ...(restoredOriginalDraft ? { originalDraft: restoredOriginalDraft, draftGraph: null } : {}), + ...(restoredWorkerStartedAt != null ? { workerStartedAt: restoredWorkerStartedAt } : {}), + ...(restoredWorkerDuration != null ? { workerDuration: restoredWorkerDuration } : {}), }); } catch (err: unknown) { const msg = err instanceof Error ? err.message : String(err); @@ -1006,6 +1125,8 @@ export default function Workspace() { let restoredPhase: "planning" | "building" | "staging" | "running" | null = null; let restoredFlowchartMap: Record | null = null; let restoredOriginalDraft: DraftGraphData | null = null; + let restoredWorkerStartedAt: number | undefined; + let restoredWorkerDuration: number | undefined; if (!liveSession) { // Reconnect failed — clear stale cached messages from localStorage restore. @@ -1035,6 +1156,8 @@ export default function Workspace() { restoredPhase = restored.restoredPhase; restoredFlowchartMap = restored.flowchartMap; restoredOriginalDraft = restored.originalDraft; + restoredWorkerStartedAt = restored.workerStartedAt; + restoredWorkerDuration = restored.workerDuration; } else if (coldRestoreId && alreadyHasMessages) { // Messages already cached — still fetch events for non-message state (phase, flowchart) try { @@ -1043,6 +1166,8 @@ export default function Workspace() { restoredPhase = restored.restoredPhase; restoredFlowchartMap = restored.flowchartMap; restoredOriginalDraft = restored.originalDraft; + restoredWorkerStartedAt = restored.workerStartedAt; + restoredWorkerDuration = restored.workerDuration; } catch { // Not critical — UI will still show cached messages } @@ -1172,6 +1297,8 @@ export default function Workspace() { restoredFlowchartMap = restored.flowchartMap; restoredOriginalDraft = restored.originalDraft; } + restoredWorkerStartedAt = restored.workerStartedAt; + restoredWorkerDuration = restored.workerDuration; // Check worker status (needed for isWorkerRunning flag) try { @@ -1217,6 +1344,8 @@ export default function Workspace() { // Restore flowchart overlay from persisted events ...(restoredFlowchartMap ? { flowchartMap: restoredFlowchartMap } : {}), ...(restoredOriginalDraft ? { originalDraft: restoredOriginalDraft, draftGraph: null } : {}), + ...(restoredWorkerStartedAt != null ? { workerStartedAt: restoredWorkerStartedAt } : {}), + ...(restoredWorkerDuration != null ? { workerDuration: restoredWorkerDuration } : {}), }); } catch (err: unknown) { const msg = err instanceof Error ? err.message : String(err); @@ -1524,7 +1653,8 @@ export default function Workspace() { } catch { // Best-effort — queen may have already finished } - updateAgentState(activeWorker, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false }); + const cancelStartedAt = agentStates[activeWorker]?.workerStartedAt; + updateAgentState(activeWorker, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false, workerStartedAt: undefined, workerDuration: cancelStartedAt != null ? Date.now() - cancelStartedAt : agentStates[activeWorker]?.workerDuration }); }, [agentStates, activeWorker, updateAgentState]); // --- Node log helper (writes into agentStates) --- @@ -1653,6 +1783,7 @@ export default function Workspace() { upsertChatMessage(agentType, dividerMsg); } turnCounterRef.current[turnKey] = currentTurn + 1; + activeToolCallsRef.current[agentType] = {}; updateAgentState(agentType, { isTyping: true, isStreaming: false, @@ -1669,6 +1800,8 @@ export default function Workspace() { pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null, + workerStartedAt: eventCreatedAt, + workerDuration: undefined, }); markAllNodesAs(agentType, ["running", "looping", "complete", "error"], "pending"); } @@ -1686,6 +1819,7 @@ export default function Workspace() { appendNodeLog(agentType, nid, `${ts} INFO LLM: ${truncate(text.trim(), 300)}`); } } + const completedStartedAt = agentStates[agentType]?.workerStartedAt; updateAgentState(agentType, { isTyping: false, isStreaming: false, @@ -1699,6 +1833,8 @@ export default function Workspace() { pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null, + workerStartedAt: undefined, + workerDuration: completedStartedAt != null ? eventCreatedAt - completedStartedAt : agentStates[agentType]?.workerDuration, }); markAllNodesAs(agentType, ["running", "looping"], "complete"); @@ -1715,7 +1851,10 @@ export default function Workspace() { case "client_input_received": case "client_input_requested": case "llm_text_delta": { - const chatMsg = sseEventToChatMessage(event, agentType, displayName, currentTurn); + const nodeDisplayName = !isQueen && event.node_id + ? formatAgentDisplayName(event.node_id) + : displayName; + const chatMsg = sseEventToChatMessage(event, agentType, nodeDisplayName, currentTurn); if (isQueen) console.log('[QUEEN] chatMsg:', chatMsg?.id, chatMsg?.content?.slice(0, 50), 'turn:', currentTurn); if (chatMsg && !suppressQueenMessages) { // Queen emits multiple client_output_delta / llm_text_delta snapshots @@ -1754,7 +1893,7 @@ export default function Workspace() { // Mark streaming when LLM text is actively arriving if (event.type === "llm_text_delta" || event.type === "client_output_delta") { - updateAgentState(agentType, { isStreaming: true, ...(isQueen ? {} : { workerIsTyping: false }) }); + updateAgentState(agentType, { isStreaming: true }); } if (event.type === "llm_text_delta" && !isQueen && event.node_id) { @@ -1784,16 +1923,15 @@ export default function Workspace() { : null; if (isQueen) { const prompt = (event.data?.prompt as string) || ""; - const isAutoBlock = !prompt && !options && !questions; - // Queen auto-block (empty prompt, no options) should not - // overwrite a pending worker question — the worker's - // QuestionWidget must stay visible. Use the updater form - // to read the latest state and avoid stale-closure races + // All queen input_required events must not overwrite an active worker + // question — the worker's QuestionWidget must stay visible. Use the + // updater form to read the latest state and avoid stale-closure races // when worker and queen events arrive in the same batch. setAgentStates(prev => { const cur = prev[agentType] || defaultAgentState(); const workerQuestionActive = cur.pendingQuestionSource === "worker"; - if (isAutoBlock && workerQuestionActive) { + if (workerQuestionActive) { + // Never overwrite an active worker question with any queen question return { ...prev, [agentType]: { ...cur, @@ -1826,23 +1964,7 @@ export default function Workspace() { // message bubble. For auto-block (empty prompt), the worker's text // was already streamed via client_output_delta — just activate the // reply box below the last worker message. - const eid = event.execution_id ?? ""; const prompt = (event.data?.prompt as string) || ""; - if (prompt) { - const workerInputMsg: ChatMessage = { - id: `worker-input-${eid}-${event.node_id || Date.now()}`, - agent: displayName || event.node_id || "Worker", - agentColor: "", - content: prompt, - timestamp: "", - type: "worker_input_request", - role: "worker", - thread: agentType, - createdAt: eventCreatedAt, - }; - console.log('[CLIENT_INPUT_REQ] creating worker_input_request msg:', workerInputMsg.id, 'content:', prompt.slice(0, 80)); - upsertChatMessage(agentType, workerInputMsg); - } updateAgentState(agentType, { awaitingInput: true, isTyping: false, @@ -1855,16 +1977,18 @@ export default function Workspace() { } } if (event.type === "execution_paused") { + const pausedStartedAt = agentStates[agentType]?.workerStartedAt; updateAgentState(agentType, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false, awaitingInput: false, workerInputMessageId: null, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null }); if (!isQueen) { - updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null }); + updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null, workerStartedAt: undefined, workerDuration: pausedStartedAt != null ? eventCreatedAt - pausedStartedAt : agentStates[agentType]?.workerDuration }); markAllNodesAs(agentType, ["running", "looping"], "pending"); } } if (event.type === "execution_failed") { + const failedStartedAt = agentStates[agentType]?.workerStartedAt; updateAgentState(agentType, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false, awaitingInput: false, workerInputMessageId: null, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null }); if (!isQueen) { - updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null }); + updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null, workerStartedAt: undefined, workerDuration: failedStartedAt != null ? eventCreatedAt - failedStartedAt : agentStates[agentType]?.workerDuration }); if (event.node_id) { updateGraphNodeStatus(agentType, event.node_id, "error"); const errMsg = (event.data?.error as string) || "unknown error"; @@ -1894,6 +2018,7 @@ export default function Workspace() { case "node_loop_iteration": turnCounterRef.current[turnKey] = currentTurn + 1; + activeToolCallsRef.current[agentType] = {}; if (isQueen) { updateAgentState(agentType, { isStreaming: false, activeToolCalls: {}, awaitingInput: false, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null }); } else { @@ -1998,32 +2123,30 @@ export default function Workspace() { } // Track active (in-flight) tools and upsert activity row into chat + // upsertChatMessage must be called outside any state updater (React purity requirement) + // Use activeToolCallsRef (not agentStates) to avoid stale-closure: tool_call_completed + // can fire before the setState from tool_call_started commits in React 18. const sid = event.stream_id; - setAgentStates(prev => { - const state = prev[agentType]; - if (!state) return prev; - const newActive = { ...state.activeToolCalls, [toolUseId]: { name: toolName, done: false, streamId: sid } }; - // Only include tools from this stream in the pill - const tools = Object.values(newActive).filter(t => t.streamId === sid).map(t => ({ name: t.name, done: t.done })); - const allDone = tools.length > 0 && tools.every(t => t.done); - upsertChatMessage(agentType, { - id: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`, - agent: agentDisplayName || event.node_id || "Agent", - agentColor: "", - content: JSON.stringify({ tools, allDone }), - timestamp: "", - type: "tool_status", - role, - thread: agentType, - createdAt: eventCreatedAt, - nodeId: event.node_id || undefined, - executionId: event.execution_id || undefined, - }); - return { - ...prev, - [agentType]: { ...state, isStreaming: false, activeToolCalls: newActive }, - }; + const currentActive = activeToolCallsRef.current[agentType] || {}; + const newActive = { ...currentActive, [toolUseId]: { name: toolName, done: false, streamId: sid } }; + activeToolCallsRef.current[agentType] = newActive; // sync update + // Only include tools from this stream in the pill + const tools = Object.values(newActive).filter(t => t.streamId === sid).map(t => ({ name: t.name, done: t.done })); + const allDone = tools.length > 0 && tools.every(t => t.done); + upsertChatMessage(agentType, { + id: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`, + agent: (event.node_id ? formatAgentDisplayName(event.node_id) : agentDisplayName) || "Agent", + agentColor: "", + content: JSON.stringify({ tools, allDone }), + timestamp: "", + type: "tool_status", + role, + thread: agentType, + createdAt: eventCreatedAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, }); + updateAgentState(agentType, { isStreaming: false, activeToolCalls: newActive }); } else { console.log('[TOOL_PILL] SKIPPED: no node_id', event.node_id); } @@ -2069,34 +2192,32 @@ export default function Workspace() { } // Mark tool as done and update activity row + // upsertChatMessage must be called outside any state updater (React purity requirement) + // Use activeToolCallsRef (not agentStates) to avoid stale-closure: this handler can + // fire before the setState from tool_call_started commits in React 18. const sid = event.stream_id; - setAgentStates(prev => { - const state = prev[agentType]; - if (!state) return prev; - const updated = { ...state.activeToolCalls }; - if (updated[toolUseId]) { - updated[toolUseId] = { ...updated[toolUseId], done: true }; - } - const tools = Object.values(updated).filter(t => t.streamId === sid).map(t => ({ name: t.name, done: t.done })); - const allDone = tools.length > 0 && tools.every(t => t.done); - upsertChatMessage(agentType, { - id: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`, - agent: agentDisplayName || event.node_id || "Agent", - agentColor: "", - content: JSON.stringify({ tools, allDone }), - timestamp: "", - type: "tool_status", - role, - thread: agentType, - createdAt: eventCreatedAt, - nodeId: event.node_id || undefined, - executionId: event.execution_id || undefined, - }); - return { - ...prev, - [agentType]: { ...state, activeToolCalls: updated }, - }; + const currentActive = activeToolCallsRef.current[agentType] || {}; + const updated = { ...currentActive }; + if (updated[toolUseId]) { + updated[toolUseId] = { ...updated[toolUseId], done: true }; + } + activeToolCallsRef.current[agentType] = updated; // sync update + const tools = Object.values(updated).filter(t => t.streamId === sid).map(t => ({ name: t.name, done: t.done })); + const allDone = tools.length > 0 && tools.every(t => t.done); + upsertChatMessage(agentType, { + id: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`, + agent: (event.node_id ? formatAgentDisplayName(event.node_id) : agentDisplayName) || "Agent", + agentColor: "", + content: JSON.stringify({ tools, allDone }), + timestamp: "", + type: "tool_status", + role, + thread: agentType, + createdAt: eventCreatedAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, }); + updateAgentState(agentType, { activeToolCalls: updated }); } break; } @@ -2644,8 +2765,8 @@ export default function Workspace() { // If worker is awaiting free-text input (no options / no QuestionWidget), // route the message directly to the worker instead of the queen. if (agentStates[activeWorker]?.awaitingInput && agentStates[activeWorker]?.pendingQuestionSource === "worker" && !agentStates[activeWorker]?.pendingOptions) { - const state = agentStates[activeWorker]; - if (state?.sessionId && state?.ready) { + const workerState = agentStates[activeWorker]; + if (workerState?.sessionId && workerState?.ready) { const userMsg: ChatMessage = { id: makeId(), agent: "You", agentColor: "", content: text, timestamp: "", type: "user", thread, createdAt: Date.now(), @@ -2657,7 +2778,7 @@ export default function Workspace() { ), })); updateAgentState(activeWorker, { awaitingInput: false, workerInputMessageId: null, isTyping: true, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null }); - executionApi.workerInput(state.sessionId, text).catch((err: unknown) => { + executionApi.workerInput(workerState.sessionId, text).catch((err: unknown) => { const errMsg = err instanceof Error ? err.message : String(err); const errorChatMsg: ChatMessage = { id: makeId(), agent: "System", agentColor: "", @@ -2732,8 +2853,6 @@ export default function Workspace() { if (!activeSession) return; const state = agentStates[activeWorker]; if (!state?.sessionId || !state?.ready) return; - - // Add user reply to chat thread const userMsg: ChatMessage = { id: makeId(), agent: "You", agentColor: "", content: text, timestamp: "", type: "user", thread: activeWorker, createdAt: Date.now(), @@ -2744,10 +2863,7 @@ export default function Workspace() { s.id === activeSession.id ? { ...s, messages: [...s.messages, userMsg] } : s ), })); - - // Clear awaiting state optimistically updateAgentState(activeWorker, { awaitingInput: false, workerInputMessageId: null, isTyping: true, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, pendingQuestionSource: null }); - executionApi.workerInput(state.sessionId, text).catch((err: unknown) => { const errMsg = err instanceof Error ? err.message : String(err); const errorChatMsg: ChatMessage = { @@ -2763,7 +2879,7 @@ export default function Workspace() { })); updateAgentState(activeWorker, { isTyping: false, isStreaming: false }); }); - }, [activeWorker, activeSession, agentStates, updateAgentState]); + }, [activeWorker, activeSession, agentStates, updateAgentState, setSessionsByAgent]); // --- handleWorkerQuestionAnswer: route predefined answers direct to worker, "Other" through queen --- const handleWorkerQuestionAnswer = useCallback((answer: string, isOther: boolean) => { @@ -3222,7 +3338,7 @@ export default function Workspace() { onCancel={handleCancelQueen} activeThread={activeWorker} isWaiting={(activeAgentState?.queenIsTyping && !activeAgentState?.isStreaming) ?? false} - isWorkerWaiting={(activeAgentState?.workerIsTyping && !activeAgentState?.isStreaming) ?? false} + isWorkerWaiting={activeAgentState?.workerIsTyping ?? false} isBusy={activeAgentState?.queenIsTyping ?? false} disabled={ (activeAgentState?.loading ?? true) || @@ -3241,6 +3357,8 @@ export default function Workspace() { onQuestionDismiss={handleQuestionDismiss} contextUsage={activeAgentState?.contextUsage} supportsImages={activeAgentState?.queenSupportsImages ?? true} + workerStartedAt={activeAgentState?.workerStartedAt} + workerDuration={activeAgentState?.workerDuration} /> )}