Skip to content

Commit 206daba

Browse files
committed
enough for today
1 parent 36d0a81 commit 206daba

File tree

5 files changed

+91
-43
lines changed

5 files changed

+91
-43
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ rust-version = "1.76"
1717
serde = { version = "1", default-features = false }
1818
# just for the oneshot and mpsc queues
1919
tokio = { version = "1", features = ["sync"], default-features = false }
20+
# for PollSender (which for some reason is not available in the main tokio api)
21+
tokio-util = { version = "0.7", default-features = false }
2022

2123
# used in the endpoint handler code when using rpc
2224
tracing = { version = "0.1.41", optional = true }

examples/derive.rs

+12-19
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::{
55
sync::Arc,
66
};
77

8-
use n0_future::task::AbortOnDropHandle;
8+
use anyhow::bail;
9+
use n0_future::task::{self, AbortOnDropHandle};
910
use quic_rpc::{
1011
channel::{mpsc, oneshot},
1112
rpc::{listen, Handler},
@@ -62,7 +63,7 @@ impl StorageActor {
6263
recv: rx,
6364
state: BTreeMap::new(),
6465
};
65-
tokio::spawn(actor.run());
66+
n0_future::task::spawn(actor.run());
6667
let local = LocalMpscChannel::<StorageMessage, StorageService>::from(tx);
6768
StorageApi {
6869
inner: local.into(),
@@ -116,28 +117,20 @@ impl StorageApi {
116117
match &self.inner {
117118
ServiceSender::Local(local, _) => {
118119
let local = LocalMpscChannel::from(local.clone());
119-
let fun: Handler<StorageProtocol> = Arc::new(move |msg, _, tx| {
120+
let handler: Handler<StorageProtocol> = Arc::new(move |msg, _, tx| {
120121
let local = local.clone();
121-
Box::pin(async move {
122-
match msg {
123-
StorageProtocol::Get(msg) => {
124-
local.send((msg, tx)).await?;
125-
}
126-
StorageProtocol::Set(msg) => {
127-
local.send((msg, tx)).await?;
128-
}
129-
StorageProtocol::List(msg) => {
130-
local.send((msg, tx)).await?;
131-
}
132-
};
133-
Ok(())
122+
Box::pin(match msg {
123+
StorageProtocol::Get(msg) => local.send((msg, tx)),
124+
StorageProtocol::Set(msg) => local.send((msg, tx)),
125+
StorageProtocol::List(msg) => local.send((msg, tx)),
134126
})
135127
});
136-
let x = AbortOnDropHandle::new(tokio::spawn(listen(endpoint, fun)));
137-
Ok(x)
128+
Ok(AbortOnDropHandle::new(task::spawn(listen(
129+
endpoint, handler,
130+
))))
138131
}
139132
ServiceSender::Remote(_, _, _) => {
140-
Err(anyhow::anyhow!("cannot listen on a remote service"))
133+
bail!("cannot listen on a remote service");
141134
}
142135
}
143136
}

examples/storage.rs

+10-17
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55
sync::Arc,
66
};
77

8-
use n0_future::task::AbortOnDropHandle;
8+
use n0_future::task::{self, AbortOnDropHandle};
99
use quic_rpc::{
1010
channel::{mpsc, none::NoReceiver, oneshot},
1111
rpc::{listen, Handler},
@@ -76,7 +76,7 @@ impl StorageActor {
7676
recv: rx,
7777
state: BTreeMap::new(),
7878
};
79-
tokio::spawn(actor.run());
79+
n0_future::task::spawn(actor.run());
8080
let local = LocalMpscChannel::<StorageMessage, StorageService>::from(tx);
8181
StorageApi {
8282
inner: local.into(),
@@ -129,24 +129,17 @@ impl StorageApi {
129129
match &self.inner {
130130
ServiceSender::Local(local, _) => {
131131
let local = LocalMpscChannel::from(local.clone());
132-
let fun: Handler<StorageProtocol> = Arc::new(move |msg, _rx, tx| {
132+
let handler: Handler<StorageProtocol> = Arc::new(move |msg, _rx, tx| {
133133
let local = local.clone();
134-
Box::pin(async move {
135-
match msg {
136-
StorageProtocol::Get(msg) => {
137-
local.send((msg, tx)).await?;
138-
}
139-
StorageProtocol::Set(msg) => {
140-
local.send((msg, tx)).await?;
141-
}
142-
StorageProtocol::List(msg) => {
143-
local.send((msg, tx)).await?;
144-
}
145-
};
146-
Ok(())
134+
Box::pin(match msg {
135+
StorageProtocol::Get(msg) => local.send((msg, tx)),
136+
StorageProtocol::Set(msg) => local.send((msg, tx)),
137+
StorageProtocol::List(msg) => local.send((msg, tx)),
147138
})
148139
});
149-
Ok(AbortOnDropHandle::new(tokio::spawn(listen(endpoint, fun))))
140+
Ok(AbortOnDropHandle::new(task::spawn(listen(
141+
endpoint, handler,
142+
))))
150143
}
151144
ServiceSender::Remote(_, _, _) => {
152145
Err(anyhow::anyhow!("cannot listen on a remote service"))

src/lib.rs

+66-7
Original file line numberDiff line numberDiff line change
@@ -657,16 +657,75 @@ impl<M, R, S> From<LocalMpscChannel<M, S>> for ServiceRequest<M, R, S> {
657657
}
658658
}
659659

660-
impl<M: Send + Sync + 'static, S: Service> LocalMpscChannel<M, S> {
661-
pub async fn send<T>(&self, value: impl Into<WithChannels<T, S>>) -> io::Result<()>
660+
impl<M: Send, S: Service> LocalMpscChannel<M, S> {
661+
pub fn send<T>(&self, value: impl Into<WithChannels<T, S>>) -> SendFut<M>
662662
where
663663
T: Channels<S>,
664664
M: From<WithChannels<T, S>>,
665665
{
666-
self.0
667-
.send(value.into().into())
668-
.await
669-
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
670-
Ok(())
666+
let value: M = value.into().into();
667+
SendFut::new(self.0.clone(), value)
671668
}
672669
}
670+
671+
mod send_fut {
672+
use std::{
673+
future::Future,
674+
io,
675+
pin::Pin,
676+
task::{Context, Poll},
677+
};
678+
679+
use tokio::sync::mpsc::Sender;
680+
use tokio_util::sync::PollSender;
681+
682+
pub struct SendFut<T: Send> {
683+
poll_sender: PollSender<T>,
684+
value: Option<T>,
685+
}
686+
687+
impl<T: Send> SendFut<T> {
688+
pub fn new(sender: Sender<T>, value: T) -> Self {
689+
Self {
690+
poll_sender: PollSender::new(sender),
691+
value: Some(value),
692+
}
693+
}
694+
}
695+
696+
impl<T: Send + Unpin> Future for SendFut<T> {
697+
type Output = io::Result<()>;
698+
699+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
700+
let this = self.get_mut();
701+
702+
// Safely extract the value
703+
let value = match this.value.take() {
704+
Some(v) => v,
705+
None => return Poll::Ready(Ok(())), // Already completed
706+
};
707+
708+
// Try to reserve capacity
709+
match this.poll_sender.poll_reserve(cx) {
710+
Poll::Ready(Ok(())) => {
711+
// Send the item
712+
this.poll_sender.send_item(value).ok();
713+
Poll::Ready(Ok(()))
714+
}
715+
Poll::Ready(Err(_)) => {
716+
// Channel is closed
717+
Poll::Ready(Err(io::Error::new(
718+
io::ErrorKind::BrokenPipe,
719+
"Channel closed",
720+
)))
721+
}
722+
Poll::Pending => {
723+
// Restore the value and wait
724+
this.value = Some(value);
725+
Poll::Pending
726+
}
727+
}
728+
}
729+
}
730+
}
731+
use send_fut::SendFut;

0 commit comments

Comments
 (0)