Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add a channel body #100

Closed
wants to merge 1 commit into from
Closed

util: add a channel body #100

wants to merge 1 commit into from

Conversation

davidpdrsn
Copy link
Member

Adds a new body type that's backed by a tokio::sync::mpsc channel. Inspired by hyper's old Body::channel.

@davidpdrsn davidpdrsn changed the title Add channel body util: add a channel body Nov 19, 2023
seanmonstar
seanmonstar previously approved these changes Nov 20, 2023

impl<D, E> Sender<D, E> {
/// Send a frame on the channel.
pub async fn send(&self, frame: Frame<D>) -> Result<(), SendError> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it's not required internally, should we make these all &mut self?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is addressed in 4fed62f

/// A body backed by a channel.
pub struct Channel<D, E = std::convert::Infallible> {
rx_frame: mpsc::Receiver<Frame<D>>,
rx_error: mpsc::Receiver<E>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a oneshot channel work? It should be a smaller struct, and use less memory wen sending the error (the mpsc channel reserves blocks).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I thought it wouldn't be possible because one shot::Receiver doesn't have a poll_recv method but I suppose I can call Future::poll instead 🤔 I'll try that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i ran into this issue myself, and noticed this thread here. i've opened tokio-rs/tokio#7059 to introduce a poll_recv(..) method for oneshot::Receiver<T>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio-rs/tokio#7059 is now closed. to poll the receiver, it should be pinned and polled via Future::poll, rather than adding a poll_recv method matching mpsc::Receiver<T>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is addressed in 08aba95

@seanmonstar seanmonstar dismissed their stale review November 20, 2023 21:44

Didn't mean to approve just yet

cratelyn added a commit to cratelyn/tokio that referenced this pull request Dec 31, 2024
this commit adds a `rx.poll_recv(&mut cx)` to the public interface
of `tokio::sync::oneshot::Receiver<T>`.

this method has the following signature:

```rust
// tokio/src/sync/oneshot.rs
impl<T> Receiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        // ...
    }
}
```

this is similar to the `tokio::sync::mpsc::Receiver::poll_recv` and
`tokio::sync::mpsc::UnboundedReceiver::poll_recv` methods, which have the
following signature:

```rust
// tokio/src/sync/mpsc/bounded.rs
impl<T> Receiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // ...
    }
}
```

see: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv

in particular, note the `&mut self` receiver of these methods, as
opposed to the `Pin<&mut Self>` receiver in `Future::poll(..)`. today, a
oneshot receiver must be pinned in order to be polled via
`Future::poll(..)`.

`tokio::sync::oneshot::Receiver::try_recv(..)` has an important but
subtle difference from `poll_recv(..)`, alluded to in its documentation:

> If a pending value exists in the channel, it is returned. If no value
> has been sent, the current task will not be registered for future
> notification.
>
> This function is useful to call from outside the context of an
> asynchronous task.

see hyperium/http-body#100 for an example use-case for this.

if we *are* in the context of an asynchronous task, we may wish to poll
on the receiver-end of the channel and register for future notification,
indicating that we should be awoken later when a value is ready or when
conditions yielding a spurious failure have passed.

providing a means to poll a `&mut Receiver<T>` avoids the performance
impact of boxing the receiver as an erased `dyn Future` trait object, or
of using an `tokio::sync::mpsc::Receiver<T>`, or the ergonomic wrinkles
of needing to rely on pin projection in asynchronous types that compose
on top of oneshot channels.

---

* https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv
* https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#method.poll_recv
* https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll
* https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html#method.try_recv
* https://github.com/hyperium/http-body/pull/100/files#r1399818104
* hyperium/http-body#100
cratelyn added a commit to cratelyn/tokio that referenced this pull request Dec 31, 2024
this commit adds a `rx.poll_recv(&mut cx)` to the public interface
of `tokio::sync::oneshot::Receiver<T>`.

this method has the following signature:

```rust
// tokio/src/sync/oneshot.rs
impl<T> Receiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        // ...
    }
}
```

this is similar to the `tokio::sync::mpsc::Receiver::poll_recv` and
`tokio::sync::mpsc::UnboundedReceiver::poll_recv` methods, which have the
following signature:

```rust
// tokio/src/sync/mpsc/bounded.rs
impl<T> Receiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // ...
    }
}
```

see: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv

in particular, note the `&mut self` receiver of these methods, as
opposed to the `Pin<&mut Self>` receiver in `Future::poll(..)`. today, a
oneshot receiver must be pinned in order to be polled via
`Future::poll(..)`.

`tokio::sync::oneshot::Receiver::try_recv(..)` has an important but
subtle difference from `poll_recv(..)`, alluded to in its documentation:

> If a pending value exists in the channel, it is returned. If no value
> has been sent, the current task will not be registered for future
> notification.
>
> This function is useful to call from outside the context of an
> asynchronous task.

see hyperium/http-body#100 for an example use-case for this.

if we *are* in the context of an asynchronous task, we may wish to poll
on the receiver-end of the channel and register for future notification,
indicating that we should be awoken later when a value is ready or when
conditions yielding a spurious failure have passed.

providing a means to poll a `&mut Receiver<T>` avoids the performance
impact of boxing the receiver as an erased `dyn Future` trait object, or
of using an `tokio::sync::mpsc::Receiver<T>`, or the ergonomic wrinkles
of needing to rely on pin projection in asynchronous types that compose
on top of oneshot channels.

---

* https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv
* https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#method.poll_recv
* https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll
* https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html#method.try_recv
* https://github.com/hyperium/http-body/pull/100/files#r1399818104
* hyperium/http-body#100
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 6, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399781061

this commit refactors the channel-backed body in hyperium#100, changing the
`mpsc::Receiver<E>` used to transmit an error into a
`oneshot::Receiver<E>`.

this should improve memory usage, and make the channel a smaller
structure.

in order to achieve this, some minor adjustments are made:

* use pin projection, projecting pinnedness to the oneshot receiver,
  polling it via `core::future::Future::poll(..)` to yield a body frame.

* add `Debug` bounds were needed.

as an alternative, see tokio-rs/tokio#7059, which proposed a
`poll_recv(..)` inherent method for a oneshot channel receiver.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 6, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399780355

this commit refactors the channel-backed body in hyperium#100, changing the
signature of `send_*` methods on the sender to require a mutable
reference.
@cratelyn cratelyn mentioned this pull request Jan 6, 2025
@cratelyn
Copy link
Contributor

cratelyn commented Jan 6, 2025

👋 hiya! i wanted to note that i've opened #140, which rebases this commit atop the latest changes in master, and addresses @seanmonstar's feedback here and here.

cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 6, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399780355

this commit refactors the channel-backed body in hyperium#100, changing the
signature of `send_*` methods on the sender to require a mutable
reference.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 8, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399781061

this commit refactors the channel-backed body in hyperium#100, changing the
`mpsc::Receiver<E>` used to transmit an error into a
`oneshot::Receiver<E>`.

this should improve memory usage, and make the channel a smaller
structure.

in order to achieve this, some minor adjustments are made:

* use pin projection, projecting pinnedness to the oneshot receiver,
  polling it via `core::future::Future::poll(..)` to yield a body frame.

* add `Debug` bounds were needed.

as an alternative, see tokio-rs/tokio#7059, which proposed a
`poll_recv(..)` inherent method for a oneshot channel receiver.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 8, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399780355

this commit refactors the channel-backed body in hyperium#100, changing the
signature of `send_*` methods on the sender to require a mutable
reference.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 15, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399781061

this commit refactors the channel-backed body in hyperium#100, changing the
`mpsc::Receiver<E>` used to transmit an error into a
`oneshot::Receiver<E>`.

this should improve memory usage, and make the channel a smaller
structure.

in order to achieve this, some minor adjustments are made:

* use pin projection, projecting pinnedness to the oneshot receiver,
  polling it via `core::future::Future::poll(..)` to yield a body frame.

* add `Debug` bounds were needed.

as an alternative, see tokio-rs/tokio#7059, which proposed a
`poll_recv(..)` inherent method for a oneshot channel receiver.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 15, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399780355

this commit refactors the channel-backed body in hyperium#100, changing the
signature of `send_*` methods on the sender to require a mutable
reference.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 15, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399781061

this commit refactors the channel-backed body in hyperium#100, changing the
`mpsc::Receiver<E>` used to transmit an error into a
`oneshot::Receiver<E>`.

this should improve memory usage, and make the channel a smaller
structure.

in order to achieve this, some minor adjustments are made:

* use pin projection, projecting pinnedness to the oneshot receiver,
  polling it via `core::future::Future::poll(..)` to yield a body frame.

* add `Debug` bounds were needed.

as an alternative, see tokio-rs/tokio#7059, which proposed a
`poll_recv(..)` inherent method for a oneshot channel receiver.
cratelyn added a commit to cratelyn/http-body that referenced this pull request Jan 15, 2025
this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399780355

this commit refactors the channel-backed body in hyperium#100, changing the
signature of `send_*` methods on the sender to require a mutable
reference.
seanmonstar pushed a commit that referenced this pull request Jan 17, 2025
* Add channel body

* review: use `sync::oneshot` for error channel

this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399781061

this commit refactors the channel-backed body in #100, changing the
`mpsc::Receiver<E>` used to transmit an error into a
`oneshot::Receiver<E>`.

this should improve memory usage, and make the channel a smaller
structure.

in order to achieve this, some minor adjustments are made:

* use pin projection, projecting pinnedness to the oneshot receiver,
  polling it via `core::future::Future::poll(..)` to yield a body frame.

* add `Debug` bounds were needed.

as an alternative, see tokio-rs/tokio#7059, which proposed a
`poll_recv(..)` inherent method for a oneshot channel receiver.

* review: use `&mut self` method receivers

this applies a review suggestion here:
https://github.com/hyperium/http-body/pull/100/files#r1399780355

this commit refactors the channel-backed body in #100, changing the
signature of `send_*` methods on the sender to require a mutable
reference.

* review: fix `<Channel<D, E> as Body>::poll_frame()`

see: #140 (comment)

this commit adds test coverage exposing the bug, and tightens the
pattern used to match frames yielded by the data channel.

now, when the channel is closed, a `None` will flow onwards and poll the
error channel. `None` will be returned when the error channel is closed,
which also indicates that the associated `Sender` has been dropped.

---------

Co-authored-by: David Pedersen <[email protected]>
@cratelyn
Copy link
Contributor

this can be closed, this work was finished in #140! 🏋️‍♀️

@seanmonstar seanmonstar deleted the channel branch January 17, 2025 16:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants