diff --git a/Cargo.lock b/Cargo.lock index b61962a65e8..c322259d790 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -182,7 +182,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -451,9 +451,9 @@ dependencies = [ [[package]] name = "arrayvec" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "asn1-rs" @@ -506,7 +506,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "synstructure 0.13.1", ] @@ -529,7 +529,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -583,7 +583,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -594,7 +594,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -861,7 +861,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.74", + "syn 2.0.75", "which 4.4.2", ] @@ -1222,9 +1222,9 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.5.17" +version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a798c18087ce8dc78a0e49e8babd720dc71818b1aab77cf443e9a06129fadcb" +checksum = 
"1ee158892bd7ce77aa15c208abbdb73e155d191c287a659b57abd5adb92feb03" dependencies = [ "clap", ] @@ -1238,7 +1238,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1510,7 +1510,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f" dependencies = [ "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1542,7 +1542,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1566,7 +1566,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1577,7 +1577,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1663,7 +1663,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1673,7 +1673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" dependencies = [ "derive_builder_core", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1686,7 +1686,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1785,7 +1785,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1893,7 +1893,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1905,7 +1905,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -2279,7 +2279,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ 
"proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -2755,7 +2755,7 @@ dependencies = [ "hyper-rustls 0.27.2", "hyper-util", "pin-project-lite", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "tokio", "tokio-rustls 0.26.0", "tower-service", @@ -2802,7 +2802,7 @@ dependencies = [ "hyper-util", "log", "rustls 0.23.12", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -3347,7 +3347,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3401,9 +3401,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.156" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libloading" @@ -3551,7 +3551,7 @@ dependencies = [ "clap", "glob", "rand", - "syn 2.0.74", + "syn 2.0.75", "thiserror", "tracing", "tracing-subscriber", @@ -3609,7 +3609,7 @@ checksum = "dcf09caffaac8068c346b6df2a7fc27a177fd20b39421a39ce0a211bde679a6c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3702,7 +3702,7 @@ dependencies = [ "prettytable-rs", "rcgen", "reqwest 0.12.5", - "rstest", + "rstest 0.21.0", "rustls 0.23.12", "rustls-pemfile 2.1.3", "semver 1.0.23", @@ -3717,7 +3717,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", - "which 6.0.2", + "which 6.0.3", ] [[package]] @@ -3755,7 +3755,7 @@ dependencies = [ "rawsocket", "rcgen", "regex", - "rstest", + "rstest 0.21.0", "rustls 0.23.12", "semver 1.0.23", "serde", @@ -3821,7 +3821,7 @@ dependencies = [ "mirrord-analytics", "mirrord-config-derive", "nom", - "rstest", + "rstest 0.21.0", "schemars", "serde", "serde_json", @@ -3839,7 +3839,7 @@ dependencies = [ 
"proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3911,7 +3911,7 @@ dependencies = [ "mirrord-protocol", "rand", "regex", - "rstest", + "rstest 0.22.0", "serde", "serde_json", "shellexpand", @@ -3955,7 +3955,7 @@ dependencies = [ "num-traits", "rand", "regex", - "rstest", + "rstest 0.22.0", "serde_json", "socket2", "syscalls", @@ -3975,7 +3975,7 @@ version = "3.114.1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3985,7 +3985,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "semver 1.0.23", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4010,7 +4010,7 @@ dependencies = [ "mirrord-protocol", "rand", "reqwest 0.12.5", - "rstest", + "rstest 0.21.0", "schemars", "semver 1.0.23", "serde", @@ -4034,7 +4034,7 @@ dependencies = [ [[package]] name = "mirrord-protocol" -version = "1.8.3" +version = "1.9.0" dependencies = [ "actix-codec", "bincode", @@ -4066,7 +4066,7 @@ dependencies = [ "tempfile", "thiserror", "tracing", - "which 6.0.2", + "which 6.0.3", ] [[package]] @@ -4093,7 +4093,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4264,7 +4264,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4508,7 +4508,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4567,7 +4567,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4646,7 +4646,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4752,7 +4752,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4852,7 +4852,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4928,7 +4928,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "version_check", "yansi", ] @@ -4960,7 +4960,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.74", + "syn 2.0.75", "tempfile", ] @@ -4974,7 +4974,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -5348,7 +5348,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.12", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pemfile 2.1.3", "rustls-pki-types", "serde", @@ -5420,7 +5420,19 @@ checksum = "9afd55a67069d6e434a95161415f5beeada95a01c7b815508a82dcb0e1593682" dependencies = [ "futures", "futures-timer", - "rstest_macros", + "rstest_macros 0.21.0", + "rustc_version", +] + +[[package]] +name = "rstest" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b423f0e62bdd61734b67cd21ff50871dfaeb9cc74f869dcd6af974fbcb19936" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros 0.22.0", "rustc_version", ] @@ -5438,7 +5450,25 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.74", + "syn 2.0.75", + "unicode-ident", +] + +[[package]] +name = "rstest_macros" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e1711e7d14f74b12a58411c542185ef7fb7f2e7f8ee6e2940a883628522b42" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.75", "unicode-ident", ] @@ -5576,9 +5606,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.1" +version = "0.7.2" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +checksum = "04182dffc9091a404e0fc069ea5cd60e5b866c3adf881eff99a32d048242dffa" dependencies = [ "openssl-probe", "rustls-pemfile 2.1.3", @@ -5714,7 +5744,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -5740,7 +5770,7 @@ checksum = "7f81c2fde025af7e69b1d1420531c8a8811ca898919db177141a85313b1cb932" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -5864,7 +5894,7 @@ checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -5875,7 +5905,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -5908,7 +5938,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6174,7 +6204,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6217,9 +6247,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.74" +version = "2.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fceb41e3d546d0bd83421d3409b1460cc7444cd389341a4c880fe7a042cb3d7" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" dependencies = [ "proc-macro2", "quote", @@ -6258,7 +6288,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6417,7 +6447,7 @@ dependencies = [ "rand", "regex", "reqwest 0.12.5", - "rstest", + "rstest 0.21.0", "rustls 0.23.12", "serde", "serde_json", @@ -6455,7 
+6485,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6525,9 +6555,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", @@ -6559,7 +6589,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6747,7 +6777,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6821,7 +6851,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -6896,7 +6926,7 @@ dependencies = [ "log", "rand", "rustls 0.22.4", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pki-types", "sha1", "thiserror", @@ -7194,7 +7224,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "wasm-bindgen-shared", ] @@ -7228,7 +7258,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7278,9 +7308,9 @@ dependencies = [ [[package]] name = "which" -version = "6.0.2" +version = "6.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d9c5ed668ee1f17edb3b627225343d210006a90bb1e3745ce1f30b1fb115075" +checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f" dependencies = [ "either", "home", @@ 
-7672,7 +7702,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -7692,7 +7722,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] diff --git a/changelog.d/2611.changed.md b/changelog.d/2611.changed.md new file mode 100644 index 00000000000..0124169e364 --- /dev/null +++ b/changelog.d/2611.changed.md @@ -0,0 +1 @@ +Adds a batched readdir, which should hopefully improve the performance when reading many dirs. Introduces a new `ReadDirBatch` message to the protocol. \ No newline at end of file diff --git a/mirrord-schema.json b/mirrord-schema.json index e456db15f34..c8549ef7e77 100644 --- a/mirrord-schema.json +++ b/mirrord-schema.json @@ -1243,7 +1243,7 @@ }, "log_level": { "title": "internal_proxy.log_level {#internal_proxy-log_level}", - "description": "Set the log level for the internal proxy. RUST_LOG convention (i.e `mirrord=trace`) will only be used if log_destination is set", + "description": "Set the log level for the internal proxy. 
RUST_LOG convention (i.e `mirrord=trace`) will only be used if `log_destination` is set.", "type": [ "string", "null" diff --git a/mirrord/agent/src/file.rs b/mirrord/agent/src/file.rs index aff16f3fd5d..807fb7c51b9 100644 --- a/mirrord/agent/src/file.rs +++ b/mirrord/agent/src/file.rs @@ -12,19 +12,8 @@ use std::{ use faccess::{AccessMode, PathExt}; use libc::DT_DIR; -use mirrord_protocol::{ - file::{ - AccessFileRequest, AccessFileResponse, CloseDirRequest, CloseFileRequest, DirEntryInternal, - FdOpenDirRequest, GetDEnts64Request, GetDEnts64Response, OpenDirResponse, OpenFileRequest, - OpenFileResponse, OpenOptionsInternal, OpenRelativeFileRequest, ReadDirRequest, - ReadDirResponse, ReadFileRequest, ReadFileResponse, ReadLimitedFileRequest, - ReadLinkFileRequest, ReadLinkFileResponse, SeekFileRequest, SeekFileResponse, - WriteFileRequest, WriteFileResponse, WriteLimitedFileRequest, XstatFsRequest, - XstatFsResponse, XstatRequest, XstatResponse, - }, - FileRequest, FileResponse, RemoteResult, ResponseError, -}; -use tracing::{error, trace}; +use mirrord_protocol::{file::*, FileRequest, FileResponse, RemoteResult, ResponseError}; +use tracing::{error, trace, Level}; use crate::error::Result; @@ -216,15 +205,20 @@ impl FileManager { let xstat_result = self.xstatfs(fd); Some(FileResponse::XstatFs(xstat_result)) } + + // dir operations FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd }) => { let open_dir_result = self.fdopen_dir(remote_fd); Some(FileResponse::OpenDir(open_dir_result)) } - FileRequest::ReadDir(ReadDirRequest { remote_fd }) => { let read_dir_result = self.read_dir(remote_fd); Some(FileResponse::ReadDir(read_dir_result)) } + FileRequest::ReadDirBatch(ReadDirBatchRequest { remote_fd, amount }) => { + let read_dir_result = self.read_dir_batch(remote_fd, amount); + Some(FileResponse::ReadDirBatch(read_dir_result)) + } FileRequest::CloseDir(CloseDirRequest { remote_fd }) => { self.close_dir(remote_fd); None @@ -723,7 +717,7 @@ impl FileManager { } } - 
#[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = Level::TRACE, skip(self), ret)] pub(crate) fn read_dir(&mut self, fd: u64) -> RemoteResult { let dir_stream = self.get_dir_stream(fd)?; let result = if let Some(offset_entry_pair) = dir_stream.next() { @@ -737,6 +731,25 @@ impl FileManager { Ok(result) } + /// Instead of returning just 1 [`DirEntryInternal`] from a `readdir` call (which in + /// Rust means advancing the [`read_dir`](std::fs::read_dir) iterator), we return + /// an iterator with (at most) `amount` items. + #[tracing::instrument(level = Level::TRACE, skip(self), ret)] + pub(crate) fn read_dir_batch( + &mut self, + fd: u64, + amount: usize, + ) -> RemoteResult { + let result = self + .get_dir_stream(fd)? + .take(amount) + .map(DirEntryInternal::try_from) + .try_collect::>() + .map(|dir_entries| ReadDirBatchResponse { fd, dir_entries })?; + + Ok(result) + } + /// The getdents64 syscall writes dir entries to a buffer, as long as they fit. /// If a call did not process all the entries in a dir, the result of the next call continues /// where the last one stopped. diff --git a/mirrord/agent/src/main.rs b/mirrord/agent/src/main.rs index bdfea92bac3..82c3000bdb3 100644 --- a/mirrord/agent/src/main.rs +++ b/mirrord/agent/src/main.rs @@ -2,6 +2,7 @@ #![feature(let_chains)] #![feature(type_alias_impl_trait)] #![feature(entry_insert)] +#![feature(iterator_try_collect)] #![cfg_attr(target_os = "linux", feature(tcp_quickack))] #![feature(lazy_cell)] #![warn(clippy::indexing_slicing)] diff --git a/mirrord/config/src/internal_proxy.rs b/mirrord/config/src/internal_proxy.rs index b8f9a4f18b0..46591f2c25a 100644 --- a/mirrord/config/src/internal_proxy.rs +++ b/mirrord/config/src/internal_proxy.rs @@ -85,12 +85,14 @@ pub struct InternalProxyConfig { pub idle_timeout: u64, /// ### internal_proxy.log_level {#internal_proxy-log_level} + /// /// Set the log level for the internal proxy. 
- /// RUST_LOG convention (i.e `mirrord=trace`) - /// will only be used if log_destination is set + /// RUST_LOG convention (i.e `mirrord=trace`) will only be used if `log_destination` + /// is set. pub log_level: Option, /// ### internal_proxy.log_destination {#internal_proxy-log_destination} + /// /// Set the log file destination for the internal proxy. pub log_destination: Option, diff --git a/mirrord/intproxy/protocol/src/lib.rs b/mirrord/intproxy/protocol/src/lib.rs index b0efe550adb..8f256efc6ee 100644 --- a/mirrord/intproxy/protocol/src/lib.rs +++ b/mirrord/intproxy/protocol/src/lib.rs @@ -11,14 +11,7 @@ use std::{ use bincode::{Decode, Encode}; use mirrord_protocol::{ dns::{GetAddrInfoRequest, GetAddrInfoResponse}, - file::{ - AccessFileRequest, AccessFileResponse, CloseDirRequest, CloseFileRequest, FdOpenDirRequest, - GetDEnts64Request, GetDEnts64Response, OpenDirResponse, OpenFileRequest, OpenFileResponse, - OpenRelativeFileRequest, ReadDirRequest, ReadDirResponse, ReadFileRequest, - ReadFileResponse, ReadLimitedFileRequest, ReadLinkFileRequest, ReadLinkFileResponse, - SeekFileRequest, SeekFileResponse, WriteFileRequest, WriteFileResponse, - WriteLimitedFileRequest, XstatFsRequest, XstatFsResponse, XstatRequest, XstatResponse, - }, + file::*, outgoing::SocketAddress, tcp::StealType, FileRequest, FileResponse, GetEnvVarsRequest, Port, RemoteResult, @@ -372,6 +365,15 @@ impl_request!( res_path = ProxyToLayerMessage::File => FileResponse::ReadDir, ); +// The `req = ReadDirBatchRequest` is a lie, the layer doesn't use this message, it'll use +// the regular `ReadDirRequest`, but we're doing it like this here to keep things simple. 
+impl_request!( + req = ReadDirBatchRequest, + res = RemoteResult, + req_path = LayerToProxyMessage::File => FileRequest::ReadDirBatch, + res_path = ProxyToLayerMessage::File => FileResponse::ReadDirBatch, +); + impl_request!( req = GetDEnts64Request, res = RemoteResult, diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index d1a2fd991c5..22abd5b1d92 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(map_try_insert)] #![warn(clippy::indexing_slicing)] use std::{collections::HashMap, time::Duration}; @@ -300,6 +301,13 @@ impl IntProxy { self.task_txs.agent.send(ClientMessage::ReadyForLogs).await; } + self.task_txs + .simple + .send(SimpleProxyMessage::ProtocolVersion( + protocol_version.clone(), + )) + .await; + self.task_txs .incoming .send(IncomingProxyMessage::AgentProtocolVersion(protocol_version)) diff --git a/mirrord/intproxy/src/proxies/simple.rs b/mirrord/intproxy/src/proxies/simple.rs index 2f5d31a0d5e..4405c94d778 100644 --- a/mirrord/intproxy/src/proxies/simple.rs +++ b/mirrord/intproxy/src/proxies/simple.rs @@ -1,14 +1,20 @@ //! The most basic proxying logic. Handles cases when the only job to do in the internal proxy is to //! pass requests and responses between the layer and the agent. 
-use std::collections::HashMap; +use std::{collections::HashMap, vec::IntoIter}; use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage}; use mirrord_protocol::{ dns::{GetAddrInfoRequest, GetAddrInfoResponse}, - file::{CloseDirRequest, CloseFileRequest, OpenDirResponse, OpenFileResponse}, - ClientMessage, FileRequest, FileResponse, GetEnvVarsRequest, RemoteResult, + file::{ + CloseDirRequest, CloseFileRequest, DirEntryInternal, OpenDirResponse, OpenFileResponse, + ReadDirBatchRequest, ReadDirBatchResponse, ReadDirRequest, ReadDirResponse, + READDIR_BATCH_VERSION, + }, + ClientMessage, FileRequest, FileResponse, GetEnvVarsRequest, RemoteResult, ResponseError, }; +use semver::Version; +use thiserror::Error; use crate::{ background_tasks::{BackgroundTask, MessageBus}, @@ -18,6 +24,7 @@ use crate::{ ProxyMessage, }; +#[derive(Debug)] pub enum SimpleProxyMessage { FileReq(MessageId, LayerId, FileRequest), FileRes(FileResponse), @@ -27,20 +34,56 @@ pub enum SimpleProxyMessage { LayerClosed(LayerClosed), GetEnvReq(MessageId, LayerId, GetEnvVarsRequest), GetEnvRes(RemoteResult>), + ProtocolVersion(Version), } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] -enum RemoteFd { +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub(crate) enum RemoteFd { File(u64), Dir(u64), } +#[derive(Clone)] +pub(crate) enum FileResource { + File, + Dir { + dirs_iter: IntoIter, + }, +} + +#[derive(Error, Debug)] +enum FileError { + #[error("Resource `{0}` not found!")] + MissingResource(u64), + + #[error("Dir operation called on file `{0}`!")] + DirOnFile(u64), +} + +impl From for ResponseError { + fn from(file_fail: FileError) -> Self { + match file_fail { + FileError::MissingResource(remote_fd) => ResponseError::NotFound(remote_fd), + FileError::DirOnFile(remote_fd) => ResponseError::NotDirectory(remote_fd), + } + } +} + +impl FileResource { + fn next_dir(&mut self, remote_fd: u64) -> Result, FileError> { + match self { + FileResource::Dir { dirs_iter } => 
dirs_iter.next().map(Ok).transpose(), + FileResource::File => Err(FileError::DirOnFile(remote_fd)), + } + } +} + /// For passing messages between the layer and the agent without custom internal logic. /// Run as a [`BackgroundTask`]. #[derive(Default)] pub struct SimpleProxy { /// Remote descriptors for open files and directories. Allows tracking across layer forks. - remote_fds: RemoteResources, + remote_fds: RemoteResources, /// For [`FileRequest`]s. file_reqs: RequestQueue, /// For [`GetAddrInfoRequest`]s. @@ -49,14 +92,76 @@ pub struct SimpleProxy { get_env_reqs: RequestQueue, } +impl SimpleProxy { + /// `readdir` works by keeping an iterator of all the `dir`s, and a call to it is + /// equivalent to doing `iterator.next()`. + /// + /// For efficiency, whenever we receive a `readdir` request from the layer, we try to + /// read more than just `next()` from the agent, while returning just the next `direntry` + /// back to layer. + async fn handle_readdir( + &mut self, + layer_id: LayerId, + remote_fd: u64, + message_id: u64, + protocol_version: Option<&Version>, + message_bus: &mut MessageBus, + ) -> Result<(), FileError> { + let resource = self + .remote_fds + .get_mut(&layer_id, &RemoteFd::Dir(remote_fd)) + .ok_or(FileError::MissingResource(remote_fd))?; + + if let Some(dir) = resource.next_dir(remote_fd)? { + message_bus + .send(ToLayer { + message_id, + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { + direntry: Some(dir), + }, + ))), + layer_id, + }) + .await; + } else { + self.file_reqs.insert(message_id, layer_id); + + let request = + if protocol_version.is_some_and(|version| READDIR_BATCH_VERSION.matches(version)) { + FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd, + amount: 128, + }) + } else { + FileRequest::ReadDir(ReadDirRequest { remote_fd }) + }; + + // Convert it into a `ReadDirBatch` for the agent. 
+ message_bus + .send(ProxyMessage::ToAgent(ClientMessage::FileRequest(request))) + .await; + } + + Ok(()) + } +} + impl BackgroundTask for SimpleProxy { type Error = RequestQueueEmpty; type MessageIn = SimpleProxyMessage; type MessageOut = ProxyMessage; async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), RequestQueueEmpty> { + let mut protocol_version = None; + while let Some(msg) = message_bus.recv().await { + tracing::trace!(?msg, "new message in message_bus"); + match msg { + SimpleProxyMessage::ProtocolVersion(new_protocol_version) => { + protocol_version = Some(new_protocol_version); + } SimpleProxyMessage::FileReq( _, layer_id, @@ -85,6 +190,33 @@ impl BackgroundTask for SimpleProxy { .await; } } + SimpleProxyMessage::FileReq( + message_id, + layer_id, + FileRequest::ReadDir(ReadDirRequest { remote_fd }), + ) => { + if let Err(fail) = self + .handle_readdir( + layer_id, + remote_fd, + message_id, + protocol_version.as_ref(), + message_bus, + ) + .await + { + // Send local failure to layer. 
+ message_bus + .send(ToLayer { + message_id, + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Err( + fail.into(), + ))), + layer_id, + }) + .await; + } + } SimpleProxyMessage::FileReq(message_id, session_id, req) => { self.file_reqs.insert(message_id, session_id); message_bus @@ -94,7 +226,8 @@ impl BackgroundTask for SimpleProxy { SimpleProxyMessage::FileRes(FileResponse::Open(Ok(OpenFileResponse { fd }))) => { let (message_id, layer_id) = self.file_reqs.get()?; - self.remote_fds.add(layer_id, RemoteFd::File(fd)); + self.remote_fds + .add(layer_id, RemoteFd::File(fd), FileResource::File); message_bus .send(ToLayer { @@ -109,7 +242,13 @@ impl BackgroundTask for SimpleProxy { SimpleProxyMessage::FileRes(FileResponse::OpenDir(Ok(OpenDirResponse { fd }))) => { let (message_id, layer_id) = self.file_reqs.get()?; - self.remote_fds.add(layer_id, RemoteFd::Dir(fd)); + self.remote_fds.add( + layer_id, + RemoteFd::Dir(fd), + FileResource::Dir { + dirs_iter: IntoIter::default(), + }, + ); message_bus .send(ToLayer { @@ -121,6 +260,30 @@ impl BackgroundTask for SimpleProxy { }) .await; } + SimpleProxyMessage::FileRes(FileResponse::ReadDirBatch(Ok( + ReadDirBatchResponse { fd, dir_entries }, + ))) => { + let (message_id, layer_id) = self.file_reqs.get()?; + + let mut entries_iter = dir_entries.into_iter(); + let direntry = entries_iter.next(); + + message_bus + .send(ToLayer { + message_id, + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { direntry }, + ))), + layer_id, + }) + .await; + + if let Some(FileResource::Dir { dirs_iter }) = + self.remote_fds.get_mut(&layer_id, &RemoteFd::Dir(fd)) + { + *dirs_iter = entries_iter; + } + } SimpleProxyMessage::FileRes(res) => { let (message_id, layer_id) = self.file_reqs.get()?; message_bus @@ -187,3 +350,196 @@ impl BackgroundTask for SimpleProxy { Ok(()) } } + +#[cfg(test)] +mod tests { + + use mirrord_intproxy_protocol::{LayerId, ProxyToLayerMessage}; + use mirrord_protocol::{ + file::{ + 
FdOpenDirRequest, OpenDirResponse, ReadDirBatchRequest, ReadDirBatchResponse, + ReadDirRequest, ReadDirResponse, + }, + ClientMessage, FileRequest, FileResponse, + }; + use semver::Version; + + use super::SimpleProxy; + use crate::{ + background_tasks::{BackgroundTasks, TaskSender, TaskUpdate}, + error::IntProxyError, + main_tasks::{MainTaskId, ProxyMessage, ToLayer}, + proxies::simple::SimpleProxyMessage, + }; + + /// Sets up a [`TaskSender`] and [`BackgroundTasks`] for a functioning [`SimpleProxy`]. + /// + /// - `protocol_version`: allows specifying the version of the protocol to use for + /// testing out potential mismatches in messages. + async fn setup_proxy( + protocol_version: Version, + ) -> ( + TaskSender, + BackgroundTasks, + ) { + let mut tasks: BackgroundTasks = + Default::default(); + + let proxy = tasks.register(SimpleProxy::default(), MainTaskId::SimpleProxy, 32); + + proxy + .send(SimpleProxyMessage::ProtocolVersion(protocol_version)) + .await; + + (proxy, tasks) + } + + /// Convenience for opening a dir. + async fn prepare_dir( + proxy: &TaskSender, + tasks: &mut BackgroundTasks, + ) { + let request = FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd: 0xdad }); + proxy + .send(SimpleProxyMessage::FileReq(0xbad, LayerId(0xa55), request)) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToAgent( + ClientMessage::FileRequest(FileRequest::FdOpenDir(FdOpenDirRequest { .. }),) + ))) + ), + "Mismatched message for `FdOpenDirRequest` {update:?}!" + ); + + let response = FileResponse::OpenDir(Ok(OpenDirResponse { fd: 0xdad })); + proxy.send(SimpleProxyMessage::FileRes(response)).await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { + message_id: 0xbad, + layer_id: LayerId(0xa55), + message: ProxyToLayerMessage::File(FileResponse::OpenDir(Ok( + OpenDirResponse { .. 
} + ))) + }))) + ), + "Mismatched message for `OpenDirResponse` {update:?}!" + ); + } + + #[tokio::test] + async fn old_protocol_uses_read_dir_request() { + let (proxy, mut tasks) = setup_proxy(Version::new(0, 1, 0)).await; + + prepare_dir(&proxy, &mut tasks).await; + + let readdir_request = FileRequest::ReadDir(ReadDirRequest { remote_fd: 0xdad }); + proxy + .send(SimpleProxyMessage::FileReq( + 0xbad, + LayerId(0xa55), + readdir_request, + )) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToAgent( + ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { .. })) + ))) + ), + "Mismatched message for `ReadDirRequest` {update:?}!" + ); + + let readdir_response = FileResponse::ReadDir(Ok(ReadDirResponse { direntry: None })); + proxy + .send(SimpleProxyMessage::FileRes(readdir_response)) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { + message_id: 0xbad, + layer_id: LayerId(0xa55), + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { .. } + ))) + }))) + ), + "Mismatched message for `ReadDirResponse` {update:?}!" 
+ ); + + drop(proxy); + let results = tasks.results().await; + for (_, result) in results { + assert!(result.is_ok(), "{result:?}"); + } + } + + #[tokio::test] + async fn new_protocol_uses_read_dir_batch_request() { + let (proxy, mut tasks) = setup_proxy(Version::new(1, 8, 3)).await; + + prepare_dir(&proxy, &mut tasks).await; + + let request = FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd: 0xdad, + amount: 0xca7, + }); + proxy + .send(SimpleProxyMessage::FileReq(0xbad, LayerId(0xa55), request)) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToAgent( + ClientMessage::FileRequest(FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd: 0xdad, + amount: 0xca7 + })) + ))) + ), + "Mismatched message for `ReadDirBatchRequest` {update:?}!" + ); + + let response = FileResponse::ReadDirBatch(Ok(ReadDirBatchResponse { + fd: 0xdad, + dir_entries: Vec::new(), + })); + proxy.send(SimpleProxyMessage::FileRes(response)).await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { + message_id: 0xbad, + layer_id: LayerId(0xa55), + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { .. } + ))) + }))) + ), + "Mismatched message for `ReadDirBatchResponse` {update:?}!" 
+ ); + + drop(proxy); + let results = tasks.results().await; + for (_, result) in results { + assert!(result.is_ok(), "{result:?}"); + } + } +} diff --git a/mirrord/intproxy/src/remote_resources.rs b/mirrord/intproxy/src/remote_resources.rs index e5bf6d8ccd4..a195a75c1b8 100644 --- a/mirrord/intproxy/src/remote_resources.rs +++ b/mirrord/intproxy/src/remote_resources.rs @@ -1,18 +1,21 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap}, hash::Hash, }; use mirrord_intproxy_protocol::LayerId; +use tracing::Level; + +use crate::proxies::simple::FileResource; /// For tracking remote resources allocated in the agent: open files and directories, port /// subscriptions. Remote resources can be shared by multiple layer instances because of forks. -pub struct RemoteResources { - by_layer: HashMap>, +pub struct RemoteResources { + by_layer: HashMap>, counts: HashMap, } -impl Default for RemoteResources { +impl Default for RemoteResources { fn default() -> Self { Self { by_layer: Default::default(), @@ -21,32 +24,18 @@ impl Default for RemoteResources { } } -impl RemoteResources +impl RemoteResources where - T: Clone + PartialEq + Eq + Hash, + T: Clone + PartialEq + Eq + Hash + core::fmt::Debug, + Resource: Clone, { - /// Adds the given resource to the layer instance with the given [`LayerId`]. - /// - /// Used when the layer opens a resource, e.g. with - /// [`OpenFileRequest`](mirrord_protocol::file::OpenFileRequest). - pub fn add(&mut self, layer_id: LayerId, resource: T) { - let added = self - .by_layer - .entry(layer_id) - .or_default() - .insert(resource.clone()); - - if added { - *self.counts.entry(resource).or_default() += 1; - } - } - /// Removes the given resource from the layer instance with the given [`LayerId`]. /// Returns whether the resource should be closed on the agent side. /// /// Can be used when the layer closes the resource, e.g. 
with /// [`CloseFileRequest`](mirrord_protocol::file::CloseFileRequest). - pub fn remove(&mut self, layer_id: LayerId, resource: T) -> bool { + #[tracing::instrument(level = Level::TRACE, skip(self, resource), ret)] + pub(crate) fn remove(&mut self, layer_id: LayerId, resource: T) -> bool { let removed = match self.by_layer.entry(layer_id) { Entry::Occupied(mut e) => { let removed = e.get_mut().remove(&resource); @@ -58,11 +47,11 @@ where Entry::Vacant(..) => return false, }; - if !removed { + if removed.is_none() { return false; } - match self.counts.entry(resource) { + match self.counts.entry(resource.clone()) { Entry::Occupied(e) if *e.get() == 1 => { e.remove(); true @@ -71,7 +60,11 @@ where *e.get_mut() -= 1; false } - Entry::Vacant(..) => panic!("RemoteResources out of sync"), + Entry::Vacant(..) => { + panic!( + "`remove` -> RemoteResources out of sync! Removing {resource:?} failed for {layer_id:?}!" + ) + } } } @@ -79,12 +72,13 @@ where /// id `dst`. /// /// Can be used when the layer forks. - pub fn clone_all(&mut self, src: LayerId, dst: LayerId) { + #[tracing::instrument(level = Level::TRACE, skip(self))] + pub(crate) fn clone_all(&mut self, src: LayerId, dst: LayerId) { let Some(resources) = self.by_layer.get(&src).cloned() else { return; }; - for resource in resources.iter().cloned() { + for resource in resources.keys().cloned() { *self.counts.entry(resource).or_default() += 1; } @@ -95,11 +89,12 @@ where /// Returns an [`Iterator`] over resources that should be closed on the agent size. /// /// Can be used when the layer closes the connection. 
- pub fn remove_all(&mut self, layer_id: LayerId) -> impl '_ + Iterator { + #[tracing::instrument(level = Level::TRACE, skip(self))] + pub(crate) fn remove_all(&mut self, layer_id: LayerId) -> impl '_ + Iterator { let resources = self.by_layer.remove(&layer_id).unwrap_or_default(); resources - .into_iter() + .into_keys() .filter(|resource| match self.counts.entry(resource.clone()) { Entry::Occupied(e) if *e.get() == 1 => { e.remove(); @@ -109,7 +104,66 @@ where *e.get_mut() -= 1; false } - Entry::Vacant(..) => panic!("RemoteResources out of sync"), + Entry::Vacant(..) => { + panic!("`remove_all` -> RemoteResources out of sync!") + } }) } } + +impl RemoteResources +where + T: Clone + PartialEq + Eq + Hash, +{ + /// Adds the given resource to the layer instance with the given [`LayerId`]. + /// + /// Used when the layer opens a resource, e.g. with + /// [`OpenFileRequest`](mirrord_protocol::file::OpenFileRequest). + #[tracing::instrument(level = Level::TRACE, skip(self, resource))] + pub(crate) fn add(&mut self, layer_id: LayerId, resource: T) { + let added = self + .by_layer + .entry(layer_id) + .or_default() + .try_insert(resource.clone(), ()) + .is_ok(); + + if added { + *self.counts.entry(resource).or_default() += 1; + } + } +} + +impl RemoteResources +where + T: Clone + PartialEq + Eq + Hash, +{ + /// Adds the given resource to the layer instance with the given [`LayerId`]. + /// + /// Used when the layer opens a resource, e.g. with + /// [`OpenFileRequest`](mirrord_protocol::file::OpenFileRequest). 
+ #[tracing::instrument(level = Level::TRACE, skip(self, resource, file))] + pub(crate) fn add(&mut self, layer_id: LayerId, resource: T, file: FileResource) { + let added = self + .by_layer + .entry(layer_id) + .or_default() + .try_insert(resource.clone(), file) + .is_ok(); + + if added { + *self.counts.entry(resource).or_default() += 1; + } + } + + #[tracing::instrument(level = Level::TRACE, skip(self, resource_key))] + pub(crate) fn get_mut( + &mut self, + layer_id: &LayerId, + resource_key: &T, + ) -> Option<&mut FileResource> { + self.by_layer + .get_mut(layer_id) + .and_then(|files| files.get_mut(resource_key)) + } +} diff --git a/mirrord/layer/src/file/open_dirs.rs b/mirrord/layer/src/file/open_dirs.rs index 2e799c9825c..bb4762a4c02 100644 --- a/mirrord/layer/src/file/open_dirs.rs +++ b/mirrord/layer/src/file/open_dirs.rs @@ -8,6 +8,7 @@ use std::{ }; use mirrord_protocol::file::{CloseDirRequest, DirEntryInternal, ReadDirRequest, ReadDirResponse}; +use tracing::Level; use super::{DirStreamFd, LocalFd, RemoteFd, OPEN_FILES}; use crate::{ @@ -210,6 +211,7 @@ impl OpenDir { } } + #[tracing::instrument(level = Level::DEBUG, skip(self), ret)] fn read_r(&self) -> Detour> { if self.closed { // This thread got this struct from `OpenDirs` before `close` removed it. diff --git a/mirrord/layer/src/file/ops.rs b/mirrord/layer/src/file/ops.rs index dc69fc253e9..58652f3918c 100644 --- a/mirrord/layer/src/file/ops.rs +++ b/mirrord/layer/src/file/ops.rs @@ -202,7 +202,6 @@ pub(crate) fn open(path: Detour, open_options: OpenOptionsInternal) -> pub(crate) fn fdopendir(fd: RawFd) -> Detour { // usize == ptr size // we don't return a pointer to an address that contains DIR - let remote_file_fd = OPEN_FILES .lock()? 
.get(&fd) diff --git a/mirrord/layer/src/lib.rs b/mirrord/layer/src/lib.rs index b66a05879d8..7385697e6b9 100644 --- a/mirrord/layer/src/lib.rs +++ b/mirrord/layer/src/lib.rs @@ -94,6 +94,7 @@ use mirrord_protocol::{EnvVars, GetEnvVarsRequest}; use proxy_connection::ProxyConnection; use setup::LayerSetup; use socket::SOCKETS; +use tracing::Level; use tracing_subscriber::{fmt::format::FmtSpan, prelude::*}; use crate::{ @@ -483,7 +484,7 @@ fn sip_only_layer_start(mut config: LayerConfig, patch_binaries: Vec) { /// - `enabled_remote_dns`: replaces [`libc::getaddrinfo`] and [`libc::freeaddrinfo`] when this is /// `true`, see [`NetworkConfig`](mirrord_config::feature::network::NetworkConfig), and /// [`hooks::enable_socket_hooks`](socket::hooks::enable_socket_hooks). -#[mirrord_layer_macro::instrument(level = "trace")] +#[mirrord_layer_macro::instrument(level = Level::TRACE)] fn enable_hooks(state: &LayerSetup) { let enabled_file_ops = state.fs_config().is_active(); let enabled_remote_dns = state.remote_dns_enabled(); diff --git a/mirrord/layer/tests/apps/env_bash_cat.sh b/mirrord/layer/tests/apps/env_bash_cat.sh index cdae4726b6a..46dfdde1206 100755 --- a/mirrord/layer/tests/apps/env_bash_cat.sh +++ b/mirrord/layer/tests/apps/env_bash_cat.sh @@ -14,4 +14,4 @@ # cat is a SIPed binary on mac. 
cat /very_interesting_file # sleep so we get close request (else bash might exit before we have time to close it) -sleep 1 \ No newline at end of file +sleep 1 diff --git a/mirrord/layer/tests/fileops.rs b/mirrord/layer/tests/fileops.rs index 8c5f34895fa..c4001aad799 100644 --- a/mirrord/layer/tests/fileops.rs +++ b/mirrord/layer/tests/fileops.rs @@ -334,6 +334,7 @@ async fn go_stat( XstatResponse { metadata }, )))) .await; + test_process.wait_assert_success().await; test_process.assert_no_error_in_stderr().await; } @@ -364,7 +365,7 @@ async fn go_dir( message, ClientMessage::FileRequest(FileRequest::Xstat(XstatRequest { path: None, - fd: Some(1), + fd: Some(fd), follow_symlink: true })) ); @@ -392,64 +393,64 @@ async fn go_dir( ClientMessage::FileRequest(FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd: 1 })) ); + let dir_fd = 2; intproxy .send(DaemonMessage::File(FileResponse::OpenDir(Ok( - OpenDirResponse { fd: 2 }, + OpenDirResponse { fd: dir_fd }, )))) .await; assert_eq!( intproxy.recv().await, - ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { remote_fd: 2 })) + ClientMessage::FileRequest(FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd: dir_fd, + amount: 128 + })) ); intproxy - .send(DaemonMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { - direntry: Some(DirEntryInternal { - name: "a".to_string(), - inode: 1, - position: 1, - file_type: libc::DT_REG, - }), + .send(DaemonMessage::File(FileResponse::ReadDirBatch(Ok( + ReadDirBatchResponse { + fd: dir_fd, + dir_entries: vec![ + DirEntryInternal { + name: "a".to_string(), + inode: 1, + position: 1, + file_type: libc::DT_REG, + }, + DirEntryInternal { + name: "b".to_string(), + inode: 2, + position: 2, + file_type: libc::DT_REG, + }, + ], }, )))) .await; assert_eq!( intproxy.recv().await, - ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { remote_fd: 2 })) + ClientMessage::FileRequest(FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd: dir_fd, + 
amount: 128 + })) ); intproxy - .send(DaemonMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { - direntry: Some(DirEntryInternal { - name: "b".to_string(), - inode: 2, - position: 2, - file_type: libc::DT_REG, - }), + .send(DaemonMessage::File(FileResponse::ReadDirBatch(Ok( + ReadDirBatchResponse { + fd: dir_fd, + dir_entries: Vec::new(), }, )))) .await; assert_eq!( intproxy.recv().await, - ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { remote_fd: 2 })) + ClientMessage::FileRequest(FileRequest::CloseDir(CloseDirRequest { remote_fd: dir_fd })) ); - - intproxy - .send(DaemonMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { direntry: None }, - )))) - .await; - - assert_eq!( - intproxy.recv().await, - ClientMessage::FileRequest(FileRequest::CloseDir(CloseDirRequest { remote_fd: 2 })) - ); - intproxy.expect_file_close(fd).await; test_process.wait_assert_success().await; diff --git a/mirrord/layer/tests/issue2001.rs b/mirrord/layer/tests/issue2001.rs index f0915b716dc..048c6c85929 100644 --- a/mirrord/layer/tests/issue2001.rs +++ b/mirrord/layer/tests/issue2001.rs @@ -56,36 +56,30 @@ async fn test_issue2001( assert_eq!( intproxy.recv().await, - ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { remote_fd: 11 })), + ClientMessage::FileRequest(FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd: 11, + amount: 128 + })), ); intproxy - .send(DaemonMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { - direntry: Some(DirEntryInternal { - inode: 1, - position: 0, - name: "file1".into(), - file_type: libc::DT_REG, - }), - }, - )))) - .await; - - assert_eq!( - intproxy.recv().await, - ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { remote_fd: 11 })), - ); - - intproxy - .send(DaemonMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { - direntry: Some(DirEntryInternal { - inode: 2, - position: 1, - name: "file2".into(), - file_type: libc::DT_REG, - }), + 
.send(DaemonMessage::File(FileResponse::ReadDirBatch(Ok( + ReadDirBatchResponse { + fd: 11, + dir_entries: vec![ + DirEntryInternal { + inode: 1, + position: 0, + name: "file1".into(), + file_type: libc::DT_REG, + }, + DirEntryInternal { + inode: 2, + position: 1, + name: "file2".into(), + file_type: libc::DT_REG, + }, + ], }, )))) .await; diff --git a/mirrord/protocol/Cargo.toml b/mirrord/protocol/Cargo.toml index 3aa1ca05e4c..8c2ad780df2 100644 --- a/mirrord/protocol/Cargo.toml +++ b/mirrord/protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mirrord-protocol" -version = "1.8.3" +version = "1.9.0" authors.workspace = true description.workspace = true documentation.workspace = true diff --git a/mirrord/protocol/src/codec.rs b/mirrord/protocol/src/codec.rs index 31527930d35..6a36de8ab0b 100644 --- a/mirrord/protocol/src/codec.rs +++ b/mirrord/protocol/src/codec.rs @@ -13,14 +13,7 @@ use semver::VersionReq; use crate::{ dns::{GetAddrInfoRequest, GetAddrInfoResponse}, - file::{ - AccessFileRequest, AccessFileResponse, CloseDirRequest, CloseFileRequest, FdOpenDirRequest, - GetDEnts64Request, GetDEnts64Response, OpenDirResponse, OpenFileRequest, OpenFileResponse, - OpenRelativeFileRequest, ReadDirRequest, ReadDirResponse, ReadFileRequest, - ReadFileResponse, ReadLimitedFileRequest, ReadLinkFileRequest, ReadLinkFileResponse, - SeekFileRequest, SeekFileResponse, WriteFileRequest, WriteFileResponse, - WriteLimitedFileRequest, XstatFsRequest, XstatFsResponse, XstatRequest, XstatResponse, - }, + file::*, outgoing::{ tcp::{DaemonTcpOutgoing, LayerTcpOutgoing}, udp::{DaemonUdpOutgoing, LayerUdpOutgoing}, @@ -82,6 +75,13 @@ pub enum FileRequest { CloseDir(CloseDirRequest), GetDEnts64(GetDEnts64Request), ReadLink(ReadLinkFileRequest), + + /// `readdir` request. + /// + /// Unlike other requests that come from the layer -> intproxy, this one is intproxy + /// only. 
[`ReadDirRequest`]s that come from the layer are transformed into this + /// batched form when the protocol version supports it. See [`READDIR_BATCH_VERSION`]. + ReadDirBatch(ReadDirBatchRequest), } /// Minimal mirrord-protocol version that allows `ClientMessage::ReadyForLogs` message. @@ -124,6 +124,7 @@ pub enum FileResponse { OpenDir(RemoteResult), GetDEnts64(RemoteResult), ReadLink(RemoteResult), + ReadDirBatch(RemoteResult), } /// `-agent` --> `-layer` messages. diff --git a/mirrord/protocol/src/file.rs b/mirrord/protocol/src/file.rs index a467a5695a2..7a7b50f98d9 100644 --- a/mirrord/protocol/src/file.rs +++ b/mirrord/protocol/src/file.rs @@ -5,11 +5,18 @@ use std::fs::DirEntry; use std::io; #[cfg(target_os = "linux")] use std::os::unix::fs::DirEntryExt; -use std::{fs::Metadata, io::SeekFrom, os::unix::prelude::MetadataExt, path::PathBuf}; +use std::{ + fs::Metadata, io::SeekFrom, os::unix::prelude::MetadataExt, path::PathBuf, sync::LazyLock, +}; use bincode::{Decode, Encode}; #[cfg(target_os = "linux")] use nix::sys::statfs::Statfs; +use semver::VersionReq; + +/// Minimal mirrord-protocol version that allows [`ReadDirBatchRequest`]. +pub static READDIR_BATCH_VERSION: LazyLock = + LazyLock::new(|| ">=1.9.0".parse().expect("Bad Identifier")); /// Internal version of Metadata across operating system (macOS, Linux) /// Only mutual attributes @@ -385,11 +392,31 @@ pub struct ReadDirRequest { pub remote_fd: u64, } +/// `readdir` message that requests an iterable with `amount` items from the agent. +#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] +pub struct ReadDirBatchRequest { + /// The fd of the dir in the agent. + pub remote_fd: u64, + /// Max amount to take from the agent's iterator of dirs. 
+ pub amount: usize, +} + #[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] pub struct ReadDirResponse { pub direntry: Option, } +/// `readdir` response with the list of items (length depends on the [`ReadDirBatchRequest`]'s +/// `amount`), and the `remote_fd` of the dir (for convenience). +#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] +pub struct ReadDirBatchResponse { + /// Remote fd of the dir. + pub fd: u64, + /// The list of [`DirEntryInternal`] where `length` is, at max, the `amount` we took + /// from the agent's read dir iterator. + pub dir_entries: Vec, +} + #[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] pub struct CloseDirRequest { pub remote_fd: u64,