Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion desktop/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ const SERVER_STARTUP_LOG_LIMIT: usize = 80;
const SERVER_BIND_HOST: &str = "0.0.0.0";
const SERVER_CONTROL_HOST: &str = "127.0.0.1";
const MAIN_WINDOW_LABEL: &str = "main";

// Sidecar health check constants
// Interval: 60s between checks. Timeout: 5s per TCP probe.
// Max failures: 5 consecutive → triggers restart (5 × 60s = 5 min of unresponsiveness).
const HEALTH_CHECK_INTERVAL_SECS: u64 = 60;
const HEALTH_CHECK_TIMEOUT_SECS: u64 = 5;
const HEALTH_CHECK_MAX_FAILURES: u32 = 5;
const TRAY_SHOW_ID: &str = "tray_show";
const TRAY_QUIT_ID: &str = "tray_quit";
const WINDOW_STATE_FILE: &str = "window-state.json";
Expand Down Expand Up @@ -1497,6 +1504,132 @@ fn wait_for_server(url_host: &str, port: u16) -> Result<(), String> {
))
}

/// Check if the server is responsive via TCP connect (same mechanism as wait_for_server).
fn check_server_health(url_host: &str, port: u16) -> bool {
let addr: SocketAddr = match format!("{url_host}:{port}").parse() {
Ok(a) => a,
Err(_) => return false,
};
TcpStream::connect_timeout(&addr, Duration::from_secs(HEALTH_CHECK_TIMEOUT_SECS)).is_ok()
}

/// Kill a process and all its child processes (entire process tree).
///
/// On Windows: uses `taskkill /T /F /PID <pid>` — /T kills the tree, /F forces.
/// On Unix: sends SIGKILL to the process (children will get SIGHUP when parent dies).
fn kill_process_tree(child: &CommandChild) {
let _ = child.kill();

#[cfg(target_os = "windows")]
{
// child.pid() returns the PID. taskkill /T kills the entire process tree.
let pid = child.pid();
if pid > 0 {
let _ = StdCommand::new("taskkill")
.args(["/T", "/F", "/PID", &pid.to_string()])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
}
}

#[cfg(unix)]
{
// On Unix, sending SIGTERM/SIGKILL to the direct child is usually enough
// since Bun.spawn creates a process group. The kill() from tauri-plugin-shell
// already sent SIGKILL. If children linger, the OS will reap them on parent exit.
}
}

/// Spawn a background thread that periodically health-checks the server sidecar.
/// After `HEALTH_CHECK_MAX_FAILURES` consecutive failures, kill and respawn the sidecar.
///
/// **Limitations**: Restarting the sidecar will break active WebSocket connections
/// and orphan any running CLI subprocesses. The frontend will need to reconnect.
/// This is a last-resort recovery mechanism — the high failure threshold (5 min of
/// continuous unresponsiveness) ensures it only triggers when the sidecar is truly stuck.
fn spawn_sidecar_health_check(app: &AppHandle) {
let app_handle = app.clone();
thread::spawn(move || {
let mut consecutive_failures: u32 = 0;

loop {
thread::sleep(Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS));

// Read the current server URL from state
let url = {
let Some(state) = app_handle.try_state::<ServerState>() else {
continue;
};
let Ok(guard) = state.0.lock() else {
continue;
};
match &guard.runtime {
Some(rt) => rt.url.clone(),
None => continue, // No runtime — server might be starting up
}
};

// Parse host:port from the URL (format: "http://127.0.0.1:PORT")
let parsed = url
.strip_prefix("http://")
.or_else(|| url.strip_prefix("https://"))
.unwrap_or(&url);
let (host, port_str) = match parsed.rsplit_once(':') {
Some(pair) => pair,
None => continue,
};
let port: u16 = match port_str.parse() {
Ok(p) => p,
Err(_) => continue,
};

if check_server_health(host, port) {
consecutive_failures = 0;
continue;
}

consecutive_failures += 1;
eprintln!(
"[desktop] sidecar health check failed ({consecutive_failures}/{HEALTH_CHECK_MAX_FAILURES})"
);

if consecutive_failures < HEALTH_CHECK_MAX_FAILURES {
continue;
}

// Threshold reached — restart the sidecar
eprintln!("[desktop] sidecar unresponsive after {HEALTH_CHECK_MAX_FAILURES} consecutive failures, restarting...");
consecutive_failures = 0;

let Some(state) = app_handle.try_state::<ServerState>() else {
continue;
};
let Ok(mut guard) = state.0.lock() else {
continue;
};

// Kill the old sidecar and all its child CLI processes
if let Some(old_runtime) = guard.runtime.take() {
kill_process_tree(&old_runtime.child);
}

// Respawn
match start_server_sidecar(&app_handle) {
Ok(new_runtime) => {
guard.runtime = Some(new_runtime);
guard.startup_error = None;
eprintln!("[desktop] sidecar restarted successfully");
}
Err(err) => {
eprintln!("[desktop] failed to restart sidecar: {err}");
guard.startup_error = Some(err);
}
}
}
});
}

fn push_server_startup_log(logs: &Arc<Mutex<VecDeque<String>>>, line: String) {
let line = line.trim_end().to_string();
if line.is_empty() {
Expand Down Expand Up @@ -1670,7 +1803,7 @@ fn stop_server_sidecar(app: &AppHandle) {
};

if let Some(runtime) = guard.runtime.take() {
let _ = runtime.child.kill();
kill_process_tree(&runtime.child);
}
}

Expand Down Expand Up @@ -2268,6 +2401,9 @@ pub fn run() {
}
drop(guard);

// Start background health monitoring for the sidecar
spawn_sidecar_health_check(&app.handle());

// server 起来之后再起 adapter sidecar —— start_adapters_sidecar
// 内部会从 ServerState 读 server URL 注入 ADAPTER_SERVER_URL env,
// 让 adapter 连上动态端口。
Expand Down
19 changes: 19 additions & 0 deletions src/server/services/conversationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ export class ConversationStartupError extends Error {
export class ConversationService {
private sessions = new Map<string, SessionProcess>()
private deletedSessions = new Set<string>()
private deletedSessionTimestamps = new Map<string, number>()
private static readonly DELETED_SESSION_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours
private providerService = new ProviderService()

private buildSessionCliArgs(
Expand Down Expand Up @@ -735,7 +737,9 @@ export class ConversationService {

markSessionDeleted(sessionId: string): void {
this.deletedSessions.add(sessionId)
this.deletedSessionTimestamps.set(sessionId, Date.now())
this.stopSession(sessionId)
this.pruneOldDeletedSessions()
}

markSessionsDeleted(sessionIds: string[]): void {
Expand All @@ -746,6 +750,7 @@ export class ConversationService {

unmarkSessionDeleted(sessionId: string): void {
this.deletedSessions.delete(sessionId)
this.deletedSessionTimestamps.delete(sessionId)
}

unmarkSessionsDeleted(sessionIds: string[]): void {
Expand All @@ -754,10 +759,24 @@ export class ConversationService {
}
}

private pruneOldDeletedSessions(): void {
const now = Date.now()
for (const [id, ts] of this.deletedSessionTimestamps) {
if (now - ts > ConversationService.DELETED_SESSION_TTL_MS) {
this.deletedSessions.delete(id)
this.deletedSessionTimestamps.delete(id)
}
}
}

getActiveSessions(): string[] {
return Array.from(this.sessions.keys())
}

isSessionActive(sessionId: string): boolean {
return this.sessions.has(sessionId)
}

private async readProcessOutputStream(
sessionId: string,
stream: ReadableStream | null | undefined,
Expand Down
Loading
Loading