From 80ec76d6042b2684b871faa88e9982f7770417eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Mon, 16 Dec 2024 15:49:06 +0100 Subject: [PATCH] Buffering readonly files (#2979) * Config entry * Moved file ops to a separate proxy and implemented buffering * Docs * Fixed readlink version req * Fixed old tests * Tests for buffered reading * Fixed seek logic * Changelog * Renamed RequestQueue methods * Reduce log level to trace for some methods * More docs and renames * Configurable buffer size * Updated json schema and configuration.md * Test code fix --- Cargo.lock | 3 +- changelog.d/2069.added.md | 2 + mirrord-schema.json | 10 + mirrord/analytics/src/lib.rs | 6 + mirrord/cli/src/internal_proxy.rs | 18 +- mirrord/config/configuration.md | 37 + mirrord/config/src/experimental.rs | 13 + mirrord/intproxy/Cargo.toml | 1 + mirrord/intproxy/protocol/src/lib.rs | 24 +- mirrord/intproxy/src/background_tasks.rs | 11 + mirrord/intproxy/src/error.rs | 18 +- mirrord/intproxy/src/lib.rs | 41 +- mirrord/intproxy/src/main_tasks.rs | 25 + mirrord/intproxy/src/proxies.rs | 1 + mirrord/intproxy/src/proxies/files.rs | 1349 ++++++++++++++++++++++ mirrord/intproxy/src/proxies/outgoing.rs | 24 +- mirrord/intproxy/src/proxies/simple.rs | 517 +-------- mirrord/intproxy/src/remote_resources.rs | 63 +- mirrord/intproxy/src/request_queue.rs | 53 +- mirrord/layer/tests/common/mod.rs | 2 +- mirrord/protocol/Cargo.toml | 2 +- mirrord/protocol/src/file.rs | 4 + 22 files changed, 1610 insertions(+), 614 deletions(-) create mode 100644 changelog.d/2069.added.md create mode 100644 mirrord/intproxy/src/proxies/files.rs diff --git a/Cargo.lock b/Cargo.lock index 008a42858c5..e5a4df96b54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4336,6 +4336,7 @@ dependencies = [ "mirrord-protocol", "rand", "reqwest 0.12.9", + "rstest", "rustls 0.23.19", "rustls-pemfile 2.2.0", "semver 1.0.23", @@ -4502,7 +4503,7 @@ dependencies = [ [[package]] name = "mirrord-protocol" -version = "1.12.0" +version = "1.12.1" dependencies = [ "actix-codec", "bincode", diff --git a/changelog.d/2069.added.md b/changelog.d/2069.added.md new file mode 100644 index 00000000000..f8efa7d3147 --- /dev/null +++ b/changelog.d/2069.added.md @@ -0,0 +1,2 @@ +Added to mirrord config a new experimental flag `.experimental.buffer_file_reads`. When this flag is enabled, mirrord will fetch remote readonly files in at least 4kb chunks. +This is to improve performance with applications that make many small reads from remote files. diff --git a/mirrord-schema.json b/mirrord-schema.json index 63886909f4e..a1b6d4c13d7 100644 --- a/mirrord-schema.json +++ b/mirrord-schema.json @@ -815,6 +815,16 @@ "null" ] }, + "readonly_file_buffer": { + "title": "_experimental_ readonly_file_buffer {#experimental-readonly_file_buffer}", + "description": "Sets buffer size for readonly remote files (in bytes, for example 4096). If set, such files will be read in chunks and buffered locally. This improves performace when the user application reads data in small portions.\n\nSetting to 0 disables file buffering.\n\n", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "tcp_ping4_mock": { "title": "_experimental_ tcp_ping4_mock {#experimental-tcp_ping4_mock}", "description": "", diff --git a/mirrord/analytics/src/lib.rs b/mirrord/analytics/src/lib.rs index 4ca01f2b6c6..bb05773544a 100644 --- a/mirrord/analytics/src/lib.rs +++ b/mirrord/analytics/src/lib.rs @@ -145,6 +145,12 @@ impl From for AnalyticValue { } } +impl From for AnalyticValue { + fn from(n: u64) -> Self { + AnalyticValue::Number(u32::try_from(n).unwrap_or(u32::MAX)) + } +} + impl From for AnalyticValue { fn from(n: usize) -> Self { AnalyticValue::Number(u32::try_from(n).unwrap_or(u32::MAX)) diff --git a/mirrord/cli/src/internal_proxy.rs b/mirrord/cli/src/internal_proxy.rs index 2a1a6f3e829..1c8763a0d92 100644 --- a/mirrord/cli/src/internal_proxy.rs +++ b/mirrord/cli/src/internal_proxy.rs @@ -141,13 +141,17 @@ pub(crate) async fn proxy( let first_connection_timeout = Duration::from_secs(config.internal_proxy.start_idle_timeout); let consecutive_connection_timeout = Duration::from_secs(config.internal_proxy.idle_timeout); - IntProxy::new_with_connection(agent_conn, listener) - .run(first_connection_timeout, consecutive_connection_timeout) - .await - .map_err(InternalProxyError::from) - .inspect_err(|error| { - tracing::error!(%error, "Internal proxy encountered an error, exiting"); - }) + IntProxy::new_with_connection( + agent_conn, + listener, + config.experimental.readonly_file_buffer, + ) + .run(first_connection_timeout, consecutive_connection_timeout) + .await + .map_err(InternalProxyError::from) + .inspect_err(|error| { + tracing::error!(%error, "Internal proxy encountered an error, exiting"); + }) } /// Creates a connection with the agent and handles one round of ping pong. diff --git a/mirrord/config/configuration.md b/mirrord/config/configuration.md index fb7416c1c3b..49659d966b6 100644 --- a/mirrord/config/configuration.md +++ b/mirrord/config/configuration.md @@ -77,6 +77,9 @@ configuration file containing all fields. "override": { "DATABASE_CONNECTION": "db://localhost:7777/my-db", "LOCAL_BEAR": "panda" + }, + "mapping": { + ".+_TIMEOUT": "1000" } }, "fs": { @@ -460,6 +463,16 @@ Enables `getifaddrs` hook that removes IPv6 interfaces from the list returned by DEPRECATED, WILL BE REMOVED +### _experimental_ readonly_file_buffer {#experimental-readonly_file_buffer} + +Sets buffer size for readonly remote files (in bytes, for example 4096). +If set, such files will be read in chunks and buffered locally. +This improves performace when the user application reads data in small portions. + +Setting to 0 disables file buffering. + + + ### _experimental_ tcp_ping4_mock {#experimental-tcp_ping4_mock} @@ -649,6 +662,9 @@ See the environment variables [reference](https://mirrord.dev/docs/reference/env "override": { "DATABASE_CONNECTION": "db://localhost:7777/my-db", "LOCAL_BEAR": "panda" + }, + "mapping": { + ".+_TIMEOUT": "1000" } } } @@ -692,6 +708,27 @@ If set, the variables are fetched after the user application is started. This setting is meant to resolve issues when using mirrord via the IntelliJ plugin on WSL and the remote environment contains a lot of variables. +### feature.env.mapping {#feature-env-mapping} + +Specify map of patterns that if matched will replace the value according to specification. + +*Capture groups are allowed.* + +Example: +```json +{ + ".+_TIMEOUT": "10000" + "LOG_.+_VERBOSITY": "debug" + "(\w+)_(\d+)": "magic-value" +} +``` + +Will do the next replacements for environment variables that match: + +`CONNECTION_TIMEOUT: 500` => `CONNECTION_TIMEOUT: 10000` +`LOG_FILE_VERBOSITY: info` => `LOG_FILE_VERBOSITY: debug` +`DATA_1234: common-value` => `DATA_1234: magic-value` + ### feature.env.override {#feature-env-override} Allows setting or overriding environment variables (locally) with a custom value. diff --git a/mirrord/config/src/experimental.rs b/mirrord/config/src/experimental.rs index 04fdf423527..1fd9213a762 100644 --- a/mirrord/config/src/experimental.rs +++ b/mirrord/config/src/experimental.rs @@ -58,6 +58,18 @@ pub struct ExperimentalConfig { /// Uses /dev/null for creating local fake files (should be better than using /tmp) #[config(default = true)] pub use_dev_null: bool, + + /// ### _experimental_ readonly_file_buffer {#experimental-readonly_file_buffer} + /// + /// Sets buffer size for readonly remote files (in bytes, for example 4096). + /// If set, such files will be read in chunks and buffered locally. + /// This improves performace when the user application reads data in small portions. + /// + /// Setting to 0 disables file buffering. + /// + /// + #[config(default = 0)] + pub readonly_file_buffer: u64, } impl CollectAnalytics for &ExperimentalConfig { @@ -68,5 +80,6 @@ impl CollectAnalytics for &ExperimentalConfig { analytics.add("enable_exec_hooks_linux", self.enable_exec_hooks_linux); analytics.add("hide_ipv6_interfaces", self.hide_ipv6_interfaces); analytics.add("disable_reuseaddr", self.disable_reuseaddr); + analytics.add("readonly_file_buffer", self.readonly_file_buffer); } } diff --git a/mirrord/intproxy/Cargo.toml b/mirrord/intproxy/Cargo.toml index 85334cb83bd..1c9cf9ecc75 100644 --- a/mirrord/intproxy/Cargo.toml +++ b/mirrord/intproxy/Cargo.toml @@ -47,3 +47,4 @@ exponential-backoff = "2" [dev-dependencies] reqwest.workspace = true +rstest.workspace = true diff --git a/mirrord/intproxy/protocol/src/lib.rs b/mirrord/intproxy/protocol/src/lib.rs index 5c5a003260c..648caebcb30 100644 --- a/mirrord/intproxy/protocol/src/lib.rs +++ b/mirrord/intproxy/protocol/src/lib.rs @@ -35,7 +35,7 @@ pub struct LocalMessage { } /// Messages sent by the layer and handled by the internal proxy. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub enum LayerToProxyMessage { /// A request to start new `layer <-> proxy` session. /// This should be the first message sent by the layer after opening a new connection to the @@ -54,7 +54,7 @@ pub enum LayerToProxyMessage { } /// Layer process information -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub struct ProcessInfo { /// Process ID. pub pid: u32, @@ -82,7 +82,7 @@ pub struct LayerId(pub u64); /// Sharing state between [`exec`](https://man7.org/linux/man-pages/man3/exec.3.html) calls is currently not supported. /// Therefore, when the layer initializes, it uses [`NewSessionRequest::New`] and does not inherit /// any state. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub enum NewSessionRequest { /// Layer initialized from its constructor, has a fresh state. New(ProcessInfo), @@ -119,7 +119,7 @@ impl fmt::Display for NetProtocol { } /// A request to initiate a new outgoing connection. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub struct OutgoingConnectRequest { /// The address the user application tries to connect to. pub remote_address: SocketAddress, @@ -128,7 +128,7 @@ pub struct OutgoingConnectRequest { } /// Requests related to incoming connections. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub enum IncomingRequest { /// A request made by layer when it starts listening for mirrored connections. PortSubscribe(PortSubscribe), @@ -152,7 +152,7 @@ pub struct ConnMetadataRequest { /// A response to layer's [`ConnMetadataRequest`]. /// Contains metadata useful for hooking `getsockname` and `getpeername`. -#[derive(Encode, Decode, Debug, Clone)] +#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)] pub struct ConnMetadataResponse { /// Original source of data, provided by the agent. Meant to be exposed to the user instead of /// the real source, which will always be localhost. @@ -176,7 +176,7 @@ pub struct ConnMetadataResponse { /// /// For each connection incoming to the remote port, /// the internal proxy will initiate a new connection to the local port specified in `listening_on`. -#[derive(Encode, Decode, Debug, Clone)] +#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)] pub struct PortSubscribe { /// Local address on which the layer is listening. pub listening_on: SocketAddr, @@ -185,7 +185,7 @@ pub struct PortSubscribe { } /// Instructions for the internal proxy and the agent on how to execute port mirroring. -#[derive(Encode, Decode, Debug, Clone)] +#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)] pub enum PortSubscription { /// Wrapped [`StealType`] specifies how to execute port mirroring. Steal(StealType), @@ -194,7 +194,7 @@ pub enum PortSubscription { } /// A request to stop proxying incoming connections. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub struct PortUnsubscribe { /// Port on the remote pod that layer mirrored. pub port: Port, @@ -203,7 +203,7 @@ pub struct PortUnsubscribe { } /// Messages sent by the internal proxy and handled by the layer. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub enum ProxyToLayerMessage { /// A response to [`NewSessionRequest`]. Contains the identifier of the new `layer <-> proxy` /// session. @@ -221,7 +221,7 @@ pub enum ProxyToLayerMessage { } /// A response to layer's [`IncomingRequest`]. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub enum IncomingResponse { /// A response to layer's [`PortSubscribe`]. /// As a temporary workaround to [agent protocol](mirrord_protocol) limitations, the only error @@ -234,7 +234,7 @@ pub enum IncomingResponse { } /// A response to layer's [`OutgoingConnectRequest`]. -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, PartialEq, Eq)] pub struct OutgoingConnectResponse { /// The address the layer should connect to instead of the address requested by the user. pub layer_address: SocketAddress, diff --git a/mirrord/intproxy/src/background_tasks.rs b/mirrord/intproxy/src/background_tasks.rs index 869f860ffda..e43c8f306b8 100644 --- a/mirrord/intproxy/src/background_tasks.rs +++ b/mirrord/intproxy/src/background_tasks.rs @@ -166,6 +166,7 @@ where /// An error that can occur when executing a [`BackgroundTask`]. #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub enum TaskError { /// An internal task error. Error(Err), @@ -182,6 +183,16 @@ pub enum TaskUpdate { Finished(Result<(), TaskError>), } +#[cfg(test)] +impl TaskUpdate { + pub fn unwrap_message(self) -> MOut { + match self { + Self::Message(mout) => mout, + Self::Finished(res) => panic!("expected a message, got task result: {res:?}"), + } + } +} + /// A struct that can be used to send messages to a [`BackgroundTask`] registered /// /// A struct that can be used to send messages to a [`BackgroundTask`] registered in the diff --git a/mirrord/intproxy/src/error.rs b/mirrord/intproxy/src/error.rs index 74419e6a035..457b94c17df 100644 --- a/mirrord/intproxy/src/error.rs +++ b/mirrord/intproxy/src/error.rs @@ -8,11 +8,17 @@ use crate::{ agent_conn::{AgentChannelError, AgentConnectionError}, layer_initializer::LayerInitializerError, ping_pong::PingPongError, - proxies::{incoming::IncomingProxyError, outgoing::OutgoingProxyError}, - request_queue::RequestQueueEmpty, + proxies::{ + files::FilesProxyError, incoming::IncomingProxyError, outgoing::OutgoingProxyError, + simple::SimpleProxyError, + }, MainTaskId, }; +#[derive(Error, Debug)] +#[error("agent sent an unexpected message: {0:?}")] +pub struct UnexpectedAgentMessage(pub DaemonMessage); + #[derive(Error, Debug)] pub enum IntProxyError { #[error("waiting for the first layer connection timed out")] @@ -26,8 +32,8 @@ pub enum IntProxyError { AgentConnection(#[from] AgentConnectionError), #[error("agent closed connection with error: {0}")] AgentFailed(String), - #[error("agent sent unexpected message: {0:?}")] - UnexpectedAgentMessage(DaemonMessage), + #[error(transparent)] + UnexpectedAgentMessage(#[from] UnexpectedAgentMessage), #[error("background task {0} exited unexpectedly")] TaskExit(MainTaskId), @@ -43,11 +49,13 @@ pub enum IntProxyError { #[error("layer connection failed: {0}")] LayerConnection(#[from] CodecError), #[error("simple proxy failed: {0}")] - SimpleProxy(#[from] RequestQueueEmpty), + SimpleProxy(#[from] SimpleProxyError), #[error("outgoing proxy failed: {0}")] OutgoingProxy(#[from] OutgoingProxyError), #[error("incoming proxy failed: {0}")] IncomingProxy(#[from] IncomingProxyError), + #[error("files proxy failed: {0}")] + FilesProxy(#[from] FilesProxyError), } pub type Result = core::result::Result; diff --git a/mirrord/intproxy/src/lib.rs b/mirrord/intproxy/src/lib.rs index 794016d78a9..d9f85f64f55 100644 --- a/mirrord/intproxy/src/lib.rs +++ b/mirrord/intproxy/src/lib.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, time::Duration}; use background_tasks::{BackgroundTasks, TaskSender, TaskUpdate}; +use error::UnexpectedAgentMessage; use layer_conn::LayerConnection; use layer_initializer::LayerInitializer; use main_tasks::{FromLayer, LayerForked, MainTaskId, ProxyMessage, ToLayer}; @@ -11,6 +12,7 @@ use mirrord_intproxy_protocol::{LayerId, LayerToProxyMessage, LocalMessage}; use mirrord_protocol::{ClientMessage, DaemonMessage, LogLevel, CLIENT_READY_FOR_LOGS}; use ping_pong::{AgentSentPong, PingPong}; use proxies::{ + files::{FilesProxy, FilesProxyMessage}, incoming::{IncomingProxy, IncomingProxyMessage}, outgoing::{OutgoingProxy, OutgoingProxyMessage}, simple::{SimpleProxy, SimpleProxyMessage}, @@ -43,6 +45,7 @@ struct TaskTxs { outgoing: TaskSender, incoming: TaskSender, ping_pong: TaskSender, + files: TaskSender, } /// This struct contains logic for proxying between multiple layer instances and one agent. @@ -65,7 +68,11 @@ impl IntProxy { /// Creates a new [`IntProxy`] using existing [`AgentConnection`]. /// The returned instance will accept connections from the layers using the given /// [`TcpListener`]. - pub fn new_with_connection(agent_conn: AgentConnection, listener: TcpListener) -> Self { + pub fn new_with_connection( + agent_conn: AgentConnection, + listener: TcpListener, + file_buffer_size: u64, + ) -> Self { let mut background_tasks: BackgroundTasks = Default::default(); @@ -96,6 +103,11 @@ impl IntProxy { MainTaskId::IncomingProxy, Self::CHANNEL_SIZE, ); + let files = background_tasks.register( + FilesProxy::new(file_buffer_size), + MainTaskId::FilesProxy, + Self::CHANNEL_SIZE, + ); Self { any_connection_accepted: false, @@ -108,6 +120,7 @@ impl IntProxy { outgoing, incoming, ping_pong, + files, }, } } @@ -179,8 +192,8 @@ impl IntProxy { }; self.task_txs - .simple - .send(SimpleProxyMessage::LayerForked(msg)) + .files + .send(FilesProxyMessage::LayerForked(msg)) .await; self.task_txs .incoming @@ -224,8 +237,8 @@ impl IntProxy { let msg = LayerClosed { id: LayerId(id) }; self.task_txs - .simple - .send(SimpleProxyMessage::LayerClosed(msg)) + .files + .send(FilesProxyMessage::LayerClosed(msg)) .await; self.task_txs .incoming @@ -275,8 +288,8 @@ impl IntProxy { } DaemonMessage::File(msg) => { self.task_txs - .simple - .send(SimpleProxyMessage::FileRes(msg)) + .files + .send(FilesProxyMessage::FileRes(msg)) .await } DaemonMessage::GetAddrInfoResponse(msg) => { @@ -303,10 +316,8 @@ impl IntProxy { } self.task_txs - .simple - .send(SimpleProxyMessage::ProtocolVersion( - protocol_version.clone(), - )) + .files + .send(FilesProxyMessage::ProtocolVersion(protocol_version.clone())) .await; self.task_txs @@ -325,7 +336,9 @@ impl IntProxy { .await } other => { - return Err(IntProxyError::UnexpectedAgentMessage(other)); + return Err(IntProxyError::UnexpectedAgentMessage( + UnexpectedAgentMessage(other), + )); } } @@ -344,8 +357,8 @@ impl IntProxy { match message { LayerToProxyMessage::File(req) => { self.task_txs - .simple - .send(SimpleProxyMessage::FileReq(message_id, layer_id, req)) + .files + .send(FilesProxyMessage::FileReq(message_id, layer_id, req)) .await; } LayerToProxyMessage::GetAddrInfo(req) => { diff --git a/mirrord/intproxy/src/main_tasks.rs b/mirrord/intproxy/src/main_tasks.rs index a6ad4a54b77..fd0e5d7e8e5 100644 --- a/mirrord/intproxy/src/main_tasks.rs +++ b/mirrord/intproxy/src/main_tasks.rs @@ -7,6 +7,7 @@ use tokio::net::TcpStream; /// Messages sent back to the [`IntProxy`](crate::IntProxy) from the main background tasks. See /// [`MainTaskId`]. #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub enum ProxyMessage { /// Message to be sent to the agent. ToAgent(ClientMessage), @@ -20,7 +21,18 @@ pub enum ProxyMessage { NewLayer(NewLayer), } +#[cfg(test)] +impl ProxyMessage { + pub fn unwrap_proxy_to_layer_message(self) -> ProxyToLayerMessage { + match self { + Self::ToLayer(to_layer) => to_layer.message, + other => panic!("expected proxy to layer message, found {other:?}"), + } + } +} + #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub struct ToLayer { pub message_id: MessageId, pub layer_id: LayerId, @@ -28,6 +40,7 @@ pub struct ToLayer { } #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub struct FromLayer { pub message_id: MessageId, pub layer_id: LayerId, @@ -42,6 +55,16 @@ pub struct NewLayer { pub parent_id: Option, } +#[cfg(test)] +impl PartialEq for NewLayer { + fn eq(&self, other: &Self) -> bool { + self.id == other.id && self.parent_id == other.parent_id + } +} + +#[cfg(test)] +impl Eq for NewLayer {} + impl From for ProxyMessage { fn from(value: ClientMessage) -> Self { Self::ToAgent(value) @@ -82,6 +105,7 @@ pub enum MainTaskId { IncomingProxy, PingPong, AgentConnection, + FilesProxy, LayerConnection(LayerId), } @@ -95,6 +119,7 @@ impl fmt::Display for MainTaskId { Self::AgentConnection => f.write_str("AGENT_CONNECTION"), Self::LayerConnection(id) => write!(f, "LAYER_CONNECTION {}", id.0), Self::IncomingProxy => f.write_str("INCOMING_PROXY"), + Self::FilesProxy => f.write_str("FILES_PROXY"), } } } diff --git a/mirrord/intproxy/src/proxies.rs b/mirrord/intproxy/src/proxies.rs index 94fedb589c6..47f8f274e9e 100644 --- a/mirrord/intproxy/src/proxies.rs +++ b/mirrord/intproxy/src/proxies.rs @@ -1,6 +1,7 @@ //! Sub-proxies of the internal proxy. Each of these encapsulates logic for handling a group of //! related requests and exchanges messages only with the [`IntProxy`](crate::IntProxy). +pub mod files; pub mod incoming; pub mod outgoing; pub mod simple; diff --git a/mirrord/intproxy/src/proxies/files.rs b/mirrord/intproxy/src/proxies/files.rs new file mode 100644 index 00000000000..5997271446b --- /dev/null +++ b/mirrord/intproxy/src/proxies/files.rs @@ -0,0 +1,1349 @@ +use core::fmt; +use std::{collections::HashMap, vec}; + +use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage}; +use mirrord_protocol::{ + file::{ + CloseDirRequest, CloseFileRequest, DirEntryInternal, ReadDirBatchRequest, ReadDirResponse, + ReadFileResponse, ReadLimitedFileRequest, SeekFromInternal, READDIR_BATCH_VERSION, + READLINK_VERSION, + }, + ClientMessage, DaemonMessage, ErrorKindInternal, FileRequest, FileResponse, RemoteIOError, + ResponseError, +}; +use semver::Version; +use thiserror::Error; +use tracing::Level; + +use crate::{ + background_tasks::{BackgroundTask, MessageBus}, + error::UnexpectedAgentMessage, + main_tasks::{LayerClosed, LayerForked, ProxyMessage, ToLayer}, + remote_resources::RemoteResources, + request_queue::RequestQueue, +}; + +/// Messages handled by [`FilesProxy`]. +#[derive(Debug)] +pub enum FilesProxyMessage { + /// Layer sent file request. + FileReq(MessageId, LayerId, FileRequest), + /// Agent sent file response. + FileRes(FileResponse), + /// Protocol version was negotiated with the agent. + ProtocolVersion(Version), + /// Layer instance forked. + LayerForked(LayerForked), + /// Layer instance closed. + LayerClosed(LayerClosed), +} + +/// Error that can occur in [`FilesProxy`]. +#[derive(Error, Debug)] +#[error(transparent)] +pub struct FilesProxyError(#[from] UnexpectedAgentMessage); + +/// Locally cached data of a remote file that is buffered. +#[derive(Default)] +struct BufferedFileData { + /// Buffered file contents. + buffer: Vec, + /// Position of [`Self::buffer`] in the file. + buffer_position: u64, + /// Position of the file descriptor in the file. + /// This position is normally managed in the agent, + /// but for buffered files we manage it here. + /// It's simpler this way. + fd_position: u64, +} + +impl BufferedFileData { + /// Attempts to read `amount` bytes from [`Self::buffer`], starting from `position` in the file. + /// + /// Returns [`None`] when the read does not fit in the buffer in whole. + fn read_from_buffer(&self, amount: u64, position: u64) -> Option<&[u8]> { + let start_from = position.checked_sub(self.buffer_position)? as usize; + let end_before = start_from + amount as usize; + self.buffer.get(start_from..end_before) + } +} + +impl fmt::Debug for BufferedFileData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufferedFileData") + .field("buffer_position", &self.buffer_position) + .field("buffer_len", &self.buffer.len()) + .field("fd_position", &self.fd_position) + .finish() + } +} + +/// Locally cached data of a remote directory that is buffered. +#[derive(Default)] +struct BufferedDirData { + /// Buffered entries of this directory. + buffered_entries: vec::IntoIter, +} + +impl fmt::Debug for BufferedDirData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufferedDirData") + .field("remaining_buffered_entries", &self.buffered_entries.len()) + .finish() + } +} + +/// Additional request data that is saved by [`FilesProxy`] in its [`RequestQueue`]. +/// Allows for handling buffered reads by marking requests that should be handled in a special way. +#[derive(Debug, Default)] +enum AdditionalRequestData { + /// Open file that will be buffered. + OpenBuffered, + + /// Read file that is buffered. + ReadBuffered { + /// File descriptor. + fd: u64, + /// Read buffer size of the user application. + /// The user requested reading this many bytes. + requested_amount: u64, + /// Whether we should update fd position in file + /// (we store it locally). + update_fd_position: bool, + }, + + /// Seek file that is buffered. + SeekBuffered { + /// File descriptor. + fd: u64, + }, + + /// All other file ops. + #[default] + Other, +} + +/// For handling all file operations. +/// Run as a [`BackgroundTask`]. +/// +/// # Directory buffering +/// +/// To optimize cases where user application traverses large directories, +/// we use [`FileRequest::ReadDirBatch`] to fetch many entries at once +/// ([`Self::READDIR_BATCH_SIZE`]). +/// +/// Excessive entries are cached locally in this proxy and used until depleted. +/// +/// # File buffering +/// +/// To optimize cases where user application makes a lot of small reads on remote files, +/// we change the way of reading readonly files. +/// +/// 1. When created with [`FilesProxy::new`], this proxy is given a desired file buffer size. Buffer +/// size 0 disables file buffering. +/// 2. When the user requests a read, we fetch at least `buffer_size` bytes. We return the amount +/// requested by the user and store the whole response as a local buffer. +/// 3. When the user requests a read again, we try to fulfill the request using only the local +/// buffer. If it's not possible, we proceed as in point 1 +/// 4. To solve problems with descriptor offset, we only use [`FileRequest::ReadLimited`] to read +/// buffered files. Descriptor offset value is maintained in this proxy. +pub struct FilesProxy { + /// [`mirrord_protocol`] version negotiated with the agent. + /// Determines whether we can use some messages, like [`FileRequest::ReadDirBatch`] or + /// [`FileRequest::ReadLink`]. + protocol_version: Option, + + /// Size for readonly files buffer. + /// If equal to 0, this proxy does not buffer files. + file_buffer_size: u64, + + /// Stores metadata of outstanding requests. + request_queue: RequestQueue, + + /// For tracking remote file descriptors across layer instances (forks). + remote_files: RemoteResources, + /// Locally stored data of buffered files. + buffered_files: HashMap, + + /// For tracking remote directory descriptors across layer instances (forks). + remote_dirs: RemoteResources, + /// Locally stored data of buffered directories. + buffered_dirs: HashMap, +} + +impl fmt::Debug for FilesProxy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FilesProxy") + .field("file_buffer_size", &self.file_buffer_size) + .field("buffer_readdir", &self.buffer_dirs()) + .field("buffered_files", &self.buffered_files) + .field("buffered_dirs", &self.buffered_dirs) + .field("protocol_version", &self.protocol_version) + .field("request_queue", &self.request_queue) + .finish() + } +} + +impl FilesProxy { + /// How many directory entries we request at a time. + /// Relevant only if [`mirrord_protocol`] version allows for [`FileRequest::ReadDirBatch`]. + pub const READDIR_BATCH_SIZE: usize = 128; + + /// Creates a new files proxy instance. + /// Proxy can be used as a [`BackgroundTask`]. + /// + /// `file_buffer_size` sets size of the readonly files buffer. + /// Size 0 disables buffering. + pub fn new(file_buffer_size: u64) -> Self { + Self { + protocol_version: Default::default(), + file_buffer_size, + + request_queue: Default::default(), + + remote_files: Default::default(), + buffered_files: Default::default(), + + remote_dirs: Default::default(), + buffered_dirs: Default::default(), + } + } + + /// Returns whether [`mirrord_protocol`] version allows for buffering directories. + fn buffer_dirs(&self) -> bool { + self.protocol_version + .as_ref() + .is_some_and(|version| READDIR_BATCH_VERSION.matches(version)) + } + + /// Returns whether this proxy is configured to buffer readonly files. + fn buffer_reads(&self) -> bool { + self.file_buffer_size > 0 + } + + #[tracing::instrument(level = Level::TRACE)] + fn layer_forked(&mut self, forked: LayerForked) { + self.remote_files.clone_all(forked.parent, forked.child); + self.remote_dirs.clone_all(forked.parent, forked.child); + } + + #[tracing::instrument(level = Level::TRACE, skip(message_bus))] + async fn layer_closed(&mut self, closed: LayerClosed, message_bus: &mut MessageBus) { + for fd in self.remote_files.remove_all(closed.id) { + self.buffered_files.remove(&fd); + message_bus + .send(ProxyMessage::ToAgent(ClientMessage::FileRequest( + FileRequest::Close(CloseFileRequest { fd }), + ))) + .await; + } + + for remote_fd in self.remote_dirs.remove_all(closed.id) { + self.buffered_dirs.remove(&remote_fd); + message_bus + .send(ProxyMessage::ToAgent(ClientMessage::FileRequest( + FileRequest::CloseDir(CloseDirRequest { remote_fd }), + ))) + .await; + } + } + + #[tracing::instrument(level = Level::TRACE)] + fn protocol_version(&mut self, version: Version) { + self.protocol_version.replace(version); + } + + // #[tracing::instrument(level = Level::TRACE, skip(message_bus))] + async fn file_request( + &mut self, + request: FileRequest, + layer_id: LayerId, + message_id: MessageId, + message_bus: &mut MessageBus, + ) { + match request { + // Should trigger remote close only when the fd is closed in all layer instances. + FileRequest::Close(close) => { + if self.remote_files.remove(layer_id, close.fd) { + self.buffered_files.remove(&close.fd); + message_bus + .send(ClientMessage::FileRequest(FileRequest::Close(close))) + .await; + } + } + + // Should trigger remote close only when the fd is closed in all layer instances. + FileRequest::CloseDir(close) => { + if self.remote_dirs.remove(layer_id, close.remote_fd) { + self.buffered_dirs.remove(&close.remote_fd); + message_bus + .send(ClientMessage::FileRequest(FileRequest::CloseDir(close))) + .await; + } + } + + // May require storing additional data in the request queue. + FileRequest::Open(open) => { + let additional_data = (self.buffer_reads() && open.open_options.is_read_only()) + .then_some(AdditionalRequestData::OpenBuffered) + .unwrap_or_default(); + self.request_queue + .push_back_with_data(message_id, layer_id, additional_data); + message_bus + .send(ClientMessage::FileRequest(FileRequest::Open(open))) + .await; + } + + // May require storing additional data in the request queue. + FileRequest::OpenRelative(open) => { + let additional_data = (self.buffer_reads() && open.open_options.is_read_only()) + .then_some(AdditionalRequestData::OpenBuffered) + .unwrap_or_default(); + self.request_queue + .push_back_with_data(message_id, layer_id, additional_data); + message_bus + .send(ClientMessage::FileRequest(FileRequest::OpenRelative(open))) + .await; + } + + // Try to use local buffer if possible. + FileRequest::Read(read) => match self.buffered_files.get_mut(&read.remote_fd) { + // File is buffered. + Some(data) => { + let from_buffer = data.read_from_buffer(read.buffer_size, data.fd_position); + if let Some(from_buffer) = from_buffer { + let bytes = from_buffer.to_vec(); + data.fd_position += read.buffer_size; + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::Read(Ok( + ReadFileResponse { + bytes, + read_amount: read.buffer_size, + }, + ))), + }) + .await; + } else { + let additional_data = AdditionalRequestData::ReadBuffered { + fd: read.remote_fd, + requested_amount: read.buffer_size, + update_fd_position: true, + }; + self.request_queue.push_back_with_data( + message_id, + layer_id, + additional_data, + ); + message_bus + .send(ClientMessage::FileRequest(FileRequest::ReadLimited( + ReadLimitedFileRequest { + remote_fd: read.remote_fd, + buffer_size: std::cmp::max( + read.buffer_size, + self.file_buffer_size, + ), + start_from: data.fd_position, + }, + ))) + .await; + } + } + + // File is not buffered. + None => { + self.request_queue.push_back(message_id, layer_id); + message_bus + .send(ClientMessage::FileRequest(FileRequest::Read(read))) + .await; + } + }, + + // Try to use local buffer if possible. + FileRequest::ReadLimited(read) => match self.buffered_files.get_mut(&read.remote_fd) { + // File is buffered. + Some(data) => { + let from_buffer = data.read_from_buffer(read.buffer_size, read.start_from); + if let Some(from_buffer) = from_buffer { + let bytes = from_buffer.to_vec(); + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::ReadLimited(Ok( + ReadFileResponse { + bytes, + read_amount: read.buffer_size, + }, + ))), + }) + .await; + } else { + let additional_data = AdditionalRequestData::ReadBuffered { + fd: read.remote_fd, + requested_amount: read.buffer_size, + update_fd_position: false, + }; + self.request_queue.push_back_with_data( + message_id, + layer_id, + additional_data, + ); + message_bus + .send(ClientMessage::FileRequest(FileRequest::ReadLimited( + ReadLimitedFileRequest { + remote_fd: read.remote_fd, + buffer_size: std::cmp::max( + read.buffer_size, + self.file_buffer_size, + ), + start_from: read.start_from, + }, + ))) + .await; + } + } + + // File is not buffered. + None => { + self.request_queue.push_back(message_id, layer_id); + message_bus + .send(ClientMessage::FileRequest(FileRequest::ReadLimited(read))) + .await; + } + }, + + // Try to use local buffer if possible. + FileRequest::ReadDir(read_dir) => match self.buffered_dirs.get_mut(&read_dir.remote_fd) + { + // Directory is buffered. + Some(data) => { + if let Some(direntry) = data.buffered_entries.next() { + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { + direntry: Some(direntry), + }, + ))), + }) + .await; + } else { + self.request_queue.push_back(message_id, layer_id); + message_bus + .send(ClientMessage::FileRequest(FileRequest::ReadDirBatch( + ReadDirBatchRequest { + remote_fd: read_dir.remote_fd, + amount: Self::READDIR_BATCH_SIZE, + }, + ))) + .await; + } + } + + // Directory is not buffered. + None => { + self.request_queue.push_back(message_id, layer_id); + message_bus + .send(ClientMessage::FileRequest(FileRequest::ReadDir(read_dir))) + .await; + } + }, + + // Not supported in old `mirrord-protocol` versions. + req @ FileRequest::ReadLink(..) => { + let supported = self + .protocol_version + .as_ref() + .is_some_and(|version| READLINK_VERSION.matches(version)); + + if supported { + self.request_queue.push_back(message_id, layer_id); + message_bus + .send(ProxyMessage::ToAgent(ClientMessage::FileRequest(req))) + .await; + } else { + message_bus + .send(ToLayer { + message_id, + message: ProxyToLayerMessage::File(FileResponse::ReadLink(Err( + ResponseError::NotImplemented, + ))), + layer_id, + }) + .await; + } + } + + // Should only be sent from intproxy, not from the layer. + FileRequest::ReadDirBatch(..) => { + unreachable!("ReadDirBatch request is never sent from the layer"); + } + + // May require storing additional data in the request queue. + FileRequest::Seek(mut seek) => { + let additional_data = + match (self.buffered_files.get_mut(&seek.fd), &mut seek.seek_from) { + (Some(data), SeekFromInternal::Current(diff)) => { + let result = u64::try_from(data.fd_position as i128 + *diff as i128); + match result { + Ok(offset) => seek.seek_from = SeekFromInternal::Start(offset), + Err(..) => { + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::Seek( + Err(ResponseError::RemoteIO(RemoteIOError { + raw_os_error: Some(22), // EINVAL + kind: ErrorKindInternal::InvalidInput, + })), + )), + }) + .await; + return; + } + } + + AdditionalRequestData::SeekBuffered { fd: seek.fd } + } + (Some(..), _) => AdditionalRequestData::SeekBuffered { fd: seek.fd }, + _ => AdditionalRequestData::Other, + }; + + self.request_queue + .push_back_with_data(message_id, layer_id, additional_data); + message_bus + .send(ClientMessage::FileRequest(FileRequest::Seek(seek))) + .await; + } + + // Doesn't require any special logic. + other => { + self.request_queue.push_back(message_id, layer_id); + message_bus.send(ClientMessage::FileRequest(other)).await; + } + } + } + + #[tracing::instrument(level = Level::TRACE, skip(message_bus))] + async fn file_response( + &mut self, + response: FileResponse, + message_bus: &mut MessageBus, + ) -> Result<(), FilesProxyError> { + match response { + // Update file maps. + FileResponse::Open(Ok(open)) => { + let (message_id, layer_id, additional_data) = + self.request_queue.pop_front_with_data().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::File(FileResponse::Open(Ok( + open.clone() + )))) + })?; + + self.remote_files.add(layer_id, open.fd); + + if matches!(additional_data, AdditionalRequestData::OpenBuffered) { + self.buffered_files.insert(open.fd, Default::default()); + } + + message_bus + .send(ToLayer { + layer_id, + message_id, + message: ProxyToLayerMessage::File(FileResponse::Open(Ok(open))), + }) + .await; + } + + // Update dir maps. + FileResponse::OpenDir(Ok(open)) => { + let (message_id, layer_id) = self.request_queue.pop_front().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::File(FileResponse::OpenDir(Ok( + open.clone() + )))) + })?; + + self.remote_dirs.add(layer_id, open.fd); + + if self.buffer_dirs() { + self.buffered_dirs.insert(open.fd, Default::default()); + } + + message_bus + .send(ToLayer { + layer_id, + message_id, + message: ProxyToLayerMessage::File(FileResponse::OpenDir(Ok(open))), + }) + .await; + } + + // If the file is buffered, update `files_data`. + FileResponse::ReadLimited(Ok(read)) => { + let (message_id, layer_id, additional_data) = + self.request_queue.pop_front_with_data().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::File(FileResponse::ReadLimited(Ok( + read.clone(), + )))) + })?; + + let AdditionalRequestData::ReadBuffered { + fd, + requested_amount, + update_fd_position, + } = additional_data + else { + // This file is not buffered. + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::ReadLimited(Ok(read))), + }) + .await; + return Ok(()); + }; + + let Some(data) = self.buffered_files.get_mut(&fd) else { + // File must have been closed from other thread in user application. + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::ReadLimited(Err( + ResponseError::NotFound(fd), + ))), + }) + .await; + return Ok(()); + }; + + let bytes = read + .bytes + .get(..requested_amount as usize) + .unwrap_or(&read.bytes) + .to_vec(); + let read_amount = bytes.len() as u64; + let response = ReadFileResponse { bytes, read_amount }; + + data.buffer = read.bytes; + data.buffer_position = data.fd_position; + let message = if update_fd_position { + // User originally sent `FileRequest::Read`. + data.fd_position += response.read_amount; + FileResponse::Read(Ok(response)) + } else { + // User originally sent `FileRequest::ReadLimited`. + FileResponse::ReadLimited(Ok(response)) + }; + + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(message), + }) + .await; + } + + // If the file is buffered, update `files_data`. + FileResponse::Seek(Ok(seek)) => { + let (message_id, layer_id, additional_data) = + self.request_queue.pop_front_with_data().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::File(FileResponse::Seek(Ok( + seek.clone() + )))) + })?; + + if let AdditionalRequestData::SeekBuffered { fd } = additional_data { + let Some(data) = self.buffered_files.get_mut(&fd) else { + // File must have been closed from other thread in user application. + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::Seek(Err( + ResponseError::NotFound(fd), + ))), + }) + .await; + return Ok(()); + }; + + data.fd_position = seek.result_offset; + } + + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::Seek(Ok(seek))), + }) + .await; + } + + // Store extra entries in `dirs_data`. + FileResponse::ReadDirBatch(Ok(batch)) => { + let (message_id, layer_id) = self.request_queue.pop_front().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::File(FileResponse::ReadDirBatch(Ok( + batch.clone(), + )))) + })?; + + let Some(data) = self.buffered_dirs.get_mut(&batch.fd) else { + // Directory must have been closed from other thread in user application. + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Err( + ResponseError::NotFound(batch.fd), + ))), + }) + .await; + return Ok(()); + }; + + let mut entries = batch.dir_entries.into_iter(); + let direntry = entries.next(); + data.buffered_entries = entries; + + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { direntry }, + ))), + }) + .await; + } + + // Doesn't require any special logic. + other => { + let (message_id, layer_id) = self + .request_queue + .pop_front() + .ok_or_else(|| UnexpectedAgentMessage(DaemonMessage::File(other.clone())))?; + message_bus + .send(ToLayer { + message_id, + layer_id, + message: ProxyToLayerMessage::File(other), + }) + .await; + } + } + + Ok(()) + } +} + +impl BackgroundTask for FilesProxy { + type MessageIn = FilesProxyMessage; + type MessageOut = ProxyMessage; + type Error = FilesProxyError; + + async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { + while let Some(message) = message_bus.recv().await { + tracing::trace!(?message, "new message in message_bus"); + + match message { + FilesProxyMessage::FileReq(message_id, layer_id, request) => { + self.file_request(request, layer_id, message_id, message_bus) + .await; + } + FilesProxyMessage::FileRes(response) => { + self.file_response(response, message_bus).await?; + } + FilesProxyMessage::LayerClosed(closed) => { + self.layer_closed(closed, message_bus).await; + } + FilesProxyMessage::LayerForked(forked) => self.layer_forked(forked), + FilesProxyMessage::ProtocolVersion(version) => self.protocol_version(version), + } + } + + tracing::trace!("message bus closed, exiting"); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use mirrord_intproxy_protocol::{LayerId, ProxyToLayerMessage}; + use mirrord_protocol::{ + file::{ + FdOpenDirRequest, OpenDirResponse, OpenFileRequest, OpenFileResponse, + OpenOptionsInternal, ReadDirBatchRequest, ReadDirBatchResponse, ReadDirRequest, + ReadDirResponse, ReadFileRequest, ReadFileResponse, ReadLimitedFileRequest, + SeekFileRequest, SeekFileResponse, SeekFromInternal, + }, + ClientMessage, ErrorKindInternal, FileRequest, FileResponse, RemoteIOError, ResponseError, + }; + use rstest::rstest; + use semver::Version; + + use super::{FilesProxy, FilesProxyMessage}; + use crate::{ + background_tasks::{BackgroundTasks, TaskSender, TaskUpdate}, + error::IntProxyError, + main_tasks::{MainTaskId, ProxyMessage, ToLayer}, + }; + + /// Sets up a [`TaskSender`] and [`BackgroundTasks`] for a functioning [`FilesProxy`]. + /// + /// - `protocol_version`: allows specifying the version of the protocol to use for testing out + /// potential mismatches in messages. + /// - `buffer_reads`: configures buffering readonly files + async fn setup_proxy( + protocol_version: Version, + file_buffer_size: u64, + ) -> ( + TaskSender, + BackgroundTasks, + ) { + let mut tasks: BackgroundTasks = + Default::default(); + + let proxy = tasks.register( + FilesProxy::new(file_buffer_size), + MainTaskId::FilesProxy, + 32, + ); + + proxy + .send(FilesProxyMessage::ProtocolVersion(protocol_version)) + .await; + + (proxy, tasks) + } + + /// Convenience for opening a dir. + async fn prepare_dir( + proxy: &TaskSender, + tasks: &mut BackgroundTasks, + ) { + let request = FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd: 0xdad }); + proxy + .send(FilesProxyMessage::FileReq(0xbad, LayerId(0xa55), request)) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToAgent( + ClientMessage::FileRequest(FileRequest::FdOpenDir(FdOpenDirRequest { + remote_fd: 0xdad + }),) + ))) + ), + "Mismatched message for `FdOpenDirRequest` {update:?}!" + ); + + let response = FileResponse::OpenDir(Ok(OpenDirResponse { fd: 0xdad })); + proxy.send(FilesProxyMessage::FileRes(response)).await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { + message_id: 0xbad, + layer_id: LayerId(0xa55), + message: ProxyToLayerMessage::File(FileResponse::OpenDir(Ok( + OpenDirResponse { .. } + ))) + }))) + ), + "Mismatched message for `OpenDirResponse` {update:?}!" + ); + } + + #[tokio::test] + async fn old_protocol_uses_read_dir_request() { + let (proxy, mut tasks) = setup_proxy(Version::new(0, 1, 0), 0).await; + + prepare_dir(&proxy, &mut tasks).await; + + let readdir_request = FileRequest::ReadDir(ReadDirRequest { remote_fd: 0xdad }); + proxy + .send(FilesProxyMessage::FileReq( + 0xbad, + LayerId(0xa55), + readdir_request, + )) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToAgent( + ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { .. })) + ))) + ), + "Mismatched message for `ReadDirRequest` {update:?}!" + ); + + let readdir_response = FileResponse::ReadDir(Ok(ReadDirResponse { direntry: None })); + proxy + .send(FilesProxyMessage::FileRes(readdir_response)) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { + message_id: 0xbad, + layer_id: LayerId(0xa55), + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { .. } + ))) + }))) + ), + "Mismatched message for `ReadDirResponse` {update:?}!" + ); + + drop(proxy); + let results = tasks.results().await; + for (_, result) in results { + assert!(result.is_ok(), "{result:?}"); + } + } + + #[tokio::test] + async fn new_protocol_uses_read_dir_batch_request() { + let (proxy, mut tasks) = setup_proxy(Version::new(1, 9, 0), 0).await; + + prepare_dir(&proxy, &mut tasks).await; + + let request = FileRequest::ReadDir(ReadDirRequest { remote_fd: 0xdad }); + proxy + .send(FilesProxyMessage::FileReq(0xbad, LayerId(0xa55), request)) + .await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToAgent( + ClientMessage::FileRequest(FileRequest::ReadDirBatch(ReadDirBatchRequest { + remote_fd: 0xdad, + amount: FilesProxy::READDIR_BATCH_SIZE, + })) + ))) + ), + "Mismatched message for `ReadDirBatchRequest` {update:?}!" + ); + + let response = FileResponse::ReadDirBatch(Ok(ReadDirBatchResponse { + fd: 0xdad, + dir_entries: Vec::new(), + })); + proxy.send(FilesProxyMessage::FileRes(response)).await; + let (_, update) = tasks.next().await.unzip(); + + assert!( + matches!( + update, + Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { + message_id: 0xbad, + layer_id: LayerId(0xa55), + message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( + ReadDirResponse { .. } + ))) + }))) + ), + "Mismatched message for `ReadDirBatchResponse` {update:?}!" + ); + + drop(proxy); + let results = tasks.results().await; + for (_, result) in results { + assert!(result.is_ok(), "{result:?}"); + } + } + + /// Helper function for opening a file in a running [`FilesProxy`]. + async fn open_file( + proxy: &TaskSender, + tasks: &mut BackgroundTasks, + readonly: bool, + ) -> u64 { + let message_id = rand::random(); + let fd = rand::random(); + + let request = FileRequest::Open(OpenFileRequest { + path: PathBuf::from("/some/path"), + open_options: OpenOptionsInternal { + read: true, + write: !readonly, + ..Default::default() + }, + }); + proxy + .send(FilesProxyMessage::FileReq( + message_id, + LayerId(0), + request.clone(), + )) + .await; + let update = tasks.next().await.unwrap().1.unwrap_message(); + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(request)), + ); + + let response = FileResponse::Open(Ok(OpenFileResponse { fd })); + proxy + .send(FilesProxyMessage::FileRes(response.clone())) + .await; + let update = tasks.next().await.unwrap().1.unwrap_message(); + assert_eq!( + update, + ProxyMessage::ToLayer(ToLayer { + message_id, + layer_id: LayerId(0), + message: ProxyToLayerMessage::File(response), + }) + ); + + fd + } + + async fn make_read_request( + proxy: &TaskSender, + tasks: &mut BackgroundTasks, + remote_fd: u64, + buffer_size: u64, + start_from: Option, + ) -> ProxyMessage { + let message_id = rand::random(); + let request = if let Some(start_from) = start_from { + FileRequest::ReadLimited(ReadLimitedFileRequest { + remote_fd, + buffer_size, + start_from, + }) + } else { + FileRequest::Read(ReadFileRequest { + remote_fd, + buffer_size, + }) + }; + + proxy + .send(FilesProxyMessage::FileReq(message_id, LayerId(0), request)) + .await; + tasks.next().await.unwrap().1.unwrap_message() + } + + async fn respond_to_read_request( + proxy: &TaskSender, + tasks: &mut BackgroundTasks, + data: Vec, + limited: bool, + ) -> ProxyMessage { + let response = ReadFileResponse { + read_amount: data.len() as u64, + bytes: data, + }; + let response = if limited { + FileResponse::ReadLimited(Ok(response)) + } else { + FileResponse::Read(Ok(response)) + }; + + proxy.send(FilesProxyMessage::FileRes(response)).await; + tasks.next().await.unwrap().1.unwrap_message() + } + + #[rstest] + #[case(true, false)] + #[case(false, true)] + #[case(false, false)] + #[tokio::test] + async fn reading_from_unbuffered_file(#[case] readonly: bool, #[case] buffering_enabled: bool) { + let (proxy, mut tasks) = setup_proxy( + mirrord_protocol::VERSION.clone(), + buffering_enabled.then_some(4096).unwrap_or_default(), + ) + .await; + + let fd = open_file(&proxy, &mut tasks, readonly).await; + + let update = make_read_request(&proxy, &mut tasks, fd, 10, None).await; + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(FileRequest::Read( + ReadFileRequest { + remote_fd: fd, + buffer_size: 10, + } + ))), + ); + + let update = respond_to_read_request(&proxy, &mut tasks, vec![0; 10], false) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Read(Ok(ReadFileResponse { + bytes: vec![0; 10], + read_amount: 10, + }))), + ); + + let update = make_read_request(&proxy, &mut tasks, fd, 1, Some(13)).await; + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(FileRequest::ReadLimited( + ReadLimitedFileRequest { + remote_fd: fd, + buffer_size: 1, + start_from: 13, + } + ))), + ); + + let update = respond_to_read_request(&proxy, &mut tasks, vec![2], true) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::ReadLimited(Ok(ReadFileResponse { + bytes: vec![2], + read_amount: 1, + }))), + ); + } + + #[tokio::test] + async fn reading_from_buffered_file() { + let (proxy, mut tasks) = setup_proxy(mirrord_protocol::VERSION.clone(), 4096).await; + + let fd = open_file(&proxy, &mut tasks, true).await; + let contents = std::iter::repeat(0_u8..=255).flatten(); + + let update = make_read_request(&proxy, &mut tasks, fd, 1, None).await; + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(FileRequest::ReadLimited( + ReadLimitedFileRequest { + remote_fd: fd, + buffer_size: 4096, + start_from: 0, + } + ))), + ); + + let data = contents.clone().take(4096).collect::>(); + let update = respond_to_read_request(&proxy, &mut tasks, data, true) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Read(Ok(ReadFileResponse { + bytes: vec![0], + read_amount: 1, + }))), + ); + + for i in 1..=3 { + let update = make_read_request(&proxy, &mut tasks, fd, 1, None) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Read(Ok(ReadFileResponse { + bytes: vec![i], + read_amount: 1, + }))), + ); + } + + let expected = contents.clone().skip(256).take(512).collect::>(); + let update = make_read_request(&proxy, &mut tasks, fd, 512, Some(256)) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::ReadLimited(Ok(ReadFileResponse { + bytes: expected, + read_amount: 512, + }))), + ); + + let update = make_read_request(&proxy, &mut tasks, fd, 4096 * 2, None).await; + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(FileRequest::ReadLimited( + ReadLimitedFileRequest { + remote_fd: fd, + buffer_size: 4096 * 2, + start_from: 4, + } + ))), + ); + + let data = contents.clone().skip(4).take(4096).collect::>(); + let update = respond_to_read_request(&proxy, &mut tasks, data.clone(), true) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Read(Ok(ReadFileResponse { + bytes: data, + read_amount: 4096, + }))), + ); + + let seek_request = FileRequest::Seek(SeekFileRequest { + fd, + seek_from: SeekFromInternal::Start(444), + }); + proxy + .send(FilesProxyMessage::FileReq( + rand::random(), + LayerId(0), + seek_request.clone(), + )) + .await; + let update = tasks.next().await.unwrap().1.unwrap_message(); + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(seek_request)), + ); + let seek_response = FileResponse::Seek(Ok(SeekFileResponse { result_offset: 444 })); + proxy + .send(FilesProxyMessage::FileRes(seek_response.clone())) + .await; + let update = tasks + .next() + .await + .unwrap() + .1 + .unwrap_message() + .unwrap_proxy_to_layer_message(); + assert_eq!(update, ProxyToLayerMessage::File(seek_response),); + + let expected = contents.clone().skip(444).take(10).collect::>(); + let update = make_read_request(&proxy, &mut tasks, fd, 10, None) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Read(Ok(ReadFileResponse { + bytes: expected, + read_amount: 10, + }))) + ); + } + + #[tokio::test] + async fn seeking_in_buffered_file() { + let (proxy, mut tasks) = setup_proxy(mirrord_protocol::VERSION.clone(), 4096).await; + + let fd = open_file(&proxy, &mut tasks, true).await; + let contents = std::iter::repeat(0_u8..=255).flatten(); + + let update = make_read_request(&proxy, &mut tasks, fd, 20, None).await; + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(FileRequest::ReadLimited( + ReadLimitedFileRequest { + remote_fd: fd, + buffer_size: 4096, + start_from: 0, + } + ))), + ); + + let data = contents.clone().take(4096).collect::>(); + let expected = contents.take(20).collect::>(); + let update = respond_to_read_request(&proxy, &mut tasks, data, true) + .await + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Read(Ok(ReadFileResponse { + bytes: expected, + read_amount: 20, + }))), + ); + + let seek_request = FileRequest::Seek(SeekFileRequest { + fd, + seek_from: SeekFromInternal::Current(-30), + }); + proxy + .send(FilesProxyMessage::FileReq( + rand::random(), + LayerId(0), + seek_request.clone(), + )) + .await; + let update = tasks + .next() + .await + .unwrap() + .1 + .unwrap_message() + .unwrap_proxy_to_layer_message(); + assert_eq!( + update, + ProxyToLayerMessage::File(FileResponse::Seek(Err(ResponseError::RemoteIO( + RemoteIOError { + raw_os_error: Some(22), + kind: ErrorKindInternal::InvalidInput, + } + )))) + ); + + let seek_request = FileRequest::Seek(SeekFileRequest { + fd, + seek_from: SeekFromInternal::Current(-10), + }); + proxy + .send(FilesProxyMessage::FileReq( + rand::random(), + LayerId(0), + seek_request.clone(), + )) + .await; + let update = tasks.next().await.unwrap().1.unwrap_message(); + assert_eq!( + update, + ProxyMessage::ToAgent(ClientMessage::FileRequest(FileRequest::Seek( + SeekFileRequest { + fd, + seek_from: SeekFromInternal::Start(10), + } + ))), + ); + let seek_response = FileResponse::Seek(Ok(SeekFileResponse { result_offset: 10 })); + proxy + .send(FilesProxyMessage::FileRes(seek_response.clone())) + .await; + let update = tasks + .next() + .await + .unwrap() + .1 + .unwrap_message() + .unwrap_proxy_to_layer_message(); + assert_eq!(update, ProxyToLayerMessage::File(seek_response),); + } +} diff --git a/mirrord/intproxy/src/proxies/outgoing.rs b/mirrord/intproxy/src/proxies/outgoing.rs index 0f327185a26..d7782053aa9 100644 --- a/mirrord/intproxy/src/proxies/outgoing.rs +++ b/mirrord/intproxy/src/proxies/outgoing.rs @@ -8,7 +8,7 @@ use mirrord_intproxy_protocol::{ }; use mirrord_protocol::{ outgoing::{tcp::DaemonTcpOutgoing, udp::DaemonUdpOutgoing, DaemonConnect, DaemonRead}, - ConnectionId, RemoteResult, ResponseError, + ConnectionId, DaemonMessage, RemoteResult, ResponseError, }; use thiserror::Error; use tracing::Level; @@ -16,9 +16,10 @@ use tracing::Level; use self::interceptor::Interceptor; use crate::{ background_tasks::{BackgroundTask, BackgroundTasks, MessageBus, TaskSender, TaskUpdate}, + error::UnexpectedAgentMessage, main_tasks::ToLayer, proxies::outgoing::net_protocol_ext::NetProtocolExt, - request_queue::{RequestQueue, RequestQueueEmpty}, + request_queue::RequestQueue, ProxyMessage, }; @@ -35,8 +36,8 @@ pub enum OutgoingProxyError { ResponseError(#[from] ResponseError), /// The agent sent a [`DaemonConnect`] response, but the [`RequestQueue`] for layer's connec /// requests was empty. This should never happen. - #[error("failed to match connect response: {0}")] - RequestQueueEmpty(#[from] RequestQueueEmpty), + #[error(transparent)] + UnexpectedAgentMessage(#[from] UnexpectedAgentMessage), /// The proxy failed to prepare a new local socket for the intercepted connection. #[error("failed to prepare local socket: {0}")] SocketSetupError(#[from] io::Error), @@ -142,7 +143,17 @@ impl OutgoingProxy { protocol: NetProtocol, message_bus: &mut MessageBus, ) -> Result<(), OutgoingProxyError> { - let (message_id, layer_id) = self.queue(protocol).get()?; + let (message_id, layer_id) = self.queue(protocol).pop_front().ok_or_else(|| { + let message = match protocol { + NetProtocol::Datagrams => { + DaemonMessage::UdpOutgoing(DaemonUdpOutgoing::Connect(connect.clone())) + } + NetProtocol::Stream => { + DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Connect(connect.clone())) + } + }; + UnexpectedAgentMessage(message) + })?; let connect = match connect { Ok(connect) => connect, @@ -203,7 +214,8 @@ impl OutgoingProxy { request: OutgoingConnectRequest, message_bus: &mut MessageBus, ) { - self.queue(request.protocol).insert(message_id, session_id); + self.queue(request.protocol) + .push_back(message_id, session_id); let msg = request.protocol.wrap_agent_connect(request.remote_address); message_bus.send(ProxyMessage::ToAgent(msg)).await; diff --git a/mirrord/intproxy/src/proxies/simple.rs b/mirrord/intproxy/src/proxies/simple.rs index f7fe874068c..dae7881247e 100644 --- a/mirrord/intproxy/src/proxies/simple.rs +++ b/mirrord/intproxy/src/proxies/simple.rs @@ -1,335 +1,66 @@ //! The most basic proxying logic. Handles cases when the only job to do in the internal proxy is to //! pass requests and responses between the layer and the agent. -use std::{collections::HashMap, vec::IntoIter}; +use std::collections::HashMap; use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage}; use mirrord_protocol::{ dns::{GetAddrInfoRequest, GetAddrInfoResponse}, - file::{ - CloseDirRequest, CloseFileRequest, DirEntryInternal, OpenDirResponse, OpenFileResponse, - ReadDirBatchRequest, ReadDirBatchResponse, ReadDirRequest, ReadDirResponse, - READDIR_BATCH_VERSION, - }, - ClientMessage, FileRequest, FileResponse, GetEnvVarsRequest, RemoteResult, ResponseError, + ClientMessage, DaemonMessage, GetEnvVarsRequest, RemoteResult, }; -use semver::Version; use thiserror::Error; use crate::{ background_tasks::{BackgroundTask, MessageBus}, - main_tasks::{LayerClosed, LayerForked, ToLayer}, - remote_resources::RemoteResources, - request_queue::{RequestQueue, RequestQueueEmpty}, + error::UnexpectedAgentMessage, + main_tasks::ToLayer, + request_queue::RequestQueue, ProxyMessage, }; #[derive(Debug)] pub enum SimpleProxyMessage { - FileReq(MessageId, LayerId, FileRequest), - FileRes(FileResponse), AddrInfoReq(MessageId, LayerId, GetAddrInfoRequest), AddrInfoRes(GetAddrInfoResponse), - LayerForked(LayerForked), - LayerClosed(LayerClosed), GetEnvReq(MessageId, LayerId, GetEnvVarsRequest), GetEnvRes(RemoteResult>), - ProtocolVersion(Version), -} - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub(crate) enum RemoteFd { - File(u64), - Dir(u64), -} - -#[derive(Clone)] -pub(crate) enum FileResource { - File, - Dir { - dirs_iter: IntoIter, - }, } #[derive(Error, Debug)] -enum FileError { - #[error("Resource `{0}` not found!")] - MissingResource(u64), - - #[error("Dir operation called on file `{0}`!")] - DirOnFile(u64), -} - -impl From for ResponseError { - fn from(file_fail: FileError) -> Self { - match file_fail { - FileError::MissingResource(remote_fd) => ResponseError::NotFound(remote_fd), - FileError::DirOnFile(remote_fd) => ResponseError::NotDirectory(remote_fd), - } - } -} - -impl FileResource { - fn next_dir(&mut self, remote_fd: u64) -> Result, FileError> { - match self { - FileResource::Dir { dirs_iter } => dirs_iter.next().map(Ok).transpose(), - FileResource::File => Err(FileError::DirOnFile(remote_fd)), - } - } -} +#[error(transparent)] +pub struct SimpleProxyError(#[from] UnexpectedAgentMessage); /// For passing messages between the layer and the agent without custom internal logic. /// Run as a [`BackgroundTask`]. #[derive(Default)] pub struct SimpleProxy { - /// Remote descriptors for open files and directories. Allows tracking across layer forks. - remote_fds: RemoteResources, - /// For [`FileRequest`]s. - file_reqs: RequestQueue, /// For [`GetAddrInfoRequest`]s. addr_info_reqs: RequestQueue, /// For [`GetEnvVarsRequest`]s. get_env_reqs: RequestQueue, } -impl SimpleProxy { - /// `readdir` works by keeping an iterator of all the `dir`s, and a call to it is - /// equivalent to doing `iterator.next()`. - /// - /// For efficiency, whenever we receive a `readdir` request from the layer, we try to - /// read more than just `next()` from the agent, while returning just the next `direntry` - /// back to layer. - async fn handle_readdir( - &mut self, - layer_id: LayerId, - remote_fd: u64, - message_id: u64, - protocol_version: Option<&Version>, - message_bus: &mut MessageBus, - ) -> Result<(), FileError> { - let resource = self - .remote_fds - .get_mut(&layer_id, &RemoteFd::Dir(remote_fd)) - .ok_or(FileError::MissingResource(remote_fd))?; - - if let Some(dir) = resource.next_dir(remote_fd)? { - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { - direntry: Some(dir), - }, - ))), - layer_id, - }) - .await; - } else { - self.file_reqs.insert(message_id, layer_id); - - let request = - if protocol_version.is_some_and(|version| READDIR_BATCH_VERSION.matches(version)) { - FileRequest::ReadDirBatch(ReadDirBatchRequest { - remote_fd, - amount: 128, - }) - } else { - FileRequest::ReadDir(ReadDirRequest { remote_fd }) - }; - - // Convert it into a `ReadDirBatch` for the agent. - message_bus - .send(ProxyMessage::ToAgent(ClientMessage::FileRequest(request))) - .await; - } - - Ok(()) - } -} - impl BackgroundTask for SimpleProxy { - type Error = RequestQueueEmpty; + type Error = SimpleProxyError; type MessageIn = SimpleProxyMessage; type MessageOut = ProxyMessage; - async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), RequestQueueEmpty> { - let mut protocol_version = None; - + async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { while let Some(msg) = message_bus.recv().await { tracing::trace!(?msg, "new message in message_bus"); match msg { - SimpleProxyMessage::ProtocolVersion(new_protocol_version) => { - protocol_version = Some(new_protocol_version); - } - SimpleProxyMessage::FileReq( - _, - layer_id, - FileRequest::Close(CloseFileRequest { fd }), - ) => { - let do_close = self.remote_fds.remove(layer_id, RemoteFd::File(fd)); - if do_close { - message_bus - .send(ClientMessage::FileRequest(FileRequest::Close( - CloseFileRequest { fd }, - ))) - .await; - } - } - SimpleProxyMessage::FileReq( - _, - layer_id, - FileRequest::CloseDir(CloseDirRequest { remote_fd }), - ) => { - let do_close = self.remote_fds.remove(layer_id, RemoteFd::Dir(remote_fd)); - if do_close { - message_bus - .send(ClientMessage::FileRequest(FileRequest::CloseDir( - CloseDirRequest { remote_fd }, - ))) - .await; - } - } - SimpleProxyMessage::FileReq( - message_id, - layer_id, - FileRequest::ReadDir(ReadDirRequest { remote_fd }), - ) => { - if let Err(fail) = self - .handle_readdir( - layer_id, - remote_fd, - message_id, - protocol_version.as_ref(), - message_bus, - ) - .await - { - // Send local failure to layer. - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(FileResponse::ReadDir(Err( - fail.into(), - ))), - layer_id, - }) - .await; - } - } - // TODO(alex): We can remove this case when users are up-to-date for `readlink`. - SimpleProxyMessage::FileReq( - message_id, - layer_id, - req @ FileRequest::ReadLink(_), - ) => { - if protocol_version - .as_ref() - .is_some_and(|version| READDIR_BATCH_VERSION.matches(version)) - { - self.file_reqs.insert(message_id, layer_id); - message_bus - .send(ProxyMessage::ToAgent(ClientMessage::FileRequest(req))) - .await; - } else { - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(FileResponse::ReadLink(Err( - ResponseError::NotImplemented, - ))), - layer_id, - }) - .await; - } - } - SimpleProxyMessage::FileReq(message_id, layer_id, req) => { - self.file_reqs.insert(message_id, layer_id); - message_bus - .send(ProxyMessage::ToAgent(ClientMessage::FileRequest(req))) - .await; - } - SimpleProxyMessage::FileRes(FileResponse::Open(Ok(OpenFileResponse { fd }))) => { - let (message_id, layer_id) = self.file_reqs.get()?; - - self.remote_fds - .add(layer_id, RemoteFd::File(fd), FileResource::File); - - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(FileResponse::Open(Ok( - OpenFileResponse { fd }, - ))), - layer_id, - }) - .await; - } - SimpleProxyMessage::FileRes(FileResponse::OpenDir(Ok(OpenDirResponse { fd }))) => { - let (message_id, layer_id) = self.file_reqs.get()?; - - self.remote_fds.add( - layer_id, - RemoteFd::Dir(fd), - FileResource::Dir { - dirs_iter: IntoIter::default(), - }, - ); - - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(FileResponse::OpenDir(Ok( - OpenDirResponse { fd }, - ))), - layer_id, - }) - .await; - } - SimpleProxyMessage::FileRes(FileResponse::ReadDirBatch(Ok( - ReadDirBatchResponse { fd, dir_entries }, - ))) => { - let (message_id, layer_id) = self.file_reqs.get()?; - - let mut entries_iter = dir_entries.into_iter(); - let direntry = entries_iter.next(); - - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { direntry }, - ))), - layer_id, - }) - .await; - - if let Some(FileResource::Dir { dirs_iter }) = - self.remote_fds.get_mut(&layer_id, &RemoteFd::Dir(fd)) - { - *dirs_iter = entries_iter; - } - } - SimpleProxyMessage::FileRes(res) => { - let (message_id, layer_id) = self.file_reqs.get()?; - message_bus - .send(ToLayer { - message_id, - message: ProxyToLayerMessage::File(res), - layer_id, - }) - .await; - } SimpleProxyMessage::AddrInfoReq(message_id, session_id, req) => { - self.addr_info_reqs.insert(message_id, session_id); + self.addr_info_reqs.push_back(message_id, session_id); message_bus - .send(ProxyMessage::ToAgent(ClientMessage::GetAddrInfoRequest( - req, - ))) + .send(ClientMessage::GetAddrInfoRequest(req)) .await; } SimpleProxyMessage::AddrInfoRes(res) => { - let (message_id, layer_id) = self.addr_info_reqs.get()?; + let (message_id, layer_id) = + self.addr_info_reqs.pop_front().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::GetAddrInfoResponse(res.clone())) + })?; message_bus .send(ToLayer { message_id, @@ -338,29 +69,17 @@ impl BackgroundTask for SimpleProxy { }) .await; } - SimpleProxyMessage::LayerClosed(LayerClosed { id }) => { - for to_close in self.remote_fds.remove_all(id) { - let req = match to_close { - RemoteFd::Dir(remote_fd) => { - FileRequest::CloseDir(CloseDirRequest { remote_fd }) - } - RemoteFd::File(fd) => FileRequest::Close(CloseFileRequest { fd }), - }; - - message_bus.send(ClientMessage::FileRequest(req)).await; - } - } - SimpleProxyMessage::LayerForked(LayerForked { child, parent }) => { - self.remote_fds.clone_all(parent, child); - } SimpleProxyMessage::GetEnvReq(message_id, layer_id, req) => { - self.get_env_reqs.insert(message_id, layer_id); + self.get_env_reqs.push_back(message_id, layer_id); message_bus - .send(ProxyMessage::ToAgent(ClientMessage::GetEnvVarsRequest(req))) + .send(ClientMessage::GetEnvVarsRequest(req)) .await; } SimpleProxyMessage::GetEnvRes(res) => { - let (message_id, layer_id) = self.get_env_reqs.get()?; + let (message_id, layer_id) = + self.get_env_reqs.pop_front().ok_or_else(|| { + UnexpectedAgentMessage(DaemonMessage::GetEnvVarsResponse(res.clone())) + })?; message_bus .send(ToLayer { message_id, @@ -373,199 +92,7 @@ impl BackgroundTask for SimpleProxy { } tracing::trace!("message bus closed, exiting"); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - - use mirrord_intproxy_protocol::{LayerId, ProxyToLayerMessage}; - use mirrord_protocol::{ - file::{ - FdOpenDirRequest, OpenDirResponse, ReadDirBatchRequest, ReadDirBatchResponse, - ReadDirRequest, ReadDirResponse, - }, - ClientMessage, FileRequest, FileResponse, - }; - use semver::Version; - - use super::SimpleProxy; - use crate::{ - background_tasks::{BackgroundTasks, TaskSender, TaskUpdate}, - error::IntProxyError, - main_tasks::{MainTaskId, ProxyMessage, ToLayer}, - proxies::simple::SimpleProxyMessage, - }; - - /// Sets up a [`TaskSender`] and [`BackgroundTasks`] for a functioning [`SimpleProxy`]. - /// - /// - `protocol_version`: allows specifying the version of the protocol to use for testing out - /// potential mismatches in messages. - async fn setup_proxy( - protocol_version: Version, - ) -> ( - TaskSender, - BackgroundTasks, - ) { - let mut tasks: BackgroundTasks = - Default::default(); - let proxy = tasks.register(SimpleProxy::default(), MainTaskId::SimpleProxy, 32); - - proxy - .send(SimpleProxyMessage::ProtocolVersion(protocol_version)) - .await; - - (proxy, tasks) - } - - /// Convenience for opening a dir. - async fn prepare_dir( - proxy: &TaskSender, - tasks: &mut BackgroundTasks, - ) { - let request = FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd: 0xdad }); - proxy - .send(SimpleProxyMessage::FileReq(0xbad, LayerId(0xa55), request)) - .await; - let (_, update) = tasks.next().await.unzip(); - - assert!( - matches!( - update, - Some(TaskUpdate::Message(ProxyMessage::ToAgent( - ClientMessage::FileRequest(FileRequest::FdOpenDir(FdOpenDirRequest { .. }),) - ))) - ), - "Mismatched message for `FdOpenDirRequest` {update:?}!" - ); - - let response = FileResponse::OpenDir(Ok(OpenDirResponse { fd: 0xdad })); - proxy.send(SimpleProxyMessage::FileRes(response)).await; - let (_, update) = tasks.next().await.unzip(); - - assert!( - matches!( - update, - Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { - message_id: 0xbad, - layer_id: LayerId(0xa55), - message: ProxyToLayerMessage::File(FileResponse::OpenDir(Ok( - OpenDirResponse { .. } - ))) - }))) - ), - "Mismatched message for `OpenDirResponse` {update:?}!" - ); - } - - #[tokio::test] - async fn old_protocol_uses_read_dir_request() { - let (proxy, mut tasks) = setup_proxy(Version::new(0, 1, 0)).await; - - prepare_dir(&proxy, &mut tasks).await; - - let readdir_request = FileRequest::ReadDir(ReadDirRequest { remote_fd: 0xdad }); - proxy - .send(SimpleProxyMessage::FileReq( - 0xbad, - LayerId(0xa55), - readdir_request, - )) - .await; - let (_, update) = tasks.next().await.unzip(); - - assert!( - matches!( - update, - Some(TaskUpdate::Message(ProxyMessage::ToAgent( - ClientMessage::FileRequest(FileRequest::ReadDir(ReadDirRequest { .. })) - ))) - ), - "Mismatched message for `ReadDirRequest` {update:?}!" - ); - - let readdir_response = FileResponse::ReadDir(Ok(ReadDirResponse { direntry: None })); - proxy - .send(SimpleProxyMessage::FileRes(readdir_response)) - .await; - let (_, update) = tasks.next().await.unzip(); - - assert!( - matches!( - update, - Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { - message_id: 0xbad, - layer_id: LayerId(0xa55), - message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { .. } - ))) - }))) - ), - "Mismatched message for `ReadDirResponse` {update:?}!" - ); - - drop(proxy); - let results = tasks.results().await; - for (_, result) in results { - assert!(result.is_ok(), "{result:?}"); - } - } - - #[tokio::test] - async fn new_protocol_uses_read_dir_batch_request() { - let (proxy, mut tasks) = setup_proxy(Version::new(1, 8, 3)).await; - - prepare_dir(&proxy, &mut tasks).await; - - let request = FileRequest::ReadDirBatch(ReadDirBatchRequest { - remote_fd: 0xdad, - amount: 0xca7, - }); - proxy - .send(SimpleProxyMessage::FileReq(0xbad, LayerId(0xa55), request)) - .await; - let (_, update) = tasks.next().await.unzip(); - - assert!( - matches!( - update, - Some(TaskUpdate::Message(ProxyMessage::ToAgent( - ClientMessage::FileRequest(FileRequest::ReadDirBatch(ReadDirBatchRequest { - remote_fd: 0xdad, - amount: 0xca7 - })) - ))) - ), - "Mismatched message for `ReadDirBatchRequest` {update:?}!" - ); - - let response = FileResponse::ReadDirBatch(Ok(ReadDirBatchResponse { - fd: 0xdad, - dir_entries: Vec::new(), - })); - proxy.send(SimpleProxyMessage::FileRes(response)).await; - let (_, update) = tasks.next().await.unzip(); - - assert!( - matches!( - update, - Some(TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer { - message_id: 0xbad, - layer_id: LayerId(0xa55), - message: ProxyToLayerMessage::File(FileResponse::ReadDir(Ok( - ReadDirResponse { .. } - ))) - }))) - ), - "Mismatched message for `ReadDirBatchResponse` {update:?}!" - ); - - drop(proxy); - let results = tasks.results().await; - for (_, result) in results { - assert!(result.is_ok(), "{result:?}"); - } + Ok(()) } } diff --git a/mirrord/intproxy/src/remote_resources.rs b/mirrord/intproxy/src/remote_resources.rs index a195a75c1b8..055455b7818 100644 --- a/mirrord/intproxy/src/remote_resources.rs +++ b/mirrord/intproxy/src/remote_resources.rs @@ -1,21 +1,19 @@ use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, hash::Hash, }; use mirrord_intproxy_protocol::LayerId; use tracing::Level; -use crate::proxies::simple::FileResource; - /// For tracking remote resources allocated in the agent: open files and directories, port /// subscriptions. Remote resources can be shared by multiple layer instances because of forks. -pub struct RemoteResources { - by_layer: HashMap>, +pub struct RemoteResources { + by_layer: HashMap>, counts: HashMap, } -impl Default for RemoteResources { +impl Default for RemoteResources { fn default() -> Self { Self { by_layer: Default::default(), @@ -24,10 +22,9 @@ impl Default for RemoteResources { } } -impl RemoteResources +impl RemoteResources where T: Clone + PartialEq + Eq + Hash + core::fmt::Debug, - Resource: Clone, { /// Removes the given resource from the layer instance with the given [`LayerId`]. /// Returns whether the resource should be closed on the agent side. @@ -47,7 +44,7 @@ where Entry::Vacant(..) => return false, }; - if removed.is_none() { + if !removed { return false; } @@ -78,8 +75,8 @@ where return; }; - for resource in resources.keys().cloned() { - *self.counts.entry(resource).or_default() += 1; + for resource in &resources { + *self.counts.entry(resource.clone()).or_default() += 1; } self.by_layer.insert(dst, resources); @@ -94,7 +91,7 @@ where let resources = self.by_layer.remove(&layer_id).unwrap_or_default(); resources - .into_keys() + .into_iter() .filter(|resource| match self.counts.entry(resource.clone()) { Entry::Occupied(e) if *e.get() == 1 => { e.remove(); @@ -109,12 +106,7 @@ where } }) } -} -impl RemoteResources -where - T: Clone + PartialEq + Eq + Hash, -{ /// Adds the given resource to the layer instance with the given [`LayerId`]. /// /// Used when the layer opens a resource, e.g. with @@ -125,45 +117,10 @@ where .by_layer .entry(layer_id) .or_default() - .try_insert(resource.clone(), ()) - .is_ok(); - - if added { - *self.counts.entry(resource).or_default() += 1; - } - } -} - -impl RemoteResources -where - T: Clone + PartialEq + Eq + Hash, -{ - /// Adds the given resource to the layer instance with the given [`LayerId`]. - /// - /// Used when the layer opens a resource, e.g. with - /// [`OpenFileRequest`](mirrord_protocol::file::OpenFileRequest). - #[tracing::instrument(level = Level::TRACE, skip(self, resource, file))] - pub(crate) fn add(&mut self, layer_id: LayerId, resource: T, file: FileResource) { - let added = self - .by_layer - .entry(layer_id) - .or_default() - .try_insert(resource.clone(), file) - .is_ok(); + .insert(resource.clone()); if added { *self.counts.entry(resource).or_default() += 1; } } - - #[tracing::instrument(level = Level::TRACE, skip(self, resource_key))] - pub(crate) fn get_mut( - &mut self, - layer_id: &LayerId, - resource_key: &T, - ) -> Option<&mut FileResource> { - self.by_layer - .get_mut(layer_id) - .and_then(|files| files.get_mut(resource_key)) - } } diff --git a/mirrord/intproxy/src/request_queue.rs b/mirrord/intproxy/src/request_queue.rs index 18a99525e61..e9651884a15 100644 --- a/mirrord/intproxy/src/request_queue.rs +++ b/mirrord/intproxy/src/request_queue.rs @@ -14,44 +14,59 @@ use std::{collections::VecDeque, fmt}; use mirrord_intproxy_protocol::{LayerId, MessageId}; -use thiserror::Error; use tracing::Level; -/// Erorr returned when the proxy attempts to retrieve [`MessageId`] and [`LayerId`] of a request -/// corresponding to a response received from the agent, but the [`RequestQueue`] is empty. This -/// error should never happen. -#[derive(Error, Debug)] -#[error("request queue is empty")] -pub struct RequestQueueEmpty; - /// A queue used to match agent responses with layer requests. /// A single queue can be used for multiple types of requests only if the agent preserves order /// between them. -#[derive(Default)] -pub struct RequestQueue { - inner: VecDeque<(MessageId, LayerId)>, +pub struct RequestQueue { + inner: VecDeque<(MessageId, LayerId, T)>, +} + +impl Default for RequestQueue { + fn default() -> Self { + Self { + inner: Default::default(), + } + } } -impl fmt::Debug for RequestQueue { +impl fmt::Debug for RequestQueue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RequestQueue") .field("queue_len", &self.inner.len()) - .field("front", &self.inner.front().copied()) - .field("back", &self.inner.back().copied()) + .field("front", &self.inner.front()) + .field("back", &self.inner.back()) .finish() } } -impl RequestQueue { +impl RequestQueue { + /// Save the request at the end of this queue. + #[tracing::instrument(level = Level::TRACE)] + pub fn push_back(&mut self, message_id: MessageId, layer_id: LayerId) { + self.inner + .push_back((message_id, layer_id, Default::default())); + } +} + +impl RequestQueue { + /// Retrieve and remove a request from the front of this queue. + #[tracing::instrument(level = Level::TRACE)] + pub fn pop_front(&mut self) -> Option<(MessageId, LayerId)> { + let (message_id, layer_id, _) = self.inner.pop_front()?; + Some((message_id, layer_id)) + } + /// Save the request at the end of this queue. #[tracing::instrument(level = Level::TRACE)] - pub fn insert(&mut self, message_id: MessageId, layer_id: LayerId) { - self.inner.push_back((message_id, layer_id)); + pub fn push_back_with_data(&mut self, message_id: MessageId, layer_id: LayerId, data: T) { + self.inner.push_back((message_id, layer_id, data)); } /// Retrieve and remove a request from the front of this queue. #[tracing::instrument(level = Level::TRACE)] - pub fn get(&mut self) -> Result<(MessageId, LayerId), RequestQueueEmpty> { - self.inner.pop_front().ok_or(RequestQueueEmpty) + pub fn pop_front_with_data(&mut self) -> Option<(MessageId, LayerId, T)> { + self.inner.pop_front() } } diff --git a/mirrord/layer/tests/common/mod.rs b/mirrord/layer/tests/common/mod.rs index 0469a6695bb..9454c8b02a3 100644 --- a/mirrord/layer/tests/common/mod.rs +++ b/mirrord/layer/tests/common/mod.rs @@ -109,7 +109,7 @@ impl TestIntProxy { let agent_conn = AgentConnection::new_for_raw_address(fake_agent_address) .await .unwrap(); - let intproxy = IntProxy::new_with_connection(agent_conn, listener); + let intproxy = IntProxy::new_with_connection(agent_conn, listener, 0); intproxy .run(Duration::from_secs(5), Duration::from_secs(5)) .await diff --git a/mirrord/protocol/Cargo.toml b/mirrord/protocol/Cargo.toml index 1a36c251030..fc8ff9aad0a 100644 --- a/mirrord/protocol/Cargo.toml +++ b/mirrord/protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mirrord-protocol" -version = "1.12.0" +version = "1.12.1" authors.workspace = true description.workspace = true documentation.workspace = true diff --git a/mirrord/protocol/src/file.rs b/mirrord/protocol/src/file.rs index e8ff8db8cd3..7a655a81c49 100644 --- a/mirrord/protocol/src/file.rs +++ b/mirrord/protocol/src/file.rs @@ -14,6 +14,10 @@ use bincode::{Decode, Encode}; use nix::sys::statfs::Statfs; use semver::VersionReq; +/// Minimal mirrord-protocol version that allows [`ReadLinkFileRequest`]. +pub static READLINK_VERSION: LazyLock = + LazyLock::new(|| ">=1.6.0".parse().expect("Bad Identifier")); + /// Minimal mirrord-protocol version that allows [`ReadDirBatchRequest`]. pub static READDIR_BATCH_VERSION: LazyLock = LazyLock::new(|| ">=1.9.0".parse().expect("Bad Identifier"));