Skip to content

Commit 87223c6

Browse files
OlegDokukarobertroeser
authored andcommitted
fixes issue with requestChannel never completes if server cancels (#592) (#594)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 88102b7 commit 87223c6

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

-1
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,6 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
471471
case CANCEL:
472472
{
473473
LimitableRequestPublisher sender = senders.remove(streamId);
474-
receivers.remove(streamId);
475474
if (sender != null) {
476475
sender.cancel();
477476
}

rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.ArrayList;
4141
import java.util.List;
4242
import java.util.stream.Collectors;
43+
import org.assertj.core.api.Assertions;
4344
import org.junit.Rule;
4445
import org.junit.Test;
4546
import org.reactivestreams.Publisher;
@@ -49,6 +50,7 @@
4950
import reactor.core.publisher.Flux;
5051
import reactor.core.publisher.Mono;
5152
import reactor.core.publisher.MonoProcessor;
53+
import reactor.core.publisher.UnicastProcessor;
5254

5355
public class RSocketClientTest {
5456

@@ -195,6 +197,24 @@ public void testChannelRequestCancellation() {
195197
.blockFirst();
196198
}
197199

200+
@Test
201+
public void testChannelRequestServerSideCancellation() {
202+
MonoProcessor<Payload> cancelled = MonoProcessor.create();
203+
UnicastProcessor<Payload> request = UnicastProcessor.create();
204+
request.onNext(EmptyPayload.INSTANCE);
205+
rule.socket.requestChannel(request).subscribe(cancelled);
206+
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
207+
rule.connection.addToReceivedBuffer(Frame.Cancel.from(streamId));
208+
rule.connection.addToReceivedBuffer(Frame.PayloadFrame.from(streamId, NEXT_COMPLETE));
209+
Flux.first(
210+
cancelled,
211+
Flux.error(new IllegalStateException("Channel request not cancelled"))
212+
.delaySubscription(Duration.ofSeconds(1)))
213+
.blockFirst();
214+
215+
Assertions.assertThat(request.isDisposed()).isTrue();
216+
}
217+
198218
public int sendRequestResponse(Publisher<Payload> response) {
199219
Subscriber<Payload> sub = TestSubscriber.create();
200220
response.subscribe(sub);

0 commit comments

Comments
 (0)