Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 159 additions & 31 deletions frontend/src/components/ChatWindow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,42 @@ function groupedBlocksToAssistantContent(blocks: ContentBlock[]): string {
return content;
}

// ── Stream seed helpers (sessionStorage) ─────────────────────────────
// Persists streaming parts across page refreshes so the frontend can
// (a) reconnect with a visual seed, or (b) detect a missed stream completion.
const STREAM_SEED_KEY = 'chat_stream_seed_v1';

interface StreamSeed {
chatId: string;
parts: AGUIPart[];
ts: number;
}

function _saveStreamSeed(chatId: string, parts: AGUIPart[]): void {
try {
const seed: StreamSeed = { chatId, parts, ts: Date.now() };
sessionStorage.setItem(STREAM_SEED_KEY, JSON.stringify(seed));
} catch { /* storage may be full or unavailable */ }
}

function _loadStreamSeed(chatId: string): StreamSeed | null {
try {
const raw = sessionStorage.getItem(STREAM_SEED_KEY);
if (!raw) return null;
const seed: StreamSeed = JSON.parse(raw);
if (seed.chatId !== chatId) return null;
if (Date.now() - seed.ts > 10 * 60 * 1000) {
sessionStorage.removeItem(STREAM_SEED_KEY);
return null;
}
return seed;
} catch { return null; }
}

function _clearStreamSeed(): void {
try { sessionStorage.removeItem(STREAM_SEED_KEY); } catch { /* ignore */ }
}

// Drag overlay component
const DragOverlay: React.FC = () => {
const { t } = useI18n();
Expand Down Expand Up @@ -460,7 +496,6 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
setConfig,
shouldResetNext,
consumeResetFlag,
forceSaveNow,
updateMessage,
truncateMessagesFrom,
setIsStreaming,
Expand Down Expand Up @@ -572,12 +607,11 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
const {
parts: streamingParts,
sendMessage: sendAGUI,
resumeStream,
steerStream,
stop: stopAGUIStream,
stopSilently: stopAGUIStreamSilently,
getParts: getStreamingParts,
clearParts,
restorePartsFromSeed,
resolveApproval,
addApprovalDecision,
consumeApprovalDecisions,
Expand All @@ -599,6 +633,7 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
setIsStreaming(false, chatId);
streamingChatIdRef.current = null;
clearParts();
_clearStreamSeed();
setCurrentUsage(null);
setCurrentStreamDisplayRole('assistant');
// Reload chat from DB to reflect rolled-back state.
Expand Down Expand Up @@ -664,6 +699,7 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
// stale pending-approval tool blocks from the backend race condition.
setIsStreaming(false, chatId);
clearParts();
_clearStreamSeed();

// Background DB sync — delay slightly so the backend has time to commit tool
// results before we reload. An immediate reload risks getting stale
Expand Down Expand Up @@ -806,6 +842,52 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
return () => window.removeEventListener('agui:send-message', handler);
}, [sendAGUI, currentChatId, setHeartbeatRunning, setIsStreaming]);

// Save the current streaming parts to sessionStorage on page hide so they
// can be used as a seed (or to detect a missed completion) after a refresh.
useEffect(() => {
const saveOnHide = () => {
const chatId = streamingChatIdRef.current;
if (!chatId) return;
const parts = getStreamingParts();
if (parts.length > 0) {
_saveStreamSeed(chatId, parts);
}
};
document.addEventListener('visibilitychange', saveOnHide);
window.addEventListener('beforeunload', saveOnHide);
return () => {
document.removeEventListener('visibilitychange', saveOnHide);
window.removeEventListener('beforeunload', saveOnHide);
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [getStreamingParts]);

// Resume a tool-approval stream via the background queue so it is
// reconnectable after a page refresh (same pattern as /chat/send).
const resumeViaQueue = useCallback(async (body: Record<string, unknown>) => {
const chatId = (body.chat_id as string) || streamingChatIdRef.current || currentChatId;
if (!chatId) return;

// Save current parts as seed so tryConnect's reconnect can show prior steps.
const currentParts = getStreamingParts();
if (currentParts.length > 0) {
abandonedPartsRef.current.set(chatId, currentParts);
}

const resp = await fetch(`${getApiBase()}/chat/send`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
Comment on lines +877 to +881

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Allow queued approval resumes to attach to live stream

When this new queued resume path is used after tool approval, useToolApproval has already set streamingChatIdRef.current = targetChatId before calling resumeViaQueue, but the /chat/live connector skips tryConnect() whenever that ref matches the mounted chat. Because this path no longer opens a direct SSE reader, approving the final pending tool posts /chat/send successfully but never attaches to the background stream, leaving the UI stuck on the pending/running tool while the backend continues unseen; the same pre-set ref pattern also affects the new retry/steer queued paths.

Useful? React with 👍 / 👎.


if (!resp.ok) {
const msg = resp.status === 409 ? 'Chat is already responding' : `Resume failed (${resp.status})`;
setStatusBar(msg, 'error', 4000);
throw new Error(msg);
}
// 202: stream registered — event bus will fire stream_started → tryConnect
}, [currentChatId, getStreamingParts, setStatusBar]);

const { handleToolApproval } = useToolApproval({
currentChatId,
activeStreamingChatId,
Expand All @@ -818,7 +900,7 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
setConfig,
updateMessage,
setIsStreaming,
resumeStream,
resumeStream: resumeViaQueue,
resolveApproval,
addApprovalDecision,
consumeApprovalDecisions,
Expand Down Expand Up @@ -1030,6 +1112,34 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
const chatIdAtMount = currentChatId;
let cancelled = false;

// Restore stream seed saved before a page refresh so we can either:
// (a) seed an in-progress reconnect with prior visual state, or
// (b) detect that the stream completed while we were gone and reload from DB.
const savedSeed = _loadStreamSeed(chatIdAtMount);
if (savedSeed) {
_clearStreamSeed();
if (savedSeed.parts.length > 0) {
// Pre-populate the abandoned-parts map so tryConnect below can use it
// as a visual seed — even if the snapshot hasn't arrived yet.
abandonedPartsRef.current.set(chatIdAtMount, savedSeed.parts);
}
// Mark as abandoned so the existing reload-on-return logic triggers.
abandonedStreamChatsRef.current.add(chatIdAtMount);

// If there are pending tool approvals in the seed and no active stream,
// restore the parts directly so the approval dialog re-appears.
if (!isBusStreaming(chatIdAtMount)) {
const hasSeedApprovals = savedSeed.parts.some(
p => p.type === 'tool' && p.state === 'approval-requested'
);
if (hasSeedApprovals) {
restorePartsFromSeed(savedSeed.parts);
setIsStreaming(true, chatIdAtMount);
streamingChatIdRef.current = chatIdAtMount;
}
}
}

// On entry: reload from DB so any stream that completed while we were away
// is visible. Covers platform/heartbeat chats and regular chats whose live
// stream we abandoned on a previous navigation.
Expand Down Expand Up @@ -1097,6 +1207,7 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
// Stream finished cleanly — drop preserved reconnect state for this chat.
streamStartByChatRef.current.delete(chatIdAtMount);
abandonedPartsRef.current.delete(chatIdAtMount);
_clearStreamSeed();

if (isSocialStream) {
if (richMsg.content.trim()) addMessage(richMsg, chatIdAtMount);
Expand Down Expand Up @@ -1191,22 +1302,31 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
streamingChatIdRef.current = currentChatId;
activeChatIdRef.current = currentChatId;
stopInFlightRef.current = false;
try {
await steerStream({
chat_id: currentChatId,
message: prompt,
config: safeConfig,
});
} catch (error) {
console.error('Error during steer:', error);
setIsStreaming(false, currentChatId);
} finally {

// Abort the current live connection silently, then start steer via the
// background queue (/chat/steer-send → 202 → /chat/live) so it is
// reconnectable after a page refresh.
stopAGUIStreamSilently();
clearParts();

const steerChatId = currentChatId;
fetch(`${getApiBase()}/chat/steer-send`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chat_id: steerChatId, message: prompt, config: safeConfig }),
}).then(resp => {
if (!resp.ok) {
const msg = resp.status === 409 ? 'Steer failed — chat is busy' : `Steer failed (${resp.status})`;
setStatusBar(msg, 'error', 4000);
setIsStreaming(false, steerChatId);
}
}).catch(err => {
console.error('[send] /chat/steer-send failed:', err);
setIsStreaming(false, steerChatId);
}).finally(() => {
steeringRef.current = false;
stopInFlightRef.current = false;
setTimeout(async () => {
try { await forceSaveNow(currentChatId); } catch { }
}, 600);
}
});
return;
}

Expand Down Expand Up @@ -1305,8 +1425,10 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
});
};

// Retry handler — restores last checkpoint and re-runs the original message
const handleRetry = useCallback(async () => {
// Retry handler — restores last checkpoint and re-runs the original message.
// Uses the background queue (/chat/send) so the stream is reconnectable after
// a page refresh, matching the behaviour of the main send path.
const handleRetry = useCallback(() => {
if (!currentChatId || isStreaming) return;

// Strip all messages from the last user message onwards so the UI is clean
Expand All @@ -1324,25 +1446,31 @@ export const ChatWindow: React.FC<ChatWindowProps> = ({
}

const chatIdForRetry = currentChatId;
clearParts();
streamStartByChatRef.current.set(chatIdForRetry, Date.now());
setIsStreaming(true, chatIdForRetry);
streamingChatIdRef.current = chatIdForRetry;
activeChatIdRef.current = chatIdForRetry;
stopInFlightRef.current = false;

try {
await sendAGUI({ message: '/retry', chat_id: chatIdForRetry, config: safeConfig });
} catch (error) {
console.error('Error during retry:', error);
if (!steeringRef.current) {
// Fire and forget — event bus stream_started will trigger tryConnect.
fetch(`${getApiBase()}/chat/send`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: '/retry', chat_id: chatIdForRetry, config: safeConfig }),
}).then(resp => {
if (!resp.ok) {
const msg = resp.status === 409 ? 'Chat is already responding' : `Retry failed (${resp.status})`;
setStatusBar(msg, 'error', 4000);
setIsStreaming(false, chatIdForRetry);
}
} finally {
}).catch(err => {
console.error('[handleRetry] /chat/send failed:', err);
setIsStreaming(false, chatIdForRetry);
}).finally(() => {
stopInFlightRef.current = false;
setTimeout(async () => {
try { await forceSaveNow(chatIdForRetry); } catch { }
}, 600);
}
}, [currentChatId, isStreaming, messages, truncateMessagesFrom, setIsStreaming, sendAGUI, safeConfig, forceSaveNow]);
});
}, [currentChatId, isStreaming, messages, truncateMessagesFrom, setIsStreaming, clearParts, safeConfig, setStatusBar]);

// Stop streaming handler
const stopStreaming = async () => {
Expand Down
15 changes: 14 additions & 1 deletion frontend/src/hooks/useAGUI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ interface UseAGUIReturn {
/** Read the current parts synchronously (e.g. to snapshot before switching chats) */
getParts: () => AGUIPart[];
clearParts: () => void;
/**
* Restore saved parts directly (e.g. after a page refresh) without starting a
* new stream. Used to re-display pending tool-approval dialogs from sessionStorage.
*/
restorePartsFromSeed: (seed: AGUIPart[]) => void;
/** Remove an inline A2UI surface part by surface id (e.g. after ask_question is answered) */
removeInlineSurface: (surfaceId: string) => void;
/** Optimistically resolve a tool approval (instantly updates UI before backend responds) */
Expand Down Expand Up @@ -465,6 +470,14 @@ export function useAGUI(options: UseAGUIOptions): UseAGUIReturn {

const getParts = useCallback(() => partsRef.current, []);

const restorePartsFromSeed = useCallback((seed: AGUIPart[]) => {
setParts(seed);
partsRef.current = seed;
setStatus('idle');
const approvalCount = seed.filter(p => p.type === 'tool' && p.state === 'approval-requested').length;
setPendingApprovalCountSync(approvalCount);
}, [setPendingApprovalCountSync]);

// Optimistically update a tool part's state when user approves/denies
// so buttons disappear instantly (no waiting for backend round-trip)
const resolveApproval = useCallback((approvalId: string, approved: boolean) => {
Expand Down Expand Up @@ -883,5 +896,5 @@ export function useAGUI(options: UseAGUIOptions): UseAGUIReturn {
}
}, [resetApprovalTracking, setPendingApprovalCountSync]);

return { parts, status, error, sendMessage, resumeStream, steerStream, stop, stopSilently, getParts, clearParts, removeInlineSurface, resolveApproval, pendingApprovalCount, addApprovalDecision, consumeApprovalDecisions };
return { parts, status, error, sendMessage, resumeStream, steerStream, stop, stopSilently, getParts, clearParts, restorePartsFromSeed, removeInlineSurface, resolveApproval, pendingApprovalCount, addApprovalDecision, consumeApprovalDecisions };
}
Loading
Loading