Skip to content

Commit b40d86d

Browse files
committed
adds graceful shutdown support
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent fc64b43 commit b40d86d

18 files changed

+593
-20
lines changed

rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.rsocket.DuplexConnection;
66
import io.rsocket.RSocket;
77
import io.rsocket.frame.decoder.PayloadDecoder;
8+
import reactor.core.publisher.Sinks;
89
import reactor.util.annotation.Nullable;
910

1011
public class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket {
@@ -27,7 +28,8 @@ public TestRequesterResponderSupport(
2728
PayloadDecoder.ZERO_COPY,
2829
connection,
2930
streamIdSupplier,
30-
__ -> null);
31+
__ -> null,
32+
Sinks.empty());
3133
this.requesterLeaseTracker = requesterLeaseTracker;
3234
}
3335

rsocket-core/src/main/java/io/rsocket/RSocket.java

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ default double availability() {
8787
@Override
8888
default void dispose() {}
8989

90+
default void disposeGracefully() {}
91+
9092
@Override
9193
default boolean isDisposed() {
9294
return false;

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -655,10 +655,16 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
655655
requesterLeaseTracker = null;
656656
}
657657

658+
final Sinks.Empty<Void> requesterOnGracefulShutdownSink =
659+
Sinks.unsafe().empty();
660+
final Sinks.Empty<Void> responderOnGracefulShutdownSink =
661+
Sinks.unsafe().empty();
658662
final Sinks.Empty<Void> requesterOnAllClosedSink =
659663
Sinks.unsafe().empty();
660664
final Sinks.Empty<Void> responderOnAllClosedSink =
661665
Sinks.unsafe().empty();
666+
final Sinks.Empty<Void> requesterGracefulShutdownStartedSink =
667+
Sinks.unsafe().empty();
662668

663669
RSocket rSocketRequester =
664670
new RSocketRequester(
@@ -673,7 +679,12 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
673679
keepAliveHandler,
674680
interceptors::initRequesterRequestInterceptor,
675681
requesterLeaseTracker,
682+
requesterGracefulShutdownStartedSink,
683+
requesterOnGracefulShutdownSink,
676684
requesterOnAllClosedSink,
685+
Mono.whenDelayError(
686+
responderOnGracefulShutdownSink.asMono(),
687+
requesterOnGracefulShutdownSink.asMono()),
677688
Mono.whenDelayError(
678689
responderOnAllClosedSink.asMono(),
679690
requesterOnAllClosedSink.asMono()));
@@ -725,7 +736,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
725736
leases.sender)
726737
: interceptors
727738
::initResponderRequestInterceptor,
728-
responderOnAllClosedSink);
739+
responderOnGracefulShutdownSink,
740+
responderOnAllClosedSink,
741+
requesterGracefulShutdownStartedSink.asMono());
729742

730743
return wrappedRSocketRequester;
731744
})

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.rsocket.DuplexConnection;
2424
import io.rsocket.Payload;
2525
import io.rsocket.RSocket;
26+
import io.rsocket.exceptions.ConnectionCloseException;
2627
import io.rsocket.exceptions.ConnectionErrorException;
2728
import io.rsocket.exceptions.Exceptions;
2829
import io.rsocket.frame.ErrorFrameCodec;
@@ -67,6 +68,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
6768

6869
@Nullable private final RequesterLeaseTracker requesterLeaseTracker;
6970

71+
private final Sinks.Empty<Void> onGracefulShutdownStartedSink;
7072
private final Sinks.Empty<Void> onThisSideClosedSink;
7173
private final Mono<Void> onAllClosed;
7274
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
@@ -83,7 +85,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
8385
@Nullable KeepAliveHandler keepAliveHandler,
8486
Function<RSocket, RequestInterceptor> requestInterceptorFunction,
8587
@Nullable RequesterLeaseTracker requesterLeaseTracker,
88+
Sinks.Empty<Void> onGracefulShutdownStartedSink,
89+
Sinks.Empty<Void> onGracefulShutdownSink,
8690
Sinks.Empty<Void> onThisSideClosedSink,
91+
Mono<Void> onGracefulShutdownDone,
8792
Mono<Void> onAllClosed) {
8893
super(
8994
mtu,
@@ -92,14 +97,17 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
9297
payloadDecoder,
9398
connection,
9499
streamIdSupplier,
95-
requestInterceptorFunction);
100+
requestInterceptorFunction,
101+
onGracefulShutdownSink);
96102

97103
this.requesterLeaseTracker = requesterLeaseTracker;
104+
this.onGracefulShutdownStartedSink = onGracefulShutdownStartedSink;
98105
this.onThisSideClosedSink = onThisSideClosedSink;
99106
this.onAllClosed = onAllClosed;
100107

101108
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
102109
connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown);
110+
onGracefulShutdownDone.subscribe(null, null, connection::dispose);
103111

104112
connection.receive().subscribe(this::handleIncomingFrames, e -> {});
105113

@@ -200,6 +208,17 @@ public void dispose() {
200208
getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
201209
}
202210

211+
@Override
212+
public void disposeGracefully() {
213+
getDuplexConnection()
214+
.sendFrame(
215+
0,
216+
ErrorFrameCodec.encode(
217+
getAllocator(), 0, new ConnectionCloseException("Graceful Shutdown")));
218+
this.onGracefulShutdownStartedSink.tryEmitEmpty();
219+
super.terminate();
220+
}
221+
203222
@Override
204223
public boolean isDisposed() {
205224
return terminationError != null;
@@ -352,6 +371,12 @@ private void tryTerminate(Supplier<Throwable> errorSupplier) {
352371
}
353372
if (terminationError == null) {
354373
Throwable e = errorSupplier.get();
374+
375+
if (e instanceof ConnectionCloseException) {
376+
this.onGracefulShutdownStartedSink.tryEmitEmpty();
377+
super.terminate();
378+
return;
379+
}
355380
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
356381
terminate(e);
357382
} else {
@@ -418,7 +443,7 @@ private void terminate(Throwable e) {
418443
requesterLeaseTracker.dispose(e);
419444
}
420445

421-
final Collection<FrameHandler> activeStreamsCopy;
446+
final Collection<FrameHandler> activeStreamsCopy; // in case of graceful shut down is empty
422447
synchronized (this) {
423448
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
424449
activeStreamsCopy = new ArrayList<>(activeStreams.values());

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,18 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
7373
int maxFrameLength,
7474
int maxInboundPayloadSize,
7575
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
76-
Sinks.Empty<Void> onThisSideClosedSink) {
76+
Sinks.Empty<Void> onGracefulShutdownSink,
77+
Sinks.Empty<Void> onThisSideClosedSink,
78+
Mono<Void> onRequesterGracefulShutdownStarted) {
7779
super(
7880
mtu,
7981
maxFrameLength,
8082
maxInboundPayloadSize,
8183
payloadDecoder,
8284
connection,
8385
null,
84-
requestInterceptorFunction);
86+
requestInterceptorFunction,
87+
onGracefulShutdownSink);
8588

8689
this.requestHandler = requestHandler;
8790

@@ -92,12 +95,18 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
9295
.onClose()
9396
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
9497

98+
onRequesterGracefulShutdownStarted.subscribe(null, null, this::onGracefulShutdownStarted);
99+
95100
connection.receive().subscribe(this::handleFrame, e -> {});
96101
}
97102

103+
private void onGracefulShutdownStarted() {
104+
super.terminate();
105+
requestHandler.disposeGracefully();
106+
}
107+
98108
private void tryTerminateOnConnectionError(Throwable e) {
99109
if (LOGGER.isDebugEnabled()) {
100-
101110
LOGGER.debug("Try terminate connection on responder side");
102111
}
103112
tryTerminate(() -> e);

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,11 @@ private Mono<Void> acceptSetup(
438438
requesterLeaseTracker = null;
439439
}
440440

441+
final Sinks.Empty<Void> requesterOnGracefulShutdownSink = Sinks.unsafe().empty();
442+
final Sinks.Empty<Void> responderOnGracefulShutdownSink = Sinks.unsafe().empty();
441443
final Sinks.Empty<Void> requesterOnAllClosedSink = Sinks.unsafe().empty();
442444
final Sinks.Empty<Void> responderOnAllClosedSink = Sinks.unsafe().empty();
445+
final Sinks.Empty<Void> requesterGracefulShutdownStartedSink = Sinks.unsafe().empty();
443446

444447
RSocket rSocketRequester =
445448
new RSocketRequester(
@@ -454,7 +457,12 @@ private Mono<Void> acceptSetup(
454457
keepAliveHandler,
455458
interceptors::initRequesterRequestInterceptor,
456459
requesterLeaseTracker,
460+
requesterGracefulShutdownStartedSink,
461+
requesterOnGracefulShutdownSink,
457462
requesterOnAllClosedSink,
463+
Mono.whenDelayError(
464+
responderOnGracefulShutdownSink.asMono(),
465+
requesterOnGracefulShutdownSink.asMono()),
458466
Mono.whenDelayError(
459467
responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono()));
460468

@@ -495,7 +503,9 @@ wrappedDuplexConnection, rejectedSetupError(err)))
495503
interceptors.initResponderRequestInterceptor(
496504
rSocket, (RequestInterceptor) leases.sender)
497505
: interceptors::initResponderRequestInterceptor,
498-
responderOnAllClosedSink);
506+
responderOnGracefulShutdownSink,
507+
responderOnAllClosedSink,
508+
requesterGracefulShutdownStartedSink.asMono());
499509
})
500510
.doFinally(signalType -> setupPayload.release())
501511
.then();

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

+59-8
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import io.netty.util.collection.IntObjectMap;
66
import io.rsocket.DuplexConnection;
77
import io.rsocket.RSocket;
8+
import io.rsocket.exceptions.CanceledException;
89
import io.rsocket.frame.decoder.PayloadDecoder;
910
import io.rsocket.plugins.RequestInterceptor;
1011
import java.util.Objects;
1112
import java.util.function.Function;
13+
import reactor.core.publisher.Sinks;
1214
import reactor.util.annotation.Nullable;
1315

1416
class RequesterResponderSupport {
@@ -19,19 +21,24 @@ class RequesterResponderSupport {
1921
private final PayloadDecoder payloadDecoder;
2022
private final ByteBufAllocator allocator;
2123
private final DuplexConnection connection;
24+
private final Sinks.Empty<Void> onGracefulShutdownSink;
2225
@Nullable private final RequestInterceptor requestInterceptor;
2326

2427
@Nullable final StreamIdSupplier streamIdSupplier;
2528
final IntObjectMap<FrameHandler> activeStreams;
2629

30+
boolean terminating;
31+
boolean terminated;
32+
2733
public RequesterResponderSupport(
2834
int mtu,
2935
int maxFrameLength,
3036
int maxInboundPayloadSize,
3137
PayloadDecoder payloadDecoder,
3238
DuplexConnection connection,
3339
@Nullable StreamIdSupplier streamIdSupplier,
34-
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
40+
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
41+
Sinks.Empty<Void> onGracefulShutdownSink) {
3542

3643
this.activeStreams = new IntObjectHashMap<>();
3744
this.mtu = mtu;
@@ -41,6 +48,7 @@ public RequesterResponderSupport(
4148
this.allocator = connection.alloc();
4249
this.streamIdSupplier = streamIdSupplier;
4350
this.connection = connection;
51+
this.onGracefulShutdownSink = onGracefulShutdownSink;
4452
this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this);
4553
}
4654

@@ -88,6 +96,9 @@ public int getNextStreamId() {
8896
final StreamIdSupplier streamIdSupplier = this.streamIdSupplier;
8997
if (streamIdSupplier != null) {
9098
synchronized (this) {
99+
if (this.terminating) {
100+
throw new CanceledException("Disposed");
101+
}
91102
return streamIdSupplier.nextStreamId(this.activeStreams);
92103
}
93104
} else {
@@ -107,6 +118,10 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
107118
if (streamIdSupplier != null) {
108119
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
109120
synchronized (this) {
121+
if (this.terminating) {
122+
throw new CanceledException("Disposed");
123+
}
124+
110125
final int streamId = streamIdSupplier.nextStreamId(activeStreams);
111126

112127
activeStreams.put(streamId, frameHandler);
@@ -119,6 +134,11 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
119134
}
120135

121136
public synchronized boolean add(int streamId, FrameHandler frameHandler) {
137+
if (this.terminating) {
138+
throw new CanceledException(
139+
"This RSocket is either disposed or disposing, and no longer accepting new requests");
140+
}
141+
122142
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
123143
// copy of Map.putIfAbsent(key, value) without `streamId` boxing
124144
final FrameHandler previousHandler = activeStreams.get(streamId);
@@ -148,14 +168,45 @@ public synchronized FrameHandler get(int streamId) {
148168
* @return {@code true} if there is {@link FrameHandler} for the given {@code streamId} and the
149169
* instance equals to the passed one
150170
*/
151-
public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
152-
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
153-
// copy of Map.remove(key, value) without `streamId` boxing
154-
final FrameHandler curValue = activeStreams.get(streamId);
155-
if (!Objects.equals(curValue, frameHandler)) {
156-
return false;
171+
public boolean remove(int streamId, FrameHandler frameHandler) {
172+
final boolean terminated;
173+
synchronized (this) {
174+
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
175+
// copy of Map.remove(key, value) without `streamId` boxing
176+
final FrameHandler curValue = activeStreams.get(streamId);
177+
if (!Objects.equals(curValue, frameHandler)) {
178+
return false;
179+
}
180+
activeStreams.remove(streamId);
181+
if (this.terminating && activeStreams.size() == 0) {
182+
terminated = true;
183+
this.terminated = true;
184+
} else {
185+
terminated = false;
186+
}
187+
}
188+
189+
if (terminated) {
190+
onGracefulShutdownSink.tryEmitEmpty();
157191
}
158-
activeStreams.remove(streamId);
159192
return true;
160193
}
194+
195+
public void terminate() {
196+
final boolean terminated;
197+
synchronized (this) {
198+
this.terminating = true;
199+
200+
if (activeStreams.size() == 0) {
201+
terminated = true;
202+
this.terminated = true;
203+
} else {
204+
terminated = false;
205+
}
206+
}
207+
208+
if (terminated) {
209+
onGracefulShutdownSink.tryEmitEmpty();
210+
}
211+
}
161212
}

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

+7
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
721721
protected Runnable delayer;
722722
protected Sinks.One<RSocket> producer;
723723

724+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
725+
protected Sinks.Empty<Void> thisGracefulShutdownSink;
724726
protected Sinks.Empty<Void> thisClosedSink;
725727

726728
@Override
@@ -740,6 +742,8 @@ protected void doInit() {
740742

741743
@Override
742744
protected RSocket newRSocket() {
745+
this.onGracefulShutdownStartedSink = Sinks.empty();
746+
this.thisGracefulShutdownSink = Sinks.empty();
743747
this.thisClosedSink = Sinks.empty();
744748
return new RSocketRequester(
745749
connection,
@@ -753,7 +757,10 @@ protected RSocket newRSocket() {
753757
null,
754758
__ -> null,
755759
null,
760+
onGracefulShutdownStartedSink,
761+
thisGracefulShutdownSink,
756762
thisClosedSink,
763+
thisGracefulShutdownSink.asMono(),
757764
thisClosedSink.asMono());
758765
}
759766
}

0 commit comments

Comments
 (0)