Skip to content

Commit 1f893ef

Browse files
matthiaskrgrgitbot
authored and
gitbot
committed
Rollup merge of rust-lang#135583 - NobodyXu:move-pipe-to-io, r=joshtriplett
Move `std::pipe::*` into `std::io` Resolve concern from final comment period rust-lang#127154 (comment)
2 parents 7b06323 + 6bc12e0 commit 1f893ef

File tree

10 files changed

+273
-290
lines changed

10 files changed

+273
-290
lines changed

std/src/io/mod.rs

+249
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ pub use self::{
330330
};
331331
use crate::mem::take;
332332
use crate::ops::{Deref, DerefMut};
333+
use crate::sys::anonymous_pipe::{AnonPipe, pipe as pipe_inner};
333334
use crate::{cmp, fmt, slice, str, sys};
334335

335336
mod buffered;
@@ -3250,3 +3251,251 @@ impl<B: BufRead> Iterator for Lines<B> {
32503251
}
32513252
}
32523253
}
3254+
3255+
/// Create anonymous pipe that is close-on-exec and blocking.
3256+
///
3257+
/// # Behavior
3258+
///
3259+
/// A pipe is a synchronous, unidirectional data channel between two or more processes, like an
3260+
/// interprocess [`mpsc`](crate::sync::mpsc) provided by the OS. In particular:
3261+
///
3262+
/// * A read on a [`PipeReader`] blocks until the pipe is non-empty.
3263+
/// * A write on a [`PipeWriter`] blocks when the pipe is full.
3264+
/// * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`]
3265+
/// returns EOF.
3266+
/// * [`PipeReader`] can be shared, but only one process will consume the data in the pipe.
3267+
///
3268+
/// # Capacity
3269+
///
3270+
/// Pipe capacity is platform dependent. To quote the Linux [man page]:
3271+
///
3272+
/// > Different implementations have different limits for the pipe capacity. Applications should
3273+
/// > not rely on a particular capacity: an application should be designed so that a reading process
3274+
/// > consumes data as soon as it is available, so that a writing process does not remain blocked.
3275+
///
3276+
/// # Examples
3277+
///
3278+
/// ```no_run
3279+
/// #![feature(anonymous_pipe)]
3280+
/// # #[cfg(miri)] fn main() {}
3281+
/// # #[cfg(not(miri))]
3282+
/// # fn main() -> std::io::Result<()> {
3283+
/// # use std::process::Command;
3284+
/// # use std::io::{Read, Write};
3285+
/// let (ping_rx, mut ping_tx) = std::io::pipe()?;
3286+
/// let (mut pong_rx, pong_tx) = std::io::pipe()?;
3287+
///
3288+
/// // Spawn a process that echoes its input.
3289+
/// let mut echo_server = Command::new("cat").stdin(ping_rx).stdout(pong_tx).spawn()?;
3290+
///
3291+
/// ping_tx.write_all(b"hello")?;
3292+
/// // Close to unblock echo_server's reader.
3293+
/// drop(ping_tx);
3294+
///
3295+
/// let mut buf = String::new();
3296+
/// // Block until echo_server's writer is closed.
3297+
/// pong_rx.read_to_string(&mut buf)?;
3298+
/// assert_eq!(&buf, "hello");
3299+
///
3300+
/// echo_server.wait()?;
3301+
/// # Ok(())
3302+
/// # }
3303+
/// ```
3304+
/// [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html
3305+
/// [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe
3306+
/// [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html
3307+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3308+
#[inline]
3309+
pub fn pipe() -> Result<(PipeReader, PipeWriter)> {
3310+
pipe_inner().map(|(reader, writer)| (PipeReader(reader), PipeWriter(writer)))
3311+
}
3312+
3313+
/// Read end of the anonymous pipe.
3314+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3315+
#[derive(Debug)]
3316+
pub struct PipeReader(pub(crate) AnonPipe);
3317+
3318+
/// Write end of the anonymous pipe.
3319+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3320+
#[derive(Debug)]
3321+
pub struct PipeWriter(pub(crate) AnonPipe);
3322+
3323+
impl PipeReader {
3324+
/// Create a new [`PipeReader`] instance that shares the same underlying file description.
3325+
///
3326+
/// # Examples
3327+
///
3328+
/// ```no_run
3329+
/// #![feature(anonymous_pipe)]
3330+
/// # #[cfg(miri)] fn main() {}
3331+
/// # #[cfg(not(miri))]
3332+
/// # fn main() -> std::io::Result<()> {
3333+
/// # use std::fs;
3334+
/// # use std::io::Write;
3335+
/// # use std::process::Command;
3336+
/// const NUM_SLOT: u8 = 2;
3337+
/// const NUM_PROC: u8 = 5;
3338+
/// const OUTPUT: &str = "work.txt";
3339+
///
3340+
/// let mut jobs = vec![];
3341+
/// let (reader, mut writer) = std::io::pipe()?;
3342+
///
3343+
/// // Write NUM_SLOT characters the pipe.
3344+
/// writer.write_all(&[b'|'; NUM_SLOT as usize])?;
3345+
///
3346+
/// // Spawn several processes that read a character from the pipe, do some work, then
3347+
/// // write back to the pipe. When the pipe is empty, the processes block, so only
3348+
/// // NUM_SLOT processes can be working at any given time.
3349+
/// for _ in 0..NUM_PROC {
3350+
/// jobs.push(
3351+
/// Command::new("bash")
3352+
/// .args(["-c",
3353+
/// &format!(
3354+
/// "read -n 1\n\
3355+
/// echo -n 'x' >> '{OUTPUT}'\n\
3356+
/// echo -n '|'",
3357+
/// ),
3358+
/// ])
3359+
/// .stdin(reader.try_clone()?)
3360+
/// .stdout(writer.try_clone()?)
3361+
/// .spawn()?,
3362+
/// );
3363+
/// }
3364+
///
3365+
/// // Wait for all jobs to finish.
3366+
/// for mut job in jobs {
3367+
/// job.wait()?;
3368+
/// }
3369+
///
3370+
/// // Check our work and clean up.
3371+
/// let xs = fs::read_to_string(OUTPUT)?;
3372+
/// fs::remove_file(OUTPUT)?;
3373+
/// assert_eq!(xs, "x".repeat(NUM_PROC.into()));
3374+
/// # Ok(())
3375+
/// # }
3376+
/// ```
3377+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3378+
pub fn try_clone(&self) -> Result<Self> {
3379+
self.0.try_clone().map(Self)
3380+
}
3381+
}
3382+
3383+
impl PipeWriter {
3384+
/// Create a new [`PipeWriter`] instance that shares the same underlying file description.
3385+
///
3386+
/// # Examples
3387+
///
3388+
/// ```no_run
3389+
/// #![feature(anonymous_pipe)]
3390+
/// # #[cfg(miri)] fn main() {}
3391+
/// # #[cfg(not(miri))]
3392+
/// # fn main() -> std::io::Result<()> {
3393+
/// # use std::process::Command;
3394+
/// # use std::io::Read;
3395+
/// let (mut reader, writer) = std::io::pipe()?;
3396+
///
3397+
/// // Spawn a process that writes to stdout and stderr.
3398+
/// let mut peer = Command::new("bash")
3399+
/// .args([
3400+
/// "-c",
3401+
/// "echo -n foo\n\
3402+
/// echo -n bar >&2"
3403+
/// ])
3404+
/// .stdout(writer.try_clone()?)
3405+
/// .stderr(writer)
3406+
/// .spawn()?;
3407+
///
3408+
/// // Read and check the result.
3409+
/// let mut msg = String::new();
3410+
/// reader.read_to_string(&mut msg)?;
3411+
/// assert_eq!(&msg, "foobar");
3412+
///
3413+
/// peer.wait()?;
3414+
/// # Ok(())
3415+
/// # }
3416+
/// ```
3417+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3418+
pub fn try_clone(&self) -> Result<Self> {
3419+
self.0.try_clone().map(Self)
3420+
}
3421+
}
3422+
3423+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3424+
impl Read for &PipeReader {
3425+
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
3426+
self.0.read(buf)
3427+
}
3428+
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
3429+
self.0.read_vectored(bufs)
3430+
}
3431+
#[inline]
3432+
fn is_read_vectored(&self) -> bool {
3433+
self.0.is_read_vectored()
3434+
}
3435+
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
3436+
self.0.read_to_end(buf)
3437+
}
3438+
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
3439+
self.0.read_buf(buf)
3440+
}
3441+
}
3442+
3443+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3444+
impl Read for PipeReader {
3445+
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
3446+
self.0.read(buf)
3447+
}
3448+
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
3449+
self.0.read_vectored(bufs)
3450+
}
3451+
#[inline]
3452+
fn is_read_vectored(&self) -> bool {
3453+
self.0.is_read_vectored()
3454+
}
3455+
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
3456+
self.0.read_to_end(buf)
3457+
}
3458+
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
3459+
self.0.read_buf(buf)
3460+
}
3461+
}
3462+
3463+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3464+
impl Write for &PipeWriter {
3465+
fn write(&mut self, buf: &[u8]) -> Result<usize> {
3466+
self.0.write(buf)
3467+
}
3468+
#[inline]
3469+
fn flush(&mut self) -> Result<()> {
3470+
Ok(())
3471+
}
3472+
3473+
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
3474+
self.0.write_vectored(bufs)
3475+
}
3476+
3477+
#[inline]
3478+
fn is_write_vectored(&self) -> bool {
3479+
self.0.is_write_vectored()
3480+
}
3481+
}
3482+
3483+
#[unstable(feature = "anonymous_pipe", issue = "127154")]
3484+
impl Write for PipeWriter {
3485+
fn write(&mut self, buf: &[u8]) -> Result<usize> {
3486+
self.0.write(buf)
3487+
}
3488+
#[inline]
3489+
fn flush(&mut self) -> Result<()> {
3490+
Ok(())
3491+
}
3492+
3493+
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
3494+
self.0.write_vectored(bufs)
3495+
}
3496+
3497+
#[inline]
3498+
fn is_write_vectored(&self) -> bool {
3499+
self.0.is_write_vectored()
3500+
}
3501+
}

std/src/io/tests.rs

+17
Original file line numberDiff line numberDiff line change
@@ -823,3 +823,20 @@ fn try_oom_error() {
823823
let io_err = io::Error::from(reserve_err);
824824
assert_eq!(io::ErrorKind::OutOfMemory, io_err.kind());
825825
}
826+
827+
#[test]
828+
#[cfg(all(windows, unix, not(miri)))]
829+
fn pipe_creation_clone_and_rw() {
830+
let (rx, tx) = std::io::pipe().unwrap();
831+
832+
tx.try_clone().unwrap().write_all(b"12345").unwrap();
833+
drop(tx);
834+
835+
let mut rx2 = rx.try_clone().unwrap();
836+
drop(rx);
837+
838+
let mut s = String::new();
839+
rx2.read_to_string(&mut s).unwrap();
840+
drop(rx2);
841+
assert_eq!(s, "12345");
842+
}

std/src/lib.rs

-2
Original file line numberDiff line numberDiff line change
@@ -596,8 +596,6 @@ pub mod panic;
596596
#[unstable(feature = "pattern_type_macro", issue = "123646")]
597597
pub mod pat;
598598
pub mod path;
599-
#[unstable(feature = "anonymous_pipe", issue = "127154")]
600-
pub mod pipe;
601599
pub mod process;
602600
#[unstable(feature = "random", issue = "130703")]
603601
pub mod random;

0 commit comments

Comments
 (0)