diff --git a/Cargo.lock b/Cargo.lock index aa36f4a..5ff049a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1647,6 +1647,7 @@ dependencies = [ "irpc", "irpc-iroh", "n0-future 0.2.0", + "n0-watcher", "postcard", "rand 0.9.2", "rcan", diff --git a/Cargo.toml b/Cargo.toml index 9e394ba..2a037de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ bytes = { version = "1.10.1", features = ["serde"] } postcard = { version = "1.1.3", features = ["use-std"] } futures-buffered = "0.2.12" tokio-util = "0.7.16" +n0-watcher = "0.3.0" [dev-dependencies] tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread"] } diff --git a/src/simulation.rs b/src/simulation.rs index 5498e78..bfd4447 100644 --- a/src/simulation.rs +++ b/src/simulation.rs @@ -9,20 +9,23 @@ use std::{ use anyhow::{Context, Result}; use bytes::Bytes; -use iroh::{Endpoint, NodeAddr, NodeId, SecretKey}; +use iroh::{Endpoint, NodeAddr, NodeId, SecretKey, Watcher}; use iroh_metrics::encoding::Encoder; use iroh_n0des::{ Registry, simulation::proto::{ActiveTrace, NodeInfo, TraceClient, TraceInfo}, }; use n0_future::IterExt; +use n0_watcher::Watchable; use proto::{GetTraceResponse, NodeInfoWithAddr, Scope}; use serde::{Serialize, de::DeserializeOwned}; -use tokio::sync::Semaphore; +use tokio::{sync::Semaphore, time::MissedTickBehavior}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error_span, info, warn}; use uuid::Uuid; +use crate::simulation::proto::CheckpointId; + pub mod events; pub mod proto; pub mod trace; @@ -36,6 +39,8 @@ pub const ENV_TRACE_SERVER: &str = "N0DES_TRACE_SERVER"; /// Environment variable name for the simulation session ID. pub const ENV_TRACE_SESSION_ID: &str = "N0DES_SESSION_ID"; +const METRICS_INTERVAL: Duration = Duration::from_secs(1); + type BoxedSetupFn = Box BoxFuture<'static, Result>>; type BoxedSpawnFn = Arc< @@ -50,6 +55,7 @@ type BoxedRoundFn = Arc< + Sync + for<'a> Fn(&'a mut BoxNode, &'a RoundContext<'a, D>) -> BoxFuture<'a, Result>, >; +type RoundLabelFn = Arc Fn(&'a D, u32) -> Option>; type BoxedCheckFn = Arc) -> Result<()>>; @@ -221,6 +227,10 @@ pub trait Spawn: Node + 'static { where Self: Sized; + fn round_label(_setup_data: &D, _round: u32) -> Option { + None + } + /// Spawns a new instance as a dynamically-typed node. /// /// This calls `spawn` and boxes the result. @@ -308,6 +318,10 @@ pub trait DynNode: Send + Any + 'static { None } + fn round_label(&self, _round: u32) -> Option { + None + } + /// Returns a reference to this node as `Any` for downcasting. fn as_any(&self) -> &dyn Any; @@ -353,6 +367,7 @@ pub struct NodeBuilder { phantom: PhantomData, spawn_fn: BoxedSpawnFn, round_fn: BoxedRoundFn, + round_label_fn: RoundLabelFn, check_fn: Option>, } @@ -360,6 +375,7 @@ pub struct NodeBuilder { struct ErasedNodeBuilder { spawn_fn: BoxedSpawnFn, round_fn: BoxedRoundFn, + round_label_fn: RoundLabelFn, check_fn: Option>, } @@ -381,6 +397,7 @@ impl, D: SetupData> NodeBuilder { round_fn: impl for<'a> AsyncCallback<'a, N, RoundContext<'a, D>, Result>, ) -> Self { let spawn_fn: BoxedSpawnFn = Arc::new(N::spawn_dyn); + let round_label_fn: RoundLabelFn = Arc::new(N::round_label); let round_fn: BoxedRoundFn = Arc::new(move |node, context| { let node = node .as_any_mut() @@ -393,6 +410,7 @@ impl, D: SetupData> NodeBuilder { spawn_fn, round_fn, check_fn: None, + round_label_fn, } } @@ -424,6 +442,7 @@ impl, D: SetupData> NodeBuilder { spawn_fn: self.spawn_fn, round_fn: self.round_fn, check_fn: self.check_fn, + round_label_fn: self.round_label_fn, } } } @@ -434,9 +453,11 @@ struct SimNode { idx: u32, round_fn: BoxedRoundFn, check_fn: Option>, + round_label_fn: RoundLabelFn, round: u32, info: NodeInfo, - metrics_encoder: Encoder, + metrics: Arc>, + checkpoint_watcher: n0_watcher::Watchable, all_nodes: Vec, } @@ -477,7 +498,9 @@ impl SimNode { round: 0, round_fn: builder.round_fn, check_fn: builder.check_fn, - metrics_encoder: Encoder::new(Arc::new(RwLock::new(registry))), + round_label_fn: builder.round_label_fn, + checkpoint_watcher: Watchable::new(0), + metrics: Arc::new(RwLock::new(registry)), all_nodes: Default::default(), }; @@ -496,6 +519,38 @@ impl SimNode { info!(idx = self.idx, "start"); + // Spawn a task to periodically put metrics. + if let Some(node_id) = self.node_id() { + let client = client.clone(); + let mut watcher = self.checkpoint_watcher.watch(); + let mut metrics_encoder = Encoder::new(self.metrics.clone()); + tokio::task::spawn( + async move { + let mut interval = tokio::time::interval(METRICS_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + let checkpoint = tokio::select! { + _ = interval.tick() => None, + checkpoint = watcher.updated() => { + match checkpoint { + Err(_) => break, + Ok(checkpoint) => Some(checkpoint) + } + } + }; + if let Err(err) = client + .put_metrics(node_id, checkpoint, metrics_encoder.export()) + .await + { + warn!(?err, "failed to put metrics, stop metrics task"); + break; + } + } + } + .instrument(error_span!("metrics")), + ); + } + let info = NodeInfoWithAddr { addr: self.my_addr().await, info: self.info.clone(), @@ -535,7 +590,6 @@ impl SimNode { #[tracing::instrument(name="round", skip_all, fields(round=self.round))] async fn run_round(&mut self, client: &ActiveTrace, setup_data: &D) -> Result { - info!("start round"); let context = RoundContext { round: self.round, node_index: self.idx, @@ -543,24 +597,28 @@ impl SimNode { all_nodes: &self.all_nodes, }; + let label = (self.round_label_fn)(setup_data, self.round) + .unwrap_or_else(|| format!("Round {}", self.round)); + + info!(%label, "start round"); + let result = (self.round_fn)(&mut self.node, &context) .await .context("round function failed"); + info!(%label, "end round"); + let checkpoint = (context.round + 1) as u64; - let label = format!("Round {} end", context.round); + self.checkpoint_watcher.set(checkpoint).ok(); client .put_checkpoint(checkpoint, Some(label), to_str_err(&result)) - .await?; - - // TODO(Frando): Couple metrics to node idx, not node id. - if let Some(node_id) = self.node_id() { - client - .put_metrics(node_id, Some(checkpoint), self.metrics_encoder.export()) - .await?; - } + .await + .context("put checkpoint")?; - client.wait_checkpoint(checkpoint).await?; + client + .wait_checkpoint(checkpoint) + .await + .context("wait checkpoint")?; match result { Ok(out) => {