Skip to content

Commit 5582290

Browse files
author
OlegDokuka
committed
finalize GS functionality
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent a5a060a commit 5582290

File tree

9 files changed

+392
-13
lines changed

9 files changed

+392
-13
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

+5
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ public void dispose() {
210210

211211
@Override
212212
public void disposeGracefully() {
213+
getDuplexConnection()
214+
.sendFrame(
215+
0,
216+
ErrorFrameCodec.encode(
217+
getAllocator(), 0, new ConnectionCloseException("Graceful Shutdown")));
213218
this.onGracefulShutdownStartedSink.tryEmitEmpty();
214219
super.terminate();
215220
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
5656

5757
private final RSocket requestHandler;
5858
private final Sinks.Empty<Void> onThisSideClosedSink;
59-
private final Mono<Void> onRequesterGracefullShutdownStarted;
6059

6160
@Nullable private final ResponderLeaseTracker leaseHandler;
6261

@@ -91,7 +90,6 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
9190

9291
this.leaseHandler = leaseHandler;
9392
this.onThisSideClosedSink = onThisSideClosedSink;
94-
this.onRequesterGracefullShutdownStarted = onRequesterGracefulShutdownStarted;
9593

9694
connection
9795
.onClose()

rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,11 @@ void sendFirstPayload(Payload payload, long initialRequestN) {
238238
return;
239239
}
240240

241-
sm.remove(streamId, this);
242-
243241
final ByteBuf cancelFrame = CancelFrameCodec.encode(allocator, streamId);
244242
connection.sendFrame(streamId, cancelFrame);
245243

244+
sm.remove(streamId, this);
245+
246246
if (requestInterceptor != null) {
247247
requestInterceptor.onCancel(streamId, FrameType.REQUEST_STREAM);
248248
}
@@ -276,12 +276,13 @@ public final void cancel() {
276276

277277
if (isFirstFrameSent(previousState)) {
278278
final int streamId = this.streamId;
279-
this.requesterResponderSupport.remove(streamId, this);
280279

281280
ReassemblyUtils.synchronizedRelease(this, previousState);
282281

283282
this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
284283

284+
this.requesterResponderSupport.remove(streamId, this);
285+
285286
final RequestInterceptor requestInterceptor = this.requestInterceptor;
286287
if (requestInterceptor != null) {
287288
requestInterceptor.onCancel(streamId, FrameType.REQUEST_STREAM);
@@ -309,13 +310,14 @@ public final void handlePayload(Payload p) {
309310
}
310311

311312
final int streamId = this.streamId;
312-
this.requesterResponderSupport.remove(streamId, this);
313313

314314
final IllegalStateException cause =
315315
Exceptions.failWithOverflow(
316316
"The number of messages received exceeds the number requested");
317317
this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
318318

319+
this.requesterResponderSupport.remove(streamId, this);
320+
319321
final RequestInterceptor requestInterceptor = this.requestInterceptor;
320322
if (requestInterceptor != null) {
321323
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, cause);

rsocket-core/src/main/java/io/rsocket/core/RequestStreamResponderSubscriber.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ public void onNext(Payload p) {
144144
final ByteBuf errorFrame = ErrorFrameCodec.encode(allocator, streamId, e);
145145
sender.sendFrame(streamId, errorFrame);
146146

147+
this.requesterResponderSupport.remove(streamId, this);
148+
147149
final RequestInterceptor requestInterceptor = this.requestInterceptor;
148150
if (requestInterceptor != null) {
149151
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, e);
@@ -162,6 +164,8 @@ public void onNext(Payload p) {
162164
new CanceledException("Failed to validate payload. Cause" + e.getMessage()));
163165
sender.sendFrame(streamId, errorFrame);
164166

167+
this.requesterResponderSupport.remove(streamId, this);
168+
165169
final RequestInterceptor requestInterceptor = this.requestInterceptor;
166170
if (requestInterceptor != null) {
167171
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, e);
@@ -176,6 +180,8 @@ public void onNext(Payload p) {
176180
return;
177181
}
178182

183+
this.requesterResponderSupport.remove(streamId, this);
184+
179185
final RequestInterceptor requestInterceptor = this.requestInterceptor;
180186
if (requestInterceptor != null) {
181187
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, t);
@@ -195,8 +201,6 @@ boolean tryTerminateOnError() {
195201
return false;
196202
}
197203

198-
this.requesterResponderSupport.remove(this.streamId, this);
199-
200204
currentSubscription.cancel();
201205

202206
return true;
@@ -222,11 +226,12 @@ public void onError(Throwable t) {
222226
}
223227

224228
final int streamId = this.streamId;
225-
this.requesterResponderSupport.remove(streamId, this);
226229

227230
final ByteBuf errorFrame = ErrorFrameCodec.encode(this.allocator, streamId, t);
228231
this.connection.sendFrame(streamId, errorFrame);
229232

233+
this.requesterResponderSupport.remove(streamId, this);
234+
230235
final RequestInterceptor requestInterceptor = this.requestInterceptor;
231236
if (requestInterceptor != null) {
232237
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, t);
@@ -246,11 +251,12 @@ public void onComplete() {
246251
}
247252

248253
final int streamId = this.streamId;
249-
this.requesterResponderSupport.remove(streamId, this);
250254

251255
final ByteBuf completeFrame = PayloadFrameCodec.encodeComplete(this.allocator, streamId);
252256
this.connection.sendFrame(streamId, completeFrame);
253257

258+
this.requesterResponderSupport.remove(streamId, this);
259+
254260
final RequestInterceptor requestInterceptor = this.requestInterceptor;
255261
if (requestInterceptor != null) {
256262
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, null);
@@ -321,7 +327,6 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
321327
S.lazySet(this, Operators.cancelledSubscription());
322328

323329
final int streamId = this.streamId;
324-
this.requesterResponderSupport.remove(streamId, this);
325330

326331
this.frames = null;
327332
frames.release();
@@ -334,6 +339,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
334339
new CanceledException("Failed to reassemble payload. Cause: " + e.getMessage()));
335340
this.connection.sendFrame(streamId, errorFrame);
336341

342+
this.requesterResponderSupport.remove(streamId, this);
343+
337344
final RequestInterceptor requestInterceptor = this.requestInterceptor;
338345
if (requestInterceptor != null) {
339346
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, e);
@@ -354,7 +361,6 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
354361
this.done = true;
355362

356363
final int streamId = this.streamId;
357-
this.requesterResponderSupport.remove(streamId, this);
358364

359365
ReferenceCountUtil.safeRelease(frames);
360366

@@ -366,6 +372,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
366372
new CanceledException("Failed to reassemble payload. Cause: " + t.getMessage()));
367373
this.connection.sendFrame(streamId, errorFrame);
368374

375+
this.requesterResponderSupport.remove(streamId, this);
376+
369377
final RequestInterceptor requestInterceptor = this.requestInterceptor;
370378
if (requestInterceptor != null) {
371379
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, t);

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

+53
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.util.function.BiFunction;
8383
import java.util.function.Function;
8484
import java.util.stream.Stream;
85+
import org.assertj.core.api.Assertions;
8586
import org.assertj.core.api.Assumptions;
8687
import org.junit.jupiter.api.AfterEach;
8788
import org.junit.jupiter.api.BeforeEach;
@@ -96,6 +97,7 @@
9697
import org.reactivestreams.Publisher;
9798
import org.reactivestreams.Subscriber;
9899
import org.reactivestreams.Subscription;
100+
import reactor.core.Disposable;
99101
import reactor.core.Scannable;
100102
import reactor.core.publisher.BaseSubscriber;
101103
import reactor.core.publisher.Flux;
@@ -1452,6 +1454,57 @@ public void testWorkaround959(String type) {
14521454
}
14531455
}
14541456

1457+
@Test
1458+
public void testDisposeGracefully() {
1459+
System.out.println(
1460+
FrameHeaderCodec.frameType(
1461+
Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump("000000012400"))));
1462+
final RSocketRequester rSocketRequester = rule.socket;
1463+
final AssertSubscriber<Void> onGracefulShutdownSubscriber =
1464+
rule.thisGracefulShutdownSink.asMono().subscribeWith(AssertSubscriber.create());
1465+
final AssertSubscriber<Void> onCloseSubscriber =
1466+
rSocketRequester.onClose().subscribeWith(new AssertSubscriber<>());
1467+
1468+
final Disposable stream = rSocketRequester.requestStream(EmptyPayload.INSTANCE).subscribe();
1469+
1470+
FrameAssert.assertThat(rule.connection.awaitFrame())
1471+
.typeOf(REQUEST_STREAM)
1472+
.hasClientSideStreamId()
1473+
.hasNoLeaks();
1474+
1475+
Assertions.assertThat(rSocketRequester.isDisposed()).isFalse();
1476+
1477+
rSocketRequester.disposeGracefully();
1478+
Assertions.assertThat(rSocketRequester.isDisposed()).isFalse();
1479+
onGracefulShutdownSubscriber.assertNotTerminated();
1480+
1481+
FrameAssert.assertThat(rule.connection.awaitFrame())
1482+
.typeOf(FrameType.ERROR)
1483+
.hasStreamIdZero()
1484+
.hasData("Graceful Shutdown")
1485+
.hasNoLeaks();
1486+
1487+
stream.dispose();
1488+
Assertions.assertThat(rSocketRequester.isDisposed()).isFalse();
1489+
Assertions.assertThat(rule.connection.isDisposed()).isFalse();
1490+
onGracefulShutdownSubscriber.assertTerminated();
1491+
1492+
FrameAssert.assertThat(rule.connection.awaitFrame())
1493+
.typeOf(CANCEL)
1494+
.hasClientSideStreamId()
1495+
.hasNoLeaks();
1496+
1497+
rule.otherGracefulShutdownSink.tryEmitEmpty();
1498+
Assertions.assertThat(rSocketRequester.isDisposed()).isTrue();
1499+
Assertions.assertThat(rule.connection.isDisposed()).isTrue();
1500+
onCloseSubscriber.assertNotTerminated();
1501+
1502+
rule.otherClosedSink.tryEmitEmpty();
1503+
onCloseSubscriber.assertTerminated();
1504+
1505+
rule.assertHasNoLeaks();
1506+
}
1507+
14551508
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
14561509

14571510
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

+61
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,14 @@
6767
import io.rsocket.util.ByteBufPayload;
6868
import io.rsocket.util.DefaultPayload;
6969
import io.rsocket.util.EmptyPayload;
70+
import java.time.Duration;
7071
import java.util.List;
7172
import java.util.concurrent.CancellationException;
7273
import java.util.concurrent.ThreadLocalRandom;
7374
import java.util.concurrent.atomic.AtomicBoolean;
7475
import java.util.concurrent.atomic.AtomicReference;
7576
import java.util.stream.Stream;
77+
import org.assertj.core.api.Assertions;
7678
import org.assertj.core.api.Assumptions;
7779
import org.junit.jupiter.api.AfterEach;
7880
import org.junit.jupiter.api.BeforeEach;
@@ -1179,6 +1181,65 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
11791181
rule.assertHasNoLeaks();
11801182
}
11811183

1184+
@Test
1185+
void testGracefulShutdown() {
1186+
final AssertSubscriber<Void> onCloseSubscriber = AssertSubscriber.create();
1187+
final AssertSubscriber<Void> onGracefulShutdownSubscriber = AssertSubscriber.create();
1188+
final Sinks.Empty<Void> onDisposeGracefullySink = Sinks.unsafe().empty();
1189+
1190+
boolean[] disposed = new boolean[] {false};
1191+
boolean[] disposedGracefully = new boolean[] {false};
1192+
1193+
rule.setAcceptingSocket(
1194+
new RSocket() {
1195+
1196+
@Override
1197+
public Flux<Payload> requestStream(Payload payload) {
1198+
return Flux.interval(Duration.ofMillis(100))
1199+
.takeUntilOther(onDisposeGracefullySink.asMono())
1200+
.map(tick -> ByteBufPayload.create(String.valueOf(tick)));
1201+
}
1202+
1203+
@Override
1204+
public void dispose() {
1205+
disposed[0] = true;
1206+
}
1207+
1208+
@Override
1209+
public void disposeGracefully() {
1210+
disposedGracefully[0] = true;
1211+
}
1212+
});
1213+
1214+
rule.connection.addToReceivedBuffer(
1215+
RequestStreamFrameCodec.encode(
1216+
rule.allocator, 1, false, Long.MAX_VALUE, null, Unpooled.EMPTY_BUFFER));
1217+
1218+
rule.onCloseSink.asMono().subscribe(onCloseSubscriber);
1219+
rule.onGracefulShutdownSink.asMono().subscribe(onGracefulShutdownSubscriber);
1220+
1221+
rule.onGracefulShutdownStartedSink.tryEmitEmpty();
1222+
Assertions.assertThat(disposed[0]).isFalse();
1223+
Assertions.assertThat(disposedGracefully[0]).isTrue();
1224+
Assertions.assertThat(rule.connection.isDisposed()).isFalse();
1225+
onCloseSubscriber.assertNotTerminated();
1226+
onGracefulShutdownSubscriber.assertNotTerminated();
1227+
1228+
onDisposeGracefullySink.tryEmitEmpty();
1229+
Assertions.assertThat(disposed[0]).isFalse();
1230+
Assertions.assertThat(disposedGracefully[0]).isTrue();
1231+
Assertions.assertThat(rule.connection.isDisposed()).isFalse();
1232+
onCloseSubscriber.assertNotTerminated();
1233+
onGracefulShutdownSubscriber.assertTerminated();
1234+
1235+
rule.connection.dispose();
1236+
Assertions.assertThat(disposed[0]).isTrue();
1237+
Assertions.assertThat(disposedGracefully[0]).isTrue();
1238+
Assertions.assertThat(rule.connection.isDisposed()).isTrue();
1239+
onCloseSubscriber.assertTerminated();
1240+
onGracefulShutdownSubscriber.assertTerminated();
1241+
}
1242+
11821243
public static class ServerSocketRule extends AbstractSocketRule<RSocketResponder> {
11831244

11841245
private RSocket acceptingSocket;

rsocket-examples/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ dependencies {
2828
implementation "io.micrometer:micrometer-core"
2929
implementation "io.micrometer:micrometer-tracing"
3030
implementation project(":rsocket-micrometer")
31-
testImplementation 'org.awaitility:awaitility'
3231

3332
runtimeOnly 'ch.qos.logback:logback-classic'
3433

@@ -37,6 +36,7 @@ dependencies {
3736
testImplementation 'org.mockito:mockito-core'
3837
testImplementation 'org.assertj:assertj-core'
3938
testImplementation 'io.projectreactor:reactor-test'
39+
testImplementation 'org.awaitility:awaitility'
4040
testImplementation "io.micrometer:micrometer-test"
4141
testImplementation "io.micrometer:micrometer-tracing-integration-test"
4242

0 commit comments

Comments
 (0)