Skip to content

Commit e2b062c

Browse files
authored
updated switch transform to use unbounded processor to prevent illegal state exception (#440)
1 parent b27c7f6 commit e2b062c

File tree

8 files changed

+181
-198
lines changed

8 files changed

+181
-198
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,15 @@
1616

1717
package io.rsocket;
1818

19+
import static io.rsocket.util.ExceptionUtil.noStacktrace;
20+
1921
import io.netty.buffer.Unpooled;
2022
import io.netty.util.collection.IntObjectHashMap;
2123
import io.rsocket.exceptions.ConnectionException;
2224
import io.rsocket.exceptions.Exceptions;
2325
import io.rsocket.internal.LimitableRequestPublisher;
2426
import io.rsocket.internal.UnboundedProcessor;
2527
import io.rsocket.util.PayloadImpl;
26-
import org.reactivestreams.Publisher;
27-
import org.reactivestreams.Subscriber;
28-
import reactor.core.Disposable;
29-
import reactor.core.publisher.*;
30-
31-
import javax.annotation.Nullable;
3228
import java.nio.channels.ClosedChannelException;
3329
import java.time.Duration;
3430
import java.util.Collection;
@@ -37,8 +33,11 @@
3733
import java.util.function.Consumer;
3834
import java.util.function.Function;
3935
import java.util.function.Supplier;
40-
41-
import static io.rsocket.util.ExceptionUtil.noStacktrace;
36+
import javax.annotation.Nullable;
37+
import org.reactivestreams.Publisher;
38+
import org.reactivestreams.Subscriber;
39+
import reactor.core.Disposable;
40+
import reactor.core.publisher.*;
4241

4342
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
4443
class RSocketClient implements RSocket {
@@ -99,7 +98,7 @@ class RSocketClient implements RSocket {
9998
})
10099
.subscribe();
101100
}
102-
101+
103102
connection
104103
.onClose()
105104
.doFinally(

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@
1616

1717
package io.rsocket;
1818

19+
import static io.rsocket.Frame.Request.initialRequestN;
20+
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
21+
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
22+
1923
import io.netty.buffer.ByteBuf;
2024
import io.netty.buffer.Unpooled;
2125
import io.netty.util.collection.IntObjectHashMap;
2226
import io.rsocket.exceptions.ApplicationException;
2327
import io.rsocket.internal.LimitableRequestPublisher;
2428
import io.rsocket.internal.UnboundedProcessor;
2529
import io.rsocket.util.PayloadImpl;
30+
import java.util.Collection;
31+
import java.util.function.Consumer;
32+
import javax.annotation.Nullable;
2633
import org.reactivestreams.Publisher;
2734
import org.reactivestreams.Subscriber;
2835
import org.reactivestreams.Subscription;
@@ -32,14 +39,6 @@
3239
import reactor.core.publisher.SignalType;
3340
import reactor.core.publisher.UnicastProcessor;
3441

35-
import javax.annotation.Nullable;
36-
import java.util.Collection;
37-
import java.util.function.Consumer;
38-
39-
import static io.rsocket.Frame.Request.initialRequestN;
40-
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
41-
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
42-
4342
/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
4443
class RSocketServer implements RSocket {
4544

Lines changed: 60 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,80 @@
11
package io.rsocket.internal;
22

3+
import java.util.Objects;
4+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
5+
import java.util.function.BiFunction;
36
import org.reactivestreams.Publisher;
47
import org.reactivestreams.Subscription;
58
import reactor.core.CoreSubscriber;
6-
import reactor.core.publisher.DirectProcessor;
79
import reactor.core.publisher.Flux;
810
import reactor.core.publisher.Operators;
911

10-
import java.util.Objects;
11-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
12-
import java.util.function.BiFunction;
13-
1412
public final class SwitchTransform<T, R> extends Flux<R> {
1513

16-
final Publisher<? extends T> source;
17-
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
18-
19-
public SwitchTransform(Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
20-
this.source = Objects.requireNonNull(source, "source");
21-
this.transformer = Objects.requireNonNull(transformer, "transformer");
22-
}
23-
24-
@Override
25-
public void subscribe(CoreSubscriber<? super R> actual) {
26-
source.subscribe(new SwitchTransformSubscriber<>(actual, transformer));
27-
}
14+
final Publisher<? extends T> source;
15+
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
2816

29-
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
30-
final CoreSubscriber<? super R> actual;
31-
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
32-
final DirectProcessor<T> processor = DirectProcessor.create();
17+
public SwitchTransform(
18+
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
19+
this.source = Objects.requireNonNull(source, "source");
20+
this.transformer = Objects.requireNonNull(transformer, "transformer");
21+
}
3322

34-
Subscription s;
23+
@Override
24+
public void subscribe(CoreSubscriber<? super R> actual) {
25+
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
26+
}
3527

36-
volatile int once;
37-
@SuppressWarnings("rawtypes")
38-
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
39-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");
28+
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
29+
@SuppressWarnings("rawtypes")
30+
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
31+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");
4032

41-
SwitchTransformSubscriber(CoreSubscriber<? super R> actual, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
42-
this.actual = actual;
43-
this.transformer = transformer;
44-
}
33+
final CoreSubscriber<? super R> actual;
34+
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
35+
final UnboundedProcessor<T> processor = new UnboundedProcessor<>();
36+
Subscription s;
37+
volatile int once;
4538

46-
@Override
47-
public void onSubscribe(Subscription s) {
48-
if (Operators.validate(this.s, s)) {
49-
this.s = s;
39+
SwitchTransformSubscriber(
40+
CoreSubscriber<? super R> actual,
41+
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
42+
this.actual = actual;
43+
this.transformer = transformer;
44+
}
5045

51-
processor.onSubscribe(s);
52-
}
53-
}
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
if (Operators.validate(this.s, s)) {
49+
this.s = s;
50+
processor.onSubscribe(s);
51+
}
52+
}
5453

55-
@Override
56-
public void onNext(T t) {
57-
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
58-
try {
59-
Publisher<? extends R> result = Objects.requireNonNull(transformer.apply(t, processor),
60-
"The transformer returned a null value");
61-
result.subscribe(actual);
62-
}
63-
catch (Throwable e) {
64-
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
65-
return;
66-
}
67-
}
68-
processor.onNext(t);
69-
}
54+
@Override
55+
public void onNext(T t) {
56+
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
57+
try {
58+
Publisher<? extends R> result =
59+
Objects.requireNonNull(
60+
transformer.apply(t, processor), "The transformer returned a null value");
61+
Flux.from(result).subscribe(actual);
62+
} catch (Throwable e) {
63+
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
64+
return;
65+
}
66+
}
67+
processor.onNext(t);
68+
}
7069

71-
@Override
72-
public void onError(Throwable t) {
73-
processor.onError(t);
74-
}
70+
@Override
71+
public void onError(Throwable t) {
72+
processor.onError(t);
73+
}
7574

76-
@Override
77-
public void onComplete() {
78-
processor.onComplete();
79-
}
80-
}
75+
@Override
76+
public void onComplete() {
77+
processor.onComplete();
78+
}
79+
}
8180
}

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
package io.rsocket.internal;
1717

1818
import io.netty.util.internal.shaded.org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue;
19+
import java.util.Objects;
20+
import java.util.Queue;
21+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
1923
import org.reactivestreams.Subscriber;
2024
import org.reactivestreams.Subscription;
2125
import reactor.core.CoreSubscriber;
@@ -27,11 +31,6 @@
2731
import reactor.util.concurrent.Queues;
2832
import reactor.util.context.Context;
2933

30-
import java.util.Objects;
31-
import java.util.Queue;
32-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
34-
3534
/**
3635
* A Processor implementation that takes a custom queue and allows only a single subscriber.
3736
*

rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package io.rsocket.internal;
22

3+
import java.util.concurrent.CountDownLatch;
34
import org.junit.Assert;
45
import org.junit.Test;
56

6-
import java.util.concurrent.CountDownLatch;
7-
87
public class UnboundedProcessorTest {
98
@Test
109
public void testOnNextBeforeSubscribe_10() {
@@ -49,39 +48,34 @@ public void testOnNextBeforeSubscribeN(int n) {
4948

5049
Assert.assertEquals(n, count);
5150
}
52-
51+
5352
@Test
5453
public void testOnNextAfterSubscribe_10() throws Exception {
5554
testOnNextAfterSubscribeN(10);
5655
}
57-
56+
5857
@Test
5958
public void testOnNextAfterSubscribe_100() throws Exception {
6059
testOnNextAfterSubscribeN(100);
6160
}
62-
61+
6362
@Test
6463
public void testOnNextAfterSubscribe_1000() throws Exception {
6564
testOnNextAfterSubscribeN(1000);
6665
}
67-
68-
public void testOnNextAfterSubscribeN(int n) throws Exception {
66+
67+
public void testOnNextAfterSubscribeN(int n) throws Exception {
6968
CountDownLatch latch = new CountDownLatch(n);
70-
UnboundedProcessor<Integer> processor = new UnboundedProcessor<>();
71-
processor
72-
.log()
73-
.doOnNext(integer ->
74-
latch.countDown())
75-
.subscribe();
76-
69+
UnboundedProcessor<Integer> processor = new UnboundedProcessor<>();
70+
processor.log().doOnNext(integer -> latch.countDown()).subscribe();
71+
7772
for (int i = 0; i < n; i++) {
7873
System.out.println("onNexting -> " + i);
7974
processor.onNext(i);
8075
}
81-
76+
8277
processor.drain();
83-
78+
8479
latch.await();
8580
}
86-
8781
}

0 commit comments

Comments
 (0)