Skip to content

Commit 52cd84e

Browse files
committed
Caching Refresh Token for Listeners
Previously, the refresh token caching mechanism for listeners used a DirectProcessor.cache(1). The had the effect of only caching a value if there was already a subscriber attached. As this should return a value on subscription, regardless of the last time there was a subscriber, this change updates the code to use a ReplayProcessor which guarentees the "last" value is always available.
1 parent 08a8552 commit 52cd84e

File tree

1 file changed

+6
-11
lines changed

1 file changed

+6
-11
lines changed

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,11 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232
import org.springframework.web.util.UriComponentsBuilder;
33-
import reactor.core.publisher.DirectProcessor;
3433
import reactor.core.publisher.Flux;
3534
import reactor.core.publisher.Mono;
35+
import reactor.core.publisher.ReplayProcessor;
3636
import reactor.ipc.netty.http.client.HttpClientRequest;
3737
import reactor.ipc.netty.http.client.HttpClientResponse;
38-
import reactor.util.function.Tuple2;
39-
import reactor.util.function.Tuples;
4038

4139
import java.time.LocalDateTime;
4240
import java.time.ZoneId;
@@ -75,7 +73,7 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider {
7573

7674
private final ConcurrentMap<ConnectionContext, Mono<String>> accessTokens = new ConcurrentHashMap<>(1);
7775

78-
private final ConcurrentMap<ConnectionContext, Tuple2<DirectProcessor<String>, Flux<String>>> refreshTokenStreams = new ConcurrentHashMap<>(1);
76+
private final ConcurrentMap<ConnectionContext, ReplayProcessor<String>> refreshTokenStreams = new ConcurrentHashMap<>(1);
7977

8078
private final ConcurrentMap<ConnectionContext, Mono<String>> refreshTokens = new ConcurrentHashMap<>(1);
8179

@@ -102,7 +100,7 @@ public String getClientSecret() {
102100
* @return a {@link Flux} that emits the last token on subscribe and new refresh tokens as they are negotiated
103101
*/
104102
public Flux<String> getRefreshTokens(ConnectionContext connectionContext) {
105-
return getRefreshTokenStream(connectionContext).getT2();
103+
return getRefreshTokenStream(connectionContext);
106104
}
107105

108106
@Override
@@ -188,7 +186,7 @@ private Consumer<Map<String, String>> extractRefreshToken(ConnectionContext conn
188186
}
189187

190188
this.refreshTokens.put(connectionContext, Mono.just(refreshToken));
191-
getRefreshTokenStream(connectionContext).getT1().onNext(refreshToken);
189+
getRefreshTokenStream(connectionContext).onNext(refreshToken);
192190
});
193191
}
194192

@@ -201,11 +199,8 @@ private Function<Mono<HttpClientResponse>, Mono<String>> extractTokens(Connectio
201199
.map(AbstractUaaTokenProvider::extractAccessToken);
202200
}
203201

204-
private Tuple2<DirectProcessor<String>, Flux<String>> getRefreshTokenStream(ConnectionContext connectionContext) {
205-
return this.refreshTokenStreams.computeIfAbsent(connectionContext, c -> {
206-
DirectProcessor<String> processor = DirectProcessor.create();
207-
return Tuples.of(processor, processor.cache(1));
208-
});
202+
private ReplayProcessor<String> getRefreshTokenStream(ConnectionContext connectionContext) {
203+
return this.refreshTokenStreams.computeIfAbsent(connectionContext, c -> ReplayProcessor.create(1));
209204
}
210205

211206
private Mono<HttpClientResponse> primaryToken(ConnectionContext connectionContext) {

0 commit comments

Comments
 (0)