Skip to content

Commit

Permalink
pipes: simplify pipe capacity setting
Browse files Browse the repository at this point in the history
When setting pipe capacities, we can get -EBUSY because the pipe may be
even larger than we thought, with data already in it. We can also
get -EPERM, because we don't have permission to increase the pipe
capacity. These two reasons are enough to simply stop caring about the
errors when setting pipe capacities.

Well, that's true except when doing load balancing on pipes, because we
need to know the capacity to be able to avoid blocking on writes.

Signed-off-by: Nicolas Viennot <[email protected]>
  • Loading branch information
nviennot committed Nov 1, 2021
1 parent 465ac11 commit a2e588b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 45 deletions.
13 changes: 7 additions & 6 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ struct ImageFile {
}

impl ImageFile {
pub fn new(filename: String, mut pipe: UnixPipe) -> Result<Self> {
pipe.set_capacity_no_eperm(CRIU_PIPE_DESIRED_CAPACITY)?;
pub fn new(filename: String, mut pipe: UnixPipe) -> Self {
// Try setting the pipe capacity. Failing is okay, it's just for better performance.
let _ = pipe.set_capacity(CRIU_PIPE_DESIRED_CAPACITY);
let filename = Rc::from(filename);
Ok(Self { pipe, filename })
Self { pipe, filename }
}
}

Expand Down Expand Up @@ -279,7 +280,7 @@ pub fn capture(

// The kernel may limit the number of allocated pages for pipes, we must do it before setting
// the pipe size of external file pipes as shard pipes are more performance sensitive.
let shard_pipe_capacity = UnixPipe::set_best_capacity(&mut shard_pipes, SHARD_PIPE_DESIRED_CAPACITY)?;
let shard_pipe_capacity = UnixPipe::increase_capacity(&mut shard_pipes, SHARD_PIPE_DESIRED_CAPACITY)?;
let mut shards: Vec<Shard> = shard_pipes.into_iter().map(Shard::new).collect::<Result<_>>()?;

// We are ready to get to work. Accept CRIU's connection.
Expand All @@ -294,7 +295,7 @@ pub fn capture(
poller.add(criu.as_raw_fd(), PollType::Criu(criu), EpollFlags::EPOLLIN)?;

for (filename, pipe) in ext_file_pipes {
let img_file = ImageFile::new(filename, pipe)?;
let img_file = ImageFile::new(filename, pipe);
poller.add(img_file.pipe.as_raw_fd(), PollType::ImageFile(img_file), EpollFlags::EPOLLIN)?;
}

Expand Down Expand Up @@ -329,7 +330,7 @@ pub fn capture(
}

let pipe = criu.recv_pipe()?;
let img_file = ImageFile::new(filename, pipe)?;
let img_file = ImageFile::new(filename, pipe);
poller.add(img_file.pipe.as_raw_fd(), PollType::ImageFile(img_file),
EpollFlags::EPOLLIN)?;
}
Expand Down
17 changes: 10 additions & 7 deletions src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ struct Shard {
}

impl Shard {
fn new(mut pipe: UnixPipe) -> Result<Self> {
pipe.set_capacity_no_eperm(SHARD_PIPE_DESIRED_CAPACITY)?;
Ok(Self { pipe, bytes_read: 0, transfer_duration_millis: 0 })
fn new(mut pipe: UnixPipe) -> Self {
// Try setting the pipe capacity. Failing is okay, it's just for better performance.
let _ = pipe.set_capacity(SHARD_PIPE_DESIRED_CAPACITY);
Self { pipe, bytes_read: 0, transfer_duration_millis: 0 }
}
}

Expand Down Expand Up @@ -310,7 +311,8 @@ fn serve_img(
filenames_of_sent_files.insert(filename);
criu.send_file_reply(true)?; // true means that the file exists.
let mut pipe = criu.recv_pipe()?;
pipe.set_capacity_no_eperm(CRIU_PIPE_DESIRED_CAPACITY)?;
// Try setting the pipe capacity. Failing is okay.
let _ = pipe.set_capacity(CRIU_PIPE_DESIRED_CAPACITY);
memory_file.drain(&mut pipe)?;
}
None => {
Expand All @@ -336,16 +338,17 @@ fn drain_shards_into_img_store<Store: ImageStore>(
ext_file_pipes: Vec<(String, UnixPipe)>,
) -> Result<()>
{
let mut shards: Vec<Shard> = shard_pipes.into_iter().map(Shard::new).collect::<Result<_>>()?;
let mut shards: Vec<Shard> = shard_pipes.into_iter().map(Shard::new).collect();

// The content of the `ext_file_pipes` are streamed out directly, and not buffered in memory.
// This is important to avoid blowing up our memory budget. These external files typically
// contain a checkpointed filesystem, which is large.
let mut overlayed_img_store = image_store::fs_overlay::Store::new(img_store);
for (filename, mut pipe) in ext_file_pipes {
// Despite the misleading name, the pipe is not for CRIU, it's most likely for `tar`, but
// it gets to enjoy the same pipe capacity.
pipe.set_capacity_no_eperm(CRIU_PIPE_DESIRED_CAPACITY)?;
// it gets to enjoy the same pipe capacity. If we fail to increase the pipe capacity,
// it's okay. This is just for better performance.
let _ = pipe.set_capacity(CRIU_PIPE_DESIRED_CAPACITY);
overlayed_img_store.add_overlay(filename, pipe);
}

Expand Down
35 changes: 3 additions & 32 deletions src/unix_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ pub trait UnixPipeImpl: Sized {
fn new(fd: RawFd) -> Result<Self>;
fn fionread(&self) -> Result<i32>;
fn set_capacity(&mut self, capacity: i32) -> nix::Result<()>;
fn set_capacity_no_eperm(&mut self, capacity: i32) -> Result<()>;
fn set_best_capacity(pipes: &mut [Self], max_capacity: i32) -> Result<i32>;
fn increase_capacity(pipes: &mut [Self], max_capacity: i32) -> Result<i32>;
fn splice_all(&mut self, dst: &mut fs::File, len: usize) -> Result<()>;
fn vmsplice_all(&mut self, data: &[u8]) -> Result<()>;
}
Expand Down Expand Up @@ -76,33 +75,20 @@ impl UnixPipeImpl for UnixPipe {
fcntl(self.as_raw_fd(), FcntlArg::F_SETPIPE_SZ(capacity)).map(|_| ())
}

// Same as set_capacity(), except EPERM errors are ignored.
fn set_capacity_no_eperm(&mut self, capacity: i32) -> Result<()> {
match self.set_capacity(capacity) {
Err(Error::Sys(Errno::EPERM)) => {
warn_once_capacity_eperm();
Ok(())
}
other => other,
}?;
Ok(())
}

/// Sets the capacity of many pipes. /proc/sys/fs/pipe-user-pages-{hard,soft} may be non-zero,
/// preventing setting the desired capacity. If we can't set the provided `max_capacity`, then
/// we try with a lower capacity. Eventually we will succeed.
/// Returns the actual capacity of the pipes.
fn set_best_capacity(pipes: &mut [Self], max_capacity: i32) -> Result<i32> {
fn increase_capacity(pipes: &mut [Self], max_capacity: i32) -> Result<i32> {
let mut capacity = max_capacity;
loop {
match pipes.iter_mut().try_for_each(|pipe| pipe.set_capacity(capacity)) {
Err(Error::Sys(Errno::EPERM)) => {
warn_once_capacity_eperm();
assert!(capacity > *PAGE_SIZE as i32);
capacity /= 2;
continue;
}
Err(e) => return Err(anyhow!(e)),
Err(e) => return Err(anyhow!(e).context("Failed to increase pipes capacities")),
Ok(()) => return Ok(capacity),
};
}
Expand Down Expand Up @@ -140,18 +126,3 @@ impl UnixPipeImpl for UnixPipe {
Ok(())
}
}

fn warn_once_capacity_eperm() {
// TODO warn only if there's a debug flag turned on. It's a bit annoying to have this message
// all the time. In most cases, the pipe size won't be a performance bottleneck.

/*
use std::sync::Once;
static ONCE: Once = Once::new();
ONCE.call_once(|| {
eprintln!("Cannot set pipe size as desired (EPERM). \
Continuing with smaller pipe sizes but performance may be reduced. \
See the Deploy section in the criu-image-streamer README for a remedy.");
});
*/
}

0 comments on commit a2e588b

Please sign in to comment.