diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 0c83880dc..b75554834 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -221,6 +221,13 @@ const SERVER_STARTUP_LOG_LIMIT: usize = 80; const SERVER_BIND_HOST: &str = "0.0.0.0"; const SERVER_CONTROL_HOST: &str = "127.0.0.1"; const MAIN_WINDOW_LABEL: &str = "main"; + +// Sidecar health check constants +// Interval: 60s between checks. Timeout: 5s per TCP probe. +// Max failures: 5 consecutive → triggers restart (5 × 60s = 5 min of unresponsiveness). +const HEALTH_CHECK_INTERVAL_SECS: u64 = 60; +const HEALTH_CHECK_TIMEOUT_SECS: u64 = 5; +const HEALTH_CHECK_MAX_FAILURES: u32 = 5; const TRAY_SHOW_ID: &str = "tray_show"; const TRAY_QUIT_ID: &str = "tray_quit"; const WINDOW_STATE_FILE: &str = "window-state.json"; @@ -1497,6 +1504,132 @@ fn wait_for_server(url_host: &str, port: u16) -> Result<(), String> { )) } +/// Check if the server is responsive via TCP connect (same mechanism as wait_for_server). +fn check_server_health(url_host: &str, port: u16) -> bool { + let addr: SocketAddr = match format!("{url_host}:{port}").parse() { + Ok(a) => a, + Err(_) => return false, + }; + TcpStream::connect_timeout(&addr, Duration::from_secs(HEALTH_CHECK_TIMEOUT_SECS)).is_ok() +} + +/// Kill a process and all its child processes (entire process tree). +/// +/// On Windows: uses `taskkill /T /F /PID ` — /T kills the tree, /F forces. +/// On Unix: sends SIGKILL to the process (children will get SIGHUP when parent dies). +fn kill_process_tree(child: &CommandChild) { + let _ = child.kill(); + + #[cfg(target_os = "windows")] + { + // child.pid() returns the PID. taskkill /T kills the entire process tree. + let pid = child.pid(); + if pid > 0 { + let _ = StdCommand::new("taskkill") + .args(["/T", "/F", "/PID", &pid.to_string()]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); + } + } + + #[cfg(unix)] + { + // On Unix, sending SIGTERM/SIGKILL to the direct child is usually enough + // since Bun.spawn creates a process group. The kill() from tauri-plugin-shell + // already sent SIGKILL. If children linger, the OS will reap them on parent exit. + } +} + +/// Spawn a background thread that periodically health-checks the server sidecar. +/// After `HEALTH_CHECK_MAX_FAILURES` consecutive failures, kill and respawn the sidecar. +/// +/// **Limitations**: Restarting the sidecar will break active WebSocket connections +/// and orphan any running CLI subprocesses. The frontend will need to reconnect. +/// This is a last-resort recovery mechanism — the high failure threshold (5 min of +/// continuous unresponsiveness) ensures it only triggers when the sidecar is truly stuck. +fn spawn_sidecar_health_check(app: &AppHandle) { + let app_handle = app.clone(); + thread::spawn(move || { + let mut consecutive_failures: u32 = 0; + + loop { + thread::sleep(Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS)); + + // Read the current server URL from state + let url = { + let Some(state) = app_handle.try_state::() else { + continue; + }; + let Ok(guard) = state.0.lock() else { + continue; + }; + match &guard.runtime { + Some(rt) => rt.url.clone(), + None => continue, // No runtime — server might be starting up + } + }; + + // Parse host:port from the URL (format: "http://127.0.0.1:PORT") + let parsed = url + .strip_prefix("http://") + .or_else(|| url.strip_prefix("https://")) + .unwrap_or(&url); + let (host, port_str) = match parsed.rsplit_once(':') { + Some(pair) => pair, + None => continue, + }; + let port: u16 = match port_str.parse() { + Ok(p) => p, + Err(_) => continue, + }; + + if check_server_health(host, port) { + consecutive_failures = 0; + continue; + } + + consecutive_failures += 1; + eprintln!( + "[desktop] sidecar health check failed ({consecutive_failures}/{HEALTH_CHECK_MAX_FAILURES})" + ); + + if consecutive_failures < HEALTH_CHECK_MAX_FAILURES { + continue; + } + + // Threshold reached — restart the sidecar + eprintln!("[desktop] sidecar unresponsive after {HEALTH_CHECK_MAX_FAILURES} consecutive failures, restarting..."); + consecutive_failures = 0; + + let Some(state) = app_handle.try_state::() else { + continue; + }; + let Ok(mut guard) = state.0.lock() else { + continue; + }; + + // Kill the old sidecar and all its child CLI processes + if let Some(old_runtime) = guard.runtime.take() { + kill_process_tree(&old_runtime.child); + } + + // Respawn + match start_server_sidecar(&app_handle) { + Ok(new_runtime) => { + guard.runtime = Some(new_runtime); + guard.startup_error = None; + eprintln!("[desktop] sidecar restarted successfully"); + } + Err(err) => { + eprintln!("[desktop] failed to restart sidecar: {err}"); + guard.startup_error = Some(err); + } + } + } + }); +} + fn push_server_startup_log(logs: &Arc>>, line: String) { let line = line.trim_end().to_string(); if line.is_empty() { @@ -1670,7 +1803,7 @@ fn stop_server_sidecar(app: &AppHandle) { }; if let Some(runtime) = guard.runtime.take() { - let _ = runtime.child.kill(); + kill_process_tree(&runtime.child); } } @@ -2268,6 +2401,9 @@ pub fn run() { } drop(guard); + // Start background health monitoring for the sidecar + spawn_sidecar_health_check(&app.handle()); + // server 起来之后再起 adapter sidecar —— start_adapters_sidecar // 内部会从 ServerState 读 server URL 注入 ADAPTER_SERVER_URL env, // 让 adapter 连上动态端口。 diff --git a/src/server/services/conversationService.ts b/src/server/services/conversationService.ts index 7ce4b5012..dfe03ed9d 100644 --- a/src/server/services/conversationService.ts +++ b/src/server/services/conversationService.ts @@ -111,6 +111,8 @@ export class ConversationStartupError extends Error { export class ConversationService { private sessions = new Map() private deletedSessions = new Set() + private deletedSessionTimestamps = new Map() + private static readonly DELETED_SESSION_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours private providerService = new ProviderService() private buildSessionCliArgs( @@ -735,7 +737,9 @@ export class ConversationService { markSessionDeleted(sessionId: string): void { this.deletedSessions.add(sessionId) + this.deletedSessionTimestamps.set(sessionId, Date.now()) this.stopSession(sessionId) + this.pruneOldDeletedSessions() } markSessionsDeleted(sessionIds: string[]): void { @@ -746,6 +750,7 @@ export class ConversationService { unmarkSessionDeleted(sessionId: string): void { this.deletedSessions.delete(sessionId) + this.deletedSessionTimestamps.delete(sessionId) } unmarkSessionsDeleted(sessionIds: string[]): void { @@ -754,10 +759,24 @@ export class ConversationService { } } + private pruneOldDeletedSessions(): void { + const now = Date.now() + for (const [id, ts] of this.deletedSessionTimestamps) { + if (now - ts > ConversationService.DELETED_SESSION_TTL_MS) { + this.deletedSessions.delete(id) + this.deletedSessionTimestamps.delete(id) + } + } + } + getActiveSessions(): string[] { return Array.from(this.sessions.keys()) } + isSessionActive(sessionId: string): boolean { + return this.sessions.has(sessionId) + } + private async readProcessOutputStream( sessionId: string, stream: ReadableStream | null | undefined, diff --git a/src/server/services/sessionService.ts b/src/server/services/sessionService.ts index 50dfdd97e..e956356c4 100644 --- a/src/server/services/sessionService.ts +++ b/src/server/services/sessionService.ts @@ -219,6 +219,36 @@ type PersistedWorktreeSession = { type ContentBlock = Record +// In-memory metadata cache to avoid re-reading JSONL files on every listSessions call. +// Each entry is keyed by the JSONL file path and stores the lightweight metadata +// needed for the session list, plus the file mtime used for staleness checks. +type CachedSessionMeta = { + id: string + title: string + createdAt: string + modifiedAt: string + messageCount: number + projectPath: string + projectRoot: string | null + workDir: string | null + workDirExists: boolean +} +const sessionMetaCache = new Map() +const SESSION_META_CACHE_TTL_MS = 30_000 +let sessionMetaCacheTimestamp = 0 + +function isSessionMetaCacheValid(): boolean { + return ( + sessionMetaCache.size > 0 && + Date.now() - sessionMetaCacheTimestamp < SESSION_META_CACHE_TTL_MS + ) +} + +function invalidateSessionMetaCache(): void { + sessionMetaCache.clear() + sessionMetaCacheTimestamp = 0 +} + const USER_INTERRUPTION_TEXTS = new Set([ '[Request interrupted by user]', '[Request interrupted by user for tool use]', @@ -1278,6 +1308,8 @@ export class SessionService { /** * List all sessions, optionally filtered by project path. + * Uses an in-memory metadata cache (30s TTL) to avoid re-reading JSONL files + * on every call. Cache is invalidated when sessions are created/deleted/renamed. */ async listSessions(options?: { project?: string @@ -1285,12 +1317,16 @@ export class SessionService { offset?: number }): Promise<{ sessions: SessionListItem[]; total: number }> { const sessionFiles = await this.discoverSessionFiles(options?.project) + + // Check if we can reuse the cache. The cache stores per-file metadata keyed + // by absolute file path. We rebuild it when the cache is empty, expired, or + // when a file's mtime has changed since the last cache fill. + const useCache = isSessionMetaCacheValid() + const filesWithStats = (await Promise.all(sessionFiles.map(async (sessionFile) => { try { - return { - ...sessionFile, - stat: await fs.stat(sessionFile.filePath), - } + const stat = await fs.stat(sessionFile.filePath) + return { ...sessionFile, stat } } catch { return null } @@ -1303,22 +1339,32 @@ export class SessionService { const limit = options?.limit ?? 50 const paginatedFiles = filesWithStats.slice(offset, offset + limit) - // Build session list items with metadata from file stats & first entries - const items = (await Promise.all(paginatedFiles.map(async ({ filePath, projectDir, sessionId, stat }) => { + // Build session list items, using cache where possible + const items: SessionListItem[] = [] + + for (const { filePath, projectDir, sessionId, stat } of paginatedFiles) { try { + const cached = sessionMetaCache.get(filePath) + const mtimeMs = stat.mtimeMs + + if (useCache && cached && cached.mtimeMs === mtimeMs) { + // Cache hit and file hasn't changed — use cached metadata + items.push(cached.meta) + continue + } + + // Cache miss — parse JSONL to extract metadata const entries = await this.readJsonlFile(filePath) const workDir = this.resolveWorkDirFromEntries(entries, projectDir) const projectRoot = await this.resolveProjectRootFromEntries(entries, workDir, projectDir) const workDirExists = await this.pathExists(workDir) - // Count transcript messages only (user + assistant) const messageCount = entries.filter( (e) => (e.type === 'user' || e.type === 'assistant') && e.message?.role ).length const title = this.extractTitle(entries) - // Find the earliest timestamp from entries, fallback to file birthtime let createdAt = stat.birthtime.toISOString() for (const e of entries) { if (e.timestamp) { @@ -1327,7 +1373,7 @@ export class SessionService { } } - return { + const meta: CachedSessionMeta = { id: sessionId, title, createdAt, @@ -1338,11 +1384,16 @@ export class SessionService { workDir, workDirExists, } + + // Update cache entry + sessionMetaCache.set(filePath, { meta, mtimeMs }) + sessionMetaCacheTimestamp = Date.now() + + items.push(meta) } catch { // Skip unreadable files - return null } - }))).filter((item): item is SessionListItem => item !== null) + } return { sessions: items, total } } @@ -1468,6 +1519,7 @@ export class SessionService { } await fs.writeFile(filePath, JSON.stringify(initialEntry) + '\n' + JSON.stringify(metaEntry) + '\n', 'utf-8') + invalidateSessionMetaCache() return { sessionId, workDir: absWorkDir } } @@ -1482,6 +1534,7 @@ export class SessionService { } await fs.unlink(found.filePath) + invalidateSessionMetaCache() } async deleteSessions(sessionIds: string[]): Promise { @@ -1514,6 +1567,7 @@ export class SessionService { } } + invalidateSessionMetaCache() return { successes, failures } } @@ -1537,6 +1591,7 @@ export class SessionService { } await this.appendJsonlEntry(found.filePath, entry) + invalidateSessionMetaCache() } /** @@ -1551,6 +1606,7 @@ export class SessionService { aiTitle: title, timestamp: new Date().toISOString(), }) + invalidateSessionMetaCache() } async getCustomTitle(sessionId: string): Promise { @@ -1627,6 +1683,7 @@ export class SessionService { const found = await this.findSessionFile(sessionId) if (!found) return await fs.unlink(found.filePath) + invalidateSessionMetaCache() } async clearSessionTranscript(sessionId: string, fallbackWorkDir?: string): Promise { @@ -1674,6 +1731,7 @@ export class SessionService { `${JSON.stringify(initialEntry)}\n${JSON.stringify(metaEntry)}\n`, 'utf-8', ) + invalidateSessionMetaCache() } async appendSessionMetadata( @@ -1718,6 +1776,7 @@ export class SessionService { timestamp: new Date().toISOString(), }) } + invalidateSessionMetaCache() } async deletePlaceholderSessionFiles( @@ -1749,6 +1808,7 @@ export class SessionService { await fs.rm(filePath, { force: true }) removed += 1 } + if (removed > 0) invalidateSessionMetaCache() return removed } @@ -1802,6 +1862,7 @@ export class SessionService { ? filteredEntries.map((entry) => JSON.stringify(entry)).join('\n') + '\n' : '' await fs.writeFile(found.filePath, content, 'utf-8') + invalidateSessionMetaCache() return { removedCount: removedMessageIds.length, diff --git a/src/server/ws/handler.ts b/src/server/ws/handler.ts index f77d36df8..f89dd63c6 100644 --- a/src/server/ws/handler.ts +++ b/src/server/ws/handler.ts @@ -80,6 +80,37 @@ const prewarmedSessions = new Set() const prewarmIdleTimers = new Map>() const DEFAULT_PREWARM_IDLE_TIMEOUT_MS = 5 * 60_000 +// --- Memory leak prevention: cap module-level Maps and periodically evict stale entries --- +const MAX_SESSION_RUNTIME_STATES = 500 +const STALE_SESSION_EVICT_INTERVAL_MS = 60_000 + +function evictStaleSessionRuntimeState() { + for (const sessionId of sessionStreamStates.keys()) { + // Only evict if ALL of these are true: + // 1. No active WebSocket connections + // 2. No active CLI subprocess + // 3. No pending cleanup timer (still in grace period after disconnect) + if ( + !activeSessions.has(sessionId) && + !conversationService.isSessionActive(sessionId) && + !sessionCleanupTimers.has(sessionId) + ) { + cleanupSessionRuntimeState(sessionId) + } + } + + // Hard cap: if still over limit, evict oldest entries (Map insertion order) + while (sessionStreamStates.size > MAX_SESSION_RUNTIME_STATES) { + const oldest = sessionStreamStates.keys().next().value + if (oldest === undefined) break + cleanupSessionRuntimeState(oldest) + } +} + +const staleEvictTimer = setInterval(evictStaleSessionRuntimeState, STALE_SESSION_EVICT_INTERVAL_MS) +// Allow the process to exit without waiting for the timer +if (staleEvictTimer.unref) staleEvictTimer.unref() + async function sendRepositoryStartupStatus( ws: ServerWebSocket, sessionId: string,