Commit

Add a tokio sleep after MirrordExecution::start so we can await on it and drive the runtime, preventing the websocket reset. (#2954)

* Add a tokio sleep after MirrordExecution::start so we can await on it and drive the runtime, preventing the websocket reset.

* Talk about 2 connections being made.

* fix docs

* changelog
meowjesty authored Dec 3, 2024
1 parent 4b0d899 commit 6f0bf20
Showing 4 changed files with 24 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/+79-bogus-websocket-reset.fixed.md
@@ -0,0 +1 @@
Add a sleep and await on it after websocket connection to drive IO runtime and prevent websocket closing without handshake.
10 changes: 10 additions & 0 deletions mirrord/cli/src/execution.rs
@@ -164,6 +164,16 @@ impl MirrordExecution {
/// 1. Drop this struct when an error occurs,
/// 2. Successfully `exec`,
/// 3. Wait for intproxy exit with [`MirrordExecution::wait`].
///
/// # `async` shenanigans when using the mirrord operator.
///
/// Here we connect a websocket to the operator created agent, and this connection
/// might get reset if we don't drive its IO (call `await` on the runtime after the
/// websocket is up). This is an issue because we start things with `execv`, so we're
/// kinda out of the whole Rust world of nicely dropping things.
///
/// tl;dr: In `exec_process`, you need to call and `await` either
/// [`tokio::time::sleep`] or [`tokio::task::yield_now`] after calling this function.
#[tracing::instrument(level = Level::TRACE, skip_all)]
pub(crate) async fn start<P>(
config: &LayerConfig,
7 changes: 7 additions & 0 deletions mirrord/cli/src/main.rs
@@ -90,6 +90,12 @@ where
#[cfg(not(target_os = "macos"))]
let execution_info = MirrordExecution::start(&config, &mut sub_progress, analytics).await?;

// This is not being yielded, as this is not proper async, something along those lines.
// We need an `await` somewhere in this function to drive our socket IO that happens
// in `MirrordExecution::start`. If we don't have this here, then the websocket
// connection resets, and in the operator you'll get a websocket error.
tokio::time::sleep(Duration::from_micros(1)).await;

#[cfg(target_os = "macos")]
let (_did_sip_patch, binary) = match execution_info.patched_path {
None => (false, args.binary.clone()),
@@ -139,6 +145,7 @@ where
.into_iter()
.map(CString::new)
.collect::<CliResult<Vec<_>, _>>()?;

// env vars should be formatted as "varname=value" CStrings
let env = env_vars
.into_iter()
6 changes: 6 additions & 0 deletions mirrord/operator/src/client.rs
@@ -531,6 +531,12 @@ impl OperatorApi<PreparedClientCert> {
/// Starts a new operator session and connects to the target.
/// Returned [`OperatorSessionConnection::session`] can be later used to create another
/// connection in the same session with [`OperatorApi::connect_in_existing_session`].
///
/// 2 connections are made to the operator (this means that we hit the target's
/// `connect_resource` twice):
///
/// 1. The 1st one is here;
/// 2. The 2nd is on the `AgentConnection::new`;
#[tracing::instrument(
level = Level::TRACE,
skip(layer_config, progress),
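
The doc comment on `MirrordExecution::start` and the comment in `main.rs` both say that an `await` is needed after the websocket is up so that its IO gets driven before `execv`. The sketch below illustrates that general idea with a std-only toy executor. It is not tokio and not mirrord code: `Executor`, `YieldNow`, and `io_driven_before_exec` are hypothetical names made up for the illustration, and it assumes a single-threaded, cooperative scheduler, which may not match how mirrord's runtime is actually configured.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Hypothetical toy executor: a FIFO queue of tasks polled round-robin.
#[derive(Default)]
struct Executor {
    queue: RefCell<VecDeque<Task>>,
}

/// Waker that does nothing; the executor re-queues pending tasks itself.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

impl Executor {
    fn spawn(&self, fut: impl Future<Output = ()> + 'static) {
        self.queue.borrow_mut().push_back(Box::pin(fut));
    }

    fn run(&self) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        loop {
            // Release the queue borrow before polling, so tasks can spawn.
            let next = self.queue.borrow_mut().pop_front();
            let Some(mut task) = next else { break };
            if task.as_mut().poll(&mut cx).is_pending() {
                self.queue.borrow_mut().push_back(task);
            }
        }
    }
}

/// Returns `Pending` once, giving other queued tasks a turn.
struct YieldNow(bool);
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Reports whether the background "IO" task had run by the time the
/// foreground task reached its stand-in for the `execv` call.
fn io_driven_before_exec(yield_after_start: bool) -> bool {
    let exec = Rc::new(Executor::default());
    let io_ran = Rc::new(Cell::new(false));
    let seen_at_exec = Rc::new(Cell::new(false));

    let (ex, io, seen) = (exec.clone(), io_ran.clone(), seen_at_exec.clone());
    exec.spawn(async move {
        // Stand-in for `start`: background IO work is queued, not yet run.
        let io_bg = io.clone();
        ex.spawn(async move { io_bg.set(true) });
        if yield_after_start {
            YieldNow(false).await; // lets the queued IO task make progress
        }
        // Stand-in for the `execv` point: record what had happened so far.
        seen.set(io.get());
    });
    exec.run();
    seen_at_exec.get()
}

fn main() {
    println!("without yield: {}", io_driven_before_exec(false));
    println!("with yield:    {}", io_driven_before_exec(true));
}
```

In this toy model the queued work only runs once the foreground task hits an `await` that returns `Pending`, which is the role the commit gives to `tokio::time::sleep(Duration::from_micros(1)).await`.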
