-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.rs
67 lines (58 loc) · 2.39 KB
/
handler.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::collections::HashMap;
use std::marker::PhantomData;
use crossbeam_channel::Sender;
use rayon::prelude::*;
use crate::command::Command;
use crate::module_runner::ModuleRunner;
#[derive(Debug, Clone)]
pub struct Handler<OutgoingDataFormat> {
modules_sender: HashMap<u64, Sender<Command>>,
pd: PhantomData<OutgoingDataFormat>
}
impl<OutgoingDataFormat: 'static + Clone> Handler<OutgoingDataFormat> {
pub fn new() -> Self {
Self {
modules_sender: HashMap::new(),
pd: PhantomData::default()
}
}
/// Spawn a new ModuleRunner task and execute it immediately
pub fn spawn(&mut self, id: u64, mut module: ModuleRunner<OutgoingDataFormat>) {
let sender = module.command_sender.clone();
// TODO: do something meaningful with handle
let _handle = tokio::spawn(async move {
module.runner().await;
});
self.modules_sender.insert(id, sender);
}
/// Send message to specified module. Returns false if it does not exist
/// If stop message is sent, the reference to will be deleted and the thread will shut down gracefully after another execution
fn send_message(&self, id: u64, message: Command) -> bool {
match self.modules_sender.get(&id) {
None => {
false
}
Some(sender) => {
// TODO: program could crash here, introduce some kind of mechanism that prevents this!
sender.send(message).is_ok()
}
}
}
pub fn send_stop_message(&mut self, id: u64) -> bool {
if self.send_message(id, Command::Stop) {
self.modules_sender.remove(&id);
return true;
}
false
}
/// Has to be called periodically to reset the housekeeping timer of the modules.
/// If the thread does not receive a housekeeping message for the specified time it will end itself gracefully!
/// Note that as long as this function has never been called the mechanism is disabled!
pub fn housekeeping(&self) {
self.modules_sender.par_iter().for_each(|(id, _)| {
self.send_message(*id, Command::Housekeeping);
});
}
}
unsafe impl<OutgoingDataFormat: Clone> Send for Handler<OutgoingDataFormat> {}
unsafe impl<OutgoingDataFormat: Clone> Sync for Handler<OutgoingDataFormat> {}