Skip to content

Commit 765abb7

Browse files
committed
DefaultPayload#create properly copies ByteBuf content
Closes gh-970 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent ce62903 commit 765abb7

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public static Payload create(ByteBuf data) {
100100

101101
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
102102
try {
103-
return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer());
103+
return create(toBytes(data), metadata != null ? toBytes(metadata) : null);
104104
} finally {
105105
data.release();
106106
if (metadata != null) {
@@ -110,7 +110,16 @@ public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
110110
}
111111

112112
public static Payload create(Payload payload) {
113-
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
113+
return create(
114+
toBytes(payload.data()), payload.hasMetadata() ? toBytes(payload.metadata()) : null);
115+
}
116+
117+
private static byte[] toBytes(ByteBuf byteBuf) {
118+
byte[] bytes = new byte[byteBuf.readableBytes()];
119+
byteBuf.markReaderIndex();
120+
byteBuf.readBytes(bytes);
121+
byteBuf.resetReaderIndex();
122+
return bytes;
114123
}
115124

116125
@Override

rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,14 @@ public void ensuresThatSetupPayloadCanBeRetained() {
7373
@Test
7474
public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() {
7575
Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata");
76-
7776
Assertions.assertThat(setupPayload.refCnt()).isOne();
7877

78+
// Keep the data and metadata around so we can try changing them independently
79+
ByteBuf dataBuf = setupPayload.data();
80+
ByteBuf metadataBuf = setupPayload.metadata();
81+
dataBuf.retain();
82+
metadataBuf.retain();
83+
7984
TestClientTransport testClientTransport = new TestClientTransport();
8085
Mono<RSocket> connectionMono =
8186
RSocketConnector.create().setupPayload(setupPayload).connect(testClientTransport);
@@ -92,6 +97,15 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
9297
.expectComplete()
9398
.verify(Duration.ofMillis(100));
9499

100+
// Changing the original data and metadata should not impact the SetupPayload
101+
dataBuf.writerIndex(dataBuf.readerIndex());
102+
dataBuf.writeChar('d');
103+
dataBuf.release();
104+
105+
metadataBuf.writerIndex(metadataBuf.readerIndex());
106+
metadataBuf.writeChar('m');
107+
metadataBuf.release();
108+
95109
Assertions.assertThat(testClientTransport.testConnection().getSent())
96110
.hasSize(2)
97111
.allMatch(
@@ -100,7 +114,11 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
100114
return payload.getDataUtf8().equals("TestData")
101115
&& payload.getMetadataUtf8().equals("TestMetadata");
102116
})
103-
.allMatch(ReferenceCounted::release);
117+
.allMatch(
118+
byteBuf -> {
119+
System.out.println("calling release " + byteBuf.refCnt());
120+
return byteBuf.release();
121+
});
104122
Assertions.assertThat(setupPayload.refCnt()).isZero();
105123
}
106124

0 commit comments

Comments
 (0)