Skip to content

Commit 8f3a9ec

Browse files
committed
migrate nodejs transport to new API
1 parent 28c6f03 commit 8f3a9ec

File tree

5 files changed

+279
-0
lines changed

5 files changed

+279
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
public class NodejsTcpAddress(
20+
public val hostname: String,
21+
public val port: Int,
22+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
import io.rsocket.kotlin.internal.io.*
20+
import io.rsocket.kotlin.transport.*
21+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
22+
import kotlinx.coroutines.*
23+
import kotlin.coroutines.*
24+
25+
public sealed interface NodejsTcpClientTarget : RSocketClientTarget {
26+
public val address: NodejsTcpAddress
27+
}
28+
29+
public sealed interface NodejsTcpClientTransport : RSocketTransport<
30+
NodejsTcpAddress,
31+
NodejsTcpClientTarget> {
32+
33+
public fun target(hostname: String, port: Int): NodejsTcpClientTarget = target(NodejsTcpAddress(hostname, port))
34+
35+
public companion object Factory : RSocketTransportFactory<
36+
NodejsTcpAddress,
37+
NodejsTcpClientTarget,
38+
NodejsTcpClientTransport,
39+
NodejsTcpClientTransportBuilder>({ NodejsTcpClientTransportBuilderImpl })
40+
}
41+
42+
public sealed interface NodejsTcpClientTransportBuilder : RSocketTransportBuilder<
43+
NodejsTcpAddress,
44+
NodejsTcpClientTarget,
45+
NodejsTcpClientTransport>
46+
47+
private object NodejsTcpClientTransportBuilderImpl : NodejsTcpClientTransportBuilder {
48+
@RSocketTransportApi
49+
override fun buildTransport(context: CoroutineContext): NodejsTcpClientTransport = NodejsTcpClientTransportImpl(
50+
coroutineContext = context.supervisorContext(),
51+
)
52+
}
53+
54+
private class NodejsTcpClientTransportImpl(
55+
override val coroutineContext: CoroutineContext,
56+
) : NodejsTcpClientTransport {
57+
58+
override fun target(address: NodejsTcpAddress): NodejsTcpClientTarget = NodejsTcpClientTargetImpl(
59+
coroutineContext = coroutineContext.supervisorContext(),
60+
address = address
61+
)
62+
}
63+
64+
private class NodejsTcpClientTargetImpl(
65+
override val coroutineContext: CoroutineContext,
66+
override val address: NodejsTcpAddress,
67+
) : NodejsTcpClientTarget {
68+
69+
@RSocketTransportApi
70+
override suspend fun createSession(): RSocketTransportSession {
71+
ensureActive()
72+
73+
return NodejsTcpSession(coroutineContext.childContext(), connect(address.port, address.hostname))
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
import io.rsocket.kotlin.internal.io.*
20+
import io.rsocket.kotlin.transport.*
21+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
22+
import kotlinx.coroutines.*
23+
import kotlin.coroutines.*
24+
25+
public sealed interface NodejsTcpServerInstance : RSocketServerInstance {
26+
public val address: NodejsTcpAddress
27+
}
28+
29+
public sealed interface NodejsTcpServerTarget : RSocketServerTarget<NodejsTcpServerInstance> {
30+
public val address: NodejsTcpAddress
31+
}
32+
33+
public sealed interface NodejsTcpServerTransport : RSocketTransport<
34+
NodejsTcpAddress,
35+
NodejsTcpServerTarget> {
36+
37+
public fun target(hostname: String, port: Int): NodejsTcpServerTarget = target(NodejsTcpAddress(hostname, port))
38+
39+
public companion object Factory : RSocketTransportFactory<
40+
NodejsTcpAddress,
41+
NodejsTcpServerTarget,
42+
NodejsTcpServerTransport,
43+
NodejsTcpServerTransportBuilder>({ NodejsTcpServerTransportBuilderImpl })
44+
}
45+
46+
public sealed interface NodejsTcpServerTransportBuilder : RSocketTransportBuilder<
47+
NodejsTcpAddress,
48+
NodejsTcpServerTarget,
49+
NodejsTcpServerTransport>
50+
51+
private object NodejsTcpServerTransportBuilderImpl : NodejsTcpServerTransportBuilder {
52+
@RSocketTransportApi
53+
override fun buildTransport(context: CoroutineContext): NodejsTcpServerTransport = NodejsTcpServerTransportImpl(
54+
coroutineContext = context.supervisorContext(),
55+
)
56+
}
57+
58+
private class NodejsTcpServerTransportImpl(
59+
override val coroutineContext: CoroutineContext,
60+
) : NodejsTcpServerTransport {
61+
override fun target(address: NodejsTcpAddress): NodejsTcpServerTarget = NodejsTcpServerTargetImpl(
62+
coroutineContext = coroutineContext.supervisorContext(),
63+
address = address
64+
)
65+
}
66+
67+
private class NodejsTcpServerTargetImpl(
68+
override val coroutineContext: CoroutineContext,
69+
override val address: NodejsTcpAddress,
70+
) : NodejsTcpServerTarget {
71+
72+
@RSocketTransportApi
73+
override suspend fun startServer(acceptor: RSocketServerAcceptor): NodejsTcpServerInstance {
74+
ensureActive()
75+
76+
return NodejsTcpServerInstanceImpl(
77+
coroutineContext = coroutineContext.supervisorContext(),
78+
address = address,
79+
acceptor = acceptor,
80+
)
81+
}
82+
}
83+
84+
@RSocketTransportApi
85+
private class NodejsTcpServerInstanceImpl(
86+
override val coroutineContext: CoroutineContext,
87+
override val address: NodejsTcpAddress,
88+
private val acceptor: RSocketServerAcceptor,
89+
) : NodejsTcpServerInstance {
90+
init {
91+
val server = createServer(address.port, address.hostname, {
92+
coroutineContext.job.cancel("Server closed")
93+
}) {
94+
launch {
95+
acceptor.acceptSession(NodejsTcpSession(coroutineContext.childContext(), it))
96+
}
97+
}
98+
launch {
99+
try {
100+
awaitCancellation()
101+
} catch (cause: Throwable) {
102+
suspendCoroutine { cont -> server.close { cont.resume(Unit) } }
103+
throw cause
104+
}
105+
}
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
import io.ktor.utils.io.core.*
20+
import io.ktor.utils.io.js.*
21+
import io.rsocket.kotlin.internal.io.*
22+
import io.rsocket.kotlin.transport.*
23+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
24+
import kotlinx.coroutines.*
25+
import kotlinx.coroutines.channels.*
26+
import org.khronos.webgl.*
27+
import kotlin.coroutines.*
28+
29+
@RSocketTransportApi
30+
internal class NodejsTcpSession(
31+
override val coroutineContext: CoroutineContext,
32+
private val socket: Socket,
33+
) : RSocketTransportSession.Sequential {
34+
35+
private val sendChannel = channelForCloseable<ByteReadPacket>(8)
36+
private val receiveChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)
37+
38+
init {
39+
launch {
40+
sendChannel.consumeEach { packet ->
41+
socket.write(Uint8Array(packet.withLength().readArrayBuffer()))
42+
}
43+
}
44+
45+
coroutineContext.job.invokeOnCompletion {
46+
when (it) {
47+
null -> socket.destroy()
48+
else -> socket.destroy(Error(it.message, it.cause))
49+
}
50+
}
51+
52+
val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO
53+
socket.on(
54+
onData = { frameAssembler.write { writeFully(it.buffer) } },
55+
onError = { coroutineContext.job.cancel("Socket error", it) },
56+
onClose = { if (!it) coroutineContext.job.cancel("Socket closed") }
57+
)
58+
}
59+
60+
override suspend fun sendFrame(frame: ByteReadPacket) {
61+
sendChannel.send(frame)
62+
}
63+
64+
override suspend fun receiveFrame(): ByteReadPacket {
65+
return receiveChannel.receive()
66+
}
67+
}

rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt

+8
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,11 @@ class TcpTransportTest : TransportTest() {
3434
server.close()
3535
}
3636
}
37+
38+
class NodejsTcpTransportTest : TransportTest() {
39+
override suspend fun before() {
40+
val port = PortProvider.next()
41+
startServer(NodejsTcpServerTransport(testContext).target("127.0.0.1", port))
42+
client = connectClient(NodejsTcpClientTransport(testContext).target("127.0.0.1", port))
43+
}
44+
}

0 commit comments

Comments
 (0)