Skip to content

Commit

Permalink
Change to use tokio::sync::mpsc for events
Browse files Browse the repository at this point in the history
  • Loading branch information
happybeing committed Sep 13, 2020
1 parent 4ebee5f commit 8092945
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ default = ["termion", "crossterm"]
rand = "0.7.3"
structopt = "~0.3.15"
linemux = "0.1.1"
tokio = "0.2.22"
tokio = { version = "0.2.22", features = ["sync"] }
futures = "0.3.5"
termion = { version = "1.5", optional = true }
crossterm = { version = "0.17", optional = true }
Expand Down
105 changes: 48 additions & 57 deletions src/bin/logtail-crossterm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
#![recursion_limit = "512"] // Prevent select! macro blowing up

use tokio::stream::StreamExt;
use tokio::sync::mpsc;

///! forks of logterm customise the files in src/custom
#[path = "../custom/mod.rs"]
pub mod custom;
use self::custom::app::{App, DashViewMain};
use self::custom::opt::Opt;
use self::custom::ui::draw_dashboard;

///! logtail and its forks share code in src/
Expand All @@ -33,7 +33,6 @@ use crossterm::{
use std::{
error::Error,
io::{stdout, Write},
sync::mpsc,
thread,
time::{Duration, Instant},
};
Expand All @@ -58,8 +57,6 @@ enum Event<I> {
Tick,
}

use structopt::StructOpt;

// RUSTFLAGS="-A unused" cargo run --bin logtail-crossterm --features="crossterm" /var/log/auth.log /var/log/dmesg
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
Expand All @@ -74,91 +71,89 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let rx = initialise_events(app.opt.tick_rate);
let mut rx = initialise_events(app.opt.tick_rate);
terminal.clear()?;

// Use futures of async functions to handle events
// concurrently with logfile changes.
loop {
terminal.draw(|f| draw_dashboard(f, &mut app.dash_state, &mut app.monitors))?;
let logfiles_future = app.logfiles.next().fuse();
let events_future = next_event(&rx).fuse();
let events_future = rx.recv().fuse();
pin_mut!(logfiles_future, events_future);

select! {
(e) = events_future => {
match e {
Ok(Event::Input(event)) => match event.code {
KeyCode::Char('q')|
KeyCode::Char('Q') => {
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
break Ok(());
},
KeyCode::Char('h')|
KeyCode::Char('H') => app.dash_state.main_view = DashViewMain::DashHorizontal,
KeyCode::Char('v')|
KeyCode::Char('V') => app.dash_state.main_view = DashViewMain::DashVertical,
KeyCode::Down => app.handle_arrow_down(),
KeyCode::Up => app.handle_arrow_up(),
KeyCode::Right|
KeyCode::Tab => app.change_focus_next(),
KeyCode::Left => app.change_focus_previous(),
_ => {},
}
match e {
Some(Event::Input(event)) => match event.code {
KeyCode::Char('q')|
KeyCode::Char('Q') => {
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
break Ok(());
},
KeyCode::Char('h')|
KeyCode::Char('H') => app.dash_state.main_view = DashViewMain::DashHorizontal,
KeyCode::Char('v')|
KeyCode::Char('V') => app.dash_state.main_view = DashViewMain::DashVertical,
KeyCode::Down => app.handle_arrow_down(),
KeyCode::Up => app.handle_arrow_up(),
KeyCode::Right|
KeyCode::Tab => app.change_focus_next(),
KeyCode::Left => app.change_focus_previous(),
_ => {},
}

Some(Event::Tick) => {
// draw_dashboard(&mut f, &dash_state, &mut monitors).unwrap();
// draw_dashboard(f, &dash_state, &mut monitors)?;
}

Ok(Event::Tick) => {
// draw_dashboard(&mut f, &dash_state, &mut monitors).unwrap();
// draw_dashboard(f, &dash_state, &mut monitors)?;
}

Err(error) => {
println!("{}", error);
None => (),
}
}
},

(line) = logfiles_future => {
match line {
Some(Ok(line)) => {
let source_str = line.source().to_str().unwrap();
let source = String::from(source_str);

match app.monitors.get_mut(&source) {
match line {
Some(Ok(line)) => {
let source_str = line.source().to_str().unwrap();
let source = String::from(source_str);

match app.monitors.get_mut(&source) {
None => (),
Some(monitor) => monitor.append_to_content(line.line())
}
},
Some(Err(e)) => panic!("{}", e),
None => (),
Some(monitor) => monitor.append_to_content(line.line())
}
},
Some(Err(e)) => panic!("{}", e),
None => (),
}
},
}
}
}
// type Tx = std::sync::mpsc::Sender<Event<crossterm::event::KeyEvent>>;
type Rx = std::sync::mpsc::Receiver<Event<crossterm::event::KeyEvent>>;
type Rx = tokio::sync::mpsc::UnboundedReceiver<Event<crossterm::event::KeyEvent>>;

fn initialise_events(tick_rate: u64) -> Rx {
let tick_rate = Duration::from_millis(tick_rate);
let (tx, rx) = mpsc::channel(); // Setup input handling
let (tx, rx) = mpsc::unbounded_channel(); // Setup input handling

thread::spawn(move || {
let mut last_tick = Instant::now();
loop {
// poll for tick rate duration, if no events, sent tick event.
if event::poll(tick_rate - last_tick.elapsed()).unwrap() {
if let CEvent::Key(key) = event::read().unwrap() {
tx.send(Event::Input(key)).unwrap();
tx.send(Event::Input(key));
}
}
if last_tick.elapsed() >= tick_rate {
tx.send(Event::Tick).unwrap(); // <-- PANICS HERE
tx.send(Event::Tick);
last_tick = Instant::now();
}

Expand All @@ -172,7 +167,3 @@ fn initialise_events(tick_rate: u64) -> Rx {
});
rx
}

async fn next_event(rx: &Rx) -> Result<Event<crossterm::event::KeyEvent>, mpsc::RecvError> {
rx.recv()
}
21 changes: 8 additions & 13 deletions src/bin/logtail-termion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn terminal_main() -> std::io::Result<()> {
}
};

let events = Events::new();
let mut events = Events::new();

// Terminal initialization
// info!("Intialising terminal (termion backend)");
Expand All @@ -88,14 +88,14 @@ async fn terminal_main() -> std::io::Result<()> {
// concurrently with logfile changes.
// info!("Processing started");
loop {
let events_future = next_event(&events).fuse();
let events_future = events.rx.recv().fuse();
let logfiles_future = app.logfiles.next().fuse();
pin_mut!(events_future, logfiles_future);

select! {
(e) = events_future => {
match e {
Ok(Event::Input(input)) => {
Some(Event::Input(input)) => {
match input {
Key::Char('q')|
Key::Char('Q') => return Ok(()),
Expand All @@ -112,7 +112,7 @@ async fn terminal_main() -> std::io::Result<()> {
}
}

Ok(Event::Tick) => {
Some(Event::Tick) => {
match terminal.draw(|f| draw_dashboard(f, &mut app.dash_state, &mut app.monitors)) {
Ok(_) => {},
Err(e) => {
Expand All @@ -121,9 +121,10 @@ async fn terminal_main() -> std::io::Result<()> {
};
}

Err(e) => {
return Err(Error::new(ErrorKind::Other, "receive error"));
}
None => return Err(Error::new(ErrorKind::Other, "receive error")),
// Err(e) => {
// return Err(Error::new(ErrorKind::Other, "receive error"));
// }
}
},
(line) = logfiles_future => {
Expand All @@ -149,9 +150,3 @@ async fn terminal_main() -> std::io::Result<()> {
}
}
}

use std::sync::mpsc;

async fn next_event(events: &Events) -> Result<Event<Key>, mpsc::RecvError> {
events.next()
}
12 changes: 6 additions & 6 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::io;
use std::sync::mpsc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc;

use termion::event::Key;
use termion::input::TermRead;
Expand All @@ -18,7 +18,7 @@ pub enum Event<I> {
/// A small event handler that wrap termion input and tick events. Each event
/// type is handled in its own thread and returned to a common `Receiver`
pub struct Events {
rx: mpsc::Receiver<Event<Key>>,
pub rx: mpsc::UnboundedReceiver<Event<Key>>,
input_handle: thread::JoinHandle<()>,
ignore_exit_key: Arc<AtomicBool>,
tick_handle: thread::JoinHandle<()>,
Expand All @@ -45,7 +45,7 @@ impl Events {
}

pub fn with_config(config: Config) -> Events {
let (tx, rx) = mpsc::channel();
let (tx, rx) = mpsc::unbounded_channel();
let ignore_exit_key = Arc::new(AtomicBool::new(false));
let input_handle = {
let tx = tx.clone();
Expand Down Expand Up @@ -81,9 +81,9 @@ impl Events {
}
}

pub fn next(&self) -> Result<Event<Key>, mpsc::RecvError> {
self.rx.recv()
}
// pub async fn next(&self) -> Option<Event<Key>> {
// self.rx.recv().await
// }

pub fn disable_exit_key(&mut self) {
self.ignore_exit_key.store(true, Ordering::Relaxed);
Expand Down

0 comments on commit 8092945

Please sign in to comment.