@@ -104,6 +104,7 @@ impl<T: RpcMessage> Sink<T> for SendSink<T> {
104
104
105
105
enum RecvStreamInner < T : RpcMessage > {
106
106
Direct ( :: flume:: r#async:: RecvStream < ' static , T > ) ,
107
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
107
108
DirectTokio ( tokio_stream:: wrappers:: ReceiverStream < T > ) ,
108
109
Boxed ( Pin < Box < dyn Stream < Item = Result < T , anyhow:: Error > > + Send + Sync + ' static > > ) ,
109
110
}
@@ -129,6 +130,7 @@ impl<T: RpcMessage> RecvStream<T> {
129
130
}
130
131
131
132
/// Create a new receive stream from a direct flume receive stream
133
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
132
134
pub ( crate ) fn direct_tokio ( stream : tokio_stream:: wrappers:: ReceiverStream < T > ) -> Self {
133
135
Self ( RecvStreamInner :: DirectTokio ( stream) )
134
136
}
@@ -144,6 +146,7 @@ impl<T: RpcMessage> Stream for RecvStream<T> {
144
146
Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
145
147
Poll :: Pending => Poll :: Pending ,
146
148
} ,
149
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
147
150
RecvStreamInner :: DirectTokio ( stream) => match stream. poll_next_unpin ( cx) {
148
151
Poll :: Ready ( Some ( item) ) => Poll :: Ready ( Some ( Ok ( item) ) ) ,
149
152
Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
@@ -158,6 +161,7 @@ enum OpenFutureInner<'a, In: RpcMessage, Out: RpcMessage> {
158
161
/// A direct future (todo)
159
162
Direct ( super :: flume:: OpenBiFuture < In , Out > ) ,
160
163
/// A direct future (todo)
164
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
161
165
DirectTokio ( BoxFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
162
166
/// A boxed future
163
167
Boxed ( BoxFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
@@ -172,6 +176,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> OpenFuture<'a, In, Out> {
172
176
Self ( OpenFutureInner :: Direct ( f) )
173
177
}
174
178
/// Create a new boxed future
179
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
175
180
pub fn direct_tokio (
176
181
f : impl Future < Output = anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > + Send + Sync + ' a ,
177
182
) -> Self {
@@ -195,6 +200,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'a, In, Out> {
195
200
. poll ( cx)
196
201
. map_ok ( |( send, recv) | ( SendSink :: direct ( send. 0 ) , RecvStream :: direct ( recv. 0 ) ) )
197
202
. map_err ( |e| e. into ( ) ) ,
203
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
198
204
OpenFutureInner :: DirectTokio ( f) => f. poll ( cx) ,
199
205
OpenFutureInner :: Boxed ( f) => f. poll ( cx) ,
200
206
}
@@ -205,6 +211,7 @@ enum AcceptFutureInner<'a, In: RpcMessage, Out: RpcMessage> {
205
211
/// A direct future
206
212
Direct ( super :: flume:: AcceptBiFuture < In , Out > ) ,
207
213
/// A direct future
214
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
208
215
DirectTokio ( BoxedFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
209
216
/// A boxed future
210
217
Boxed ( BoxedFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
@@ -220,6 +227,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> AcceptFuture<'a, In, Out> {
220
227
}
221
228
222
229
/// bla
230
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
223
231
pub fn direct_tokio (
224
232
f : impl Future < Output = anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > + Send + Sync + ' a ,
225
233
) -> Self {
@@ -243,6 +251,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'a, In, Out> {
243
251
. poll ( cx)
244
252
. map_ok ( |( send, recv) | ( SendSink :: direct ( send. 0 ) , RecvStream :: direct ( recv. 0 ) ) )
245
253
. map_err ( |e| e. into ( ) ) ,
254
+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
246
255
AcceptFutureInner :: DirectTokio ( f) => f. poll ( cx) ,
247
256
AcceptFutureInner :: Boxed ( f) => f. poll ( cx) ,
248
257
}
0 commit comments