Skip to content

Commit 02d7823

Browse files
committed
rename Msg to WithChannels again
1 parent 6bde683 commit 02d7823

File tree

5 files changed

+52
-59
lines changed

5 files changed

+52
-59
lines changed

examples/compute.rs

+7-14
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use quic_rpc::{
1515
channel::{mpsc, oneshot},
1616
rpc::{listen, Handler, RemoteRead},
1717
util::{make_client_endpoint, make_server_endpoint},
18-
LocalMpscChannel, Msg, Service, ServiceRequest, ServiceSender,
18+
LocalMpscChannel, Service, ServiceRequest, ServiceSender, WithChannels,
1919
};
2020
use quic_rpc_derive::rpc_requests;
2121
use serde::{Deserialize, Serialize};
@@ -100,13 +100,13 @@ impl ComputeActor {
100100
match msg {
101101
ComputeMessage::Sqr(sqr) => {
102102
trace!("sqr {:?}", sqr);
103-
let Msg { tx, inner, .. } = sqr;
103+
let WithChannels { tx, inner, .. } = sqr;
104104
let result = (inner.num as u128) * (inner.num as u128);
105105
tx.send(result).await?;
106106
}
107107
ComputeMessage::Sum(sum) => {
108108
trace!("sum {:?}", sum);
109-
let Msg { rx, tx, .. } = sum;
109+
let WithChannels { rx, tx, .. } = sum;
110110
let mut receiver = rx;
111111
let mut total = 0;
112112
while let Some(num) = receiver.recv().await? {
@@ -116,7 +116,7 @@ impl ComputeActor {
116116
}
117117
ComputeMessage::Fibonacci(fib) => {
118118
trace!("fibonacci {:?}", fib);
119-
let Msg { tx, inner, .. } = fib;
119+
let WithChannels { tx, inner, .. } = fib;
120120
let mut sender = tx;
121121
let mut a = 0u64;
122122
let mut b = 1u64;
@@ -129,7 +129,7 @@ impl ComputeActor {
129129
}
130130
ComputeMessage::Multiply(mult) => {
131131
trace!("multiply {:?}", mult);
132-
let Msg { rx, tx, inner } = mult;
132+
let WithChannels { rx, tx, inner } = mult;
133133
let mut receiver = rx;
134134
let mut sender = tx;
135135
let multiplier = inner.initial;
@@ -406,7 +406,6 @@ fn clear_line() -> io::Result<()> {
406406
Ok(())
407407
}
408408

409-
410409
// Simple benchmark sending oneshot senders via an mpsc channel
411410
pub async fn reference_bench(n: u64) -> anyhow::Result<()> {
412411
// Create an mpsc channel to send oneshot senders
@@ -437,10 +436,7 @@ pub async fn reference_bench(n: u64) -> anyhow::Result<()> {
437436
let rps = ((n as f64) / t0.elapsed().as_secs_f64()).round() as u64;
438437
assert_eq!(sum, 42 * n); // Each response is 42
439438
clear_line()?;
440-
println!(
441-
"Reference seq {} rps",
442-
rps.separate_with_underscores()
443-
);
439+
println!("Reference seq {} rps", rps.separate_with_underscores());
444440
}
445441

446442
// Parallel oneshot sends
@@ -456,10 +452,7 @@ pub async fn reference_bench(n: u64) -> anyhow::Result<()> {
456452
let rps = ((n as f64) / t0.elapsed().as_secs_f64()).round() as u64;
457453
assert_eq!(sum, 42 * n); // Each response is 42
458454
clear_line()?;
459-
println!(
460-
"Reference par {} rps",
461-
rps.separate_with_underscores()
462-
);
455+
println!("Reference par {} rps", rps.separate_with_underscores());
463456
}
464457

465458
Ok(())

examples/derive.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use quic_rpc::{
1111
channel::{mpsc, oneshot},
1212
rpc::{listen, Handler},
1313
util::{make_client_endpoint, make_server_endpoint},
14-
LocalMpscChannel, Msg, Service, ServiceRequest, ServiceSender,
14+
LocalMpscChannel, Service, ServiceRequest, ServiceSender, WithChannels,
1515
};
1616
// Import the macro
1717
use quic_rpc_derive::rpc_requests;
@@ -80,18 +80,18 @@ impl StorageActor {
8080
match msg {
8181
StorageMessage::Get(get) => {
8282
info!("get {:?}", get);
83-
let Msg { tx, inner, .. } = get;
83+
let WithChannels { tx, inner, .. } = get;
8484
tx.send(self.state.get(&inner.key).cloned()).await.ok();
8585
}
8686
StorageMessage::Set(set) => {
8787
info!("set {:?}", set);
88-
let Msg { tx, inner, .. } = set;
88+
let WithChannels { tx, inner, .. } = set;
8989
self.state.insert(inner.key, inner.value);
9090
tx.send(()).await.ok();
9191
}
9292
StorageMessage::List(list) => {
9393
info!("list {:?}", list);
94-
let Msg { mut tx, .. } = list;
94+
let WithChannels { mut tx, .. } = list;
9595
for (key, value) in &self.state {
9696
if tx.send(format!("{key}={value}")).await.is_err() {
9797
break;

examples/storage.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use quic_rpc::{
1010
channel::{mpsc, none::NoReceiver, oneshot},
1111
rpc::{listen, Handler},
1212
util::{make_client_endpoint, make_server_endpoint},
13-
Channels, LocalMpscChannel, Msg, Service, ServiceRequest, ServiceSender,
13+
Channels, LocalMpscChannel, Service, ServiceRequest, ServiceSender, WithChannels,
1414
};
1515
use serde::{Deserialize, Serialize};
1616
use tracing::info;
@@ -59,9 +59,9 @@ enum StorageProtocol {
5959

6060
#[derive(derive_more::From)]
6161
enum StorageMessage {
62-
Get(Msg<Get, StorageService>),
63-
Set(Msg<Set, StorageService>),
64-
List(Msg<List, StorageService>),
62+
Get(WithChannels<Get, StorageService>),
63+
Set(WithChannels<Set, StorageService>),
64+
List(WithChannels<List, StorageService>),
6565
}
6666

6767
struct StorageActor {
@@ -93,18 +93,18 @@ impl StorageActor {
9393
match msg {
9494
StorageMessage::Get(get) => {
9595
info!("get {:?}", get);
96-
let Msg { tx, inner, .. } = get;
96+
let WithChannels { tx, inner, .. } = get;
9797
tx.send(self.state.get(&inner.key).cloned()).await.ok();
9898
}
9999
StorageMessage::Set(set) => {
100100
info!("set {:?}", set);
101-
let Msg { tx, inner, .. } = set;
101+
let WithChannels { tx, inner, .. } = set;
102102
self.state.insert(inner.key, inner.value);
103103
tx.send(()).await.ok();
104104
}
105105
StorageMessage::List(list) => {
106106
info!("list {:?}", list);
107-
let Msg { mut tx, .. } = list;
107+
let WithChannels { mut tx, .. } = list;
108108
for (key, value) in &self.state {
109109
if tx.send(format!("{key}={value}")).await.is_err() {
110110
break;

quic-rpc-derive/src/lib.rs

+10-22
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,12 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
6363
_ => return error_tokens(input.span(), "RpcRequests can only be applied to enums"),
6464
};
6565

66+
// builder for the trait impls
6667
let mut additional_items = Vec::new();
68+
// types to check for uniqueness
6769
let mut types = HashSet::new();
70+
// variant names and types
71+
let mut variants = Vec::new();
6872

6973
for variant in &mut data_enum.variants {
7074
// Check field structure for every variant
@@ -77,6 +81,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
7781
)
7882
}
7983
};
84+
variants.push((variant.ident.clone(), request_type.clone()));
8085

8186
if !types.insert(request_type.to_token_stream().to_string()) {
8287
return error_tokens(input_span, "Each variant must have a unique request type");
@@ -107,6 +112,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
107112
);
108113
}
109114

115+
// if there is no attr, the user has to impl Channels manually
110116
if let Some(attr) = rpc_attr {
111117
let args = match attr.parse_args::<NamedTypeArgs>() {
112118
Ok(info) => info,
@@ -120,29 +126,11 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
120126
}
121127
}
122128

123-
let message_variants = data_enum
124-
.variants
125-
.iter()
126-
.map(|variant| {
127-
let variant_name = &variant.ident;
128-
129-
// Extract the inner type using let-else patterns
130-
let Fields::Unnamed(fields) = &variant.fields else {
131-
unreachable!()
132-
};
133-
let Some(field) = fields.unnamed.first() else {
134-
unreachable!()
135-
};
136-
let Type::Path(type_path) = &field.ty else {
137-
unreachable!()
138-
};
139-
let Some(last_segment) = type_path.path.segments.last() else {
140-
unreachable!()
141-
};
142-
let inner_type = &last_segment.ident;
143-
129+
let message_variants = variants
130+
.into_iter()
131+
.map(|(variant_name, inner_type)| {
144132
quote! {
145-
#variant_name(::quic_rpc::Msg<#inner_type, #service_name>)
133+
#variant_name(::quic_rpc::WithChannels<#inner_type, #service_name>)
146134
}
147135
})
148136
.collect::<Vec<_>>();

src/lib.rs

+24-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{fmt::Debug, io, marker::PhantomData, ops::Deref};
22

33
use channel::none::NoReceiver;
4+
use sealed::Sealed;
45
use serde::{de::DeserializeOwned, Serialize};
56
#[cfg(feature = "test")]
67
pub mod util;
@@ -22,13 +23,17 @@ impl<T> RpcMessage for T where
2223
}
2324

2425
/// Marker trait for a service
25-
pub trait Service: Debug + Clone {}
26+
pub trait Service: Send + Sync + Debug + Clone + 'static {}
27+
28+
mod sealed {
29+
pub trait Sealed {}
30+
}
2631

2732
/// Marker trait for a sender
28-
pub trait Sender: Debug {}
33+
pub trait Sender: Debug + Sealed {}
2934

3035
/// Marker trait for a receiver
31-
pub trait Receiver: Debug {}
36+
pub trait Receiver: Debug + Sealed {}
3237

3338
/// Channels to be used for a message and service
3439
pub trait Channels<S: Service> {
@@ -136,6 +141,7 @@ pub mod channel {
136141
}
137142
}
138143

144+
impl<T> crate::sealed::Sealed for Sender<T> {}
139145
impl<T> crate::Sender for Sender<T> {}
140146

141147
pub enum Receiver<T> {
@@ -183,6 +189,7 @@ pub mod channel {
183189
}
184190
}
185191

192+
impl<T> crate::sealed::Sealed for Receiver<T> {}
186193
impl<T> crate::Receiver for Receiver<T> {}
187194
}
188195

@@ -278,6 +285,7 @@ pub mod channel {
278285
}
279286
}
280287

288+
impl<T> crate::sealed::Sealed for Sender<T> {}
281289
impl<T> crate::Sender for Sender<T> {}
282290

283291
pub enum Receiver<T> {
@@ -309,30 +317,34 @@ pub mod channel {
309317
}
310318
}
311319

320+
impl<T> crate::sealed::Sealed for Receiver<T> {}
312321
impl<T> crate::Receiver for Receiver<T> {}
313322
}
314323

315324
/// No channels, used when no communication is needed
316325
pub mod none {
317-
use crate::{Receiver, Sender};
326+
use crate::{sealed::Sealed, Receiver, Sender};
318327

319328
#[derive(Debug)]
320329
pub struct NoSender;
321-
330+
impl Sealed for NoSender {}
322331
impl Sender for NoSender {}
323332

324333
#[derive(Debug)]
325334
pub struct NoReceiver;
326335

336+
impl Sealed for NoReceiver {}
327337
impl Receiver for NoReceiver {}
328338
}
329339
}
330340

331-
/// A wrapper for a message with channels to send and receive it
341+
/// A wrapper for a message with channels to send and receive it.
342+
/// This expands the protocol message to a full message that includes the
343+
/// active and unserializable channels.
332344
///
333345
/// rx and tx can be set to an appropriate channel kind.
334346
#[derive(Debug)]
335-
pub struct Msg<I: Channels<S>, S: Service> {
347+
pub struct WithChannels<I: Channels<S>, S: Service> {
336348
/// The inner message.
337349
pub inner: I,
338350
/// The return channel to send the response to. Can be set to [`crate::channel::none::NoSender`] if not needed.
@@ -344,7 +356,7 @@ pub struct Msg<I: Channels<S>, S: Service> {
344356
/// Tuple conversion from inner message and tx/rx channels to a WithChannels struct
345357
///
346358
/// For the case where you want both tx and rx channels.
347-
impl<I: Channels<S>, S: Service, Tx, Rx> From<(I, Tx, Rx)> for Msg<I, S>
359+
impl<I: Channels<S>, S: Service, Tx, Rx> From<(I, Tx, Rx)> for WithChannels<I, S>
348360
where
349361
I: Channels<S>,
350362
<I as Channels<S>>::Tx: From<Tx>,
@@ -363,7 +375,7 @@ where
363375
/// Tuple conversion from inner message and tx channel to a WithChannels struct
364376
///
365377
/// For the very common case where you just need a tx channel to send the response to.
366-
impl<I, S, Tx> From<(I, Tx)> for Msg<I, S>
378+
impl<I, S, Tx> From<(I, Tx)> for WithChannels<I, S>
367379
where
368380
I: Channels<S, Rx = NoReceiver>,
369381
S: Service,
@@ -380,7 +392,7 @@ where
380392
}
381393

382394
/// Deref so you can access the inner fields directly
383-
impl<I: Channels<S>, S: Service> Deref for Msg<I, S> {
395+
impl<I: Channels<S>, S: Service> Deref for WithChannels<I, S> {
384396
type Target = I;
385397

386398
fn deref(&self) -> &Self::Target {
@@ -686,10 +698,10 @@ impl<M, R, S> From<LocalMpscChannel<M, S>> for ServiceRequest<M, R, S> {
686698
}
687699

688700
impl<M: Send, S: Service> LocalMpscChannel<M, S> {
689-
pub fn send<T>(&self, value: impl Into<Msg<T, S>>) -> SendFut<M>
701+
pub fn send<T>(&self, value: impl Into<WithChannels<T, S>>) -> SendFut<M>
690702
where
691703
T: Channels<S>,
692-
M: From<Msg<T, S>>,
704+
M: From<WithChannels<T, S>>,
693705
{
694706
let value: M = value.into().into();
695707
SendFut::new(self.0.clone(), value)

0 commit comments

Comments
 (0)