Skip to content

Commit 0423e12

Browse files
committed
support stream API with wait_with_pipe()
// Continuously process child process' outputs spawn!(journalctl)?.wait_with_pipe(&mut |pipe| { BufReader::new(pipe) .lines() .filter_map(|line| line.ok()) .filter(|line| line.find("usb").is_some()) .take(10) .for_each(|line| println!("{}", line)); Ok(()) })?;
1 parent 72c78cd commit 0423e12

File tree

4 files changed

+92
-57
lines changed

4 files changed

+92
-57
lines changed

examples/rust_cookbook.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
// Rewrite examples with rust_cmd_lib from
33
// https://rust-lang-nursery.github.io/rust-cookbook/os/external.html
44
//
5-
use cmd_lib::{init_builtin_logger, run_cmd, run_fun, CmdResult};
5+
use cmd_lib::*;
6+
use std::io::{BufRead, BufReader};
67
fn main() -> CmdResult {
78
init_builtin_logger();
89
cmd_lib::set_pipefail(false); // do not fail due to pipe errors
@@ -26,7 +27,15 @@ fn main() -> CmdResult {
2627
run_cmd!(rm -f out.txt)?;
2728

2829
// Continuously process child process' outputs
29-
run_cmd!(ping -c 5 www.google.com | awk r#"/time/ {print $(NF-3) " " $(NF-1) " " $NF}"#)?;
30+
spawn!(journalctl)?.wait_with_pipe(&mut |pipe| {
31+
BufReader::new(pipe)
32+
.lines()
33+
.filter_map(|line| line.ok())
34+
.filter(|line| line.find("usb").is_some())
35+
.take(10)
36+
.for_each(|line| println!("{}", line));
37+
Ok(())
38+
})?;
3039

3140
Ok(())
3241
}

src/child.rs

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ use std::process::{Child, ExitStatus};
66
use std::thread::JoinHandle;
77
use CmdChild::{ProcChild, SyncChild, ThreadChild};
88

9+
/// Representation of running or exited children processes, connected with pipes
10+
/// optionally.
11+
///
12+
/// Calling `spawn!` macro will return `Result<CmdChildren>`
913
pub struct CmdChildren(Vec<CmdChild>);
1014
impl CmdChildren {
11-
pub fn from(children: Vec<CmdChild>) -> Self {
15+
pub(crate) fn from(children: Vec<CmdChild>) -> Self {
1216
Self(children)
1317
}
1418

@@ -39,33 +43,6 @@ impl CmdChildren {
3943
Ok(())
4044
}
4145

42-
pub fn wait_raw_result(&mut self) -> Result<Vec<u8>> {
43-
let ret = self.wait_raw_result_nolog();
44-
if let Err(ref err) = ret {
45-
error!(
46-
"Running {} failed, Error: {}",
47-
CmdChild::get_full_cmd(&self.0),
48-
err
49-
);
50-
}
51-
ret
52-
}
53-
54-
fn wait_raw_result_nolog(&mut self) -> Result<Vec<u8>> {
55-
let handle = self.0.pop().unwrap();
56-
let wait_last = handle.wait_with_output();
57-
match wait_last {
58-
Err(e) => {
59-
let _ = Self::wait_children(&mut self.0);
60-
Err(e)
61-
}
62-
Ok(output) => {
63-
Self::wait_children(&mut self.0)?;
64-
Ok(output)
65-
}
66-
}
67-
}
68-
6946
pub fn wait_fun_result(&mut self) -> FunResult {
7047
let ret = self.wait_fun_result_nolog();
7148
if let Err(ref err) = ret {
@@ -97,23 +74,55 @@ impl CmdChildren {
9774
}
9875
}
9976
}
77+
78+
pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(PipeReader) -> CmdResult) -> CmdResult {
79+
let handle = self.0.pop().unwrap();
80+
let mut ret = Ok(());
81+
match handle {
82+
ProcChild {
83+
mut child,
84+
stderr,
85+
stdout,
86+
..
87+
} => {
88+
if let Some(stdout) = stdout {
89+
ret = f(stdout);
90+
let _ = child.kill();
91+
}
92+
CmdChild::log_stderr_output(stderr);
93+
}
94+
ThreadChild { .. } => {
95+
panic!("should not wait pipe on thread");
96+
}
97+
SyncChild { stderr, stdout, .. } => {
98+
CmdChild::log_stderr_output(stderr);
99+
if let Some(stdout) = stdout {
100+
ret = f(stdout);
101+
}
102+
}
103+
};
104+
let _ = Self::wait_children(&mut self.0);
105+
ret
106+
}
100107
}
101108

102109
#[derive(Debug)]
103110
pub enum CmdChild {
104111
ProcChild {
105112
child: Child,
106113
cmd: String,
114+
stdout: Option<PipeReader>,
107115
stderr: Option<PipeReader>,
108116
},
109117
ThreadChild {
110118
child: JoinHandle<CmdResult>,
111119
cmd: String,
120+
stdout: Option<PipeReader>,
112121
stderr: Option<PipeReader>,
113122
},
114123
SyncChild {
115-
output: Option<PipeReader>,
116124
cmd: String,
125+
stdout: Option<PipeReader>,
117126
stderr: Option<PipeReader>,
118127
},
119128
}
@@ -134,6 +143,7 @@ impl CmdChild {
134143
mut child,
135144
stderr,
136145
cmd,
146+
..
137147
} => {
138148
Self::log_stderr_output(stderr);
139149
if let Some(stdout) = child.stdout.take() {
@@ -150,7 +160,9 @@ impl CmdChild {
150160
));
151161
}
152162
}
153-
ThreadChild { child, cmd, stderr } => {
163+
ThreadChild {
164+
child, cmd, stderr, ..
165+
} => {
154166
let status = child.join();
155167
Self::log_stderr_output(stderr);
156168
match status {
@@ -167,12 +179,12 @@ impl CmdChild {
167179
}
168180
}
169181
}
170-
SyncChild { output, stderr, .. } => {
182+
SyncChild { stdout, stderr, .. } => {
171183
Self::log_stderr_output(stderr);
172-
if let Some(mut out) = output {
184+
if let Some(mut out) = stdout {
173185
let mut buf = vec![];
174186
check_result(out.read_to_end(&mut buf).map(|_| ()))?;
175-
println!("{}", String::from_utf8_lossy(&buf));
187+
print!("{}", String::from_utf8_lossy(&buf));
176188
}
177189
}
178190
}
@@ -181,24 +193,32 @@ impl CmdChild {
181193

182194
pub fn wait_with_output(self) -> Result<Vec<u8>> {
183195
match self {
184-
ProcChild { child, cmd, stderr } => {
196+
ProcChild {
197+
mut child,
198+
cmd,
199+
stdout,
200+
stderr,
201+
} => {
202+
let mut buf = vec![];
203+
if let Some(mut stdout) = stdout {
204+
stdout.read_to_end(&mut buf)?;
205+
}
185206
Self::log_stderr_output(stderr);
186-
let output = child.wait_with_output()?;
187-
if !output.status.success() {
207+
let status = child.wait()?;
208+
if !status.success() {
188209
return Err(Self::status_to_io_error(
189-
output.status,
210+
status,
190211
&format!("{} exited with error", cmd),
191212
));
192-
} else {
193-
Ok(output.stdout)
194213
}
214+
Ok(buf)
195215
}
196216
ThreadChild { cmd, .. } => {
197217
panic!("{} thread should not be waited for output", cmd);
198218
}
199-
SyncChild { output, stderr, .. } => {
219+
SyncChild { stdout, stderr, .. } => {
200220
Self::log_stderr_output(stderr);
201-
if let Some(mut out) = output {
221+
if let Some(mut out) = stdout {
202222
let mut buf = vec![];
203223
out.read_to_end(&mut buf)?;
204224
return Ok(buf);

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ pub use builtins::{
331331
builtin_cat, builtin_debug, builtin_die, builtin_echo, builtin_error, builtin_info,
332332
builtin_trace, builtin_true, builtin_warn,
333333
};
334+
pub use child::CmdChildren;
334335
#[doc(hidden)]
335336
pub use log;
336337
pub use logger::init_builtin_logger;

src/process.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::fmt;
1010
use std::fs::{File, OpenOptions};
1111
use std::io::{Error, ErrorKind, Read, Result, Write};
1212
use std::path::Path;
13-
use std::process::{Command, Stdio};
13+
use std::process::Command;
1414
use std::sync::Mutex;
1515

1616
/// Environment for builtin or custom commands
@@ -249,7 +249,8 @@ pub struct Cmd {
249249
stdin_redirect: Option<CmdIn>,
250250
stdout_redirect: Option<CmdOut>,
251251
stderr_redirect: Option<CmdOut>,
252-
stderr_logging: Option<PipeReader>, // for builtin/custom commands
252+
stdout_logging: Option<PipeReader>,
253+
stderr_logging: Option<PipeReader>,
253254
}
254255

255256
impl Default for Cmd {
@@ -263,6 +264,7 @@ impl Default for Cmd {
263264
stdin_redirect: None,
264265
stdout_redirect: None,
265266
stderr_redirect: None,
267+
stdout_logging: None,
266268
stderr_logging: None,
267269
}
268270
}
@@ -342,12 +344,11 @@ impl Cmd {
342344
self.run_cd_cmd(current_dir)?;
343345
Ok(CmdChild::SyncChild {
344346
cmd: full_cmd,
345-
output: None,
347+
stdout: None,
346348
stderr: None,
347349
})
348350
} else if self.in_cmd_map {
349-
let pipe_out = matches!(self.stdout_redirect, Some(CmdOut::CmdPipe(_)));
350-
let mut new_pipe_out = None;
351+
let pipe_out = self.stdout_logging.is_none();
351352
let mut env = CmdEnv {
352353
args: self.args,
353354
vars: self.vars,
@@ -360,9 +361,7 @@ impl Cmd {
360361
stdout: if let Some(redirect_out) = self.stdout_redirect.take() {
361362
redirect_out
362363
} else {
363-
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
364-
new_pipe_out = Some(pipe_reader);
365-
CmdOut::CmdPipe(pipe_writer)
364+
CmdOut::CmdPipe(os_pipe::dup_stdout()?)
366365
},
367366
stderr: if let Some(redirect_err) = self.stderr_redirect.take() {
368367
redirect_err
@@ -376,6 +375,7 @@ impl Cmd {
376375
let handle = std::thread::spawn(move || internal_cmd(&mut env));
377376
Ok(CmdChild::ThreadChild {
378377
child: handle,
378+
stdout: self.stdout_logging,
379379
stderr: self.stderr_logging,
380380
cmd: full_cmd,
381381
})
@@ -384,7 +384,7 @@ impl Cmd {
384384
drop(env);
385385
Ok(CmdChild::SyncChild {
386386
cmd: full_cmd,
387-
output: new_pipe_out,
387+
stdout: self.stdout_logging,
388388
stderr: self.stderr_logging,
389389
})
390390
}
@@ -404,8 +404,6 @@ impl Cmd {
404404
// update stdout
405405
if let Some(redirect_out) = self.stdout_redirect.take() {
406406
cmd.stdout(redirect_out);
407-
} else {
408-
cmd.stdout(Stdio::piped());
409407
}
410408

411409
// update stderr
@@ -417,6 +415,7 @@ impl Cmd {
417415
let child = cmd.spawn()?;
418416
Ok(CmdChild::ProcChild {
419417
cmd: full_cmd,
418+
stdout: self.stdout_logging,
420419
stderr: self.stderr_logging,
421420
child,
422421
})
@@ -470,11 +469,17 @@ impl Cmd {
470469
self.stderr_redirect = Some(CmdOut::CmdPipe(pipe_writer));
471470
self.stderr_logging = Some(pipe_reader);
472471

473-
if let Some(pipe) = pipe_in.take() {
474-
self.stdin_redirect = Some(CmdIn::CmdPipe(pipe));
475-
}
476472
if let Some(pipe) = pipe_out {
477473
self.stdout_redirect = Some(CmdOut::CmdPipe(pipe));
474+
} else {
475+
// set up stdout pipe
476+
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
477+
self.stdout_redirect = Some(CmdOut::CmdPipe(pipe_writer));
478+
self.stdout_logging = Some(pipe_reader);
479+
}
480+
481+
if let Some(pipe) = pipe_in.take() {
482+
self.stdin_redirect = Some(CmdIn::CmdPipe(pipe));
478483
}
479484

480485
for redirect in self.redirects.iter() {

0 commit comments

Comments
 (0)