Skip to content

Commit 626e1a5

Browse files
authoredOct 17, 2023
runner logging using simple logger (#47755)
* runner using runner logger
1 parent 1431d1d commit 626e1a5

File tree

3 files changed

+218
-35
lines changed

3 files changed

+218
-35
lines changed
 

‎src/bin/pushpin.rs

+17-3
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,32 @@
1515
*/
1616

1717
use clap::Parser;
18-
use log::info;
19-
use pushpin::runner::{ArgsData, CliArgs, Settings};
18+
use log::{info, LevelFilter};
19+
use pushpin::runner::{get_runner_logger, ArgsData, CliArgs, Settings};
2020
use pushpin::service::start_services;
2121
use std::error::Error;
2222
use std::process;
2323

2424
fn process_args_and_run(args: CliArgs) -> Result<(), Box<dyn Error>> {
2525
let args_data = ArgsData::new(args)?;
2626
let settings = Settings::new(args_data)?;
27+
28+
log::set_logger(get_runner_logger()).unwrap();
29+
let ll = settings
30+
.log_levels
31+
.get("")
32+
.unwrap_or(settings.log_levels.get("default").unwrap());
33+
let level = match ll {
34+
0 => LevelFilter::Error,
35+
1 => LevelFilter::Warn,
36+
2 => LevelFilter::Info,
37+
3 => LevelFilter::Debug,
38+
4..=u8::MAX => LevelFilter::Trace,
39+
};
40+
log::set_max_level(level);
41+
2742
info!("using config: {:?}", settings.config_file.display());
2843
start_services(settings);
29-
//To be implemented in the next PR
3044

3145
Ok(())
3246
}

‎src/runner.rs

+85
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,21 @@
1616

1717
use clap::{ArgAction, Parser};
1818
use log::{error, warn};
19+
use log::{Level, Log, Metadata, Record};
1920
use serde::Deserialize;
2021
use std::collections::HashMap;
2122
use std::env;
2223
use std::error::Error;
2324
use std::fs;
25+
use std::io;
26+
use std::mem;
2427
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2528
use std::path::{Path, PathBuf};
29+
use std::str;
2630
use std::string::String;
31+
use std::sync::Once;
32+
use time::macros::format_description;
33+
use time::{OffsetDateTime, UtcOffset};
2734
use url::Url;
2835

2936
use crate::config::{get_config_file, CustomConfig};
@@ -887,3 +894,81 @@ mod tests {
887894
}
888895
}
889896
}
897+
898+
struct RunnerLogger {
899+
local_offset: UtcOffset,
900+
}
901+
902+
impl Log for RunnerLogger {
903+
fn enabled(&self, metadata: &Metadata) -> bool {
904+
metadata.level() <= Level::Trace
905+
}
906+
907+
fn log(&self, record: &Record) {
908+
if !self.enabled(record.metadata()) {
909+
return;
910+
}
911+
912+
let binding = record.args().to_string();
913+
let msg_parts: Vec<&str> = binding.split_whitespace().collect();
914+
915+
if msg_parts.len() >= 4 {
916+
println!(
917+
"{} {} {} [{}] {}",
918+
msg_parts[0],
919+
msg_parts[1],
920+
msg_parts[2],
921+
record.target(),
922+
msg_parts[3..].join(" ")
923+
);
924+
} else {
925+
let now = OffsetDateTime::now_utc().to_offset(self.local_offset);
926+
927+
let format = format_description!(
928+
"[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:3]"
929+
);
930+
931+
let mut ts = [0u8; 64];
932+
933+
let size = {
934+
let mut ts = io::Cursor::new(&mut ts[..]);
935+
936+
now.format_into(&mut ts, &format)
937+
.expect("failed to write timestamp");
938+
939+
ts.position() as usize
940+
};
941+
942+
let ts = str::from_utf8(&ts[..size]).expect("timestamp is not utf-8");
943+
944+
let lname = match record.level() {
945+
log::Level::Error => "ERR",
946+
log::Level::Warn => "WARN",
947+
log::Level::Info => "INFO",
948+
log::Level::Debug => "DEBUG",
949+
log::Level::Trace => "TRACE",
950+
};
951+
952+
println!("[{}] {} [{}] {}", lname, ts, record.target(), record.args());
953+
}
954+
}
955+
956+
fn flush(&self) {}
957+
}
958+
959+
static mut LOGGER: mem::MaybeUninit<RunnerLogger> = mem::MaybeUninit::uninit();
960+
961+
pub fn get_runner_logger() -> &'static impl Log {
962+
static INIT: Once = Once::new();
963+
964+
unsafe {
965+
INIT.call_once(|| {
966+
let local_offset =
967+
UtcOffset::current_local_offset().expect("failed to get local time offset");
968+
969+
LOGGER.write(RunnerLogger { local_offset });
970+
});
971+
972+
LOGGER.as_ptr().as_ref().unwrap()
973+
}
974+
}

‎src/service.rs

+116-32
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
*/
1616

1717
use crate::runner::Settings;
18-
use log::error;
18+
use log::{error, LevelFilter};
1919
use mpsc::{channel, Sender};
2020
use signal_hook::consts::{SIGINT, SIGTERM, TERM_SIGNALS};
2121
use signal_hook::iterator::Signals;
22+
use std::io::{BufRead, BufReader};
23+
use std::process::{ChildStderr, ChildStdout, Stdio};
2224
use std::sync::mpsc;
2325
use std::thread::JoinHandle;
2426
use std::{process::Command, thread};
@@ -31,6 +33,7 @@ pub enum ServiceError {
3133

3234
pub struct Service {
3335
pub name: String,
36+
pub log_level: u8,
3437
}
3538

3639
pub fn start_services(settings: Settings) {
@@ -54,7 +57,7 @@ pub fn start_services(settings: Settings) {
5457
let (sender, receiver) = channel();
5558
let mut threads: Vec<Option<JoinHandle<()>>> = vec![];
5659
for mut service in services {
57-
threads.push(service.start(sender.clone()));
60+
threads.extend(service.start(sender.clone()));
5861
}
5962

6063
// Spawn a signal handling thread
@@ -101,22 +104,49 @@ pub fn start_services(settings: Settings) {
101104
}
102105

103106
impl Service {
104-
fn new(name: String) -> Self {
105-
Self { name }
107+
fn new(name: String, log_level: u8) -> Self {
108+
Self { name, log_level }
106109
}
107-
108110
pub fn start(
109111
&mut self,
110112
args: Vec<String>,
111113
sender: Sender<Result<(), ServiceError>>,
112-
) -> Option<thread::JoinHandle<()>> {
114+
) -> Vec<Option<JoinHandle<()>>> {
113115
let name = self.name.clone();
116+
let name_str = self.name.clone();
117+
118+
let level = match self.log_level {
119+
0 => LevelFilter::Error,
120+
1 => LevelFilter::Warn,
121+
2 => LevelFilter::Info,
122+
3 => LevelFilter::Debug,
123+
4..=u8::MAX => LevelFilter::Trace,
124+
};
125+
log::set_max_level(level);
126+
127+
// Create a channel for sending thread handles back to main thread
128+
let (handle_sender, handle_receiver) = channel();
129+
130+
let mut result: Vec<Option<JoinHandle<()>>> = Vec::new();
114131

115-
Some(thread::spawn(move || {
116-
let mut command = Command::new(args[0].clone());
132+
result.push(Some(thread::spawn(move || {
133+
let mut command = Command::new(&args[0]);
117134
command.args(&args[1..]);
118135

119-
let status = command.status().expect("Failed to execute command");
136+
// Capture stdout and stderr
137+
command.stdout(Stdio::piped());
138+
command.stderr(Stdio::piped());
139+
140+
let mut child = command.spawn().expect("Failed to execute command");
141+
142+
let stdout = child.stdout.take().unwrap();
143+
let stderr = child.stderr.take().unwrap();
144+
let handles = start_log_handler(stdout, stderr, name_str);
145+
146+
// Send the handles back to main thread
147+
handle_sender.send(handles).unwrap();
148+
149+
let status = child.wait().expect("Failed to wait for command");
120150

121151
if status.success() {
122152
sender.send(Ok(())).unwrap();
@@ -126,12 +156,17 @@ impl Service {
126156
.send(Err(ServiceError::ThreadError(error_message)))
127157
.unwrap();
128158
}
129-
}))
159+
})));
160+
// Receive the handles from the channel and add them to the result vector
161+
let received_handles: Vec<Option<JoinHandle<()>>> = handle_receiver.recv().unwrap();
162+
result.extend(received_handles);
163+
164+
result
130165
}
131166
}
132167

133168
pub trait RunnerService {
134-
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Option<JoinHandle<()>>;
169+
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Vec<Option<JoinHandle<()>>>;
135170
}
136171

137172
pub struct CondureService {
@@ -147,12 +182,10 @@ impl CondureService {
147182
args.push(settings.condure_bin.display().to_string());
148183

149184
let log_level = match settings.log_levels.get(service_name) {
150-
Some(&x) => x as i8,
151-
None => settings.log_levels.get("default").unwrap().to_owned() as i8,
185+
Some(&x) => x,
186+
None => settings.log_levels.get("default").unwrap().to_owned(),
152187
};
153-
if log_level >= 0 {
154-
args.push(format!("--log-level={}", log_level));
155-
}
188+
args.push(format!("--log-level={}", log_level));
156189

157190
args.push(format!("--buffer-size={}", settings.client_buffer_size));
158191
args.push(format!(
@@ -215,14 +248,14 @@ impl CondureService {
215248
}
216249

217250
Self {
218-
service: Service::new(String::from(service_name)),
251+
service: Service::new(String::from(service_name), log_level),
219252
args,
220253
}
221254
}
222255
}
223256

224257
impl RunnerService for CondureService {
225-
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Option<JoinHandle<()>> {
258+
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Vec<Option<JoinHandle<()>>> {
226259
self.service.start(self.args.clone(), sender)
227260
}
228261
}
@@ -244,26 +277,24 @@ impl PushpinProxyService {
244277
args.push(format!("--ipc-prefix={}", settings.ipc_prefix));
245278
}
246279
let log_level = match settings.log_levels.get("pushpin-proxy") {
247-
Some(&x) => x as i8,
248-
None => settings.log_levels.get("default").unwrap().to_owned() as i8,
280+
Some(&x) => x,
281+
None => settings.log_levels.get("default").unwrap().to_owned(),
249282
};
250-
if log_level >= 0 {
251-
args.push(format!("--loglevel={}", log_level));
252-
}
283+
args.push(format!("--loglevel={}", log_level));
253284

254285
for route in settings.route_lines.clone() {
255286
args.push(format!("--route={}", route));
256287
}
257288

258289
Self {
259-
service: Service::new(String::from(service_name)),
290+
service: Service::new(String::from(service_name), log_level),
260291
args,
261292
}
262293
}
263294
}
264295

265296
impl RunnerService for PushpinProxyService {
266-
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Option<JoinHandle<()>> {
297+
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Vec<Option<JoinHandle<()>>> {
267298
self.service.start(self.args.clone(), sender)
268299
}
269300
}
@@ -288,22 +319,75 @@ impl PushpinHandlerService {
288319
args.push(format!("--ipc-prefix={}", settings.ipc_prefix));
289320
}
290321
let log_level = match settings.log_levels.get("pushpin-handler") {
291-
Some(&x) => x as i8,
292-
None => settings.log_levels.get("default").unwrap().to_owned() as i8,
322+
Some(&x) => x,
323+
None => settings.log_levels.get("default").unwrap().to_owned(),
293324
};
294-
if log_level >= 0 {
295-
args.push(format!("--loglevel={}", log_level));
296-
}
325+
args.push(format!("--loglevel={}", log_level));
297326

298327
Self {
299-
service: Service::new(String::from(service_name)),
328+
service: Service::new(String::from(service_name), log_level),
300329
args,
301330
}
302331
}
303332
}
304333

305334
impl RunnerService for PushpinHandlerService {
306-
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Option<JoinHandle<()>> {
335+
fn start(&mut self, sender: Sender<Result<(), ServiceError>>) -> Vec<Option<JoinHandle<()>>> {
307336
self.service.start(self.args.clone(), sender)
308337
}
309338
}
339+
340+
fn start_log_handler(
341+
stdout: ChildStdout,
342+
stderr: ChildStderr,
343+
name: String,
344+
) -> Vec<Option<JoinHandle<()>>> {
345+
let mut result: Vec<Option<JoinHandle<()>>> = Vec::new();
346+
347+
let name_str = name.clone();
348+
result.push(Some(thread::spawn(move || {
349+
let reader = BufReader::new(stdout);
350+
for line in reader.lines() {
351+
match line {
352+
Ok(msg) => log_message(&name_str, log::Level::Info, &msg),
353+
Err(_) => {
354+
log_message(
355+
&name_str,
356+
log::Level::Error,
357+
"failed to read from standard out.",
358+
);
359+
break;
360+
}
361+
}
362+
}
363+
})));
364+
365+
result.push(Some(thread::spawn(move || {
366+
let reader_err = BufReader::new(stderr);
367+
for line in reader_err.lines() {
368+
match line {
369+
Ok(msg) => log_message(&name, log::Level::Error, &msg),
370+
Err(_) => {
371+
log_message(
372+
&name,
373+
log::Level::Error,
374+
"failed to read from standard error.",
375+
);
376+
break;
377+
}
378+
}
379+
}
380+
})));
381+
382+
result
383+
}
384+
385+
fn log_message(name: &str, level: log::Level, msg: &str) {
386+
log::logger().log(
387+
&log::Record::builder()
388+
.level(level)
389+
.target(name)
390+
.args(format_args!("{}", msg))
391+
.build(),
392+
);
393+
}

0 commit comments

Comments
 (0)
Please sign in to comment.