Skip to content

Commit 790c553

Browse files
Merge branch 'main' into fix/centralize-model-ids
2 parents 05c1724 + 3be880a commit 790c553

5 files changed

Lines changed: 77 additions & 2 deletions

File tree

src/hooks/useSSEStream.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,12 @@ function handleSSEEvent(
193193
export async function consumeSSEStream(
194194
reader: ReadableStreamDefaultReader<Uint8Array>,
195195
callbacks: SSECallbacks,
196-
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null }> {
196+
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null; receivedDone: boolean }> {
197197
const decoder = new TextDecoder();
198198
let buffer = '';
199199
let accumulated = '';
200200
let tokenUsage: TokenUsage | null = null;
201+
let receivedDone = false;
201202

202203
const wrappedCallbacks: SSECallbacks = {
203204
...callbacks,
@@ -220,14 +221,30 @@ export async function consumeSSEStream(
220221

221222
try {
222223
const event: SSEEvent = JSON.parse(line.slice(6));
224+
if (event.type === 'done') {
225+
receivedDone = true;
226+
}
223227
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
224228
} catch {
225229
// skip malformed SSE lines
226230
}
227231
}
228232
}
229233

230-
return { accumulated, tokenUsage };
234+
// Flush any residual buffer in case the final chunk didn't end with \n
235+
if (buffer.trim().startsWith('data: ')) {
236+
try {
237+
const event: SSEEvent = JSON.parse(buffer.trim().slice(6));
238+
if (event.type === 'done') {
239+
receivedDone = true;
240+
}
241+
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
242+
} catch {
243+
// skip malformed residual data
244+
}
245+
}
246+
247+
return { accumulated, tokenUsage, receivedDone };
231248
}
232249

233250
/**

src/i18n/en.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const en = {
5454
'streaming.allowForSession': 'Allow for Session',
5555
'streaming.allowed': 'Allowed',
5656
'streaming.denied': 'Denied',
57+
'streaming.connectionDrop': 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.',
5758

5859
// ── Chat view / session page ────────────────────────────────
5960
'chat.newConversation': 'New Conversation',

src/i18n/zh.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const zh: Record<TranslationKey, string> = {
5151
'streaming.allowForSession': '本次会话允许',
5252
'streaming.allowed': '已允许',
5353
'streaming.denied': '已拒绝',
54+
'streaming.connectionDrop': '连接中断 — 服务器流意外结束。Claude 可能仍在后台运行,请尝试重新发送消息。',
5455

5556
// ── Chat view / session page ────────────────────────────────
5657
'chat.newConversation': '新对话',

src/lib/claude-client.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,8 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
385385
autoTrigger,
386386
} = options;
387387

388+
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
389+
388390
return new ReadableStream<string>({
389391
async start(controller) {
390392
// Resolve provider via the unified resolver. The caller may pass an explicit
@@ -772,6 +774,21 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
772774
// Track pending TodoWrite tool_use_ids so we can sync after successful execution
773775
const pendingTodoWrites = new Map<string, Array<{ content: string; status: string; activeForm?: string }>>();
774776

777+
// Server-side heartbeat: send keep_alive every 30s to prevent
778+
// transport-level idle connection drops (Electron, OS TCP, proxies).
779+
// This is independent of the SDK's own keep_alive messages.
780+
heartbeatTimer = setInterval(() => {
781+
try {
782+
controller.enqueue(formatSSE({ type: 'keep_alive', data: '' }));
783+
} catch {
784+
// Controller may be closed — stop heartbeat
785+
if (heartbeatTimer) {
786+
clearInterval(heartbeatTimer);
787+
heartbeatTimer = null;
788+
}
789+
}
790+
}, 30_000);
791+
775792
for await (const message of conversation) {
776793
if (abortController?.signal.aborted) {
777794
break;
@@ -996,9 +1013,11 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
9961013
}
9971014
}
9981015

1016+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
9991017
controller.enqueue(formatSSE({ type: 'done', data: '' }));
10001018
controller.close();
10011019
} catch (error) {
1020+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
10021021
const rawMessage = error instanceof Error ? error.message : 'Unknown error';
10031022
// Log full error details for debugging (visible in terminal / dev tools)
10041023
console.error('[claude-client] Stream error:', {
@@ -1066,6 +1085,7 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
10661085
},
10671086

10681087
cancel() {
1088+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
10691089
abortController?.abort();
10701090
},
10711091
});

src/lib/stream-session-manager.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,42 @@ async function runStream(stream: ActiveStream, params: StartStreamParams): Promi
355355
},
356356
});
357357

358+
// Detect premature stream end (connection drop without server 'done' event)
359+
if (!result.receivedDone) {
360+
cleanupTimers(stream);
361+
362+
const dropMsg = 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.';
363+
const errContent = stream.accumulatedText.trim()
364+
? stream.accumulatedText.trim() + `\n\n**Error:** ${dropMsg}`
365+
: `**Error:** ${dropMsg}`;
366+
367+
stream.snapshot = {
368+
...buildSnapshot(stream),
369+
phase: 'error',
370+
completedAt: Date.now(),
371+
error: dropMsg,
372+
finalMessageContent: errContent,
373+
statusText: undefined,
374+
pendingPermission: null,
375+
permissionResolved: null,
376+
};
377+
stream.accumulatedText = '';
378+
stream.toolUsesArray = [];
379+
stream.toolResultsArray = [];
380+
stream.toolOutputAccumulated = '';
381+
emit(stream, 'completed');
382+
383+
// Clear stale SDK session so next message starts fresh
384+
fetch(`/api/chat/sessions/${encodeURIComponent(stream.sessionId)}`, {
385+
method: 'PATCH',
386+
headers: { 'Content-Type': 'application/json' },
387+
body: JSON.stringify({ sdk_session_id: '' }),
388+
}).catch(() => {});
389+
390+
scheduleGC(stream);
391+
return;
392+
}
393+
358394
// Stream completed successfully — build final message content
359395
const accumulated = result.accumulated;
360396
const finalToolUses = stream.toolUsesArray;

0 commit comments

Comments
 (0)