Skip to content

Commit 3be880a

Browse files
fix: detect silent SSE connection drops during long tool executions
When Claude runs a long script, the SSE connection can be silently dropped by transport layers (OS TCP stack, Electron internals, HTTP idle timeouts). Previously this caused the stream to appear "completed" with partial content and no error message. Two-part fix: Server-side heartbeat (claude-client.ts): - Send keep_alive SSE events every 30s independently of SDK activity - Prevents intermediate layers from considering the connection idle - Timer properly cleaned up in all exit paths including cancel() Client-side detection (useSSEStream.ts + stream-session-manager.ts): - Track whether the server's 'done' SSE event was received - If reader finishes without 'done', treat as connection drop - Show "Connection lost" error and clear stale SDK session - Flush residual SSE buffer after reader signals done i18n (en.ts + zh.ts): - Add streaming.connectionDrop translation key Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e694d75 commit 3be880a

5 files changed

Lines changed: 77 additions & 2 deletions

File tree

src/hooks/useSSEStream.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,12 @@ function handleSSEEvent(
193193
export async function consumeSSEStream(
194194
reader: ReadableStreamDefaultReader<Uint8Array>,
195195
callbacks: SSECallbacks,
196-
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null }> {
196+
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null; receivedDone: boolean }> {
197197
const decoder = new TextDecoder();
198198
let buffer = '';
199199
let accumulated = '';
200200
let tokenUsage: TokenUsage | null = null;
201+
let receivedDone = false;
201202

202203
const wrappedCallbacks: SSECallbacks = {
203204
...callbacks,
@@ -220,14 +221,30 @@ export async function consumeSSEStream(
220221

221222
try {
222223
const event: SSEEvent = JSON.parse(line.slice(6));
224+
if (event.type === 'done') {
225+
receivedDone = true;
226+
}
223227
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
224228
} catch {
225229
// skip malformed SSE lines
226230
}
227231
}
228232
}
229233

230-
return { accumulated, tokenUsage };
234+
// Flush any residual buffer in case the final chunk didn't end with \n
235+
if (buffer.trim().startsWith('data: ')) {
236+
try {
237+
const event: SSEEvent = JSON.parse(buffer.trim().slice(6));
238+
if (event.type === 'done') {
239+
receivedDone = true;
240+
}
241+
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
242+
} catch {
243+
// skip malformed residual data
244+
}
245+
}
246+
247+
return { accumulated, tokenUsage, receivedDone };
231248
}
232249

233250
/**

src/i18n/en.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const en = {
5454
'streaming.allowForSession': 'Allow for Session',
5555
'streaming.allowed': 'Allowed',
5656
'streaming.denied': 'Denied',
57+
'streaming.connectionDrop': 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.',
5758

5859
// ── Chat view / session page ────────────────────────────────
5960
'chat.newConversation': 'New Conversation',

src/i18n/zh.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const zh: Record<TranslationKey, string> = {
5151
'streaming.allowForSession': '本次会话允许',
5252
'streaming.allowed': '已允许',
5353
'streaming.denied': '已拒绝',
54+
'streaming.connectionDrop': '连接中断 — 服务器流意外结束。Claude 可能仍在后台运行,请尝试重新发送消息。',
5455

5556
// ── Chat view / session page ────────────────────────────────
5657
'chat.newConversation': '新对话',

src/lib/claude-client.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,8 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
385385
autoTrigger,
386386
} = options;
387387

388+
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
389+
388390
return new ReadableStream<string>({
389391
async start(controller) {
390392
// Resolve provider via the unified resolver. The caller may pass an explicit
@@ -772,6 +774,21 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
772774
// Track pending TodoWrite tool_use_ids so we can sync after successful execution
773775
const pendingTodoWrites = new Map<string, Array<{ content: string; status: string; activeForm?: string }>>();
774776

777+
// Server-side heartbeat: send keep_alive every 30s to prevent
778+
// transport-level idle connection drops (Electron, OS TCP, proxies).
779+
// This is independent of the SDK's own keep_alive messages.
780+
heartbeatTimer = setInterval(() => {
781+
try {
782+
controller.enqueue(formatSSE({ type: 'keep_alive', data: '' }));
783+
} catch {
784+
// Controller may be closed — stop heartbeat
785+
if (heartbeatTimer) {
786+
clearInterval(heartbeatTimer);
787+
heartbeatTimer = null;
788+
}
789+
}
790+
}, 30_000);
791+
775792
for await (const message of conversation) {
776793
if (abortController?.signal.aborted) {
777794
break;
@@ -996,9 +1013,11 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
9961013
}
9971014
}
9981015

1016+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
9991017
controller.enqueue(formatSSE({ type: 'done', data: '' }));
10001018
controller.close();
10011019
} catch (error) {
1020+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
10021021
const rawMessage = error instanceof Error ? error.message : 'Unknown error';
10031022
// Log full error details for debugging (visible in terminal / dev tools)
10041023
console.error('[claude-client] Stream error:', {
@@ -1066,6 +1085,7 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
10661085
},
10671086

10681087
cancel() {
1088+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
10691089
abortController?.abort();
10701090
},
10711091
});

src/lib/stream-session-manager.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,42 @@ async function runStream(stream: ActiveStream, params: StartStreamParams): Promi
355355
},
356356
});
357357

358+
// Detect premature stream end (connection drop without server 'done' event)
359+
if (!result.receivedDone) {
360+
cleanupTimers(stream);
361+
362+
const dropMsg = 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.';
363+
const errContent = stream.accumulatedText.trim()
364+
? stream.accumulatedText.trim() + `\n\n**Error:** ${dropMsg}`
365+
: `**Error:** ${dropMsg}`;
366+
367+
stream.snapshot = {
368+
...buildSnapshot(stream),
369+
phase: 'error',
370+
completedAt: Date.now(),
371+
error: dropMsg,
372+
finalMessageContent: errContent,
373+
statusText: undefined,
374+
pendingPermission: null,
375+
permissionResolved: null,
376+
};
377+
stream.accumulatedText = '';
378+
stream.toolUsesArray = [];
379+
stream.toolResultsArray = [];
380+
stream.toolOutputAccumulated = '';
381+
emit(stream, 'completed');
382+
383+
// Clear stale SDK session so next message starts fresh
384+
fetch(`/api/chat/sessions/${encodeURIComponent(stream.sessionId)}`, {
385+
method: 'PATCH',
386+
headers: { 'Content-Type': 'application/json' },
387+
body: JSON.stringify({ sdk_session_id: '' }),
388+
}).catch(() => {});
389+
390+
scheduleGC(stream);
391+
return;
392+
}
393+
358394
// Stream completed successfully — build final message content
359395
const accumulated = result.accumulated;
360396
const finalToolUses = stream.toolUsesArray;

0 commit comments

Comments
 (0)