Buffering readonly files (#2979)
* Config entry

* Moved file ops to a separate proxy and implemented buffering

* Docs

* Fixed readlink version req

* Fixed old tests

* Tests for buffered reading

* Fixed seek logic

* Changelog

* Renamed RequestQueue methods

* Reduce log level to trace for some methods

* More docs and renames

* Configurable buffer size

* Updated json schema and configuration.md

* Test code fix
Razz4780 authored Dec 16, 2024
1 parent b736a7f commit 80ec76d
Showing 22 changed files with 1,610 additions and 614 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

2 changes: 2 additions & 0 deletions changelog.d/2069.added.md
@@ -0,0 +1,2 @@
Added a new experimental field to the mirrord config: `.experimental.readonly_file_buffer`. When it is set to a non-zero size in bytes (for example 4096), mirrord reads remote readonly files in chunks and buffers them locally.
This is to improve performance with applications that make many small reads from remote files.
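
To illustrate the idea behind this changelog entry, here is a minimal sketch of chunked, locally buffered reads from a read-only remote file. It is not the code from this commit: `RemoteFile`, `BufferedReader`, and their methods are hypothetical names, the remote side is simulated with an in-memory `Vec<u8>`, and seeking is left out. The sketch assumes that a buffer size of 0 disables buffering, as the configuration docs state, and that each fetch requests at least `buffer_size` bytes.

```rust
/// Hypothetical stand-in for a remote read-only file. Counts how many fetches were made.
struct RemoteFile {
    data: Vec<u8>,
    fetches: usize,
}

impl RemoteFile {
    /// Returns up to `len` bytes starting at `offset` (fewer at end of file).
    fn read_at(&mut self, offset: u64, len: usize) -> Vec<u8> {
        self.fetches += 1;
        let start = (offset as usize).min(self.data.len());
        let end = start.saturating_add(len).min(self.data.len());
        self.data[start..end].to_vec()
    }
}

/// Hypothetical local buffer in front of a `RemoteFile`.
/// A `buffer_size` of 0 disables buffering (assumption taken from the config docs).
struct BufferedReader {
    remote: RemoteFile,
    buffer_size: usize,
    /// Offset in the remote file of the first buffered byte.
    buffer_start: u64,
    buffer: Vec<u8>,
    /// Read position as seen by the application.
    position: u64,
}

impl BufferedReader {
    fn new(remote: RemoteFile, buffer_size: usize) -> Self {
        Self { remote, buffer_size, buffer_start: 0, buffer: Vec::new(), position: 0 }
    }

    /// Reads up to `len` bytes from the current position.
    fn read(&mut self, len: usize) -> Vec<u8> {
        if self.buffer_size == 0 {
            // Buffering disabled: every read goes to the remote file.
            let out = self.remote.read_at(self.position, len);
            self.position += out.len() as u64;
            return out;
        }

        let buffer_end = self.buffer_start + self.buffer.len() as u64;
        let hit = self.position >= self.buffer_start
            && self.position + len as u64 <= buffer_end;
        if !hit {
            // Miss: refill the buffer, fetching at least `buffer_size` bytes.
            let fetch_len = len.max(self.buffer_size);
            self.buffer = self.remote.read_at(self.position, fetch_len);
            self.buffer_start = self.position;
        }

        // Serve the request from the local buffer.
        let from = (self.position - self.buffer_start) as usize;
        let to = (from + len).min(self.buffer.len());
        let out = self.buffer[from..to].to_vec();
        self.position += out.len() as u64;
        out
    }
}
```

In this sketch, three consecutive 2-byte reads against a 4-byte buffer cost two remote fetches, while the same reads with the buffer size set to 0 cost three.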
10 changes: 10 additions & 0 deletions mirrord-schema.json
@@ -815,6 +815,16 @@
"null"
]
},
"readonly_file_buffer": {
"title": "_experimental_ readonly_file_buffer {#experimental-readonly_file_buffer}",
"description": "Sets buffer size for readonly remote files (in bytes, for example 4096). If set, such files will be read in chunks and buffered locally. This improves performance when the user application reads data in small portions.\n\nSetting to 0 disables file buffering.\n\n<https://github.com/metalbear-co/mirrord/issues/2069>",
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0.0
},
"tcp_ping4_mock": {
"title": "_experimental_ tcp_ping4_mock {#experimental-tcp_ping4_mock}",
"description": "<https://github.com/metalbear-co/mirrord/issues/2421#issuecomment-2093200904>",
6 changes: 6 additions & 0 deletions mirrord/analytics/src/lib.rs
@@ -145,6 +145,12 @@ impl From<u32> for AnalyticValue {
}
}

impl From<u64> for AnalyticValue {
fn from(n: u64) -> Self {
AnalyticValue::Number(u32::try_from(n).unwrap_or(u32::MAX))
}
}

impl From<usize> for AnalyticValue {
fn from(n: usize) -> Self {
AnalyticValue::Number(u32::try_from(n).unwrap_or(u32::MAX))
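
Both the `From<u64>` and `From<usize>` conversions in this hunk clamp values that do not fit in a `u32` to `u32::MAX` instead of failing. A standalone sketch of that conversion, where `saturating_u32` is a hypothetical helper name that does not appear in the commit:

```rust
use std::convert::TryFrom;

/// Converts a `u64` into a `u32`, clamping values that do not fit to `u32::MAX`.
/// Hypothetical helper; the commit inlines this expression in the `From` impls.
fn saturating_u32(n: u64) -> u32 {
    u32::try_from(n).unwrap_or(u32::MAX)
}
```

A configured buffer size such as 4096 is reported unchanged, while anything above `u32::MAX` is reported as `u32::MAX`.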
18 changes: 11 additions & 7 deletions mirrord/cli/src/internal_proxy.rs
@@ -141,13 +141,17 @@ pub(crate) async fn proxy(
let first_connection_timeout = Duration::from_secs(config.internal_proxy.start_idle_timeout);
let consecutive_connection_timeout = Duration::from_secs(config.internal_proxy.idle_timeout);

-IntProxy::new_with_connection(agent_conn, listener)
-.run(first_connection_timeout, consecutive_connection_timeout)
-.await
-.map_err(InternalProxyError::from)
-.inspect_err(|error| {
-tracing::error!(%error, "Internal proxy encountered an error, exiting");
-})
+IntProxy::new_with_connection(
+agent_conn,
+listener,
+config.experimental.readonly_file_buffer,
+)
+.run(first_connection_timeout, consecutive_connection_timeout)
+.await
+.map_err(InternalProxyError::from)
+.inspect_err(|error| {
+tracing::error!(%error, "Internal proxy encountered an error, exiting");
+})
}

/// Creates a connection with the agent and handles one round of ping pong.
37 changes: 37 additions & 0 deletions mirrord/config/configuration.md
@@ -77,6 +77,9 @@ configuration file containing all fields.
"override": {
"DATABASE_CONNECTION": "db://localhost:7777/my-db",
"LOCAL_BEAR": "panda"
},
"mapping": {
".+_TIMEOUT": "1000"
}
},
"fs": {
@@ -460,6 +463,16 @@ Enables `getifaddrs` hook that removes IPv6 interfaces from the list returned by

DEPRECATED, WILL BE REMOVED

### _experimental_ readonly_file_buffer {#experimental-readonly_file_buffer}

Sets buffer size for readonly remote files (in bytes, for example 4096).
If set, such files will be read in chunks and buffered locally.
This improves performance when the user application reads data in small portions.

Setting to 0 disables file buffering.

<https://github.com/metalbear-co/mirrord/issues/2069>

### _experimental_ tcp_ping4_mock {#experimental-tcp_ping4_mock}

<https://github.com/metalbear-co/mirrord/issues/2421#issuecomment-2093200904>
@@ -649,6 +662,9 @@ See the environment variables [reference](https://mirrord.dev/docs/reference/env
"override": {
"DATABASE_CONNECTION": "db://localhost:7777/my-db",
"LOCAL_BEAR": "panda"
},
"mapping": {
".+_TIMEOUT": "1000"
}
}
}
@@ -692,6 +708,27 @@ If set, the variables are fetched after the user application is started.
This setting is meant to resolve issues when using mirrord via the IntelliJ plugin on WSL
and the remote environment contains a lot of variables.

### feature.env.mapping {#feature-env-mapping}

Specifies a map of regex patterns to values. When an environment variable's name matches a pattern, its value is replaced with the mapped value.

*Capture groups are allowed.*

Example:
```json
{
".+_TIMEOUT": "10000",
"LOG_.+_VERBOSITY": "debug",
"(\\w+)_(\\d+)": "magic-value"
}
```

This performs the following replacements for matching environment variables:

`CONNECTION_TIMEOUT: 500` => `CONNECTION_TIMEOUT: 10000`
`LOG_FILE_VERBOSITY: info` => `LOG_FILE_VERBOSITY: debug`
`DATA_1234: common-value` => `DATA_1234: magic-value`

### feature.env.override {#feature-env-override}

Allows setting or overriding environment variables (locally) with a custom value.
13 changes: 13 additions & 0 deletions mirrord/config/src/experimental.rs
@@ -58,6 +58,18 @@ pub struct ExperimentalConfig {
/// Uses /dev/null for creating local fake files (should be better than using /tmp)
#[config(default = true)]
pub use_dev_null: bool,

/// ### _experimental_ readonly_file_buffer {#experimental-readonly_file_buffer}
///
/// Sets buffer size for readonly remote files (in bytes, for example 4096).
/// If set, such files will be read in chunks and buffered locally.
/// This improves performance when the user application reads data in small portions.
///
/// Setting to 0 disables file buffering.
///
/// <https://github.com/metalbear-co/mirrord/issues/2069>
#[config(default = 0)]
pub readonly_file_buffer: u64,
}

impl CollectAnalytics for &ExperimentalConfig {
@@ -68,5 +80,6 @@ impl CollectAnalytics for &ExperimentalConfig {
analytics.add("enable_exec_hooks_linux", self.enable_exec_hooks_linux);
analytics.add("hide_ipv6_interfaces", self.hide_ipv6_interfaces);
analytics.add("disable_reuseaddr", self.disable_reuseaddr);
analytics.add("readonly_file_buffer", self.readonly_file_buffer);
}
}
1 change: 1 addition & 0 deletions mirrord/intproxy/Cargo.toml
@@ -47,3 +47,4 @@ exponential-backoff = "2"

[dev-dependencies]
reqwest.workspace = true
rstest.workspace = true
24 changes: 12 additions & 12 deletions mirrord/intproxy/protocol/src/lib.rs
@@ -35,7 +35,7 @@ pub struct LocalMessage<T> {
}

/// Messages sent by the layer and handled by the internal proxy.
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub enum LayerToProxyMessage {
/// A request to start new `layer <-> proxy` session.
/// This should be the first message sent by the layer after opening a new connection to the
@@ -54,7 +54,7 @@ pub enum LayerToProxyMessage {
}

/// Layer process information
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub struct ProcessInfo {
/// Process ID.
pub pid: u32,
@@ -82,7 +82,7 @@ pub struct LayerId(pub u64);
/// Sharing state between [`exec`](https://man7.org/linux/man-pages/man3/exec.3.html) calls is currently not supported.
/// Therefore, when the layer initializes, it uses [`NewSessionRequest::New`] and does not inherit
/// any state.
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub enum NewSessionRequest {
/// Layer initialized from its constructor, has a fresh state.
New(ProcessInfo),
@@ -119,7 +119,7 @@ impl fmt::Display for NetProtocol {
}

/// A request to initiate a new outgoing connection.
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub struct OutgoingConnectRequest {
/// The address the user application tries to connect to.
pub remote_address: SocketAddress,
@@ -128,7 +128,7 @@ pub struct OutgoingConnectRequest {
}

/// Requests related to incoming connections.
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub enum IncomingRequest {
/// A request made by layer when it starts listening for mirrored connections.
PortSubscribe(PortSubscribe),
@@ -152,7 +152,7 @@ pub struct ConnMetadataRequest {

/// A response to layer's [`ConnMetadataRequest`].
/// Contains metadata useful for hooking `getsockname` and `getpeername`.
-#[derive(Encode, Decode, Debug, Clone)]
+#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
pub struct ConnMetadataResponse {
/// Original source of data, provided by the agent. Meant to be exposed to the user instead of
/// the real source, which will always be localhost.
@@ -176,7 +176,7 @@ pub struct ConnMetadataResponse {
///
/// For each connection incoming to the remote port,
/// the internal proxy will initiate a new connection to the local port specified in `listening_on`.
-#[derive(Encode, Decode, Debug, Clone)]
+#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
pub struct PortSubscribe {
/// Local address on which the layer is listening.
pub listening_on: SocketAddr,
@@ -185,7 +185,7 @@ pub struct PortSubscribe {
}

/// Instructions for the internal proxy and the agent on how to execute port mirroring.
-#[derive(Encode, Decode, Debug, Clone)]
+#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
pub enum PortSubscription {
/// Wrapped [`StealType`] specifies how to execute port mirroring.
Steal(StealType),
@@ -194,7 +194,7 @@ pub enum PortSubscription {
}

/// A request to stop proxying incoming connections.
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub struct PortUnsubscribe {
/// Port on the remote pod that layer mirrored.
pub port: Port,
@@ -203,7 +203,7 @@ pub struct PortUnsubscribe {
}

/// Messages sent by the internal proxy and handled by the layer.
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub enum ProxyToLayerMessage {
/// A response to [`NewSessionRequest`]. Contains the identifier of the new `layer <-> proxy`
/// session.
@@ -221,7 +221,7 @@ pub enum ProxyToLayerMessage {
}

/// A response to layer's [`IncomingRequest`].
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub enum IncomingResponse {
/// A response to layer's [`PortSubscribe`].
/// As a temporary workaround to [agent protocol](mirrord_protocol) limitations, the only error
@@ -234,7 +234,7 @@ pub enum IncomingResponse {
}

/// A response to layer's [`OutgoingConnectRequest`].
-#[derive(Encode, Decode, Debug)]
+#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub struct OutgoingConnectResponse {
/// The address the layer should connect to instead of the address requested by the user.
pub layer_address: SocketAddress,
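
Every change in this file adds `PartialEq, Eq` to a protocol type. One plausible use, assumed here rather than stated in the commit, is comparing whole messages with `assert_eq!` in tests. A minimal sketch with a simplified, hypothetical `PortSubscribe` (the real struct carries a `PortSubscription` instead of the `remote_port` field used here) and a hypothetical `subscribe` helper:

```rust
use std::net::SocketAddr;

/// Hypothetical, simplified message type used only for this illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PortSubscribe {
    /// Local address on which the layer is listening.
    listening_on: SocketAddr,
    /// Remote port to subscribe to (assumed field, not in the original struct).
    remote_port: u16,
}

/// Builds the message a test would expect for a local address and a remote port.
fn subscribe(listening_on: &str, remote_port: u16) -> PortSubscribe {
    PortSubscribe {
        listening_on: listening_on.parse().expect("valid socket address"),
        remote_port,
    }
}
```

With the derives in place, a test can write `assert_eq!(received, subscribe("127.0.0.1:8080", 80))` instead of matching on each field, and `Debug` supplies the failure output.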
11 changes: 11 additions & 0 deletions mirrord/intproxy/src/background_tasks.rs
@@ -166,6 +166,7 @@ where

/// An error that can occur when executing a [`BackgroundTask`].
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum TaskError<Err> {
/// An internal task error.
Error(Err),
@@ -182,6 +183,16 @@ pub enum TaskUpdate<MOut, Err> {
Finished(Result<(), TaskError<Err>>),
}

#[cfg(test)]
impl<MOut, Err: fmt::Debug> TaskUpdate<MOut, Err> {
pub fn unwrap_message(self) -> MOut {
match self {
Self::Message(mout) => mout,
Self::Finished(res) => panic!("expected a message, got task result: {res:?}"),
}
}
}

/// A struct that can be used to send messages to a [`BackgroundTask`] registered
///
/// A struct that can be used to send messages to a [`BackgroundTask`] registered in the
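
The test-only `unwrap_message` helper added in this file follows the same pattern as `Option::unwrap`: return the payload of the expected variant and panic with the `Debug` output of anything else. A self-contained sketch, with `TaskError` and `TaskUpdate` reduced to the variants visible in this diff (the real enums may have more variants and different bounds):

```rust
use std::fmt;

/// Simplified stand-in for the crate's `TaskError`; only the variant shown in the diff.
#[derive(Debug, PartialEq, Eq)]
enum TaskError<Err> {
    /// An internal task error.
    Error(Err),
}

/// Simplified stand-in for the crate's `TaskUpdate`.
#[derive(Debug)]
enum TaskUpdate<MOut, Err> {
    /// The task produced a message.
    Message(MOut),
    /// The task finished, successfully or with an error.
    Finished(Result<(), TaskError<Err>>),
}

impl<MOut, Err: fmt::Debug> TaskUpdate<MOut, Err> {
    /// Returns the inner message, panicking if the task has finished instead.
    fn unwrap_message(self) -> MOut {
        match self {
            Self::Message(mout) => mout,
            Self::Finished(res) => panic!("expected a message, got task result: {res:?}"),
        }
    }
}
```

In a test this lets a line such as `let msg = update.unwrap_message();` replace a full `match`, with the panic message naming the unexpected task result.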
18 changes: 13 additions & 5 deletions mirrord/intproxy/src/error.rs
@@ -8,11 +8,17 @@ use crate::{
agent_conn::{AgentChannelError, AgentConnectionError},
layer_initializer::LayerInitializerError,
ping_pong::PingPongError,
-proxies::{incoming::IncomingProxyError, outgoing::OutgoingProxyError},
-request_queue::RequestQueueEmpty,
+proxies::{
+files::FilesProxyError, incoming::IncomingProxyError, outgoing::OutgoingProxyError,
+simple::SimpleProxyError,
+},
MainTaskId,
};

#[derive(Error, Debug)]
#[error("agent sent an unexpected message: {0:?}")]
pub struct UnexpectedAgentMessage(pub DaemonMessage);

#[derive(Error, Debug)]
pub enum IntProxyError {
#[error("waiting for the first layer connection timed out")]
@@ -26,8 +32,8 @@ pub enum IntProxyError {
AgentConnection(#[from] AgentConnectionError),
#[error("agent closed connection with error: {0}")]
AgentFailed(String),
-#[error("agent sent unexpected message: {0:?}")]
-UnexpectedAgentMessage(DaemonMessage),
+#[error(transparent)]
+UnexpectedAgentMessage(#[from] UnexpectedAgentMessage),

#[error("background task {0} exited unexpectedly")]
TaskExit(MainTaskId),
@@ -43,11 +49,13 @@ pub enum IntProxyError {
#[error("layer connection failed: {0}")]
LayerConnection(#[from] CodecError),
#[error("simple proxy failed: {0}")]
-SimpleProxy(#[from] RequestQueueEmpty),
+SimpleProxy(#[from] SimpleProxyError),
#[error("outgoing proxy failed: {0}")]
OutgoingProxy(#[from] OutgoingProxyError),
#[error("incoming proxy failed: {0}")]
IncomingProxy(#[from] IncomingProxyError),
#[error("files proxy failed: {0}")]
FilesProxy(#[from] FilesProxyError),
}

pub type Result<T> = core::result::Result<T, IntProxyError>;
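
The new `UnexpectedAgentMessage` struct is wrapped in `IntProxyError` with `#[error(transparent)]` and `#[from]`. The sketch below hand-writes, using only the standard library, roughly what those `thiserror` attributes provide: `Display` and `source` are forwarded to the inner error, and a `From` impl lets `?` perform the conversion. It is an approximation for illustration; `ProxyError`, `handle`, and `run` are hypothetical names, and a `String` stands in for `DaemonMessage`.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for the commit's `UnexpectedAgentMessage`; a `String` replaces `DaemonMessage`.
#[derive(Debug)]
struct UnexpectedAgentMessage(String);

impl fmt::Display for UnexpectedAgentMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "agent sent an unexpected message: {:?}", self.0)
    }
}

impl Error for UnexpectedAgentMessage {}

/// Hypothetical top-level error with a single wrapped variant.
#[derive(Debug)]
enum ProxyError {
    UnexpectedAgentMessage(UnexpectedAgentMessage),
}

// Roughly what `#[error(transparent)]` provides: forward `Display` to the inner error.
impl fmt::Display for ProxyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnexpectedAgentMessage(inner) => fmt::Display::fmt(inner, f),
        }
    }
}

// `transparent` also forwards `source` to the inner error.
impl Error for ProxyError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::UnexpectedAgentMessage(inner) => inner.source(),
        }
    }
}

// Roughly what `#[from]` provides: a `From` impl, so `?` converts automatically.
impl From<UnexpectedAgentMessage> for ProxyError {
    fn from(error: UnexpectedAgentMessage) -> Self {
        Self::UnexpectedAgentMessage(error)
    }
}

/// Hypothetical handler that rejects every message it is given.
fn handle(message: &str) -> Result<(), UnexpectedAgentMessage> {
    Err(UnexpectedAgentMessage(message.to_string()))
}

/// Hypothetical caller: `?` converts the inner error into `ProxyError`.
fn run() -> Result<(), ProxyError> {
    handle("pong")?;
    Ok(())
}
```

Giving the unexpected-message case its own error type means code that only knows about that one failure can return it directly, and callers higher up still get an `IntProxyError` through `?`.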