fix(client): race in connection errors propagation #184

Open: wants to merge 4 commits into base: master
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -41,6 +41,8 @@ http-body-util = "0.1.0"
 tokio = { version = "1", features = ["macros", "test-util", "signal"] }
 tokio-test = "0.4"
 pretty_env_logger = "0.5"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
+ctor = "0.2"

 [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
 pnet_datalink = "0.35.0"
77 changes: 72 additions & 5 deletions src/client/legacy/client.rs
@@ -576,22 +576,89 @@ where
     panic!("http2 feature is not enabled");
 } else {
     #[cfg(feature = "http1")] {
+        // Perform the HTTP/1.1 handshake on the provided I/O stream.
+        // Uses the h1_builder to establish a connection, returning a sender (tx) for requests
+        // and a connection task (conn) that manages the connection lifecycle.
         let (mut tx, conn) =
-            h1_builder.handshake(io).await.map_err(Error::tx)?;
+            h1_builder.handshake(io).await.map_err(crate::client::legacy::client::Error::tx)?;

+        // Log that the HTTP/1.1 handshake has completed successfully.
+        // This indicates the connection is established and ready for request processing.
         trace!(
             "http1 handshake complete, spawning background dispatcher task"
         );
+        // Create a oneshot channel to communicate errors from the connection task.
+        // err_tx sends errors from the connection task, and err_rx receives them
+        // to correlate connection failures with request readiness errors.
+        let (err_tx, err_rx) = tokio::sync::oneshot::channel();
+        // Spawn the connection task in the background using the executor.
+        // The task manages the HTTP/1.1 connection, including upgrades (e.g., WebSocket).
+        // Errors are sent via err_tx to ensure they can be checked if the sender (tx) fails.
         executor.execute(
             conn.with_upgrades()
-                .map_err(|e| debug!("client connection error: {}", e))
+                .map_err(|e| {
+                    // Log the connection error at debug level for diagnostic purposes.
+                    debug!("client connection error: {:?}", e);
+                    // Log that the error is being sent to the error channel.
+                    trace!("sending connection error to error channel");
+                    // Send the error via the oneshot channel, ignoring send failures
+                    // (e.g., if the receiver is dropped, which is handled later).
+                    let _ = err_tx.send(e);
+                })
                 .map(|_| ()),
         );

+        // Log that the client is waiting for the connection to be ready.
+        // Readiness indicates the sender (tx) can accept a request without blocking.
+        trace!("waiting for connection to be ready");
+        // Check if the sender is ready to accept a request.
+        // This ensures the connection is fully established before proceeding.
+        // aka:
         // Wait for 'conn' to ready up before we
         // declare this tx as usable
-        tx.ready().await.map_err(Error::tx)?;
-        PoolTx::Http1(tx)
+        match tx.ready().await {
+            // If ready, the connection is usable for sending requests.
+            Ok(_) => {
+                // Log that the connection is ready for use.
+                trace!("connection is ready");
+                // Drop the error receiver, as it’s no longer needed since the sender is ready.
+                // This prevents waiting for errors that won’t occur in a successful case.
+                drop(err_rx);
+                // Wrap the sender in PoolTx::Http1 for use in the connection pool.
+                PoolTx::Http1(tx)
+            }
+            // If the sender fails with a closed channel error, check for a specific connection error.
+            // This distinguishes between a vague ChannelClosed error and an actual connection failure.
+            Err(e) if e.is_closed() => {
+                // Log that the channel is closed, indicating a potential connection issue.
+                trace!("connection channel closed, checking for connection error");
+                // Check the oneshot channel for a specific error from the connection task.
+                match err_rx.await {
+                    // If an error was received, it’s a specific connection failure.
+                    Ok(err) => {
+                        // Log the specific connection error for diagnostics.
+                        trace!("received connection error: {:?}", err);
+                        // Return the error wrapped in Error::tx to propagate it.
+                        return Err(crate::client::legacy::client::Error::tx(err));
+                    }
+                    // If the error channel is closed, no specific error was sent.
+                    // Fall back to the vague ChannelClosed error.
+                    Err(_) => {
+                        // Log that the error channel is closed, indicating no specific error.
+                        trace!("error channel closed, returning the vague ChannelClosed error");
+                        // Return the original error wrapped in Error::tx.
+                        return Err(crate::client::legacy::client::Error::tx(e));
+                    }
+                }
+            }
+            // For other errors (e.g., timeout, I/O issues), propagate them directly.
+            // These are not ChannelClosed errors and don’t require error channel checks.
+            Err(e) => {
+                // Log the specific readiness failure for diagnostics.
+                trace!("connection readiness failed: {:?}", e);
+                // Return the error wrapped in Error::tx to propagate it.
+                return Err(crate::client::legacy::client::Error::tx(e));
+            }
+        }
     }
     #[cfg(not(feature = "http1"))] {
         panic!("http1 feature is not enabled");
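For readers who want to try the error-propagation pattern from the `client.rs` hunk in isolation, here is a minimal, std-only sketch. It is not hyper-util's code or API: `ConnError`, `TaskOutcome`, and `establish` are hypothetical names invented for illustration, a thread stands in for the task handed to `executor.execute`, `ready_rx.recv()` stands in for `tx.ready().await`, and a bounded `std::sync::mpsc` channel stands in for `tokio::sync::oneshot`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical error type for this sketch (not hyper-util's `Error`).
#[derive(Debug, PartialEq)]
enum ConnError {
    /// A specific failure reported by the background connection task.
    Io(String),
    /// The vague fallback: the task went away without reporting anything.
    ChannelClosed,
}

/// Hypothetical script describing what the background task does.
enum TaskOutcome {
    BecomesReady,
    FailsWith(String),
    Vanishes,
}

/// Spawns a stand-in "connection task" and waits for readiness.
/// If readiness fails because the task is gone, prefer the specific
/// error from the side channel over the vague `ChannelClosed`.
fn establish(outcome: TaskOutcome) -> Result<(), ConnError> {
    // Stand-in for the request sender's readiness signal.
    let (ready_tx, ready_rx) = mpsc::channel::<()>();
    // Stand-in for the oneshot error channel (capacity 1, sent to at most once).
    let (err_tx, err_rx) = mpsc::sync_channel::<ConnError>(1);

    // Both senders are moved into the task and dropped when it ends.
    let task = thread::spawn(move || match outcome {
        TaskOutcome::BecomesReady => {
            let _ = ready_tx.send(());
        }
        TaskOutcome::FailsWith(msg) => {
            // Report the specific error; ignore a dropped receiver.
            let _ = err_tx.send(ConnError::Io(msg));
        }
        TaskOutcome::Vanishes => {}
    });

    let result = match ready_rx.recv() {
        // Readiness arrived: the connection is usable.
        Ok(()) => Ok(()),
        // Readiness channel closed: ask the side channel why.
        Err(_closed) => match err_rx.recv() {
            Ok(err) => Err(err),
            // No error was ever sent: fall back to the vague error.
            Err(_) => Err(ConnError::ChannelClosed),
        },
    };
    let _ = task.join();
    result
}
```

In this sketch the blocking `err_rx.recv()` returns only once an error has been sent or the error sender has been dropped, so the caller does not depend on the relative order in which the task closes the readiness channel and reports its error. That mirrors the role `err_rx.await` plays in the diff, under the stated assumptions.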