Skip to content

Commit eaae9b0

Browse files
author
likun
committed
Reduce excessive off-heap memory consumption caused by occasional large keys.
1 parent 8321f5d commit eaae9b0

File tree

3 files changed

+198
-2
lines changed

3 files changed

+198
-2
lines changed

src/main/java/io/lettuce/core/ClientOptions.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ public class ClientOptions implements Serializable {
9494

9595
public static final boolean DEFAULT_USE_HASH_INDEX_QUEUE = true;
9696

97+
public static final int DEFAULT_READ_BUFFER_SIZE = 64 * 1024;
98+
99+
private static final boolean DEFAULT_CREATE_BYTEBUF_WHEN_RECV_LARGE_KEY = false;
100+
97101
private final boolean autoReconnect;
98102

99103
private final Predicate<RedisCommand<?, ?, ?>> replayFilter;
@@ -130,6 +134,10 @@ public class ClientOptions implements Serializable {
130134

131135
private final boolean useHashIndexedQueue;
132136

137+
private final boolean createByteBufWhenRecvLargeKey;
138+
139+
private final int readBufferSize;
140+
133141
protected ClientOptions(Builder builder) {
134142
this.autoReconnect = builder.autoReconnect;
135143
this.replayFilter = builder.replayFilter;
@@ -149,6 +157,8 @@ protected ClientOptions(Builder builder) {
149157
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
150158
this.timeoutOptions = builder.timeoutOptions;
151159
this.useHashIndexedQueue = builder.useHashIndexedQueue;
160+
this.createByteBufWhenRecvLargeKey = builder.createByteBufWhenRecvLargeKey;
161+
this.readBufferSize = builder.readBufferSize;
152162
}
153163

154164
protected ClientOptions(ClientOptions original) {
@@ -170,6 +180,8 @@ protected ClientOptions(ClientOptions original) {
170180
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
171181
this.timeoutOptions = original.getTimeoutOptions();
172182
this.useHashIndexedQueue = original.isUseHashIndexedQueue();
183+
this.createByteBufWhenRecvLargeKey = original.isCreateByteBufWhenRecvLargeKey();
184+
this.readBufferSize = original.getReadBufferSize();
173185
}
174186

175187
/**
@@ -241,6 +253,10 @@ public static class Builder {
241253

242254
private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;
243255

256+
private boolean createByteBufWhenRecvLargeKey = DEFAULT_CREATE_BYTEBUF_WHEN_RECV_LARGE_KEY;
257+
258+
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
259+
244260
protected Builder() {
245261
}
246262

@@ -529,6 +545,31 @@ public Builder useHashIndexQueue(boolean useHashIndexedQueue) {
529545
return this;
530546
}
531547

548+
/**
549+
* In the case of large keys, the command handler will create a {@link io.netty.buffer.ByteBuf} to hold the large key.
550+
* to avoid occassional large keys from occupying excessive memory
551+
*
552+
* @param createByteBufWhenRecvLargeKey true/false
553+
* @return {@code this}
554+
* @see io.lettuce.core.protocol.CommandHandler
555+
*/
556+
public Builder createByteBufWhenRecvLargeKey(boolean createByteBufWhenRecvLargeKey) {
557+
this.createByteBufWhenRecvLargeKey = createByteBufWhenRecvLargeKey;
558+
return this;
559+
}
560+
561+
/**
562+
* Set the read buffer size for receiving data from Redis server in bytes. See {@link #DEFAULT_READ_BUFFER_SIZE}.
563+
*
564+
* @param readBufferSize Read ByteBuf Size
565+
* @return {@code this}
566+
* @see io.lettuce.core.protocol.CommandHandler
567+
*/
568+
public Builder readBufferSize(int readBufferSize) {
569+
this.readBufferSize = readBufferSize;
570+
return this;
571+
}
572+
532573
/**
533574
* Create a new instance of {@link ClientOptions}.
534575
*
@@ -558,7 +599,8 @@ public ClientOptions.Builder mutate() {
558599
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
559600
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
560601
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
561-
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
602+
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions())
603+
.createByteBufWhenRecvLargeKey(isCreateByteBufWhenRecvLargeKey()).readBufferSize(getReadBufferSize());
562604

563605
return builder;
564606
}
@@ -773,6 +815,24 @@ public TimeoutOptions getTimeoutOptions() {
773815
return timeoutOptions;
774816
}
775817

818+
/**
819+
* get original readBuffer size in {@link io.lettuce.core.protocol.CommandHandler}
820+
*
821+
* @return the original readBuffer size in {@link io.lettuce.core.protocol.CommandHandler}
822+
*/
823+
public int getReadBufferSize() {
824+
return readBufferSize;
825+
}
826+
827+
/**
828+
* if true , the client will create a bytebuf when recv large key ( size > {@link #readBufferSize})
829+
*
830+
* @return true/false
831+
*/
832+
public boolean isCreateByteBufWhenRecvLargeKey() {
833+
return createByteBufWhenRecvLargeKey;
834+
}
835+
776836
/**
777837
* Defines the re-authentication behavior of the Redis client.
778838
* <p/>

src/main/java/io/lettuce/core/protocol/CommandHandler.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import io.lettuce.core.tracing.Tracer;
5656
import io.lettuce.core.tracing.Tracing;
5757
import io.netty.buffer.ByteBuf;
58+
import io.netty.buffer.ByteBufAllocator;
5859
import io.netty.channel.Channel;
5960
import io.netty.channel.ChannelDuplexHandler;
6061
import io.netty.channel.ChannelHandler;
@@ -119,6 +120,10 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
119120

120121
private final BackpressureSource backpressureSource = new BackpressureSource();
121122

123+
private final int defaultReadBufferSize;
124+
125+
private final boolean createByteBufWhenRecvLargeKey;
126+
122127
private RedisStateMachine rsm;
123128

124129
private Channel channel;
@@ -139,6 +144,10 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
139144

140145
private Tracing.Endpoint tracedEndpoint;
141146

147+
private ByteBuf tmpReadBuffer;
148+
149+
private ByteBufAllocator byteBufAllocator;
150+
142151
/**
143152
* Initialize a new instance that handles commands from the supplied queue.
144153
*
@@ -165,6 +174,10 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
165174
this.tracingEnabled = tracing.isEnabled();
166175

167176
this.decodeBufferPolicy = clientOptions.getDecodeBufferPolicy();
177+
178+
this.defaultReadBufferSize = clientOptions.getReadBufferSize();
179+
180+
this.createByteBufWhenRecvLargeKey = clientOptions.isCreateByteBufWhenRecvLargeKey();
168181
}
169182

170183
public Endpoint getEndpoint() {
@@ -222,7 +235,8 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
222235

223236
setState(LifecycleState.REGISTERED);
224237

225-
readBuffer = ctx.alloc().buffer(8192 * 8);
238+
byteBufAllocator = ctx.alloc();
239+
readBuffer = ctx.alloc().buffer(defaultReadBufferSize);
226240
rsm = new RedisStateMachine();
227241
ctx.fireChannelRegistered();
228242
}
@@ -615,6 +629,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
615629
logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
616630
}
617631

632+
// if buffer capacity larger than default capacity, then create a new buffer with double capacity
633+
if (createByteBufWhenRecvLargeKey && readBuffer.capacity() == defaultReadBufferSize
634+
&& readBuffer.writableBytes() < input.readableBytes() && byteBufAllocator != null) {
635+
ByteBuf byteBuf = byteBufAllocator.directBuffer(readBuffer.capacity() << 1);
636+
byteBuf.writeBytes(readBuffer);
637+
tmpReadBuffer = readBuffer;
638+
readBuffer = byteBuf;
639+
}
640+
618641
readBuffer.touch("CommandHandler.read(…)");
619642
readBuffer.writeBytes(input);
620643

@@ -642,6 +665,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
642665
}
643666
}
644667

668+
boolean decodeComplete = false;
645669
while (canDecode(buffer)) {
646670

647671
if (isPushDecode(buffer)) {
@@ -702,6 +726,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
702726
logger.debug("{} Completing command {}", logPrefix(), command);
703727
}
704728
complete(command);
729+
decodeComplete = true;
705730
} catch (Exception e) {
706731
logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
707732
}
@@ -712,6 +737,16 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
712737
}
713738

714739
decodeBufferPolicy.afterDecoding(buffer);
740+
releaseLargeBufferIfNecessary(buffer, decodeComplete);
741+
}
742+
743+
private void releaseLargeBufferIfNecessary(ByteBuf buffer, boolean decodeComplete) {
744+
if (decodeComplete && this.tmpReadBuffer != null) {
745+
buffer.release();
746+
this.readBuffer = tmpReadBuffer;
747+
this.readBuffer.clear();
748+
this.tmpReadBuffer = null;
749+
}
715750
}
716751

717752
protected void notifyPushListeners(PushMessage notification) {

src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,20 @@
2626
import static org.mockito.Mockito.*;
2727
import static org.mockito.Mockito.eq;
2828

29+
import java.io.BufferedReader;
30+
import java.io.BufferedWriter;
31+
import java.io.File;
32+
import java.io.FileWriter;
2933
import java.io.IOException;
34+
import java.io.InputStream;
35+
import java.io.InputStreamReader;
3036
import java.net.Inet4Address;
3137
import java.net.InetSocketAddress;
3238
import java.nio.ByteBuffer;
39+
import java.nio.charset.StandardCharsets;
40+
import java.nio.file.Files;
3341
import java.time.Duration;
42+
import java.util.ArrayList;
3443
import java.util.Arrays;
3544
import java.util.Collection;
3645
import java.util.Collections;
@@ -65,6 +74,7 @@
6574
import io.lettuce.core.output.KeyValueListOutput;
6675
import io.lettuce.core.output.StatusOutput;
6776
import io.lettuce.core.output.ValueListOutput;
77+
import io.lettuce.core.output.ValueOutput;
6878
import io.lettuce.core.resource.ClientResources;
6979
import io.lettuce.core.tracing.Tracing;
7080
import io.lettuce.test.Delay;
@@ -650,4 +660,95 @@ void shouldHandleNullBuffers() throws Exception {
650660
sut.channelUnregistered(context);
651661
}
652662

663+
/**
664+
* if large keys are received ,the large buffer will created and then released
665+
*/
666+
@Test
667+
void shouldLargeBufferCreatedAndRelease() throws Exception {
668+
ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
669+
channelPromise.setSuccess();
670+
ClientOptions clientOptions = ClientOptions.builder().createByteBufWhenRecvLargeKey(true).build();
671+
sut = new CommandHandler(clientOptions, clientResources, endpoint);
672+
sut.channelRegistered(context);
673+
sut.channelActive(context);
674+
sut.getStack().add(new Command<>(CommandType.GET, new ValueOutput<String, String>(StringCodec.UTF8)));
675+
676+
ByteBuf internalBuffer = ReflectionTestUtils.getField(sut, "readBuffer");
677+
678+
// step1 Receive First TCP Packet
679+
ByteBuf msg = context.alloc().buffer(13);
680+
// 1+5+2+7 ($+length+\r\n+len(value))
681+
msg.writeBytes("$65536\r\nval_abc".getBytes(StandardCharsets.UTF_8));
682+
sut.channelRead(context, msg);
683+
684+
int markedReaderIndex = ReflectionTestUtils.getField(internalBuffer, "markedReaderIndex");
685+
assertThat(markedReaderIndex).isEqualTo(8);
686+
assertThat(internalBuffer.readerIndex()).isEqualTo(8);
687+
assertThat(internalBuffer.writerIndex()).isEqualTo(15);
688+
689+
// step2 Receive Second TCP Packet
690+
ByteBuf msg2 = context.alloc().buffer(64 * 1024);
691+
StringBuilder sb = new StringBuilder();
692+
// 65536-7
693+
for (int i = 0; i < 65529; i++) {
694+
sb.append((char) ('a' + i % 26));
695+
}
696+
sb.append("\r\n");
697+
msg2.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
698+
sut.channelRead(context, msg2);
699+
700+
// step3 Got Result: readBuffer.capacity = 64k and tmpBuffer is null
701+
ByteBuf readBuffer = ReflectionTestUtils.getField(sut, "readBuffer");
702+
assertThat(readBuffer.capacity()).isEqualTo(64 * 1024);
703+
assertThat(readBuffer.readerIndex()).isZero();
704+
assertThat(readBuffer.writerIndex()).isZero();
705+
706+
ByteBuf tmpBuffer = ReflectionTestUtils.getField(sut, "tmpReadBuffer");
707+
assertThat(tmpBuffer).isNull();
708+
}
709+
710+
/**
711+
* readBufferSize = 16 createByteBufWhenRecvLargeKey = true large than readBufferSize will create a new buffer
712+
*
713+
*/
714+
@Test
715+
void shouldLargeThan16ThenCreateAndRelease() throws Exception {
716+
ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
717+
channelPromise.setSuccess();
718+
int readBufferSize = 16;
719+
ClientOptions clientOptions = ClientOptions.builder().createByteBufWhenRecvLargeKey(true).readBufferSize(readBufferSize)
720+
.build();
721+
sut = new CommandHandler(clientOptions, clientResources, endpoint);
722+
sut.channelRegistered(context);
723+
sut.channelActive(context);
724+
sut.getStack().add(new Command<>(CommandType.GET, new ValueOutput<String, String>(StringCodec.UTF8)));
725+
726+
ByteBuf internalBuffer = ReflectionTestUtils.getField(sut, "readBuffer");
727+
728+
// step1 Receive First TCP Packet
729+
ByteBuf msg = context.alloc().buffer(13);
730+
// 1+5+2+7 ($+length+\r\n+len(value))
731+
msg.writeBytes("$16\r\nabc_val".getBytes(StandardCharsets.UTF_8));
732+
sut.channelRead(context, msg);
733+
734+
int markedReaderIndex = ReflectionTestUtils.getField(internalBuffer, "markedReaderIndex");
735+
assertThat(markedReaderIndex).isEqualTo(5);
736+
assertThat(internalBuffer.readerIndex()).isEqualTo(5);
737+
assertThat(internalBuffer.writerIndex()).isEqualTo(12);
738+
739+
// step2 Receive Second TCP Packet
740+
ByteBuf msg2 = context.alloc().buffer(32);
741+
msg2.writeBytes("abcd_abcd\r\n".getBytes(StandardCharsets.UTF_8));
742+
sut.channelRead(context, msg2);
743+
744+
// step3 Got Result: readBuffer.capacity = 64k and tmpBuffer is null
745+
ByteBuf readBuffer = ReflectionTestUtils.getField(sut, "readBuffer");
746+
assertThat(readBuffer.capacity()).isEqualTo(readBufferSize);
747+
assertThat(readBuffer.readerIndex()).isZero();
748+
assertThat(readBuffer.writerIndex()).isZero();
749+
750+
ByteBuf tmpBuffer = ReflectionTestUtils.getField(sut, "tmpReadBuffer");
751+
assertThat(tmpBuffer).isNull();
752+
}
753+
653754
}

0 commit comments

Comments
 (0)