diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyEmitterProcessorStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyEmitterProcessorStressTest.java new file mode 100644 index 0000000000..362623fdbf --- /dev/null +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyEmitterProcessorStressTest.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2025 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.II_Result; + +import java.util.Queue; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +public class SinkManyEmitterProcessorStressTest { + + @JCStressTest + @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Emission returned OK, but the concurrent drain cleaned the queue before the arbiter ran.") + @Outcome(id = "3, 0", expect = ACCEPTABLE, desc = "Emission was correctly cancelled due to a race or because it happened post-cancellation.") + @State + public static class EmitNextAndAutoCancelRaceStressTest { + + private final SinkManyEmitterProcessor sink = new SinkManyEmitterProcessor<>(true, 16); + + @Actor + public void emitActor(II_Result r) { + r.r1 = sink.tryEmitNext(1).ordinal(); + } + + @Actor + public void cancelActor() { + sink.asFlux().subscribe().dispose(); + } + + @Arbiter + public void arbiter(II_Result r) { + Queue q = sink.queue; + r.r2 = (q == null) ? 0 : q.size(); + } + } +} \ No newline at end of file diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java index 96762282a0..0259d7c051 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,7 +57,7 @@ * @author Stephane Maldini */ final class SinkManyEmitterProcessor extends Flux implements InternalManySink, - Sinks.ManyWithUpstream, CoreSubscriber, Scannable, Disposable, ContextHolder { + Sinks.ManyWithUpstream, CoreSubscriber, Scannable, Disposable, ContextHolder { @SuppressWarnings("rawtypes") static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0]; @@ -201,6 +201,9 @@ public void onComplete() { @Override public EmitResult tryEmitComplete() { + if (isCancelled()) { + return EmitResult.FAIL_CANCELLED; + } if (done) { return EmitResult.FAIL_TERMINATED; } @@ -217,6 +220,9 @@ public void onError(Throwable throwable) { @Override public EmitResult tryEmitError(Throwable t) { Objects.requireNonNull(t, "tryEmitError must be invoked with a non-null Throwable"); + if (isCancelled()) { + return EmitResult.FAIL_CANCELLED; + } if (done) { return EmitResult.FAIL_TERMINATED; } @@ -241,6 +247,9 @@ public void onNext(T t) { @Override public EmitResult tryEmitNext(T t) { + if (isCancelled()) { + return EmitResult.FAIL_CANCELLED; + } if (done) { return Sinks.EmitResult.FAIL_TERMINATED; } @@ -271,6 +280,23 @@ public EmitResult tryEmitNext(T t) { return subscribers == EMPTY ? EmitResult.FAIL_ZERO_SUBSCRIBER : EmitResult.FAIL_OVERFLOW; } drain(); + + // This final check is critical for handling a race between this emit operation + // and a concurrent cancellation from another thread. + // + // The race condition scenario: + // 1. This thread passes the initial isCancelled() check at the top of the method. + // 2. This thread successfully offers an item to the queue. + // 3. Concurrently, another thread disposes the last subscriber, which cancels the sink + // and triggers a drain that cleans up the just-offered item. + // + // Without this check, we would return EmitResult.OK, but the item has already been + // discarded. This check ensures we accurately report FAIL_CANCELLED, reflecting + // the final state of the operation. + if (isCancelled()) { + return EmitResult.FAIL_CANCELLED; + } + return EmitResult.OK; } @@ -382,7 +408,7 @@ public Object scanUnsafe(Attr key) { return null; } - final void drain() { + void drain() { if (WIP.getAndIncrement(this) != 0) { return; } @@ -397,11 +423,9 @@ final void drain() { boolean empty = q == null || q.isEmpty(); - if (checkTerminated(d, empty)) { - return; - } + cleanupIfTerminated(d, empty); - FluxPublish.PubSubInner[] a = subscribers; + FluxPublish.PubSubInner[] a = subscribers; if (a != EMPTY && !empty) { long maxRequested = Long.MAX_VALUE; @@ -431,10 +455,8 @@ final void drain() { d = true; v = null; } - if (checkTerminated(d, v == null)) { - return; - } - if (sourceMode != Fuseable.SYNC) { + cleanupIfTerminated(d, v == null); + if (sourceMode != Fuseable.SYNC) { s.request(1); } continue; @@ -458,16 +480,14 @@ final void drain() { empty = v == null; - if (checkTerminated(d, empty)) { - return; - } + cleanupIfTerminated(d, empty); - if (empty) { + if (empty) { //async mode only needs to break but SYNC mode needs to perform terminal cleanup here... if (sourceMode == Fuseable.SYNC) { //the q is empty done = true; - checkTerminated(true, true); + cleanupIfTerminated(true, true); } break; } @@ -494,10 +514,8 @@ final void drain() { } else if ( sourceMode == Fuseable.SYNC ) { done = true; - if (checkTerminated(true, empty)) { //empty can be true if no subscriber - break; - } - } + cleanupIfTerminated(true, empty);//empty can be true if no subscriber + } missed = WIP.addAndGet(this, -missed); if (missed == 0) { @@ -511,7 +529,14 @@ FluxPublish.PubSubInner[] terminate() { return SUBSCRIBERS.getAndSet(this, TERMINATED); } - boolean checkTerminated(boolean d, boolean empty) { + /** + * Inspects the current state and, if terminal, performs the necessary cleanup actions + * like clearing the queue and signaling subscribers. + * + * @param d the current `done` state + * @param empty if the queue is currently empty + */ + void cleanupIfTerminated(boolean d, boolean empty) { if (s == Operators.cancelledSubscription()) { if (autoCancel) { terminate(); @@ -520,7 +545,7 @@ boolean checkTerminated(boolean d, boolean empty) { q.clear(); } } - return true; + return; } if (d) { Throwable e = error; @@ -532,19 +557,16 @@ boolean checkTerminated(boolean d, boolean empty) { for (FluxPublish.PubSubInner inner : terminate()) { inner.actual.onError(e); } - return true; } else if (empty) { for (FluxPublish.PubSubInner inner : terminate()) { inner.actual.onComplete(); } - return true; } } - return false; } - final boolean add(EmitterInner inner) { + boolean add(EmitterInner inner) { for (; ; ) { FluxPublish.PubSubInner[] a = subscribers; if (a == TERMINATED) { @@ -560,7 +582,7 @@ final boolean add(EmitterInner inner) { } } - final void remove(FluxPublish.PubSubInner inner) { + void remove(FluxPublish.PubSubInner inner) { for (; ; ) { FluxPublish.PubSubInner[] a = subscribers; if (a == TERMINATED || a == EMPTY) { @@ -591,14 +613,11 @@ final void remove(FluxPublish.PubSubInner inner) { if (SUBSCRIBERS.compareAndSet(this, a, b)) { //contrary to FluxPublish, there is a possibility of auto-cancel, which //happens when the removed inner makes the subscribers array EMPTY - if (autoCancel && b == EMPTY && Operators.terminate(S, this)) { - if (WIP.getAndIncrement(this) != 0) { - return; - } - terminate(); - Queue q = queue; - if (q != null) { - q.clear(); + if (autoCancel && b == EMPTY && !isCancelled()) { + if (Operators.terminate(S, this)) { + // The state is now CANCELLED. + // Trigger a drain so the serialized drain-loop can perform the cleanup + drain(); } } return; @@ -653,5 +672,4 @@ public void dispose() { } } - } diff --git a/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java index c884b91bf1..3e1a9b4e77 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,8 @@ import reactor.util.context.Context; import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static reactor.core.Scannable.Attr; import static reactor.core.Scannable.Attr.*; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; @@ -922,4 +924,72 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() { .expectTimeout(Duration.ofSeconds(1)) .verify(); } + + @Test + void testThatCancelledSinkShouldNotAcceptsEmissions() { + Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); + Disposable subscription1 = sink.asFlux().subscribe(s -> System.out.println("1: " + s)); + assertEquals(1, sink.currentSubscriberCount()); + sink.tryEmitNext("Test1"); + subscription1.dispose(); + assertEquals(0, sink.currentSubscriberCount()); + Disposable subscription2 = sink.asFlux().subscribe(s -> System.out.println("2: " + s)); + assertTrue(subscription2.isDisposed()); + assertEquals(0, sink.currentSubscriberCount()); + assertTrue(sink.tryEmitNext("Test2").isFailure(), "Emissions on a cancelled sink should fail"); + } + + @Test + void testQueueShouldBeEmptyAfterCancellation() { + SinkManyEmitterProcessor processor = new SinkManyEmitterProcessor<>(true, 1); + processor.tryEmitNext(1); + assertThat(processor.queue.size()).isEqualTo(1); + processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose(); + assertThat(processor.queue.size()).isEqualTo(0); + processor.tryEmitNext(2); + assertThat(processor.queue.size()).isEqualTo(0); + } + + @Test + void testNoQueueIsCreatedIfNoEmissionOccurredBeforeCancellation() { + SinkManyEmitterProcessor processor = new SinkManyEmitterProcessor<>(true, 1); + + processor.asFlux().subscribe().dispose(); + + processor.tryEmitNext(1); + assertThat(processor.queue).isNull(); + } + + @Test + void testThatOneSubscriberDisposesSinkStaysActive() { + Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); + + sink.asFlux().subscribe(i -> System.out.println("Subscriber A received: " + i)); + Disposable subscriberB = sink.asFlux().subscribe(i -> System.out.println("Subscriber B received: " + i)); + + assertThat(sink.currentSubscriberCount()).isEqualTo(2); + + sink.tryEmitNext(1); + subscriberB.dispose(); + + assertThat(sink.currentSubscriberCount()).isEqualTo(1); + + Sinks.EmitResult result = sink.tryEmitNext(2); + assertThat(result).isEqualTo(Sinks.EmitResult.OK); + } + + @Test + void testThatLastSubscriberDisposesTriggersAutoCancel() { + Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); + Disposable disposable = sink.asFlux().subscribe(); + + assertThat(sink.currentSubscriberCount()).isEqualTo(1); + + disposable.dispose(); + + assertThat(sink.currentSubscriberCount()).isEqualTo(0); + + Sinks.EmitResult result = sink.tryEmitNext(1); + assertThat(result).isEqualTo(Sinks.EmitResult.FAIL_CANCELLED); + } }