diff --git a/sim/benches/fan_out_fan_in_bench.rs b/sim/benches/fan_out_fan_in_bench.rs new file mode 100644 index 0000000..0cb65d4 --- /dev/null +++ b/sim/benches/fan_out_fan_in_bench.rs @@ -0,0 +1,215 @@ +// Fan a single message out to N processors and then join them all back. +// benchmark execution time (not simulation time) + +#![feature(test)] + +extern crate test; + +#[cfg(test)] +mod test_parallel_gateway { + use sim::models::{DevsModel, Model, ModelMessage, ParallelGateway}; + use sim::simulator::{Connector, Message, Services, Simulation}; + use sim::utils::errors::SimulationError; + use test::Bencher; + + /// the message that will be sent. + fn initial_message(content: String) -> Message { + Message::new( + "manual".to_string(), + "manual".to_string(), + "PG_FAN_OUT".to_string(), + "IN".to_string(), + 0.0, + content, + ) + } + + fn get_out_port_names(port_count: usize) -> Vec { + (0..port_count).map(|s| format!("OUT_{}", s)).collect() + } + fn get_in_port_names(port_count: usize) -> Vec { + (0..port_count).map(|s| format!("IN_{}", s)).collect() + } + + fn get_parallel_gateway_fan_out(port_count: usize) -> Model { + Model::new( + "PG_FAN_OUT".to_string(), + Box::new(ParallelGateway::new( + vec!["IN".to_string()], + get_out_port_names(port_count), + false, + )), + ) + } + + fn get_parallel_gateway_fan_in(port_count: usize) -> Model { + Model::new( + "PG_FAN_IN".to_string(), + Box::new(ParallelGateway::new( + get_in_port_names(port_count), + vec!["OUT".to_string()], + false, + )), + ) + } + fn get_models(port_count: usize) -> Vec { + vec![ + get_parallel_gateway_fan_out(port_count), + get_parallel_gateway_fan_in(port_count), + ] + } + + fn get_connectors(port_count: usize) -> Vec { + //Connect OUT_0 to IN_0 + (0..port_count) + .map(|pi| { + Connector::new( + format!("connector_{}", pi), + "PG_FAN_OUT".to_string(), + "PG_FAN_IN".to_string(), + format!("OUT_{}", pi), + format!("IN_{}", pi), + ) + }) + .collect() + } + + fn get_simulator(port_count: usize) -> Simulation { + Simulation::post(get_models(port_count), get_connectors(port_count)) + } + + #[test] + fn fi_fo_test() { + let fan_size = 10usize; + let mut sim = get_simulator(fan_size); + sim.inject_input(initial_message("TESTING".to_string())); + let result = sim.step(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), fan_size); + } + + #[bench] + fn fi_fo_bench_100(b: &mut Bencher) { + let fan_size = 100usize; + let mut sim = get_simulator(fan_size); + sim.inject_input(initial_message("TESTING".to_string())); + b.iter(|| sim.step()); + } + #[bench] + fn fi_fo_bench_1000(b: &mut Bencher) { + let fan_size = 10000usize; + let mut sim = get_simulator(fan_size); + sim.inject_input(initial_message("TESTING".to_string())); + b.iter(|| sim.step()); + } + #[bench] + fn fi_fo_bench_50000(b: &mut Bencher) { + let fan_size = 50000usize; + let mut sim = get_simulator(fan_size); + sim.inject_input(initial_message("TESTING".to_string())); + b.iter(|| sim.step()); + } + + // #[bench] + // fn fi_fo_bench_80000(b: &mut Bencher) { + // let fan_size = 80000usize; + // let mut sim = get_simulator(fan_size); + // sim.inject_input(initial_message("TESTING".to_string())); + // b.iter(|| sim.step()); + // } + + // test test_parallel_gateway::fi_fo_bench_100 ... bench: 47.90 ns/iter (+/- 0.21) + // test test_parallel_gateway::fi_fo_bench_1000 ... bench: 55.22 ns/iter (+/- 0.35) + + // test test_parallel_gateway::fi_fo_bench_100 ... bench: 47.90 ns/iter (+/- 0.56) + // test test_parallel_gateway::fi_fo_bench_1000 ... bench: 55.53 ns/iter (+/- 4.41) + // test test_parallel_gateway::fi_fo_bench_50000 ... bench: 59.44 ns/iter (+/- 1.39) + + // test test_parallel_gateway::fi_fo_bench_100 ... bench: 48.24 ns/iter (+/- 2.12) + // test test_parallel_gateway::fi_fo_bench_1000 ... bench: 50.00 ns/iter (+/- 8.31) + // test test_parallel_gateway::fi_fo_bench_50000 ... bench: 59.83 ns/iter (+/- 5.48) + // test test_parallel_gateway::fi_fo_bench_80000 ... bench: 58.30 ns/iter (+/- 8.40) + + #[test] + fn fo_test() { + let fan_size = 10000usize; + let mut model = + ParallelGateway::new(vec!["IN".to_string()], get_out_port_names(fan_size), false); + let in_message = ModelMessage { + port_name: "IN".to_string(), + content: "testing".to_string(), + }; + + let mut services = Services::default(); + + let ext_result = &model.events_ext(&in_message, &mut services); + assert!(ext_result.is_ok()); + let int_results = model.events_int(&mut services); + assert!(int_results.is_ok()); + assert_eq!(int_results.unwrap().len(), fan_size); + + //instantaneous operation in simulation + assert_eq!(services.global_time(), 0.0f64); + } + #[test] + fn fi_test() { + let fan_size = 10000usize; + let mut model = + ParallelGateway::new(get_in_port_names(fan_size), vec!["OUT".to_string()], false); + let in_messages: Vec = (0..fan_size) + .map(|i| ModelMessage { + port_name: format!("IN_{}", i), + content: "testing".to_string(), + }) + .collect(); + + let mut services = Services::default(); + + let ext_results: Vec> = in_messages + .iter() + .map(|i| model.events_ext(i, &mut services)) + .collect(); + + assert!(ext_results.iter().all(|r| r.is_ok())); + //instantaneous simulation time. + assert_eq!(services.global_time(), 0.0f64); + + //There's only one resulting message. This is a fan in. + let int_results = model.events_int(&mut services); + assert!(int_results.is_ok()); + let mm = &int_results.unwrap().pop().unwrap(); + assert_eq!(mm.port_name, "OUT".to_string()); + + //nothing left after the previous events_int call. + let int_results = model.events_int(&mut services); + assert!(int_results.is_ok()); + assert!(int_results.unwrap().is_empty()); + } + + + // TODO do fo_bench + // TODO smaller fan size for benchmark tests. + #[bench] + fn fi_bench(b: &mut Bencher) { + let fan_size = 10000usize; + let mut model = + ParallelGateway::new(get_in_port_names(fan_size), vec!["OUT".to_string()], false); + let in_messages: Vec = (0..fan_size) + .map(|i| ModelMessage { + port_name: format!("IN_{}", i), + content: "testing".to_string(), + }) + .collect(); + + let mut services = Services::default(); + + //Todo m + b.iter(|| { + let _: Vec> = in_messages + .iter() + .map(|i| model.events_ext(i, &mut services)) + .collect(); + let _ = model.events_int(&mut services); + }); + } +} diff --git a/sim/benches/loadbalancer_bench.rs b/sim/benches/loadbalancer_bench.rs new file mode 100644 index 0000000..cb00b6d --- /dev/null +++ b/sim/benches/loadbalancer_bench.rs @@ -0,0 +1,81 @@ +#![feature(test)] + +extern crate test; + +#[cfg(test)] +mod test_loadbalancer { + use test::Bencher; + use sim::models::{DevsModel, LoadBalancer, Model, ModelMessage}; + use sim::simulator::Services; + use std::collections::HashSet; + use std::iter::FromIterator; + + fn job_message(content: String) -> ModelMessage { + ModelMessage { + port_name: "job".to_string(), + content: content, + } + } + + fn get_loadbalancer() -> (Model,HashSet) { + let fpports = vec!["A".to_string(), + "B".to_string(), + "C".to_string()]; + let fpports_set = HashSet::from_iter(fpports.clone().into_iter()); + (Model::new( + "bench_storage".to_string(), + Box::new(LoadBalancer::new( + "job".to_string(), + fpports, + false, + )) + ), fpports_set) + + } + + #[test] + ///verify that the storage model stores and retrieves a stored value. + fn loadbalancer_test() { + let (mut model, mut expected_port_set) = get_loadbalancer(); + + let expected_message = "value001".to_string(); + let job_message = job_message(expected_message.clone()); + let mut services = Services::default(); + + for _ in 0..expected_port_set.len(){ + let ext_result = &model.events_ext(&job_message, &mut services); + assert!(ext_result.is_ok()); + //expect an internal message routed to "A" port + let int_results = model.events_int(&mut services); + assert!(int_results.is_ok()); + match int_results { + Ok(vl) => { + assert_eq!(vl.len(), 1); + assert_eq!(vl[0].content, expected_message.clone()); + assert!(expected_port_set.contains(&vl[0].port_name), "The expected port `{}` was not found.", vl[0].port_name); + expected_port_set.remove(&vl[0].port_name); + }, + Err(_) => assert!(int_results.is_err()) + } + } + assert!(expected_port_set.is_empty(),"Not all the ports expected have been used. in round robin."); + } + + #[bench] + fn loadbalancer_bench(b: &mut Bencher) { + let (mut model, expected_port_set) = get_loadbalancer(); + + let expected_message = "value001".to_string(); + let job_message = job_message(expected_message.clone()); + let mut services = Services::default(); + + b.iter(|| { + for _ in 0..expected_port_set.len() { + let _ = &model.events_ext(&job_message, &mut services); + let _ = model.events_int(&mut services); + } + }); + } + // test test_loadbalancer::loadbalancer_bench ... bench: 549.13 ns/iter (+/- 6.46) + // test test_loadbalancer::loadbalancer_bench ... bench: 583.14 ns/iter (+/- 5.70) +} \ No newline at end of file diff --git a/sim/benches/ping_pong_bench.rs b/sim/benches/ping_pong_bench.rs new file mode 100644 index 0000000..5abae57 --- /dev/null +++ b/sim/benches/ping_pong_bench.rs @@ -0,0 +1,82 @@ +#![feature(test)] + +extern crate test; + +#[cfg(test)] +mod testy { + use sim::input_modeling::ContinuousRandomVariable; + use sim::models::{Model, Processor}; + use sim::simulator::{Connector, Message, Simulation}; + use test::Bencher; + + #[bench] + fn ping_pong_bench(b: &mut Bencher) { + let bench_iterations = 5; + + //run the ping pong a bunch of times and collect runtime metrics. + let (initial_messages, mut simulation) = ping_pong_sim(); + + initial_messages + .iter() + .for_each(|m| simulation.inject_input(m.clone())); + + b.iter(|| simulation.step_n(bench_iterations).unwrap()); + } + + fn ping_pong_sim() -> ([Message; 1], Simulation) { + let models = [ + Model::new( + String::from("player-01"), + Box::new(Processor::new( + ContinuousRandomVariable::Exp { lambda: 0.9 }, + None, + String::from("receive"), + String::from("send"), + false, + None, + )), + ), + Model::new( + String::from("player-02"), + Box::new(Processor::new( + ContinuousRandomVariable::Exp { lambda: 0.9 }, + None, + String::from("receive"), + String::from("send"), + false, + None, + )), + ), + ]; + + let connectors = [ + Connector::new( + String::from("p1 to p2"), + String::from("player-01"), + String::from("player-02"), + String::from("send"), + String::from("receive"), + ), + Connector::new( + String::from("p2 to p1"), + String::from("player-02"), + String::from("player-01"), + String::from("send"), + String::from("receive"), + ), + ]; + + let initial_messages = [Message::new( + "manual".to_string(), + "manual".to_string(), + "player-01".to_string(), + "receive".to_string(), + 0.0, + "Ball".to_string(), + )]; + + let simulation = Simulation::post(models.to_vec(), connectors.to_vec()); + (initial_messages, simulation) + } + // test testy::ping_pong_bench ... bench: 2,321.04 ns/iter (+/- 22.68) +} diff --git a/sim/benches/storage_bench.rs b/sim/benches/storage_bench.rs new file mode 100644 index 0000000..da53010 --- /dev/null +++ b/sim/benches/storage_bench.rs @@ -0,0 +1,85 @@ +#![feature(test)] + +extern crate test; + +#[cfg(test)] +mod test_models { + use sim::models::{DevsModel, Model, ModelMessage, Storage}; + use sim::simulator::Services; + use test::Bencher; + + fn put_message(content: String) -> ModelMessage{ + ModelMessage { + port_name: "put".to_string(), + content: content, + } + } + fn get_message() -> ModelMessage { + ModelMessage { + port_name: "get".to_string(), + content: "NA01".to_string(), + } + } + + fn get_storage() -> Model { + Model::new( + "bench_storage".to_string(), + Box::new(Storage::new( + "put".to_string(), + "get".to_string(), + "stored".to_string(), + false, + )), + ) + } + + #[test] + ///verify that the storage model stores and retrieves a stored value. + fn storage_test() { + let mut model = get_storage(); + let expected_message = "value001".to_string(); + let put_message = put_message(expected_message.clone()); + let get_message = get_message(); + let mut services = Services::default(); + let ext_result = model.events_ext(&put_message, &mut services); + assert!(ext_result.is_ok()); + + let int_results = model.events_int(& mut services); + assert!(int_results.is_ok()); + + let ext_result = model.events_ext(&get_message, &mut services); + assert!(ext_result.is_ok()); + + let int_results = model.events_int(& mut services); + match int_results { + Ok(vl) => { + assert_eq!(vl.len(), 1); + assert_eq!(vl[0].content, expected_message.clone()); + }, + Err(_) => assert!(int_results.is_err()) + } + assert_eq!(services.global_time(), 0f64) + + } + + + #[bench] + fn storage_bench(b: &mut Bencher) { + let mut model = get_storage(); + + let put_message = put_message("value001".to_string()); + let get_message = get_message(); + + b.iter(|| { + let mut services = Services::default(); + let _ = model.events_ext(&put_message, &mut services); + let _ = model.events_int(&mut services); + let _ = model.events_ext(&get_message, &mut services); + let _ = model.events_int(&mut services); + + }); + } +// test test_models::storage_bench ... bench: 165.55 ns/iter (+/- 2.20) +// test test_models::storage_bench ... bench: 165.97 ns/iter (+/- 2.58) +// test test_models::storage_bench ... bench: 166.18 ns/iter (+/- 3.14) +} diff --git a/sim/src/models/parallel_gateway.rs b/sim/src/models/parallel_gateway.rs index dca0436..cb62710 100644 --- a/sim/src/models/parallel_gateway.rs +++ b/sim/src/models/parallel_gateway.rs @@ -97,6 +97,7 @@ impl ParallelGateway { .find(|(_, count)| **count == self.ports_in.flow_paths.len()) } + /// ModelMessages are stored by content in a bag. fn increment_collection(&mut self, incoming_message: &ModelMessage, services: &mut Services) { *self .state