Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,15 +69,9 @@ final class FluxBuffer<T, C extends Collection<? super T>> extends InternalFluxO

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super C> actual) {
if (size == skip) {
return new BufferExactSubscriber<>(actual, size, bufferSupplier);
}
else if (skip > size) {
return new BufferSkipSubscriber<>(actual, size, skip, bufferSupplier);
}
else {
return new BufferOverlappingSubscriber<>(actual, size, skip, bufferSupplier);
}
return (size == skip) ? new BufferExactSubscriber<>(actual, size, bufferSupplier) :
(skip > size) ? new BufferSkipSubscriber<>(actual, size, skip, bufferSupplier) :
new BufferOverlappingSubscriber<>(actual, size, skip, bufferSupplier);
}

@Override
Expand Down
46 changes: 20 additions & 26 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -3132,10 +3132,9 @@ public final Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>
* @return a filtered {@link Mono}
*/
public final Mono<T> filter(final Predicate<? super T> tester) {
if (this instanceof Fuseable) {
return onAssembly(new MonoFilterFuseable<>(this, tester));
}
return onAssembly(new MonoFilter<>(this, tester));
return onAssembly(this instanceof Fuseable
? new MonoFilterFuseable<>(this, tester)
: new MonoFilter<>(this, tester));
}

/**
Expand Down Expand Up @@ -3291,10 +3290,9 @@ public final Mono<Boolean> hasElement() {
* @return a transformed {@link Mono}
*/
public final <R> Mono<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) {
if (this instanceof Fuseable) {
return onAssembly(new MonoHandleFuseable<>(this, handler));
}
return onAssembly(new MonoHandle<>(this, handler));
return (this instanceof Fuseable)
? onAssembly(new MonoHandleFuseable<>(this, handler))
: onAssembly(new MonoHandle<>(this, handler));
}

/**
Expand Down Expand Up @@ -3418,10 +3416,9 @@ public final Mono<T> log(@Nullable String category,
SignalLogger<T> log = new SignalLogger<>(this, category, level,
showOperatorLine, options);

if (this instanceof Fuseable) {
return onAssembly(new MonoLogFuseable<>(this, log));
}
return onAssembly(new MonoLog<>(this, log));
return (this instanceof Fuseable)
? onAssembly(new MonoLogFuseable<>(this, log))
: onAssembly(new MonoLog<>(this, log));
}


Expand Down Expand Up @@ -3468,10 +3465,9 @@ public final Mono<T> log(Logger logger,
s -> logger,
options);

if (this instanceof Fuseable) {
return onAssembly(new MonoLogFuseable<>(this, log));
}
return onAssembly(new MonoLog<>(this, log));
return (this instanceof Fuseable)
? onAssembly(new MonoLogFuseable<>(this, log))
: onAssembly(new MonoLog<>(this, log));
}

/**
Expand All @@ -3486,10 +3482,9 @@ public final Mono<T> log(Logger logger,
* @return a new {@link Mono}
*/
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
if (this instanceof Fuseable) {
return onAssembly(new MonoMapFuseable<>(this, mapper));
}
return onAssembly(new MonoMap<>(this, mapper));
return (this instanceof Fuseable)
? onAssembly(new MonoMapFuseable<>(this, mapper))
: onAssembly(new MonoMap<>(this, mapper));
}

/**
Expand Down Expand Up @@ -3574,11 +3569,10 @@ public final Mono<T> metrics() {
if (!Metrics.isInstrumentationAvailable()) {
return this;
}

if (this instanceof Fuseable) {
return onAssembly(new MonoMetricsFuseable<>(this));
}
return onAssembly(new MonoMetrics<>(this));

return (this instanceof Fuseable)
? onAssembly(new MonoMetricsFuseable<>(this))
: onAssembly(new MonoMetrics<>(this));
}

/**
Expand Down