Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ bytes = { version = "1.10.1", features = ["serde"] }
postcard = { version = "1.1.3", features = ["use-std"] }
futures-buffered = "0.2.12"
tokio-util = "0.7.16"
n0-watcher = "0.3.0"

[dev-dependencies]
tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread"] }
Expand Down
88 changes: 73 additions & 15 deletions src/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,23 @@ use std::{

use anyhow::{Context, Result};
use bytes::Bytes;
use iroh::{Endpoint, NodeAddr, NodeId, SecretKey};
use iroh::{Endpoint, NodeAddr, NodeId, SecretKey, Watcher};
use iroh_metrics::encoding::Encoder;
use iroh_n0des::{
Registry,
simulation::proto::{ActiveTrace, NodeInfo, TraceClient, TraceInfo},
};
use n0_future::IterExt;
use n0_watcher::Watchable;
use proto::{GetTraceResponse, NodeInfoWithAddr, Scope};
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::Semaphore;
use tokio::{sync::Semaphore, time::MissedTickBehavior};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error_span, info, warn};
use uuid::Uuid;

use crate::simulation::proto::CheckpointId;

pub mod events;
pub mod proto;
pub mod trace;
Expand All @@ -36,6 +39,8 @@ pub const ENV_TRACE_SERVER: &str = "N0DES_TRACE_SERVER";
/// Environment variable name for the simulation session ID.
pub const ENV_TRACE_SESSION_ID: &str = "N0DES_SESSION_ID";

const METRICS_INTERVAL: Duration = Duration::from_secs(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is way to aggressive, pls set it to 10 or 15s which is the default scrape interval used elsewhere.


type BoxedSetupFn<D> = Box<dyn 'static + Send + Sync + FnOnce() -> BoxFuture<'static, Result<D>>>;

type BoxedSpawnFn<D> = Arc<
Expand All @@ -50,6 +55,7 @@ type BoxedRoundFn<D> = Arc<
+ Sync
+ for<'a> Fn(&'a mut BoxNode, &'a RoundContext<'a, D>) -> BoxFuture<'a, Result<bool>>,
>;
type RoundLabelFn<D> = Arc<dyn 'static + Send + Sync + for<'a> Fn(&'a D, u32) -> Option<String>>;

type BoxedCheckFn<D> = Arc<dyn Fn(&BoxNode, &RoundContext<'_, D>) -> Result<()>>;

Expand Down Expand Up @@ -221,6 +227,10 @@ pub trait Spawn<D: SetupData = ()>: Node + 'static {
where
Self: Sized;

fn round_label(_setup_data: &D, _round: u32) -> Option<String> {
None
}

/// Spawns a new instance as a dynamically-typed node.
///
/// This calls `spawn` and boxes the result.
Expand Down Expand Up @@ -308,6 +318,10 @@ pub trait DynNode: Send + Any + 'static {
None
}

fn round_label(&self, _round: u32) -> Option<String> {
None
}

/// Returns a reference to this node as `Any` for downcasting.
fn as_any(&self) -> &dyn Any;

Expand Down Expand Up @@ -353,13 +367,15 @@ pub struct NodeBuilder<N, D> {
phantom: PhantomData<N>,
spawn_fn: BoxedSpawnFn<D>,
round_fn: BoxedRoundFn<D>,
round_label_fn: RoundLabelFn<D>,
check_fn: Option<BoxedCheckFn<D>>,
}

#[derive(Clone)]
struct ErasedNodeBuilder<D> {
spawn_fn: BoxedSpawnFn<D>,
round_fn: BoxedRoundFn<D>,
round_label_fn: RoundLabelFn<D>,
check_fn: Option<BoxedCheckFn<D>>,
}

Expand All @@ -381,6 +397,7 @@ impl<N: Spawn<D>, D: SetupData> NodeBuilder<N, D> {
round_fn: impl for<'a> AsyncCallback<'a, N, RoundContext<'a, D>, Result<bool>>,
) -> Self {
let spawn_fn: BoxedSpawnFn<D> = Arc::new(N::spawn_dyn);
let round_label_fn: RoundLabelFn<D> = Arc::new(N::round_label);
let round_fn: BoxedRoundFn<D> = Arc::new(move |node, context| {
let node = node
.as_any_mut()
Expand All @@ -393,6 +410,7 @@ impl<N: Spawn<D>, D: SetupData> NodeBuilder<N, D> {
spawn_fn,
round_fn,
check_fn: None,
round_label_fn,
}
}

Expand Down Expand Up @@ -424,6 +442,7 @@ impl<N: Spawn<D>, D: SetupData> NodeBuilder<N, D> {
spawn_fn: self.spawn_fn,
round_fn: self.round_fn,
check_fn: self.check_fn,
round_label_fn: self.round_label_fn,
}
}
}
Expand All @@ -434,9 +453,11 @@ struct SimNode<D> {
idx: u32,
round_fn: BoxedRoundFn<D>,
check_fn: Option<BoxedCheckFn<D>>,
round_label_fn: RoundLabelFn<D>,
round: u32,
info: NodeInfo,
metrics_encoder: Encoder,
metrics: Arc<RwLock<Registry>>,
checkpoint_watcher: n0_watcher::Watchable<CheckpointId>,
all_nodes: Vec<NodeInfoWithAddr>,
}

Expand Down Expand Up @@ -477,7 +498,9 @@ impl<D: SetupData> SimNode<D> {
round: 0,
round_fn: builder.round_fn,
check_fn: builder.check_fn,
metrics_encoder: Encoder::new(Arc::new(RwLock::new(registry))),
round_label_fn: builder.round_label_fn,
checkpoint_watcher: Watchable::new(0),
metrics: Arc::new(RwLock::new(registry)),
all_nodes: Default::default(),
};

Expand All @@ -496,6 +519,38 @@ impl<D: SetupData> SimNode<D> {

info!(idx = self.idx, "start");

// Spawn a task to periodically put metrics.
if let Some(node_id) = self.node_id() {
let client = client.clone();
let mut watcher = self.checkpoint_watcher.watch();
let mut metrics_encoder = Encoder::new(self.metrics.clone());
tokio::task::spawn(
async move {
let mut interval = tokio::time::interval(METRICS_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
let checkpoint = tokio::select! {
_ = interval.tick() => None,
checkpoint = watcher.updated() => {
match checkpoint {
Err(_) => break,
Ok(checkpoint) => Some(checkpoint)
}
}
};
if let Err(err) = client
.put_metrics(node_id, checkpoint, metrics_encoder.export())
.await
{
warn!(?err, "failed to put metrics, stop metrics task");
break;
}
}
}
.instrument(error_span!("metrics")),
);
}

let info = NodeInfoWithAddr {
addr: self.my_addr().await,
info: self.info.clone(),
Expand Down Expand Up @@ -535,32 +590,35 @@ impl<D: SetupData> SimNode<D> {

#[tracing::instrument(name="round", skip_all, fields(round=self.round))]
async fn run_round(&mut self, client: &ActiveTrace, setup_data: &D) -> Result<bool> {
info!("start round");
let context = RoundContext {
round: self.round,
node_index: self.idx,
setup_data,
all_nodes: &self.all_nodes,
};

let label = (self.round_label_fn)(setup_data, self.round)
.unwrap_or_else(|| format!("Round {}", self.round));

info!(%label, "start round");

let result = (self.round_fn)(&mut self.node, &context)
.await
.context("round function failed");

info!(%label, "end round");

let checkpoint = (context.round + 1) as u64;
let label = format!("Round {} end", context.round);
self.checkpoint_watcher.set(checkpoint).ok();
client
.put_checkpoint(checkpoint, Some(label), to_str_err(&result))
.await?;

// TODO(Frando): Couple metrics to node idx, not node id.
if let Some(node_id) = self.node_id() {
client
.put_metrics(node_id, Some(checkpoint), self.metrics_encoder.export())
.await?;
}
.await
.context("put checkpoint")?;

client.wait_checkpoint(checkpoint).await?;
client
.wait_checkpoint(checkpoint)
.await
.context("wait checkpoint")?;

match result {
Ok(out) => {
Expand Down
Loading