Skip to content

Commit 03ad748

Browse files
authored
fix: Fuse oneshot channel (#147)
2 parents c47fef2 + 17e56a5 commit 03ad748

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

src/server.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ where
329329
{
330330
pub(crate) fn new(recv: C::RecvStream) -> (Self, UnwrapToPending<RpcServerError<C>>) {
331331
let (error_send, error_recv) = oneshot::channel();
332-
let error_recv = UnwrapToPending(error_recv);
332+
let error_recv = UnwrapToPending(futures_lite::future::fuse(error_recv));
333333
(Self(recv, Some(error_send), PhantomData), error_recv)
334334
}
335335
}
@@ -449,12 +449,13 @@ impl<C: ConnectionErrors> fmt::Display for RpcServerError<C> {
449449
impl<C: ConnectionErrors> error::Error for RpcServerError<C> {}
450450

451451
/// Take an oneshot receiver and just return Pending the underlying future returns `Err(oneshot::Canceled)`
452-
pub(crate) struct UnwrapToPending<T>(oneshot::Receiver<T>);
452+
pub(crate) struct UnwrapToPending<T>(futures_lite::future::Fuse<oneshot::Receiver<T>>);
453453

454454
impl<T> Future for UnwrapToPending<T> {
455455
type Output = T;
456456

457457
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
458+
// todo: use is_terminated from tokio 1.44 here to avoid the fused wrapper
458459
match Pin::new(&mut self.0).poll(cx) {
459460
Poll::Ready(Ok(x)) => Poll::Ready(x),
460461
Poll::Ready(Err(_)) => Poll::Pending,

0 commit comments

Comments
 (0)