From b127bd3cd670bdb3ff4b5609efc678321fde14b8 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Tue, 10 Dec 2024 00:43:26 +0000 Subject: [PATCH] wip --- src/supervisor.rs | 23 +++++++++++++++-------- src/watch_files.rs | 12 +++++++----- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/supervisor.rs b/src/supervisor.rs index 14f7588..34d6a4e 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -4,7 +4,6 @@ use crate::state_file::{DaemonStatus, StateFile, StateFileDaemon}; use crate::watch_files::WatchFiles; use crate::{env, Result}; use duct::cmd; -use miette::IntoDiagnostic; use notify::RecursiveMode; use std::collections::HashMap; use std::fs; @@ -38,6 +37,7 @@ enum Event { Signal, Interval, DaemonStart(StateFileDaemon), + DaemonStop(StateFileDaemon), DaemonFailed { name: String, error: String }, } @@ -107,6 +107,12 @@ impl Supervisor { self.close(); exit(0) } + Event::DaemonStop(daemon) => { + self.active_pids.remove(&daemon.pid); + self.state_file.daemons.remove(&daemon.name); + self.state_file.write()?; + Ok(()) + } _ => Ok(()), } } @@ -118,9 +124,10 @@ impl Supervisor { } async fn handle_ipc(&mut self, msg: IpcMessage, send: Sender) -> Result<()> { + let tx = self.event_tx.clone(); let mut event_rx = self.event_tx.subscribe(); match msg { - IpcMessage::Run(name, mut cmd) => { + IpcMessage::Run(name, cmd) => { info!("received run message: {name:?} cmd: {cmd:?}"); task::spawn({ let name = name.clone(); @@ -157,7 +164,7 @@ impl Supervisor { let log_path = env::PITCHFORK_LOGS_DIR .join(&name) .join(format!("{name}.log")); - xx::file::mkdirp(&log_path.parent().unwrap())?; + xx::file::mkdirp(log_path.parent().unwrap())?; debug!("starting daemon: {name} with args: {args:?}"); match tokio::process::Command::new("sh") .args(&args) @@ -168,6 +175,7 @@ impl Supervisor { { Ok(mut child) => { let pid = child.id().unwrap(); + self.active_pids.insert(pid, name.clone()); info!("started daemon {name} with pid {pid}"); let daemon = StateFileDaemon { name: name.clone(), @@ -176,7 +184,7 @@ impl Supervisor { }; self.state_file.daemons.insert(name.clone(), daemon.clone()); self.state_file.write()?; - self.event_tx.send(Event::DaemonStart(daemon)).unwrap(); + tx.send(Event::DaemonStart(daemon.clone())).unwrap(); tokio::spawn(async move { let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); @@ -188,10 +196,6 @@ impl Supervisor { .open(&log_path) .await .unwrap(); - tokio::spawn(async move { - let status = child.wait().await.unwrap(); - info!("daemon {name} exited with status {status}"); - }); loop { select! { Ok(Some(line)) = stdout.next_line() => { @@ -205,6 +209,9 @@ impl Supervisor { else => break, } } + let status = child.wait().await.unwrap(); + info!("daemon {name} exited with status {status}"); + tx.send(Event::DaemonStop(daemon)).unwrap(); }); } Err(err) => { diff --git a/src/watch_files.rs b/src/watch_files.rs index ebd90ff..2c70b9f 100644 --- a/src/watch_files.rs +++ b/src/watch_files.rs @@ -24,11 +24,13 @@ impl WatchFiles { if let Ok(ev) = res { let paths = ev .into_iter() - .filter(|e| match e.kind { - EventKind::Modify(_) - | EventKind::Create(_) - | EventKind::Remove(_) => true, - _ => false, + .filter(|e| { + matches!( + e.kind, + EventKind::Modify(_) + | EventKind::Create(_) + | EventKind::Remove(_) + ) }) .flat_map(|e| e.paths.clone()) .unique()