diff --git a/build.gradle b/build.gradle index e8dbaedd0..c360e0f84 100644 --- a/build.gradle +++ b/build.gradle @@ -45,6 +45,7 @@ subprojects { ext['assertj.version'] = '3.19.0' ext['netflix.limits.version'] = '0.3.6' ext['bouncycastle-bcpkix.version'] = '1.68' + ext['aeron.version'] = '1.31.1' group = "io.rsocket" @@ -87,6 +88,10 @@ subprojects { entry 'jmh-core' entry 'jmh-generator-annprocess' } + dependencySet(group: 'io.aeron', version: ext['aeron.version']) { + entry 'aeron-client' + entry 'aeron-driver' + } } generatedPomCustomization { enabled = false diff --git a/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java index 98bed7ba7..da2c12d06 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java @@ -27,7 +27,7 @@ public abstract class BaseDuplexConnection implements DuplexConnection { protected UnboundedProcessor sender = new UnboundedProcessor(); public BaseDuplexConnection() { - onClose().doFinally(s -> doOnClose()).subscribe(); + onClose().subscribe(null, t -> doOnClose(), this::doOnClose); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java index b92c25f46..ecb49a84f 100644 --- a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java @@ -27,7 +27,9 @@ public KeepAliveFramesAcceptor start( KeepAliveSupport keepAliveSupport, Consumer onSendKeepAliveFrame, Consumer onTimeout) { - duplexConnection.onClose().doFinally(s -> keepAliveSupport.stop()).subscribe(); + duplexConnection + .onClose() + .subscribe(null, __ -> keepAliveSupport.stop(), keepAliveSupport::stop); return keepAliveSupport .onSendKeepAliveFrame(onSendKeepAliveFrame) .onTimeout(onTimeout) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 5384c7e8d..edf1b4edf 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -39,6 +39,7 @@ import java.io.InputStreamReader; import java.net.SocketAddress; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; @@ -51,6 +52,7 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -68,14 +70,14 @@ import reactor.util.Logger; import reactor.util.Loggers; -public interface TransportTest { +public abstract class TransportTest { - Logger logger = Loggers.getLogger(TransportTest.class); + static final Logger logger = Loggers.getLogger(TransportTest.class); - String MOCK_DATA = "test-data"; - String MOCK_METADATA = "metadata"; - String LARGE_DATA = read("words.shakespeare.txt.gz"); - Payload LARGE_PAYLOAD = ByteBufPayload.create(LARGE_DATA, LARGE_DATA); + static final String MOCK_DATA = "test-data"; + static final String MOCK_METADATA = "metadata"; + static final String LARGE_DATA = read("words.shakespeare.txt.gz"); + static final Payload LARGE_PAYLOAD = ByteBufPayload.create(LARGE_DATA, LARGE_DATA); static String read(String resourceName) { @@ -83,7 +85,10 @@ static String read(String resourceName) { new BufferedReader( new InputStreamReader( new GZIPInputStream( - TransportTest.class.getClassLoader().getResourceAsStream(resourceName))))) { + Objects.requireNonNull( + TransportTest.class + .getClassLoader() + .getResourceAsStream(resourceName)))))) { return br.lines().map(String::toLowerCase).collect(Collectors.joining("\n\r")); } catch (Throwable e) { @@ -91,21 +96,30 @@ static String read(String resourceName) { } } + TransportPair transportPair; + + protected abstract TransportPair createTransportPair(); + + @BeforeEach + void setup() { + transportPair = createTransportPair(); + } + @AfterEach - default void close() { - getTransportPair().responder.awaitAllInteractionTermination(getTimeout()); - getTransportPair().dispose(); - getTransportPair().awaitClosed(); - RuntimeException throwable = new RuntimeException(); + void close() { + transportPair.responder.awaitAllInteractionTermination(transportPair.timeout); + transportPair.dispose(); + transportPair.awaitClosed(); + RuntimeException throwable = new RuntimeException(); try { - getTransportPair().byteBufAllocator2.assertHasNoLeaks(); + transportPair.byteBufAllocator2.assertHasNoLeaks(); } catch (Throwable t) { throwable = Exceptions.addSuppressed(throwable, t); } try { - getTransportPair().byteBufAllocator1.assertHasNoLeaks(); + transportPair.byteBufAllocator1.assertHasNoLeaks(); } catch (Throwable t) { throwable = Exceptions.addSuppressed(throwable, t); } @@ -115,7 +129,7 @@ default void close() { } } - default Payload createTestPayload(int metadataPresent) { + static Payload createTestPayload(int metadataPresent) { String metadata1; switch (metadataPresent % 5) { @@ -136,66 +150,63 @@ default Payload createTestPayload(int metadataPresent) { @DisplayName("makes 10 fireAndForget requests") @Test - default void fireAndForget10() { + protected void fireAndForget10() { Flux.range(1, 10) - .flatMap(i -> getClient().fireAndForget(createTestPayload(i))) + .flatMap(i -> transportPair.client.fireAndForget(createTestPayload(i))) .as(StepVerifier::create) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); - getTransportPair().responder.awaitUntilObserved(10, getTimeout()); + Assertions.assertThat(transportPair.responder.awaitUntilObserved(10, transportPair.timeout)) + .isTrue(); } @DisplayName("makes 10 fireAndForget with Large Payload in Requests") @Test - default void largePayloadFireAndForget10() { + protected void largePayloadFireAndForget10() { Flux.range(1, 10) - .flatMap(i -> getClient().fireAndForget(LARGE_PAYLOAD.retain())) + .flatMap(i -> transportPair.client.fireAndForget(LARGE_PAYLOAD.retain())) .as(StepVerifier::create) .expectComplete() - .verify(getTimeout()); - - getTransportPair().responder.awaitUntilObserved(10, getTimeout()); - } + .verify(transportPair.timeout); - default RSocket getClient() { - return getTransportPair().getClient(); + Assertions.assertThat(transportPair.responder.awaitUntilObserved(10, transportPair.timeout)) + .isTrue(); } - Duration getTimeout(); - - TransportPair getTransportPair(); - @DisplayName("makes 10 metadataPush requests") @Test - default void metadataPush10() { - Assumptions.assumeThat(getTransportPair().withResumability).isFalse(); + protected void metadataPush10() { + Assumptions.assumeThat(transportPair.withResumability).isFalse(); Flux.range(1, 10) - .flatMap(i -> getClient().metadataPush(ByteBufPayload.create("", "test-metadata"))) + .flatMap(i -> transportPair.client.metadataPush(ByteBufPayload.create("", "test-metadata"))) .as(StepVerifier::create) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); - getTransportPair().responder.awaitUntilObserved(10, getTimeout()); + Assertions.assertThat(transportPair.responder.awaitUntilObserved(10, transportPair.timeout)) + .isTrue(); } @DisplayName("makes 10 metadataPush with Large Metadata in requests") @Test - default void largePayloadMetadataPush10() { - Assumptions.assumeThat(getTransportPair().withResumability).isFalse(); + protected void largePayloadMetadataPush10() { + Assumptions.assumeThat(transportPair.withResumability).isFalse(); Flux.range(1, 10) - .flatMap(i -> getClient().metadataPush(ByteBufPayload.create("", LARGE_DATA))) + .flatMap(i -> transportPair.client.metadataPush(ByteBufPayload.create("", LARGE_DATA))) .as(StepVerifier::create) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); - getTransportPair().responder.awaitUntilObserved(10, getTimeout()); + Assertions.assertThat(transportPair.responder.awaitUntilObserved(10, transportPair.timeout)) + .isTrue(); } @DisplayName("makes 1 requestChannel request with 0 payloads") @Test - default void requestChannel0() { - getClient() + protected void requestChannel0() { + transportPair + .client .requestChannel(Flux.empty()) .as(StepVerifier::create) .expectErrorSatisfies( @@ -203,101 +214,107 @@ default void requestChannel0() { Assertions.assertThat(t) .isInstanceOf(CancellationException.class) .hasMessage("Empty Source")) - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestChannel request with 1 payloads") @Test - default void requestChannel1() { - getClient() + protected void requestChannel1() { + transportPair + .client .requestChannel(Mono.just(createTestPayload(0))) .doOnNext(Payload::release) .as(StepVerifier::create) .expectNextCount(1) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestChannel request with 200,000 payloads") @Test - default void requestChannel200_000() { - Flux payloads = Flux.range(0, 200_000).map(this::createTestPayload); + protected void requestChannel200_000() { + Flux payloads = Flux.range(0, 200_000).map(TransportTest::createTestPayload); - getClient() + transportPair + .client .requestChannel(payloads) .doOnNext(Payload::release) .limitRate(8) .as(StepVerifier::create) .expectNextCount(200_000) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestChannel request with 50 large payloads") @Test - default void largePayloadRequestChannel50() { + protected void largePayloadRequestChannel50() { Flux payloads = Flux.range(0, 50).map(__ -> LARGE_PAYLOAD.retain()); - getClient() + transportPair + .client .requestChannel(payloads) .doOnNext(Payload::release) .as(StepVerifier::create) .expectNextCount(50) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestChannel request with 20,000 payloads") @Test - default void requestChannel20_000() { + protected void requestChannel20_000() { Flux payloads = Flux.range(0, 20_000).map(metadataPresent -> createTestPayload(7)); - getClient() + transportPair + .client .requestChannel(payloads) .doOnNext(this::assertChannelPayload) .doOnNext(Payload::release) .as(StepVerifier::create) .expectNextCount(20_000) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestChannel request with 2,000,000 payloads") @SlowTest - default void requestChannel2_000_000() { - Flux payloads = Flux.range(0, 2_000_000).map(this::createTestPayload); + protected void requestChannel2_000_000() { + Flux payloads = Flux.range(0, 2_000_000).map(TransportTest::createTestPayload); - getClient() + transportPair + .client .requestChannel(payloads) .doOnNext(Payload::release) .limitRate(8) .as(StepVerifier::create) .expectNextCount(2_000_000) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestChannel request with 3 payloads") @Test - default void requestChannel3() { + protected void requestChannel3() { AtomicLong requested = new AtomicLong(); Flux payloads = - Flux.range(0, 3).doOnRequest(requested::addAndGet).map(this::createTestPayload); + Flux.range(0, 3).doOnRequest(requested::addAndGet).map(TransportTest::createTestPayload); - getClient() + transportPair + .client .requestChannel(payloads) .doOnNext(Payload::release) .as(publisher -> StepVerifier.create(publisher, 3)) .expectNextCount(3) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); Assertions.assertThat(requested.get()).isEqualTo(3L); } @DisplayName("makes 1 requestChannel request with 256 payloads") @Test - default void requestChannel256() { + protected void requestChannel256() { AtomicInteger counter = new AtomicInteger(); Flux payloads = Flux.defer( @@ -314,8 +331,9 @@ default void requestChannel256() { .blockLast(); } - default void check(Flux payloads) { - getClient() + void check(Flux payloads) { + transportPair + .client .requestChannel(payloads) .doOnNext(ReferenceCounted::release) .limitRate(8) @@ -323,89 +341,107 @@ default void check(Flux payloads) { .expectNextCount(256) .as("expected 256 items") .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestResponse request") @Test - default void requestResponse1() { - getClient() + protected void requestResponse1() { + transportPair + .client .requestResponse(createTestPayload(1)) .doOnNext(this::assertPayload) .doOnNext(Payload::release) .as(StepVerifier::create) .expectNextCount(1) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 10 requestResponse requests") @Test - default void requestResponse10() { + protected void requestResponse10() { Flux.range(1, 10) .flatMap( i -> - getClient() + transportPair + .client .requestResponse(createTestPayload(i)) .doOnNext(v -> assertPayload(v)) .doOnNext(Payload::release)) .as(StepVerifier::create) .expectNextCount(10) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 100 requestResponse requests") @Test - default void requestResponse100() { + protected void requestResponse100() { Flux.range(1, 100) - .flatMap(i -> getClient().requestResponse(createTestPayload(i)).doOnNext(Payload::release)) + .flatMap( + i -> + transportPair + .client + .requestResponse(createTestPayload(i)) + .doOnNext(Payload::release)) .as(StepVerifier::create) .expectNextCount(100) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 50 requestResponse requests") @Test - default void largePayloadRequestResponse50() { + protected void largePayloadRequestResponse50() { Flux.range(1, 50) .flatMap( - i -> getClient().requestResponse(LARGE_PAYLOAD.retain()).doOnNext(Payload::release)) + i -> + transportPair + .client + .requestResponse(LARGE_PAYLOAD.retain()) + .doOnNext(Payload::release)) .as(StepVerifier::create) .expectNextCount(50) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 10,000 requestResponse requests") @Test - default void requestResponse10_000() { + protected void requestResponse10_000() { Flux.range(1, 10_000) - .flatMap(i -> getClient().requestResponse(createTestPayload(i)).doOnNext(Payload::release)) + .flatMap( + i -> + transportPair + .client + .requestResponse(createTestPayload(i)) + .doOnNext(Payload::release)) .as(StepVerifier::create) .expectNextCount(10_000) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestStream request and receives 10,000 responses") @Test - default void requestStream10_000() { - getClient() + protected void requestStream10_000() { + transportPair + .client .requestStream(createTestPayload(3)) .doOnNext(this::assertPayload) .doOnNext(Payload::release) .as(StepVerifier::create) .expectNextCount(10_000) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestStream request and receives 5 responses") @Test - default void requestStream5() { - getClient() + protected void requestStream5() { + transportPair + .client .requestStream(createTestPayload(3)) .doOnNext(this::assertPayload) .doOnNext(Payload::release) @@ -413,13 +449,14 @@ default void requestStream5() { .as(StepVerifier::create) .expectNextCount(5) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } @DisplayName("makes 1 requestStream request and consumes result incrementally") @Test - default void requestStreamDelayedRequestN() { - getClient() + protected void requestStreamDelayedRequestN() { + transportPair + .client .requestStream(createTestPayload(3)) .take(10) .doOnNext(Payload::release) @@ -429,29 +466,30 @@ default void requestStreamDelayedRequestN() { .thenRequest(5) .expectNextCount(5) .expectComplete() - .verify(getTimeout()); + .verify(transportPair.timeout); } - default void assertPayload(Payload p) { - TransportPair transportPair = getTransportPair(); + void assertPayload(Payload p) { + TransportPair transportPair = this.transportPair; if (!transportPair.expectedPayloadData().equals(p.getDataUtf8()) || !transportPair.expectedPayloadMetadata().equals(p.getMetadataUtf8())) { throw new IllegalStateException("Unexpected payload"); } } - default void assertChannelPayload(Payload p) { + void assertChannelPayload(Payload p) { if (!MOCK_DATA.equals(p.getDataUtf8()) || !MOCK_METADATA.equals(p.getMetadataUtf8())) { throw new IllegalStateException("Unexpected payload"); } } - class TransportPair implements Disposable { + public static class TransportPair implements Disposable { private static final String data = "hello world"; private static final String metadata = "metadata"; private final boolean withResumability; + private final boolean withAsyncSupport; private final LeaksTrackingByteBufAllocator byteBufAllocator1 = LeaksTrackingByteBufAllocator.instrument( @@ -466,24 +504,29 @@ class TransportPair implements Disposable { private final S server; + private final Duration timeout; + public TransportPair( Supplier addressSupplier, TriFunction clientTransportSupplier, - BiFunction> serverTransportSupplier) { - this(addressSupplier, clientTransportSupplier, serverTransportSupplier, false); + BiFunction> serverTransportSupplier, + Duration timeout) { + this(addressSupplier, clientTransportSupplier, serverTransportSupplier, false, timeout); } public TransportPair( Supplier addressSupplier, TriFunction clientTransportSupplier, BiFunction> serverTransportSupplier, - boolean withRandomFragmentation) { + boolean withRandomFragmentation, + Duration timeout) { this( addressSupplier, clientTransportSupplier, serverTransportSupplier, withRandomFragmentation, - false); + false, + timeout); } public TransportPair( @@ -491,13 +534,37 @@ public TransportPair( TriFunction clientTransportSupplier, BiFunction> serverTransportSupplier, boolean withRandomFragmentation, - boolean withResumability) { + boolean withResumability, + Duration timeout) { + this( + addressSupplier, + clientTransportSupplier, + serverTransportSupplier, + withRandomFragmentation, + withResumability, + true, + timeout); + } + + public TransportPair( + Supplier addressSupplier, + TriFunction clientTransportSupplier, + BiFunction> serverTransportSupplier, + boolean withRandomFragmentation, + boolean withResumability, + boolean withAsyncSupport, + Duration timeout) { + this.withResumability = withResumability; + this.withAsyncSupport = withAsyncSupport; + this.timeout = timeout; T address = addressSupplier.get(); - final boolean runClientWithAsyncInterceptors = ThreadLocalRandom.current().nextBoolean(); - final boolean runServerWithAsyncInterceptors = ThreadLocalRandom.current().nextBoolean(); + final boolean runClientWithAsyncInterceptors = + ThreadLocalRandom.current().nextBoolean() && withAsyncSupport; + final boolean runServerWithAsyncInterceptors = + ThreadLocalRandom.current().nextBoolean() && withAsyncSupport; ByteBufAllocator allocatorToSupply1; ByteBufAllocator allocatorToSupply2; @@ -561,7 +628,7 @@ public TransportPair( final RSocketConnector rSocketConnector = RSocketConnector.create() .payloadDecoder(PayloadDecoder.ZERO_COPY) - .keepAlive(Duration.ofMillis(10), Duration.ofHours(1)) + .keepAlive(Duration.ofMillis(10), timeout.dividedBy(10)) .interceptors( registry -> { if (runClientWithAsyncInterceptors && !withResumability) { @@ -616,10 +683,6 @@ public void dispose() { client.dispose(); } - RSocket getClient() { - return client; - } - public String expectedPayloadData() { return data; } diff --git a/rsocket-transport-aeron/build.gradle b/rsocket-transport-aeron/build.gradle new file mode 100644 index 000000000..0ca04c3a7 --- /dev/null +++ b/rsocket-transport-aeron/build.gradle @@ -0,0 +1,44 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id 'java-library' + id 'maven-publish' + id 'signing' + id "com.google.osdetector" version "1.4.0" +} + +dependencies { + api project(':rsocket-core') + api 'org.slf4j:slf4j-api' + api 'io.aeron:aeron-client' + + testImplementation 'io.aeron:aeron-driver' + testImplementation project(':rsocket-test') + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'org.assertj:assertj-core' + testImplementation 'org.mockito:mockito-core' + testImplementation 'org.mockito:mockito-junit-jupiter' + testImplementation 'org.junit.jupiter:junit-jupiter-api' + testImplementation 'org.junit.jupiter:junit-jupiter-params' + testImplementation 'org.slf4j:slf4j-api' + + testImplementation 'ch.qos.logback:logback-classic' + testRuntimeOnly 'ch.qos.logback:logback-classic' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' +} + +description = 'Aeron RSocket transport implementations' diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronChannelAddress.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronChannelAddress.java new file mode 100644 index 000000000..2d98f15f9 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronChannelAddress.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.netty.util.internal.ObjectUtil; +import java.net.SocketAddress; + +public final class AeronChannelAddress extends SocketAddress { + private static final long serialVersionUID = -6934618000832236893L; + + final String channel; + + public AeronChannelAddress(String channel) { + this.channel = ObjectUtil.checkNotNull(channel, "channel"); + } + + /** The Aeron Subscription Channel. */ + public String channel() { + return channel; + } + + @Override + public String toString() { + return channel; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AeronChannelAddress)) { + return false; + } + + return ((AeronChannelAddress) o).channel.equals(channel); + } + + @Override + public int hashCode() { + return channel.hashCode(); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java new file mode 100644 index 000000000..75f5044fb --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java @@ -0,0 +1,272 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.aeron.Aeron; +import io.aeron.AvailableImageHandler; +import io.aeron.ChannelUri; +import io.aeron.ChannelUriStringBuilder; +import io.aeron.CommonContext; +import io.aeron.ExclusivePublication; +import io.aeron.Image; +import io.aeron.Subscription; +import io.aeron.logbuffer.BufferClaim; +import io.aeron.logbuffer.FragmentHandler; +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.DuplexConnection; +import io.rsocket.transport.ClientTransport; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Supplier; +import org.agrona.CloseHelper; +import org.agrona.concurrent.BackoffIdleStrategy; +import org.agrona.concurrent.IdleStrategy; +import org.agrona.concurrent.NanoClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public class AeronClientTransport implements ClientTransport { + static final Logger logger = LoggerFactory.getLogger(AeronClientTransport.class); + + static final ThreadLocal BUFFER_CLAIM = ThreadLocal.withInitial(BufferClaim::new); + + private final Aeron aeron; + private final String channel; + private final int prefetch; + private final int effort; + private final long timeoutNs; + private final Scheduler bossScheduler; + private final EventLoopGroup eventLoopGroup; + private final IdleStrategy idleStrategy; + private final ByteBufAllocator allocator; + + volatile int streamId; + static final AtomicIntegerFieldUpdater STREAM_ID = + AtomicIntegerFieldUpdater.newUpdater(AeronClientTransport.class, "streamId"); + + AeronClientTransport( + Aeron aeron, + String channel, + Scheduler bossScheduler, + EventLoopGroup eventLoopGroup, + IdleStrategy idleStrategy, + ByteBufAllocator allocator, + int prefetch, + int effort, + long timeoutNs) { + this.bossScheduler = bossScheduler; + this.eventLoopGroup = eventLoopGroup; + this.idleStrategy = idleStrategy; + this.allocator = allocator; + this.aeron = aeron; + this.channel = channel; + this.prefetch = prefetch; + this.effort = effort; + this.timeoutNs = timeoutNs; + + STREAM_ID.lazySet(this, 2); + } + + @Override + public Mono connect() { + return Mono.defer( + () -> { + final long connectionId = ThreadLocalRandom.current().nextInt(); + final int streamId = STREAM_ID.getAndAdd(this, 2); + + if (logger.isDebugEnabled()) { + logger.debug( + "Wiring new connection [{}] for stream [{}] on channel [{}]", + connectionId, + streamId, + channel); + } + + final ExclusivePublication serverManagementPublication = + aeron.addExclusivePublication(channel, Constants.SERVER_MANAGEMENT_STREAM_ID); + + final Subscription clientManagementSubscription = + aeron.addSubscription( + ChannelUri.addSessionId(channel, (int) connectionId), + Constants.CLIENT_MANAGEMENT_STREAM_ID); + + if (!PublicationUtils.tryOfferSetupFrame( + aeron.context(), + serverManagementPublication, + BUFFER_CLAIM.get(), + idleStrategy, + ChannelUri.addSessionId(channel, (int) connectionId), + connectionId, + streamId, + timeoutNs, + FrameType.SETUP)) { + + if (logger.isDebugEnabled()) { + logger.debug( + "Failed to send SetupFrame { connection: [{}]; stream: [{}]; channel: [{}] }", + connectionId, + streamId, + channel); + } + + CloseHelper.quietCloseAll( + serverManagementPublication, clientManagementSubscription); + + return Mono.error( + new TimeoutException( + String.format( + "Timeout on send SetupFrame { connection: [%s]; stream: [%s]; channel: [%s] }", + connectionId, streamId, channel))); + } + + CloseHelper.quietClose(serverManagementPublication); + + return Mono.create( + sink -> { + FragmentHandler fragmentHandler = + (buffer, offset, length, header) -> { + if (PayloadCodec.frameType(buffer, offset) != FrameType.SETUP_COMPLETE + || SetupCodec.connectionId(buffer, offset) != connectionId) { + sink.error(new IllegalStateException("Received unexpected frame")); + return; + } + + final int serverStreamId = SetupCodec.streamId(buffer, offset); + final String serverChannel = SetupCodec.channel(buffer, offset); + + final ExclusivePublication publication = + aeron.addExclusivePublication( + ChannelUri.addSessionId(serverChannel, (int) connectionId), + serverStreamId); + + final Subscription subscription = + aeron.addSubscription( + ChannelUri.addSessionId(channel, (int) connectionId), + streamId, + new AvailableImageHandler() { + boolean createdConnection = false; + + @Override + public void onAvailableImage(Image image) { + if (!createdConnection) { + createdConnection = true; + sink.success( + new AeronDuplexConnection( + serverStreamId, + publication, + image.subscription(), + eventLoopGroup, + allocator, + prefetch, + effort)); + } + } + }, + image -> {}); + sink.onCancel( + () -> CloseHelper.quietCloseAll(subscription, publication)); + }; + + final NanoClock nanoClock = aeron.context().nanoClock(); + final long nowNs = nanoClock.nanoTime(); + final long deadlineNs = nowNs + timeoutNs; + + idleStrategy.reset(); + for (; ; ) { + final int polled = clientManagementSubscription.poll(fragmentHandler, 1); + + if (polled < 1) { + idleStrategy.idle(); + + if (deadlineNs - nanoClock.nanoTime() < 0) { + sink.error( + new TimeoutException( + "Timeout on waiting response from the AeronServer[" + + channel + + "]")); + + if (logger.isDebugEnabled()) { + logger.debug( + "Timeout on receiving SetupCompleteFrame { connection: [{}]; stream: [{}]; channel: [{}] }", + connectionId, + streamId, + channel); + } + + CloseHelper.quietClose(clientManagementSubscription); + return; + } + continue; + } + + CloseHelper.quietClose(clientManagementSubscription); + return; + } + }) + .timeout(Duration.ofNanos(timeoutNs).multipliedBy(2)); + }) + .subscribeOn(bossScheduler); + } + + public static AeronClientTransport createUdp( + Aeron aeron, String host, int port, EventLoopGroup resources) { + final Supplier idleStrategySupplier = + () -> + new BackoffIdleStrategy( + /* maxSpins */ 100, + /* maxYields */ 1000, + /* minParkPeriodNs */ 10000, + /* maxParkPeriodNs */ 100000); + return new AeronClientTransport( + aeron, + new ChannelUriStringBuilder() + .media(CommonContext.UDP_MEDIA) + .endpoint(host + ":" + port) + .build(), + Schedulers.boundedElastic(), + resources, + idleStrategySupplier.get(), + ByteBufAllocator.DEFAULT, + 256, + 256, + Duration.ofSeconds(5).toNanos()); + } + + public static AeronClientTransport createIpc(Aeron aeron, EventLoopGroup resources) { + final Supplier idleStrategySupplier = + () -> + new BackoffIdleStrategy( + /* maxSpins */ 100, + /* maxYields */ 1000, + /* minParkPeriodNs */ 10000, + /* maxParkPeriodNs */ 100000); + return new AeronClientTransport( + aeron, + new ChannelUriStringBuilder().media(CommonContext.IPC_MEDIA).build(), + Schedulers.boundedElastic(), + resources, + idleStrategySupplier.get(), + ByteBufAllocator.DEFAULT, + 256, + 256, + Duration.ofSeconds(5).toNanos()); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronDuplexConnection.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronDuplexConnection.java new file mode 100644 index 000000000..ec199f365 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronDuplexConnection.java @@ -0,0 +1,100 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.aeron.ExclusivePublication; +import io.aeron.Subscription; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.RSocketErrorException; +import io.rsocket.frame.ErrorFrameCodec; +import io.rsocket.internal.BaseDuplexConnection; +import java.net.SocketAddress; +import reactor.core.publisher.Flux; + +class AeronDuplexConnection extends BaseDuplexConnection implements AutoCloseable { + + final int streamId; + final FluxReceiver fluxReceiver; + final MonoSendMany monoSendMany; + final ExclusivePublication publication; + final Subscription subscription; + final ByteBufAllocator allocator; + final AeronChannelAddress aeronChannelAddress; + + public AeronDuplexConnection( + int streamId, + ExclusivePublication publication, + Subscription subscription, + EventLoopGroup eventLoopGroup, + ByteBufAllocator allocator, + int prefetch, + int effort) { + this.streamId = streamId; + this.aeronChannelAddress = new AeronChannelAddress(subscription.channel()); + this.publication = publication; + this.subscription = subscription; + this.allocator = allocator; + + this.fluxReceiver = new FluxReceiver(this.onClose, eventLoopGroup.next(), subscription, effort); + this.monoSendMany = + this.sender.subscribeWith( + new MonoSendMany(this.onClose, eventLoopGroup.next(), publication, effort, prefetch)); + } + + public int streamId() { + return this.streamId; + } + + @Override + public void sendErrorAndClose(RSocketErrorException e) { + final ByteBuf errorFrame = ErrorFrameCodec.encode(this.allocator, 0, e); + this.sender.onNext(errorFrame); + + final Throwable cause = e.getCause(); + if (cause == null) { + this.sender.onComplete(); + } else { + this.sender.onError(cause); + } + } + + @Override + public Flux receive() { + return this.fluxReceiver; + } + + @Override + public ByteBufAllocator alloc() { + return this.allocator; + } + + @Override + public SocketAddress remoteAddress() { + return this.aeronChannelAddress; + } + + @Override + protected void doOnClose() { + this.fluxReceiver.dispose(); + this.monoSendMany.dispose(); + } + + @Override + public void close() { + dispose(); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServer.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServer.java new file mode 100644 index 000000000..9e84b08a0 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServer.java @@ -0,0 +1,160 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.aeron.Aeron; +import io.aeron.Subscription; +import io.aeron.logbuffer.FragmentHandler; +import io.rsocket.Closeable; +import org.agrona.CloseHelper; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.concurrent.IdleStrategy; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; + +public class AeronServer extends BaseSubscriber implements Closeable { + + final AeronChannelAddress address; + final Aeron aeron; + final Scheduler scheduler; + final Sinks.One onCloseSink = Sinks.one(); + + final Int2ObjectHashMap activeConnections; + + AeronServer(String channel, Aeron aeron, Scheduler scheduler) { + this.address = new AeronChannelAddress(channel); + this.aeron = aeron; + this.scheduler = scheduler; + this.activeConnections = new Int2ObjectHashMap<>(); + } + + public AeronChannelAddress address() { + return this.address; + } + + static Mono create( + String channel, + Aeron aeron, + Scheduler scheduler, + IdleStrategy idleStrategy, + Handler handler) { + return Mono.fromCallable( + () -> { + final AeronServer aeronServer = new AeronServer(channel, aeron, scheduler); + final Int2ObjectHashMap activeConnections = + aeronServer.activeConnections; + return Mono.create( + sink -> { + final Subscription serverManagementSubscription = + aeron.addSubscription(channel, Constants.SERVER_MANAGEMENT_STREAM_ID); + final FragmentHandler fragmentHandler = + (buffer, offset, length, header) -> + handler + .handle( + SetupCodec.connectionId(buffer, offset), + SetupCodec.streamId(buffer, offset), + SetupCodec.channel(buffer, offset)) + .subscribe(new ConnectionSubscriber(activeConnections)); + + while (!aeronServer.isDisposed() && !serverManagementSubscription.isClosed()) { + idleStrategy.idle( + serverManagementSubscription.poll(fragmentHandler, Integer.MAX_VALUE)); + } + + if (aeronServer.isDisposed()) { + serverManagementSubscription.close(); + + synchronized (activeConnections) { + activeConnections.forEach((__, c) -> CloseHelper.quietClose(c)); + activeConnections.clear(); + } + + return; + } + + if (serverManagementSubscription.isClosed()) { + synchronized (activeConnections) { + activeConnections.forEach((__, c) -> CloseHelper.quietClose(c)); + activeConnections.clear(); + } + + sink.error( + new NotConnectedException( + "Aeron channel[" + channel + "] has been closed")); + } + }) + .subscribeOn(scheduler) + .subscribeWith(aeronServer); + }); + } + + @Override + public Mono onClose() { + return onCloseSink.asMono(); + } + + @Override + protected void hookFinally(SignalType type) { + onCloseSink.tryEmitEmpty(); + } + + @FunctionalInterface + interface Handler { + Mono handle(long connectionId, int streamId, String channel); + } + + static class ConnectionSubscriber extends BaseSubscriber { + + final Int2ObjectHashMap activeConnections; + + ConnectionSubscriber(Int2ObjectHashMap connections) { + activeConnections = connections; + } + + @Override + protected void hookOnNext(AeronDuplexConnection connection) { + final Int2ObjectHashMap activeConnections = this.activeConnections; + + synchronized (activeConnections) { + if (activeConnections.putIfAbsent(connection.streamId(), connection) != null) { + connection.dispose(); + return; + } + } + + connection + .onClose() + .subscribe( + null, + __ -> { + synchronized (activeConnections) { + activeConnections.remove(connection.streamId(), connection); + } + }, + () -> { + synchronized (activeConnections) { + activeConnections.remove(connection.streamId(), connection); + } + }); + } + + @Override + protected void hookOnError(Throwable throwable) {} + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServerTransport.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServerTransport.java new file mode 100644 index 000000000..3e2a07fd6 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServerTransport.java @@ -0,0 +1,225 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.aeron.Aeron; +import io.aeron.ChannelUri; +import io.aeron.ChannelUriStringBuilder; +import io.aeron.CommonContext; +import io.aeron.ExclusivePublication; +import io.aeron.Subscription; +import io.aeron.logbuffer.BufferClaim; +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.transport.ServerTransport; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import org.agrona.CloseHelper; +import org.agrona.concurrent.BackoffIdleStrategy; +import org.agrona.concurrent.IdleStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public class AeronServerTransport implements ServerTransport { + + static final Logger logger = LoggerFactory.getLogger(AeronServerTransport.class); + + static final ThreadLocal BUFFER_CLAIM = ThreadLocal.withInitial(BufferClaim::new); + + private final Scheduler bossScheduler; + + private final EventLoopGroup eventLoopGroup; + private final Aeron aeron; + private final String channel; + private final IdleStrategy idleStrategy; + private final ByteBufAllocator allocator; + private final int prefetch; + private final int effort; + private final long timeoutNs; + + AeronServerTransport( + Aeron aeron, + String channel, + Scheduler bossScheduler, + EventLoopGroup eventLoopGroup, + IdleStrategy idleStrategy, + ByteBufAllocator allocator, + int prefetch, + int effort, + long timeoutNs) { + this.eventLoopGroup = eventLoopGroup; + this.bossScheduler = bossScheduler; + this.aeron = aeron; + this.channel = channel; + this.idleStrategy = idleStrategy; + this.allocator = allocator; + this.prefetch = prefetch; + this.effort = effort; + this.timeoutNs = timeoutNs; + } + + @Override + public Mono start(ConnectionAcceptor acceptor) { + return AeronServer.create( + channel, aeron, bossScheduler, idleStrategy, new AeronConnectionHandler(acceptor)); + } + + class AeronConnectionHandler implements AeronServer.Handler { + + final ConnectionAcceptor acceptor; + + int serverStreamId = -1; + + AeronConnectionHandler(ConnectionAcceptor acceptor) { + this.acceptor = acceptor; + } + + @Override + public Mono handle( + long clientConnectionId, int clientStreamId, String clientChannel) { + final int serverStreamId = this.serverStreamId + 2; + this.serverStreamId = serverStreamId; + + logger.info( + "Receiving connection with id {} from aeron stream id {} and channel {}", + clientConnectionId, + clientStreamId, + clientChannel); + + final ExclusivePublication clientManagementPublication; + try { + clientManagementPublication = + aeron.addExclusivePublication( + ChannelUri.addSessionId(clientChannel, (int) clientConnectionId), + Constants.CLIENT_MANAGEMENT_STREAM_ID); + } catch (Throwable t) { + return Mono.error(t); + } + + final ExclusivePublication clientPublication; + try { + clientPublication = + aeron.addExclusivePublication( + ChannelUri.addSessionId(clientChannel, (int) clientConnectionId), clientStreamId); + } catch (Throwable t) { + return Mono.error(t); + } + + final Subscription subscription; + try { + subscription = + aeron.addSubscription( + ChannelUri.addSessionId(channel, (int) clientConnectionId), + serverStreamId, + image -> { + logger.debug("receive image"); + eventLoopGroup.next().schedule(clientManagementPublication::close); + }, + image -> {}); + } catch (Throwable t) { + return Mono.error(t); + } + + final AeronDuplexConnection connection = + new AeronDuplexConnection( + serverStreamId, + clientPublication, + subscription, + eventLoopGroup, + allocator, + prefetch, + effort); + + logger.info( + "received connection [{}] from aeron channel [{}]", clientConnectionId, clientChannel); + + final BufferClaim bufferClaim = BUFFER_CLAIM.get(); + if (!PublicationUtils.tryOfferSetupFrame( + aeron.context(), + clientManagementPublication, + bufferClaim, + idleStrategy, + ChannelUri.addSessionId(channel, (int) clientConnectionId), + clientConnectionId, + serverStreamId, + timeoutNs, + FrameType.SETUP_COMPLETE)) { + + this.serverStreamId -= 2; + + logger.debug( + "Failed to send SetupCompleteFrame { connection: [{}]; stream: [{}]; channel: [{}] }", + clientConnectionId, + clientStreamId, + clientChannel); + + CloseHelper.quietCloseAll(clientManagementPublication, connection); + + return Mono.error( + new TimeoutException( + "Timeout on send SetupCompleteFrame { connection: [{}]; stream: [{}]; channel: [{}] }")); + } + + return acceptor + .apply(connection) + .thenReturn(connection) + .timeout(Duration.ofNanos(timeoutNs)) + .doOnError( + cause -> { + logger.debug( + "Cleanup connection [{}] resource. Cause: {}", clientConnectionId, cause); + CloseHelper.quietCloseAll(clientManagementPublication, connection); + }); + } + } + + public static AeronServerTransport createUdp( + Aeron aeron, String host, int port, EventLoopGroup resources) { + final Supplier idleStrategySupplier = + () -> new BackoffIdleStrategy(100, 1000, 10000, 100000); + return new AeronServerTransport( + aeron, + new ChannelUriStringBuilder() + .media(CommonContext.UDP_MEDIA) + .endpoint(host + ":" + port) + .build(), + Schedulers.boundedElastic(), + resources, + idleStrategySupplier.get(), + ByteBufAllocator.DEFAULT, + 256, + 256, + Duration.ofSeconds(5).toNanos()); + } + + public static AeronServerTransport createIpc(Aeron aeron, EventLoopGroup resources) { + final Supplier idleStrategySupplier = + () -> new BackoffIdleStrategy(100, 1000, 10000, 100000); + return new AeronServerTransport( + aeron, + new ChannelUriStringBuilder().media(CommonContext.IPC_MEDIA).build(), + Schedulers.boundedElastic(), + resources, + idleStrategySupplier.get(), + ByteBufAllocator.DEFAULT, + 256, + 256, + Duration.ofSeconds(5).toNanos()); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/Constants.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/Constants.java new file mode 100644 index 000000000..2b94b8f21 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/Constants.java @@ -0,0 +1,25 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +final class Constants { + + static final int SERVER_MANAGEMENT_STREAM_ID = -1; + static final int CLIENT_MANAGEMENT_STREAM_ID = -2; + + private Constants() {} +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/EventLoop.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/EventLoop.java new file mode 100644 index 000000000..54176e841 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/EventLoop.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.agrona.concurrent.IdleStrategy; +import reactor.core.Disposable; + +public final class EventLoop extends AtomicBoolean implements Runnable, Disposable { + + final MpscUnboundedArrayQueue tasksQueue; + final IdleStrategy idleStrategy; + + boolean terminated = false; + + EventLoop(IdleStrategy idleStrategy) { + this.idleStrategy = idleStrategy; + this.tasksQueue = new MpscUnboundedArrayQueue<>(256); + } + + @Override + public void dispose() { + if (!compareAndSet(false, true)) { + return; + } + + this.tasksQueue.offer(() -> this.terminated = true); + } + + @Override + public boolean isDisposed() { + return get(); + } + + @Override + public void run() { + final MpscUnboundedArrayQueue tasksQueue = this.tasksQueue; + final IdleStrategy idleStrategy = this.idleStrategy; + + while (!this.terminated) { + Runnable task = tasksQueue.relaxedPoll(); + + if (task == null) { + idleStrategy.reset(); + while (task == null) { + if (this.terminated) { + return; + } + + idleStrategy.idle(); + task = tasksQueue.relaxedPoll(); + } + } + + task.run(); + } + } + + void schedule(Runnable task) { + this.tasksQueue.offer(task); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/EventLoopGroup.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/EventLoopGroup.java new file mode 100644 index 000000000..cc91f521a --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/EventLoopGroup.java @@ -0,0 +1,103 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.agrona.concurrent.BackoffIdleStrategy; +import org.agrona.concurrent.IdleStrategy; +import reactor.core.Disposable; + +public final class EventLoopGroup extends AtomicInteger implements Disposable { + + static AtomicReference CACHED_DEFAULT = new AtomicReference<>(); + + static final int DISPOSED = Integer.MIN_VALUE; + + final EventLoop[] resources; + + EventLoopGroup(int size, Supplier idleStrategySupplier) { + this.resources = new EventLoop[size]; + + for (int i = 0; i < size; i++) { + final IdleStrategy idleStrategy = idleStrategySupplier.get(); + final EventLoop eventLoop = new EventLoop(idleStrategy); + this.resources[i] = eventLoop; + new Thread(eventLoop).start(); + } + } + + @Override + public void dispose() { + if (get() == DISPOSED && getAndSet(DISPOSED) == DISPOSED) { + return; + } + + for (EventLoop resource : this.resources) { + resource.dispose(); + } + } + + @Override + public boolean isDisposed() { + return get() == DISPOSED; + } + + EventLoop next() { + final EventLoop[] resources = this.resources; + final int resourcesCount = resources.length; + for (; ; ) { + final int index = get(); + + if (index == DISPOSED) { + return resources[0]; + } + + final int nextIndex = (index + 1) % resourcesCount; + if (compareAndSet(index, nextIndex)) { + return resources[index]; + } + } + } + + public static EventLoopGroup create(int size) { + return create(size, () -> new BackoffIdleStrategy(100, 1000, 10000, 100000)); + } + + public static EventLoopGroup create( + int size, Supplier idleStrategySupplier) { + return new EventLoopGroup(size, idleStrategySupplier); + } + + public static EventLoopGroup cached() { + EventLoopGroup s = CACHED_DEFAULT.get(); + if (s != null) { + return s; + } + s = + new EventLoopGroup( + Runtime.getRuntime().availableProcessors(), + () -> new BackoffIdleStrategy(100, 1000, 10000, 100000)); + if (CACHED_DEFAULT.compareAndSet(null, s)) { + return s; + } + // the reference was updated in the meantime with a cached scheduler + // fallback to it and dispose the extraneous one + s.dispose(); + return CACHED_DEFAULT.get(); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FluxReceiver.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FluxReceiver.java new file mode 100644 index 000000000..79ca45f76 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FluxReceiver.java @@ -0,0 +1,253 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import static io.rsocket.transport.aeron.Operators.addCapCancellable; +import static io.rsocket.transport.aeron.Operators.removeCapCancellable; + +import io.aeron.ControlledFragmentAssembler; +import io.aeron.Subscription; +import io.aeron.logbuffer.ControlledFragmentHandler; +import io.aeron.logbuffer.Header; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.agrona.DirectBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Operators; +import reactor.core.publisher.Sinks; + +class FluxReceiver extends Flux + implements org.reactivestreams.Subscription, ControlledFragmentHandler, Runnable { + + static final Logger logger = LoggerFactory.getLogger(FluxReceiver.class); + static final ThreadLocal WRAPPED_DIRECT_BUFFER_BYTE_BUF = + ThreadLocal.withInitial(WrappedDirectBufferByteBuf::new); + + final Subscription subscription; + final ControlledFragmentAssembler assembler; + final EventLoop eventLoop; + final Sinks.Empty onClose; + final int effort; + + volatile long requested; + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(FluxReceiver.class, "requested"); + + static final long INITIAL = -1; + static final long SUBSCRIBED = 0; + static final long DISPOSED = Long.MIN_VALUE + 1; + static final long CANCELLED = Long.MIN_VALUE; + + CoreSubscriber actual; + + int produced; + + public FluxReceiver( + Sinks.Empty onClose, EventLoop eventLoop, Subscription subscription, int effort) { + this.onClose = onClose; + this.eventLoop = eventLoop; + this.subscription = subscription; + this.assembler = new ControlledFragmentAssembler(this); + this.effort = effort; + REQUESTED.lazySet(this, INITIAL); + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (this.requested == INITIAL && REQUESTED.compareAndSet(this, INITIAL, SUBSCRIBED)) { + this.actual = actual; + actual.onSubscribe(this); + } else { + Operators.error( + actual, new IllegalStateException("FluxReceiver allows only a single Subscriber")); + } + } + + @Override + public void request(long n) { + if (Operators.validate(n)) { + if (addCapCancellable(REQUESTED, this, n) == 0) { + drain(n); + } + } + } + + @Override + public void run() { + final long requested = this.requested; + + if (requested == CANCELLED) { + if (logger.isDebugEnabled()) { + logger.debug("Closing Aeron Subscription due to cancellation"); + } + this.subscription.close(); + return; + } else if (requested == DISPOSED) { + if (logger.isDebugEnabled()) { + logger.debug("Closing Aeron Subscription due to disposure"); + } + this.actual.onError(new CancellationException("Disposed")); + this.subscription.close(); + return; + } + + drain(requested); + } + + void drain(long n) { + final Subscription subscription = this.subscription; + final ControlledFragmentAssembler assembler = this.assembler; + + int produced; + int effort = this.effort; + + for (; ; ) { + boolean consumed = false; + + for (; ; ) { + int polled = subscription.controlledPoll(assembler, effort); + + produced = this.produced; + if (produced == -1) { + subscription.close(); + return; + } else if (produced == -2) { + this.actual.onError(new CancellationException("Disposed")); + subscription.close(); + return; + } + + if (polled < 1) { + if (subscription.isClosed()) { + final NotConnectedException exception = + new NotConnectedException("Aeron Subscription has been closed unexpectedly"); + this.actual.onError(exception); + this.onClose.tryEmitError(exception); + return; + } + + --effort; + } else { + effort -= polled; + + if (produced >= n) { + consumed = true; + break; + } + } + + if (effort == 0) { + break; + } + } + + if (consumed) { + n = removeCapCancellable(REQUESTED, this, produced); + if (n == CANCELLED) { + if (logger.isDebugEnabled()) { + logger.debug("Closing Aeron Subscription due to cancellation"); + } + subscription.close(); + return; + } else if (n == DISPOSED) { + if (logger.isDebugEnabled()) { + logger.debug("Closing Aeron Subscription due to disposure"); + } + this.actual.onError(new CancellationException("Disposed")); + subscription.close(); + return; + } else if (n == 0) { + this.produced = 0; + return; + } + } else { + this.eventLoop.schedule(this); + return; + } + } + } + + @Override + public Action onFragment(DirectBuffer buffer, int offset, int length, Header header) { + final long requested = this.requested; + if (requested == CANCELLED) { + this.produced = -1; + return Action.ABORT; + } + + if (requested == DISPOSED) { + this.produced = -2; + return Action.ABORT; + } + + final WrappedDirectBufferByteBuf wrappedDirectBuffer = WRAPPED_DIRECT_BUFFER_BYTE_BUF.get(); + wrappedDirectBuffer.wrap(buffer, offset, offset + length); + + if (logger.isDebugEnabled()) { + logger.debug("Receiving:\n{}\n", ByteBufUtil.prettyHexDump(wrappedDirectBuffer)); + } + + this.actual.onNext(wrappedDirectBuffer); + + if (requested != Long.MAX_VALUE) { + this.produced++; + + if (this.produced == requested) { + return Action.BREAK; + } + } + + return Action.CONTINUE; + } + + @Override + public void cancel() { + long state = this.requested; + if (state == CANCELLED || state == DISPOSED) { + return; + } + + state = REQUESTED.getAndSet(this, CANCELLED); + if (state == CANCELLED || state == DISPOSED) { + return; + } + + if (state == 0) { + this.subscription.close(); + } + } + + public void dispose() { + long state = this.requested; + if (state == CANCELLED || state == DISPOSED) { + return; + } + + state = REQUESTED.getAndSet(this, DISPOSED); + if (state == CANCELLED || state == DISPOSED) { + return; + } + + if (state == 0) { + this.subscription.close(); + } + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FrameType.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FrameType.java new file mode 100644 index 000000000..58342193a --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FrameType.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +import java.util.Arrays; + +/** + * Types of frame that can be sent. + * + * @see Frame + * Types + */ +enum FrameType { + + /** Reserved. */ + RESERVED(0x00), + + SETUP(0x01), + + SETUP_COMPLETE(0x02); + + /** The size of the encoded frame type */ + static final int ENCODED_SIZE = 6; + + private static final FrameType[] FRAME_TYPES_BY_ENCODED_TYPE; + + static { + FRAME_TYPES_BY_ENCODED_TYPE = new FrameType[getMaximumEncodedType() + 1]; + + for (FrameType frameType : values()) { + FRAME_TYPES_BY_ENCODED_TYPE[frameType.encodedType] = frameType; + } + } + + private final int encodedType; + + FrameType(int encodedType) { + this.encodedType = encodedType; + } + + /** + * Returns the {@code FrameType} that matches the specified {@code encodedType}. + * + * @param encodedType the encoded type + * @return the {@code FrameType} that matches the specified {@code encodedType} + */ + public static FrameType fromEncodedType(int encodedType) { + FrameType frameType = FRAME_TYPES_BY_ENCODED_TYPE[encodedType]; + + if (frameType == null) { + throw new IllegalArgumentException(String.format("Frame type %d is unknown", encodedType)); + } + + return frameType; + } + + private static int getMaximumEncodedType() { + return Arrays.stream(values()).mapToInt(frameType -> frameType.encodedType).max().orElse(0); + } + + public int getEncodedType() { + return encodedType; + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/MonoSendMany.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/MonoSendMany.java new file mode 100644 index 000000000..17000a634 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/MonoSendMany.java @@ -0,0 +1,270 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.aeron.ExclusivePublication; +import io.aeron.logbuffer.BufferClaim; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.agrona.concurrent.UnsafeBuffer; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.Fuseable; +import reactor.core.Fuseable.QueueSubscription; +import reactor.core.publisher.Operators; +import reactor.core.publisher.Sinks; +import reactor.util.context.Context; + +class MonoSendMany implements Disposable, CoreSubscriber, Runnable { + static final Logger logger = LoggerFactory.getLogger(MonoSendMany.class); + + static final ThreadLocal BUFFER_CLAIMS = ThreadLocal.withInitial(BufferClaim::new); + static final ThreadLocal UNSAFE_BUFFER = ThreadLocal.withInitial(UnsafeBuffer::new); + + final Sinks.Empty onClose; + final EventLoop eventLoop; + final ExclusivePublication publication; + final int effort; + final int prefetch; + final int limit; + + volatile int wip; + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(MonoSendMany.class, "wip"); + + QueueSubscription subscription; + + ByteBuf undelivered; + int produced; + + Throwable t; + boolean done; + boolean cancelled; + + public MonoSendMany( + Sinks.Empty onClose, + EventLoop eventLoop, + ExclusivePublication publication, + int effort, + int prefetch) { + this.onClose = onClose; + this.eventLoop = eventLoop; + this.publication = publication; + this.effort = effort; + this.prefetch = prefetch; + this.limit = prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : (prefetch - (prefetch >> 2)); + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Subscription s) { + if (Operators.validate(this.subscription, s)) { + final QueueSubscription queueSubscription = (QueueSubscription) s; + final int prefetch = this.prefetch; + + this.subscription = queueSubscription; + + // upstream always ASYNC fuseable + queueSubscription.requestFusion(Fuseable.ANY); + queueSubscription.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch); + } + } + + @Override + public void onNext(ByteBuf byteBuf) { + drain(); + } + + @Override + public void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, Context.empty()); + return; + } + + this.done = true; + this.t = t; + + drain(); + } + + @Override + public void onComplete() { + if (this.done) { + return; + } + + this.done = true; + + drain(); + } + + @Override + public void run() { + drainAsync(); + } + + void drain() { + if (WIP.getAndIncrement(this) != 0) { + return; + } + + drainAsync(); + } + + void drainAsync() { + final Sinks.Empty onClose = this.onClose; + final ExclusivePublication publication = this.publication; + final QueueSubscription qs = this.subscription; + final EventLoop eventLoop = this.eventLoop; + final BufferClaim bufferClaim = BUFFER_CLAIMS.get(); + final UnsafeBuffer unsafeBuffer = UNSAFE_BUFFER.get(); + final int effort = this.effort; + + int missed = this.wip; + int sent = this.produced; + + if (this.cancelled) { + qs.cancel(); + publication.close(); + return; + } + + final ByteBuf undelivered = this.undelivered; + if (undelivered != null) { + final boolean delivered; + try { + delivered = + PublicationUtils.tryClaimOrOffer( + undelivered, publication, bufferClaim, unsafeBuffer, effort); + } catch (Throwable t) { + undelivered.release(); + qs.cancel(); + + this.done = true; + onClose.tryEmitError(t); + return; + } + + if (!delivered) { + eventLoop.schedule(this); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Emitted frame: \n{}\n", ByteBufUtil.prettyHexDump(undelivered)); + } + + this.undelivered = null; + undelivered.release(); + + sent++; + + if (sent == this.limit) { + qs.request(this.limit); + sent = 0; + } + } + + for (; ; ) { + if (this.cancelled) { + qs.cancel(); + publication.close(); + return; + } + + for (; ; ) { + final ByteBuf buf = qs.poll(); + + final boolean empty = buf == null; + if (empty) { + if (this.done) { + publication.close(); + + final Throwable t = this.t; + if (t == null) { + onClose.tryEmitEmpty(); + } else { + onClose.tryEmitError(t); + } + } + break; + } + + final boolean delivered; + try { + delivered = + PublicationUtils.tryClaimOrOffer(buf, publication, bufferClaim, unsafeBuffer, effort); + } catch (Throwable t) { + qs.cancel(); + buf.release(); + + this.done = true; + onClose.tryEmitError(t); + return; + } + + if (!delivered) { + this.undelivered = buf; + this.produced = sent; + eventLoop.schedule(this); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Emitted frame: \n{}\n", ByteBufUtil.prettyHexDump(buf)); + } + + buf.release(); + + sent++; + + if (sent == this.limit) { + qs.request(this.limit); + sent = 0; + } + } + + int w = this.wip; + if (missed == w) { + this.produced = sent; + missed = WIP.addAndGet(this, -missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + + @Override + public void dispose() { + if (this.cancelled) { + return; + } + + this.cancelled = true; + + if (WIP.getAndIncrement(this) == 0) { + this.subscription.cancel(); + } + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/NotConnectedException.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/NotConnectedException.java new file mode 100644 index 000000000..8f6c5beaf --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/NotConnectedException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +public class NotConnectedException extends RuntimeException { + + private static final long serialVersionUID = -5521573868855763403L; + + public NotConnectedException() { + super(); + } + + public NotConnectedException(String message) { + super(message); + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/Operators.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/Operators.java new file mode 100644 index 000000000..3efaf93f4 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/Operators.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +class Operators { + + static long addCapCancellable(AtomicLongFieldUpdater updater, T instance, long n) { + for (; ; ) { + long r = updater.get(instance); + if (r == Long.MIN_VALUE || r == Long.MAX_VALUE) { + return r; + } + long u = addCap(r, n); + if (updater.compareAndSet(instance, r, u)) { + return r; + } + } + } + + static long removeCapCancellable(AtomicLongFieldUpdater updater, T instance, long n) { + for (; ; ) { + long r = updater.get(instance); + if (r == Long.MIN_VALUE || r == Long.MAX_VALUE) { + return r; + } + long u = addCap(r, -n); + if (updater.compareAndSet(instance, r, u)) { + return u; + } + } + } + + static long addCap(long a, long b) { + long res = a + b; + if (res < 0L) { + return Long.MAX_VALUE; + } + return res; + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/PayloadCodec.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/PayloadCodec.java new file mode 100644 index 000000000..d9c2ad62f --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/PayloadCodec.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; + +final class PayloadCodec { + private static final short VERSION = 0; + + private PayloadCodec() {} + + public static FrameType frameType(DirectBuffer byteBuf, int offset) { + int intFrameType = byteBuf.getInt(offset + Short.BYTES); + + return FrameType.fromEncodedType(intFrameType); + } + + public static short version(DirectBuffer directBuffer, int offset) { + return directBuffer.getShort(offset); + } + + public static void encode(MutableDirectBuffer directBuffer, int offset, FrameType type) { + directBuffer.putShort(offset, VERSION); + directBuffer.putInt(offset + Short.BYTES, type.getEncodedType()); + } + + public static int length() { + return Short.BYTES + Integer.BYTES; + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/PublicationUtils.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/PublicationUtils.java new file mode 100644 index 000000000..90237548a --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/PublicationUtils.java @@ -0,0 +1,124 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.aeron.Aeron; +import io.aeron.ExclusivePublication; +import io.aeron.Publication; +import io.aeron.logbuffer.BufferClaim; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.rsocket.util.NumberUtils; +import java.nio.ByteBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.IdleStrategy; +import org.agrona.concurrent.NanoClock; +import org.agrona.concurrent.UnsafeBuffer; + +class PublicationUtils { + + public static boolean tryOfferSetupFrame( + Aeron.Context context, + ExclusivePublication publication, + BufferClaim bufferClaim, + IdleStrategy idleStrategy, + String channel, + long connectionId, + int streamId, + long timeoutNs, + FrameType frameType) { + + final int lengthToClime = + SetupCodec.constantLength() + + NumberUtils.requireUnsignedShort(ByteBufUtil.utf8Bytes(channel)); + + final NanoClock nanoClock = context.nanoClock(); + final long nowNs = nanoClock.nanoTime(); + final long deadlineNs = nowNs + timeoutNs; + + idleStrategy.reset(); + for (; ; ) { + final long state = publication.tryClaim(lengthToClime, bufferClaim); + + if (state < 0) { + if (state != ExclusivePublication.CLOSED) { + return false; + } + + idleStrategy.idle(); + if (deadlineNs - nanoClock.nanoTime() < 0) { + return false; + } + + continue; + } + + SetupCodec.encode( + bufferClaim.buffer(), bufferClaim.offset(), connectionId, streamId, channel, frameType); + + bufferClaim.commit(); + + return true; + } + } + + public static boolean tryClaimOrOffer( + ByteBuf content, + ExclusivePublication publication, + BufferClaim bufferClaim, + UnsafeBuffer unsafeBuffer, + int effort) { + ByteBuffer byteBuffer = content.nioBuffer(); + boolean successful = false; + int offset = content.readerIndex(); + int capacity = content.readableBytes(); + int maxPayloadLength = publication.maxPayloadLength(); + + if (capacity < maxPayloadLength) { + while (!successful && effort-- > 0) { + long offer = publication.tryClaim(capacity, bufferClaim); + if (offer >= 0) { + try { + final MutableDirectBuffer b = bufferClaim.buffer(); + int claimOffset = bufferClaim.offset(); + b.putBytes(claimOffset, byteBuffer, offset, capacity); + } finally { + bufferClaim.commit(); + successful = true; + } + } else { + if (offer == Publication.CLOSED) { + throw new NotConnectedException(); + } + } + } + } else { + unsafeBuffer.wrap(byteBuffer, offset, capacity); + while (!successful && effort-- > 0) { + long offer = publication.offer(unsafeBuffer); + if (offer < 0) { + if (offer == Publication.CLOSED) { + throw new NotConnectedException(); + } + } else { + successful = true; + } + } + } + + return successful; + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/SetupCodec.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/SetupCodec.java new file mode 100644 index 000000000..926d217f6 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/SetupCodec.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.netty.buffer.ByteBufUtil; +import io.rsocket.util.NumberUtils; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; + +final class SetupCodec { + private SetupCodec() {} + + public static void encode( + MutableDirectBuffer mutableDirectBuffer, + int offset, + long connectionId, + int streamId, + String channel, + FrameType frameType) { + + PayloadCodec.encode(mutableDirectBuffer, offset, frameType); + + mutableDirectBuffer.putLong(offset + PayloadCodec.length(), connectionId); + mutableDirectBuffer.putInt(offset + PayloadCodec.length() + Long.BYTES, streamId); + + int channelUtf8Length = NumberUtils.requireUnsignedShort(ByteBufUtil.utf8Bytes(channel)); + mutableDirectBuffer.putStringUtf8( + offset + PayloadCodec.length() + Long.BYTES + Integer.BYTES, channel, channelUtf8Length); + } + + public static long connectionId(DirectBuffer directBuffer, int bufferOffset) { + int offset = bufferOffset + PayloadCodec.length(); + return directBuffer.getLong(offset); + } + + public static int streamId(DirectBuffer directBuffer, int bufferOffset) { + int offset = bufferOffset + PayloadCodec.length() + Long.BYTES; + return directBuffer.getInt(offset); + } + + public static String channel(DirectBuffer directBuffer, int bufferOffset) { + int offset = bufferOffset + PayloadCodec.length() + Long.BYTES + Integer.BYTES; + + return directBuffer.getStringUtf8(offset); + } + + public static int constantLength() { + return PayloadCodec.length() + Long.BYTES + Integer.BYTES + Integer.BYTES; + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/WrappedDirectBufferByteBuf.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/WrappedDirectBufferByteBuf.java new file mode 100644 index 000000000..1d590561c --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/WrappedDirectBufferByteBuf.java @@ -0,0 +1,324 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.transport.aeron; + +import io.netty.buffer.AbstractByteBuf; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import org.agrona.DirectBuffer; + +class WrappedDirectBufferByteBuf extends AbstractByteBuf { + + private DirectBuffer directBuffer; + private final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + public WrappedDirectBufferByteBuf() { + super(Integer.MAX_VALUE); + } + + public void wrap(DirectBuffer c, int offset, int limit) { + this.directBuffer = c; + this.setIndex(offset, limit); + } + + @Override + protected byte _getByte(int index) { + return directBuffer.getByte(index); + } + + @Override + protected short _getShort(int index) { + return directBuffer.getShort(index, byteOrder); + } + + @Override + protected short _getShortLE(int index) { + return directBuffer.getShort(index, ByteOrder.LITTLE_ENDIAN); + } + + @Override + protected int _getUnsignedMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected int _getInt(int index) { + return directBuffer.getInt(index, byteOrder); + } + + @Override + protected int _getIntLE(int index) { + return directBuffer.getInt(index, ByteOrder.LITTLE_ENDIAN); + } + + @Override + protected long _getLong(int index) { + return directBuffer.getLong(index, byteOrder); + } + + @Override + protected long _getLongLE(int index) { + return directBuffer.getLong(index, ByteOrder.LITTLE_ENDIAN); + } + + @Override + protected void _setByte(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setShort(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setShortLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setMedium(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setMediumLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setInt(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setIntLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setLong(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setLongLE(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public int capacity() { + return directBuffer.capacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBufAllocator alloc() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteOrder order() { + return ByteOrder.nativeOrder(); + } + + @Override + public ByteBuf unwrap() { + return null; + } + + @Override + public boolean isDirect() { + return directBuffer.byteBuffer().isDirect(); + } + + @Override + public boolean isReadOnly() { + return true; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + directBuffer.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + directBuffer.getBytes(index, dst, dst.remaining()); + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, InputStream in, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy(int index, int length) { + checkIndex(index, length); + return alloc() + .heapBuffer(length, maxCapacity()) + .writeBytes(directBuffer.byteArray(), index, length); + } + + @Override + public int nioBufferCount() { + return 1; + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + final ByteBuffer buffer = directBuffer.byteBuffer(); + if (buffer != null) { + return (ByteBuffer) buffer.duplicate().position(index).limit(index + length); + } else { + final byte[] bytes = directBuffer.byteArray(); + return ByteBuffer.wrap(bytes, index, length); + } + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return nioBuffer(index, length); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] {nioBuffer(index, length)}; + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMemoryAddress() { + return true; + } + + @Override + public long memoryAddress() { + return directBuffer.addressOffset(); + } + + @Override + public int refCnt() { + return 1; + } + + @Override + public ByteBuf touch() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch(Object hint) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain(int increment) { + return this; + } + + @Override + public ByteBuf retain() { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } +} diff --git a/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/package-info.java b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/package-info.java new file mode 100644 index 000000000..cc01eeb04 --- /dev/null +++ b/rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** The Aeron-based RSocket transport implementations. */ +@NonNullApi +package io.rsocket.transport.aeron; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronPing.java b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronPing.java new file mode 100644 index 000000000..b6f03e6d2 --- /dev/null +++ b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronPing.java @@ -0,0 +1,94 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import io.aeron.Aeron; +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.test.PingClient; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.time.Duration; +import org.HdrHistogram.Recorder; +import org.agrona.concurrent.BackoffIdleStrategy; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public final class AeronPing { + + public static void main(String... args) { + Aeron aeron = Aeron.connect(); + ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + + String aeronUrl = "aeron:ipc"; + + AeronClientTransport aeronClientTransport = + new AeronClientTransport( + aeron, + aeronUrl, + Schedulers.newSingle("AeronScheduler"), + new EventLoopGroup(2, () -> new BackoffIdleStrategy(100, 1000, 10000, 1000000)), + new BackoffIdleStrategy(100, 1000, 10000, 1000000), + allocator, + 256, + 32, + SECONDS.toNanos(5)); + + // TcpClientTransport tcpClientTransport = TcpClientTransport.create(8080); + // + // WebsocketClientTransport websocketClientTransport = WebsocketClientTransport.create(8080); + + Mono client = + RSocketConnector.create() + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .connect(aeronClientTransport); + + RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); + + // Get name representing the running Java virtual machine. + // It returns something like 6460@AURORA. Where the value + // before the @ symbol is the PID. + String jvmName = bean.getName(); + System.out.println("Name = " + jvmName); + + // Extract the PID by splitting the string returned by the + // bean.getName() method. + long pid = Long.valueOf(jvmName.split("@")[0]); + + System.out.println("Client connected to channel[" + aeronUrl + "]. PID = " + pid); + + PingClient pingClient = new PingClient(client); + Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1)); + final int count = 1_000_000_000; + pingClient + .requestResponsePingPong(count, recorder) + .doOnTerminate(() -> System.out.println("Sent " + count + " messages.")) + .blockLast(); + + // RSocket rSocket = client.block(); + // + // for (int i = 0; i < 10000; i++) { + // rSocket.requestResponse(DefaultPayload.create("Message : " + i)).block(); + // } + + System.exit(0); + } +} diff --git a/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronPongServer.java b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronPongServer.java new file mode 100644 index 000000000..0e8710d1d --- /dev/null +++ b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronPongServer.java @@ -0,0 +1,120 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import io.aeron.Aeron; +import io.aeron.driver.MediaDriver; +import io.aeron.driver.ThreadingMode; +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.Closeable; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.util.ByteBufPayload; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.concurrent.ThreadLocalRandom; +import org.agrona.concurrent.BackoffIdleStrategy; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public final class AeronPongServer { + static { + final MediaDriver.Context ctx = + new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true); + MediaDriver.launch(ctx); + } + + public static void main(String... args) { + String aeronUrl = "aeron:ipc"; + Aeron aeron = Aeron.connect(); + + ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + + AeronServerTransport aeronServerTransport = + new AeronServerTransport( + aeron, + aeronUrl, + Schedulers.newSingle("AeronBossScheduler"), + new EventLoopGroup(2, () -> new BackoffIdleStrategy(100, 1000, 10000, 1000000)), + new BackoffIdleStrategy(100, 1000, 10000, 1000000), + allocator, + 256, + 32, + SECONDS.toNanos(1)); + + // TcpServerTransport tcpServerTransport = TcpServerTransport.create(8080); + // + // WebsocketServerTransport websocketServerTransport = WebsocketServerTransport.create(8080); + // + final Closeable server = + RSocketServer.create(new PingHandler()) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bindNow(aeronServerTransport); + + RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); + + // Get name representing the running Java virtual machine. + // It returns something like 6460@AURORA. Where the value + // before the @ symbol is the PID. + String jvmName = bean.getName(); + System.out.println("Name = " + jvmName); + + // Extract the PID by splitting the string returned by the + // bean.getName() method. + long pid = Long.valueOf(jvmName.split("@")[0]); + + System.out.println("Server Started on channel[" + aeronUrl + "]. PID = " + pid); + + server.onClose().block(); + } + + public static class PingHandler implements SocketAcceptor { + private final Payload pong; + + public PingHandler() { + byte[] data = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + this.pong = ByteBufPayload.create(data); + } + + public PingHandler(byte[] data) { + this.pong = ByteBufPayload.create(data); + } + + public Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket) { + return Mono.just( + new RSocket() { + public Mono requestResponse(Payload payload) { + payload.release(); + return Mono.just(PingHandler.this.pong.retain()); + } + + public Flux requestStream(Payload payload) { + payload.release(); + return Flux.range(0, 100).map((v) -> PingHandler.this.pong.retain()).log(); + } + }); + } + } +} diff --git a/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronTransportTest.java b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronTransportTest.java new file mode 100644 index 000000000..c193ae702 --- /dev/null +++ b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronTransportTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +import io.aeron.Aeron; +import io.aeron.driver.MediaDriver; +import io.aeron.driver.ThreadingMode; +import io.rsocket.test.TransportTest; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +final class AeronTransportTest extends TransportTest { + + static final MediaDriver mediaDriver = + MediaDriver.launch( + new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true)); + + static final Aeron clientAeron = Aeron.connect(); + static final Aeron serverAeron = Aeron.connect(); + static final EventLoopGroup eventLoopGroup = EventLoopGroup.create(4); + + @Override + protected TransportPair createTransportPair() { + return new AeronTransportPair(mediaDriver, clientAeron, serverAeron); + } + + static class AeronTransportPair extends TransportPair { + + final MediaDriver mediaDriver; + final Aeron clientAeron; + final Aeron serverAeron; + + public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAeron) { + super( + () -> + InetSocketAddress.createUnresolved( + "0.0.0.0", ThreadLocalRandom.current().nextInt(20000) + 5000), + (address, server, allocator) -> + AeronClientTransport.createIpc(clientAeron, eventLoopGroup), + (address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup), + false, + false, + false, + Duration.ofMinutes(2)); + this.mediaDriver = driver; + this.clientAeron = clientAeron; + this.serverAeron = serverAeron; + } + + @Override + public void dispose() { + super.dispose(); + // CloseHelper.quietCloseAll(clientAeron, serverAeron); + } + } +} diff --git a/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronWithFragmentationTransportTest.java b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronWithFragmentationTransportTest.java new file mode 100644 index 000000000..28f3df34c --- /dev/null +++ b/rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronWithFragmentationTransportTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.aeron; + +import io.aeron.Aeron; +import io.aeron.driver.MediaDriver; +import io.aeron.driver.ThreadingMode; +import io.rsocket.test.TransportTest; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; +import org.junit.jupiter.api.Disabled; + +@Disabled +final class AeronWithFragmentationTransportTest + extends TransportTest { + + static final MediaDriver mediaDriver = + MediaDriver.launch( + new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true)); + + static final Aeron clientAeron = Aeron.connect(); + static final Aeron serverAeron = Aeron.connect(); + static final EventLoopGroup eventLoopGroup = EventLoopGroup.create(4); + + @Override + protected TransportPair createTransportPair() { + return new AeronTransportPair(mediaDriver, clientAeron, serverAeron); + } + + static class AeronTransportPair extends TransportPair { + + final MediaDriver mediaDriver; + final Aeron clientAeron; + final Aeron serverAeron; + + public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAeron) { + super( + () -> + InetSocketAddress.createUnresolved( + "0.0.0.0", ThreadLocalRandom.current().nextInt(20000) + 5000), + (address, server, allocator) -> + AeronClientTransport.createIpc(clientAeron, eventLoopGroup), + (address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup), + true, + false, + false, + Duration.ofMinutes(2)); + this.mediaDriver = driver; + this.clientAeron = clientAeron; + this.serverAeron = serverAeron; + } + + @Override + public void dispose() { + super.dispose(); + // CloseHelper.quietCloseAll(clientAeron, serverAeron); + } + } +} diff --git a/rsocket-transport-aeron/src/test/resources/logback-test.xml b/rsocket-transport-aeron/src/test/resources/logback-test.xml new file mode 100644 index 000000000..775301be3 --- /dev/null +++ b/rsocket-transport-aeron/src/test/resources/logback-test.xml @@ -0,0 +1,43 @@ + + + + + + + + %date{HH:mm:ss.SSS} %-10thread %-42logger %msg%n + + + + + + + + + + + + + + + + + + + + + diff --git a/rsocket-transport-aeron/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/rsocket-transport-aeron/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/rsocket-transport-aeron/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java index 51c812cc3..1a43b6c36 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java @@ -16,27 +16,21 @@ package io.rsocket.transport.local; +import io.rsocket.Closeable; import io.rsocket.test.TransportTest; import java.time.Duration; import java.util.UUID; -final class LocalResumableTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> "test-" + UUID.randomUUID(), - (address, server, allocator) -> LocalClientTransport.create(address, allocator), - (address, allocator) -> LocalServerTransport.create(address), - false, - true); - - @Override - public Duration getTimeout() { - return Duration.ofSeconds(10); - } +final class LocalResumableTransportTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server, allocator) -> LocalClientTransport.create(address, allocator), + (address, allocator) -> LocalServerTransport.create(address), + false, + true, + Duration.ofSeconds(10)); } } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java index 124cecec9..92786b20f 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java @@ -16,27 +16,21 @@ package io.rsocket.transport.local; +import io.rsocket.Closeable; import io.rsocket.test.TransportTest; import java.time.Duration; import java.util.UUID; -final class LocalResumableWithFragmentationTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> "test-" + UUID.randomUUID(), - (address, server, allocator) -> LocalClientTransport.create(address, allocator), - (address, allocator) -> LocalServerTransport.create(address), - true, - true); - - @Override - public Duration getTimeout() { - return Duration.ofSeconds(10); - } +final class LocalResumableWithFragmentationTransportTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server, allocator) -> LocalClientTransport.create(address, allocator), + (address, allocator) -> LocalServerTransport.create(address), + true, + true, + Duration.ofSeconds(10)); } } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java index e9c137255..7f57cd8b9 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java @@ -16,25 +16,19 @@ package io.rsocket.transport.local; +import io.rsocket.Closeable; import io.rsocket.test.TransportTest; import java.time.Duration; import java.util.UUID; -final class LocalTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> "test-" + UUID.randomUUID(), - (address, server, allocator) -> LocalClientTransport.create(address, allocator), - (address, allocator) -> LocalServerTransport.create(address)); - - @Override - public Duration getTimeout() { - return Duration.ofSeconds(10); - } +final class LocalTransportTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server, allocator) -> LocalClientTransport.create(address, allocator), + (address, allocator) -> LocalServerTransport.create(address), + Duration.ofSeconds(10)); } } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java index 4c2f47771..db79c1d34 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java @@ -16,26 +16,20 @@ package io.rsocket.transport.local; +import io.rsocket.Closeable; import io.rsocket.test.TransportTest; import java.time.Duration; import java.util.UUID; -final class LocalTransportWithFragmentationTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> "test-" + UUID.randomUUID(), - (address, server, allocator) -> LocalClientTransport.create(address, allocator), - (address, allocator) -> LocalServerTransport.create(address), - true); - - @Override - public Duration getTimeout() { - return Duration.ofSeconds(10); - } +final class LocalTransportWithFragmentationTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server, allocator) -> LocalClientTransport.create(address, allocator), + (address, allocator) -> LocalServerTransport.create(address), + true, + Duration.ofSeconds(10)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java index 299ea96c0..11defba49 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java @@ -19,36 +19,31 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; -final class TcpFragmentationTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - TcpClientTransport.create( - TcpClient.create() - .remoteAddress(server::address) - .option(ChannelOption.ALLOCATOR, allocator)), - (address, allocator) -> - TcpServerTransport.create( - TcpServer.create() - .bindAddress(() -> address) - .option(ChannelOption.ALLOCATOR, allocator)), - true); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(2); - } +final class TcpFragmentationTransportTest + extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .remoteAddress(server::address) + .option(ChannelOption.ALLOCATOR, allocator)), + (address, allocator) -> + TcpServerTransport.create( + TcpServer.create() + .bindAddress(() -> address) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java index cf9e0540c..40d4860f9 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java @@ -19,37 +19,31 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; -final class TcpResumableTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - TcpClientTransport.create( - TcpClient.create() - .remoteAddress(server::address) - .option(ChannelOption.ALLOCATOR, allocator)), - (address, allocator) -> - TcpServerTransport.create( - TcpServer.create() - .bindAddress(() -> address) - .option(ChannelOption.ALLOCATOR, allocator)), - false, - true); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(3); - } +final class TcpResumableTransportTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .remoteAddress(server::address) + .option(ChannelOption.ALLOCATOR, allocator)), + (address, allocator) -> + TcpServerTransport.create( + TcpServer.create() + .bindAddress(() -> address) + .option(ChannelOption.ALLOCATOR, allocator)), + false, + true, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java index 7d9d80542..d6006b2d7 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java @@ -19,37 +19,32 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; -final class TcpResumableWithFragmentationTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - TcpClientTransport.create( - TcpClient.create() - .remoteAddress(server::address) - .option(ChannelOption.ALLOCATOR, allocator)), - (address, allocator) -> - TcpServerTransport.create( - TcpServer.create() - .bindAddress(() -> address) - .option(ChannelOption.ALLOCATOR, allocator)), - true, - true); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(3); - } +final class TcpResumableWithFragmentationTransportTest + extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .remoteAddress(server::address) + .option(ChannelOption.ALLOCATOR, allocator)), + (address, allocator) -> + TcpServerTransport.create( + TcpServer.create() + .bindAddress(() -> address) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + true, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java index 85481924a..d78719cb2 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java @@ -1,11 +1,11 @@ package io.rsocket.transport.netty; import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import java.net.InetSocketAddress; import java.security.cert.CertificateException; @@ -13,46 +13,43 @@ import reactor.core.Exceptions; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; +import reactor.netty.tcp.TcpSslContextSpec; -public class TcpSecureTransportTest implements TransportTest { - private final TransportPair transportPair = - new TransportPair<>( - () -> new InetSocketAddress("localhost", 0), - (address, server, allocator) -> - TcpClientTransport.create( - TcpClient.create() - .option(ChannelOption.ALLOCATOR, allocator) - .remoteAddress(server::address) - .secure( - ssl -> - ssl.sslContext( - SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE)))), - (address, allocator) -> { - try { - SelfSignedCertificate ssc = new SelfSignedCertificate(); - TcpServer server = - TcpServer.create() - .option(ChannelOption.ALLOCATOR, allocator) - .bindAddress(() -> address) - .secure( - ssl -> - ssl.sslContext( - SslContextBuilder.forServer( - ssc.certificate(), ssc.privateKey()))); - return TcpServerTransport.create(server); - } catch (CertificateException e) { - throw Exceptions.propagate(e); - } - }); +public class TcpSecureTransportTest extends TransportTest { @Override - public Duration getTimeout() { - return Duration.ofMinutes(10); - } - - @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> new InetSocketAddress("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .option(ChannelOption.ALLOCATOR, allocator) + .remoteAddress(server::address) + .secure( + ssl -> + ssl.sslContext( + TcpSslContextSpec.forClient() + .configure( + scb -> + scb.trustManager( + InsecureTrustManagerFactory.INSTANCE))))), + (address, allocator) -> { + try { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + TcpServer server = + TcpServer.create() + .option(ChannelOption.ALLOCATOR, allocator) + .bindAddress(() -> address) + .secure( + ssl -> + ssl.sslContext( + TcpSslContextSpec.forServer(ssc.certificate(), ssc.privateKey()))); + return TcpServerTransport.create(server); + } catch (CertificateException e) { + throw Exceptions.propagate(e); + } + }, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java index c474f9b0b..aef20eacb 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java @@ -19,35 +19,29 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; -final class TcpTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - TcpClientTransport.create( - TcpClient.create() - .remoteAddress(server::address) - .option(ChannelOption.ALLOCATOR, allocator)), - (address, allocator) -> - TcpServerTransport.create( - TcpServer.create() - .bindAddress(() -> address) - .option(ChannelOption.ALLOCATOR, allocator))); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(2); - } +final class TcpTransportTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .remoteAddress(server::address) + .option(ChannelOption.ALLOCATOR, allocator)), + (address, allocator) -> + TcpServerTransport.create( + TcpServer.create() + .bindAddress(() -> address) + .option(ChannelOption.ALLOCATOR, allocator)), + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java index 34dc99ae0..f19a6f497 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java @@ -19,40 +19,35 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.WebsocketServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; -final class WebsocketResumableTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - WebsocketClientTransport.create( - HttpClient.create() - .host(server.address().getHostName()) - .port(server.address().getPort()) - .option(ChannelOption.ALLOCATOR, allocator), - ""), - (address, allocator) -> - WebsocketServerTransport.create( - HttpServer.create() - .host(address.getHostName()) - .port(address.getPort()) - .option(ChannelOption.ALLOCATOR, allocator)), - false, - true); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(3); - } +final class WebsocketResumableTransportTest + extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + false, + true, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java index 21c027e88..91ba6c804 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java @@ -19,40 +19,35 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.WebsocketServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; -final class WebsocketResumableWithFragmentationTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - WebsocketClientTransport.create( - HttpClient.create() - .host(server.address().getHostName()) - .port(server.address().getPort()) - .option(ChannelOption.ALLOCATOR, allocator), - ""), - (address, allocator) -> - WebsocketServerTransport.create( - HttpServer.create() - .host(address.getHostName()) - .port(address.getPort()) - .option(ChannelOption.ALLOCATOR, allocator)), - true, - true); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(3); - } +final class WebsocketResumableWithFragmentationTransportTest + extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + true, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java index 9777c8bfa..ae7e51cab 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java @@ -17,62 +17,59 @@ package io.rsocket.transport.netty; import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.WebsocketServerTransport; import java.net.InetSocketAddress; import java.security.cert.CertificateException; import java.time.Duration; import reactor.core.Exceptions; +import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; -final class WebsocketSecureTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> new InetSocketAddress("localhost", 0), - (address, server, allocator) -> - WebsocketClientTransport.create( - HttpClient.create() - .option(ChannelOption.ALLOCATOR, allocator) - .remoteAddress(server::address) - .secure( - ssl -> - ssl.sslContext( - SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE))), - String.format( - "https://%s:%d/", - server.address().getHostName(), server.address().getPort())), - (address, allocator) -> { - try { - SelfSignedCertificate ssc = new SelfSignedCertificate(); - HttpServer server = - HttpServer.create() - .option(ChannelOption.ALLOCATOR, allocator) - .bindAddress(() -> address) - .secure( - ssl -> - ssl.sslContext( - SslContextBuilder.forServer( - ssc.certificate(), ssc.privateKey()))); - return WebsocketServerTransport.create(server); - } catch (CertificateException e) { - throw Exceptions.propagate(e); - } - }); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(5); - } +final class WebsocketSecureTransportTest + extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> new InetSocketAddress("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .option(ChannelOption.ALLOCATOR, allocator) + .remoteAddress(server::address) + .secure( + ssl -> + ssl.sslContext( + Http11SslContextSpec.forClient() + .configure( + scb -> + scb.trustManager( + InsecureTrustManagerFactory.INSTANCE)))), + String.format( + "https://%s:%d/", server.address().getHostName(), server.address().getPort())), + (address, allocator) -> { + try { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + HttpServer server = + HttpServer.create() + .option(ChannelOption.ALLOCATOR, allocator) + .bindAddress(() -> address) + .secure( + ssl -> + ssl.sslContext( + Http11SslContextSpec.forServer( + ssc.certificate(), ssc.privateKey()))); + return WebsocketServerTransport.create(server); + } catch (CertificateException e) { + throw Exceptions.propagate(e); + } + }, + Duration.ofMinutes(2)); } } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java index 93d7bdb2f..632e1ff8c 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java @@ -19,38 +19,32 @@ import io.netty.channel.ChannelOption; import io.rsocket.test.TransportTest; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.WebsocketServerTransport; import java.net.InetSocketAddress; import java.time.Duration; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; -final class WebsocketTransportTest implements TransportTest { - - private final TransportPair transportPair = - new TransportPair<>( - () -> InetSocketAddress.createUnresolved("localhost", 0), - (address, server, allocator) -> - WebsocketClientTransport.create( - HttpClient.create() - .host(server.address().getHostName()) - .port(server.address().getPort()) - .option(ChannelOption.ALLOCATOR, allocator), - ""), - (address, allocator) -> - WebsocketServerTransport.create( - HttpServer.create() - .host(address.getHostName()) - .port(address.getPort()) - .option(ChannelOption.ALLOCATOR, allocator))); - - @Override - public Duration getTimeout() { - return Duration.ofMinutes(3); - } +final class WebsocketTransportTest extends TransportTest { @Override - public TransportPair getTransportPair() { - return transportPair; + protected TransportPair createTransportPair() { + return new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + Duration.ofMinutes(2)); } } diff --git a/settings.gradle b/settings.gradle index 25c3feee5..45a5654d2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,7 @@ include 'rsocket-micrometer' include 'rsocket-test' include 'rsocket-transport-local' include 'rsocket-transport-netty' +include 'rsocket-transport-aeron' include 'rsocket-bom' include 'rsocket-examples'