16
16
17
17
package io .rsocket ;
18
18
19
- import static io .rsocket .util .ExceptionUtil .noStacktrace ;
20
-
21
19
import io .netty .buffer .Unpooled ;
22
20
import io .netty .util .collection .IntObjectHashMap ;
23
21
import io .rsocket .exceptions .ConnectionException ;
24
22
import io .rsocket .exceptions .Exceptions ;
25
23
import io .rsocket .internal .LimitableRequestPublisher ;
24
+ import io .rsocket .internal .UnboundedProcessor ;
26
25
import io .rsocket .util .PayloadImpl ;
26
+ import org .reactivestreams .Publisher ;
27
+ import org .reactivestreams .Subscriber ;
28
+ import reactor .core .Disposable ;
29
+ import reactor .core .publisher .*;
30
+
31
+ import javax .annotation .Nullable ;
27
32
import java .nio .channels .ClosedChannelException ;
28
33
import java .time .Duration ;
29
34
import java .util .Collection ;
32
37
import java .util .function .Consumer ;
33
38
import java .util .function .Function ;
34
39
import java .util .function .Supplier ;
35
- import javax .annotation .Nullable ;
36
- import org .reactivestreams .Publisher ;
37
- import org .reactivestreams .Subscriber ;
38
- import reactor .core .Disposable ;
39
- import reactor .core .publisher .*;
40
+
41
+ import static io .rsocket .util .ExceptionUtil .noStacktrace ;
40
42
41
43
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
42
44
class RSocketClient implements RSocket {
@@ -52,7 +54,7 @@ class RSocketClient implements RSocket {
52
54
private final IntObjectHashMap <Subscriber <Payload >> receivers ;
53
55
private final AtomicInteger missedAckCounter ;
54
56
55
- private final FluxProcessor < Frame , Frame > sendProcessor ;
57
+ private final UnboundedProcessor < Frame > sendProcessor ;
56
58
57
59
private @ Nullable Disposable keepAliveSendSub ;
58
60
private volatile long timeLastTickSentMs ;
@@ -80,8 +82,7 @@ class RSocketClient implements RSocket {
80
82
this .missedAckCounter = new AtomicInteger ();
81
83
82
84
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
83
- // connections
84
- this .sendProcessor = EmitterProcessor .<Frame >create ().serialize ();
85
+ this .sendProcessor = new UnboundedProcessor <>();
85
86
86
87
if (!Duration .ZERO .equals (tickPeriod )) {
87
88
long ackTimeoutMs = ackTimeout .toMillis ();
@@ -98,8 +99,15 @@ class RSocketClient implements RSocket {
98
99
})
99
100
.subscribe ();
100
101
}
101
-
102
- connection .onClose ().doFinally (signalType -> cleanup ()).doOnError (errorConsumer ).subscribe ();
102
+
103
+ connection
104
+ .onClose ()
105
+ .doFinally (
106
+ signalType -> {
107
+ cleanup ();
108
+ })
109
+ .doOnError (errorConsumer )
110
+ .subscribe ();
103
111
104
112
connection
105
113
.send (sendProcessor )
@@ -205,7 +213,7 @@ public Flux<Payload> requestStream(Payload payload) {
205
213
206
214
@ Override
207
215
public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
208
- return handleStreamResponse (Flux .from (payloads ), FrameType .REQUEST_CHANNEL );
216
+ return handleChannel (Flux .from (payloads ), FrameType .REQUEST_CHANNEL );
209
217
}
210
218
211
219
@ Override
@@ -255,6 +263,7 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
255
263
} else if (contains (streamId ) && !receiver .isTerminated ()) {
256
264
sendProcessor .onNext (Frame .RequestN .from (streamId , l ));
257
265
}
266
+ sendProcessor .drain ();
258
267
})
259
268
.doOnError (
260
269
t -> {
@@ -268,7 +277,10 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
268
277
sendProcessor .onNext (Frame .Cancel .from (streamId ));
269
278
}
270
279
})
271
- .doFinally (s -> removeReceiver (streamId ));
280
+ .doFinally (
281
+ s -> {
282
+ removeReceiver (streamId );
283
+ });
272
284
}));
273
285
}
274
286
@@ -291,11 +303,14 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
291
303
return receiver
292
304
.doOnError (t -> sendProcessor .onNext (Frame .Error .from (streamId , t )))
293
305
.doOnCancel (() -> sendProcessor .onNext (Frame .Cancel .from (streamId )))
294
- .doFinally (s -> removeReceiver (streamId ));
306
+ .doFinally (
307
+ s -> {
308
+ removeReceiver (streamId );
309
+ });
295
310
}));
296
311
}
297
312
298
- private Flux <Payload > handleStreamResponse (Flux <Payload > request , FrameType requestType ) {
313
+ private Flux <Payload > handleChannel (Flux <Payload > request , FrameType requestType ) {
299
314
return started .thenMany (
300
315
Flux .defer (
301
316
new Supplier <Flux <Payload >>() {
@@ -328,6 +343,7 @@ public Flux<Payload> get() {
328
343
}
329
344
330
345
if (_firstRequest ) {
346
+ AtomicBoolean firstPayload = new AtomicBoolean (true );
331
347
Flux <Frame > requestFrames =
332
348
request
333
349
.transform (
@@ -345,19 +361,10 @@ public Flux<Payload> get() {
345
361
})
346
362
.map (
347
363
new Function <Payload , Frame >() {
348
- boolean firstPayload = true ;
349
364
350
365
@ Override
351
366
public Frame apply (Payload payload ) {
352
- boolean _firstPayload = false ;
353
- synchronized (this ) {
354
- if (firstPayload ) {
355
- firstPayload = false ;
356
- _firstPayload = true ;
357
- }
358
- }
359
-
360
- if (_firstPayload ) {
367
+ if (firstPayload .compareAndSet (true , false )) {
361
368
return Frame .Request .from (
362
369
streamId , requestType , payload , l );
363
370
} else {
@@ -372,6 +379,9 @@ public Frame apply(Payload payload) {
372
379
sendOneFrame (
373
380
Frame .PayloadFrame .from (
374
381
streamId , FrameType .COMPLETE ));
382
+ if (firstPayload .get ()) {
383
+ receiver .onComplete ();
384
+ }
375
385
}
376
386
});
377
387
@@ -522,6 +532,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
522
532
if (sender != null ) {
523
533
int n = Frame .RequestN .requestN (frame );
524
534
sender .increaseRequestLimit (n );
535
+ sendProcessor .drain ();
525
536
}
526
537
break ;
527
538
}
0 commit comments