diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyStressTest.java new file mode 100644 index 0000000000..852dc6e0d3 --- /dev/null +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyStressTest.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2025 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.I_Result; + +import java.util.Queue; + +public class SinkManyStressTest { + + @JCStressTest + @Outcome(id = {"0"}, expect = Expect.ACCEPTABLE, desc = "Queue was correctly cleared.") + @Outcome(id = {"1"}, expect = Expect.FORBIDDEN, desc = "Item was leaked into the queue.") + @State + public static class SinkManyTakeZeroTest { + final SinkManyEmitterProcessor sink = new SinkManyEmitterProcessor<>(true, 16); + + @Actor + public void takeZero() { + sink.asFlux().take(0).blockLast(); + } + + @Actor + public void emit() { + sink.tryEmitNext("Test emit"); + } + + @Arbiter + public void arbiter(I_Result result) { + Queue q = sink.queue; + result.r1 = q == null ? 0 : q.size(); + } + + } +} diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyUnicastStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyUnicastStressTest.java index 6d17412371..8a74f34fe5 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyUnicastStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/SinkManyUnicastStressTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2023-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,9 @@ import org.openjdk.jcstress.annotations.Outcome; import org.openjdk.jcstress.annotations.State; import org.openjdk.jcstress.infra.results.I_Result; +import reactor.util.concurrent.Queues; + +import java.util.Queue; public class SinkManyUnicastStressTest { @@ -59,4 +62,29 @@ public void arbiter(I_Result result) { } } } + + @JCStressTest + @Outcome(id = {"0"}, expect = Expect.ACCEPTABLE, desc = "Queue was correctly cleared.") + @Outcome(expect = Expect.FORBIDDEN, desc = "Item was leaked into the queue.") + @State + public static class SinkManyUnicastTakeZeroTest { + + final Queue queue = Queues.unbounded().get(); + final Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(queue); + + @Actor + public void takeZero() { + sink.asFlux().take(0).blockLast(); + } + + @Actor + public void emit() { + sink.tryEmitNext("Test emit"); + } + + @Arbiter + public void arbiter(I_Result result) { + result.r1 = queue.size(); + } + } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxLimitRequest.java b/reactor-core/src/main/java/reactor/core/publisher/FluxLimitRequest.java index 2fed227f4e..d53fa4ce62 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxLimitRequest.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxLimitRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,9 @@ final class FluxLimitRequest extends InternalFluxOperator { @Nullable public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { if (this.cap == 0) { + if (super.source instanceof SourceProducer) { + ((SourceProducer) super.source).terminateAndCleanup(); + } Operators.complete(actual); return null; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java index 96762282a0..e52d634bc1 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinkManyEmitterProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,7 +57,7 @@ * @author Stephane Maldini */ final class SinkManyEmitterProcessor extends Flux implements InternalManySink, - Sinks.ManyWithUpstream, CoreSubscriber, Scannable, Disposable, ContextHolder { + Sinks.ManyWithUpstream, CoreSubscriber, Scannable, Disposable, ContextHolder, SourceProducer { @SuppressWarnings("rawtypes") static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0]; @@ -294,6 +294,29 @@ int getPending() { return q != null ? q.size() : 0; } + @Override + public void terminateAndCleanup() { + if (Operators.terminate(S, this)) { + this.done = true; + + FluxPublish.PubSubInner[] innerSubscribers = terminate(); + + Queue q = this.queue; + if (q != null) { + q.clear(); + } + + if (innerSubscribers.length > 0) { + CancellationException ex = new CancellationException("Processor terminated by downstream operator"); + if (ERROR.compareAndSet(this, null, ex)) { + for (FluxPublish.PubSubInner inner: innerSubscribers) { + inner.actual.onError(ex); + } + } + } + } + } + //TODO evaluate the use case for Disposable in the context of Sinks @Override public void dispose() { diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinkManyUnicast.java b/reactor-core/src/main/java/reactor/core/publisher/SinkManyUnicast.java index a62bbba146..ba2bc24c31 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinkManyUnicast.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinkManyUnicast.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,7 +84,7 @@ * * @param the input and output type */ -final class SinkManyUnicast extends Flux implements InternalManySink, Disposable, Fuseable.QueueSubscription, Fuseable { +final class SinkManyUnicast extends Flux implements InternalManySink, Disposable, Fuseable.QueueSubscription, Fuseable, SourceProducer { /** * Create a new {@link SinkManyUnicast} that will buffer on an internal queue in an @@ -344,7 +344,7 @@ void drain(@Nullable T dataSignalOfferedBeforeDrain) { if (dataSignalOfferedBeforeDrain != null) { if (cancelled) { Operators.onDiscard(dataSignalOfferedBeforeDrain, - actual.currentContext()); + currentContext()); } else if (done) { Operators.onNextDropped(dataSignalOfferedBeforeDrain, @@ -367,10 +367,23 @@ else if (done) { return; } + // This handles a race condition where `cancel()` is + // called before a subscriber arrives (e.g., via `take(0)`). + if (cancelled) { + if (dataSignalOfferedBeforeDrain != null) { + Operators.onDiscard(dataSignalOfferedBeforeDrain, currentContext()); + } + if (!outputFused) { + Operators.onDiscardQueueWithClear(queue, currentContext(), null); + } + } + missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } + + dataSignalOfferedBeforeDrain = null; } } @@ -444,14 +457,7 @@ public void cancel() { doTerminate(); - if (WIP.getAndIncrement(this) == 0) { - if (!outputFused) { - // discard MUST be happening only and only if there is no racing on elements consumption - // which is guaranteed by the WIP guard here in case non-fused output - Operators.onDiscardQueueWithClear(queue, currentContext(), null); - } - hasDownstream = false; - } + drain(null); } @Override @@ -515,4 +521,9 @@ public void dispose() { public boolean isDisposed() { return cancelled || done; } + + @Override + public void terminateAndCleanup() { + cancel(); + } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/SourceProducer.java b/reactor-core/src/main/java/reactor/core/publisher/SourceProducer.java index b1f2498092..8bace82ce1 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SourceProducer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SourceProducer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,4 +47,28 @@ default Object scanUnsafe(Attr key) { default String stepName() { return "source(" + getClass().getSimpleName() + ")"; } + + /** + * An internal hook for operators to instruct this {@link SourceProducer} to terminate + * and clean up its resources, bypassing the standard subscription-cancellation path. + *

+ * This method is necessary for specific scenarios where a downstream consumer + * terminates prematurely *without* propagating a standard + * {@link org.reactivestreams.Subscription#cancel() cancel} signal upstream. The primary + * use case is for operators that short-circuit the stream, such as {@code take(0)}. + *

+ * Without this direct termination signal, a hot source with a buffer (like a + * {@code Sinks.many().unicast().onBackpressureBuffer(queue)}) could be orphaned, leading to its buffered + * elements never being released and causing a memory leak. + *

+ * This is not intended for public use and should only be called by Reactor framework + * operators. The default implementation is a no-op, allowing sources to opt in to this + * behavior only if they manage resources that require explicit cleanup in such scenarios. + * + * @see reactor.core.publisher.Flux#take(long) + * @see SinkManyUnicast + */ + default void terminateAndCleanup() { + // Default implementation does nothing. + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java index c884b91bf1..6e84695c5e 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/SinkManyEmitterProcessorTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -922,4 +922,14 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() { .expectTimeout(Duration.ofSeconds(1)) .verify(); } + + @Test + void shouldClearQueueOnImmediateCancellation() { + SinkManyEmitterProcessor processor = new SinkManyEmitterProcessor<>(true, 1); + processor.tryEmitNext(1); + assertThat(processor.queue.size()).isEqualTo(1); + + processor.asFlux().take(0).blockLast(); + assertThat(processor.queue.size()).isEqualTo(0); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/SinkManyUnicastTest.java b/reactor-core/src/test/java/reactor/core/publisher/SinkManyUnicastTest.java index 31f391ff5d..e15f97c2bb 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/SinkManyUnicastTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/SinkManyUnicastTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,20 +16,8 @@ package reactor.core.publisher; -import java.time.Duration; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.LockSupport; - import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; - import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -41,6 +29,18 @@ import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; +import java.time.Duration; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.LockSupport; + import static org.assertj.core.api.Assertions.assertThat; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; @@ -340,4 +340,22 @@ public void inners() { .as("after scannable subscription") .containsExactly(scannable); } + + @Test + void shouldClearBufferOnImmediateCancellation() { + BlockingQueue queue = new LinkedBlockingQueue<>(); + + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(queue); + + sink.tryEmitNext(1); + sink.tryEmitNext(2); + sink.tryEmitNext(3); + + assertThat(queue).hasSize(3); + + sink.asFlux().take(0).blockLast(); + + assertThat(queue).isEmpty(); + } + }