Skip to content

Commit 945caec

Browse files
committed
Reactor Optimizations
This change takes advantage of some new features in Reactor Netty.
1 parent 6a84057 commit 945caec

File tree

4 files changed

+41
-205
lines changed

4 files changed

+41
-205
lines changed

cloudfoundry-client-spring/src/main/java/org/cloudfoundry/reactor/client/CloudFoundryExceptionBuilder.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717
package org.cloudfoundry.reactor.client;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20-
import io.netty.buffer.ByteBufInputStream;
21-
import io.netty.buffer.Unpooled;
2220
import org.cloudfoundry.client.v2.CloudFoundryException;
2321
import org.springframework.web.client.HttpStatusCodeException;
2422
import reactor.core.publisher.Mono;
2523
import reactor.core.util.Exceptions;
2624
import reactor.io.netty.http.HttpException;
2725

2826
import java.io.IOException;
29-
import java.io.InputStream;
3027
import java.util.Map;
3128

3229
public final class CloudFoundryExceptionBuilder {
@@ -65,12 +62,9 @@ public static CloudFoundryException build(HttpStatusCodeException cause) { // T
6562
*/
6663
@SuppressWarnings("unchecked")
6764
public static <T> Mono<T> build(HttpException cause) {
68-
return cause.getChannel().receive()
69-
.reduceWith(Unpooled::compositeBuffer, (prev, next) -> prev.addComponent(next.retain()))
70-
.then(byteBufs -> {
71-
byteBufs.setIndex(0, byteBufs.capacity());
72-
73-
try (InputStream in = new ByteBufInputStream(byteBufs)) {
65+
return cause.getChannel().receive().aggregate().toInputStream()
66+
.then(in -> {
67+
try {
7468
Map<String, ?> response = OBJECT_MAPPER.readValue(in, Map.class);
7569
Integer code = (Integer) response.get("code");
7670
String description = (String) response.get("description");
@@ -79,8 +73,6 @@ public static <T> Mono<T> build(HttpException cause) {
7973
return Mono.error(new CloudFoundryException(code, description, errorCode, cause));
8074
} catch (IOException e) {
8175
throw Exceptions.propagate(e);
82-
} finally {
83-
byteBufs.release();
8476
}
8577
});
8678
}

cloudfoundry-client-spring/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import reactor.core.publisher.Mono;
2727
import reactor.core.tuple.Tuple;
2828
import reactor.core.tuple.Tuple2;
29-
import reactor.io.netty.common.NettyInbound;
3029
import reactor.io.netty.http.HttpClient;
3130
import reactor.io.netty.http.HttpInbound;
3231
import reactor.io.netty.http.HttpOutbound;
@@ -66,18 +65,19 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doDelete(REQ request, C
6665
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
6766
.doOnSubscribe(s -> this.requestLogger.debug("DELETE {}", uri))
6867
.then(o -> o.send(Mono.just(validRequest)
69-
.as(JsonCodec.encode(this.objectMapper, o)))))
68+
.where(req -> this.objectMapper.canSerialize(req.getClass()))
69+
.map(JsonCodec.encode(this.objectMapper, o)))))
7070
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
7171
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
72-
.flatMap(NettyInbound::receive)
73-
.as(JsonCodec.decode(this.objectMapper, responseType));
72+
.then(inbound -> inbound.receive().aggregate().toInputStream())
73+
.map(JsonCodec.decode(this.objectMapper, responseType));
7474
}
7575

7676
protected final <REQ extends Validatable, RSP> Mono<RSP> doGet(REQ request, Class<RSP> responseType, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
7777
Function<Tuple2<HttpOutbound, REQ>, HttpOutbound> requestTransformer) {
7878
return doGet(request, uriTransformer, requestTransformer)
79-
.flatMap(NettyInbound::receive)
80-
.as(JsonCodec.decode(this.objectMapper, responseType));
79+
.then(inbound -> inbound.receive().aggregate().toInputStream())
80+
.map(JsonCodec.decode(this.objectMapper, responseType));
8181
}
8282

8383
protected final <REQ extends Validatable> Mono<HttpInbound> doGet(REQ request, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
@@ -104,11 +104,12 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doPatch(REQ request, Cl
104104
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
105105
.doOnSubscribe(s -> this.requestLogger.debug("PATCH {}", uri))
106106
.then(o -> o.send(Mono.just(validRequest)
107-
.as(JsonCodec.encode(this.objectMapper, o)))))
107+
.where(req -> this.objectMapper.canSerialize(req.getClass()))
108+
.map(JsonCodec.encode(this.objectMapper, o)))))
108109
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
109110
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
110-
.flatMap(NettyInbound::receive)
111-
.as(JsonCodec.decode(this.objectMapper, responseType));
111+
.then(inbound -> inbound.receive().aggregate().toInputStream())
112+
.map(JsonCodec.decode(this.objectMapper, responseType));
112113
}
113114

114115
protected final <REQ extends Validatable, RSP> Mono<RSP> doPost(REQ request, Class<RSP> responseType, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
@@ -121,11 +122,12 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doPost(REQ request, Cla
121122
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
122123
.doOnSubscribe(s -> this.requestLogger.debug("POST {}", uri))
123124
.then(o -> o.send(Mono.just(validRequest)
124-
.as(JsonCodec.encode(this.objectMapper, o)))))
125+
.where(req -> this.objectMapper.canSerialize(req.getClass()))
126+
.map(JsonCodec.encode(this.objectMapper, o)))))
125127
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
126128
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
127-
.flatMap(NettyInbound::receive)
128-
.as(JsonCodec.decode(this.objectMapper, responseType));
129+
.then(inbound -> inbound.receive().aggregate().toInputStream())
130+
.map(JsonCodec.decode(this.objectMapper, responseType));
129131
}
130132

131133
protected final <REQ extends Validatable, RSP> Mono<RSP> doPut(REQ request, Class<RSP> responseType, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
@@ -138,11 +140,12 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doPut(REQ request, Clas
138140
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
139141
.doOnSubscribe(s -> this.requestLogger.debug("PUT {}", uri))
140142
.then(o -> o.send(Mono.just(validRequest)
141-
.as(JsonCodec.encode(this.objectMapper, o)))))
143+
.where(req -> this.objectMapper.canSerialize(req.getClass()))
144+
.map(JsonCodec.encode(this.objectMapper, o)))))
142145
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
143146
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
144-
.flatMap(NettyInbound::receive)
145-
.as(JsonCodec.decode(this.objectMapper, responseType));
147+
.then(inbound -> inbound.receive().aggregate().toInputStream())
148+
.map(JsonCodec.decode(this.objectMapper, responseType));
146149
}
147150

148151
protected final <REQ extends Validatable> Mono<HttpInbound> doWs(REQ request, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,

cloudfoundry-client-spring/src/main/java/org/cloudfoundry/reactor/util/JsonCodec.java

Lines changed: 16 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,10 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19+
import com.fasterxml.jackson.core.JsonProcessingException;
1920
import com.fasterxml.jackson.databind.ObjectMapper;
2021
import io.netty.buffer.ByteBuf;
21-
import io.netty.buffer.ByteBufInputStream;
22-
import io.netty.buffer.CompositeByteBuf;
23-
import io.netty.buffer.Unpooled;
2422
import io.netty.util.AsciiString;
25-
import org.reactivestreams.Publisher;
26-
import org.reactivestreams.Subscriber;
27-
import reactor.core.publisher.Flux;
28-
import reactor.core.publisher.Mono;
29-
import reactor.core.publisher.MonoSource;
30-
import reactor.core.subscriber.SubscriberBarrier;
3123
import reactor.core.util.Exceptions;
3224
import reactor.io.netty.http.HttpOutbound;
3325

@@ -41,177 +33,27 @@ final class JsonCodec {
4133

4234
private static final AsciiString APPLICATION_JSON = new AsciiString("application/json");
4335

44-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
45-
46-
static <T> Function<Flux<ByteBuf>, Mono<T>> decode(Class<T> type) {
47-
return decode(OBJECT_MAPPER, type);
48-
}
49-
50-
static <T> Function<Flux<ByteBuf>, Mono<T>> decode(ObjectMapper objectMapper, Class<T> type) {
51-
return source -> new JsonDecoder<>(source, objectMapper, type);
52-
}
53-
54-
static <T> Function<Mono<T>, Mono<ByteBuf>> encode(HttpOutbound httpOutbound) {
55-
return encode(OBJECT_MAPPER, httpOutbound);
56-
}
57-
58-
static <T> Function<Mono<T>, Mono<ByteBuf>> encode(ObjectMapper objectMapper, HttpOutbound httpOutbound) {
59-
httpOutbound.header(CONTENT_TYPE, APPLICATION_JSON);
60-
return source -> new JsonEncoder<>(source, objectMapper);
61-
}
62-
63-
private static final class JsonDecoder<T> extends MonoSource<ByteBuf, T> {
64-
65-
private final ObjectMapper objectMapper;
66-
67-
private final Class<T> type;
68-
69-
private JsonDecoder(Publisher<? extends ByteBuf> source, ObjectMapper objectMapper, Class<T> type) {
70-
super(source);
71-
this.objectMapper = objectMapper;
72-
this.type = type;
73-
}
74-
75-
@Override
76-
public void subscribe(Subscriber<? super T> subscriber) {
77-
this.source.subscribe(new JsonDecoderSubscriber<>(subscriber, this.objectMapper, this.type));
78-
}
79-
}
80-
81-
private static final class JsonDecoderSubscriber<T> extends SubscriberBarrier<ByteBuf, T> {
82-
83-
private final ObjectMapper objectMapper;
84-
85-
private final Class<T> type;
86-
87-
private CompositeByteBuf byteBuf;
88-
89-
private boolean done = false;
90-
91-
private JsonDecoderSubscriber(Subscriber<? super T> subscriber, ObjectMapper objectMapper, Class<T> type) {
92-
super(subscriber);
93-
this.objectMapper = objectMapper;
94-
this.type = type;
95-
}
96-
97-
@Override
98-
protected void doComplete() {
99-
if (this.done) {
100-
return;
101-
}
102-
103-
if (this.byteBuf != null) {
104-
this.byteBuf.setIndex(0, this.byteBuf.capacity());
105-
106-
try (InputStream in = new ByteBufInputStream(this.byteBuf)) {
107-
this.subscriber.onNext(this.objectMapper.readValue(in, this.type));
108-
} catch (IOException e) {
109-
onError(e);
110-
} finally {
111-
this.byteBuf.release();
112-
}
113-
}
114-
115-
this.done = true;
116-
this.subscriber.onComplete();
117-
}
118-
119-
@Override
120-
protected void doError(Throwable throwable) {
121-
if (this.done) {
122-
Exceptions.onErrorDropped(throwable);
123-
return;
124-
}
125-
126-
if (this.byteBuf != null) {
127-
this.byteBuf.release();
128-
}
129-
130-
this.done = true;
131-
this.subscriber.onError(throwable);
132-
}
133-
134-
@Override
135-
protected void doNext(ByteBuf byteBuf) {
136-
if (this.done) {
137-
Exceptions.onNextDropped(byteBuf);
138-
return;
139-
}
140-
141-
if (this.byteBuf == null) {
142-
this.byteBuf = Unpooled.compositeBuffer();
36+
static <T> Function<InputStream, T> decode(ObjectMapper objectMapper, Class<T> type) {
37+
return in -> {
38+
try {
39+
return objectMapper.readValue(in, type);
40+
} catch (IOException e) {
41+
throw Exceptions.propagate(e);
14342
}
144-
145-
this.byteBuf.addComponent(byteBuf.retain());
146-
}
147-
43+
};
14844
}
14945

150-
private static final class JsonEncoder<T> extends MonoSource<T, ByteBuf> {
151-
152-
private final ObjectMapper objectMapper;
153-
154-
private JsonEncoder(Publisher<? extends T> source, ObjectMapper objectMapper) {
155-
super(source);
156-
this.objectMapper = objectMapper;
157-
}
158-
159-
@Override
160-
public void subscribe(Subscriber<? super ByteBuf> subscriber) {
161-
this.source.subscribe(new JsonEncoderSubscriber<>(subscriber, this.objectMapper));
162-
}
163-
}
164-
165-
private static final class JsonEncoderSubscriber<T> extends SubscriberBarrier<T, ByteBuf> {
166-
167-
private final ObjectMapper objectMapper;
168-
169-
private boolean done = false;
170-
171-
private JsonEncoderSubscriber(Subscriber<? super ByteBuf> subscriber, ObjectMapper objectMapper) {
172-
super(subscriber);
173-
this.objectMapper = objectMapper;
174-
}
175-
176-
@Override
177-
protected void doComplete() {
178-
if (this.done) {
179-
return;
180-
}
181-
182-
this.done = true;
183-
this.subscriber.onComplete();
184-
}
185-
186-
@Override
187-
protected void doError(Throwable throwable) {
188-
if (this.done) {
189-
Exceptions.onErrorDropped(throwable);
190-
return;
191-
}
192-
193-
this.done = true;
194-
this.subscriber.onError(throwable);
195-
}
196-
197-
@Override
198-
protected void doNext(T t) {
199-
if (this.done) {
200-
Exceptions.onNextDropped(t);
201-
return;
202-
}
203-
204-
if (!this.objectMapper.canSerialize(t.getClass())) {
205-
return;
206-
}
46+
static <T> Function<T, ByteBuf> encode(ObjectMapper objectMapper, HttpOutbound httpOutbound) {
47+
httpOutbound.header(CONTENT_TYPE, APPLICATION_JSON);
20748

49+
return source -> {
20850
try {
209-
this.subscriber.onNext(Unpooled.wrappedBuffer(this.objectMapper.writeValueAsBytes(t)));
210-
} catch (IOException e) {
211-
onError(e);
51+
return httpOutbound.delegate().alloc().directBuffer()
52+
.writeBytes(objectMapper.writeValueAsBytes(source));
53+
} catch (JsonProcessingException e) {
54+
throw Exceptions.propagate(e);
21255
}
213-
}
214-
56+
};
21557
}
21658

21759
}

cloudfoundry-client-spring/src/main/lombok/org/cloudfoundry/reactor/util/DefaultConnectionContext.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import reactor.core.publisher.Mono;
2525
import reactor.io.netty.config.ClientOptions;
2626
import reactor.io.netty.http.HttpClient;
27-
import reactor.io.netty.http.HttpInbound;
2827

2928
import java.time.Duration;
3029
import java.util.Map;
@@ -54,8 +53,8 @@ public final class DefaultConnectionContext implements ConnectionContext {
5453
this.sslCertificateTruster = createSslCertificateTruster(trustCertificates);
5554
this.httpClient = createHttpClient(this.sslCertificateTruster);
5655
this.root = getRoot(host, port, this.sslCertificateTruster);
57-
this.info = getInfo(this.httpClient, this.root);
5856
this.objectMapper = getObjectMapper(objectMapper);
57+
this.info = getInfo(this.httpClient, this.objectMapper, this.root);
5958
}
6059

6160
@Override
@@ -102,12 +101,12 @@ private static Optional<SslCertificateTruster> createSslCertificateTruster(Boole
102101
}
103102

104103
@SuppressWarnings("unchecked")
105-
private static Mono<Map<String, String>> getInfo(HttpClient httpClient, Mono<String> root) {
104+
private static Mono<Map<String, String>> getInfo(HttpClient httpClient, ObjectMapper objectMapper, Mono<String> root) {
106105
return root
107106
.map(uri -> UriComponentsBuilder.fromUriString(uri).pathSegment("v2", "info").build().toUriString())
108107
.then(httpClient::get)
109-
.flatMap(HttpInbound::receive)
110-
.as(JsonCodec.decode(Map.class))
108+
.then(inbound -> inbound.receive().aggregate().toInputStream())
109+
.map(JsonCodec.decode(objectMapper, Map.class))
111110
.map(m -> (Map<String, String>) m)
112111
.cache();
113112
}

0 commit comments

Comments
 (0)