Skip to content

Commit e4d62b6

Browse files
authored
fixes performance degradation when fragmentation is used (#995)
1 parent 765abb7 commit e4d62b6

File tree

1 file changed

+18
-2
lines changed

1 file changed

+18
-2
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,18 @@ public static int assertMtu(int mtu) {
8484

8585
@Override
8686
public Mono<Void> send(Publisher<ByteBuf> frames) {
87-
return Flux.from(frames).concatMap(this::sendOne).then();
87+
return delegate.send(
88+
Flux.from(frames)
89+
.concatMap(
90+
frame -> {
91+
FrameType frameType = FrameHeaderCodec.frameType(frame);
92+
int readableBytes = frame.readableBytes();
93+
if (!shouldFragment(frameType, readableBytes)) {
94+
return Flux.just(frame);
95+
}
96+
97+
return logFragments(Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)));
98+
}));
8899
}
89100

90101
@Override
@@ -95,6 +106,11 @@ public Mono<Void> sendOne(ByteBuf frame) {
95106
return delegate.sendOne(frame);
96107
}
97108
Flux<ByteBuf> fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
109+
fragments = logFragments(fragments);
110+
return delegate.send(fragments);
111+
}
112+
113+
protected Flux<ByteBuf> logFragments(Flux<ByteBuf> fragments) {
98114
if (logger.isDebugEnabled()) {
99115
fragments =
100116
fragments.doOnNext(
@@ -107,6 +123,6 @@ public Mono<Void> sendOne(ByteBuf frame) {
107123
ByteBufUtil.prettyHexDump(byteBuf));
108124
});
109125
}
110-
return delegate.send(fragments);
126+
return fragments;
111127
}
112128
}

0 commit comments

Comments
 (0)