Skip to content

Commit 28c6f03

Browse files
committed
migrate ktor integration to use new API
1 parent d5f05dd commit 28c6f03

File tree

6 files changed

+46
-52
lines changed

6 files changed

+46
-52
lines changed

rsocket-ktor/client/api/rsocket-ktor-client.api

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
public final class io/rsocket/kotlin/ktor/client/BuildersKt {
2-
public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3-
public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
2+
public static final fun rSocket (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
44
public static final fun rSocket (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5-
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
6-
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
5+
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
6+
public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
77
}
88

99
public final class io/rsocket/kotlin/ktor/client/RSocketSupport {

rsocket-ktor/client/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ kotlin {
3030
sourceSets {
3131
commonMain.dependencies {
3232
implementation(projects.rsocketTransportKtorWebsocketInternal)
33+
implementation(projects.rsocketInternalIo)
3334
api(projects.rsocketCore)
3435
api(libs.ktor.client.websockets)
3536
}

rsocket-ktor/client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,50 +22,48 @@ import io.ktor.client.plugins.websocket.*
2222
import io.ktor.client.request.*
2323
import io.ktor.http.*
2424
import io.rsocket.kotlin.*
25+
import io.rsocket.kotlin.internal.io.*
2526
import io.rsocket.kotlin.transport.*
2627
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
28+
import kotlinx.coroutines.*
2729
import kotlin.coroutines.*
2830

2931
public suspend fun HttpClient.rSocket(
3032
request: HttpRequestBuilder.() -> Unit,
31-
): RSocket = plugin(RSocketSupport).run {
32-
connector.connect(KtorClientTransport(this@rSocket, request))
33-
}
33+
): RSocket = plugin(RSocketSupport).connector.connect(KtorWebSocketClientTarget(this, request))
3434

3535
public suspend fun HttpClient.rSocket(
3636
urlString: String,
37-
secure: Boolean = false,
3837
request: HttpRequestBuilder.() -> Unit = {},
39-
): RSocket = rSocket {
40-
url {
41-
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
42-
this.port = protocol.defaultPort
43-
takeFrom(urlString)
44-
}
38+
): RSocket = rSocket(method = HttpMethod.Get, host = null, port = null, path = null) {
39+
url.protocol = URLProtocol.WS
40+
url.port = url.protocol.defaultPort
41+
url.takeFrom(urlString)
4542
request()
4643
}
4744

4845
public suspend fun HttpClient.rSocket(
46+
method: HttpMethod = HttpMethod.Get,
4947
host: String? = null,
5048
port: Int? = null,
5149
path: String? = null,
52-
secure: Boolean = false,
5350
request: HttpRequestBuilder.() -> Unit = {},
5451
): RSocket = rSocket {
55-
url {
56-
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
57-
this.port = protocol.defaultPort
58-
set(host = host, port = port, path = path)
59-
}
52+
this.method = method
53+
url("ws", host, port, path)
6054
request()
6155
}
6256

63-
private class KtorClientTransport(
57+
private class KtorWebSocketClientTarget(
6458
private val client: HttpClient,
6559
private val request: HttpRequestBuilder.() -> Unit,
66-
) : ClientTransport {
67-
override val coroutineContext: CoroutineContext get() = client.coroutineContext
60+
) : RSocketClientTarget {
61+
override val coroutineContext: CoroutineContext = client.coroutineContext.supervisorContext()
62+
63+
@RSocketTransportApi
64+
override suspend fun createSession(): RSocketTransportSession {
65+
ensureActive()
6866

69-
@TransportApi
70-
override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request))
67+
return KtorWebSocketSession(client.webSocketSession(request))
68+
}
7169
}

rsocket-ktor/server/api/rsocket-ktor-server.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ public final class io/rsocket/kotlin/ktor/server/RSocketSupport$Feature : io/kto
1616
}
1717

1818
public final class io/rsocket/kotlin/ktor/server/RoutingKt {
19+
public static final fun rSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;)V
1920
public static final fun rSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;)V
21+
public static synthetic fun rSocket$default (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;ILjava/lang/Object;)V
2022
public static synthetic fun rSocket$default (Lio/ktor/server/routing/Route;Ljava/lang/String;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;ILjava/lang/Object;)V
2123
}
2224

rsocket-ktor/server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,24 @@ package io.rsocket.kotlin.ktor.server
1919
import io.ktor.server.application.*
2020
import io.ktor.server.websocket.*
2121
import io.ktor.util.*
22+
import io.rsocket.kotlin.*
2223
import io.rsocket.kotlin.core.*
24+
import io.rsocket.kotlin.transport.*
25+
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
26+
import kotlinx.coroutines.*
2327

2428
public class RSocketSupport private constructor(
25-
internal val server: RSocketServer,
29+
private val server: RSocketServer,
2630
) {
31+
@RSocketTransportApi
32+
internal fun handler(acceptor: ConnectionAcceptor): suspend DefaultWebSocketServerSession.() -> Unit {
33+
val serverAcceptor = server.createAcceptor(acceptor)
34+
return {
35+
serverAcceptor.acceptSession(KtorWebSocketSession(this))
36+
coroutineContext.job.join()
37+
}
38+
}
39+
2740
public class Config internal constructor() {
2841
public var server: RSocketServer = RSocketServer()
2942
public fun server(block: RSocketServerBuilder.() -> Unit) {

rsocket-ktor/server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,11 @@ import io.ktor.server.routing.*
2121
import io.ktor.server.websocket.*
2222
import io.rsocket.kotlin.*
2323
import io.rsocket.kotlin.transport.*
24-
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
25-
import kotlinx.coroutines.*
2624

27-
public fun Route.rSocket(
28-
path: String? = null,
29-
protocol: String? = null,
30-
acceptor: ConnectionAcceptor,
31-
): Unit = application.plugin(RSocketSupport).run {
32-
server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol), acceptor)
33-
}
25+
@OptIn(RSocketTransportApi::class)
26+
public fun Route.rSocket(protocol: String? = null, acceptor: ConnectionAcceptor): Unit =
27+
webSocket(protocol, application.plugin(RSocketSupport).handler(acceptor))
3428

35-
private class KtorServerTransport(
36-
private val route: Route,
37-
private val path: String?,
38-
private val protocol: String?,
39-
) : ServerTransport<Unit> {
40-
@TransportApi
41-
override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) {
42-
val handler: suspend DefaultWebSocketServerSession.() -> Unit = {
43-
val connection = WebSocketConnection(this)
44-
accept(connection)
45-
}
46-
when (path) {
47-
null -> route.webSocket(protocol, handler)
48-
else -> route.webSocket(path, protocol, handler)
49-
}
50-
}
51-
}
29+
@OptIn(RSocketTransportApi::class)
30+
public fun Route.rSocket(path: String, protocol: String? = null, acceptor: ConnectionAcceptor): Unit =
31+
webSocket(path, protocol, application.plugin(RSocketSupport).handler(acceptor))

0 commit comments

Comments
 (0)