Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.I_Result;

import java.util.Queue;

public class SinkManyStressTest {

@JCStressTest
@Outcome(id = {"0"}, expect = Expect.ACCEPTABLE, desc = "Queue was correctly cleared.")
@Outcome(id = {"1"}, expect = Expect.FORBIDDEN, desc = "Item was leaked into the queue.")
@State
public static class SinkManyTakeZeroTest {
final SinkManyEmitterProcessor<Object> sink = new SinkManyEmitterProcessor<>(true, 16);

@Actor
public void takeZero() {
sink.asFlux().take(0).blockLast();
}

@Actor
public void emit() {
sink.tryEmitNext("Test emit");
}

@Arbiter
public void arbiter(I_Result result) {
Queue<?> q = sink.queue;
result.r1 = q == null ? 0 : q.size();
}

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2023-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,9 @@
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.I_Result;
import reactor.util.concurrent.Queues;

import java.util.Queue;

public class SinkManyUnicastStressTest {

Expand Down Expand Up @@ -59,4 +62,29 @@ public void arbiter(I_Result result) {
}
}
}

@JCStressTest
@Outcome(id = {"0"}, expect = Expect.ACCEPTABLE, desc = "Queue was correctly cleared.")
@Outcome(expect = Expect.FORBIDDEN, desc = "Item was leaked into the queue.")
@State
public static class SinkManyUnicastTakeZeroTest {

final Queue<Object> queue = Queues.unbounded().get();
final Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer(queue);

@Actor
public void takeZero() {
sink.asFlux().take(0).blockLast();
}

@Actor
public void emit() {
sink.tryEmitNext("Test emit");
}

@Arbiter
public void arbiter(I_Result result) {
result.r1 = queue.size();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,9 @@ final class FluxLimitRequest<T> extends InternalFluxOperator<T, T> {
@Nullable
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
if (this.cap == 0) {
if (super.source instanceof SourceProducer) {
((SourceProducer<?>) super.source).terminateAndCleanup();
}
Operators.complete(actual);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,7 @@
* @author Stephane Maldini
*/
final class SinkManyEmitterProcessor<T> extends Flux<T> implements InternalManySink<T>,
Sinks.ManyWithUpstream<T>, CoreSubscriber<T>, Scannable, Disposable, ContextHolder {
Sinks.ManyWithUpstream<T>, CoreSubscriber<T>, Scannable, Disposable, ContextHolder, SourceProducer<T> {

@SuppressWarnings("rawtypes")
static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0];
Expand Down Expand Up @@ -294,6 +294,29 @@ int getPending() {
return q != null ? q.size() : 0;
}

@Override
public void terminateAndCleanup() {
if (Operators.terminate(S, this)) {
this.done = true;

FluxPublish.PubSubInner<T>[] innerSubscribers = terminate();

Queue<T> q = this.queue;
if (q != null) {
q.clear();
}

if (innerSubscribers.length > 0) {
CancellationException ex = new CancellationException("Processor terminated by downstream operator");
if (ERROR.compareAndSet(this, null, ex)) {
for (FluxPublish.PubSubInner<T> inner: innerSubscribers) {
inner.actual.onError(ex);
}
}
}
}
}

//TODO evaluate the use case for Disposable in the context of Sinks
@Override
public void dispose() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,7 +84,7 @@
*
* @param <T> the input and output type
*/
final class SinkManyUnicast<T> extends Flux<T> implements InternalManySink<T>, Disposable, Fuseable.QueueSubscription<T>, Fuseable {
final class SinkManyUnicast<T> extends Flux<T> implements InternalManySink<T>, Disposable, Fuseable.QueueSubscription<T>, Fuseable, SourceProducer<T> {

/**
* Create a new {@link SinkManyUnicast} that will buffer on an internal queue in an
Expand Down Expand Up @@ -344,7 +344,7 @@ void drain(@Nullable T dataSignalOfferedBeforeDrain) {
if (dataSignalOfferedBeforeDrain != null) {
if (cancelled) {
Operators.onDiscard(dataSignalOfferedBeforeDrain,
actual.currentContext());
currentContext());
}
else if (done) {
Operators.onNextDropped(dataSignalOfferedBeforeDrain,
Expand All @@ -367,10 +367,23 @@ else if (done) {
return;
}

// This handles a race condition where `cancel()` is
// called before a subscriber arrives (e.g., via `take(0)`).
if (cancelled) {
if (dataSignalOfferedBeforeDrain != null) {
Operators.onDiscard(dataSignalOfferedBeforeDrain, currentContext());
}
if (!outputFused) {
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
}
}

missed = WIP.addAndGet(this, -missed);
if (missed == 0) {
break;
}

dataSignalOfferedBeforeDrain = null;
}
}

Expand Down Expand Up @@ -444,14 +457,7 @@ public void cancel() {

doTerminate();

if (WIP.getAndIncrement(this) == 0) {
if (!outputFused) {
// discard MUST be happening only and only if there is no racing on elements consumption
// which is guaranteed by the WIP guard here in case non-fused output
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
}
hasDownstream = false;
}
drain(null);
}

@Override
Expand Down Expand Up @@ -515,4 +521,9 @@ public void dispose() {
public boolean isDisposed() {
return cancelled || done;
}

@Override
public void terminateAndCleanup() {
cancel();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,4 +47,28 @@ default Object scanUnsafe(Attr key) {
default String stepName() {
return "source(" + getClass().getSimpleName() + ")";
}

/**
* An internal hook for operators to instruct this {@link SourceProducer} to terminate
* and clean up its resources, bypassing the standard subscription-cancellation path.
* <p>
* This method is necessary for specific scenarios where a downstream consumer
* terminates prematurely *without* propagating a standard
* {@link org.reactivestreams.Subscription#cancel() cancel} signal upstream. The primary
* use case is for operators that short-circuit the stream, such as {@code take(0)}.
* <p>
* Without this direct termination signal, a hot source with a buffer (like a
* {@code Sinks.many().unicast().onBackpressureBuffer(queue)}) could be orphaned, leading to its buffered
* elements never being released and causing a memory leak.
* <p>
* This is not intended for public use and should only be called by Reactor framework
* operators. The default implementation is a no-op, allowing sources to opt in to this
* behavior only if they manage resources that require explicit cleanup in such scenarios.
*
* @see reactor.core.publisher.Flux#take(long)
* @see SinkManyUnicast
*/
default void terminateAndCleanup() {
// Default implementation does nothing.
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -922,4 +922,14 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() {
.expectTimeout(Duration.ofSeconds(1))
.verify();
}

@Test
void shouldClearQueueOnImmediateCancellation() {
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
processor.tryEmitNext(1);
assertThat(processor.queue.size()).isEqualTo(1);

processor.asFlux().take(0).blockLast();
assertThat(processor.queue.size()).isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,20 +16,8 @@

package reactor.core.publisher;

import java.time.Duration;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand All @@ -41,6 +29,18 @@
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

import java.time.Duration;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

Expand Down Expand Up @@ -340,4 +340,22 @@ public void inners() {
.as("after scannable subscription")
.containsExactly(scannable);
}

@Test
void shouldClearBufferOnImmediateCancellation() {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer(queue);

sink.tryEmitNext(1);
sink.tryEmitNext(2);
sink.tryEmitNext(3);

assertThat(queue).hasSize(3);

sink.asFlux().take(0).blockLast();

assertThat(queue).isEmpty();
}

}