Skip to content

Commit

Permalink
fix: try to avoid stdout blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Jun 22, 2024
1 parent 7ae0823 commit 7edeaff
Showing 1 changed file with 37 additions and 28 deletions.
65 changes: 37 additions & 28 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ use crate::{get_memory_bytes, Args};
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use flume::{unbounded, Receiver, Sender};
use futures_util::StreamExt;
use futures_util::{future::try_join3, StreamExt};
use log::{error, info, warn};
use reqwest::Url;
use std::{
os::fd::AsRawFd,
os::fd::FromRawFd,
path::Path,
process::{Output, Stdio},
time::{Duration, Instant},
};
use tokio::{
fs,
io::{AsyncBufReadExt, BufReader},
io::{AsyncBufReadExt, AsyncRead, BufReader},
process::Command,
time::sleep,
};
Expand All @@ -38,33 +36,38 @@ async fn get_output_logged(
logs.extend(msg.as_bytes());
info!("{}", msg.trim());

// join stdout and stderr together
let (writer, reader) = tokio::net::unix::pipe::pipe()?;
let writer_fd = writer.into_blocking_fd()?;
let output = Command::new(cmd)
let mut output = Command::new(cmd)
.args(args)
.current_dir(cwd)
.stdout(unsafe { Stdio::from_raw_fd(writer_fd.as_raw_fd()) })
.stderr(unsafe { Stdio::from_raw_fd(writer_fd.as_raw_fd()) })
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;

let mut stdout_reader = BufReader::new(reader).lines();

let mut stdout_out = vec![];
while let Ok(Some(v)) = stdout_reader.next_line().await {
for line in v.split("\r") {
tx.send_async(Message::Text(line.to_string())).await.ok();
// learn from tokio wait_with_output
async fn read_and_send<A: AsyncRead + Unpin>(
io: &mut Option<A>,
tx: Sender<Message>,
) -> tokio::io::Result<String> {
let mut res = String::new();
if let Some(io) = io.as_mut() {
let mut reader = BufReader::new(io).lines();
while let Ok(Some(v)) = reader.next_line().await {
for line in v.split("\r") {
tx.send_async(Message::Text(line.to_string())).await.ok();
res += &line;
res += "\n";
}
}
}
stdout_out.push(v);
Ok(res)
}

let mut output = output.wait_with_output().await?;
let mut stdout_pipe = output.stdout.take();
let stdout_future = read_and_send(&mut stdout_pipe, tx.clone());
let mut stderr_pipe = output.stderr.take();
let stderr_future = read_and_send(&mut stderr_pipe, tx.clone());

// save data back to output.stdout, since we captured it manually
for line in &stdout_out {
output.stdout.extend_from_slice(line.as_bytes());
output.stdout.push(b'\n');
}
let (status, stdout, stderr) = try_join3(output.wait(), stdout_future, stderr_future).await?;

let elapsed = begin.elapsed();

Expand All @@ -75,14 +78,20 @@ async fn get_output_logged(
cmd,
args.join(" "),
elapsed,
output.status.success()
status.success()
)
.as_bytes(),
);
logs.extend("STDOUT&STDERR:\n".as_bytes());
logs.extend(stdout_out.join("\n").as_bytes());

Ok(output)
logs.extend("STDOUT:\n".as_bytes());
logs.extend(stdout.as_bytes());
logs.extend("STDERR:\n".as_bytes());
logs.extend(stderr.as_bytes());

Ok(Output {
status,
stdout: stdout.into_bytes(),
stderr: stderr.into_bytes(),
})
}

/// Run command and retry until it succeeds
Expand Down

0 comments on commit 7edeaff

Please sign in to comment.