Skip to content

Commit

Permalink
Send readdir as a batched request, where the response comes with a li…
Browse files Browse the repository at this point in the history
…st of dirs, instead of just one. (#2637)

* Send readdir as a batched request, where the response comes with a list of dirs, instead of just one.

* stop being dumb

* working dir read

* protocol version check to decide which request to use

* use is_none to check for new file added

* use correct protocol version

* clippy

* try_insert is like hashset insert

* try_insert is like hashset insert 2

* Send error back to layer when we hit in cached.

* dont break protocol

* dont break protocol (request)

* proper protocol version check

* no option + some comments

* protocol version

* use right protocol version for req

* protocol dance

* protocol dance (tango)

* update test for readdirbatch

* fix client message in test

* more test fixes

* macos test fix

* move handle_readdir to simpleproxy

* remove todo

* docs

* bump protocol + macos fix

* macos is this what you want

* increase test timeout

* testing intproxy readdir batch

* comment out ut

* testing the messages

* close file request macos

* close and dont expect

* add expect close

* close file then close dir

* bump protocol

* wrong fd value

* no close

* fixed test I think

* close

* close close

* remove extra

* change order // add env var to intproxy logs

* put in a folder

* from_str

* is this how you add intproxy logs artifact

* fix ci

* put it in tmp

* continue on error

* only upload tmp

* the whole dir

* setup tracing

* only mirrord trace

* set log only once

* only macos tracing

* try_init

* only relevant test, fix order

* only godir

* only go_dir

* is this the right way

* only go 23

* layer doesnt ask for close

* close file then close dir

* manually close file

* fix go

* increase sleep

* move to separate function go

* logs

* change protocol version

* ignore after close messages

* send empty vec

* fixed tests

* change order

* I had played myself, but now it works.

* changelog

* clippy

* schema

* space

* is gone
  • Loading branch information
meowjesty authored Aug 20, 2024
1 parent f980a59 commit fee777a
Show file tree
Hide file tree
Showing 19 changed files with 710 additions and 218 deletions.
190 changes: 110 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions changelog.d/2611.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Adds a batched readdir, which should improve performance when reading many directory entries. Introduces a new `ReadDirBatch` message to the protocol.
2 changes: 1 addition & 1 deletion mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@
},
"log_level": {
"title": "internal_proxy.log_level {#internal_proxy-log_level}",
"description": "Set the log level for the internal proxy. RUST_LOG convention (i.e `mirrord=trace`) will only be used if log_destination is set",
"description": "Set the log level for the internal proxy. RUST_LOG convention (i.e `mirrord=trace`) will only be used if `log_destination` is set.",
"type": [
"string",
"null"
Expand Down
43 changes: 28 additions & 15 deletions mirrord/agent/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,8 @@ use std::{

use faccess::{AccessMode, PathExt};
use libc::DT_DIR;
use mirrord_protocol::{
file::{
AccessFileRequest, AccessFileResponse, CloseDirRequest, CloseFileRequest, DirEntryInternal,
FdOpenDirRequest, GetDEnts64Request, GetDEnts64Response, OpenDirResponse, OpenFileRequest,
OpenFileResponse, OpenOptionsInternal, OpenRelativeFileRequest, ReadDirRequest,
ReadDirResponse, ReadFileRequest, ReadFileResponse, ReadLimitedFileRequest,
ReadLinkFileRequest, ReadLinkFileResponse, SeekFileRequest, SeekFileResponse,
WriteFileRequest, WriteFileResponse, WriteLimitedFileRequest, XstatFsRequest,
XstatFsResponse, XstatRequest, XstatResponse,
},
FileRequest, FileResponse, RemoteResult, ResponseError,
};
use tracing::{error, trace};
use mirrord_protocol::{file::*, FileRequest, FileResponse, RemoteResult, ResponseError};
use tracing::{error, trace, Level};

use crate::error::Result;

Expand Down Expand Up @@ -216,15 +205,20 @@ impl FileManager {
let xstat_result = self.xstatfs(fd);
Some(FileResponse::XstatFs(xstat_result))
}

// dir operations
FileRequest::FdOpenDir(FdOpenDirRequest { remote_fd }) => {
let open_dir_result = self.fdopen_dir(remote_fd);
Some(FileResponse::OpenDir(open_dir_result))
}

FileRequest::ReadDir(ReadDirRequest { remote_fd }) => {
let read_dir_result = self.read_dir(remote_fd);
Some(FileResponse::ReadDir(read_dir_result))
}
FileRequest::ReadDirBatch(ReadDirBatchRequest { remote_fd, amount }) => {
let read_dir_result = self.read_dir_batch(remote_fd, amount);
Some(FileResponse::ReadDirBatch(read_dir_result))
}
FileRequest::CloseDir(CloseDirRequest { remote_fd }) => {
self.close_dir(remote_fd);
None
Expand Down Expand Up @@ -723,7 +717,7 @@ impl FileManager {
}
}

#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = Level::TRACE, skip(self), ret)]
pub(crate) fn read_dir(&mut self, fd: u64) -> RemoteResult<ReadDirResponse> {
let dir_stream = self.get_dir_stream(fd)?;
let result = if let Some(offset_entry_pair) = dir_stream.next() {
Expand All @@ -737,6 +731,25 @@ impl FileManager {
Ok(result)
}

/// Batched counterpart of `read_dir`: pulls up to `amount` entries from the dir stream
/// obtained through `get_dir_stream(fd)` in a single call, instead of one entry per call.
///
/// Every item taken from the stream is converted with [`DirEntryInternal::try_from`]. The
/// first conversion that fails stops the collection and is returned as the error. Fewer
/// than `amount` entries are returned when the stream runs out first.
///
/// On success the collected entries are handed back, together with `fd`, inside a
/// [`ReadDirBatchResponse`].
#[tracing::instrument(level = Level::TRACE, skip(self), ret)]
pub(crate) fn read_dir_batch(
    &mut self,
    fd: u64,
    amount: usize,
) -> RemoteResult<ReadDirBatchResponse> {
    // Fails early (via `?`) when no dir stream can be obtained for `fd`.
    let dir_stream = self.get_dir_stream(fd)?;

    // `try_collect` short-circuits on the first entry that cannot be converted.
    let dir_entries = dir_stream
        .take(amount)
        .map(DirEntryInternal::try_from)
        .try_collect::<Vec<_>>()?;

    Ok(ReadDirBatchResponse { fd, dir_entries })
}

/// The getdents64 syscall writes dir entries to a buffer, as long as they fit.
/// If a call did not process all the entries in a dir, the result of the next call continues
/// where the last one stopped.
Expand Down
1 change: 1 addition & 0 deletions mirrord/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![feature(let_chains)]
#![feature(type_alias_impl_trait)]
#![feature(entry_insert)]
#![feature(iterator_try_collect)]
#![cfg_attr(target_os = "linux", feature(tcp_quickack))]
#![feature(lazy_cell)]
#![warn(clippy::indexing_slicing)]
Expand Down
6 changes: 4 additions & 2 deletions mirrord/config/src/internal_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ pub struct InternalProxyConfig {
pub idle_timeout: u64,

/// ### internal_proxy.log_level {#internal_proxy-log_level}
///
/// Set the log level for the internal proxy.
/// RUST_LOG convention (i.e `mirrord=trace`)
/// will only be used if log_destination is set
/// RUST_LOG convention (i.e `mirrord=trace`) will only be used if `log_destination`
/// is set.
pub log_level: Option<String>,

/// ### internal_proxy.log_destination {#internal_proxy-log_destination}
///
/// Set the log file destination for the internal proxy.
pub log_destination: Option<String>,

Expand Down
18 changes: 10 additions & 8 deletions mirrord/intproxy/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,7 @@ use std::{
use bincode::{Decode, Encode};
use mirrord_protocol::{
dns::{GetAddrInfoRequest, GetAddrInfoResponse},
file::{
AccessFileRequest, AccessFileResponse, CloseDirRequest, CloseFileRequest, FdOpenDirRequest,
GetDEnts64Request, GetDEnts64Response, OpenDirResponse, OpenFileRequest, OpenFileResponse,
OpenRelativeFileRequest, ReadDirRequest, ReadDirResponse, ReadFileRequest,
ReadFileResponse, ReadLimitedFileRequest, ReadLinkFileRequest, ReadLinkFileResponse,
SeekFileRequest, SeekFileResponse, WriteFileRequest, WriteFileResponse,
WriteLimitedFileRequest, XstatFsRequest, XstatFsResponse, XstatRequest, XstatResponse,
},
file::*,
outgoing::SocketAddress,
tcp::StealType,
FileRequest, FileResponse, GetEnvVarsRequest, Port, RemoteResult,
Expand Down Expand Up @@ -372,6 +365,15 @@ impl_request!(
res_path = ProxyToLayerMessage::File => FileResponse::ReadDir,
);

// NOTE: `req = ReadDirBatchRequest` is deliberately misleading. The layer never sends this
// message itself; it keeps using the regular `ReadDirRequest`. The batched request/response
// pair is still wired up through `impl_request!`, the same way as the neighbouring
// requests, purely to keep things simple.
impl_request!(
req = ReadDirBatchRequest,
res = RemoteResult<ReadDirBatchResponse>,
req_path = LayerToProxyMessage::File => FileRequest::ReadDirBatch,
res_path = ProxyToLayerMessage::File => FileResponse::ReadDirBatch,
);

impl_request!(
req = GetDEnts64Request,
res = RemoteResult<GetDEnts64Response>,
Expand Down
8 changes: 8 additions & 0 deletions mirrord/intproxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(map_try_insert)]
#![warn(clippy::indexing_slicing)]

use std::{collections::HashMap, time::Duration};
Expand Down Expand Up @@ -300,6 +301,13 @@ impl IntProxy {
self.task_txs.agent.send(ClientMessage::ReadyForLogs).await;
}

self.task_txs
.simple
.send(SimpleProxyMessage::ProtocolVersion(
protocol_version.clone(),
))
.await;

self.task_txs
.incoming
.send(IncomingProxyMessage::AgentProtocolVersion(protocol_version))
Expand Down
Loading

0 comments on commit fee777a

Please sign in to comment.