Skip to content

Commit a83a173

Browse files
Ryland Degnanrobertroeser
Ryland Degnan
authored andcommitted
Zero copy (#444)
* Added configurable frame decoder * Test frame decoder with ping pong
1 parent 3b53d5d commit a83a173

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+769
-441
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ import io.rsocket.Payload;
6161
import io.rsocket.RSocket;
6262
import io.rsocket.RSocketFactory;
6363
import io.rsocket.transport.netty.client.WebsocketClientTransport;
64-
import io.rsocket.util.PayloadImpl;
64+
import io.rsocket.util.DefaultPayload;
6565
import reactor.core.publisher.Flux;
6666
6767
import java.net.URI;
@@ -72,7 +72,7 @@ public class ExampleClient {
7272
RSocket client = RSocketFactory.connect().keepAlive().transport(ws).start().block();
7373
7474
try {
75-
Flux<Payload> s = client.requestStream(PayloadImpl.textPayload("peace"));
75+
Flux<Payload> s = client.requestStream(DefaultPayload.textPayload("peace"));
7676
7777
s.take(10).doOnNext(p -> System.out.println(p.getDataUtf8())).blockLast();
7878
} finally {

rsocket-core/src/jmh/java/io/rsocket/FragmentationPerf.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.rsocket.fragmentation.FrameFragmenter;
44
import io.rsocket.fragmentation.FrameReassembler;
5-
import io.rsocket.util.PayloadImpl;
5+
import io.rsocket.util.DefaultPayload;
66
import java.nio.ByteBuffer;
77
import java.util.concurrent.ThreadLocalRandom;
88
import java.util.stream.Collectors;
@@ -43,13 +43,13 @@ public void setup(Blackhole bh) {
4343
ByteBuffer data = createRandomBytes(1 << 18);
4444
ByteBuffer metadata = createRandomBytes(1 << 18);
4545
largeFrame =
46-
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, new PayloadImpl(data, metadata), 1);
46+
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
4747
largeFrameFragmenter = new FrameFragmenter(1024);
4848

4949
data = createRandomBytes(16);
5050
metadata = createRandomBytes(16);
5151
smallFrame =
52-
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, new PayloadImpl(data, metadata), 1);
52+
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
5353
smallFrameFragmenter = new FrameFragmenter(2);
5454
smallFramesIterable =
5555
smallFrameFragmenter

rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
import io.rsocket.RSocketFactory.Start;
2020
import io.rsocket.perfutil.TestDuplexConnection;
21-
import io.rsocket.util.PayloadImpl;
21+
import io.rsocket.util.DefaultPayload;
22+
2223
import java.nio.ByteBuffer;
2324
import java.nio.charset.StandardCharsets;
2425
import org.openjdk.jmh.annotations.Benchmark;
@@ -79,7 +80,7 @@ public static class Input {
7980

8081
static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8));
8182

82-
static final Payload HELLO_PAYLOAD = new PayloadImpl(HELLO);
83+
static final Payload HELLO_PAYLOAD = new DefaultPayload(HELLO);
8384

8485
static final DirectProcessor<Frame> clientReceive = DirectProcessor.create();
8586
static final DirectProcessor<Frame> serverReceive = DirectProcessor.create();

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

+62-24
Original file line numberDiff line numberDiff line change
@@ -17,47 +17,50 @@
1717

1818
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
1919

20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.buffer.Unpooled;
22+
import io.netty.util.AbstractReferenceCounted;
23+
import io.rsocket.Frame.Setup;
2024
import io.rsocket.frame.SetupFrameFlyweight;
21-
import java.nio.ByteBuffer;
2225

2326
/**
2427
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
2528
*/
26-
public abstract class ConnectionSetupPayload implements Payload {
29+
public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload {
2730

2831
public static final int NO_FLAGS = 0;
2932
public static final int HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE;
3033
public static final int STRICT_INTERPRETATION = SetupFrameFlyweight.FLAGS_STRICT_INTERPRETATION;
3134

3235
public static ConnectionSetupPayload create(String metadataMimeType, String dataMimeType) {
33-
return new ConnectionSetupPayloadImpl(
34-
metadataMimeType, dataMimeType, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER, NO_FLAGS);
36+
return new DefaultConnectionSetupPayload(
37+
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, NO_FLAGS);
3538
}
3639

3740
public static ConnectionSetupPayload create(
3841
String metadataMimeType, String dataMimeType, Payload payload) {
39-
return new ConnectionSetupPayloadImpl(
42+
return new DefaultConnectionSetupPayload(
4043
metadataMimeType,
4144
dataMimeType,
42-
payload.getData(),
43-
payload.getMetadata(),
45+
payload.sliceData(),
46+
payload.sliceMetadata(),
4447
payload.hasMetadata() ? FLAGS_M : 0);
4548
}
4649

4750
public static ConnectionSetupPayload create(
4851
String metadataMimeType, String dataMimeType, int flags) {
49-
return new ConnectionSetupPayloadImpl(
50-
metadataMimeType, dataMimeType, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER, flags);
52+
return new DefaultConnectionSetupPayload(
53+
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, flags);
5154
}
5255

5356
public static ConnectionSetupPayload create(final Frame setupFrame) {
5457
Frame.ensureFrameType(FrameType.SETUP, setupFrame);
55-
return new ConnectionSetupPayloadImpl(
56-
Frame.Setup.metadataMimeType(setupFrame),
57-
Frame.Setup.dataMimeType(setupFrame),
58-
setupFrame.getData(),
59-
setupFrame.getMetadata(),
60-
Frame.Setup.getFlags(setupFrame));
58+
return new DefaultConnectionSetupPayload(
59+
Setup.metadataMimeType(setupFrame),
60+
Setup.dataMimeType(setupFrame),
61+
setupFrame.sliceData(),
62+
setupFrame.sliceMetadata(),
63+
Setup.getFlags(setupFrame));
6164
}
6265

6366
public abstract String metadataMimeType();
@@ -79,27 +82,42 @@ public boolean hasMetadata() {
7982
return Frame.isFlagSet(getFlags(), FLAGS_M);
8083
}
8184

82-
private static final class ConnectionSetupPayloadImpl extends ConnectionSetupPayload {
85+
@Override
86+
public ConnectionSetupPayload retain() {
87+
super.retain();
88+
return this;
89+
}
90+
91+
@Override
92+
public ConnectionSetupPayload retain(int increment) {
93+
super.retain(increment);
94+
return this;
95+
}
96+
97+
public abstract ConnectionSetupPayload touch();
98+
public abstract ConnectionSetupPayload touch(Object hint);
99+
100+
private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload {
83101

84102
private final String metadataMimeType;
85103
private final String dataMimeType;
86-
private final ByteBuffer data;
87-
private final ByteBuffer metadata;
104+
private final ByteBuf data;
105+
private final ByteBuf metadata;
88106
private final int flags;
89107

90-
public ConnectionSetupPayloadImpl(
108+
public DefaultConnectionSetupPayload(
91109
String metadataMimeType,
92110
String dataMimeType,
93-
ByteBuffer data,
94-
ByteBuffer metadata,
111+
ByteBuf data,
112+
ByteBuf metadata,
95113
int flags) {
96114
this.metadataMimeType = metadataMimeType;
97115
this.dataMimeType = dataMimeType;
98116
this.data = data;
99117
this.metadata = metadata;
100118
this.flags = flags;
101119

102-
if (!hasMetadata() && metadata.remaining() > 0) {
120+
if (!hasMetadata() && metadata.readableBytes() > 0) {
103121
throw new IllegalArgumentException("metadata flag incorrect");
104122
}
105123
}
@@ -115,18 +133,38 @@ public String dataMimeType() {
115133
}
116134

117135
@Override
118-
public ByteBuffer getData() {
136+
public ByteBuf sliceData() {
119137
return data;
120138
}
121139

122140
@Override
123-
public ByteBuffer getMetadata() {
141+
public ByteBuf sliceMetadata() {
124142
return metadata;
125143
}
126144

127145
@Override
128146
public int getFlags() {
129147
return flags;
130148
}
149+
150+
@Override
151+
public ConnectionSetupPayload touch() {
152+
data.touch();
153+
metadata.touch();
154+
return this;
155+
}
156+
157+
@Override
158+
public ConnectionSetupPayload touch(Object hint) {
159+
data.touch(hint);
160+
metadata.touch(hint);
161+
return this;
162+
}
163+
164+
@Override
165+
protected void deallocate() {
166+
data.release();
167+
metadata.release();
168+
}
131169
}
132170
}

rsocket-core/src/main/java/io/rsocket/Frame.java

+19-51
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.rsocket.frame.RequestNFrameFlyweight;
3131
import io.rsocket.frame.SetupFrameFlyweight;
3232
import io.rsocket.frame.VersionFlyweight;
33-
import java.nio.ByteBuffer;
3433
import java.nio.charset.StandardCharsets;
3534
import javax.annotation.Nullable;
3635
import org.slf4j.Logger;
@@ -41,9 +40,7 @@
4140
*
4241
* <p>This provides encoding, decoding and field accessors.
4342
*/
44-
public class Frame implements ByteBufHolder {
45-
public static final ByteBuffer NULL_BYTEBUFFER = ByteBuffer.allocateDirect(0);
46-
43+
public class Frame implements Payload, ByteBufHolder {
4744
private static final Recycler<Frame> RECYCLER =
4845
new Recycler<Frame>() {
4946
protected Frame newObject(Handle<Frame> handle) {
@@ -52,7 +49,7 @@ protected Frame newObject(Handle<Frame> handle) {
5249
};
5350

5451
private final Handle<Frame> handle;
55-
private @Nullable ByteBuf content;
52+
private ByteBuf content;
5653

5754
private Frame(final Handle<Frame> handle) {
5855
this.handle = handle;
@@ -183,43 +180,25 @@ public boolean release(int decrement) {
183180
}
184181

185182
/**
186-
* Return {@link ByteBuffer} that is a {@link ByteBuffer#slice()} for the frame metadata
183+
* Return {@link ByteBuf} that is a {@link ByteBuf#slice()} for the frame metadata
187184
*
188-
* <p>If no metadata is present, the ByteBuffer will have 0 capacity.
185+
* <p>If no metadata is present, the ByteBuf will have 0 capacity.
189186
*
190-
* @return ByteBuffer containing the content
187+
* @return ByteBuf containing the content
191188
*/
192-
public ByteBuffer getMetadata() {
193-
final ByteBuf metadata = FrameHeaderFlyweight.sliceFrameMetadata(content);
194-
if (metadata == null) {
195-
return NULL_BYTEBUFFER;
196-
} else if (metadata.readableBytes() > 0) {
197-
final ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.readableBytes());
198-
metadata.readBytes(buffer);
199-
buffer.flip();
200-
return buffer;
201-
} else {
202-
return NULL_BYTEBUFFER;
203-
}
189+
public ByteBuf sliceMetadata() {
190+
return hasMetadata() ? FrameHeaderFlyweight.sliceFrameMetadata(content) : Unpooled.EMPTY_BUFFER;
204191
}
205192

206193
/**
207-
* Return {@link ByteBuffer} that is a {@link ByteBuffer#slice()} for the frame data
194+
* Return {@link ByteBuf} that is a {@link ByteBuf#slice()} for the frame data
208195
*
209-
* <p>If no data is present, the ByteBuffer will have 0 capacity.
196+
* <p>If no data is present, the ByteBuf will have 0 capacity.
210197
*
211-
* @return ByteBuffer containing the data
198+
* @return ByteBuf containing the data
212199
*/
213-
public ByteBuffer getData() {
214-
final ByteBuf data = FrameHeaderFlyweight.sliceFrameData(content);
215-
if (data.readableBytes() > 0) {
216-
final ByteBuffer buffer = ByteBuffer.allocateDirect(data.readableBytes());
217-
data.readBytes(buffer);
218-
buffer.flip();
219-
return buffer;
220-
} else {
221-
return NULL_BYTEBUFFER;
222-
}
200+
public ByteBuf sliceData() {
201+
return FrameHeaderFlyweight.sliceFrameData(content);
223202
}
224203

225204
/**
@@ -270,14 +249,11 @@ public static int setFlag(int current, int toSet) {
270249
return current | toSet;
271250
}
272251

252+
@Override
273253
public boolean hasMetadata() {
274254
return Frame.isFlagSet(this.flags(), FLAGS_M);
275255
}
276256

277-
public String getDataUtf8() {
278-
return StandardCharsets.UTF_8.decode(getData()).toString();
279-
}
280-
281257
/* TODO:
282258
*
283259
* fromRequest(type, id, payload)
@@ -297,14 +273,8 @@ public static Frame from(
297273
String metadataMimeType,
298274
String dataMimeType,
299275
Payload payload) {
300-
final ByteBuf metadata =
301-
payload.hasMetadata()
302-
? Unpooled.wrappedBuffer(payload.getMetadata())
303-
: Unpooled.EMPTY_BUFFER;
304-
final ByteBuf data =
305-
payload.getData() != null
306-
? Unpooled.wrappedBuffer(payload.getData())
307-
: Unpooled.EMPTY_BUFFER;
276+
final ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : Unpooled.EMPTY_BUFFER;
277+
final ByteBuf data = payload.sliceData();
308278

309279
final Frame frame = RECYCLER.get();
310280
frame.content =
@@ -460,9 +430,8 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
460430
if (initialRequestN < 1) {
461431
throw new IllegalStateException("initial request n must be greater than 0");
462432
}
463-
final @Nullable ByteBuf metadata =
464-
payload.hasMetadata() ? Unpooled.wrappedBuffer(payload.getMetadata()) : null;
465-
final ByteBuf data = Unpooled.wrappedBuffer(payload.getData());
433+
final @Nullable ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : null;
434+
final ByteBuf data = payload.sliceData();
466435

467436
final Frame frame = RECYCLER.get();
468437
frame.content =
@@ -561,9 +530,8 @@ public static Frame from(int streamId, FrameType type, Payload payload) {
561530
}
562531

563532
public static Frame from(int streamId, FrameType type, Payload payload, int flags) {
564-
final ByteBuf metadata =
565-
payload.hasMetadata() ? Unpooled.wrappedBuffer(payload.getMetadata()) : null;
566-
final ByteBuf data = Unpooled.wrappedBuffer(payload.getData());
533+
final ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : null;
534+
final ByteBuf data = payload.sliceData();
567535
return from(streamId, type, metadata, data, flags);
568536
}
569537

0 commit comments

Comments
 (0)