Skip to content

Commit 782b751

Browse files
committed
Netty TCP and WebSocket transports
1 parent 8f3a9ec commit 782b751

20 files changed

Lines changed: 1498 additions & 0 deletions

File tree

gradle/libs.versions.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ kotlinx-bcv = "0.14.0"
88

99
ktor = "2.3.8"
1010

11+
netty = "4.1.107.Final"
12+
13+
bouncycastle = "1.77"
14+
1115
turbine = "1.0.0"
1216

1317
rsocket-java = "1.1.3"
@@ -44,6 +48,11 @@ ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
4448
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
4549
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor" }
4650

51+
netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" }
52+
netty-codec-http = { module = "io.netty:netty-codec-http", version.ref = "netty" }
53+
54+
bouncycastle = { module = "org.bouncycastle:bcpkix-jdk18on", version.ref = "bouncycastle" }
55+
4756
turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" }
4857

4958
rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" }

rsocket-internal-io/api/rsocket-internal-io.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public final class io/rsocket/kotlin/internal/io/ChannelsKt {
2424

2525
public final class io/rsocket/kotlin/internal/io/ContextKt {
2626
public static final fun childContext (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
27+
public static final fun invokeOnCancellation (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
28+
public static synthetic fun invokeOnCancellation$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
2729
public static final fun supervisorContext (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
2830
}
2931

rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Context.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,23 @@ import kotlin.coroutines.*
2121

2222
public fun CoroutineContext.supervisorContext(): CoroutineContext = plus(SupervisorJob(get(Job)))
2323
public fun CoroutineContext.childContext(): CoroutineContext = plus(Job(get(Job)))
24+
25+
public inline fun CoroutineScope.invokeOnCancellation(
26+
context: CoroutineContext = EmptyCoroutineContext,
27+
crossinline block: suspend () -> Unit,
28+
) {
29+
launch(context) {
30+
try {
31+
awaitCancellation()
32+
} catch (cause: Throwable) {
33+
withContext(NonCancellable) {
34+
try {
35+
block()
36+
} catch (suppressed: Throwable) {
37+
cause.addSuppressed(suppressed)
38+
}
39+
}
40+
throw cause
41+
}
42+
}
43+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTarget : io/rsocket/kotlin/transport/RSocketClientTarget {
2+
public abstract fun getRemoteAddress ()Ljava/net/SocketAddress;
3+
}
4+
5+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
6+
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport$Factory;
7+
public fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTarget;
8+
}
9+
10+
public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
11+
}
12+
13+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
14+
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
15+
public abstract fun channel (Lkotlin/reflect/KClass;)V
16+
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
17+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
18+
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
19+
}
20+
21+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
22+
public abstract fun getLocalAddress ()Ljava/net/InetSocketAddress;
23+
}
24+
25+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTarget : io/rsocket/kotlin/transport/RSocketServerTarget {
26+
public abstract fun getLocalAddress ()Ljava/net/InetSocketAddress;
27+
}
28+
29+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
30+
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory;
31+
public fun target ()Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTarget;
32+
public fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTarget;
33+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;Ljava/lang/String;IILjava/lang/Object;)Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTarget;
34+
}
35+
36+
public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
37+
}
38+
39+
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
40+
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
41+
public abstract fun channel (Lkotlin/reflect/KClass;)V
42+
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
43+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Lio/netty/channel/EventLoopGroup;Z)V
44+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
45+
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
46+
}
47+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import rsocketbuild.*
18+
19+
plugins {
20+
id("rsocketbuild.multiplatform-library")
21+
}
22+
23+
description = "rsocket-kotlin Netty TCP client/server transport implementation"
24+
25+
kotlin {
26+
jvmTarget()
27+
28+
sourceSets {
29+
jvmMain.dependencies {
30+
implementation(projects.rsocketInternalIo)
31+
api(projects.rsocketCore)
32+
api(libs.netty.handler)
33+
}
34+
jvmTest.dependencies {
35+
implementation(projects.rsocketTransportTests)
36+
implementation(libs.bouncycastle)
37+
}
38+
}
39+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.tcp
18+
19+
import io.ktor.utils.io.core.*
20+
import io.netty.buffer.*
21+
import io.netty.channel.*
22+
import io.netty.channel.socket.*
23+
import io.netty.handler.codec.*
24+
import io.netty.handler.ssl.*
25+
import io.rsocket.kotlin.internal.io.*
26+
import io.rsocket.kotlin.transport.*
27+
import kotlinx.coroutines.channels.*
28+
import kotlinx.coroutines.channels.Channel
29+
import java.net.*
30+
import kotlin.coroutines.*
31+
32+
internal class NettyTcpChannelHandler(
33+
private val sslContext: SslContext?,
34+
private val remoteAddress: SocketAddress?,
35+
) : ChannelInitializer<DuplexChannel>() {
36+
private val frames = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)
37+
38+
@RSocketTransportApi
39+
fun connect(
40+
context: CoroutineContext,
41+
channel: DuplexChannel,
42+
): NettyTcpSession = NettyTcpSession(
43+
coroutineContext = context,
44+
channel = channel,
45+
frames = frames
46+
)
47+
48+
override fun initChannel(ch: DuplexChannel): Unit = with(ch.pipeline()) {
49+
if (sslContext != null) {
50+
val sslHandler = if (
51+
remoteAddress is InetSocketAddress &&
52+
ch.parent() == null // not server
53+
) {
54+
sslContext.newHandler(ch.alloc(), remoteAddress.hostName, remoteAddress.port)
55+
} else {
56+
sslContext.newHandler(ch.alloc())
57+
}
58+
addLast("ssl", sslHandler)
59+
}
60+
addLast(
61+
"rsocket-length-encoder",
62+
LengthFieldPrepender(
63+
/* lengthFieldLength = */ 3
64+
)
65+
)
66+
addLast(
67+
"rsocket-length-decoder",
68+
LengthFieldBasedFrameDecoder(
69+
/* maxFrameLength = */ Int.MAX_VALUE,
70+
/* lengthFieldOffset = */ 0,
71+
/* lengthFieldLength = */ 3,
72+
/* lengthAdjustment = */ 0,
73+
/* initialBytesToStrip = */ 3
74+
)
75+
)
76+
addLast(
77+
"rsocket-frame-receiver",
78+
IncomingFramesChannelHandler(frames)
79+
)
80+
}
81+
82+
private class IncomingFramesChannelHandler(
83+
private val channel: SendChannel<ByteReadPacket>,
84+
) : SimpleChannelInboundHandler<ByteBuf>() {
85+
override fun channelInactive(ctx: ChannelHandlerContext) {
86+
channel.close() //TODO?
87+
super.channelInactive(ctx)
88+
}
89+
90+
override fun channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf) {
91+
channel.trySend(buildPacket {
92+
writeFully(msg.nioBuffer())
93+
}).getOrThrow()
94+
}
95+
}
96+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.tcp
18+
19+
import io.netty.bootstrap.*
20+
import io.netty.channel.*
21+
import io.netty.channel.ChannelFactory
22+
import io.netty.channel.nio.*
23+
import io.netty.channel.socket.*
24+
import io.netty.channel.socket.nio.*
25+
import io.netty.handler.ssl.*
26+
import io.rsocket.kotlin.internal.io.*
27+
import io.rsocket.kotlin.transport.*
28+
import kotlinx.coroutines.*
29+
import java.net.*
30+
import kotlin.coroutines.*
31+
import kotlin.reflect.*
32+
33+
public sealed interface NettyTcpClientTarget : RSocketClientTarget {
34+
public val remoteAddress: SocketAddress
35+
}
36+
37+
public sealed interface NettyTcpClientTransport : RSocketTransport<
38+
InetSocketAddress,
39+
NettyTcpClientTarget> {
40+
41+
public fun target(hostname: String, port: Int): NettyTcpClientTarget = target(InetSocketAddress(hostname, port))
42+
43+
public companion object Factory : RSocketTransportFactory<
44+
InetSocketAddress,
45+
NettyTcpClientTarget,
46+
NettyTcpClientTransport,
47+
NettyTcpClientTransportBuilder>(::NettyTcpClientTransportBuilderImpl)
48+
}
49+
50+
public sealed interface NettyTcpClientTransportBuilder : RSocketTransportBuilder<
51+
InetSocketAddress,
52+
NettyTcpClientTarget,
53+
NettyTcpClientTransport> {
54+
55+
public fun channel(cls: KClass<out DuplexChannel>)
56+
public fun channelFactory(factory: ChannelFactory<out DuplexChannel>)
57+
public fun eventLoopGroup(group: EventLoopGroup, manage: Boolean)
58+
59+
public fun bootstrap(block: Bootstrap.() -> Unit)
60+
public fun ssl(block: SslContextBuilder.() -> Unit)
61+
}
62+
63+
private class NettyTcpClientTransportBuilderImpl : NettyTcpClientTransportBuilder {
64+
private var channelFactory: ChannelFactory<out DuplexChannel>? = null
65+
private var eventLoopGroup: EventLoopGroup? = null
66+
private var manageEventLoopGroup: Boolean = false
67+
private var bootstrap: (Bootstrap.() -> Unit)? = null
68+
private var ssl: (SslContextBuilder.() -> Unit)? = null
69+
70+
override fun channel(cls: KClass<out DuplexChannel>) {
71+
this.channelFactory = ReflectiveChannelFactory(cls.java)
72+
}
73+
74+
override fun channelFactory(factory: ChannelFactory<out DuplexChannel>) {
75+
this.channelFactory = factory
76+
}
77+
78+
override fun eventLoopGroup(group: EventLoopGroup, manage: Boolean) {
79+
this.eventLoopGroup = group
80+
this.manageEventLoopGroup = manage
81+
}
82+
83+
override fun bootstrap(block: Bootstrap.() -> Unit) {
84+
bootstrap = block
85+
}
86+
87+
override fun ssl(block: SslContextBuilder.() -> Unit) {
88+
ssl = block
89+
}
90+
91+
@RSocketTransportApi
92+
override fun buildTransport(context: CoroutineContext): NettyTcpClientTransport {
93+
val group = eventLoopGroup ?: NioEventLoopGroup()
94+
val factory = channelFactory ?: ReflectiveChannelFactory(NioSocketChannel::class.java)
95+
96+
val transportContext = context.supervisorContext() + group.asCoroutineDispatcher()
97+
if (manageEventLoopGroup) CoroutineScope(transportContext).invokeOnCancellation {
98+
group.shutdownGracefully().awaitFuture()
99+
}
100+
101+
val sslContext = ssl?.let {
102+
SslContextBuilder
103+
.forClient()
104+
.apply(it)
105+
.build()
106+
}
107+
108+
val bootstrap = Bootstrap().apply {
109+
bootstrap?.invoke(this)
110+
group(group)
111+
channelFactory(factory)
112+
}
113+
114+
return NettyTcpClientTransportImpl(
115+
coroutineContext = transportContext,
116+
sslContext = sslContext,
117+
bootstrap = bootstrap
118+
)
119+
}
120+
}
121+
122+
private class NettyTcpClientTransportImpl(
123+
override val coroutineContext: CoroutineContext,
124+
private val sslContext: SslContext?,
125+
private val bootstrap: Bootstrap,
126+
) : NettyTcpClientTransport {
127+
override fun target(address: InetSocketAddress): NettyTcpClientTarget = NettyTcpClientTargetImpl(
128+
coroutineContext = coroutineContext.supervisorContext(),
129+
remoteAddress = address,
130+
sslContext = sslContext,
131+
bootstrap = bootstrap
132+
)
133+
}
134+
135+
private class NettyTcpClientTargetImpl(
136+
override val coroutineContext: CoroutineContext,
137+
override val remoteAddress: SocketAddress,
138+
private val sslContext: SslContext?,
139+
private val bootstrap: Bootstrap,
140+
) : NettyTcpClientTarget {
141+
@RSocketTransportApi
142+
override suspend fun createSession(): RSocketTransportSession {
143+
ensureActive()
144+
145+
val handler = NettyTcpChannelHandler(
146+
sslContext = sslContext,
147+
remoteAddress = remoteAddress
148+
)
149+
val future = bootstrap.clone().apply {
150+
handler(handler)
151+
}.connect(remoteAddress)
152+
153+
future.awaitFuture()
154+
155+
return handler.connect(coroutineContext.childContext(), future.channel() as DuplexChannel)
156+
}
157+
}

0 commit comments

Comments
 (0)