diff --git a/Cargo.toml b/Cargo.toml index 0bff0b5..b5ef9f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,9 @@ members = [ "sim", "sim_derive", "simx", -] \ No newline at end of file +] + + +[profile.release] +# Tell `rustc` to optimize for small code size. +opt-level = "s" diff --git a/sim/Cargo.toml b/sim/Cargo.toml index b5e23fe..10d9f70 100644 --- a/sim/Cargo.toml +++ b/sim/Cargo.toml @@ -48,7 +48,3 @@ wee_alloc = { version = "0.4", optional = true } [dev-dependencies] wasm-bindgen-test = "0.3" - -[profile.release] -# Tell `rustc` to optimize for small code size. -opt-level = "s" diff --git a/sim/src/models/batcher.rs b/sim/src/models/batcher.rs index a62cada..ddf1781 100644 --- a/sim/src/models/batcher.rs +++ b/sim/src/models/batcher.rs @@ -9,6 +9,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The batching process begins when the batcher receives a job. It will /// then accept additional jobs, adding them to a batch with the first job, @@ -24,7 +25,7 @@ use simx::event_rules; pub struct Batcher { ports_in: PortsIn, ports_out: PortsOut, - max_batch_time: f64, + max_batch_time: SDuration, max_batch_size: usize, #[serde(default)] store_records: bool, @@ -48,7 +49,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, jobs: Vec, records: Vec, } @@ -57,7 +58,7 @@ impl Default for State { fn default() -> Self { State { phase: Phase::Passive, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, jobs: Vec::new(), records: Vec::new(), } @@ -76,7 +77,7 @@ impl Batcher { pub fn new( job_in_port: String, job_out_port: String, - max_batch_time: f64, + max_batch_time: SDuration, max_batch_size: usize, store_records: bool, ) -> Self { @@ -90,7 +91,7 @@ impl Batcher { } } - fn add_to_batch(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn add_to_batch(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.phase = Phase::Batching; self.state.jobs.push(incoming_message.content.clone()); self.record( @@ -100,7 +101,7 @@ impl Batcher { ); } - fn start_batch(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn start_batch(&mut self, incoming_message: &ModelMessage, services: & Services) { self.state.phase = Phase::Batching; self.state.until_next_event = self.max_batch_time; self.state.jobs.push(incoming_message.content.clone()); @@ -111,9 +112,9 @@ impl Batcher { ); } - fn fill_batch(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn fill_batch(&mut self, incoming_message: &ModelMessage, services: & Services) { self.state.phase = Phase::Release; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; self.state.jobs.push(incoming_message.content.clone()); self.record( services.global_time(), @@ -122,9 +123,9 @@ impl Batcher { ); } - fn release_full_queue(&mut self, services: &mut Services) -> Vec { + fn release_full_queue(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; (0..self.state.jobs.len()) .map(|_| { self.record( @@ -140,7 +141,7 @@ impl Batcher { .collect() } - fn release_partial_queue(&mut self, services: &mut Services) -> Vec { + fn release_partial_queue(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Batching; self.state.until_next_event = self.max_batch_time; (0..self.max_batch_size) @@ -158,9 +159,9 @@ impl Batcher { .collect() } - fn release_multiple(&mut self, services: &mut Services) -> Vec { + fn release_multiple(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Release; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; (0..self.max_batch_size) .map(|_| { self.record( @@ -176,7 +177,7 @@ impl Batcher { .collect() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -187,12 +188,13 @@ impl Batcher { } } + #[cfg_attr(feature = "simx", event_rules)] impl DevsModel for Batcher { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match ( &self.state.phase, @@ -207,7 +209,7 @@ impl DevsModel for Batcher { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match ( self.state.jobs.len() <= self.max_batch_size, @@ -220,11 +222,11 @@ impl DevsModel for Batcher { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/coupled.rs b/sim/src/models/coupled.rs index ca50230..5a9e3c1 100644 --- a/sim/src/models/coupled.rs +++ b/sim/src/models/coupled.rs @@ -4,12 +4,14 @@ use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableMod use super::{Model, ModelMessage, ModelRecord}; use crate::simulator::Services; + use crate::utils::errors::SimulationError; use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::SDuration; #[derive(Clone, Deserialize, Serialize, SerializableModel)] #[serde(rename_all = "camelCase")] @@ -168,7 +170,7 @@ impl Coupled { fn distribute_events_ext( &mut self, parked_messages: &[ParkedMessage], - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { parked_messages.iter().try_for_each(|parked_message| { self.components @@ -187,7 +189,7 @@ impl Coupled { fn distribute_events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { // Find the (internal message) events_ext relevant models (parked message id == component id) let ext_transitioning_component_triggers: Vec<(usize, String, String)> = (0..self @@ -230,7 +232,7 @@ impl Coupled { // Run events_int for each model, and compile the internal and external messages // Store the internal messages in the Coupled model struct, and output the external messages let int_transitioning_component_indexes: Vec = (0..self.components.len()) - .filter(|component_index| self.components[*component_index].until_next_event() == 0.0) + .filter(|component_index| self.components[*component_index].until_next_event() == SDuration::NOW) .collect(); Ok(int_transitioning_component_indexes .iter() @@ -280,7 +282,7 @@ impl DevsModel for Coupled { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match self.park_incoming_messages(incoming_message) { None => Ok(()), @@ -290,22 +292,22 @@ impl DevsModel for Coupled { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { self.distribute_events_int(services) } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.components.iter_mut().for_each(|component| { component.time_advance(time_delta); }); } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.components .iter() - .fold(f64::INFINITY, |min, component| { - f64::min(min, component.until_next_event()) + .fold(SDuration::INFINITY, |min, component| { + SDuration::min(min, component.until_next_event()) }) } } diff --git a/sim/src/models/exclusive_gateway.rs b/sim/src/models/exclusive_gateway.rs index 436ed69..978b022 100644 --- a/sim/src/models/exclusive_gateway.rs +++ b/sim/src/models/exclusive_gateway.rs @@ -11,6 +11,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The exclusive gateway splits a process flow into a set of possible paths. /// The process will only follow one of the possible paths. Path selection is @@ -47,7 +48,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, jobs: Vec, // port, message, time records: Vec, // port, message, time } @@ -56,7 +57,7 @@ impl Default for State { fn default() -> Self { State { phase: Phase::Passive, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, jobs: Vec::new(), records: Vec::new(), } @@ -92,9 +93,9 @@ impl ExclusiveGateway { } } - fn pass_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn pass_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.phase = Phase::Pass; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; self.state.jobs.push(incoming_message.content.clone()); self.record( services.global_time(), @@ -107,9 +108,9 @@ impl ExclusiveGateway { ); } - fn send_jobs(&mut self, services: &mut Services) -> Result, SimulationError> { + fn send_jobs(&mut self, services: &Services) -> Result, SimulationError> { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; let departure_port_index = match &self.rng { Some(rng) => self.port_weights.random_variate(rng.clone())?, None => self.port_weights.random_variate(services.global_rng())?, @@ -135,11 +136,11 @@ impl ExclusiveGateway { fn passivate(&mut self) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -155,14 +156,14 @@ impl DevsModel for ExclusiveGateway { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { Ok(self.pass_job(incoming_message, services)) } fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match &self.state.phase { Phase::Passive => Ok(self.passivate()), @@ -170,11 +171,11 @@ impl DevsModel for ExclusiveGateway { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/gate.rs b/sim/src/models/gate.rs index 13eb9ca..7fb662f 100644 --- a/sim/src/models/gate.rs +++ b/sim/src/models/gate.rs @@ -9,6 +9,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The gate model passes or blocks jobs, when it is in the open or closed /// state, respectively. The gate can be opened and closed throughout the @@ -51,7 +52,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, jobs: Vec, records: Vec, } @@ -60,7 +61,7 @@ impl Default for State { fn default() -> Self { Self { phase: Phase::Open, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, jobs: Vec::new(), records: Vec::new(), } @@ -107,9 +108,9 @@ impl Gate { } } - fn activate(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn activate(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.phase = Phase::Open; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; self.record( services.global_time(), String::from("Activation"), @@ -117,9 +118,9 @@ impl Gate { ); } - fn deactivate(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn deactivate(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.phase = Phase::Closed; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; self.record( services.global_time(), String::from("Deactivation"), @@ -127,9 +128,9 @@ impl Gate { ); } - fn pass_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn pass_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.phase = Phase::Pass; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; self.state.jobs.push(incoming_message.content.clone()); self.record( services.global_time(), @@ -138,7 +139,7 @@ impl Gate { ); } - fn drop_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn drop_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.record( services.global_time(), String::from("Arrival"), @@ -146,9 +147,9 @@ impl Gate { ); } - fn send_jobs(&mut self, services: &mut Services) -> Vec { + fn send_jobs(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Open; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; (0..self.state.jobs.len()) .map(|_| { self.record( @@ -164,7 +165,7 @@ impl Gate { .collect() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -180,7 +181,7 @@ impl DevsModel for Gate { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match ( self.arrival_port(&incoming_message.port_name), @@ -196,16 +197,16 @@ impl DevsModel for Gate { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { Ok(self.send_jobs(services)) } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/generator.rs b/sim/src/models/generator.rs index 9c63f92..a43ff89 100644 --- a/sim/src/models/generator.rs +++ b/sim/src/models/generator.rs @@ -12,6 +12,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The generator produces jobs based on a configured interarrival /// distribution. A normalized thinning function is used to enable @@ -51,8 +52,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, - until_job: f64, + until_next_event: SDuration, last_job: usize, records: Vec, } @@ -61,8 +61,7 @@ impl Default for State { fn default() -> Self { Self { phase: Phase::Initializing, - until_next_event: 0.0, - until_job: 0.0, + until_next_event: SDuration::NOW, last_job: 0, records: Vec::new(), } @@ -97,7 +96,7 @@ impl Generator { fn release_job( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { let interdeparture = match &self.rng { Some(rng) => self @@ -108,8 +107,7 @@ impl Generator { .random_variate(services.global_rng())?, }; self.state.phase = Phase::Generating; - self.state.until_next_event = interdeparture; - self.state.until_job = interdeparture; + self.state.until_next_event = SDuration::new(interdeparture); self.state.last_job += 1; self.record( services.global_time(), @@ -124,7 +122,7 @@ impl Generator { fn initialize_generation( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { let interdeparture = match &self.rng { Some(rng) => self @@ -135,8 +133,7 @@ impl Generator { .random_variate(services.global_rng())?, }; self.state.phase = Phase::Generating; - self.state.until_next_event = interdeparture; - self.state.until_job = interdeparture; + self.state.until_next_event = SDuration::new(interdeparture); self.record( services.global_time(), String::from("Initialization"), @@ -145,7 +142,7 @@ impl Generator { Ok(Vec::new()) } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -161,14 +158,14 @@ impl DevsModel for Generator { fn events_ext( &mut self, _incoming_message: &ModelMessage, - _services: &mut Services, + _services: &Services, ) -> Result<(), SimulationError> { Ok(()) } fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match &self.state.phase { Phase::Generating => self.release_job(services), @@ -176,11 +173,11 @@ impl DevsModel for Generator { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/load_balancer.rs b/sim/src/models/load_balancer.rs index 6fc6c86..03e3f38 100644 --- a/sim/src/models/load_balancer.rs +++ b/sim/src/models/load_balancer.rs @@ -9,6 +9,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The load balancer routes jobs to a set of possible process paths, using a /// round robin strategy. There is no stochastic behavior in this model. @@ -38,7 +39,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, next_port_out: usize, jobs: Vec, records: Vec, @@ -48,7 +49,7 @@ impl Default for State { fn default() -> Self { Self { phase: Phase::Passive, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, next_port_out: 0, jobs: Vec::new(), records: Vec::new(), @@ -75,9 +76,9 @@ impl LoadBalancer { } } - fn pass_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn pass_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.phase = Phase::LoadBalancing; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; self.state.jobs.push(incoming_message.content.clone()); self.record( services.global_time(), @@ -88,12 +89,12 @@ impl LoadBalancer { fn passivate(&mut self) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn send_job(&mut self, services: &mut Services) -> Vec { - self.state.until_next_event = 0.0; + fn send_job(&mut self, services: &Services) -> Vec { + self.state.until_next_event = SDuration::NOW; self.state.next_port_out = (self.state.next_port_out + 1) % self.ports_out.flow_paths.len(); self.record( services.global_time(), @@ -110,7 +111,7 @@ impl LoadBalancer { }] } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -126,14 +127,14 @@ impl DevsModel for LoadBalancer { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { Ok(self.pass_job(incoming_message, services)) } fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match self.state.jobs.len() { 0 => Ok(self.passivate()), @@ -141,11 +142,11 @@ impl DevsModel for LoadBalancer { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/mod.rs b/sim/src/models/mod.rs index 8fa245b..50214ae 100644 --- a/sim/src/models/mod.rs +++ b/sim/src/models/mod.rs @@ -4,6 +4,7 @@ //! `Model` trait. use serde::{Deserialize, Serialize}; +use crate::simulator::time::STime; pub mod batcher; pub mod coupled; @@ -44,9 +45,18 @@ pub struct ModelMessage { pub content: String, } +impl ModelMessage { + pub fn new(port: &str, content: &str) -> Self { + ModelMessage { + port_name: port.to_string(), + content: content.to_string(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelRecord { - pub time: f64, + pub time: STime, pub action: String, pub subject: String, } diff --git a/sim/src/models/model.rs b/sim/src/models/model.rs index 49849e5..112736e 100644 --- a/sim/src/models/model.rs +++ b/sim/src/models/model.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableModel}; use super::{ModelMessage, ModelRecord}; use crate::simulator::Services; +use crate::simulator::time::SDuration; use crate::utils::errors::SimulationError; /// `Model` wraps `model_type` and provides common ID functionality (a struct @@ -55,23 +56,23 @@ impl DevsModel for Model { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { self.inner.events_ext(incoming_message, services) } fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { self.inner.events_int(services) } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.inner.time_advance(time_delta); } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.inner.until_next_event() } diff --git a/sim/src/models/model_trait.rs b/sim/src/models/model_trait.rs index 42b80ee..2d98289 100644 --- a/sim/src/models/model_trait.rs +++ b/sim/src/models/model_trait.rs @@ -1,5 +1,6 @@ use super::{ModelMessage, ModelRecord}; use crate::simulator::Services; +use crate::simulator::time::SDuration; use crate::utils::errors::SimulationError; pub trait ModelClone { @@ -38,12 +39,12 @@ pub trait DevsModel: ModelClone + SerializableModel { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError>; - fn events_int(&mut self, services: &mut Services) + fn events_int(&mut self, services: &Services) -> Result, SimulationError>; - fn time_advance(&mut self, time_delta: f64); - fn until_next_event(&self) -> f64; + fn time_advance(&mut self, time_delta: SDuration); + fn until_next_event(&self) -> SDuration; #[cfg(feature = "simx")] fn event_rules_scheduling(&self) -> &str; #[cfg(feature = "simx")] diff --git a/sim/src/models/parallel_gateway.rs b/sim/src/models/parallel_gateway.rs index dca0436..61626a0 100644 --- a/sim/src/models/parallel_gateway.rs +++ b/sim/src/models/parallel_gateway.rs @@ -11,6 +11,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The parallel gateway splits a job across multiple processing paths. The /// job is duplicated across every one of the processing paths. In addition @@ -48,7 +49,7 @@ struct PortsOut { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct State { - until_next_event: f64, + until_next_event: SDuration, collections: HashMap, records: Vec, } @@ -56,7 +57,7 @@ struct State { impl Default for State { fn default() -> Self { Self { - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, collections: HashMap::new(), records: Vec::new(), } @@ -97,7 +98,7 @@ impl ParallelGateway { .find(|(_, count)| **count == self.ports_in.flow_paths.len()) } - fn increment_collection(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn increment_collection(&mut self, incoming_message: &ModelMessage, services: &Services) { *self .state .collections @@ -112,11 +113,11 @@ impl ParallelGateway { incoming_message.port_name.clone() ], ); - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; } - fn send_job(&mut self, services: &mut Services) -> Result, SimulationError> { - self.state.until_next_event = 0.0; + fn send_job(&mut self, services: &Services) -> Result, SimulationError> { + self.state.until_next_event = SDuration::NOW; let completed_collection = self .full_collection() .ok_or(SimulationError::InvalidModelState)? @@ -143,11 +144,11 @@ impl ParallelGateway { } fn passivate(&mut self) -> Vec { - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -163,7 +164,7 @@ impl DevsModel for ParallelGateway { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match self.arrival_port(&incoming_message.port_name) { ArrivalPort::FlowPath => Ok(self.increment_collection(incoming_message, services)), @@ -173,7 +174,7 @@ impl DevsModel for ParallelGateway { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match self.full_collection() { Some(_) => self.send_job(services), @@ -181,11 +182,11 @@ impl DevsModel for ParallelGateway { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/processor.rs b/sim/src/models/processor.rs index a766cc1..2cc5068 100644 --- a/sim/src/models/processor.rs +++ b/sim/src/models/processor.rs @@ -11,6 +11,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The processor accepts jobs, processes them for a period of time, and then /// outputs a processed job. The processor can have a configurable queue, of @@ -63,7 +64,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, queue: Vec, records: Vec, } @@ -72,7 +73,7 @@ impl Default for State { fn default() -> Self { State { phase: Phase::Passive, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, queue: Vec::new(), records: Vec::new(), } @@ -116,7 +117,7 @@ impl Processor { } } - fn add_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn add_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.queue.push(incoming_message.content.clone()); self.record( services.global_time(), @@ -128,14 +129,14 @@ impl Processor { fn activate( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { self.state.queue.push(incoming_message.content.clone()); self.state.phase = Phase::Active; - self.state.until_next_event = match &self.rng { + self.state.until_next_event = SDuration::new(match &self.rng { Some(rng) => self.service_time.random_variate(rng.clone())?, None => self.service_time.random_variate(services.global_rng())?, - }; + }); self.record( services.global_time(), String::from("Arrival"), @@ -149,7 +150,7 @@ impl Processor { Ok(()) } - fn ignore_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn ignore_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.record( services.global_time(), String::from("Drop"), @@ -159,13 +160,13 @@ impl Processor { fn process_next( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { self.state.phase = Phase::Active; - self.state.until_next_event = match &self.rng { + self.state.until_next_event = SDuration::new(match &self.rng { Some(rng) => self.service_time.random_variate(rng.clone())?, None => self.service_time.random_variate(services.global_rng())?, - }; + }); self.record( services.global_time(), String::from("Processing Start"), @@ -174,10 +175,10 @@ impl Processor { Ok(Vec::new()) } - fn release_job(&mut self, services: &mut Services) -> Vec { + fn release_job(&mut self, services: &Services) -> Vec { let job = self.state.queue.remove(0); self.state.phase = Phase::Passive; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; self.record( services.global_time(), String::from("Departure"), @@ -191,11 +192,11 @@ impl Processor { fn passivate(&mut self) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -211,7 +212,7 @@ impl DevsModel for Processor { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match ( self.arrival_port(&incoming_message.port_name), @@ -228,7 +229,7 @@ impl DevsModel for Processor { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match (&self.state.phase, self.state.queue.is_empty()) { (Phase::Passive, true) => Ok(self.passivate()), @@ -237,11 +238,11 @@ impl DevsModel for Processor { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/stochastic_gate.rs b/sim/src/models/stochastic_gate.rs index 94e7414..28a051e 100644 --- a/sim/src/models/stochastic_gate.rs +++ b/sim/src/models/stochastic_gate.rs @@ -11,6 +11,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The stochastic gate blocks (drops) or passes jobs, based on a specified /// Bernoulli distribution. If the Bernoulli random variate is a 0, the job @@ -49,7 +50,7 @@ struct PortsOut { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct State { - until_next_event: f64, + until_next_event: SDuration, jobs: Vec, records: Vec, } @@ -57,7 +58,7 @@ struct State { impl Default for State { fn default() -> Self { State { - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, jobs: Vec::new(), records: Vec::new(), } @@ -101,9 +102,9 @@ impl StochasticGate { fn receive_job( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; self.state.jobs.push(Job { content: incoming_message.content.clone(), pass: match &self.rng { @@ -122,12 +123,12 @@ impl StochasticGate { } fn passivate(&mut self) -> Vec { - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn pass_job(&mut self, services: &mut Services) -> Vec { - self.state.until_next_event = 0.0; + fn pass_job(&mut self, services: &Services) -> Vec { + self.state.until_next_event = SDuration::NOW; let job = self.state.jobs.remove(0); self.record( services.global_time(), @@ -140,14 +141,14 @@ impl StochasticGate { }] } - fn block_job(&mut self, services: &mut Services) -> Vec { - self.state.until_next_event = 0.0; + fn block_job(&mut self, services: &Services) -> Vec { + self.state.until_next_event = SDuration::NOW; let job = self.state.jobs.remove(0); self.record(services.global_time(), String::from("Block"), job.content); Vec::new() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -163,7 +164,7 @@ impl DevsModel for StochasticGate { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match self.arrival_port(&incoming_message.port_name) { ArrivalPort::Job => self.receive_job(incoming_message, services), @@ -173,7 +174,7 @@ impl DevsModel for StochasticGate { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match self.state.jobs.first() { None => Ok(self.passivate()), @@ -182,11 +183,11 @@ impl DevsModel for StochasticGate { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/models/stopwatch.rs b/sim/src/models/stopwatch.rs index df49223..4425eb6 100644 --- a/sim/src/models/stopwatch.rs +++ b/sim/src/models/stopwatch.rs @@ -1,5 +1,4 @@ use std::iter::once; - use serde::{Deserialize, Serialize}; use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableModel}; @@ -11,6 +10,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The stopwatch calculates durations by matching messages on the start and /// stop ports. For example, a "job 1" message arrives at the start port at @@ -62,7 +62,7 @@ pub enum Metric { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, jobs: Vec, records: Vec, } @@ -71,7 +71,7 @@ impl Default for State { fn default() -> Self { State { phase: Phase::Passive, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, jobs: Vec::new(), records: Vec::new(), } @@ -88,11 +88,11 @@ enum Phase { #[serde(rename_all = "camelCase")] pub struct Job { name: String, - start: Option, - stop: Option, + start: Option, + stop: Option, } -fn some_duration(job: &Job) -> Option<(String, f64)> { +fn some_duration(job: &Job) -> Option<(String, SDuration)> { match (job.start, job.stop) { (Some(start), Some(stop)) => Some((job.name.to_string(), stop - start)), _ => None, @@ -160,7 +160,7 @@ impl Stopwatch { .iter() .filter_map(some_duration) .fold( - (None, f64::INFINITY), + (None, SDuration::INFINITY), |minimum, (job_name, job_duration)| { if job_duration < minimum.1 { (Some(job_name), job_duration) @@ -178,7 +178,7 @@ impl Stopwatch { .iter() .filter_map(some_duration) .fold( - (None, f64::NEG_INFINITY), + (None, SDuration::NOW), |maximum, (job_name, job_duration)| { if job_duration > maximum.1 { (Some(job_name), job_duration) @@ -190,7 +190,7 @@ impl Stopwatch { .0 } - fn start_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn start_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.record( services.global_time(), String::from("Start"), @@ -199,7 +199,7 @@ impl Stopwatch { self.matching_or_new_job(incoming_message).start = Some(services.global_time()); } - fn stop_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn stop_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.record( services.global_time(), String::from("Stop"), @@ -210,12 +210,12 @@ impl Stopwatch { fn get_job(&mut self) { self.state.phase = Phase::JobFetch; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; } - fn release_minimum(&mut self, services: &mut Services) -> Vec { + fn release_minimum(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; self.record( services.global_time(), String::from("Minimum Fetch"), @@ -231,9 +231,9 @@ impl Stopwatch { .collect() } - fn release_maximum(&mut self, services: &mut Services) -> Vec { + fn release_maximum(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; self.record( services.global_time(), String::from("Maximum Fetch"), @@ -251,11 +251,11 @@ impl Stopwatch { fn passivate(&mut self) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -271,7 +271,7 @@ impl DevsModel for Stopwatch { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match self.arrival_port(&incoming_message.port_name) { ArrivalPort::Start => Ok(self.start_job(incoming_message, services)), @@ -283,7 +283,7 @@ impl DevsModel for Stopwatch { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match (&self.state.phase, &self.metric) { (Phase::JobFetch, Metric::Minimum) => Ok(self.release_minimum(services)), @@ -292,21 +292,22 @@ impl DevsModel for Stopwatch { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } + impl Reportable for Stopwatch { fn status(&self) -> String { if self.state.jobs.is_empty() { String::from("Measuring durations") } else { - let durations: Vec = self + let durations: Vec = self .state .jobs .iter() @@ -314,7 +315,7 @@ impl Reportable for Stopwatch { .collect(); format![ "Average {:.3}", - durations.iter().sum::() / durations.len() as f64 + (durations.iter().sum::()) / durations.len() as f64 ] } } diff --git a/sim/src/models/storage.rs b/sim/src/models/storage.rs index f05b56f..67dda33 100644 --- a/sim/src/models/storage.rs +++ b/sim/src/models/storage.rs @@ -9,6 +9,7 @@ use sim_derive::SerializableModel; #[cfg(feature = "simx")] use simx::event_rules; +use crate::simulator::time::{SDuration, STime}; /// The storage model stores a value, and responds with it upon request. /// Values are stored and value requests are handled instantantaneously. @@ -45,7 +46,7 @@ struct PortsOut { #[serde(rename_all = "camelCase")] struct State { phase: Phase, - until_next_event: f64, + until_next_event: SDuration, job: Option, records: Vec, } @@ -54,7 +55,7 @@ impl Default for State { fn default() -> Self { State { phase: Phase::Passive, - until_next_event: f64::INFINITY, + until_next_event: SDuration::INFINITY, job: None, records: Vec::new(), } @@ -100,10 +101,10 @@ impl Storage { fn get_job(&mut self) { self.state.phase = Phase::JobFetch; - self.state.until_next_event = 0.0; + self.state.until_next_event = SDuration::NOW; } - fn hold_job(&mut self, incoming_message: &ModelMessage, services: &mut Services) { + fn hold_job(&mut self, incoming_message: &ModelMessage, services: &Services) { self.state.job = Some(incoming_message.content.clone()); self.record( services.global_time(), @@ -112,9 +113,9 @@ impl Storage { ); } - fn release_job(&mut self, services: &mut Services) -> Vec { + fn release_job(&mut self, services: &Services) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; self.record( services.global_time(), String::from("Departure"), @@ -131,11 +132,11 @@ impl Storage { fn passivate(&mut self) -> Vec { self.state.phase = Phase::Passive; - self.state.until_next_event = f64::INFINITY; + self.state.until_next_event = SDuration::INFINITY; Vec::new() } - fn record(&mut self, time: f64, action: String, subject: String) { + fn record(&mut self, time: STime, action: String, subject: String) { if self.store_records { self.state.records.push(ModelRecord { time, @@ -151,7 +152,7 @@ impl DevsModel for Storage { fn events_ext( &mut self, incoming_message: &ModelMessage, - services: &mut Services, + services: &Services, ) -> Result<(), SimulationError> { match self.arrival_port(&incoming_message.port_name) { ArrivalPort::Put => Ok(self.hold_job(incoming_message, services)), @@ -162,7 +163,7 @@ impl DevsModel for Storage { fn events_int( &mut self, - services: &mut Services, + services: &Services, ) -> Result, SimulationError> { match &self.state.phase { Phase::Passive => Ok(self.passivate()), @@ -170,11 +171,11 @@ impl DevsModel for Storage { } } - fn time_advance(&mut self, time_delta: f64) { + fn time_advance(&mut self, time_delta: SDuration) { self.state.until_next_event -= time_delta; } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { self.state.until_next_event } } diff --git a/sim/src/simulator/coupling.rs b/sim/src/simulator/coupling.rs index 5fd2c01..08dd00c 100644 --- a/sim/src/simulator/coupling.rs +++ b/sim/src/simulator/coupling.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use wasm_bindgen::prelude::*; +use crate::simulator::time::STime; /// Connectors are configured to connect models through their ports. During /// simulation, models exchange messages (as per the Discrete Event System @@ -52,6 +53,14 @@ impl Connector { pub fn target_port(&self) -> &str { &self.target_port } + + pub fn source(&self) -> (&String, &String) { + (&self.source_id, &self.source_port) + } + pub fn target(&self) -> (&String, &String) { + (&self.target_id, &self.target_port) + } + } /// Messages are the mechanism of information exchange for models in a @@ -66,7 +75,7 @@ pub struct Message { source_port: String, target_id: String, target_port: String, - time: f64, + time: STime, content: String, } @@ -78,7 +87,7 @@ impl Message { source_port: String, target_id: String, target_port: String, - time: f64, + time: STime, content: String, ) -> Self { Self { @@ -112,7 +121,7 @@ impl Message { } /// This accessor method returns the transmission time of a message. - pub fn time(&self) -> &f64 { + pub fn time(&self) -> &STime { &self.time } diff --git a/sim/src/simulator/mod.rs b/sim/src/simulator/mod.rs index a0e31f2..9852399 100644 --- a/sim/src/simulator/mod.rs +++ b/sim/src/simulator/mod.rs @@ -14,42 +14,100 @@ //! return the messages generated during the execution of the simulation //! step(s), for use in message analysis. -use serde::{Deserialize, Serialize}; +use std::cell::RefCell; +use std::rc::Rc; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::input_modeling::dyn_rng; use crate::input_modeling::dynamic_rng::SimulationRng; use crate::models::{DevsModel, Model, ModelMessage, ModelRecord, Reportable}; use crate::utils::errors::SimulationError; use crate::utils::set_panic_hook; +use crate::simulator::time::{SDuration, STime}; pub mod coupling; pub mod services; pub mod web; +pub mod time; pub use self::coupling::{Connector, Message}; pub use self::services::Services; pub use self::web::Simulation as WebSimulation; +///Type representing a counted reference to a type. +pub type IMType = Rc>; + +///turn something into a counted reference. +pub fn imtype(x:T) -> IMType { + Rc::new(RefCell::new(x)) +} + +///Type representing a counted reference to vec of types +pub type IMTypeVec = IMType>; + +pub fn imtypevec(x:Vec) -> IMTypeVec { + Rc::new(RefCell::new(x)) +} + + /// The `Simulation` struct is the core of sim, and includes everything /// needed to run a simulation - models, connectors, and a random number /// generator. State information, specifically global time and active /// messages are additionally retained in the struct. -#[derive(Clone, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Default)] pub struct Simulation { - models: Vec, + models: IMTypeVec, connectors: Vec, messages: Vec, services: Services, } +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SimulationBase { + models: Vec, + connectors: Vec, + messages: Vec, + services: Services +} + +/// Take a simulation and render it in a json format. +impl Serialize for Simulation { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer + { + let sb = SimulationBase { models: self.models.borrow().clone(), + connectors: self.connectors.clone(), + messages: self.messages.clone(), + services: self.services.clone() }; + sb.serialize(serializer) + } +} + + +/// Take a json representation of the Simulation and construct a +impl<'de> Deserialize<'de> for Simulation { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de> + { + let sb = SimulationBase::deserialize(deserializer)?; + Ok(Simulation {models: imtypevec(sb.models.clone()), + connectors: sb.connectors, + messages: sb.messages, + services: sb.services}) + } +} + + impl Simulation { /// This constructor method creates a simulation from a supplied /// configuration (models and connectors). pub fn post(models: Vec, connectors: Vec) -> Self { set_panic_hook(); Self { - models, + models: imtypevec(models), connectors, ..Self::default() } @@ -64,11 +122,11 @@ impl Simulation { ) -> Self { set_panic_hook(); Self { - models, + models: imtypevec(models), connectors, services: Services { global_rng: dyn_rng(global_rng), - global_time: 0.0, + global_time: STime::NOW, }, ..Self::default() } @@ -80,7 +138,7 @@ impl Simulation { /// This method sets the models and connectors of an existing simulation. pub fn put(&mut self, models: Vec, connectors: Vec) { - self.models = models; + self.models = imtypevec(models); self.connectors = connectors; } @@ -96,7 +154,7 @@ impl Simulation { } /// An accessor method for the simulation global time. - pub fn get_global_time(&self) -> f64 { + pub fn get_global_time(&self) -> STime { self.services.global_time() } @@ -106,6 +164,7 @@ impl Simulation { pub fn get_status(&self, model_id: &str) -> Result { Ok(self .models + .borrow() .iter() .find(|model| model.id() == model_id) .ok_or(SimulationError::ModelNotFound)? @@ -115,13 +174,15 @@ impl Simulation { /// This method provides a mechanism for getting the records of any model /// in a simulation. The method takes the model ID as an argument, and /// returns the records for that model. - pub fn get_records(&self, model_id: &str) -> Result<&Vec, SimulationError> { + pub fn get_records(&self, model_id: &str) -> Result,SimulationError> + { Ok(self .models + .borrow() .iter() .find(|model| model.id() == model_id) .ok_or(SimulationError::ModelNotFound)? - .records()) + .records().to_vec()) } /// To enable simulation replications, the reset method resets the state @@ -140,40 +201,26 @@ impl Simulation { /// Reset the simulation global time to 0.0. pub fn reset_global_time(&mut self) { - self.services.set_global_time(0.0); + self.services.set_global_time(STime::NOW); } - /// This method provides a convenient foundation for operating on the - /// full set of models in the simulation. - pub fn models(&mut self) -> Vec<&mut Model> { - self.models.iter_mut().collect() + /// Input injection creates a message during simulation execution, + /// without needing to create that message through the standard + /// simulation constructs. This enables live simulation interaction, + /// disruption, and manipulation - all through the standard simulation + /// message system. + pub fn inject_input(&mut self, message: Message) { + self.messages.push(message); } - /// This method constructs a list of target IDs for a given source model - /// ID and port. This message target information is derived from the - /// connectors configuration. - fn get_message_target_ids(&self, source_id: &str, source_port: &str) -> Vec { - self.connectors - .iter() - .filter_map(|connector| { - if connector.source_id() == source_id && connector.source_port() == source_port { - Some(connector.target_id().to_string()) - } else { - None - } - }) - .collect() - } - /// This method constructs a list of target ports for a given source model - /// ID and port. This message target information is derived from the - /// connectors configuration. - fn get_message_target_ports(&self, source_id: &str, source_port: &str) -> Vec { - self.connectors - .iter() + // Search through all the connectors and find any that match the source model id and source port. + // There may be more than one. + pub fn find_target_connectors_matching_source(&self, id: &str, port: &str) -> Vec<(String, String)> { + self.connectors.iter() .filter_map(|connector| { - if connector.source_id() == source_id && connector.source_port() == source_port { - Some(connector.target_port().to_string()) + if connector.source() == (&id.to_string(), &port.to_string()) { + Some((connector.target_id().to_string(), connector.target_port().to_string())) } else { None } @@ -181,100 +228,90 @@ impl Simulation { .collect() } - /// Input injection creates a message during simulation execution, - /// without needing to create that message through the standard - /// simulation constructs. This enables live simulation interaction, - /// disruption, and manipulation - all through the standard simulation - /// message system. - pub fn inject_input(&mut self, message: Message) { - self.messages.push(message); + ///Given a model and an outgoing internal message, find the next model and next port for the internal message. + pub fn next_model_messages(&self, source_model_id: &str, message: &ModelMessage) -> Vec { + self.find_target_connectors_matching_source(source_model_id, message.port_name.as_str()) + .into_iter() + .map(|(target_model, target_port)| { + Message::new(source_model_id.to_string(), message.port_name.clone(), target_model, target_port, self.services.global_time(), message.content.clone()) + }).collect() } - /// The simulation step is foundational for a discrete event simulation. - /// This method executes a single discrete event simulation step, - /// including internal state transitions, external state transitions, - /// message orchestration, global time accounting, and step messages - /// output. - pub fn step(&mut self) -> Result, SimulationError> { - let messages = self.messages.clone(); - let mut next_messages: Vec = Vec::new(); - // Process external events - if !messages.is_empty() { - (0..self.models.len()).try_for_each(|model_index| -> Result<(), SimulationError> { - let model_messages: Vec = messages - .iter() - .filter_map(|message| { - if message.target_id() == self.models[model_index].id() { - Some(ModelMessage { - port_name: message.target_port().to_string(), - content: message.content().to_string(), - }) - } else { - None - } - }) - .collect(); - model_messages - .iter() - .try_for_each(|model_message| -> Result<(), SimulationError> { - self.models[model_index].events_ext(model_message, &mut self.services) - }) - })?; + pub fn process_external_messages(&mut self) -> Result<(), SimulationError> { + for m in self.models.borrow_mut().iter_mut() { + let internal_messages: Vec = self.messages.iter() + .filter_map(|message| { + if message.target_id() == m.id() { + Some(ModelMessage::new(message.target_port(), message.content())) + } else { + None + } + }).collect(); + for message in internal_messages { + m.events_ext(&message.clone(), &mut self.services)?; + } } - // Process internal events and gather associated messages - let until_next_event: f64 = if self.messages.is_empty() { - self.models().iter().fold(f64::INFINITY, |min, model| { - f64::min(min, model.until_next_event()) + Ok(()) + } + + pub fn advance_time(&mut self) -> SDuration { + let until_next_event: SDuration = if self.messages.is_empty() { + self.models.borrow().iter().fold(SDuration::INFINITY, |min, model| { + SDuration::min(min, model.until_next_event()) }) } else { - 0.0 + SDuration::NOW }; - self.models().iter_mut().for_each(|model| { + + self.models.borrow_mut().iter_mut().for_each(|model| { model.time_advance(until_next_event); }); - self.services - .set_global_time(self.services.global_time() + until_next_event); - let errors: Result, SimulationError> = (0..self.models.len()) - .map(|model_index| -> Result<(), SimulationError> { - if self.models[model_index].until_next_event() == 0.0 { - self.models[model_index] - .events_int(&mut self.services)? - .iter() - .for_each(|outgoing_message| { - let target_ids = self.get_message_target_ids( - self.models[model_index].id(), // Outgoing message source model ID - &outgoing_message.port_name, // Outgoing message source model port - ); - let target_ports = self.get_message_target_ports( - self.models[model_index].id(), // Outgoing message source model ID - &outgoing_message.port_name, // Outgoing message source model port - ); - target_ids.iter().zip(target_ports.iter()).for_each( - |(target_id, target_port)| { - next_messages.push(Message::new( - self.models[model_index].id().to_string(), - outgoing_message.port_name.clone(), - target_id.clone(), - target_port.clone(), - self.services.global_time(), - outgoing_message.content.clone(), - )); - }, - ); - }); - } - Ok(()) + self.services.advance_global_time(until_next_event); + until_next_event + } + + pub fn process_internal_events(&self) -> Result, SimulationError> { + let results: Result>, SimulationError> = self.models + .borrow_mut() + .iter_mut() + .filter(|model| model.until_next_event() == SDuration::NOW) //Only need to work on things that are now due + .map(|source_model| { + //Given the model, trigger any internal events. Internal events my produce outbound messages or may error. + let emitted_internal_messages = source_model.events_int(&self.services)?; + + let new_messages = emitted_internal_messages + .iter() + .map(|emitted_internal_message| + self.next_model_messages(source_model.id(), &emitted_internal_message)) + .flatten() + .collect::>(); + + Ok(new_messages) }) .collect(); - errors?; - self.messages = next_messages; - Ok(self.get_messages().clone()) + + let new_messages = results?.into_iter().flatten().collect(); + Ok(new_messages) + } + + + /// The simulation step is foundational for a discrete event simulation. + /// This method executes a single discrete event simulation step, + /// including internal state transitions, external state transitions, + /// message orchestration, global time accounting, and step messages + /// output. + pub fn step(&mut self) -> Result, SimulationError> { + self.process_external_messages()?; + self.advance_time(); + let new_messages = self.process_internal_events()?; + self.messages = new_messages.clone(); + Ok(new_messages) } /// This method executes simulation `step` calls, until a global time /// has been exceeded. At which point, the messages from all the /// simulation steps are returned. - pub fn step_until(&mut self, until: f64) -> Result, SimulationError> { + pub fn step_until(&mut self, until: STime) -> Result, SimulationError> { let mut message_records: Vec = Vec::new(); loop { self.step()?; diff --git a/sim/src/simulator/services.rs b/sim/src/simulator/services.rs index df1da82..9584d77 100644 --- a/sim/src/simulator/services.rs +++ b/sim/src/simulator/services.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; use crate::input_modeling::dynamic_rng::{default_rng, DynRng}; +use crate::simulator::time::{SDuration, STime}; /// The simulator provides a uniform random number generator and simulation /// clock to models during the execution of a simulation @@ -9,14 +10,19 @@ use crate::input_modeling::dynamic_rng::{default_rng, DynRng}; pub struct Services { #[serde(skip, default = "default_rng")] pub(crate) global_rng: DynRng, - pub(crate) global_time: f64, + pub(crate) global_time: STime, +} + +impl Services { + pub fn advance_global_time(&mut self, duration: SDuration) { + self.set_global_time(self.global_time() + duration); } } impl Default for Services { fn default() -> Self { Self { global_rng: default_rng(), - global_time: 0.0, + global_time: STime::NOW, } } } @@ -26,11 +32,11 @@ impl Services { self.global_rng.clone() } - pub fn global_time(&self) -> f64 { + pub fn global_time(&self) -> STime { self.global_time } - pub fn set_global_time(&mut self, time: f64) { + pub fn set_global_time(&mut self, time: STime) { self.global_time = time; } } diff --git a/sim/src/simulator/time.rs b/sim/src/simulator/time.rs new file mode 100644 index 0000000..dd48ddd --- /dev/null +++ b/sim/src/simulator/time.rs @@ -0,0 +1,157 @@ +use std::fmt::Display; +use std::iter::Sum; +use std::ops::{Add, Div, Sub, SubAssign}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, Default, PartialOrd, PartialEq, Serialize, Deserialize)] +pub struct STime(pub f64); + +impl STime { + pub const NOW: STime = Self(0.0); + pub const INFINITY: STime = Self(f64::INFINITY); + + pub fn new(value: f64) -> Self { Self(value)} +} + +impl Display for STime { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:.5}", self.0) + } +} + +impl Add for STime { + type Output = Self; + + fn add(self, rhs: SDuration) -> Self { + Self(self.0 + rhs.0) + } +} + + +impl Sub for STime { + type Output = SDuration; + + fn sub(self, rhs: STime) -> SDuration { + SDuration(self.0 - rhs.0) + } +} + +impl Sub<&STime> for &STime { + type Output = SDuration; + + fn sub(self, rhs: &STime) -> Self::Output { + *self - *rhs + } +} + + +impl Sub for STime { + type Output = Self; + + fn sub(self, rhs: SDuration) -> Self { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for SDuration { + fn sub_assign(&mut self, rhs: Self) { + self.0 -= rhs.0; + } +} + + +impl Eq for STime {} + +impl Ord for STime { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.partial_cmp(other).unwrap() + } + fn max(self, other: Self) -> Self { + if self.0 > other.0 { + self + } else { + other + } + } + fn min(self, other: Self) -> Self { + if self.0 < other.0 { + self + } else { + other + } + } + fn clamp(self, min: Self, max: Self) -> Self { + if self.0 < min.0 { + min + } else if self.0 > max.0 { + max + } else { + self + } + } +} + +#[derive(Debug, Clone, Copy, Default, PartialOrd, PartialEq, Serialize, Deserialize)] +pub struct SDuration(pub f64); + +impl SDuration { + pub const NOW: SDuration = Self(0.0); + pub const INFINITY: SDuration = Self(f64::INFINITY); + pub fn new(value: f64) -> Self { Self(value)} +} + +impl Display for SDuration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "+{:.4}", self.0) + } +} + +impl Eq for SDuration {} + +impl<'a> Sum<&'a SDuration> for SDuration { + fn sum>(iter: I) -> Self { + SDuration::new(iter.map(|d| d.0).sum::()) + } +} + +impl Div for SDuration { + type Output = f64; + + fn div(self, rhs: f64) -> Self::Output { + self.0 / rhs + } +} + + +impl Ord for SDuration { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.partial_cmp(other).unwrap() + } + fn max(self, other: Self) -> Self { + if self.0 > other.0 { + self + } else { + other + } + } + fn min(self, other: Self) -> Self + where + Self: Sized, + { + if self.0 < other.0 { + self + } else { + other + } + } + fn clamp(self, min: Self, max: Self) -> Self { + if self.0 < min.0 { + min + } else if self.0 > max.0 { + max + } else { + self + } + + } +} \ No newline at end of file diff --git a/sim/src/simulator/web.rs b/sim/src/simulator/web.rs index 8e12515..7cf6b33 100644 --- a/sim/src/simulator/web.rs +++ b/sim/src/simulator/web.rs @@ -1,7 +1,7 @@ use js_sys::Array; use serde::{Deserialize, Serialize}; use wasm_bindgen::prelude::*; - +use crate::simulator::time::STime; use crate::utils::set_panic_hook; use super::Simulation as CoreSimulation; @@ -96,7 +96,7 @@ impl Simulation { /// An interface to `Simulation.get_global_time`. pub fn get_global_time(&self) -> f64 { - self.simulation.get_global_time() + self.simulation.get_global_time().0 } /// An interface to `Simulation.get_status`. @@ -107,13 +107,13 @@ impl Simulation { /// A JS/WASM interface for `Simulation.records`, which converts the /// records to a JSON string. pub fn get_records_json(&self, model_id: &str) -> String { - serde_json::to_string(self.simulation.get_records(model_id).unwrap()).unwrap() + serde_json::to_string(&self.simulation.get_records(model_id).unwrap()).unwrap() } /// A JS/WASM interface for `Simulation.records`, which converts the /// records to a YAML string. pub fn get_records_yaml(&self, model_id: &str) -> String { - serde_yaml::to_string(self.simulation.get_records(model_id).unwrap()).unwrap() + serde_yaml::to_string(&self.simulation.get_records(model_id).unwrap()).unwrap() } /// An interface to `Simulation.reset`. @@ -171,6 +171,8 @@ impl Simulation { /// A JS/WASM interface for `Simulation.step_until`, which converts the /// returned messages to a JavaScript Array. pub fn step_until_js(&mut self, until: f64) -> Array { + let until: STime = STime::new(until); + self.simulation .step_until(until) .unwrap() @@ -182,12 +184,14 @@ impl Simulation { /// A JS/WASM interface for `Simulation.step_until`, which converts the /// returned messages to a JSON string. pub fn step_until_json(&mut self, until: f64) -> String { + let until: STime = STime::new(until); serde_json::to_string(&self.simulation.step_until(until).unwrap()).unwrap() } /// A JS/WASM interface for `Simulation.step_until`, which converts the /// returned messages to a YAML string. pub fn step_until_yaml(&mut self, until: f64) -> String { + let until: STime = STime::new(until); serde_yaml::to_string(&self.simulation.step_until(until).unwrap()).unwrap() } diff --git a/sim/tests/coupled.rs b/sim/tests/coupled.rs index 197b983..b1d4885 100644 --- a/sim/tests/coupled.rs +++ b/sim/tests/coupled.rs @@ -4,6 +4,7 @@ use sim::models::{ }; use sim::output_analysis::{ConfidenceInterval, SteadyStateOutput}; use sim::simulator::{Connector, Message, Simulation}; +use sim::simulator::time::STime; use sim::utils::errors::SimulationError; fn get_message_number(message: &str) -> Option<&str> { @@ -146,8 +147,8 @@ fn closure_under_coupling() -> Result<(), SimulationError> { |(index, (models, connectors))| -> Result, SimulationError> { let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec()); let message_records: Vec = simulation.step_n(1000)?; - let arrivals: Vec<(&f64, &str)>; - let departures: Vec<(&f64, &str)>; + let arrivals: Vec<(&STime, &str)>; + let departures: Vec<(&STime, &str)>; match index { 0 => { arrivals = message_records @@ -179,14 +180,14 @@ fn closure_under_coupling() -> Result<(), SimulationError> { let response_times: Vec = departures .iter() .map(|departure| -> Result { - Ok(departure.0 + Ok(departure.0.0 - arrivals .iter() .find(|arrival| { get_message_number(&arrival.1) == get_message_number(&departure.1) }) .ok_or(SimulationError::DroppedMessageError)? - .0) + .0.0) }) .collect::, SimulationError>>()?; let mut response_times_sample = SteadyStateOutput::post(response_times); diff --git a/sim/tests/custom.rs b/sim/tests/custom.rs index bea2af8..7f34a30 100644 --- a/sim/tests/custom.rs +++ b/sim/tests/custom.rs @@ -1,4 +1,3 @@ -use std::f64::INFINITY; use serde::{Deserialize, Serialize}; use sim::input_modeling::ContinuousRandomVariable; @@ -8,7 +7,7 @@ use sim::simulator::{Connector, Message, Services, Simulation, WebSimulation}; use sim::utils::errors::SimulationError; use sim_derive::{register, SerializableModel}; use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; - +use sim::simulator::time::SDuration; #[cfg(feature = "simx")] use simx::event_rules; @@ -50,26 +49,26 @@ impl DevsModel for Passive { fn events_ext( &mut self, _incoming_message: &ModelMessage, - _services: &mut Services, + _services: &Services, ) -> Result<(), SimulationError> { Ok(()) } fn events_int( &mut self, - _services: &mut Services, + _services: &Services, ) -> Result, SimulationError> { Ok(Vec::new()) } - fn time_advance(&mut self, _time_delta: f64) { + fn time_advance(&mut self, _time_delta: SDuration) { // No future events list to advance } - fn until_next_event(&self) -> f64 { + fn until_next_event(&self) -> SDuration { // No future events list, as a source of finite until_next_event // values - INFINITY + SDuration::INFINITY } } diff --git a/sim/tests/simulations.rs b/sim/tests/simulations.rs index e6a0c65..27ada5a 100644 --- a/sim/tests/simulations.rs +++ b/sim/tests/simulations.rs @@ -6,6 +6,7 @@ use sim::models::{ }; use sim::output_analysis::{IndependentSample, SteadyStateOutput}; use sim::simulator::{Connector, Message, Simulation}; +use sim::simulator::time::{SDuration, STime}; use sim::utils::errors::SimulationError; fn epsilon() -> f64 { @@ -72,12 +73,12 @@ fn poisson_generator_processor_with_capacity() -> Result<(), SimulationError> { let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec()); // Sample size will be reduced during output analysis - initialization bias reduction through deletion let message_records: Vec = simulation.step_n(3000)?; - let departures: Vec<(&f64, &str)> = message_records + let departures: Vec<(&STime, &str)> = message_records .iter() .filter(|message_record| message_record.target_id() == "storage-01") .map(|message_record| (message_record.time(), message_record.content())) .collect(); - let arrivals: Vec<(&f64, &str)> = message_records + let arrivals: Vec<(&STime, &str)> = message_records .iter() .filter(|message_record| message_record.target_id() == "processor-01") .map(|message_record| (message_record.time(), message_record.content())) @@ -86,14 +87,14 @@ fn poisson_generator_processor_with_capacity() -> Result<(), SimulationError> { let response_times: Vec = departures .iter() .map(|departure| -> Result { - Ok(departure.0 + Ok(departure.0.0 - arrivals .iter() .find(|arrival| { get_message_number(&arrival.1) == get_message_number(&departure.1) }) .ok_or(SimulationError::DroppedMessageError)? - .0) + .0.0) }) .collect::, SimulationError>>()?; // Response times are not independent @@ -165,7 +166,7 @@ fn step_until_activities() -> Result<(), SimulationError> { // Refresh the models, but maintain the Uniform RNG for replication independence simulation.reset(); simulation.put(models.to_vec(), connectors.to_vec()); - let messages = simulation.step_until(100.0)?; + let messages = simulation.step_until(STime::new(100.0))?; generations_count.push(messages.len() as f64); } let generations_per_replication = IndependentSample::post(generations_count)?; @@ -235,7 +236,7 @@ fn non_stationary_generation() -> Result<(), SimulationError> { // Refresh the models, but maintain the Uniform RNG for replication independence simulation.reset(); simulation.put(models.to_vec(), connectors.to_vec()); - let messages = simulation.step_until(480.0)?; + let messages = simulation.step_until(STime::new(480.0))?; let arrivals: Vec<&Message> = messages .iter() .filter(|message| message.target_id() == "processor-01") @@ -910,7 +911,7 @@ fn batch_sizing() -> Result<(), SimulationError> { Box::new(Batcher::new( String::from("job"), String::from("job"), - 10.0, // 10 seconds max batching time + SDuration::new(10.0), // 10 seconds max batching time 10, // 10 jobs max batch size false, )), diff --git a/sim/tests/web.rs b/sim/tests/web.rs index e8a46ac..f384c71 100644 --- a/sim/tests/web.rs +++ b/sim/tests/web.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; -use std::f64::INFINITY; use sim::models::{Model, ModelRecord}; use sim::output_analysis::IndependentSample; use sim::simulator::{Connector, Message, WebSimulation}; use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; +use sim::simulator::time::{SDuration, STime}; wasm_bindgen_test_configure!(run_in_browser); @@ -68,7 +68,7 @@ fn processor_from_queue_response_time_is_correct() { } ]"#; let mut web = WebSimulation::post_json(models, connectors); - let average_batch_completion_time = (0..200) // 100 jobs, and 2 steps per job + let average_batch_completion_time: f64 = (0..200) // 100 jobs, and 2 steps per job .map(|simulation_step| { // Get expected Option message at each step if (simulation_step + 1) % 2 == 0 { @@ -85,7 +85,7 @@ fn processor_from_queue_response_time_is_correct() { match expected_output { None => { assert![messages_set.is_empty()]; - INFINITY + STime::INFINITY } Some(output) => { let first_message = messages_set.first().unwrap(); @@ -101,8 +101,9 @@ fn processor_from_queue_response_time_is_correct() { }) .map(|(_, job_completion_time)| job_completion_time) .enumerate() + // Item: (INDEX, job_completion_time: STime) .fold( - (Vec::new(), 0.0), + (Vec::new(), STime::NOW), |mut batch_completion_times, (job_index, job_completion_time)| { // Compile batch completion times - 50 batches of 2 jobs each // batch_completion_times.1 is the global time of the last batch completion @@ -115,10 +116,11 @@ fn processor_from_queue_response_time_is_correct() { batch_completion_times }, ) + // Fold yields an accumulation to (Vec, STime) .0 .iter() // Take the average completion time across the 20 batches - .sum::() + .sum::() / 50.0; let expectation = 1.0 / 3.0; // Exponential with lambda=3.0 // Epsilon of 0.34 @@ -571,7 +573,7 @@ fn ci_half_width_for_average_waiting_time() { let records: Vec = serde_json::from_str(&web.get_records_json(processor)).unwrap(); // Times: Arrival, Processing Start, Departure - let mut job_timestamps: HashMap, Option, Option)> = + let mut job_timestamps: HashMap, Option, Option)> = HashMap::new(); records.iter().for_each(|record| { let job = job_timestamps @@ -593,7 +595,10 @@ fn ci_half_width_for_average_waiting_time() { } (_, _, _) => None, }) + //unwrap duration to a float for calculations + .map(|sd| sd.0) .collect(); + let waiting_times_sample = IndependentSample::post(waiting_times).unwrap(); if !waiting_times_sample.point_estimate_mean().is_nan() { average_waiting_times[processor_index] diff --git a/simx/Cargo.toml b/simx/Cargo.toml index 9bc8c07..7a0cfef 100644 --- a/simx/Cargo.toml +++ b/simx/Cargo.toml @@ -38,7 +38,3 @@ wee_alloc = { version = "0.4.5", optional = true } [dev-dependencies] wasm-bindgen-test = "0.3.13" - -[profile.release] -# Tell `rustc` to optimize for small code size. -opt-level = "s"