From a7ed7450a2795169851b614150063fe19d198264 Mon Sep 17 00:00:00 2001 From: Ericwyc-max <1340748652@qq.com> Date: Fri, 29 May 2026 00:36:17 +0800 Subject: [PATCH] fix: resolve sidecar memory leak and session list API timeout Root cause: listSessions() performed full JSONL parsing for every session file on each API call, blocking the Bun event loop and causing WebSocket heartbeat timeouts, HTTP request queuing, and connection pileup. Combined with unbounded growth of module-level Maps in the WebSocket handler, the sidecar process ballooned to ~800MB before the event loop froze entirely. Changes: - Add 30s TTL in-memory cache for session metadata in listSessions(), with invalidation on all write paths (create/delete/rename/append/clear) - Add periodic cleanup (60s interval) for stale WebSocket handler Maps with a hard cap of 500 entries - Add 24h TTL for deletedSessions Set to prevent unbounded growth - Add sidecar health check thread in Rust: TCP probe every 60s, auto-restart after 5 consecutive failures (5 min of unresponsiveness) - Replace child.kill() with process tree kill (taskkill /T on Windows) to prevent orphaned CLI subprocesses when sidecar is terminated Fixes: session list loading timeout, sidecar memory leak, zombie processes Co-Authored-By: Claude Opus 4.7 --- desktop/src-tauri/src/lib.rs | 138 ++++++++++++++++++++- src/server/services/conversationService.ts | 19 +++ src/server/services/sessionService.ts | 83 +++++++++++-- src/server/ws/handler.ts | 31 +++++ 4 files changed, 259 insertions(+), 12 deletions(-) diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 0c83880dc..b75554834 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -221,6 +221,13 @@ const SERVER_STARTUP_LOG_LIMIT: usize = 80; const SERVER_BIND_HOST: &str = "0.0.0.0"; const SERVER_CONTROL_HOST: &str = "127.0.0.1"; const MAIN_WINDOW_LABEL: &str = "main"; + +// Sidecar health check constants +// Interval: 60s between checks. 
Timeout: 5s per probe step (connect, write, read).
+// Max failures: 5 consecutive → triggers restart (5 × 60s = 5 min of unresponsiveness).
+const HEALTH_CHECK_INTERVAL_SECS: u64 = 60;
+const HEALTH_CHECK_TIMEOUT_SECS: u64 = 5;
+const HEALTH_CHECK_MAX_FAILURES: u32 = 5;
 const TRAY_SHOW_ID: &str = "tray_show";
 const TRAY_QUIT_ID: &str = "tray_quit";
 const WINDOW_STATE_FILE: &str = "window-state.json";
@@ -1497,6 +1504,167 @@ fn wait_for_server(url_host: &str, port: u16) -> Result<(), String> {
     ))
 }
 
+/// Split a server URL such as `http://127.0.0.1:1234` into `(host, port)`.
+///
+/// Anything after the first `/` that follows the scheme is ignored. Returns `None`
+/// when there is no `:port` suffix or the port does not parse as a `u16`.
+fn parse_server_host_port(url: &str) -> Option<(&str, u16)> {
+    let without_scheme = url
+        .strip_prefix("http://")
+        .or_else(|| url.strip_prefix("https://"))
+        .unwrap_or(url);
+    let authority = without_scheme.split('/').next().unwrap_or(without_scheme);
+    let (host, port) = authority.rsplit_once(':')?;
+    Some((host, port.parse().ok()?))
+}
+
+/// Check whether the server sidecar is actually answering requests.
+///
+/// The kernel completes a TCP connect even while the server's event loop is frozen,
+/// so a successful connect alone proves little. This sends a minimal HTTP request and
+/// waits for the first byte of a response; any response counts as healthy. Connect,
+/// write and read are each bounded by `HEALTH_CHECK_TIMEOUT_SECS`.
+fn check_server_health(url_host: &str, port: u16) -> bool {
+    let timeout = Duration::from_secs(HEALTH_CHECK_TIMEOUT_SECS);
+
+    // Fully-qualified trait calls avoid depending on extra `use` lines in this file.
+    let Ok(mut addrs) = std::net::ToSocketAddrs::to_socket_addrs(&(url_host, port)) else {
+        return false;
+    };
+    let Some(mut stream) =
+        addrs.find_map(|addr| TcpStream::connect_timeout(&addr, timeout).ok())
+    else {
+        return false;
+    };
+    if stream.set_read_timeout(Some(timeout)).is_err()
+        || stream.set_write_timeout(Some(timeout)).is_err()
+    {
+        return false;
+    }
+
+    let request =
+        format!("GET / HTTP/1.1\r\nHost: {url_host}:{port}\r\nConnection: close\r\n\r\n");
+    if std::io::Write::write_all(&mut stream, request.as_bytes()).is_err() {
+        return false;
+    }
+
+    let mut first_byte = [0u8; 1];
+    matches!(std::io::Read::read(&mut stream, &mut first_byte), Ok(n) if n > 0)
+}
+
+/// Kill the sidecar process and, where the platform supports it, its whole process tree.
+///
+/// Takes the `CommandChild` by value because `CommandChild::kill` consumes the handle.
+///
+/// On Windows, `taskkill /T /F /PID <pid>` runs first, while the root process is still
+/// alive: `/T` kills the tree, `/F` forces termination. Once the root is gone its PID
+/// can no longer be used to find the tree, so `child.kill()` only runs afterwards as a
+/// fallback. On Unix only the direct child is killed; its descendants are not signalled.
+fn kill_process_tree(child: CommandChild) {
+    #[cfg(target_os = "windows")]
+    {
+        let pid = child.pid();
+        if pid > 0 {
+            let _ = StdCommand::new("taskkill")
+                .args(["/T", "/F", "/PID", &pid.to_string()])
+                .stdout(Stdio::null())
+                .stderr(Stdio::null())
+                .status();
+        }
+    }
+
+    // Best effort: the process may already have exited, so the result is ignored.
+    let _ = child.kill();
+}
+
+/// Spawn a background thread that periodically health-checks the server sidecar.
+/// After `HEALTH_CHECK_MAX_FAILURES` consecutive failures, kill and respawn the sidecar.
+///
+/// **Limitations**: Restarting the sidecar will break active WebSocket connections and
+/// terminate (Windows) or orphan (Unix) running CLI subprocesses. The frontend will need
+/// to reconnect. This is a last-resort recovery mechanism — the high failure threshold
+/// (5 min of continuous unresponsiveness) ensures it only triggers when the sidecar is
+/// truly stuck. If the respawn itself fails, the error is stored in `startup_error`, the
+/// runtime stays empty, and this thread does not retry on its own.
+fn spawn_sidecar_health_check(app: &AppHandle) {
+    let app_handle = app.clone();
+    thread::spawn(move || {
+        let mut consecutive_failures: u32 = 0;
+
+        loop {
+            thread::sleep(Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS));
+
+            // Snapshot the current server URL; the lock is released before probing.
+            let url = {
+                let Some(state) = app_handle.try_state::<ServerState>() else {
+                    continue;
+                };
+                let Ok(guard) = state.0.lock() else {
+                    continue;
+                };
+                match &guard.runtime {
+                    Some(rt) => rt.url.clone(),
+                    None => continue, // No runtime — server might be starting up
+                }
+            };
+
+            let Some((host, port)) = parse_server_host_port(&url) else {
+                continue;
+            };
+
+            if check_server_health(host, port) {
+                consecutive_failures = 0;
+                continue;
+            }
+
+            consecutive_failures += 1;
+            eprintln!(
+                "[desktop] sidecar health check failed ({consecutive_failures}/{HEALTH_CHECK_MAX_FAILURES})"
+            );
+
+            if consecutive_failures < HEALTH_CHECK_MAX_FAILURES {
+                continue;
+            }
+
+            // Threshold reached — restart the sidecar
+            eprintln!("[desktop] sidecar unresponsive after {HEALTH_CHECK_MAX_FAILURES} consecutive failures, restarting...");
+            consecutive_failures = 0;
+
+            let Some(state) = app_handle.try_state::<ServerState>() else {
+                continue;
+            };
+            let Ok(mut guard) = state.0.lock() else {
+                continue;
+            };
+
+            // Only replace the runtime that was probed. If it was stopped or swapped
+            // while the lock was released (e.g. during shutdown), leave it alone.
+            match guard.runtime.as_ref() {
+                Some(rt) if rt.url == url => {}
+                _ => continue,
+            }
+
+            // Kill the old sidecar and all its child CLI processes
+            if let Some(old_runtime) = guard.runtime.take() {
+                kill_process_tree(old_runtime.child);
+            }
+
+            // Respawn
+            match start_server_sidecar(&app_handle) {
+                Ok(new_runtime) => {
+                    guard.runtime = Some(new_runtime);
+                    guard.startup_error = None;
+                    eprintln!("[desktop] sidecar restarted successfully");
+                }
+                Err(err) => {
+                    eprintln!("[desktop] failed to restart sidecar: {err}");
+                    guard.startup_error = Some(err);
+                }
+            }
+        }
+    });
+}
+
 fn push_server_startup_log(logs: &Arc>>, line: String) {
     let line = line.trim_end().to_string();
     if line.is_empty() {
@@ -1670,7 +1838,7 @@ fn stop_server_sidecar(app: &AppHandle) {
     };
 
     if let Some(runtime) = guard.runtime.take() {
-        let _ = runtime.child.kill();
+        kill_process_tree(runtime.child);
     }
 }
 
@@ -2268,6 +2436,9 @@ pub fn run() {
             }
             drop(guard);
 
+            // Start background health monitoring for the sidecar
+            spawn_sidecar_health_check(&app.handle());
+
             // server 起来之后再起 adapter sidecar —— start_adapters_sidecar
             // 内部会从 ServerState 读 server URL 注入 ADAPTER_SERVER_URL env,
             // 让 adapter 连上动态端口。
diff --git a/src/server/services/conversationService.ts b/src/server/services/conversationService.ts
index 7ce4b5012..dfe03ed9d 100644
--- a/src/server/services/conversationService.ts
+++ b/src/server/services/conversationService.ts
@@ -111,6 +111,8 @@ export class ConversationStartupError extends Error {
 export class ConversationService {
   private sessions = new Map()
   private deletedSessions = new Set()
+  private deletedSessionTimestamps = new Map()
+  private static readonly DELETED_SESSION_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours
   private providerService = new ProviderService()
 
   private buildSessionCliArgs(
@@ -735,7 +737,9 @@ export class ConversationService {
 
   markSessionDeleted(sessionId: string): void {
     this.deletedSessions.add(sessionId)
+    this.deletedSessionTimestamps.set(sessionId, Date.now())
     this.stopSession(sessionId)
+    this.pruneOldDeletedSessions()
   }
 
   markSessionsDeleted(sessionIds: string[]): void {
@@ -746,6 +750,7 @@ export class ConversationService {
 
   unmarkSessionDeleted(sessionId: 
string): void {
     this.deletedSessions.delete(sessionId)
+    this.deletedSessionTimestamps.delete(sessionId)
   }
 
   unmarkSessionsDeleted(sessionIds: string[]): void {
@@ -754,10 +759,24 @@ export class ConversationService {
     }
   }
 
+  private pruneOldDeletedSessions(): void {
+    const now = Date.now()
+    for (const [id, ts] of this.deletedSessionTimestamps) {
+      if (now - ts > ConversationService.DELETED_SESSION_TTL_MS) {
+        this.deletedSessions.delete(id)
+        this.deletedSessionTimestamps.delete(id)
+      }
+    }
+  }
+
   getActiveSessions(): string[] {
     return Array.from(this.sessions.keys())
   }
 
+  isSessionActive(sessionId: string): boolean {
+    return this.sessions.has(sessionId)
+  }
+
   private async readProcessOutputStream(
     sessionId: string,
     stream: ReadableStream | null | undefined,
diff --git a/src/server/services/sessionService.ts b/src/server/services/sessionService.ts
index 50dfdd97e..e956356c4 100644
--- a/src/server/services/sessionService.ts
+++ b/src/server/services/sessionService.ts
@@ -219,6 +219,55 @@ type PersistedWorktreeSession = {
 
 type ContentBlock = Record
 
+// In-memory metadata cache to avoid re-reading JSONL files on every listSessions call.
+// Each entry is keyed by the JSONL file path and stores the lightweight metadata
+// needed for the session list, the file mtime used for staleness checks, and the
+// time the entry was filled so that every entry expires on its own schedule.
+type CachedSessionMeta = {
+  id: string
+  title: string
+  createdAt: string
+  modifiedAt: string
+  messageCount: number
+  projectPath: string
+  projectRoot: string | null
+  workDir: string | null
+  workDirExists: boolean
+}
+type SessionMetaCacheEntry = {
+  meta: CachedSessionMeta
+  mtimeMs: number
+  cachedAt: number
+}
+const sessionMetaCache = new Map<string, SessionMetaCacheEntry>()
+const SESSION_META_CACHE_TTL_MS = 30_000
+
+/**
+ * A cached entry may be reused only while the file's mtime is unchanged and the
+ * entry itself is younger than SESSION_META_CACHE_TTL_MS.
+ */
+function isSessionMetaEntryFresh(
+  entry: SessionMetaCacheEntry,
+  mtimeMs: number,
+  now: number,
+): boolean {
+  return entry.mtimeMs === mtimeMs && now - entry.cachedAt < SESSION_META_CACHE_TTL_MS
+}
+
+/** Remove every entry that has outlived SESSION_META_CACHE_TTL_MS. */
+function pruneExpiredSessionMeta(now: number): void {
+  for (const [filePath, entry] of sessionMetaCache) {
+    if (now - entry.cachedAt >= SESSION_META_CACHE_TTL_MS) {
+      sessionMetaCache.delete(filePath)
+    }
+  }
+}
+
+/** Drop all cached metadata; called after session files are written or removed. */
+function invalidateSessionMetaCache(): void {
+  sessionMetaCache.clear()
+}
+
 const USER_INTERRUPTION_TEXTS = new Set([
   '[Request interrupted by user]',
   '[Request interrupted by user for tool use]',
@@ -1278,6 +1327,8 @@ export class SessionService {
 
   /**
    * List all sessions, optionally filtered by project path.
+   * Uses an in-memory metadata cache (per-entry 30s TTL, validated against file mtime)
+   * so unchanged JSONL files are not re-parsed on every call.
    */
   async listSessions(options?: {
     project?: string
@@ -1285,12 +1336,17 @@ export class SessionService {
     offset?: number
   }): Promise<{ sessions: SessionListItem[]; total: number }> {
     const sessionFiles = await this.discoverSessionFiles(options?.project)
+
+    // Cached metadata is reused per file: an entry is valid while the file's mtime
+    // is unchanged and the entry is younger than the TTL. Expired entries are
+    // dropped up front so the cache cannot retain files that are never listed again.
+    const now = Date.now()
+    pruneExpiredSessionMeta(now)
+
     const filesWithStats = (await Promise.all(sessionFiles.map(async (sessionFile) => {
       try {
-        return {
-          ...sessionFile,
-          stat: await fs.stat(sessionFile.filePath),
-        }
+        const stat = await fs.stat(sessionFile.filePath)
+        return { ...sessionFile, stat }
       } catch {
         return null
       }
@@ -1303,22 +1359,34 @@ export class SessionService {
     const limit = options?.limit ?? 50
     const paginatedFiles = filesWithStats.slice(offset, offset + limit)
 
-    // Build session list items with metadata from file stats & first entries
-    const items = (await Promise.all(paginatedFiles.map(async ({ filePath, projectDir, sessionId, stat }) => {
+    // Build session list items, using cache where possible
+    const items: SessionListItem[] = []
+
+    for (const { filePath, projectDir, sessionId, stat } of paginatedFiles) {
       try {
+        const mtimeMs = stat.mtimeMs
+        const cached = sessionMetaCache.get(filePath)
+
+        if (cached && isSessionMetaEntryFresh(cached, mtimeMs, now)) {
+          // Cache hit: hand out a copy so callers cannot mutate the cached object
+          items.push({ ...cached.meta })
+          continue
+        }
+
+        // Cache miss — parse JSONL to extract metadata
         const entries = await this.readJsonlFile(filePath)
         const workDir = this.resolveWorkDirFromEntries(entries, projectDir)
         const projectRoot = await this.resolveProjectRootFromEntries(entries, workDir, projectDir)
         const workDirExists = await this.pathExists(workDir)
 
         // Count transcript messages only (user + assistant)
         const messageCount = entries.filter(
           (e) => (e.type === 'user' || e.type === 'assistant') && e.message?.role
         ).length
 
         const title = this.extractTitle(entries)
 
         // Find the earliest timestamp from entries, fallback to file birthtime
         let createdAt = stat.birthtime.toISOString()
         for (const e of entries) {
           if (e.timestamp) {
@@ -1327,7 +1395,7 @@ export class SessionService {
           }
         }
 
-        return {
+        const meta: CachedSessionMeta = {
           id: sessionId,
           title,
           createdAt,
@@ -1338,11 +1406,15 @@ export class SessionService {
           workDir,
           workDirExists,
         }
+
+        // Store the entry with its own fill time so it expires independently
+        sessionMetaCache.set(filePath, { meta, mtimeMs, cachedAt: Date.now() })
+
+        items.push({ ...meta })
       } catch {
         // Skip unreadable files
-        return null
       }
-    }))).filter((item): item is SessionListItem => item !== null)
+    }
 
     return { sessions: items, total }
   }
@@ -1468,6 +1540,7 @@ export class SessionService {
     }
 
     await fs.writeFile(filePath, JSON.stringify(initialEntry) + 
'\n' + JSON.stringify(metaEntry) + '\n', 'utf-8') + invalidateSessionMetaCache() return { sessionId, workDir: absWorkDir } } @@ -1482,6 +1534,7 @@ export class SessionService { } await fs.unlink(found.filePath) + invalidateSessionMetaCache() } async deleteSessions(sessionIds: string[]): Promise { @@ -1514,6 +1567,7 @@ export class SessionService { } } + invalidateSessionMetaCache() return { successes, failures } } @@ -1537,6 +1591,7 @@ export class SessionService { } await this.appendJsonlEntry(found.filePath, entry) + invalidateSessionMetaCache() } /** @@ -1551,6 +1606,7 @@ export class SessionService { aiTitle: title, timestamp: new Date().toISOString(), }) + invalidateSessionMetaCache() } async getCustomTitle(sessionId: string): Promise { @@ -1627,6 +1683,7 @@ export class SessionService { const found = await this.findSessionFile(sessionId) if (!found) return await fs.unlink(found.filePath) + invalidateSessionMetaCache() } async clearSessionTranscript(sessionId: string, fallbackWorkDir?: string): Promise { @@ -1674,6 +1731,7 @@ export class SessionService { `${JSON.stringify(initialEntry)}\n${JSON.stringify(metaEntry)}\n`, 'utf-8', ) + invalidateSessionMetaCache() } async appendSessionMetadata( @@ -1718,6 +1776,7 @@ export class SessionService { timestamp: new Date().toISOString(), }) } + invalidateSessionMetaCache() } async deletePlaceholderSessionFiles( @@ -1749,6 +1808,7 @@ export class SessionService { await fs.rm(filePath, { force: true }) removed += 1 } + if (removed > 0) invalidateSessionMetaCache() return removed } @@ -1802,6 +1862,7 @@ export class SessionService { ? 
filteredEntries.map((entry) => JSON.stringify(entry)).join('\n') + '\n'
       : ''
     await fs.writeFile(found.filePath, content, 'utf-8')
+    invalidateSessionMetaCache()
 
     return {
       removedCount: removedMessageIds.length,
diff --git a/src/server/ws/handler.ts b/src/server/ws/handler.ts
index f77d36df8..f89dd63c6 100644
--- a/src/server/ws/handler.ts
+++ b/src/server/ws/handler.ts
@@ -80,6 +80,43 @@ const prewarmedSessions = new Set()
 const prewarmIdleTimers = new Map>()
 const DEFAULT_PREWARM_IDLE_TIMEOUT_MS = 5 * 60_000
 
+// --- Memory leak prevention: cap module-level Maps and periodically evict stale entries ---
+const MAX_SESSION_RUNTIME_STATES = 500
+const STALE_SESSION_EVICT_INTERVAL_MS = 60_000
+
+/** True while a session still has a WebSocket connection or a running CLI subprocess. */
+function isSessionRuntimeStateInUse(sessionId: string): boolean {
+  return activeSessions.has(sessionId) || conversationService.isSessionActive(sessionId)
+}
+
+/**
+ * Periodic sweep over sessionStreamStates.
+ *
+ * Pass 1 cleans up sessions that are fully idle: no WebSocket connection, no CLI
+ * subprocess and no pending cleanup timer (i.e. not in the post-disconnect grace period).
+ * Pass 2 enforces MAX_SESSION_RUNTIME_STATES, oldest first (Map insertion order), but
+ * never touches a session that is still in use, so live streams are not torn down.
+ */
+function evictStaleSessionRuntimeState() {
+  // Iterate over snapshots of the keys so each pass is bounded even if
+  // cleanupSessionRuntimeState leaves an entry in place.
+  for (const sessionId of Array.from(sessionStreamStates.keys())) {
+    if (!isSessionRuntimeStateInUse(sessionId) && !sessionCleanupTimers.has(sessionId)) {
+      cleanupSessionRuntimeState(sessionId)
+    }
+  }
+
+  for (const sessionId of Array.from(sessionStreamStates.keys())) {
+    if (sessionStreamStates.size <= MAX_SESSION_RUNTIME_STATES) break
+    if (isSessionRuntimeStateInUse(sessionId)) continue
+    cleanupSessionRuntimeState(sessionId)
+  }
+}
+
+const staleEvictTimer = setInterval(evictStaleSessionRuntimeState, STALE_SESSION_EVICT_INTERVAL_MS)
+// Allow the process to exit without waiting for the timer
+if (staleEvictTimer.unref) staleEvictTimer.unref()
+
 async function sendRepositoryStartupStatus(
   ws: ServerWebSocket,
   sessionId: string,