Skip to content

Commit f917dc0

Browse files
committed
implement Local multiplexed transport
1 parent bc2c384 commit f917dc0

File tree

6 files changed

+234
-69
lines changed

6 files changed

+234
-69
lines changed

rsocket-transports/local/api/rsocket-transport-local.api

+4-1
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@ public final class io/rsocket/kotlin/transport/local/LocalServerTransport$Factor
4040
}
4141

4242
public abstract interface class io/rsocket/kotlin/transport/local/LocalServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
43-
public abstract fun connectionBufferCapacity (I)V
4443
public abstract fun dispatcher (Lkotlin/coroutines/CoroutineContext;)V
4544
public fun inheritDispatcher ()V
45+
public abstract fun multiplexed (II)V
46+
public static synthetic fun multiplexed$default (Lio/rsocket/kotlin/transport/local/LocalServerTransportBuilder;IIILjava/lang/Object;)V
47+
public abstract fun sequential (I)V
48+
public static synthetic fun sequential$default (Lio/rsocket/kotlin/transport/local/LocalServerTransportBuilder;IILjava/lang/Object;)V
4649
}
4750

rsocket-transports/local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalClientTransport.kt

+1-5
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,5 @@ private class LocalClientTargetImpl(
7979
) : LocalClientTarget {
8080

8181
@RSocketTransportApi
82-
override suspend fun createSession(): RSocketTransportSession {
83-
ensureActive()
84-
85-
return server.connect(coroutineContext)
86-
}
82+
override suspend fun createSession(): RSocketTransportSession = server.connect(this)
8783
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.local
18+
19+
import io.ktor.utils.io.core.*
20+
import io.rsocket.kotlin.internal.io.*
21+
import io.rsocket.kotlin.transport.*
22+
import kotlinx.coroutines.*
23+
import kotlinx.coroutines.channels.*
24+
import kotlin.coroutines.*
25+
26+
internal sealed class LocalServerConnector {
27+
@RSocketTransportApi
28+
abstract fun connect(
29+
acceptor: RSocketServerAcceptor,
30+
clientScope: CoroutineScope,
31+
serverScope: CoroutineScope,
32+
): RSocketTransportSession
33+
34+
class Sequential(
35+
private val connectionBufferCapacity: Int,
36+
) : LocalServerConnector() {
37+
@RSocketTransportApi
38+
override fun connect(
39+
acceptor: RSocketServerAcceptor,
40+
clientScope: CoroutineScope,
41+
serverScope: CoroutineScope,
42+
): RSocketTransportSession {
43+
clientScope.ensureActive()
44+
serverScope.ensureActive()
45+
46+
val clientToServer = channelForCloseable<ByteReadPacket>(connectionBufferCapacity)
47+
val serverToClient = channelForCloseable<ByteReadPacket>(connectionBufferCapacity)
48+
49+
serverScope.launch {
50+
acceptor.acceptSession(
51+
Session(
52+
coroutineContext = coroutineContext.childContext(),
53+
outbound = serverToClient,
54+
inbound = clientToServer
55+
)
56+
)
57+
}
58+
59+
return Session(
60+
coroutineContext = clientScope.coroutineContext.childContext(),
61+
outbound = clientToServer,
62+
inbound = serverToClient
63+
)
64+
}
65+
66+
67+
@RSocketTransportApi
68+
private class Session(
69+
override val coroutineContext: CoroutineContext,
70+
private val outbound: SendChannel<ByteReadPacket>,
71+
private val inbound: ReceiveChannel<ByteReadPacket>,
72+
) : RSocketTransportSession.Sequential {
73+
74+
init {
75+
coroutineContext.job.invokeOnCompletion {
76+
outbound.close(it)
77+
inbound.cancel(CancellationException("Local connection closed", it))
78+
}
79+
}
80+
81+
override suspend fun sendFrame(frame: ByteReadPacket) {
82+
outbound.send(frame)
83+
}
84+
85+
override suspend fun receiveFrame(): ByteReadPacket {
86+
return inbound.receive()
87+
}
88+
}
89+
90+
}
91+
92+
class Multiplexed(
93+
private val connectionStreamsQueueCapacity: Int,
94+
private val streamBufferCapacity: Int,
95+
) : LocalServerConnector() {
96+
@RSocketTransportApi
97+
override fun connect(
98+
acceptor: RSocketServerAcceptor,
99+
clientScope: CoroutineScope,
100+
serverScope: CoroutineScope,
101+
): RSocketTransportSession {
102+
clientScope.ensureActive()
103+
serverScope.ensureActive()
104+
105+
val clientToServer = channelForCloseable<Stream>(connectionStreamsQueueCapacity)
106+
val serverToClient = channelForCloseable<Stream>(connectionStreamsQueueCapacity)
107+
108+
serverScope.launch {
109+
acceptor.acceptSession(
110+
Session(
111+
coroutineContext = coroutineContext.childContext(),
112+
outbound = serverToClient,
113+
inbound = clientToServer,
114+
streamBufferCapacity = streamBufferCapacity
115+
)
116+
)
117+
}
118+
119+
return Session(
120+
coroutineContext = clientScope.coroutineContext.childContext(),
121+
outbound = clientToServer,
122+
inbound = serverToClient,
123+
streamBufferCapacity = streamBufferCapacity
124+
)
125+
}
126+
127+
@RSocketTransportApi
128+
private class Session(
129+
override val coroutineContext: CoroutineContext,
130+
private val outbound: SendChannel<Stream>,
131+
private val inbound: ReceiveChannel<Stream>,
132+
private val streamBufferCapacity: Int,
133+
) : RSocketTransportSession.Multiplexed {
134+
private val streamsContext = coroutineContext.supervisorContext()
135+
136+
init {
137+
coroutineContext.job.invokeOnCompletion {
138+
outbound.close(it)
139+
inbound.cancel(CancellationException("Local connection closed", it))
140+
}
141+
}
142+
143+
override suspend fun createStream(): RSocketTransportSession.Multiplexed.Stream {
144+
val clientToServer = channelForCloseable<ByteReadPacket>(streamBufferCapacity)
145+
val serverToClient = channelForCloseable<ByteReadPacket>(streamBufferCapacity)
146+
147+
outbound.send(
148+
Stream(
149+
coroutineContext = streamsContext.childContext(),
150+
outbound = serverToClient,
151+
inbound = clientToServer
152+
)
153+
)
154+
155+
return Stream(
156+
coroutineContext = streamsContext.childContext(),
157+
outbound = clientToServer,
158+
inbound = serverToClient
159+
)
160+
}
161+
162+
override suspend fun awaitStream(): RSocketTransportSession.Multiplexed.Stream {
163+
return inbound.receive()
164+
}
165+
}
166+
167+
@RSocketTransportApi
168+
private class Stream(
169+
override val coroutineContext: CoroutineContext,
170+
private val outbound: SendChannel<ByteReadPacket>,
171+
private val inbound: ReceiveChannel<ByteReadPacket>,
172+
) : RSocketTransportSession.Multiplexed.Stream, Closeable {
173+
174+
init {
175+
coroutineContext.job.invokeOnCompletion {
176+
outbound.close(it)
177+
inbound.cancel(CancellationException("Local stream closed", it))
178+
}
179+
}
180+
181+
override fun updatePriority(value: Int) {} // no effect
182+
183+
override suspend fun sendFrame(frame: ByteReadPacket) {
184+
outbound.send(frame)
185+
}
186+
187+
override suspend fun receiveFrame(): ByteReadPacket {
188+
return inbound.receive()
189+
}
190+
191+
override fun close() {
192+
cancel("Stream is closed") // just for undelivered element case
193+
}
194+
}
195+
196+
}
197+
}

rsocket-transports/local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServerInstanceImpl.kt

+3-50
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,17 @@
1616

1717
package io.rsocket.kotlin.transport.local
1818

19-
import io.ktor.utils.io.core.*
20-
import io.rsocket.kotlin.internal.io.*
2119
import io.rsocket.kotlin.transport.*
2220
import kotlinx.atomicfu.locks.*
2321
import kotlinx.coroutines.*
24-
import kotlinx.coroutines.channels.*
2522
import kotlin.coroutines.*
2623
import kotlin.random.*
2724

2825
internal class LocalServerInstanceImpl @RSocketTransportApi constructor(
2926
override val serverName: String,
3027
override val coroutineContext: CoroutineContext,
31-
private val connectionBufferCapacity: Int,
3228
private val acceptor: RSocketServerAcceptor,
29+
private val connector: LocalServerConnector,
3330
) : LocalServerInstance {
3431

3532
init {
@@ -38,31 +35,10 @@ internal class LocalServerInstanceImpl @RSocketTransportApi constructor(
3835
}
3936

4037
@RSocketTransportApi
41-
override suspend fun createSession(): RSocketTransportSession = connect(coroutineContext)
38+
override suspend fun createSession(): RSocketTransportSession = connect(this)
4239

4340
@RSocketTransportApi
44-
fun connect(clientContext: CoroutineContext): RSocketTransportSession {
45-
ensureActive()
46-
47-
val clientToServer = channelForCloseable<ByteReadPacket>(connectionBufferCapacity)
48-
val serverToClient = channelForCloseable<ByteReadPacket>(connectionBufferCapacity)
49-
50-
launch {
51-
acceptor.acceptSession(
52-
LocalSequentialSession(
53-
coroutineContext = coroutineContext.childContext(),
54-
outbound = serverToClient,
55-
inbound = clientToServer
56-
)
57-
)
58-
}
59-
60-
return LocalSequentialSession(
61-
coroutineContext = clientContext.childContext(),
62-
outbound = clientToServer,
63-
inbound = serverToClient
64-
)
65-
}
41+
fun connect(clientScope: CoroutineScope): RSocketTransportSession = connector.connect(acceptor, clientScope, this)
6642

6743
companion object {
6844
private val lock = SynchronizedObject()
@@ -85,26 +61,3 @@ internal class LocalServerInstanceImpl @RSocketTransportApi constructor(
8561
fun randomName(): String = Random.nextBytes(16).toHexString(HexFormat.UpperCase)
8662
}
8763
}
88-
89-
@RSocketTransportApi
90-
private class LocalSequentialSession(
91-
override val coroutineContext: CoroutineContext,
92-
private val outbound: SendChannel<ByteReadPacket>,
93-
private val inbound: ReceiveChannel<ByteReadPacket>,
94-
) : RSocketTransportSession.Sequential {
95-
96-
init {
97-
coroutineContext.job.invokeOnCompletion {
98-
outbound.close(it)
99-
inbound.cancel(CancellationException("Local connection closed", it))
100-
}
101-
}
102-
103-
override suspend fun sendFrame(frame: ByteReadPacket) {
104-
outbound.send(frame)
105-
}
106-
107-
override suspend fun receiveFrame(): ByteReadPacket {
108-
return inbound.receive()
109-
}
110-
}

rsocket-transports/local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServerTransport.kt

+16-11
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package io.rsocket.kotlin.transport.local
1919
import io.rsocket.kotlin.internal.io.*
2020
import io.rsocket.kotlin.transport.*
2121
import kotlinx.coroutines.*
22-
import kotlinx.coroutines.channels.*
2322
import kotlin.coroutines.*
2423

2524
// TODO: rename to inprocess and more to another module/package later
@@ -46,44 +45,50 @@ public sealed interface LocalServerTransportBuilder : RSocketTransportBuilder<
4645

4746
public fun dispatcher(context: CoroutineContext)
4847
public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext)
49-
public fun connectionBufferCapacity(capacity: Int)
48+
49+
public fun sequential(connectionBufferCapacity: Int = 64)
50+
public fun multiplexed(connectionStreamsQueueCapacity: Int = 64, streamBufferCapacity: Int = 64)
5051
}
5152

5253
private class LocalServerTransportBuilderImpl : LocalServerTransportBuilder {
5354
private var dispatcher: CoroutineContext = Dispatchers.Default
54-
private var connectionBufferCapacity: Int = Channel.BUFFERED
55+
private var connector: LocalServerConnector? = null
5556

5657
override fun dispatcher(context: CoroutineContext) {
5758
check(context[Job] == null) { "Dispatcher shouldn't contain job" }
5859
this.dispatcher = context
5960
}
6061

61-
override fun connectionBufferCapacity(capacity: Int) {
62-
this.connectionBufferCapacity = capacity
62+
override fun sequential(connectionBufferCapacity: Int) {
63+
connector = LocalServerConnector.Sequential(connectionBufferCapacity)
64+
}
65+
66+
override fun multiplexed(connectionStreamsQueueCapacity: Int, streamBufferCapacity: Int) {
67+
connector = LocalServerConnector.Multiplexed(connectionStreamsQueueCapacity, streamBufferCapacity)
6368
}
6469

6570
@RSocketTransportApi
6671
override fun buildTransport(context: CoroutineContext): LocalServerTransport = LocalServerTransportImpl(
6772
coroutineContext = context.supervisorContext() + dispatcher,
68-
connectionBufferCapacity = connectionBufferCapacity
73+
connector = connector ?: LocalServerConnector.Sequential(64) // TODO: what is default?
6974
)
7075
}
7176

7277
private class LocalServerTransportImpl(
7378
override val coroutineContext: CoroutineContext,
74-
private val connectionBufferCapacity: Int,
79+
private val connector: LocalServerConnector,
7580
) : LocalServerTransport {
7681
override fun target(address: String): LocalServerTarget = LocalServerTargetImpl(
7782
serverName = address,
7883
coroutineContext = coroutineContext.supervisorContext(),
79-
connectionBufferCapacity = connectionBufferCapacity
84+
connector = connector
8085
)
8186
}
8287

8388
private class LocalServerTargetImpl(
8489
override val serverName: String,
8590
override val coroutineContext: CoroutineContext,
86-
private val connectionBufferCapacity: Int,
91+
private val connector: LocalServerConnector,
8792
) : LocalServerTarget {
8893
@RSocketTransportApi
8994
override suspend fun startServer(acceptor: RSocketServerAcceptor): LocalServerInstance {
@@ -92,8 +97,8 @@ private class LocalServerTargetImpl(
9297
return LocalServerInstanceImpl(
9398
serverName = serverName,
9499
coroutineContext = coroutineContext.supervisorContext(),
95-
connectionBufferCapacity = connectionBufferCapacity,
96-
acceptor = acceptor
100+
acceptor = acceptor,
101+
connector = connector
97102
)
98103
}
99104
}

0 commit comments

Comments
 (0)