Skip to content

Commit

Permalink
review: use sync::oneshot for error channel
Browse files Browse the repository at this point in the history
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399781061

this commit refactors the channel-backed body in hyperium#100, changing the
`mpsc::Receiver<E>` used to transmit an error into a
`oneshot::Receiver<E>`.

this should improve memory usage, and make the channel a smaller
structure.

in order to achieve this, some minor adjustments are made:

* use pin projection, projecting pinnedness to the oneshot receiver,
  polling it via `core::future::Future::poll(..)` to yield a body frame.

* add `Debug` bounds were needed.

as an alternative, see tokio-rs/tokio#7059, which proposed a
`poll_recv(..)` inherent method for a oneshot channel receiver.
  • Loading branch information
cratelyn committed Jan 6, 2025
1 parent 78f8678 commit 08aba95
Showing 1 changed file with 22 additions and 28 deletions.
50 changes: 22 additions & 28 deletions http-body-util/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ use std::{
use bytes::Buf;
use http::HeaderMap;
use http_body::{Body, Frame};
use tokio::sync::mpsc;

/// A body backed by a channel.
pub struct Channel<D, E = std::convert::Infallible> {
rx_frame: mpsc::Receiver<Frame<D>>,
rx_error: mpsc::Receiver<E>,
use pin_project_lite::pin_project;
use tokio::sync::{mpsc, oneshot};

pin_project! {
/// A body backed by a channel.
pub struct Channel<D, E = std::convert::Infallible> {
rx_frame: mpsc::Receiver<Frame<D>>,
#[pin]
rx_error: oneshot::Receiver<E>,
}
}

impl<D, E> Channel<D, E> {
Expand All @@ -25,7 +29,7 @@ impl<D, E> Channel<D, E> {
/// provided buffer capacity must be at least 1.
pub fn new(buffer: usize) -> (Sender<D, E>, Self) {
let (tx_frame, rx_frame) = mpsc::channel(buffer);
let (tx_error, rx_error) = mpsc::channel(1);
let (tx_error, rx_error) = oneshot::channel();
(Sender { tx_frame, tx_error }, Self { rx_frame, rx_error })
}
}
Expand All @@ -38,24 +42,27 @@ where
type Error = E;

fn poll_frame(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.rx_frame.poll_recv(cx) {
let this = self.project();

match this.rx_frame.poll_recv(cx) {
Poll::Ready(frame) => return Poll::Ready(frame.map(Ok)),
Poll::Pending => {}
}

match self.rx_error.poll_recv(cx) {
Poll::Ready(err) => return Poll::Ready(err.map(Err)),
use core::future::Future;
match this.rx_error.poll(cx) {
Poll::Ready(err) => return Poll::Ready(err.ok().map(Err)),
Poll::Pending => {}
}

Poll::Pending
}
}

impl<D, E> std::fmt::Debug for Channel<D, E> {
impl<D, E: std::fmt::Debug> std::fmt::Debug for Channel<D, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Channel")
.field("rx_frame", &self.rx_frame)
Expand All @@ -67,7 +74,7 @@ impl<D, E> std::fmt::Debug for Channel<D, E> {
/// A sender half created through [`Channel::new`].
pub struct Sender<D, E = std::convert::Infallible> {
tx_frame: mpsc::Sender<Frame<D>>,
tx_error: mpsc::Sender<E>,
tx_error: oneshot::Sender<E>,
}

impl<D, E> Sender<D, E> {
Expand All @@ -88,24 +95,11 @@ impl<D, E> Sender<D, E> {

/// Aborts the body in an abnormal fashion.
pub fn abort(self, error: E) {
match self.tx_error.try_send(error) {
Ok(_) => {}
Err(err) => {
match err {
mpsc::error::TrySendError::Full(_) => {
// Channel::new creates the error channel with space for 1 message and we
// only send once because this method consumes `self`. So the receiver
// can't be full.
unreachable!("error receiver should never be full")
}
mpsc::error::TrySendError::Closed(_) => {}
}
}
}
self.tx_error.send(error).ok();
}
}

impl<D, E> std::fmt::Debug for Sender<D, E> {
impl<D, E: std::fmt::Debug> std::fmt::Debug for Sender<D, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Sender")
.field("tx_frame", &self.tx_frame)
Expand Down

0 comments on commit 08aba95

Please sign in to comment.