Skip to content

Commit 6aa3d8a

Browse files
committed
reworks TransportTest abstraction
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 6047692 commit 6aa3d8a

16 files changed

+593
-557
lines changed

Diff for: rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

+266-229
Large diffs are not rendered by default.

Diff for: rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronTransportTest.java

+6-13
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.time.Duration;
2525
import java.util.concurrent.ThreadLocalRandom;
2626

27-
final class AeronTransportTest implements TransportTest {
27+
final class AeronTransportTest extends TransportTest<InetSocketAddress, AeronServer> {
2828

2929
static final MediaDriver mediaDriver =
3030
MediaDriver.launch(
@@ -34,17 +34,9 @@ final class AeronTransportTest implements TransportTest {
3434
static final Aeron serverAeron = Aeron.connect();
3535
static final EventLoopGroup eventLoopGroup = EventLoopGroup.create(4);
3636

37-
final AeronTransportPair transportPair =
38-
new AeronTransportPair(mediaDriver, clientAeron, serverAeron);
39-
4037
@Override
41-
public Duration getTimeout() {
42-
return Duration.ofMinutes(2);
43-
}
44-
45-
@Override
46-
public TransportPair getTransportPair() {
47-
return transportPair;
38+
protected TransportPair<InetSocketAddress, AeronServer> createTransportPair() {
39+
return new AeronTransportPair(mediaDriver, clientAeron, serverAeron);
4840
}
4941

5042
static class AeronTransportPair extends TransportPair<InetSocketAddress, AeronServer> {
@@ -57,13 +49,14 @@ public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAer
5749
super(
5850
() ->
5951
InetSocketAddress.createUnresolved(
60-
"127.0.0.1", ThreadLocalRandom.current().nextInt(20000) + 5000),
52+
"0.0.0.0", ThreadLocalRandom.current().nextInt(20000) + 5000),
6153
(address, server, allocator) ->
6254
AeronClientTransport.createIpc(clientAeron, eventLoopGroup),
6355
(address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup),
6456
false,
6557
false,
66-
false);
58+
false,
59+
Duration.ofMinutes(2));
6760
this.mediaDriver = driver;
6861
this.clientAeron = clientAeron;
6962
this.serverAeron = serverAeron;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2015-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.aeron;
18+
19+
import io.aeron.Aeron;
20+
import io.aeron.driver.MediaDriver;
21+
import io.aeron.driver.ThreadingMode;
22+
import io.rsocket.test.TransportTest;
23+
import java.net.InetSocketAddress;
24+
import java.time.Duration;
25+
import java.util.concurrent.ThreadLocalRandom;
26+
import org.junit.jupiter.api.Disabled;
27+
28+
@Disabled
29+
final class AeronWithFragmentationTransportTest
30+
extends TransportTest<InetSocketAddress, AeronServer> {
31+
32+
static final MediaDriver mediaDriver =
33+
MediaDriver.launch(
34+
new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true));
35+
36+
static final Aeron clientAeron = Aeron.connect();
37+
static final Aeron serverAeron = Aeron.connect();
38+
static final EventLoopGroup eventLoopGroup = EventLoopGroup.create(4);
39+
40+
@Override
41+
protected TransportPair<InetSocketAddress, AeronServer> createTransportPair() {
42+
return new AeronTransportPair(mediaDriver, clientAeron, serverAeron);
43+
}
44+
45+
static class AeronTransportPair extends TransportPair<InetSocketAddress, AeronServer> {
46+
47+
final MediaDriver mediaDriver;
48+
final Aeron clientAeron;
49+
final Aeron serverAeron;
50+
51+
public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAeron) {
52+
super(
53+
() ->
54+
InetSocketAddress.createUnresolved(
55+
"0.0.0.0", ThreadLocalRandom.current().nextInt(20000) + 5000),
56+
(address, server, allocator) ->
57+
AeronClientTransport.createIpc(clientAeron, eventLoopGroup),
58+
(address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup),
59+
true,
60+
false,
61+
false,
62+
Duration.ofMinutes(2));
63+
this.mediaDriver = driver;
64+
this.clientAeron = clientAeron;
65+
this.serverAeron = serverAeron;
66+
}
67+
68+
@Override
69+
public void dispose() {
70+
super.dispose();
71+
// CloseHelper.quietCloseAll(clientAeron, serverAeron);
72+
}
73+
}
74+
}

Diff for: rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java

+10-16
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,21 @@
1616

1717
package io.rsocket.transport.local;
1818

19+
import io.rsocket.Closeable;
1920
import io.rsocket.test.TransportTest;
2021
import java.time.Duration;
2122
import java.util.UUID;
2223

23-
final class LocalResumableTransportTest implements TransportTest {
24-
25-
private final TransportPair transportPair =
26-
new TransportPair<>(
27-
() -> "test-" + UUID.randomUUID(),
28-
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
29-
(address, allocator) -> LocalServerTransport.create(address),
30-
false,
31-
true);
32-
33-
@Override
34-
public Duration getTimeout() {
35-
return Duration.ofSeconds(10);
36-
}
24+
final class LocalResumableTransportTest extends TransportTest<String, Closeable> {
3725

3826
@Override
39-
public TransportPair getTransportPair() {
40-
return transportPair;
27+
protected TransportPair createTransportPair() {
28+
return new TransportPair<>(
29+
() -> "test-" + UUID.randomUUID(),
30+
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
31+
(address, allocator) -> LocalServerTransport.create(address),
32+
false,
33+
true,
34+
Duration.ofSeconds(10));
4135
}
4236
}

Diff for: rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java

+10-16
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,21 @@
1616

1717
package io.rsocket.transport.local;
1818

19+
import io.rsocket.Closeable;
1920
import io.rsocket.test.TransportTest;
2021
import java.time.Duration;
2122
import java.util.UUID;
2223

23-
final class LocalResumableWithFragmentationTransportTest implements TransportTest {
24-
25-
private final TransportPair transportPair =
26-
new TransportPair<>(
27-
() -> "test-" + UUID.randomUUID(),
28-
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
29-
(address, allocator) -> LocalServerTransport.create(address),
30-
true,
31-
true);
32-
33-
@Override
34-
public Duration getTimeout() {
35-
return Duration.ofSeconds(10);
36-
}
24+
final class LocalResumableWithFragmentationTransportTest extends TransportTest<String, Closeable> {
3725

3826
@Override
39-
public TransportPair getTransportPair() {
40-
return transportPair;
27+
protected TransportPair<String, Closeable> createTransportPair() {
28+
return new TransportPair<>(
29+
() -> "test-" + UUID.randomUUID(),
30+
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
31+
(address, allocator) -> LocalServerTransport.create(address),
32+
true,
33+
true,
34+
Duration.ofSeconds(10));
4135
}
4236
}

Diff for: rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java

+8-14
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,19 @@
1616

1717
package io.rsocket.transport.local;
1818

19+
import io.rsocket.Closeable;
1920
import io.rsocket.test.TransportTest;
2021
import java.time.Duration;
2122
import java.util.UUID;
2223

23-
final class LocalTransportTest implements TransportTest {
24-
25-
private final TransportPair transportPair =
26-
new TransportPair<>(
27-
() -> "test-" + UUID.randomUUID(),
28-
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
29-
(address, allocator) -> LocalServerTransport.create(address));
30-
31-
@Override
32-
public Duration getTimeout() {
33-
return Duration.ofSeconds(10);
34-
}
24+
final class LocalTransportTest extends TransportTest<String, Closeable> {
3525

3626
@Override
37-
public TransportPair getTransportPair() {
38-
return transportPair;
27+
protected TransportPair<String, Closeable> createTransportPair() {
28+
return new TransportPair<>(
29+
() -> "test-" + UUID.randomUUID(),
30+
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
31+
(address, allocator) -> LocalServerTransport.create(address),
32+
Duration.ofSeconds(10));
3933
}
4034
}

Diff for: rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java

+9-15
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,20 @@
1616

1717
package io.rsocket.transport.local;
1818

19+
import io.rsocket.Closeable;
1920
import io.rsocket.test.TransportTest;
2021
import java.time.Duration;
2122
import java.util.UUID;
2223

23-
final class LocalTransportWithFragmentationTest implements TransportTest {
24-
25-
private final TransportPair transportPair =
26-
new TransportPair<>(
27-
() -> "test-" + UUID.randomUUID(),
28-
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
29-
(address, allocator) -> LocalServerTransport.create(address),
30-
true);
31-
32-
@Override
33-
public Duration getTimeout() {
34-
return Duration.ofSeconds(10);
35-
}
24+
final class LocalTransportWithFragmentationTest extends TransportTest<String, Closeable> {
3625

3726
@Override
38-
public TransportPair getTransportPair() {
39-
return transportPair;
27+
protected TransportPair<String, Closeable> createTransportPair() {
28+
return new TransportPair<>(
29+
() -> "test-" + UUID.randomUUID(),
30+
(address, server, allocator) -> LocalClientTransport.create(address, allocator),
31+
(address, allocator) -> LocalServerTransport.create(address),
32+
true,
33+
Duration.ofSeconds(10));
4034
}
4135
}

Diff for: rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java

+18-23
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,31 @@
1919
import io.netty.channel.ChannelOption;
2020
import io.rsocket.test.TransportTest;
2121
import io.rsocket.transport.netty.client.TcpClientTransport;
22+
import io.rsocket.transport.netty.server.CloseableChannel;
2223
import io.rsocket.transport.netty.server.TcpServerTransport;
2324
import java.net.InetSocketAddress;
2425
import java.time.Duration;
2526
import reactor.netty.tcp.TcpClient;
2627
import reactor.netty.tcp.TcpServer;
2728

28-
final class TcpFragmentationTransportTest implements TransportTest {
29-
30-
private final TransportPair transportPair =
31-
new TransportPair<>(
32-
() -> InetSocketAddress.createUnresolved("localhost", 0),
33-
(address, server, allocator) ->
34-
TcpClientTransport.create(
35-
TcpClient.create()
36-
.remoteAddress(server::address)
37-
.option(ChannelOption.ALLOCATOR, allocator)),
38-
(address, allocator) ->
39-
TcpServerTransport.create(
40-
TcpServer.create()
41-
.bindAddress(() -> address)
42-
.option(ChannelOption.ALLOCATOR, allocator)),
43-
true);
44-
45-
@Override
46-
public Duration getTimeout() {
47-
return Duration.ofMinutes(2);
48-
}
29+
final class TcpFragmentationTransportTest
30+
extends TransportTest<InetSocketAddress, CloseableChannel> {
4931

5032
@Override
51-
public TransportPair getTransportPair() {
52-
return transportPair;
33+
protected TransportPair<InetSocketAddress, CloseableChannel> createTransportPair() {
34+
return new TransportPair<>(
35+
() -> InetSocketAddress.createUnresolved("localhost", 0),
36+
(address, server, allocator) ->
37+
TcpClientTransport.create(
38+
TcpClient.create()
39+
.remoteAddress(server::address)
40+
.option(ChannelOption.ALLOCATOR, allocator)),
41+
(address, allocator) ->
42+
TcpServerTransport.create(
43+
TcpServer.create()
44+
.bindAddress(() -> address)
45+
.option(ChannelOption.ALLOCATOR, allocator)),
46+
true,
47+
Duration.ofMinutes(2));
5348
}
5449
}

Diff for: rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java

+18-24
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,31 @@
1919
import io.netty.channel.ChannelOption;
2020
import io.rsocket.test.TransportTest;
2121
import io.rsocket.transport.netty.client.TcpClientTransport;
22+
import io.rsocket.transport.netty.server.CloseableChannel;
2223
import io.rsocket.transport.netty.server.TcpServerTransport;
2324
import java.net.InetSocketAddress;
2425
import java.time.Duration;
2526
import reactor.netty.tcp.TcpClient;
2627
import reactor.netty.tcp.TcpServer;
2728

28-
final class TcpResumableTransportTest implements TransportTest {
29-
30-
private final TransportPair transportPair =
31-
new TransportPair<>(
32-
() -> InetSocketAddress.createUnresolved("localhost", 0),
33-
(address, server, allocator) ->
34-
TcpClientTransport.create(
35-
TcpClient.create()
36-
.remoteAddress(server::address)
37-
.option(ChannelOption.ALLOCATOR, allocator)),
38-
(address, allocator) ->
39-
TcpServerTransport.create(
40-
TcpServer.create()
41-
.bindAddress(() -> address)
42-
.option(ChannelOption.ALLOCATOR, allocator)),
43-
false,
44-
true);
45-
46-
@Override
47-
public Duration getTimeout() {
48-
return Duration.ofMinutes(3);
49-
}
29+
final class TcpResumableTransportTest extends TransportTest<InetSocketAddress, CloseableChannel> {
5030

5131
@Override
52-
public TransportPair getTransportPair() {
53-
return transportPair;
32+
protected TransportPair<InetSocketAddress, CloseableChannel> createTransportPair() {
33+
return new TransportPair<>(
34+
() -> InetSocketAddress.createUnresolved("localhost", 0),
35+
(address, server, allocator) ->
36+
TcpClientTransport.create(
37+
TcpClient.create()
38+
.remoteAddress(server::address)
39+
.option(ChannelOption.ALLOCATOR, allocator)),
40+
(address, allocator) ->
41+
TcpServerTransport.create(
42+
TcpServer.create()
43+
.bindAddress(() -> address)
44+
.option(ChannelOption.ALLOCATOR, allocator)),
45+
false,
46+
true,
47+
Duration.ofMinutes(2));
5448
}
5549
}

0 commit comments

Comments
 (0)