From 24b93b0b27bbbec9623435a095350117ae7f94a7 Mon Sep 17 00:00:00 2001 From: Nicolas Viennot Date: Tue, 5 May 2020 17:29:59 +0000 Subject: [PATCH] Initial commit --- .gitignore | 2 + Cargo.lock | 570 +++++++++++++++++++++++++++ Cargo.toml | 30 ++ Makefile | 44 +++ README.md | 289 ++++++++++++++ build.rs | 21 + proto/image.proto | 31 ++ proto/remote-image.proto | 26 ++ src/capture.rs | 385 +++++++++++++++++++ src/extract.rs | 408 ++++++++++++++++++++ src/handshake.rs | 98 +++++ src/image_store.rs | 289 ++++++++++++++ src/lib.rs | 36 ++ src/main.rs | 216 +++++++++++ src/mmap_buf.rs | 82 ++++ src/ord_by.rs | 42 ++ src/poller.rs | 116 ++++++ src/unix_pipe.rs | 152 ++++++++ src/util.rs | 113 ++++++ tests/helpers/criu.rs | 114 ++++++ tests/helpers/mod.rs | 16 + tests/helpers/util.rs | 111 ++++++ tests/integration_test.rs | 783 ++++++++++++++++++++++++++++++++++++++ 23 files changed, 3974 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 Makefile create mode 100644 README.md create mode 100644 build.rs create mode 100644 proto/image.proto create mode 100644 proto/remote-image.proto create mode 100644 src/capture.rs create mode 100644 src/extract.rs create mode 100644 src/handshake.rs create mode 100644 src/image_store.rs create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 src/mmap_buf.rs create mode 100644 src/ord_by.rs create mode 100644 src/poller.rs create mode 100644 src/unix_pipe.rs create mode 100644 src/util.rs create mode 100644 tests/helpers/criu.rs create mode 100644 tests/helpers/mod.rs create mode 100644 tests/helpers/util.rs create mode 100644 tests/integration_test.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..529d9c60 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +criu-image-streamer +target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..eb99d3da --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,570 @@ +# 
This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "anyhow" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "cc" +version = "1.0.52" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "clap" +version = "2.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "criu-image-streamer" +version = "1.0.0" +dependencies = [ + "anyhow 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)", + "nix 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", + "procinfo 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-build 0.6.1 
(registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.52 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "structopt 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "either" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "fixedbitset" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)", + "wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "heck" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "indexmap" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "itertools" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "either 1.5.3 
(registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "itoa" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "libc" +version = "0.2.69" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "multimap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "nix" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.52 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)", + "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "nom" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "petgraph" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fixedbitset 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "proc-macro-error" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-error-attr 1.0.2 
(registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)", + "version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)", + "syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro2" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "procinfo" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-derive 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-build" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + 
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "multimap 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "petgraph 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-types 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "which 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "anyhow 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-types" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "quote" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.69 
(registry+https://github.com/rust-lang/crates.io-index)", + "rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "redox_syscall" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "remove_dir_all" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "ryu" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "serde_derive" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "serde_json" +version = "1.0.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "ryu 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "structopt" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "structopt-derive 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "structopt-derive" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-error 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.18 
(registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "syn" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "syn-mid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tempfile" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)", + "remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "unicode-segmentation" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "unicode-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "unicode-xid" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + 
+[[package]] +name = "version_check" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "which" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[metadata] +"checksum anyhow 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)" = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff" +"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +"checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +"checksum cc 1.0.52 
(registry+https://github.com/rust-lang/crates.io-index)" = "c3d87b23d6a92cd03af510a5ade527033f6aa6fa92161e2d5863a907d4c5e31d" +"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" +"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +"checksum either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" +"checksum fixedbitset 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" +"checksum indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" +"checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484" +"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" +"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +"checksum libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)" = "99e85c08494b21a9054e7fe1374a732aeadaff3980b6990b94bfd3a70f690005" +"checksum log 0.4.8 
(registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +"checksum multimap 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d8883adfde9756c1d30b0f519c9b8c502a94b41ac62f696453c37c7fc0a958ce" +"checksum nix 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "50e4785f2c3b7589a0d0c1dd60285e1188adac4006e8abd6dd578e1567027363" +"checksum nom 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff" +"checksum petgraph 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29c127eea4a29ec6c85d153c59dc1213f33ec74cead30fe4730aecc88cc1fd92" +"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b" +"checksum proc-macro-error 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "98e9e4b82e0ef281812565ea4751049f1bdcdfccda7d3f459f2e138a40c08678" +"checksum proc-macro-error-attr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f5444ead4e9935abd7f27dc51f7e852a0569ac888096d5ec2499470794e2e53" +"checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" +"checksum procinfo 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c" +"checksum prost 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ce49aefe0a6144a45de32927c77bd2859a5f7677b55f220ae5b744e87389c212" +"checksum prost-build 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "02b10678c913ecbd69350e8535c3aef91a8676c0773fc1d7b95cdd196d7f2f26" +"checksum prost-derive 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "537aa19b95acde10a12fec4301466386f757403de4cd4e5b4fa78fb5ecb18f72" +"checksum prost-types 
0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1834f67c0697c001304b75be76f67add9c89742eda3a085ad8ee0bb38c3417aa" +"checksum quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4c1f4b0efa5fc5e8ceb705136bfee52cfdb6a4e3509f770b478cd6ed434232a7" +"checksum rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +"checksum rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +"checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +"checksum ryu 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" +"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +"checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +"checksum serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" +"checksum serde_derive 1.0.106 
(registry+https://github.com/rust-lang/crates.io-index)" = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" +"checksum serde_json 1.0.52 (registry+https://github.com/rust-lang/crates.io-index)" = "a7894c8ed05b7a3a279aeb79025fdec1d3158080b75b98a08faf2806bb799edd" +"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +"checksum structopt 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "863246aaf5ddd0d6928dfeb1a9ca65f505599e4e1b399935ef7e75107516b4ef" +"checksum structopt-derive 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "d239ca4b13aee7a2142e6795cbd69e457665ff8037aed33b3effdc430d2f927a" +"checksum syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)" = "410a7488c0a728c7ceb4ad59b9567eb4053d02e8cc7f5c0e0eeeb39518369213" +"checksum syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" +"checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +"checksum unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" +"checksum unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" +"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +"checksum version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce" +"checksum void 
1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +"checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +"checksum which 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724" +"checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" +"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..865c3012 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "criu-image-streamer" +version = "1.0.0" +authors = ["Nicolas Viennot "] +description = "Captures and extracts CRIU images via UNIX pipes" +edition = "2018" +license = "Apache-2.0" + +[dependencies] +structopt = { version = "0.3", default-features = false } +anyhow = "1.0" +slab = "0.4" +libc = "0.2" +prost = "0.6" +bytes = "0.5" +nix = "0.17" +lazy_static = "1.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +[build-dependencies] +prost-build = "0.6" # to generate protobuf wrappers + +[dev-dependencies] +procinfo = "0.4" # to measure memory usage +crossbeam-utils = "0.7" # for scoped threads + +[profile.release] +lto = true +codegen-units = 1 diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..4552ebeb --- /dev/null +++ b/Makefile @@ -0,0 +1,44 @@ +# Copyright 2020 Two Sigma Investments, LP. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +all: criu-image-streamer + +#BUILD=dev +BUILD=release + +BUILD_FLAGS= + +ifeq ($(BUILD),release) + BUILD_FLAGS+=--release +endif + +DEPS = $(wildcard src/*.rs) Cargo.toml + +CARGO=$(HOME)/.cargo/bin/cargo +ifeq (,$(wildcard $(CARGO))) + CARGO=cargo +endif + +target/$(BUILD)/criu-image-streamer: $(DEPS) + $(CARGO) build $(BUILD_FLAGS) + +criu-image-streamer: target/$(BUILD)/criu-image-streamer + cp -a $< $@ + +test: + $(CARGO) test $(BUILD_FLAGS) -- --test-threads=1 --nocapture + +clean: + rm -rf target criu-image-streamer diff --git a/README.md b/README.md new file mode 100644 index 00000000..15d51fbd --- /dev/null +++ b/README.md @@ -0,0 +1,289 @@ +CRIU Image Streamer +==================== + +_criu-image-streamer_ enables streaming of images to and from +[CRIU](https://www.criu.org) during checkpoint/restore with low overhead. + +It enables use of remote storage (e.g., S3, GCS) without buffering in local +storage, speeding up operations considerably. Fast checkpointing makes Google's +preemptible VM and Amazon Spot VM offerings more attractive: with streaming, +CRIU can checkpoint and evacuate even large processes within the tight eviction +deadlines (~30secs). + + +criu-image-streamer includes the following high-level features: + +* **Extensible**: UNIX pipes are used for image transfers allowing integration +in various workloads and environments. 
One can build fast data pipelines to +perform compression, encryption, and remote storage access. + +* **Image sharding**: When capturing a CRIU image, the image stream can be split +into multiple output shards. This helps maximize the network throughput for +remote uploads and CPU utilization for compression/encryption. + +* **Shard load balancing**: When capturing a CRIU image, the throughput of each +output shard is independently optimized. If a shard exhibits poor performance +(e.g., by hitting a slow disk), traffic is directed to other shards. This is +useful for reducing checkpoint tail latency when using many shards. + +* **External file embedding**: Files that are not CRIU specific can be included +in the image. This can be used, for example, to incorporate a file system +tarball along with the CRIU image. + +* **Low checkpoint overhead**: To maximize speed, we modified CRIU to send pipes +over its UNIX socket connection to transfer data. This allows the use of the +`splice()` system call for moving data pipe-to-pipe giving us a zero-copy +implementation. We measured **0.1 CPUsec/GB** of CPU usage and **3 MB** of +resident memory when capturing a 10 GB application on standard server hardware +of 2020. + +* **Moderate restore overhead**: We measured **1.4 CPUsec/GB** of CPU usage +and **3 MB** of resident memory. In the future, we could switch to a zero-copy +implementation to greatly improve performance. + +* **Reliable**: criu-image-streamer is written in Rust, avoiding common classes +of bugs often encountered when using other low-level languages. + +Usage +----- + +The CLI interface of criu-image-streamer is the following: + +``` +criu-image-streamer [OPTIONS] --images-dir + +OPTIONS: + -D, --images-dir Images directory where the CRIU UNIX socket is created during + streaming operations. + -s, --shard-fds ... File descriptors of shards. Multiple fds may be passed as a comma + separated list. Defaults to 0 or 1 depending on the operation. 
+ -e, --ext-file-fds ... External files to incorporate/extract in/from the image. Format is
+ filename:fd where filename corresponds to the name of the file, fd
+ corresponds to the pipe sending or receiving the file content.
+ Multiple external files may be passed as a comma separated list.
+ -p, --progress-fd File descriptor where to report progress. Defaults to 2.
+
+
+SUBCOMMANDS:
+ capture Capture a CRIU image
+ extract Extract a captured CRIU image
+
+extract OPTIONS:
+ --serve Buffer the image in memory and serve to CRIU
+```
+
+`--images-dir` is used during operations to create a UNIX socket where CRIU can
+connect to. That socket is used to exchange pipes for data transfers. The
+directory is not used for storing data when streaming images to and from CRIU.
+
+There are two modes of operation: capture and extract. Capture is used for
+checkpointing and extract for restoring images. We show how these operations
+are used with examples.
+
+Example 1: On-the-fly compression to local storage
+--------------------------------------------------
+
+In this example, we show how to checkpoint/restore an application and
+compress/decompress its image on-the-fly with the lz4 compressor.
+
+### Checkpoint
+
+```bash
+sleep 10 & # The app to be checkpointed
+APP_PID=$!
+
+criu-image-streamer --images-dir /tmp capture | lz4 -f - /tmp/img.lz4 &
+criu dump --images-dir /tmp --remote --shell-job --tree $APP_PID
+```
+
+### Restore
+
+```bash
+lz4 -d /tmp/img.lz4 - | criu-image-streamer --images-dir /tmp extract --serve &
+criu restore --images-dir /tmp --remote --shell-job
+```
+
+Example 2: Extracting an image to local storage
+-----------------------------------------------
+
+Extracting a previously captured image to disk can be useful for inspection.
+Using the `extract` command without `--serve` extracts the image to disk instead
+of waiting for CRIU to consume it from memory. 
+
+```bash
+lz4 -d /tmp/img.lz4 - | criu-image-streamer --images-dir output_dir extract
+```
+
+Example 3: Multi-shard upload to the S3 remote storage
+------------------------------------------------------
+
+When compressing and uploading to S3, parallelism is beneficial both to
+leverage multiple CPUs for compression, and multiple streams for maximizing
+network throughput. Parallelism can be achieved by splitting the image stream
+into multiple shards using the `--shard-fds` option.
+
+### Checkpoint
+
+```bash
+sleep 10 & # The app to be checkpointed
+APP_PID=$!
+
+# The 'exec N>' syntax opens a new file descriptor in bash (not sh, not zsh).
+exec 10> >(lz4 - - | aws s3 cp - s3://bucket/img-1.lz4)
+exec 11> >(lz4 - - | aws s3 cp - s3://bucket/img-2.lz4)
+exec 12> >(lz4 - - | aws s3 cp - s3://bucket/img-3.lz4)
+
+criu-image-streamer --images-dir /tmp --shard-fds 10,11,12 capture &
+criu dump --images-dir /tmp --remote --shell-job --tree $APP_PID
+```
+
+### Restore
+
+```bash
+exec 10< <(aws s3 cp s3://bucket/img-1.lz4 - | lz4 -d - -)
+exec 11< <(aws s3 cp s3://bucket/img-2.lz4 - | lz4 -d - -)
+exec 12< <(aws s3 cp s3://bucket/img-3.lz4 - | lz4 -d - -)
+
+criu-image-streamer --shard-fds 10,11,12 --images-dir /tmp extract --serve &
+criu restore --images-dir /tmp --remote --shell-job
+```
+
+Example 4: Incorporating a tarball into the image
+-------------------------------------------------
+
+Often, we wish to capture the file system alongside the CRIU process image.
+criu-image-streamer can weave in external files via the `--ext-file-fds` option.
+In this example, we use `tar` to archive `/scratch` and include the tarball into
+our final image.
+
+### Checkpoint
+
+```bash
+mkdir -p /scratch/app
+echo "app data to preserve" > /scratch/app/data
+
+sleep 10 & # The app to be checkpointed
+APP_PID=$!
+
+# The 'exec N>' syntax opens a new file descriptor in bash (not sh, not zsh). 
+exec 20< <(tar -C / -vcpSf - /scratch/app) + +criu-image-streamer --images-dir /tmp --ext-file-fds fs.tar:20 capture | lz4 -f - /tmp/img.lz4 & +criu dump --images-dir /tmp --remote --shell-job --tree $APP_PID +``` + +### Restore + +```bash +rm -f /scratch/app/data + +exec 20> >(tar -C / -vxf - --no-overwrite-dir) + +lz4 -d /tmp/img.lz4 - | criu-image-streamer --images-dir /tmp --ext-file-fds fs.tar:20 extract --serve & +criu restore --images-dir /tmp --remote --shell-job + +cat /scratch/app/data +``` + +**Important correctness consideration**: We are missing synchronization details +in this simplified example. For correctness, we should do the following: + +* On checkpoint, we should start tarring the file system AFTER the application +has stopped. Otherwise, we risk a data race leading to data loss. + +* On restore, we should only start CRIU after tar has finished restoring the +file system. Otherwise, we risk having CRIU try to access files that are not +yet present. + +Synchronization +--------------- + +criu-image-streamer emits the following into the progress pipe, helpful for +synchronizing operations: + +* During capture it emits the following messages: + * `socket-init\n` to report that the UNIX socket is ready for CRIU to connect. + At this point, CRIU is safe to be launched for dump. + * `checkpoint-start\n` to report that the checkpoint has started. + The application is now guaranteed to be in a stopped state. Starting tarring + the file system is appropriate. + * JSON formatted statistics defined below. + +* During restore: + * JSON formatted statistics defined below. + * `socket-init\n` to report that the UNIX socket is ready for CRIU to connect. + At this point, CRIU is safe to be launched for restore. + +Transfer speed statistics +------------------------- + +The progress pipe emits statistics related to shards with the following JSON +format. These statistics are helpful to compute transfer speeds. 
+The JSON blob is emitted as a single `\n` terminated line.
+
+```javascript
+{
+ "shards": [
+ {
+ "size": u64, // Total size of shard in bytes
+ "transfer_duration_millis": u128, // Total time to transfer data
+ },
+ ...
+ ]
+}
+```
+
+Installation
+------------
+
+### Build
+
+The Rust toolchain must be installed as a prerequisite.
+Run `make`, or use `cargo build --release` to build the project.
+
+### Deploy
+
+Copy the built binary to the destination host. It requires no library except
+libc. Change kernel settings to the following for optimal performance.
+
+```bash
+echo 0 > /proc/sys/fs/pipe-user-pages-soft
+echo 0 > /proc/sys/fs/pipe-user-pages-hard
+echo $((4*1024*1024)) > /proc/sys/fs/pipe-max-size
+```
+
+Note that during checkpointing, pages in the pipe buffers are not consuming
+memory. Because CRIU uses `vmsplice()` and criu-image-streamer uses `splice()`,
+the content in the pipes is pointing directly to the application memory.
+
+Tests
+-----
+
+We provide an extensive integration test suite located in `tests/`. You may run
+it with `cargo test -- --test-threads=1`, or `make test`.
+
+Limitations
+-----------
+
+* Incremental checkpoints are not supported.
+* Most CLI options must be passed _before_ the capture/extract subcommand.
+* Shards must be UNIX pipes. For regular files support, `cat` or `pv` (faster)
+may be used as a pipe adapter.
+* Using an older Linux kernel can lead to memory corruption.
+We tested version 4.14.67 from the stable tree, and have seen memory corruption.
+We tested version 4.14.121 and have seen no issues.
+It appears that this [kernel bug fix](https://github.com/torvalds/linux/commit/1bdc347)
+is the remedy. Run `cargo test splice` to test if criu-image-streamer is
+affected by the bug on your platform. 
+ +Acknowledgments +--------------- +* Author: Nicolas Viennot [@nviennot](https://github.com/nviennot) +* Reviewer: Vitaly Davidovich [@vitalyd](https://github.com/vitalyd) +* Reviewer: Peter Burka [@pburka](https://github.com/pburka) + +License +------- + +criu-image-streamer is licensed under the +[Apache 2.0 license](https://www.apache.org/licenses/LICENSE-2.0). diff --git a/build.rs b/build.rs new file mode 100644 index 00000000..bd61348f --- /dev/null +++ b/build.rs @@ -0,0 +1,21 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +fn main() { + prost_build::compile_protos(&["proto/remote-image.proto", + "proto/image.proto"], + &["proto/"]) + .expect("Failed to generate protobuf wrappers"); +} diff --git a/proto/image.proto b/proto/image.proto new file mode 100644 index 00000000..781d3ae4 --- /dev/null +++ b/proto/image.proto @@ -0,0 +1,31 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package image; + +message marker { + uint64 seq = 1; + oneof body { + // Denotes the filename of the next upcoming markers (denoted as current file) + string filename = 2; + // Incoming data for the current file + uint32 file_data = 3; + // EOF of current file is reached + bool file_eof = 4; + // EOF of image is reached + bool image_eof = 5; + } +} diff --git a/proto/remote-image.proto b/proto/remote-image.proto new file mode 100644 index 00000000..55e5b0cb --- /dev/null +++ b/proto/remote-image.proto @@ -0,0 +1,26 @@ +// Taken from CRIU codebase: criu/images/remote-image.proto + +syntax = "proto2"; + +package criu; + +message local_image_entry { + required string name = 1; + required string snapshot_id = 2; + required uint32 open_mode = 3; +} + +message remote_image_entry { + required string name = 1; + required string snapshot_id = 2; + required uint32 open_mode = 3; + required uint64 size = 4; +} + +message local_image_reply_entry { + required uint32 error = 1; +} + +message snapshot_id_entry { + required string snapshot_id = 1; +} diff --git a/src/capture.rs b/src/capture.rs new file mode 100644 index 00000000..d71553da --- /dev/null +++ b/src/capture.rs @@ -0,0 +1,385 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{ + collections::{BinaryHeap}, + os::unix::net::UnixListener, + os::unix::io::AsRawFd, + io::Write, + time::Instant, + cmp::{min, max}, + path::Path, + sync::Once, + rc::Rc, + fs, +}; +use crate::{ + poller::{Poller, EpollFlags}, + handshake::{HandshakeContext, HandshakeResult}, + unix_pipe::{UnixPipe, UnixPipeImpl}, + util::*, + image, + image::marker, + impl_ord_by, +}; +use anyhow::{Result, Context}; + +// CRIU's image is comprised of many files. When CRIU wishes to send us a file, it connects to our +// UNIX socket, performs a handshake, and sends the content of the file via a pipe. We stream the +// content of these image files to an array of outputs, called shards. The shards are typically +// a compression and upload stage (e.g., `lz4 | aws s3 cp - s3://destination`). The number of +// shards is typically 4, and less than 32. Image file sizes can vary widely (1KB to +10GB) and are +// sometimes sent interleaved (e.g., pages-X.img and pagemap-X.img). +// +// We move data from CRIU into shards without copying data to user-land for performance. For this, +// we use the splice() system call. This makes the implementation tricky because we also want to +// load-balance data to the shards. +// +// If we were to use a round-robin scheduling algorithm, we would get terrible slowdown if one of +// the shards were to be slow. To avoid this problem, we load-balance with the available throughput +// of each shard. To do so, we must know the available buffer space in each shard. For this we use +// the ioctl fionread(). It allows to measure the amount of data in a pipe. Due to this +// unpredictable data scheduling, we mark each data chunk with a sequence number that is used to +// reorder the data stream during restore. +// +// The shard data streams are comprised of markers followed by an optional data payload. The format +// of the markers is described in ../proto/image.proto + + +/// CRIU has difficulties if the pipe size is bigger than 4MB. 
+/// Note that the following pipe buffers are not actually using memory. The content of the pipe is +/// just a list of pointers to the application memory page, which is already allocated as CRIU does +/// a vmsplice(..., SPLICE_F_GIFT) when providing data. +const CRIU_PIPE_DESIRED_CAPACITY: i32 = 4*MB as i32; + +/// Large buffers size improves performance as it allows us to increase the size of our chunks. +/// 1MB provides excellent performance. +const SHARD_PIPE_DESIRED_CAPACITY: i32 = 1*MB as i32; + +/// An `ImageFile` represents a file coming from CRIU. +/// The complete CRIU image is comprised of many of these files. +struct ImageFile { + /// Incoming pipe from CRIU + pipe: UnixPipe, + /// Associated filename (e.g., "pages-3.img") + filename: Rc, +} + +impl ImageFile { + pub fn new(filename: String, mut pipe: UnixPipe) -> Result { + pipe.set_capacity_no_eperm(CRIU_PIPE_DESIRED_CAPACITY)?; + let filename = Rc::from(filename); + Ok(Self { pipe, filename }) + } +} + +/// A `Shard` is a pipe whose endpoint goes to the upload process (e.g., aws s3) +/// We keep track of the in-kernel buffer capacity to optimize performance. +struct Shard { + /// Outgoing pipe to the uploader + pipe: UnixPipe, + /// `remaining_space` is a lower bound of the in-kernel pipe remaining space. As the upload + /// process consume the pipe, the true `remaining_space` increases, but we don't know about it + /// until we call fionread(). Note that we keep this signed, as it can potentially go negative + /// as we overestimate the size of our writes (see CHUNK_MARKER_MAX_SIZE). + remaining_space: i32, + /// Total bytes written to the pipe, useful for producing stats. 
+ bytes_written: u64, +} + +impl Shard { + pub fn new(pipe: UnixPipe) -> Result { + Ok(Self { pipe, remaining_space: 0, bytes_written: 0 }) + } + + pub fn refresh_remaining_space(&mut self, pipe_capacity: i32) -> Result<()> { + let pipe_len = self.pipe.fionread()?; + self.remaining_space = pipe_capacity - pipe_len; + Ok(()) + } +} + +// This gives ordering to `Shard` over its `remaining_space` field, useful for the binary heap +// (max-heap) in `ImageSerializer`. The Shard with the largest `remaining_space` goes first. On +// ambiguities, we order by file descriptor providing a total order. +impl_ord_by!(Shard, |a: &Self, b: &Self| a.remaining_space.cmp(&b.remaining_space) + .then(a.pipe.as_raw_fd().cmp(&b.pipe.as_raw_fd()))); + +/// The image serializer reads data from CRIU's image files pipes, chunks the data, and writes into +/// shard pipes. Each chunk is written to the shard that has the most room available in its pipe. +/// We keep track of which shard has the most room with a binary heap. +/// Chunks are ordered by a sequence number. Semantically, the sequence number should be per image +/// file, but for simplicity, we use a global sequence number. It makes the implementation easier, +/// esp. on the deserializer side. +struct ImageSerializer<'a> { + shards: BinaryHeap<&'a mut Shard>, + shard_pipe_capacity: i32, // constant + seq: u64, + current_filename: Option>, +} + +struct Chunk<'a> { + marker: image::Marker, + data: Option<(&'a mut ImageFile, i32)>, +} + +/// Chunks are preceded by a header that we call marker. Chunk markers take an entire page in +/// kernel space as it is followed by spliced data. 
+static CHUNK_MARKER_KERNEL_SIZE: &PAGE_SIZE = &PAGE_SIZE; + +impl<'a> ImageSerializer<'a> { + pub fn new(shards: &'a mut [Shard], shard_pipe_capacity: i32) -> Self { + assert!(!shards.is_empty()); + Self { + shard_pipe_capacity, + shards: shards.iter_mut().collect(), + current_filename: None, + seq: 0, + } + } + + fn refresh_all_shard_remaining_space(&mut self) -> Result<()> { + // We wish to mutate all the elements of the BinaryHeap. + // We tear the existing one down and build a fresh one to reduce insertion cost. + let shard_pipe_capacity = self.shard_pipe_capacity; + self.shards = self.shards.drain() + .map(|shard| { + shard.refresh_remaining_space(shard_pipe_capacity)?; + Ok(shard) + }) + .collect::>()?; + Ok(()) + } + + fn gen_marker(&mut self, body: marker::Body) -> image::Marker { + let seq = self.seq; + self.seq += 1; + image::Marker { seq, body: Some(body) } + } + + /// When transferring bytes from the CRIU pipe to one of the shards, we do so with large chunks + /// to reduce serialization overhead, but not too large to minimize blocking when writing for + /// better load-balancing. + fn chunk_max_data_size(&self) -> i32 { + // If the shard pipe capacity is small, it's sad, but we need to send at least a page + max(self.shard_pipe_capacity/4 - **CHUNK_MARKER_KERNEL_SIZE as i32, *PAGE_SIZE as i32) + } + + fn write_chunk(&mut self, chunk: Chunk) -> Result<()> { + let data_size = match chunk.data { + None => 0, + Some((_, size)) => size, + }; + + // Estimate the space required in the shard pipe to write the marker and its data. + let space_required = **CHUNK_MARKER_KERNEL_SIZE as i32 + data_size; + + // Check if the shard with the most remaining space is likely to block. + // If so, refresh other pipes' remaining space to check for a better candidate. + // Note: it's safe to unwrap(), because we always have one shard to work with. 
+ if self.shards.peek().unwrap().remaining_space < space_required { + // We refresh the `remaining_space` of all shards instead of just refreshing the + // current shard, otherwise we risk starvation of other shards without knowing it. + self.refresh_all_shard_remaining_space()?; + } + + // Pick the shard with the greatest remaining space for our write. We might block when we + // write, but that's inevitable, and that's how our output is throttled. + let mut shard = self.shards.peek_mut().unwrap(); + + // 1) Write the chunk marker + let marker_size = pb_write(&mut shard.pipe, &chunk.marker)?; + + // 2) and its associated data, if specified + if let Some((img_file, _)) = chunk.data { + img_file.pipe.splice_all(&mut shard.pipe, data_size as usize)?; + } + + shard.bytes_written += marker_size as u64 + data_size as u64; + shard.remaining_space -= space_required; + // As the shard reference drops, the binary heap gets reordered. nice. + + Ok(()) + } + + fn maybe_write_filename_marker(&mut self, img_file: &ImageFile) -> Result<()> { + // We avoid repeating the filename on sequential data chunks of the same file for + // performance. We write the filename only when needed. + let filename = &img_file.filename; + match &self.current_filename { + Some(current_filename) if current_filename == filename => {}, + _ => { + self.current_filename = Some(Rc::clone(&filename)); + let marker = self.gen_marker(marker::Body::Filename(filename.to_string())); + self.write_chunk(Chunk { marker, data: None })?; + } + } + + Ok(()) + } + + /// Returns false if EOF of img_file is reached, true otherwise. + pub fn drain_img_file(&mut self, img_file: &mut ImageFile) -> Result { + let mut readable_len = img_file.pipe.fionread()?; + + // This code is only invoked when the poller reports that the image file's pipe is readable + // (or errored), which is why we can detect EOF when fionread() returns 0. 
+ let is_eof = readable_len == 0; + + self.maybe_write_filename_marker(img_file)?; + + while readable_len > 0 { + let data_size = min(readable_len, self.chunk_max_data_size()); + let marker = self.gen_marker(marker::Body::FileData(data_size as u32)); + self.write_chunk(Chunk { marker, data: Some((img_file, data_size)) })?; + readable_len -= data_size; + } + + if is_eof { + let marker = self.gen_marker(marker::Body::FileEof(true)); + self.write_chunk(Chunk { marker, data: None })?; + } + + Ok(!is_eof) + } + + pub fn write_image_eof(&mut self) -> Result<()> { + let marker = self.gen_marker(image::marker::Body::ImageEof(true)); + self.write_chunk(Chunk { marker, data: None }) + } +} + + +/// The description of arguments can be found in main.rs +pub fn capture( + images_dir: &Path, + mut progress_pipe: fs::File, + mut shard_pipes: Vec, + ext_file_pipes: Vec<(String, UnixPipe)>, +) -> Result<()> +{ + // First, we need to listen on the unix socket and notify the progress pipe that + // we are ready. We do this ASAP because our controller is blocking on us to start CRIU. + let server = { + fs::create_dir_all(images_dir) + .with_context(|| format!("Failed to create directory {}", images_dir.display()))?; + + let socket_path = &images_dir.join("img-proxy.sock"); + // 1) We unlink the socket path to avoid EADDRINUSE on bind() if it already exists. + // 2) We ignore the unlink error because we are most likely getting a -ENOENT. + // It is safe to do so as correctness is not impacted by unlink() failing. + let _ = fs::remove_file(socket_path); + UnixListener::bind(socket_path) + .with_context(|| format!("Failed to bind socket to {}", socket_path.display()))? + }; + writeln!(&mut progress_pipe, "socket-init")?; + + // The kernel may limit the number of allocated pages for pipes, we must do it before setting + // the pipe size of external file pipes as shard pipes are more performance sensitive. 
+ let shard_pipe_capacity = UnixPipe::set_best_capacity(&mut shard_pipes, SHARD_PIPE_DESIRED_CAPACITY)?; + let mut shards: Vec = shard_pipes.into_iter().map(Shard::new).collect::>()?; + + // Setup the poller to monitor the server socket and image files' pipes + enum PollType { + Server(UnixListener), + ImageFile(ImageFile), + }; + let mut poller = Poller::new()?; + poller.add(server.as_raw_fd(), PollType::Server(server), EpollFlags::EPOLLIN)?; + + for (filename, pipe) in ext_file_pipes { + let img_file = ImageFile::new(filename, pipe)?; + poller.add(img_file.pipe.as_raw_fd(), PollType::ImageFile(img_file), EpollFlags::EPOLLIN)?; + } + + // Used to compute transfer speed. But the real start is when we call + // notify_checkpoint_start_once + let mut start_time = Instant::now(); + + // The handshake context provides logic around CRIU protocol. + // The image serializer is the one reading data from the image files, + // and writing them in chunks into the shards. + let mut handshake_ctx = HandshakeContext::new(); + let mut img_serializer = ImageSerializer::new(&mut shards, shard_pipe_capacity); + let notify_checkpoint_start_once = Once::new(); + + // Process all inputs until they are all closed. + // When we have activity on the server socket, we accept a new connection, do the handshake. + // If the handshake result in a image file that we should receive, we add it to the poller. + // We use an epoll_capacity of 8. Doesn't really matter as the number of concurrent connection + // is typically at most 2. + let epoll_capacity = 8; + while let Some((poll_key, poll_obj)) = poller.poll(epoll_capacity)? { + match poll_obj { + PollType::Server(server) => { + let (mut socket, _) = server.accept()?; + match handshake_ctx.handshake(&mut socket)? { + HandshakeResult::WriteFile(filename) => { + if filename != "cpuinfo.img" { + // Once the checkpoint has started, we must notify the controller. 
+ // This is useful for our controller to kick tarring the file system as + // the application is guaranteed to be stopped. + // We skip cpuinfo.img because it doesn't tell us if the application + // has been stopped. + notify_checkpoint_start_once.call_once(|| { + // TODO remove this unwrap(). Currently, we can't return a Result + // from a call_once(). + writeln!(&mut progress_pipe, "checkpoint-start").unwrap(); + start_time = Instant::now(); + }); + } + + let pipe = UnixPipe::new(recv_fd(&mut socket)?)?; + let img_file = ImageFile::new(filename, pipe)?; + poller.add(img_file.pipe.as_raw_fd(), PollType::ImageFile(img_file), + EpollFlags::EPOLLIN)?; + } + HandshakeResult::ImageEOF => { + // CRIU is telling us that we are done, so we close the server. + // We are not unlinking the socket path as we could potentially + // be racing with another instance of the image streamer. + poller.remove(poll_key)?; + } + HandshakeResult::SnapshotIdExchanged => {} + HandshakeResult::ReadFile(_) => { + bail!("CRIU is attempting to read the image. \ + We don't support incremental checkpoints"); + } + } + } + PollType::ImageFile(img_file) => { + if !img_serializer.drain_img_file(img_file)? { + // EOF if reached. Note that the image file pipe file descriptor is closed + // automatically as it is owned by the poller. + poller.remove(poll_key)?; + } + } + } + } + + img_serializer.write_image_eof()?; + + let stats = { + let transfer_duration_millis = start_time.elapsed().as_millis(); + Stats { + shards: shards.iter().map(|s| ShardStat { + size: s.bytes_written, + transfer_duration_millis, + }).collect(), + } + }; + writeln!(&mut progress_pipe, "{}", serde_json::to_string(&stats)?)?; + + Ok(()) +} diff --git a/src/extract.rs b/src/extract.rs new file mode 100644 index 00000000..2fe133bf --- /dev/null +++ b/src/extract.rs @@ -0,0 +1,408 @@ +// Copyright 2020 Two Sigma Investments, LP. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::{BinaryHeap, HashMap}, + os::unix::net::UnixListener, + os::unix::io::AsRawFd, + io::Write, + time::Instant, + path::Path, + fs, +}; +use crate::{ + handshake::{HandshakeContext, HandshakeResult}, + unix_pipe::{UnixPipe, UnixPipeImpl}, + util::*, + image, + image::marker, + impl_ord_by, + image_store, + image_store::{ImageStore, ImageFile}, +}; +use nix::poll::{poll, PollFd, PollFlags}; +use anyhow::{Result, Context}; + +// The serialized image is received via multiple data streams (`Shard`). The data streams are +// comprised of markers followed by an optional data payload. The format of the markers is +// described in ../proto/image.proto. +// Each marker has a sequence number providing a reassembly order (produced in capture.rs). +// At any point in time, a given shard has the marker that should be processed next. +// We represent this with a PendingMarker. Reassembling the stream of markers provides the +// original image files. +// +// When extracting the image, we either store the image files in memory, or write them on disk. +// The former is useful when streaming to CRIU directly, the latter is useful to extract an image +// on disk. +// +// Streaming to CRIU is done by buffering the entire image in memory, and let CRIU consume it. +// XXX Performance isn't that great due to the memory copy in our address space. 
To improve +// performance, we could splice() shard pipe data to CRIU directly. This is difficult as CRIU +// doesn't read the image files in the same order as they are produced. For example, inventory.img +// is written last in the image, but is read first. One way to go around this issue is to reserve +// a shard during capture for all small image files (pretty much all except pages, ghost files, and +// fs.tar). In addition, we might have to rewrite some part of CRIU to restore these large files in +// the same order as they were produced. It might be difficult to preserve this guarantee forever, +// so it would be wise to keep our in-memory buffering implementation anyways. + +/// We are not doing zero-copy tranfers to CRIU (yet), we have to be mindful of CPU caches. +/// If we were doing shard to CRIU splices, we could bump the capacity to 4MB. +const CRIU_PIPE_DESIRED_CAPACITY: i32 = 1*MB as i32; + +/// Data comes in a stream of chunks, which can be as large as 256KB (from capture.rs). +/// We use 512KB to have two chunks in to avoid stalling the shards. +/// Making this buffer bigger would most likely trash CPU caches. +const SHARD_PIPE_DESIRED_CAPACITY: i32 = 512*KB as i32; + +struct Shard { + pipe: UnixPipe, + transfer_duration_millis: u128, + bytes_read: u64, +} + +impl Shard { + fn new(mut pipe: UnixPipe) -> Result { + pipe.set_capacity_no_eperm(SHARD_PIPE_DESIRED_CAPACITY)?; + Ok(Self { pipe, bytes_read: 0, transfer_duration_millis: 0 }) + } +} + +struct PendingMarker<'a> { + marker: image::Marker, + shard: &'a mut Shard, +} + +// This gives ordering of the PendingMarker for the BinaryHeap (max-heap). Lowest `seq` comes first, +// hence the `reverse()`. Note that sequence numbers are unique, giving us a total order. 
+impl_ord_by!(PendingMarker<'a>, |a: &Self, b: &Self| a.marker.seq.cmp(&b.marker.seq).reverse()); + +struct ImageDeserializer<'a, ImgStore: ImageStore> { + // Shards are located in three different collections: + // 1) `shards` stores shards that may not be readable yet. `poll()` is used to determine when a + // shard is readable, at which point it is moved to the `readable_shard` vec. + // 2) `readable_shards` stores shards that are definitely readable. When reading a marker from + // a shard, its sequence number is examined and the pair (marker, shard) denoted by the type + // `PendingMarker` is placed into the `pending_markers` binary heap. + // 3) `pending_markers` stores a sorted collection of these pending markers by sequence number. + // Once a marker matches the sequence number that we need (stored in the `seq` field), it is + // processed with its associated shard. Once processed, the shard goes back in the shards + // vec, and the cycle continues. + shards: Vec<&'a mut Shard>, + readable_shards: Vec<&'a mut Shard>, + pending_markers: BinaryHeap>, + seq: u64, + + // The following fields relate to the output. + // When receiving a `Filename(filename)` marker, the `img_files` map is examined to see if we + // have an image file corresponding to that filename. If not found, a new image file is created + // via the `img_store`. The image file is then placed into `current_img_file`, which becomes the + // default destination for incoming data via `FileData` markers. + // + // We use a `Box` instead of `String` for filenames to reduce memory usage by 8 bytes per + // filenames (`str` are not resizable, `Strings` are, so they need to carry additional information). + img_store: &'a mut ImgStore, + img_files: HashMap, ImgStore::File>, + current_img_file: Option<(Box, ImgStore::File)>, + + // `start_time` is used for stats, image_eof is used for safety checks. 
+ start_time: Instant, + image_eof: bool, +} + +impl<'a, ImgStore: ImageStore> ImageDeserializer<'a, ImgStore> { + pub fn new(img_store: &'a mut ImgStore, shards: &'a mut [Shard]) -> Self { + let num_shards = shards.len(); + Self { + shards: shards.iter_mut().collect(), + readable_shards: Vec::with_capacity(num_shards), + pending_markers: BinaryHeap::with_capacity(num_shards), + seq: 0, + img_store, + img_files: HashMap::new(), + current_img_file: None, + start_time: Instant::now(), + image_eof: false, + } + } + + fn mark_image_eof(&mut self) -> Result<()> { + ensure!(self.img_files.is_empty() && self.pending_markers.is_empty(), + "Image EOF marker came unexpectedly"); + + self.image_eof = true; + Ok(()) + } + + fn select_img_file(&mut self, filename: Box) -> Result<()> { + // First, put the current image file back in the hashmap. + // This avoids creating the same image file twice. + if let Some((filename, output)) = self.current_img_file.take() { + self.img_files.insert(filename, output); + } + + // Then, look for an image file in the hashmap with the corresponding filename. + // If not found, create a new image file. 
+ let (filename, img_file) = match self.img_files.remove_entry(&filename) { + Some((filename, img_file)) => (filename, img_file), + None => { + let img_file = self.img_store.create(&filename)?; + (filename, img_file) + } + }; + + self.current_img_file = Some((filename, img_file)); + Ok(()) + } + + fn process_marker(&mut self, marker: image::Marker, shard: &mut Shard) -> Result<()> { + use marker::Body::*; + + match marker.body { + Some(Filename(filename)) => { + self.select_img_file(filename.into_boxed_str())?; + } + Some(FileData(size)) => { + let (_filename, img_file) = self.current_img_file.as_mut() + .ok_or_else(|| anyhow!("Unexpected FileData marker"))?; + img_file.write_all_from_pipe(&mut shard.pipe, size as usize)?; + shard.bytes_read += size as u64; + } + Some(FileEof(true)) => { + let (filename, img_file) = self.current_img_file.take() + .ok_or_else(|| anyhow!("Unexpected FileEof marker"))?; + self.img_store.close(filename, img_file); + } + Some(ImageEof(true)) => { + self.mark_image_eof()?; + } + _ => bail!("Malformed image marker"), + } + + Ok(()) + } + + fn get_next_in_order_marker(&mut self) -> Option> { + if let Some(pmarker) = self.pending_markers.peek() { + if pmarker.marker.seq == self.seq { + return self.pending_markers.pop(); + } + } + None + } + + fn process_pending_markers(&mut self) -> Result<()> { + while let Some(PendingMarker { marker, mut shard }) = self.get_next_in_order_marker() { + self.process_marker(marker, &mut shard)?; + self.seq += 1; + self.shards.push(shard); + } + Ok(()) + } + + fn mark_shard_eof(&self, shard: &mut Shard) { + shard.transfer_duration_millis = self.start_time.elapsed().as_millis(); + } + + fn drain_shard(&mut self, shard: &'a mut Shard) -> Result<()> { + match pb_read_next(&mut shard.pipe)? 
{ + None => { + // EOF of that shard is reached + self.mark_shard_eof(shard); + } + Some((marker, marker_size)) => { + ensure!(!self.image_eof, "Unexpected data after image EOF"); + shard.bytes_read += marker_size as u64; + self.pending_markers.push(PendingMarker { marker, shard }); + self.process_pending_markers()?; + } + } + Ok(()) + } + + fn get_next_readable_shard(&mut self) -> Result> { + // If we just return `self.shard.pop()`, we may deadlock if shard pipes are not independent + // from each other. + // This scenario only happens when the capture shards are directly connected to the extract + // shards, useful when doing live migrations. The deadlock may happen when the capture + // serializer attempts to push a large chunk down a shard, and blocks because the shard is + // full. Meanwhile, the extract reader could be blocking on reading from an empty pipe + // shard in `pb_read_next()`. + // To tolerate these workloads, we need to read from shards that are guaranteed to have + // available data. + // We use poll() instead of epoll() because we need to ignore the shards that are in the + // list of pending markers, and we are not doing async reads to do edge triggers. + if self.readable_shards.is_empty() { + if self.shards.len() <= 1 { + // If we have no shard to read from, we'll return None. + // If we have a single shard to read from, there no need to block in poll() + // We return immediately with that shard, even if it is not readable yet as it + // won't introduce a deadlock with the capture side. + return Ok(self.shards.pop()); + } + + let mut poll_fds: Vec = self.shards.iter() + .map(|shard| PollFd::new(shard.pipe.as_raw_fd(), PollFlags::POLLIN)) + .collect(); + + let timeout = -1; + let n = poll(&mut poll_fds, timeout)?; + assert!(n > 0); // There should be at least one fd ready. + + // We could use drain_filter() instead of the mem::replace dance, but we'll probably + // have to use zip(), which complicates the code. 
+ let shards = { + let capacity = self.shards.capacity(); + std::mem::replace(&mut self.shards, Vec::with_capacity(capacity)) + }; + for (shard, poll_fd) in shards.into_iter().zip(poll_fds) { + // We can unwrap() safely. It is fair to assume that the kernel returned valid bits + // in `revents`. + if !poll_fd.revents().unwrap().is_empty() { + self.readable_shards.push(shard); + } else { + self.shards.push(shard); + } + } + } + + Ok(self.readable_shards.pop()) + } + + /// Returns succesfully when the image has been fully deserialized. This is our main loop. + pub fn drain_all(&mut self) -> Result<()> { + while let Some(shard) = self.get_next_readable_shard()? { + self.drain_shard(shard)?; + } + ensure!(self.image_eof, "No shards to read from"); + Ok(()) + } +} + +/// `serve_img()` serves the in-memory image store to CRIU. +fn serve_img( + images_dir: &Path, + progress_pipe: &mut fs::File, + mem_store: &mut image_store::mem::Store, +) -> Result<()> +{ + let server = { + let socket_path = &images_dir.join("img-cache.sock"); + let _ = fs::remove_file(socket_path); // see capture.rs for rationale + UnixListener::bind(socket_path) + .with_context(|| format!("Failed to bind socket to {}", socket_path.display()))? + }; + + writeln!(progress_pipe, "socket-init")?; + + let mut handshake_ctx = HandshakeContext::new(); + handshake_ctx.add_snapshot_id(images_dir.to_str().expect("image_dir should be UTF8 valid").to_string()); + + let mut serve_file = |filename: String, mut socket| -> Result<()> { + match mem_store.remove(&filename) { + Some(memory_file) => { + HandshakeContext::send_reply(&mut socket, 0)?; + + let mut pipe = UnixPipe::new(recv_fd(&mut socket)?)?; + pipe.set_capacity_no_eperm(CRIU_PIPE_DESIRED_CAPACITY)?; + + memory_file.drain(&mut pipe)?; + } + None => { + HandshakeContext::send_reply(&mut socket, libc::ENOENT as u32)?; + } + } + + Ok(()) + }; + + loop { + let (mut socket, _) = server.accept()?; + match handshake_ctx.handshake(&mut socket)? 
{ + HandshakeResult::ReadFile(filename) => { + // XXX It seems that CRIU reads image files sequentially. If it was reading files + // in an interleaved fashion, we would have to use the Poller to avoid deadlocks. + serve_file(filename, socket)?; + } + HandshakeResult::ImageEOF => { + // CRIU informs us that it is done, allowing us to close the server. + // We don't unlink the socket file as we could potentially be racing with another + // instance of the image streamer. + break; + } + HandshakeResult::SnapshotIdExchanged => {} + _ => bail!("unexpected handshake"), + } + } + + Ok(()) +} + +fn drain_shards_into_img_store( + img_store: &mut Store, + progress_pipe: &mut fs::File, + shard_pipes: Vec, + ext_file_pipes: Vec<(String, UnixPipe)>, +) -> Result<()> +{ + let mut shards: Vec = shard_pipes.into_iter().map(Shard::new).collect::>()?; + + // The content of the `ext_file_pipes` are streamed out directly, and not bufferd in memory. + // This is important to avoid blowing up our memory budget. These external files typically + // contain a checkpointed filesystem, which is large. + let mut overlayed_img_store = image_store::fs_overlay::Store::new(img_store); + for (filename, mut pipe) in ext_file_pipes { + // Despite the misleading name, the pipe is not for CRIU, it's most likely for `tar`, but + // it gets to enjoy the same pipe capacity. 
+ pipe.set_capacity_no_eperm(CRIU_PIPE_DESIRED_CAPACITY)?; + overlayed_img_store.add_overlay(filename, pipe); + } + + let mut img_deserializer = ImageDeserializer::new(&mut overlayed_img_store, &mut shards); + img_deserializer.drain_all()?; + + let stats = Stats { + shards: shards.iter().map(|s| ShardStat { + size: s.bytes_read, + transfer_duration_millis: s.transfer_duration_millis, + }).collect(), + }; + writeln!(progress_pipe, "{}", serde_json::to_string(&stats)?)?; + + Ok(()) +} + +/// Description of the arguments can be found in main.rs +pub fn extract(images_dir: &Path, + mut progress_pipe: fs::File, + shard_pipes: Vec, + ext_file_pipes: Vec<(String, UnixPipe)>, + serve: bool, +) -> Result<()> +{ + fs::create_dir_all(images_dir) + .with_context(|| format!("Failed to create directory {}", images_dir.display()))?; + + if serve { + // extract in memory, and serve to CRIU + let mut mem_store = image_store::mem::Store::new(); + drain_shards_into_img_store(&mut mem_store, &mut progress_pipe, shard_pipes, ext_file_pipes)?; + serve_img(images_dir, &mut progress_pipe, &mut mem_store)?; + } else { + // extract on disk + let mut file_store = image_store::fs::Store::new(images_dir); + drain_shards_into_img_store(&mut file_store, &mut progress_pipe, shard_pipes, ext_file_pipes)?; + } + + Ok(()) +} diff --git a/src/handshake.rs b/src/handshake.rs new file mode 100644 index 00000000..0aa076bd --- /dev/null +++ b/src/handshake.rs @@ -0,0 +1,98 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::os::unix::net::UnixStream;
+use crate::criu;
+use crate::util::{pb_write, pb_read};
+use anyhow::Result;
+
+// These constants can be seen in the CRIU source code at criu/include/img-remote.h
+const NULL_SNAPSHOT_ID: &str = "";
+const FINISH: &str = "";
+const PARENT_IMG: &str = "parent";
+
+pub struct HandshakeContext {
+    snapshot_ids: Vec<String>,
+}
+
+pub enum HandshakeResult {
+    ReadFile(String),
+    WriteFile(String),
+    ImageEOF,
+    SnapshotIdExchanged,
+}
+
+/// The role of the `HandshakeContext` is to handle communication with CRIU over the image socket.
+/// The handshake determines what CRIU wants, and provides a `HandshakeResult` to the caller.
+/// Note that we are storing `snapshot_ids` in the context. It seems that these are related to
+/// incremental checkpoints where we may have a collection of snapshots. Even though we don't
+/// use incremental checkpoints, CRIU needs it.
+
+impl HandshakeContext {
+    pub fn new() -> Self {
+        Self { snapshot_ids: Vec::new() }
+    }
+
+    pub fn add_snapshot_id(&mut self, snapshot_id: String) {
+        self.snapshot_ids.push(snapshot_id);
+    }
+
+    fn handshake_snapshot_id(&mut self, socket: &mut UnixStream, header: criu::LocalImageEntry)
+        -> Result<HandshakeResult>
+    {
+        Ok(match header.name.as_str() {
+            PARENT_IMG => match header.open_mode as i32 {
+                libc::O_APPEND => {
+                    let sie: criu::SnapshotIdEntry = pb_read(socket)?;
+                    self.add_snapshot_id(sie.snapshot_id);
+                    HandshakeResult::SnapshotIdExchanged
+                }
+                libc::O_RDONLY => {
+                    Self::send_reply(socket, 0)?;
+                    for snapshot_id in &self.snapshot_ids {
+                        let entry = &criu::SnapshotIdEntry { snapshot_id: snapshot_id.into() };
+                        pb_write(socket, entry)?;
+                    }
+                    HandshakeResult::SnapshotIdExchanged
+                }
+                _ => bail!("Unrecognized CRIU snapshot_id open_mode: {}", header.open_mode),
+            }
+            FINISH => HandshakeResult::ImageEOF,
+            _ => bail!("Unrecognized CRIU snapshot_id name: {}", header.name),
+        })
+    }
+
+    fn handshake_file(&self, header: criu::LocalImageEntry) -> Result<HandshakeResult> {
+        let filename = header.name;
+        Ok(match header.open_mode as i32 {
+            libc::O_WRONLY => HandshakeResult::WriteFile(filename),
+            libc::O_RDONLY => HandshakeResult::ReadFile(filename),
+            _ => bail!("CRIU file open mode not recognized: {}", header.open_mode),
+        })
+    }
+
+    pub fn handshake(&mut self, socket: &mut UnixStream) -> Result<HandshakeResult> {
+        let header: criu::LocalImageEntry = pb_read(socket)?;
+
+        match header.snapshot_id.as_str() {
+            NULL_SNAPSHOT_ID => self.handshake_snapshot_id(socket, header),
+            _ => self.handshake_file(header)
+        }
+    }
+
+    pub fn send_reply(socket: &mut UnixStream, error: u32) -> Result<()> {
+        pb_write(socket, &criu::LocalImageReplyEntry { error })?;
+        Ok(())
+    }
+}
diff --git a/src/image_store.rs b/src/image_store.rs
new file mode 100644
index 00000000..87330838
--- /dev/null
+++ b/src/image_store.rs
@@ -0,0 +1,289 @@
+// Copyright 2020 Two Sigma Investments, LP.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::{Context, Result}; +use crate::{ + unix_pipe::{UnixPipe, UnixPipeImpl}, + util::{MB, PAGE_SIZE}, +}; +use std::{ + collections::{VecDeque, HashMap}, + path::Path, + cmp::min, +}; + +// This code is only used during image extraction. +// +// `ImageDeserializer` in extract.rs outputs the image into an image store, defined here. +// We have three image stores: +// * `fs::Store`, used to store an image on disk. +// * `mem::Store`, used to store an image in memory. This is useful to stream the image to +// CRIU without touching disk. +// * `fs_overlay::Store`, used for bypassing certain files (like fs.tar) when extracting to memory. +// These special files are passed via the "--ext-files-fds" option on the CLI. + +// We use a `Box` instead of `String` for filenames to reduce memory usage by 8 bytes per +// filename. CRIU can generate a lot of files (e.g., one per checkpointed application thread). +// We still have a fairly high memory overhead per file of ~150 bytes. See the `restore_mem_usage` +// integration test. 
+ +pub trait ImageStore { + type File: ImageFile; + fn create(&mut self, filename: &str) -> Result; + fn close(&mut self, _filename: Box, _file: Self::File) {} +} + +pub trait ImageFile { + fn write_all_from_pipe(&mut self, shard_pipe: &mut UnixPipe, size: usize) -> Result<()>; +} + +pub mod fs { + use std::fs; + use super::*; + + pub struct Store<'a> { + images_dir: &'a Path, + } + + impl<'a> Store<'a> { + pub fn new(images_dir: &'a Path) -> Self { + Self { images_dir } + } + } + + impl ImageStore for Store<'_> { + type File = fs::File; + + fn create(&mut self, filename: &str) -> Result { + let full_path = &self.images_dir.join(&filename); + + let file = fs::File::create(full_path) + .with_context(|| format!("Failed to create file {}", full_path.display()))?; + + Ok(file) + } + } + + impl ImageFile for fs::File { + fn write_all_from_pipe(&mut self, shard_pipe: &mut UnixPipe, size: usize) -> Result<()> { + shard_pipe.splice_all(self, size)?; + Ok(()) + } + } +} + +pub mod mem { + use std::fs; + use std::io::prelude::*; + use super::*; + use crate::mmap_buf::MmapBuf; + + /// CRIU can generate many small files (e.g., core-*.img files), and a few large ones (e.g., + /// pages-*.img files). For efficiency, we store small files in a regular `Vec`. When it + /// grows bigger than a page size, we switch to a large file implementation that uses mmap + /// chunks to store its content. + /// + /// This is beneficial for avoiding blowing up our memory budget: CRIU copies data from the + /// image pipe. As CRIU copies data from our memory space, we need to release the memory that + /// we are holding. The default memory allocator is unpredicable when it comes to using brk(), + /// or mmap(), risking an Out-Of-Memory situation. Which is why we use our `MmapBuf` + /// implementation. 
+    ///
+    /// The large chunk size should not be too large (e.g., 100MB) as the chunk size directly
+    /// increases our memory overhead while transferring data to CRIU: while CRIU duplicates the
+    /// chunk data into its memory space, the chunk remains allocated until we get to the next chunk.
+
+    const MAX_LARGE_CHUNK_SIZE: usize = 10*MB;
+    static MAX_SMALL_CHUNK_SIZE: &PAGE_SIZE = &PAGE_SIZE;
+
+    pub struct Store {
+        files: HashMap<Box<str>, File>,
+    }
+
+    impl Store {
+        pub fn new() -> Self {
+            Self { files: HashMap::new() }
+        }
+
+        pub fn remove(&mut self, filename: &str) -> Option<File> {
+            self.files.remove(filename)
+        }
+    }
+
+    impl ImageStore for Store {
+        type File = File;
+
+        fn create(&mut self, _filename: &str) -> Result<Self::File> {
+            Ok(File::new_small())
+        }
+
+        fn close(&mut self, filename: Box<str>, file: File) {
+            // We don't need to shrink our file. If it's a small one, then it's already small
+            // enough (we used reserve_exact()). For a large file, we could mremap() the last
+            // chunk, but that doesn't really help as we don't touch pages from the unused
+            // capacity, which thus remains unallocated.
+            self.files.insert(filename, file);
+        }
+    }
+
+    pub enum File {
+        Small(Vec<u8>),
+        Large(VecDeque<MmapBuf>),
+    }
+
+    use File::*;
+
+    impl File {
+        pub fn new_small() -> Self {
+            Small(Vec::new())
+        }
+
+        fn large_from_slice(init_data: &[u8]) -> Self {
+            let mut chunk = MmapBuf::with_capacity(MAX_LARGE_CHUNK_SIZE);
+            chunk.resize(init_data.len());
+            chunk.copy_from_slice(init_data);
+
+            let mut chunks = VecDeque::new();
+            chunks.push_back(chunk);
+            Large(chunks)
+        }
+
+        fn reserve_chunk(&mut self, size_hint: usize) {
+            match self {
+                Small(chunk) => {
+                    if chunk.len() + size_hint > **MAX_SMALL_CHUNK_SIZE {
+                        *self = Self::large_from_slice(&chunk);
+                    } else {
+                        chunk.reserve_exact(size_hint);
+                    }
+                }
+                Large(chunks) => {
+                    match chunks.back_mut() {
+                        // We don't use `size_hint` here. The caller will top-up the current chunk,
+                        // and call `reserve_chunk()` again.
+ Some(chunk) if chunk.len() < chunk.capacity() => {} + _ => chunks.push_back(MmapBuf::with_capacity(MAX_LARGE_CHUNK_SIZE)), + } + } + } + } + + fn write_from_pipe(&mut self, shard_pipe: &mut UnixPipe, size: usize) -> Result { + // reserve_chunk() upgrades a small file to a large file if needed. + self.reserve_chunk(size); + + Ok(match self { + Small(chunk) => { + shard_pipe.take(size as u64).read_to_end(chunk) + .context("Failed to read from shard")?; + size + } + Large(chunks) => { + // Safe to unwrap() as we called reserve_chunk(). + let chunk = chunks.back_mut().unwrap(); + + let current_offset = chunk.len(); + let remaining_chunk_len = chunk.capacity() - current_offset; + let to_read = min(size, remaining_chunk_len); + + chunk.resize(current_offset + to_read); + shard_pipe.read_exact(&mut chunk[current_offset..]) + .context("Failed to read from shard")?; + to_read + } + }) + } + + pub fn drain(self, dst: &mut fs::File) -> Result<()> { + match self { + Small(chunk) => dst.write_all(&chunk)?, + Large(chunks) => { + for chunk in chunks { + // We can vmsplice() because the chunk is backed by our mmap buffer. + // It will be unmapped after the vmsplice guaranteeing that memory pages + // are not going to be recycled and modified, which is a problem for + // vmsplice(). 
+                        dst.vmsplice_all(&chunk)?;
+                    }
+                }
+            };
+
+            Ok(())
+        }
+    }
+
+    impl ImageFile for File {
+        fn write_all_from_pipe(&mut self, shard_pipe: &mut UnixPipe, mut size: usize) -> Result<()> {
+            while size > 0 {
+                let written = self.write_from_pipe(shard_pipe, size)?;
+                size -= written;
+            }
+
+            Ok(())
+        }
+    }
+}
+
+pub mod fs_overlay {
+    use std::fs;
+    use super::*;
+
+    pub struct Store<'a, UnderlyingStore> {
+        underlying_store: &'a mut UnderlyingStore,
+        overlayed_files: HashMap<Box<str>, fs::File>,
+    }
+
+    impl<'a, UnderlyingStore: ImageStore> Store<'a, UnderlyingStore> {
+        pub fn new(underlying_store: &'a mut UnderlyingStore) -> Self {
+            let overlayed_files = HashMap::new();
+            Self { underlying_store, overlayed_files }
+        }
+
+        pub fn add_overlay(&mut self, filename: String, file: fs::File) {
+            self.overlayed_files.insert(filename.into_boxed_str(), file);
+        }
+    }
+
+    impl<UnderlyingStore: ImageStore> ImageStore for Store<'_, UnderlyingStore> {
+        type File = File<UnderlyingStore::File>;
+
+        fn create(&mut self, filename: &str) -> Result<Self::File> {
+            Ok(match self.overlayed_files.remove(filename) {
+                Some(file) => File::Overlayed(file),
+                None => File::Underlying(self.underlying_store.create(filename)?),
+            })
+        }
+
+        fn close(&mut self, filename: Box<str>, output: Self::File) {
+            match output {
+                File::Overlayed(_) => {},
+                File::Underlying(file) => self.underlying_store.close(filename, file),
+            }
+        }
+    }
+
+    pub enum File<UnderlyingFile> {
+        Overlayed(fs::File),
+        Underlying(UnderlyingFile),
+    }
+
+    impl<UnderlyingFile: ImageFile> ImageFile for File<UnderlyingFile> {
+        fn write_all_from_pipe(&mut self, shard_pipe: &mut UnixPipe, size: usize) -> Result<()> {
+            match self {
+                File::Overlayed(file) => file.write_all_from_pipe(shard_pipe, size),
+                File::Underlying(file) => file.write_all_from_pipe(shard_pipe, size),
+            }
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 00000000..3100bb92
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,36 @@
+// Copyright 2020 Two Sigma Investments, LP.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! We use a lib.rs file for building integration tests. + +// Unless we are in release mode, allow dead code, unused imports and variables, +// it makes development more enjoyable. +#![cfg_attr(debug_assertions, allow(dead_code, unused_imports, unused_variables))] + +#[macro_use] +extern crate anyhow; + +pub mod util; +pub mod capture; +pub mod extract; +pub mod poller; +pub mod unix_pipe; +pub mod handshake; +pub mod ord_by; +pub mod image_store; +pub mod mmap_buf; + +// Protobufs definitions are defined in ../proto/ +pub mod criu { include!(concat!(env!("OUT_DIR"), "/criu.rs")); } +pub mod image { include!(concat!(env!("OUT_DIR"), "/image.rs")); } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 00000000..4f3e4d49 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,216 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Executable entry point. Imports lib.rs via the criu_image_streamer crate. + +// Unless we are in release mode, allow dead code, unused imports and variables, +// it makes development more enjoyable. +#![cfg_attr(debug_assertions, allow(dead_code, unused_imports, unused_variables))] + +#[macro_use] +extern crate anyhow; + +use std::{ + os::unix::io::FromRawFd, + path::PathBuf, + fs, +}; +use structopt::{StructOpt, clap::AppSettings}; +use criu_image_streamer::{ + unix_pipe::{UnixPipe, UnixPipeImpl}, + capture::capture, + extract::extract, +}; +use nix::unistd::dup; +use anyhow::{Result, Context}; + +fn parse_ext_fd(s: &str) -> Result<(String, i32)> { + let mut parts = s.split(':'); + Ok(match (parts.next(), parts.next()) { + (Some(filename), Some(fd)) => { + let filename = filename.to_string(); + let fd = fd.parse().context("Provided ext fd is not an integer")?; + (filename, fd) + }, + _ => bail!("Format is filename:fd") + }) +} + +#[derive(StructOpt, PartialEq, Debug)] +#[structopt(about, + // When showing --help, we want to keep the order of arguments defined + // in the `Opts` struct, as opposed to the default alphabetical order. + global_setting(AppSettings::DeriveDisplayOrder), + // help subcommand is not useful, disable it. + global_setting(AppSettings::DisableHelpSubcommand), + // subcommand version is not useful, disable it. + global_setting(AppSettings::VersionlessSubcommands), +)] +struct Opts { + /// Images directory where the CRIU UNIX socket is created during streaming operations. + // The short option -D mimics CRIU's short option for its --images-dir argument. + #[structopt(short = "D", long)] + images_dir: PathBuf, + + /// File descriptors of shards. Multiple fds may be passed as a comma separated list. + /// Defaults to 0 or 1 depending on the operation. + // require_delimiter is set to avoid clap's non-standard way of accepting lists. 
+ #[structopt(short, long, require_delimiter = true)] + shard_fds: Vec, + + /// External files to incorporate/extract in/from the image. Format is filename:fd + /// where filename corresponds to the name of the file, fd corresponds to the pipe + /// sending or receiving the file content. Multiple external files may be passed as + /// a comma separated list. + #[structopt(short, long, parse(try_from_str=parse_ext_fd), require_delimiter = true)] + ext_file_fds: Vec<(String, i32)>, + + /// File descriptor where to report progress. Defaults to 2. + // The default being 2 is a bit of a lie. We dup(STDOUT_FILENO) due to ownership issues. + #[structopt(short, long)] + progress_fd: Option, + + #[structopt(subcommand)] + operation: Operation, +} + +#[derive(StructOpt, PartialEq, Debug)] +enum Operation { + /// Capture a CRIU image + Capture, + + /// Extract a captured CRIU image + Extract { + /// Buffer the image in memory and serve to CRIU + #[structopt(long)] + serve: bool, + } +} + +fn main() -> Result<()> { + let opts: Opts = Opts::from_args(); + + let progress_pipe = { + let progress_fd = match opts.progress_fd { + Some(fd) => fd, + None => dup(libc::STDERR_FILENO)? + }; + unsafe { fs::File::from_raw_fd(progress_fd) } + }; + + let shard_pipes = + if !opts.shard_fds.is_empty() { + opts.shard_fds + } else { + match opts.operation { + Operation::Capture => vec![dup(libc::STDOUT_FILENO)?], + Operation::Extract{..} => vec![dup(libc::STDIN_FILENO)?], + } + }.into_iter() + .map(UnixPipe::new) + .collect::>() + .context("Image shards (input/output) must be pipes. 
\ + You may use `cat` or `pv` (faster) to create one.")?; + + let ext_file_pipes = opts.ext_file_fds.into_iter() + .map(|(filename, fd)| Ok((filename, UnixPipe::new(fd)?))) + .collect::>()?; + + match opts.operation { + Operation::Capture => + capture(&opts.images_dir, progress_pipe, shard_pipes, ext_file_pipes), + Operation::Extract { serve } => + extract(&opts.images_dir, progress_pipe, shard_pipes, ext_file_pipes, serve), + } +} + + +#[cfg(test)] +mod cli_tests { + use super::*; + + #[test] + fn test_capture_basic() { + assert_eq!(Opts::from_iter(&vec!["prog", "--images-dir", "imgdir", "capture"]), + Opts { + images_dir: PathBuf::from("imgdir"), + shard_fds: vec![], + ext_file_fds: vec![], + progress_fd: None, + operation: Operation::Capture, + }) + } + + #[test] + fn test_extract_basic() { + assert_eq!(Opts::from_iter(&vec!["prog", "-D", "imgdir", "extract"]), + Opts { + images_dir: PathBuf::from("imgdir"), + shard_fds: vec![], + ext_file_fds: vec![], + progress_fd: None, + operation: Operation::Extract { serve: false }, + }) + } + + #[test] + fn test_extract_serve() { + assert_eq!(Opts::from_iter(&vec!["prog", "-D", "imgdir", "extract", "--serve"]), + Opts { + images_dir: PathBuf::from("imgdir"), + shard_fds: vec![], + ext_file_fds: vec![], + progress_fd: None, + operation: Operation::Extract { serve: true }, + }) + } + + + #[test] + fn test_shards_fds() { + assert_eq!(Opts::from_iter(&vec!["prog", "--images-dir", "imgdir", "--shard-fds", "1,2,3", "capture"]), + Opts { + images_dir: PathBuf::from("imgdir"), + shard_fds: vec![1,2,3], + ext_file_fds: vec![], + progress_fd: None, + operation: Operation::Capture, + }) + } + + #[test] + fn test_ext_files() { + assert_eq!(Opts::from_iter(&vec!["prog", "--images-dir", "imgdir", "--ext-file-fds", "file1:1,file2:2", "capture"]), + Opts { + images_dir: PathBuf::from("imgdir"), + shard_fds: vec![], + ext_file_fds: vec![(String::from("file1"), 1), (String::from("file2"), 2)], + progress_fd: None, + operation: 
Operation::Capture, + }) + } + + #[test] + fn test_progess_fd() { + assert_eq!(Opts::from_iter(&vec!["prog", "--images-dir", "imgdir", "--progress-fd", "3", "capture"]), + Opts { + images_dir: PathBuf::from("imgdir"), + shard_fds: vec![], + ext_file_fds: vec![], + progress_fd: Some(3), + operation: Operation::Capture, + }) + } +} + diff --git a/src/mmap_buf.rs b/src/mmap_buf.rs new file mode 100644 index 00000000..66b074c0 --- /dev/null +++ b/src/mmap_buf.rs @@ -0,0 +1,82 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + ptr, + slice, + ops::{Drop, Deref, DerefMut}, +}; +use nix::sys::mman::{mmap, munmap, ProtFlags, MapFlags}; +use core::ffi::c_void; + +/// `MmapBuf` is semantically a `Vec` backed by an mmap region. +/// See the discussion in `image_store::mem` for why it is useful. +/// +/// We don't use the memmap create because it doesn't offer a len+capacity abstraction. We'd have +/// to do a wrapper on their `MmapMap` type. That doesn't buy us much code reuse. 
+ +pub struct MmapBuf { + addr: ptr::NonNull, + len: usize, + capacity: usize, +} + +impl MmapBuf { + pub fn with_capacity(capacity: usize) -> Self { + unsafe { + let addr = mmap(ptr::null_mut(), capacity, + ProtFlags::PROT_READ | ProtFlags::PROT_WRITE, + MapFlags::MAP_PRIVATE | MapFlags::MAP_ANONYMOUS, + -1, 0, + ).expect("mmap() failed") as *mut u8; + let addr = ptr::NonNull::new_unchecked(addr); + Self { addr, len: 0, capacity } + } + } + + pub fn resize(&mut self, len: usize) { + assert!(len <= self.capacity); + self.len = len; + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn capacity(&self) -> usize { + self.capacity + } +} + +impl Deref for MmapBuf { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.addr.as_ptr(), self.len) } + } +} + +impl DerefMut for MmapBuf { + fn deref_mut(&mut self) -> &mut [u8] { + unsafe { slice::from_raw_parts_mut(self.addr.as_ptr(), self.len) } + } +} + +impl Drop for MmapBuf { + fn drop(&mut self) { + unsafe { + munmap(self.addr.as_ptr() as *mut c_void, self.capacity).expect("munmap() failed"); + } + } +} diff --git a/src/ord_by.rs b/src/ord_by.rs new file mode 100644 index 00000000..de4e34a3 --- /dev/null +++ b/src/ord_by.rs @@ -0,0 +1,42 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +/// `impl_ord_by!` provides ordering on a type given a closure. 
+/// We use it for providing ordering to types that are used in a BinaryHeap.
+
+#[macro_export]
+macro_rules! impl_ord_by {
+    ($type:ident$(<$($gen:tt),+>)?, $cmp_fn:expr) => {
+        impl$(<$($gen),+>)? Ord for $type$(<$($gen),+>)? {
+            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+                $cmp_fn(self, other)
+            }
+        }
+
+        impl$(<$($gen),+>)? PartialOrd for $type$(<$($gen),+>)? {
+            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+                Some(self.cmp(other))
+            }
+        }
+
+        impl$(<$($gen),+>)? PartialEq for $type$(<$($gen),+>)? {
+            fn eq(&self, other: &Self) -> bool {
+                self.cmp(other) == std::cmp::Ordering::Equal
+            }
+        }
+
+        impl$(<$($gen),+>)? Eq for $type$(<$($gen),+>)? {}
+    };
+}
diff --git a/src/poller.rs b/src/poller.rs
new file mode 100644
index 00000000..e77725ef
--- /dev/null
+++ b/src/poller.rs
@@ -0,0 +1,116 @@
+// Copyright 2020 Two Sigma Investments, LP.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    os::unix::io::RawFd,
+    convert::TryFrom,
+    ops::Drop,
+};
+use slab::Slab;
+use nix::{
+    sys::epoll::{epoll_create, epoll_ctl, epoll_wait, EpollOp, EpollEvent},
+    unistd::close,
+    errno::Errno,
+    Error,
+};
+pub use nix::sys::epoll::EpollFlags;
+use anyhow::{Context, Result};
+
+/// `Poller` provides an easy-to-use interface to epoll(). It associates file descriptor with
+/// objects. When a file descriptor is ready, the poller returns a reference to the corresponding
+/// object via poll().
+/// There should be a crate with this functionality.
Either we didn't look well enough, or we +/// should publish a crate, because it seems useful beyond this project, + +pub struct Poller { + epoll_fd: RawFd, + slab: Slab<(RawFd, T)>, + pending_events: Vec, +} + +pub type Key = usize; + +impl Poller { + pub fn new() -> Result { + let epoll_fd = epoll_create().context("Failed to create epoll")?; + let slab = Slab::new(); + let pending_events = Vec::new(); + + Ok(Self { epoll_fd, slab, pending_events }) + } + + pub fn add(&mut self, fd: RawFd, obj: T, flags: EpollFlags) -> Result { + let entry = self.slab.vacant_entry(); + let key = entry.key(); + let mut event = EpollEvent::new(flags, u64::try_from(key).unwrap()); + epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, &mut event) + .context("Failed to add fd to epoll")?; + entry.insert((fd, obj)); + Ok(key) + } + + pub fn remove(&mut self, key: Key) -> Result { + let (fd, obj) = self.slab.remove(key); + epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None) + .context("Failed to remove fd from epoll")?; + Ok(obj) + } + + /// Returns None when the poller has no file descriptors to track. + /// Otherwise, blocks and returns a reference to the next ready object. + /// + /// `capacity` corresponds to the number of file descriptors that can be returned by a single + /// system call. + pub fn poll(&mut self, capacity: usize) -> Result> { + if self.slab.is_empty() { + return Ok(None); + } + + if self.pending_events.is_empty() { + self.pending_events.resize(capacity, EpollEvent::empty()); + + let timeout = -1; + let num_ready_fds = epoll_wait_no_intr(self.epoll_fd, &mut self.pending_events, timeout) + .context("Failed to wait on epoll")?; + + // We don't use a timeout (-1), and we have events registered (slab is not empty) + // so we should have a least one fd ready. 
+ assert!(num_ready_fds > 0); + + self.pending_events.truncate(num_ready_fds); + } + + let event = self.pending_events.pop().unwrap(); + let key = event.data() as usize; + let (_fd, obj) = &mut self.slab[key]; + Ok(Some((key, obj))) + } +} + +impl Drop for Poller { + fn drop(&mut self) { + close(self.epoll_fd).expect("Failed to close epoll"); + } +} + +pub fn epoll_wait_no_intr(epoll_fd: RawFd, events: &mut [EpollEvent], timeout_ms: isize) + -> nix::Result +{ + loop { + match epoll_wait(epoll_fd, events, timeout_ms) { + Err(Error::Sys(Errno::EINTR)) => continue, + other => return other, + } + } +} diff --git a/src/unix_pipe.rs b/src/unix_pipe.rs new file mode 100644 index 00000000..42fefd4a --- /dev/null +++ b/src/unix_pipe.rs @@ -0,0 +1,152 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + os::unix::io::{RawFd, FromRawFd, AsRawFd}, + sync::Once, + fs, +}; +use nix::{ + sys::stat::{fstat, SFlag}, + fcntl::{fcntl, FcntlArg}, + fcntl::{vmsplice, splice, SpliceFFlags}, + sys::uio::IoVec, + errno::Errno, + Error, +}; +use crate::util::PAGE_SIZE; +use anyhow::{Context, Result}; + +/// Unix pipes are regular `fs::File`. To add pipe specific functionalities, we have three options: +/// 1) Use unscoped functions. Less pleasant to use. +/// 2) Define a new struct like `struct UnixPipe(fs::File)`. 
But this has the disadvantage that we +/// can no longer manipulate `UnixPipe` as a file when we wish to do so. For example, we'd have +/// to reimplement the `Reader` and `Writer` trait. +/// 3) Define a new trait `UnixPipeImpl`. This has the downside that we need to import the +/// `UnixPipeImpl` everywhere we want to use the `UnixPipe` features. Not a terrible downside, +/// so we go with this. + +pub type UnixPipe = fs::File; + +pub trait UnixPipeImpl: Sized { + fn new(fd: RawFd) -> Result; + fn fionread(&self) -> Result; + fn set_capacity(&mut self, capacity: i32) -> nix::Result<()>; + fn set_capacity_no_eperm(&mut self, capacity: i32) -> Result<()>; + fn set_best_capacity(pipes: &mut [Self], max_capacity: i32) -> Result; + fn splice_all(&mut self, dst: &mut fs::File, len: usize) -> Result<()>; + fn vmsplice_all(&mut self, data: &[u8]) -> Result<()>; +} + +impl UnixPipeImpl for UnixPipe { + fn new(fd: RawFd) -> Result { + fn ensure_pipe_type(fd: RawFd) -> Result<()> { + let stat = fstat(fd).with_context(|| format!("fstat() failed on fd {}", fd))?; + let is_pipe = (SFlag::S_IFMT.bits() & stat.st_mode) == SFlag::S_IFIFO.bits(); + ensure!(is_pipe, "fd {} is not a pipe", fd); + Ok(()) + } + + ensure_pipe_type(fd)?; + unsafe { Ok(fs::File::from_raw_fd(fd)) } + } + + fn fionread(&self) -> Result { + // fionread() is defined as an int in the kernel, hence the signed i32 + nix::ioctl_read_bad!(_fionread, libc::FIONREAD, i32); + + let mut result = 0; + unsafe { _fionread(self.as_raw_fd(), &mut result) } + .with_context(|| format!("Failed to get pipe content size via fionread() on fd {}", + self.as_raw_fd()))?; + Ok(result) + } + + fn set_capacity(&mut self, capacity: i32) -> nix::Result<()> { + fcntl(self.as_raw_fd(), FcntlArg::F_SETPIPE_SZ(capacity)).map(|_| ()) + } + + // Same as set_capacity(), except EPERM errors are ignored. 
+ fn set_capacity_no_eperm(&mut self, capacity: i32) -> Result<()> { + match self.set_capacity(capacity) { + Err(Error::Sys(Errno::EPERM)) => { + warn_capacity_eperm(); + Ok(()) + } + other => other, + }?; + Ok(()) + } + + /// Sets the capacity of many pipes. /proc/sys/fs/pipe-user-pages-{hard,soft} may be non-zero, + /// preventing setting the desired capacity. If we can't set the provided `max_capacity`, then + /// we try with a lower capacity. Eventually we will succeed. + /// Returns the actual capacity of the pipes. + fn set_best_capacity(pipes: &mut [Self], max_capacity: i32) -> Result { + let mut capacity = max_capacity; + loop { + match pipes.iter_mut().try_for_each(|pipe| pipe.set_capacity(capacity)) { + Err(Error::Sys(Errno::EPERM)) => { + warn_capacity_eperm(); + assert!(capacity > *PAGE_SIZE as i32); + capacity /= 2; + continue; + } + Err(e) => return Err(anyhow!(e)), + Ok(()) => return Ok(capacity), + }; + } + } + + fn splice_all(&mut self, dst: &mut fs::File, len: usize) -> Result<()> { + let mut to_write = len; + + while to_write > 0 { + let written = splice(self.as_raw_fd(), None, dst.as_raw_fd(), None, + to_write, SpliceFFlags::SPLICE_F_MORE) + .with_context(|| format!("splice() failed fd {} -> fd {}", + self.as_raw_fd(), dst.as_raw_fd()))?; + ensure!(written > 0, "Reached EOF during splice() on fd {}", self.as_raw_fd()); + to_write -= written; + } + + Ok(()) + } + + fn vmsplice_all(&mut self, data: &[u8]) -> Result<()> { + let mut to_write = data.len(); + let mut offset = 0; + + while to_write > 0 { + let in_iov = IoVec::from_slice(&data[offset..]); + let written = vmsplice(self.as_raw_fd(), &[in_iov], SpliceFFlags::SPLICE_F_GIFT) + .with_context(|| format!("vmsplice() failed on fd {}", self.as_raw_fd()))?; + assert!(written > 0, "vmsplice() returned 0"); + + to_write -= written; + offset += written; + } + + Ok(()) + } +} + +fn warn_capacity_eperm() { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + eprintln!("Cannot set pipe size as 
desired (EPERM). \ + Continuing with smaller pipe sizes but performance may be reduced. \ + See the Deploy section in the README for a remedy."); + }); +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 00000000..fb1a8dea --- /dev/null +++ b/src/util.rs @@ -0,0 +1,113 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use prost::Message; +use std::{ + mem::size_of, + os::unix::net::UnixStream, + os::unix::io::{RawFd, AsRawFd}, + io::{Read, Write}, +}; +use nix::{ + sys::socket::{ControlMessageOwned, MsgFlags, recvmsg}, + sys::uio::IoVec, + unistd::{sysconf, SysconfVar}, +}; +use bytes::{BytesMut, Buf, BufMut}; +use serde::Serialize; +use anyhow::{Result, Context}; + +pub const KB: usize = 1024; +pub const MB: usize = 1024*1024; +pub const EOF_ERR_MSG: &str = "EOF unexpectedly reached"; + +lazy_static::lazy_static! { + pub static ref PAGE_SIZE: usize = sysconf(SysconfVar::PAGE_SIZE) + .expect("Failed to determine PAGE_SIZE") + .expect("Failed to determine PAGE_SIZE") as usize; +} + +/// read_bytes_next() attempts to read exactly the number of bytes requested. +/// If we are at EOF, it returns Ok(None). +/// If it can read the number of bytes requested, it returns Ok(bytes_requested). +/// Otherwise, it returns Err("EOF error"). 
+fn read_bytes_next(src: &mut S, len: usize) -> Result> { + let mut buf = Vec::with_capacity(len); + src.take(len as u64).read_to_end(&mut buf).context("Failed to read protobuf")?; + Ok(match buf.len() { + 0 => None, + l if l == len => Some(buf[..].into()), + _ => bail!(EOF_ERR_MSG), + }) +} + +/// pb_read_next() is useful to iterate through a stream of protobuf objects. +/// It returns Ok(obj) for each object to be read, and Ok(None) when EOF is reached. +/// It returns an error if an object is only partially read, or any deserialization error. +pub fn pb_read_next(src: &mut S) -> Result> { + Ok(match read_bytes_next(src, size_of::())? { + None => None, + Some(mut size_buf) => { + let size = size_buf.get_u32_le() as usize; + assert!(size < 10*KB, "Would read a protobuf of size >10KB. Something is wrong"); + let buf = read_bytes_next(src, size)?.ok_or_else(|| anyhow!(EOF_ERR_MSG))?; + let bytes_read = size_of::() + size_buf.len() + buf.len(); + Some((T::decode(buf)?, bytes_read)) + } + }) +} + +pub fn pb_read(src: &mut S) -> Result { + Ok(match pb_read_next(src)? { + None => bail!(EOF_ERR_MSG), + Some((obj, _size)) => obj, + }) +} + +pub fn pb_write(dst: &mut S, msg: &T) -> Result { + let msg_size = msg.encoded_len(); + let mut buf = BytesMut::with_capacity(size_of::() + msg_size); + assert!(msg_size < 10*KB, "Would serialize a protobuf of size >10KB. 
Something is wrong"); + buf.put_u32_le(msg_size as u32); + + msg.encode(&mut buf).context("Failed to encode protobuf")?; + dst.write_all(&buf).context("Failed to write protobuf")?; + + Ok(buf.len()) +} + +pub fn recv_fd(socket: &mut UnixStream) -> Result { + let mut cmsgspace = nix::cmsg_space!([RawFd; 1]); + + let msg = recvmsg(socket.as_raw_fd(), + &[IoVec::from_mut_slice(&mut [0])], + Some(&mut cmsgspace), + MsgFlags::empty()) + .context("Failed to read fd from socket")?; + + Ok(match msg.cmsgs().next() { + Some(ControlMessageOwned::ScmRights(fds)) if fds.len() == 1 => fds[0], + _ => bail!("No fd received"), + }) +} + +#[derive(Serialize)] +pub struct Stats { + pub shards: Vec, +} +#[derive(Serialize)] +pub struct ShardStat { + pub size: u64, + pub transfer_duration_millis: u128, +} diff --git a/tests/helpers/criu.rs b/tests/helpers/criu.rs new file mode 100644 index 00000000..26f33600 --- /dev/null +++ b/tests/helpers/criu.rs @@ -0,0 +1,114 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{ + os::unix::net::UnixStream, + os::unix::io::AsRawFd, + io::Read, + path::PathBuf, +}; +use anyhow::{Result, Context}; +use criu_image_streamer::{ + criu, + util::{pb_read, pb_read_next, pb_write}, + unix_pipe::UnixPipe, +}; +use crate::helpers::util::*; + +// These constants can be seen in the CRIU source code at criu/include/img-remote.h +const NULL_SNAPSHOT_ID: &str = ""; +const FINISH: &str = ""; +const PARENT_IMG: &str = "parent"; + +/// For test purposes, we implement a CRIU simulator +pub struct CRIU { + socket_path: PathBuf, +} + +impl CRIU { + pub fn new(socket_path: PathBuf) -> Self { + Self { socket_path } + } + + fn connect(&self, snapshot_id: &str, name: &str, open_mode: u32) -> Result { + let mut socket = UnixStream::connect(&self.socket_path) + .with_context(|| format!("Can't connect to {}", self.socket_path.display()))?; + let snapshot_id = snapshot_id.to_string(); + let name = name.to_string(); + pb_write(&mut socket, &criu::LocalImageEntry { snapshot_id, name, open_mode }) + .with_context(|| "Can't write to CRIU socket")?; + Ok(socket) + } + + fn read_reply(socket: &mut UnixStream) -> Result { + let reply: criu::LocalImageReplyEntry = pb_read(socket)?; + return Ok(reply.error) + } + + pub fn finish(&self) -> Result<()> { + self.connect(NULL_SNAPSHOT_ID, FINISH, 0)?; + Ok(()) + } + + pub fn append_snapshot_id(&self, snapshot_id: &str) -> Result<()> { + let mut socket = self.connect(NULL_SNAPSHOT_ID, PARENT_IMG, libc::O_APPEND as u32)?; + let snapshot_id = snapshot_id.to_string(); + pb_write(&mut socket, &criu::SnapshotIdEntry { snapshot_id })?; + Ok(()) + } + + pub fn get_snapshot_ids(&self) -> Result> { + let mut socket = self.connect(NULL_SNAPSHOT_ID, PARENT_IMG, libc::O_RDONLY as u32)?; + ensure!(Self::read_reply(&mut socket)? == 0, "Bad reply for get_snapshot_ids()"); + + let mut snapshot_ids = Vec::new(); + while let Some((entry, _)) = pb_read_next::<_, criu::SnapshotIdEntry>(&mut socket)? 
{ + snapshot_ids.push(entry.snapshot_id) + } + + Ok(snapshot_ids) + } + + pub fn write_img_file(&self, snapshot_id: &str, filename: &str) -> Result { + let mut socket = self.connect(snapshot_id, filename, libc::O_WRONLY as u32)?; + let (pipe_r, pipe_w) = new_pipe(); + send_fd(&mut socket, pipe_r.as_raw_fd())?; + Ok(pipe_w) + } + + pub fn maybe_read_img_file(&self, snapshot_id: &str, filename: &str) -> Result> { + let mut socket = self.connect(snapshot_id, filename, libc::O_RDONLY as u32)?; + + match Self::read_reply(&mut socket)? as i32 { + 0 => {}, + libc::ENOENT => return Ok(None), + _ => return Err(anyhow!("Bad response during read_img_file()")), + } + + let (pipe_r, pipe_w) = new_pipe(); + send_fd(&mut socket, pipe_w.as_raw_fd())?; + Ok(Some(pipe_r)) + } + + pub fn read_img_file(&self, snapshot_id: &str, filename: &str) -> Result { + self.maybe_read_img_file(snapshot_id, filename)? + .ok_or_else(|| anyhow!("Requested file does not exists")) + } + + pub fn read_img_file_into_vec(&self, snapshot_id: &str, filename: &str) -> Result> { + let mut buf = Vec::new(); + self.read_img_file(snapshot_id, filename)?.read_to_end(&mut buf)?; + Ok(buf) + } +} diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs new file mode 100644 index 00000000..eb363261 --- /dev/null +++ b/tests/helpers/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod criu; +pub mod util; diff --git a/tests/helpers/util.rs b/tests/helpers/util.rs new file mode 100644 index 00000000..fc5433d2 --- /dev/null +++ b/tests/helpers/util.rs @@ -0,0 +1,111 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + os::unix::net::UnixStream, + io::{Read, BufReader, BufRead}, + os::unix::io::{RawFd, AsRawFd}, +}; +use nix::{ + sys::socket::{ControlMessage, MsgFlags, sendmsg}, + sys::uio::IoVec, + unistd, +}; +use criu_image_streamer::{ + unix_pipe::{UnixPipe, UnixPipeImpl}, + util::{KB, PAGE_SIZE}, +}; +use serde::Deserialize; +use anyhow::Result; + +#[derive(Deserialize, Debug)] +pub struct Stats { + pub shards: Vec, +} +#[derive(Deserialize, Debug)] +pub struct ShardStat { + pub size: u64, + pub transfer_duration_millis: u128, +} + +pub fn new_pipe() -> (UnixPipe, UnixPipe) { + let (fd_r, fd_w) = unistd::pipe().expect("Failed to create UNIX pipe"); + let pipe_r = UnixPipe::new(fd_r).unwrap(); + let pipe_w = UnixPipe::new(fd_w).unwrap(); + (pipe_r, pipe_w) +} + +pub fn read_line(progress: &mut BufReader) -> Result { + let mut buf = String::new(); + progress.read_line(&mut buf)?; + + ensure!(buf.len() > 0, "EOF reached"); + ensure!(buf.chars().last() == Some('\n'), "no trailing \\n found"); + buf.pop(); // Removes the trailing '\n' + + Ok(buf) +} + +pub fn read_stats(progress: &mut BufReader) -> Result { + Ok(serde_json::from_str(&read_line(progress)?)?) 
+} + +pub fn send_fd(socket: &mut UnixStream, fd: RawFd) -> Result<()> { + sendmsg(socket.as_raw_fd(), + &[IoVec::from_slice(&[0])], + &[ControlMessage::ScmRights(&[fd])], + MsgFlags::empty(), + None)?; + Ok(()) +} + +pub fn get_rand_vec(size: usize) -> Vec { + let urandom = std::fs::File::open("/dev/urandom").expect("Failed to open /dev/urandom"); + let mut result = Vec::with_capacity(size); + urandom.take(size as u64).read_to_end(&mut result).expect("Failed to read /dev/urandom"); + result +} + +pub fn get_filled_vec(size: usize, value: u8) -> Vec { + let mut vec = Vec::new(); + vec.resize(size, value); + vec +} + +pub fn get_resident_mem_size() -> usize { + *PAGE_SIZE * procinfo::pid::statm_self().unwrap().resident +} + +pub fn read_to_end_rate_limited(src: &mut UnixPipe, dst: &mut Vec, + max_rate_per_millis: usize) -> Result { + use std::time::{Instant, Duration}; + + let start_time = Instant::now(); + let mut total_size = 0; + + loop { + let size = src.take(4*KB as u64).read_to_end(dst)?; + if size == 0 { + break; // EOF + } + total_size += size; + + let duration_goal = Duration::from_millis((total_size / max_rate_per_millis) as u64); + if duration_goal > start_time.elapsed() { + std::thread::sleep(duration_goal - start_time.elapsed()) + } + } + + Ok(total_size) +} diff --git a/tests/integration_test.rs b/tests/integration_test.rs new file mode 100644 index 00000000..4613551b --- /dev/null +++ b/tests/integration_test.rs @@ -0,0 +1,783 @@ +// Copyright 2020 Two Sigma Investments, LP. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// See the License for the specific language governing permissions and
// limitations under the License.

// Unless we are in release mode, allow dead code, unused imports and variables,
// it makes development more enjoyable.
#![cfg_attr(debug_assertions, allow(dead_code, unused_imports, unused_variables))]

#[macro_use]
extern crate anyhow;

mod helpers;

use std::{
    path::PathBuf,
    io::{Read, Write, BufReader},
    cmp::{min, max},
    thread,
};
use criu_image_streamer::{
    unix_pipe::{UnixPipe, UnixPipeImpl},
    capture::capture,
    extract::extract,
    util::{KB, MB, PAGE_SIZE},
};
use crate::helpers::{
    criu::CRIU,
    util::*,
};
use anyhow::Result;

// Each test belongs in its separate module. They all follow the same workflow, so we made the
// workflow common in the TestImpl trait. Each test modifies some of the behavior of TestImpl to
// test specific part of the workflow.

// Capture-side state: the capture thread, its progress pipe, and a CRIU
// simulator connected to the capture (img-proxy) socket.
struct CheckpointContext {
    snapshot_id: String,

    progress: BufReader<UnixPipe>,
    capture_thread: thread::JoinHandle<()>,

    criu: CRIU,
}

// Restore-side state: the extract thread, its progress pipe, and a CRIU
// simulator connected to the restore (img-cache) socket.
struct RestoreContext {
    snapshot_id: String,

    progress: BufReader<UnixPipe>,
    extract_thread: thread::JoinHandle<()>,

    criu: CRIU,
}

trait TestImpl {
    fn num_shards(&self) -> usize { 4 }
    fn images_dir(&self) -> PathBuf { PathBuf::from("/tmp/test-criu-image-streamer") }
    fn capture_ext_files(&mut self) -> Vec<(String, UnixPipe)> { Vec::new() }
    fn extract_ext_files(&mut self) -> Vec<(String, UnixPipe)> { Vec::new() }
    fn serve_image(&mut self) -> bool { true }
    fn has_checkpoint_started(&mut self) -> bool { true } // should be true if send_img_files() has sent a file.

    fn shards(&mut self) -> Vec<(UnixPipe, UnixPipe)> {
        (0..self.num_shards())
            .map(|_| new_pipe())
            .collect()
    }

    // Spawns the capture and extract threads, wired together through the shard
    // pipes, and returns the two contexts that drive them.
    fn bootstrap(&mut self) -> Result<(CheckpointContext, RestoreContext)> {
        let images_dir = self.images_dir();

        let (capture_progress_r, capture_progress_w) = new_pipe();
        let (extract_progress_r, extract_progress_w) = new_pipe();

        let capture_progress = BufReader::new(capture_progress_r);
        let extract_progress = BufReader::new(extract_progress_r);

        let (shard_pipes_r, shard_pipes_w): (Vec<UnixPipe>, Vec<UnixPipe>) =
            self.shards().drain(..).unzip();

        let capture_thread = {
            let images_dir = images_dir.clone();
            let ext_files = self.capture_ext_files();

            thread::spawn(move || {
                capture(&images_dir, capture_progress_w, shard_pipes_w, ext_files)
                    .expect("capture() failed");
            })
        };

        let extract_thread = {
            let images_dir = images_dir.clone();
            let ext_files = self.extract_ext_files();
            let serve = self.serve_image();

            thread::spawn(move || {
                extract(&images_dir, extract_progress_w, shard_pipes_r, ext_files, serve)
                    .expect("extract() failed");
            })
        };

        let criu_checkpoint = CRIU::new(images_dir.join("img-proxy.sock"));
        let criu_restore = CRIU::new(images_dir.join("img-cache.sock"));

        let snapshot_id = images_dir.to_str().unwrap().to_string();

        Ok((
            CheckpointContext {
                snapshot_id: snapshot_id.clone(),
                progress: capture_progress,
                capture_thread,
                criu: criu_checkpoint
            },
            RestoreContext {
                snapshot_id: snapshot_id.clone(),
                progress: extract_progress,
                extract_thread,
                criu: criu_restore
            }
        ))
    }

    fn init_checkpoint(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> {
        // Wait for CRIU socket for checkpointing to be ready
        assert_eq!(read_line(&mut checkpoint.progress)?, "socket-init");
        // Initialize snapshot ids the way CRIU would do it
        checkpoint.criu.append_snapshot_id(&checkpoint.snapshot_id)?;
        let snapshot_ids = checkpoint.criu.get_snapshot_ids()?;

        assert_eq!(snapshot_ids, vec![checkpoint.snapshot_id.as_str()]);

        Ok(())
    }

    fn send_img_files(&mut self, _checkpoint: &mut CheckpointContext) -> Result<()> {
        Ok(())
    }

    fn read_progress_checkpoint_started(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> {
        if self.has_checkpoint_started() {
            assert_eq!(read_line(&mut checkpoint.progress)?, "checkpoint-start");
        }
        Ok(())
    }

    fn finish_checkpoint(&mut self, mut checkpoint: CheckpointContext) -> Result<Stats> {
        checkpoint.criu.finish()?;
        let stats: Stats = read_stats(&mut checkpoint.progress)?;
        checkpoint.capture_thread.join().unwrap();
        Ok(stats)
    }

    fn after_finish_checkpoint(&mut self, _checkpoint_stats: &Stats) -> Result<()> {
        Ok(())
    }

    fn finish_image_extraction(&mut self, restore: &mut RestoreContext) -> Result<Stats> {
        Ok(read_stats(&mut restore.progress)?)
    }

    fn after_finish_image_extraction(&mut self, _restore_stats: &Stats) -> Result<()> {
        Ok(())
    }

    fn init_restore(&mut self, restore: &mut RestoreContext) -> Result<()> {
        // The image can be served now. Wait for the CRIU socket to be ready.
        assert_eq!(read_line(&mut restore.progress)?, "socket-init");
        // Check that the snapshot ids is valid
        assert_eq!(restore.criu.get_snapshot_ids()?, vec![restore.snapshot_id.as_str()]);
        Ok(())
    }

    fn recv_img_files(&mut self, _restore: &mut RestoreContext) -> Result<()> {
        Ok(())
    }

    fn finish_restore(&mut self, restore: RestoreContext) -> Result<()> {
        restore.criu.finish()?;
        restore.extract_thread.join().unwrap();
        Ok(())
    }

    // The common workflow: checkpoint, extract, then (optionally) serve/restore.
    fn run(&mut self) -> Result<()> {
        let (mut checkpoint, mut restore) = self.bootstrap()?;

        self.init_checkpoint(&mut checkpoint)?;
        self.send_img_files(&mut checkpoint)?;
        self.read_progress_checkpoint_started(&mut checkpoint)?;
        let stats = self.finish_checkpoint(checkpoint)?;
        self.after_finish_checkpoint(&stats)?;

        let stats = self.finish_image_extraction(&mut restore)?;
        self.after_finish_image_extraction(&stats)?;

        if self.serve_image() {
            self.init_restore(&mut restore)?;
            self.recv_img_files(&mut restore)?;
            self.finish_restore(restore)?;
        }

        Ok(())
    }
}

mod basic {
    use super::*;

    struct Test;

    impl Test {
        fn new() -> Self { Self }
    }

    impl TestImpl for Test {
        fn send_img_files(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> {
            checkpoint.criu.write_img_file(&checkpoint.snapshot_id, "file.img")?
+ .write_all("hello world".as_bytes())?; + + Ok(()) + } + + fn recv_img_files(&mut self, restore: &mut RestoreContext) -> Result<()> { + let buf = restore.criu.read_img_file_into_vec(&restore.snapshot_id, "file.img")?; + assert_eq!(buf, "hello world".as_bytes(), "File data content mismatch"); + Ok(()) + } + } + + #[test] + fn test() -> Result<()> { + Test::new().run() + } +} + +mod missing_files { + use super::*; + + struct Test; + + impl Test { + fn new() -> Self { Self } + } + + impl TestImpl for Test { + fn has_checkpoint_started(&mut self) -> bool { false } + + fn recv_img_files(&mut self, restore: &mut RestoreContext) -> Result<()> { + let file = restore.criu.maybe_read_img_file(&restore.snapshot_id, "no-file.img")?; + assert!(file.is_none(), "File exists but shouldn't"); + Ok(()) + } + } + + #[test] + fn test() -> Result<()> { + Test::new().run() + } +} + +mod ext_files { + use super::*; + + const TEST_DATA1: &str = "ext file1 data"; + const TEST_DATA2: &str = "ext file2 data"; + + struct Test { + send_ext_pipe1: Option, + send_ext_pipe2: Option, + + recv_ext_pipe1: Option, + recv_ext_pipe2: Option, + } + + impl Test { + fn new() -> Self { + Self { + send_ext_pipe1: None, send_ext_pipe2: None, + recv_ext_pipe1: None, recv_ext_pipe2: None, + } + } + } + + impl TestImpl for Test { + fn has_checkpoint_started(&mut self) -> bool { false } + + fn capture_ext_files(&mut self) -> Vec<(String, UnixPipe)> { + let (pipe1_r, pipe1_w) = new_pipe(); + let (pipe2_r, pipe2_w) = new_pipe(); + + self.send_ext_pipe1 = Some(pipe1_w); + self.send_ext_pipe2 = Some(pipe2_w); + + vec![("file1.ext".to_string(), pipe1_r), + ("file2.ext".to_string(), pipe2_r)] + } + + fn extract_ext_files(&mut self) -> Vec<(String, UnixPipe)> { + let (pipe1_r, pipe1_w) = new_pipe(); + let (pipe2_r, pipe2_w) = new_pipe(); + + self.recv_ext_pipe1 = Some(pipe1_r); + self.recv_ext_pipe2 = Some(pipe2_r); + + vec![("file1.ext".to_string(), pipe1_w), + ("file2.ext".to_string(), pipe2_w)] + } + + fn 
send_img_files(&mut self, _: &mut CheckpointContext) -> Result<()> { + self.send_ext_pipe1.take().unwrap().write_all(TEST_DATA1.as_bytes())?; + self.send_ext_pipe2.take().unwrap().write_all(TEST_DATA2.as_bytes())?; + Ok(()) + } + + fn recv_img_files(&mut self, _: &mut RestoreContext) -> Result<()> { + let mut buf = Vec::new(); + self.recv_ext_pipe1.take().unwrap().read_to_end(&mut buf)?; + assert_eq!(buf, TEST_DATA1.as_bytes()); + + let mut buf = Vec::new(); + self.recv_ext_pipe2.take().unwrap().read_to_end(&mut buf)?; + assert_eq!(buf, TEST_DATA2.as_bytes()); + + Ok(()) + } + } + + #[test] + fn test() -> Result<()> { + Test::new().run() + } +} + +mod load_balancing { + use super::*; + + // This test simulates a capture with 4 shards, where the first one is being rate limited + // at 1MB/s. We attempt to capture a 40MB image. The choke shard should receieve little + // data compared to the other shards. + // + // NOTE Load balancing works only when using pipes of capacity greater than 2*PAGE_SIZE. + // We skip that test if we don't have enough pipe capacity. + // The Rust test runner lacks the ability to skip a stest at runtime, so we improvised a bit. 
+ + const CHOKE_RATE_PER_MILLI: usize = 1*KB; // 1MB/sec + const CHOKE_SHARD_INDEX: usize = 0; + + struct Test { + file: Vec, + shard_threads: Option>>>, + skip_test: bool, + } + + impl Test { + fn new() -> Self { + Self { + // Large enough to fill the buffer of the choked pipe + file: get_rand_vec(40*MB), + shard_threads: None, + skip_test: false, + } + } + } + + impl TestImpl for Test { + fn shards(&mut self)-> Vec<(UnixPipe, UnixPipe)> { + let (shards, shard_threads) = (0..4).map(|i| { + let (mut capture_shard_r, capture_shard_w) = new_pipe(); + let (extract_shard_r, mut extract_shard_w) = new_pipe(); + let shard = (extract_shard_r, capture_shard_w); + + let shard_thread = thread::spawn(move || { + let mut buf = Vec::new(); + if i == CHOKE_SHARD_INDEX { + read_to_end_rate_limited(&mut capture_shard_r, &mut buf, CHOKE_RATE_PER_MILLI)?; + } else { + capture_shard_r.read_to_end(&mut buf)?; + } + extract_shard_w.write_all(&buf)?; + Ok(()) + }); + + (shard, shard_thread) + }).unzip(); + + self.shard_threads = Some(shard_threads); + + // Check if we have enough capacity for pipes + let mut shards: Vec<(UnixPipe, UnixPipe)> = shards; + for (shard_r, _) in &mut shards { + if shard_r.set_capacity(2*(*PAGE_SIZE) as i32).is_err() { + self.skip_test = true; + eprintln!("WARN Skipping load_balancing test due to insufficient pipe capacity. \ + This test needs at least 2 page capacity for shard pipes"); + break; + } + } + + shards + } + + fn has_checkpoint_started(&mut self) -> bool { !self.skip_test } + + fn send_img_files(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> { + if self.skip_test { + return Ok(()); + } + + checkpoint.criu.write_img_file(&checkpoint.snapshot_id, "file.img")? 
+ .write_all(&self.file)?; + + Ok(()) + } + + fn after_finish_checkpoint(&mut self, checkpoint_stats: &Stats) -> Result<()> { + if self.skip_test { + return Ok(()); + } + + self.shard_threads.take().unwrap() + .drain(..).map(|t| t.join().unwrap()) + .collect::>()?; + + eprintln!("Shard sizes: {:?} KB", checkpoint_stats.shards.iter() + .map(|s| s.size/KB as u64).collect::>()); + + for (i, shard_stats) in checkpoint_stats.shards.iter().enumerate() { + // Using 2MB threadshold as the choked shard has to be bigger than 1MB (SHARD_PIPE_CAPACITY), + // but not much more. + if i == CHOKE_SHARD_INDEX { + assert!(shard_stats.size < 2*MB as u64, + "Choked shard received too much data: {} KB", shard_stats.size/KB as u64); + } else { + assert!(shard_stats.size > 2*MB as u64, + "Normal shard received too little data: {} KB", shard_stats.size/KB as u64); + } + } + + Ok(()) + } + + fn recv_img_files(&mut self, restore: &mut RestoreContext) -> Result<()> { + if self.skip_test { + return Ok(()); + } + + let buf = restore.criu.read_img_file_into_vec(&restore.snapshot_id, "file.img")?; + assert!(buf == self.file); + + Ok(()) + } + } + + #[test] + fn test() -> Result<()> { + Test::new().run() + } +} + +mod restore_mem_usage { + use super::*; + + // We test one large file, and many small ones, simulating a fair CRIU workload. + // There are two things to test: + // 1) The in-memory store should have low overhead. 200 bytes per file is not exactly good, but + // it's good enough (measured at 129 bytes). I welcome suggestion to make this better. + // 2) The in-memory store should free its memory as its tranfering a file to CRIU + // It shouldn't be bigger than image_store::mem::MAX_LARGE_CHUNK_SIZE (10MB). We add a bit + // for slack. 
+ + const BIG_FILE_SIZE: usize = 105*MB; + const SMALL_FILE_SIZE: usize = 10; + const NUM_SMALL_FILES: usize = 100_000; + const TOLERABLE_PER_FILE_OVERHEAD: isize = 200 as isize; + const TOLERABLE_CRIU_RECEIVE_OVERHEAD: isize = 12*MB as isize; + + struct Test { + start_mem_size: Option + } + + impl Test { + fn new() -> Self { + Self { start_mem_size: None } + } + } + + impl TestImpl for Test { + fn send_img_files(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> { + self.start_mem_size = Some(get_resident_mem_size()); + + let small = get_filled_vec(SMALL_FILE_SIZE, 1); + + for i in 0..NUM_SMALL_FILES { + let filename = format!("small-{}.img", i); + checkpoint.criu.write_img_file(&checkpoint.snapshot_id, &filename)?.write_all(&small)?; + } + + // Writing the big file in small chunks, to prevent blowing up memory with a large + // vector that may not get freed. + let mut big_file_pipe = checkpoint.criu.write_img_file(&checkpoint.snapshot_id, "big.img")?; + let buf = get_filled_vec(1*KB, 1); + for _ in 0..(BIG_FILE_SIZE/buf.len()) { + big_file_pipe.write_all(&buf)?; + } + + Ok(()) + } + + fn after_finish_image_extraction(&mut self, _restore_stats: &Stats) -> Result<()> { + let extraction_use = get_resident_mem_size() as isize - self.start_mem_size.unwrap() as isize; + let overhead = extraction_use as isize - (BIG_FILE_SIZE + NUM_SMALL_FILES * SMALL_FILE_SIZE) as isize; + let overhead_per_file = overhead / (1 + NUM_SMALL_FILES) as isize; + + assert!(overhead_per_file < TOLERABLE_PER_FILE_OVERHEAD, + "In-memory image store shows too much memory overhead per file: {} bytes", overhead_per_file); + eprintln!("Per file overhead: {} bytes", overhead_per_file); + Ok(()) + } + + fn recv_img_files(&mut self, restore: &mut RestoreContext) -> Result<()> { + let start_recv_mem_usage = get_resident_mem_size(); + + let mut big_file = Vec::with_capacity(BIG_FILE_SIZE); + let big_file_pipe = &restore.criu.read_img_file(&restore.snapshot_id, "big.img")?; + let mut 
max_overhead = 0; + loop { + let count = big_file_pipe.take(10*KB as u64).read_to_end(&mut big_file)?; + + let delta_recv_mem_usage = get_resident_mem_size() as isize - start_recv_mem_usage as isize; + max_overhead = max(max_overhead, delta_recv_mem_usage); + + if count == 0 { + break; // EOF + } + } + + assert!(max_overhead < TOLERABLE_CRIU_RECEIVE_OVERHEAD, + "Memory budget exceeded while CRIU reads a large file: {} MB", max_overhead/MB as isize); + eprintln!("Large file overhead: {} MB", max_overhead/MB as isize); + + Ok(()) + } + } + + #[test] + fn test() -> Result<()> { + Test::new().run() + } +} + +mod stress { + use super::*; + use crossbeam_utils::thread; + + const NUM_THREADS: usize = 5; + const NUM_SMALL_FILES: usize = 10000; + const NUM_MEDIUM_FILES: usize = 1000; + const NUM_MEDIUM_CHUNKED_FILES: usize = 1000; + const NUM_LARGE_FILES: usize = 2; + + const SMALL_FILE_SIZE: usize = 10; + const MEDIUM_FILE_SIZE: usize = 10*KB; + const LARGE_FILE_SIZE: usize = 25*MB; // 25MB to have a few mmap buffers + const CHUNK_SIZE: usize = 10; + + struct Test { + small_file: Vec, + medium_file: Vec, + large_file: Vec, + } + + impl Test { + fn new() -> Self { + let small_file = get_rand_vec(SMALL_FILE_SIZE); + let medium_file = get_rand_vec(MEDIUM_FILE_SIZE); + let large_file = get_rand_vec(LARGE_FILE_SIZE); + + Self { small_file, medium_file, large_file } + } + } + + impl TestImpl for Test { + fn send_img_files(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> { + let write_img_file = |name: &str, data: &Vec| -> Result<()> { + checkpoint.criu + .write_img_file(&checkpoint.snapshot_id, name)? 
+ .write_all(data)?; + Ok(()) + }; + + let write_img_file_chunked = |name: &str, data: &Vec| -> Result<()> { + let mut pipe = checkpoint.criu.write_img_file(&checkpoint.snapshot_id, name)?; + + let mut offset = 0; + while offset < data.len() { + let to_write = min(data.len() - offset, CHUNK_SIZE); + pipe.write_all(&data[offset..offset+to_write])?; + offset += to_write; + } + + Ok(()) + }; + + let write_files = |i: usize| { + for j in 0..NUM_MEDIUM_FILES { + write_img_file(&format!("medium-{}-{}.img", i, j), &self.medium_file).unwrap(); + } + for j in 0..NUM_LARGE_FILES { + write_img_file(&format!("large-{}-{}.img", i, j), &self.large_file).unwrap(); + } + for j in 0..NUM_SMALL_FILES { + write_img_file(&format!("small-{}-{}.img", i, j), &self.small_file).unwrap(); + } + for j in 0..NUM_MEDIUM_CHUNKED_FILES { + write_img_file_chunked(&format!("mediumc-{}-{}.img", i, j), &self.medium_file).unwrap(); + } + }; + + // Write without threads first. Should be easier + write_files(0); + + thread::scope(|s| { + for i in 0..NUM_THREADS { + s.spawn(move |_| write_files(i+1)); + } + }).unwrap(); + + Ok(()) + } + + fn recv_img_files(&mut self, restore: &mut RestoreContext) -> Result<()> { + let read_img_file = |name: &str| -> Result> { + restore.criu.read_img_file_into_vec(&restore.snapshot_id, name) + }; + + for i in 0..NUM_THREADS+1 { + for j in 0..NUM_MEDIUM_FILES { + assert!(read_img_file(&format!("medium-{}-{}.img", i, j))? == self.medium_file); + } + for j in 0..NUM_LARGE_FILES { + assert!(read_img_file(&format!("large-{}-{}.img", i, j))? == self.large_file); + } + for j in 0..NUM_SMALL_FILES { + assert!(read_img_file(&format!("small-{}-{}.img", i, j))? == self.small_file); + } + for j in 0..NUM_MEDIUM_CHUNKED_FILES { + assert!(read_img_file(&format!("mediumc-{}-{}.img", i, j))? 
== self.medium_file);
                }
            }

            Ok(())
        }
    }

    #[test]
    fn test() -> Result<()> {
        Test::new().run()
    }
}

mod splice_bug {
    // The 4.14.67 kernel can corrupt data with splice(), but the 4.14.121 has that bug fixed.
    // This test makes it obvious. It's a subset of the stress test above, but much faster.
    // Maybe the fix is https://github.com/torvalds/linux/commit/1bdc347
    // We should be careful not to use such a buggy kernel.
    use super::*;

    const NUM_MEDIUM_CHUNKED_FILES: usize = 100;
    const MEDIUM_FILE_SIZE: usize = 10*KB;
    const CHUNK_SIZE: usize = 10;

    struct Test {
        // NOTE(review): element type (u8) reconstructed from get_rand_vec();
        // the generics were garbled in extraction.
        medium_file: Vec<u8>,
    }

    impl Test {
        fn new() -> Self {
            let medium_file = get_rand_vec(MEDIUM_FILE_SIZE);
            Self { medium_file }
        }
    }

    impl TestImpl for Test {
        /// Writes files in tiny CHUNK_SIZE-byte pieces: many small transfers is
        /// what made the kernel corruption visible.
        fn send_img_files(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> {
            let write_img_file_chunked = |name: &str, data: &Vec<u8>| -> Result<()> {
                let mut pipe = checkpoint.criu.write_img_file(&checkpoint.snapshot_id, name)?;

                let mut offset = 0;
                while offset < data.len() {
                    let to_write = min(data.len() - offset, CHUNK_SIZE);
                    pipe.write_all(&data[offset..offset+to_write])?;
                    offset += to_write;
                }

                Ok(())
            };

            let write_files = |i: usize| {
                for j in 0..NUM_MEDIUM_CHUNKED_FILES {
                    write_img_file_chunked(&format!("mediumc-{}-{}.img", i, j), &self.medium_file).unwrap();
                }
            };

            write_files(0);

            Ok(())
        }

        fn recv_img_files(&mut self, restore: &mut RestoreContext) -> Result<()> {
            let read_img_file = |name: &str| -> Result<Vec<u8>> {
                restore.criu.read_img_file_into_vec(&restore.snapshot_id, name)
            };

            let i = 0;
            for j in 0..NUM_MEDIUM_CHUNKED_FILES {
                assert!(read_img_file(&format!("mediumc-{}-{}.img", i, j))?
== self.medium_file);
            }

            Ok(())
        }
    }

    #[test]
    fn test() -> Result<()> {
        Test::new().run()
    }
}

mod extract_to_disk {
    use super::*;
    use std::fs::File;

    struct Test {
        // NOTE(review): element type (u8) reconstructed from get_rand_vec();
        // the generics were garbled in extraction.
        small_file: Vec<u8>,
        medium_file: Vec<u8>,
    }

    impl Test {
        fn new() -> Self {
            let small_file = get_rand_vec(1*KB);
            let medium_file = get_rand_vec(100*KB);
            Self { small_file, medium_file }
        }
    }

    impl TestImpl for Test {
        // Extract the image to disk instead of serving it back to CRIU.
        fn serve_image(&mut self) -> bool { false }

        fn send_img_files(&mut self, checkpoint: &mut CheckpointContext) -> Result<()> {
            checkpoint.criu.write_img_file(&checkpoint.snapshot_id, "small.img")?
                .write_all(&self.small_file)?;
            checkpoint.criu.write_img_file(&checkpoint.snapshot_id, "medium.img")?
                .write_all(&self.medium_file)?;
            Ok(())
        }

        /// After extraction, both image files must exist on disk with exactly
        /// the bytes that were captured.
        fn after_finish_image_extraction(&mut self, _restore_stats: &Stats) -> Result<()> {
            let read_img_file = |name: &str| -> Result<Vec<u8>> {
                let mut buf = Vec::new();
                // Renamed from the misleading `small_file`: this closure opens
                // whichever image file is requested, not just the small one.
                let mut file = File::open(self.images_dir().join(name))?;
                file.read_to_end(&mut buf)?;
                Ok(buf)
            };

            assert!(read_img_file("small.img")? == self.small_file);
            assert!(read_img_file("medium.img")? == self.medium_file);

            Ok(())
        }
    }

    #[test]
    fn test() -> Result<()> {
        Test::new().run()
    }
}