Skip to content

Commit 04b82b3

Browse files
committed
Merge branch 'release/0.12.1-RC2'
2 parents e75dda6 + aadcece commit 04b82b3

File tree

5 files changed

+205
-7
lines changed

5 files changed

+205
-7
lines changed

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
# limitations under the License.
1313
#
1414

15-
version=0.12.1-RC1
15+
version=0.12.1-RC2

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf h
267267

268268
ByteBuf data = assembleData(frame, streamId);
269269

270-
return FragmentationFlyweight.encode(allocator, header, metadata, data);
270+
return FragmentationFlyweight.encode(allocator, header, metadata.retain(), data);
271271
}
272272

273273
private ByteBuf assembleData(ByteBuf frame, int streamId) {

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
*/
4343
public final class WebsocketClientTransport implements ClientTransport, TransportHeaderAware {
4444

45+
private static final String DEFAULT_PATH = "/";
46+
4547
private final HttpClient client;
4648

4749
private String path;
@@ -117,7 +119,7 @@ public static WebsocketClientTransport create(URI uri) {
117119
public static WebsocketClientTransport create(TcpClient client) {
118120
Objects.requireNonNull(client, "client must not be null");
119121

120-
return create(HttpClient.from(client), "/");
122+
return create(HttpClient.from(client), DEFAULT_PATH);
121123
}
122124

123125
/**
@@ -132,6 +134,8 @@ public static WebsocketClientTransport create(HttpClient client, String path) {
132134
Objects.requireNonNull(client, "client must not be null");
133135
Objects.requireNonNull(path, "path must not be null");
134136

137+
path = path.startsWith(DEFAULT_PATH) ? path : (DEFAULT_PATH + path);
138+
135139
return new WebsocketClientTransport(client, path);
136140
}
137141

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.integration;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.rsocket.AbstractRSocket;
22+
import io.rsocket.Payload;
23+
import io.rsocket.RSocket;
24+
import io.rsocket.RSocketFactory;
25+
import io.rsocket.transport.netty.client.TcpClientTransport;
26+
import io.rsocket.transport.netty.server.CloseableChannel;
27+
import io.rsocket.transport.netty.server.TcpServerTransport;
28+
import io.rsocket.util.DefaultPayload;
29+
import io.rsocket.util.RSocketProxy;
30+
import java.util.concurrent.ThreadLocalRandom;
31+
import org.junit.jupiter.api.AfterEach;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.api.Test;
34+
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Mono;
36+
37+
public class FragmentTest {
38+
private static final int frameSize = 128;
39+
private AbstractRSocket handler;
40+
private CloseableChannel server;
41+
private String message = null;
42+
private String metaData = null;
43+
44+
@BeforeEach
45+
public void startup() {
46+
int randomPort = ThreadLocalRandom.current().nextInt(10_000, 20_000);
47+
StringBuilder message = new StringBuilder();
48+
StringBuilder metaData = new StringBuilder();
49+
for (int i = 0; i < 100; i++) {
50+
message.append("RESPONSE ");
51+
metaData.append("METADATA ");
52+
}
53+
this.message = message.toString();
54+
this.metaData = metaData.toString();
55+
TcpServerTransport serverTransport = TcpServerTransport.create(randomPort);
56+
server =
57+
RSocketFactory.receive()
58+
.fragment(frameSize)
59+
.acceptor((setup, sendingSocket) -> Mono.just(new RSocketProxy(handler)))
60+
.transport(serverTransport)
61+
.start()
62+
.block();
63+
}
64+
65+
private RSocket buildClient() {
66+
return RSocketFactory.connect()
67+
.fragment(frameSize)
68+
.transport(TcpClientTransport.create(server.address()))
69+
.start()
70+
.block();
71+
}
72+
73+
@AfterEach
74+
public void cleanup() {
75+
server.dispose();
76+
}
77+
78+
@Test
79+
void testFragmentNoMetaData() {
80+
System.out.println(
81+
"-------------------------------------------------testFragmentNoMetaData-------------------------------------------------");
82+
handler =
83+
new AbstractRSocket() {
84+
@Override
85+
public Flux<Payload> requestStream(Payload payload) {
86+
String request = payload.getDataUtf8();
87+
String metaData = payload.getMetadataUtf8();
88+
System.out.println("request message: " + request);
89+
System.out.println("request metadata: " + metaData);
90+
91+
return Flux.just(DefaultPayload.create(request));
92+
}
93+
};
94+
95+
RSocket client = buildClient();
96+
97+
System.out.println("original message: " + message);
98+
System.out.println("original metadata: " + metaData);
99+
Payload payload = client.requestStream(DefaultPayload.create(message)).blockLast();
100+
System.out.println("response message: " + payload.getDataUtf8());
101+
System.out.println("response metadata: " + payload.getMetadataUtf8());
102+
103+
assertThat(message).isEqualTo(payload.getDataUtf8());
104+
}
105+
106+
@Test
107+
void testFragmentRequestMetaDataOnly() {
108+
System.out.println(
109+
"-------------------------------------------------testFragmentRequestMetaDataOnly-------------------------------------------------");
110+
handler =
111+
new AbstractRSocket() {
112+
@Override
113+
public Flux<Payload> requestStream(Payload payload) {
114+
String request = payload.getDataUtf8();
115+
String metaData = payload.getMetadataUtf8();
116+
System.out.println("request message: " + request);
117+
System.out.println("request metadata: " + metaData);
118+
119+
return Flux.just(DefaultPayload.create(request));
120+
}
121+
};
122+
123+
RSocket client = buildClient();
124+
125+
System.out.println("original message: " + message);
126+
System.out.println("original metadata: " + metaData);
127+
Payload payload = client.requestStream(DefaultPayload.create(message, metaData)).blockLast();
128+
System.out.println("response message: " + payload.getDataUtf8());
129+
System.out.println("response metadata: " + payload.getMetadataUtf8());
130+
131+
assertThat(message).isEqualTo(payload.getDataUtf8());
132+
}
133+
134+
@Test
135+
void testFragmentBothMetaData() {
136+
System.out.println(
137+
"-------------------------------------------------testFragmentBothMetaData-------------------------------------------------");
138+
handler =
139+
new AbstractRSocket() {
140+
@Override
141+
public Flux<Payload> requestStream(Payload payload) {
142+
String request = payload.getDataUtf8();
143+
String metaData = payload.getMetadataUtf8();
144+
System.out.println("request message: " + request);
145+
System.out.println("request metadata: " + metaData);
146+
147+
return Flux.just(DefaultPayload.create(request, metaData));
148+
}
149+
150+
@Override
151+
public Mono<Payload> requestResponse(Payload payload) {
152+
String request = payload.getDataUtf8();
153+
String metaData = payload.getMetadataUtf8();
154+
System.out.println("request message: " + request);
155+
System.out.println("request metadata: " + metaData);
156+
157+
return Mono.just(DefaultPayload.create(request, metaData));
158+
}
159+
};
160+
161+
RSocket client = buildClient();
162+
163+
System.out.println("original message: " + message);
164+
System.out.println("original metadata: " + metaData);
165+
Payload payload = client.requestStream(DefaultPayload.create(message, metaData)).blockLast();
166+
System.out.println("response message: " + payload.getDataUtf8());
167+
System.out.println("response metadata: " + payload.getMetadataUtf8());
168+
169+
assertThat(message).isEqualTo(payload.getDataUtf8());
170+
}
171+
}

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/client/WebsocketClientTransportTest.java

+27-4
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,25 @@ void connectNoServer() {
5555
@DisplayName("creates client with BindAddress")
5656
@Test
5757
void createBindAddress() {
58-
assertThat(WebsocketClientTransport.create("test-bind-address", 8000)).isNotNull();
58+
assertThat(WebsocketClientTransport.create("test-bind-address", 8000))
59+
.isNotNull()
60+
.hasFieldOrPropertyWithValue("path", "/");
5961
}
6062

6163
@DisplayName("creates client with HttpClient")
6264
@Test
6365
void createHttpClient() {
64-
assertThat(WebsocketClientTransport.create(HttpClient.create(), "/")).isNotNull();
66+
assertThat(WebsocketClientTransport.create(HttpClient.create(), "/"))
67+
.isNotNull()
68+
.hasFieldOrPropertyWithValue("path", "/");
69+
}
70+
71+
@DisplayName("creates client with HttpClient and path without root")
72+
@Test
73+
void createHttpClientWithPathWithoutRoot() {
74+
assertThat(WebsocketClientTransport.create(HttpClient.create(), "test"))
75+
.isNotNull()
76+
.hasFieldOrPropertyWithValue("path", "/test");
6577
}
6678

6779
@DisplayName("creates client with InetSocketAddress")
@@ -70,7 +82,8 @@ void createInetSocketAddress() {
7082
assertThat(
7183
WebsocketClientTransport.create(
7284
InetSocketAddress.createUnresolved("test-bind-address", 8000)))
73-
.isNotNull();
85+
.isNotNull()
86+
.hasFieldOrPropertyWithValue("path", "/");
7487
}
7588

7689
@DisplayName("create throws NullPointerException with null bindAddress")
@@ -122,7 +135,17 @@ void createPort() {
122135
@DisplayName("creates client with URI")
123136
@Test
124137
void createUri() {
125-
assertThat(WebsocketClientTransport.create(URI.create("ws://test-host/"))).isNotNull();
138+
assertThat(WebsocketClientTransport.create(URI.create("ws://test-host")))
139+
.isNotNull()
140+
.hasFieldOrPropertyWithValue("path", "/");
141+
}
142+
143+
@DisplayName("creates client with URI path")
144+
@Test
145+
void createUriPath() {
146+
assertThat(WebsocketClientTransport.create(URI.create("ws://test-host/test")))
147+
.isNotNull()
148+
.hasFieldOrPropertyWithValue("path", "/test");
126149
}
127150

128151
@DisplayName("sets transport headers")

0 commit comments

Comments
 (0)