diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt index aae8fc25..b061890c 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt @@ -54,7 +54,7 @@ internal class ConnectionInbound( } private fun receiveError(cause: Throwable) { - throw cause // TODO? + throw cause } fun createOperation(type: FrameType, requestJob: Job): ResponderOperation = when (type) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInitializer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInitializer.kt index f1ef8aac..f7a3bd54 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInitializer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInitializer.kt @@ -20,13 +20,16 @@ import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.io.* +import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* +@RSocketLoggingApi @RSocketTransportApi internal abstract class ConnectionInitializer( private val isClient: Boolean, private val frameCodec: FrameCodec, + private val frameLogger: Logger, private val connectionAcceptor: ConnectionAcceptor, private val interceptors: Interceptors, ) { @@ -42,11 +45,11 @@ internal abstract class ConnectionInitializer( else -> connection.acceptStream() ?: error("Initial stream should be received") } initialStream.setSendPriority(0) - MultiplexedConnection(isClient, frameCodec, connection, initialStream, requestsScope) + MultiplexedConnection(isClient, frameCodec, frameLogger, connection, initialStream, requestsScope) } is RSocketSequentialConnection -> { - SequentialConnection(isClient, frameCodec, connection, requestsScope) + SequentialConnection(isClient, frameCodec, frameLogger, connection, requestsScope) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt index 430cf5d0..a9ba4e09 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt @@ -16,18 +16,22 @@ package io.rsocket.kotlin.connection +import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.operation.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* import kotlinx.io.* +@RSocketLoggingApi @RSocketTransportApi internal class MultiplexedConnection( isClient: Boolean, frameCodec: FrameCodec, + private val frameLogger: Logger, private val connection: RSocketMultiplexedConnection, private val initialStream: RSocketMultiplexedConnection.Stream, private val requestsScope: CoroutineScope, @@ -146,7 +150,7 @@ internal class MultiplexedConnection( // request is cancelled during fragmentation is CancelFrame -> error("Request was cancelled by remote party") is RequestFrame -> { - // TODO: extract assembly logic? + // TODO[fragmentation]: extract assembly logic? when { // for RC, it could contain the complete flag // complete+follows=complete, "complete" overrides "follows" flag @@ -194,7 +198,7 @@ internal class MultiplexedConnection( ): Unit = coroutineScope { val outbound = Outbound(streamId, stream) val receiveJob = launch { - val handler = OperationFrameHandler(operation) + val handler = OperationFrameHandler(operation, frameLogger) try { while (true) { val frame = frameCodec.decodeFrame( diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt index fe6fec8a..fd301aa9 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt @@ -16,17 +16,21 @@ package io.rsocket.kotlin.connection +import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.operation.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* import kotlinx.io.* +@RSocketLoggingApi @RSocketTransportApi internal class SequentialConnection( isClient: Boolean, frameCodec: FrameCodec, + private val frameLogger: Logger, private val connection: RSocketSequentialConnection, private val requestsScope: CoroutineScope, ) : ConnectionOutbound(frameCodec) { @@ -60,7 +64,7 @@ internal class SequentialConnection( ): Job = requestsScope.launch(start = CoroutineStart.ATOMIC) { operation.handleExecutionFailure(requestPayload) { ensureActive() // because of atomic start - val streamId = storage.createStream(OperationFrameHandler(operation)) + val streamId = storage.createStream(OperationFrameHandler(operation, frameLogger)) try { operation.execute(Outbound(streamId), requestPayload) } finally { @@ -116,7 +120,8 @@ internal class SequentialConnection( when { frame.follows -> ResponderInboundWrapper(connectionInbound, operationData) else -> acceptRequest(connectionInbound, operationData) - } + }, + frameLogger ) if (storage.acceptStream(streamId, handler)) { // for fragmentation @@ -159,7 +164,7 @@ internal class SequentialConnection( ) ) // close old handler - storage.replaceStream(operationData.streamId, OperationFrameHandler(operation))?.close() + storage.replaceStream(operationData.streamId, OperationFrameHandler(operation, frameLogger))?.close() } else { // should not happen really storage.removeStream(operationData.streamId)?.close() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index b4c52669..9731ecc2 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -66,6 +66,7 @@ public class RSocketConnector internal constructor( private inner class SetupConnection() : ConnectionInitializer( isClient = true, frameCodec = FrameCodec(maxFragmentSize), + frameLogger = frameLogger, connectionAcceptor = acceptor, interceptors = interceptors ) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt index bf350992..49af8422 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt @@ -66,6 +66,7 @@ public class RSocketServer internal constructor( private inner class AcceptConnection(acceptor: ConnectionAcceptor) : ConnectionInitializer( isClient = false, frameCodec = FrameCodec(maxFragmentSize), + frameLogger = frameLogger, connectionAcceptor = acceptor, interceptors = interceptors ) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt index 66b8584e..3dfb44e5 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,8 @@ internal sealed class Frame : AutoCloseable { } internal fun dump(length: Long): String = buildString { - append("\n").append(type).append(" frame -> Stream Id: ").append(streamId).append(" Length: ").append(length) + append("\n").append(type).append(" frame -> Stream Id: ").append(streamId) + if (length != -1L) append(" Length: ").append(length) append("\nFlags: 0b").append(flags.toBinaryString()).append(" (").apply { appendFlags() }.append(")") appendSelf() } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameCodec.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameCodec.kt index cd661379..dd545031 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameCodec.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameCodec.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,5 +32,5 @@ internal class FrameCodec( fun encodeFrame(frame: Frame): Buffer = frame.toBuffer() - // TODO: move fragmentation logic here or into separate class? + // TODO[fragmentation]: move fragmentation logic here or into separate class? } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt index fcc562db..4a3b4477 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,7 +78,7 @@ internal fun Source.readRequest( return RequestFrame(type, streamId, fragmentFollows, complete, next, initialRequest, payload) } -//TODO rename or remove on fragmentation implementation +//TODO[fragmentation] rename or remove on fragmentation implementation internal fun RequestFireAndForgetFrame(streamId: Int, payload: Payload): RequestFrame = RequestFrame(FrameType.RequestFnF, streamId, false, false, false, 0, payload) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt index 778cbe8a..033a2cdf 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ private const val HonorLeaseFlag = 64 private const val ResumeEnabledFlag = 128 internal class SetupFrame( - val version: Version, //TODO check + val version: Version, val honorLease: Boolean, val keepAlive: KeepAlive, val resumeToken: Buffer?, @@ -104,7 +104,7 @@ private fun Source.readStringMimeType(): String { } private fun Sink.writeStringMimeType(mimeType: String) { - val bytes = mimeType.encodeToByteArray() //TODO check + val bytes = mimeType.encodeToByteArray() writeByte(bytes.size.toByte()) write(bytes) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt index a518837e..5bfa7e7d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,6 @@ private fun safeFindNextPositivePowerOfTwo(value: Int): Int = when { else -> 1 shl 32 - (value - 1).countLeadingZeroBits() } -// TODO: may be move to `internal-io` (and rename to just `rsocket-internal`) -// and use in prioritization queue to support more granular prioritization for streams -// -// TODO decide, is it needed, or can be replaced by simple map, or concurrent map on JVM? -// do benchmarks /** * IntMap implementation based on Netty IntObjectHashMap. */ diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadAssembler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadAssembler.kt index da4e94b3..7ddb311f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadAssembler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadAssembler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,9 +20,8 @@ import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* import kotlinx.io.* -// TODO: make metadata should be fully transmitted before data +// TODO[fragmentation]: make metadata should be fully transmitted before data internal class PayloadAssembler : AutoCloseable { - // TODO: better name val hasPayload: Boolean get() = data != null diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadChannel.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadChannel.kt index 656e34f4..1248097d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadChannel.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadChannel.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* internal class PayloadChannel { - // TODO: capacity should be configurable private val payloads = channelForCloseable(Channel.UNLIMITED) private val requestNs = Channel(Channel.UNLIMITED) @@ -39,7 +38,6 @@ internal class PayloadChannel { @ExperimentalStreamsApi suspend fun consumeInto(collector: FlowCollector, strategy: RequestStrategy.Element): Throwable? { - // TODO: requestNs should be cancelled on success path? payloads.consume { while (true) { payloads diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationInbound.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationInbound.kt index e3eceff5..6aeef7b5 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationInbound.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationInbound.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,10 @@ package io.rsocket.kotlin.operation +import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* internal interface OperationInbound { @@ -33,8 +35,11 @@ internal interface OperationInbound { fun receiveDone() {} } -// TODO: merge into OperationInbound? -internal class OperationFrameHandler(private val inbound: OperationInbound) { +@RSocketLoggingApi +internal class OperationFrameHandler( + private val inbound: OperationInbound, + private val frameLogger: Logger, +) { private val assembler = PayloadAssembler() fun close() { @@ -47,8 +52,7 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) { fun handleFrame(frame: Frame) { if (!inbound.shouldReceiveFrame(frame.type)) { - // TODO: replace with logging - println("unexpected frame: $frame") + frameLogger.debug { "Received unexpected frame: ${frame.dump(-1)}" } return frame.close() } @@ -57,14 +61,13 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) { is ErrorFrame -> inbound.receiveErrorFrame(frame.throwable) is RequestNFrame -> inbound.receiveRequestNFrame(frame.requestN) is RequestFrame -> { - // TODO: split frames if (frame.initialRequest != 0) inbound.receiveRequestNFrame(frame.initialRequest) val payload = when { // complete+follows=complete frame.complete -> when { frame.next -> assembler.assemblePayload(frame.payload) - // TODO - what if we previously received fragment? + // TODO[fragmentation] - what if we previously received fragment? else -> { check(!assembler.hasPayload) { "wrong combination of frames" } null @@ -85,11 +88,9 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) { } inbound.receivePayloadFrame(payload, frame.complete) -// -// -// // TODO: recheck notes -// // TODO: if there are no fragments saved and there are no following - we can ignore going through buffer -// // TODO: really, fragment could be NULL when `complete` is true, but `next` is false + +// // TODO[fragmentation]: if there are no fragments saved and there are no following - we can ignore going through buffer +// // TODO[fragmentation]: really, fragment could be NULL when `complete` is true, but `next` is false // if (frame.next || frame.type.isRequestType) appendFragment(frame.payload) // if (frame.complete) inbound.receivePayloadFrame(assemblePayload(), complete = true) // else if (!frame.follows) inbound.receivePayloadFrame(assemblePayload(), complete = false) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationOutbound.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationOutbound.kt index f0520234..0cf24b52 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationOutbound.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationOutbound.kt @@ -31,7 +31,7 @@ internal abstract class OperationOutbound( protected val streamId: Int, private val frameCodec: FrameCodec, ) { - // TODO: decide on it + // TODO[fragmentation]: decide on it // private var firstRequestFrameSent: Boolean = false protected abstract suspend fun sendFrame(frame: Buffer) @@ -71,8 +71,7 @@ internal abstract class OperationOutbound( return sendRequestPayload(type, payload, complete, initialRequest) } - // TODO rework/simplify later - // TODO release on fail ? + // TODO[fragmentation] rework/simplify later, release on fail ? private suspend fun sendRequestPayload(type: FrameType, payload: Payload, complete: Boolean, initialRequest: Int) { if (!payload.isFragmentable(type.hasInitialRequest)) { return sendFrame(RequestFrame(type, streamId, false, complete, true, initialRequest, payload)) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt index c5ee0b9e..2c9b4c5a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt @@ -77,7 +77,6 @@ internal class RequesterRequestChannelOperation( responsePayloads.isActive -> frameType == FrameType.Payload || frameType == FrameType.Error else -> false } || when { - // TODO: handle cancel, when `senderJob` is not started senderJob == null || senderJob?.isActive == true -> frameType == FrameType.RequestN || frameType == FrameType.Cancel else -> false } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt index 6f2103e1..83dd6d2a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt @@ -35,7 +35,6 @@ internal class RequesterRequestResponseOperation( ) responseDeferred.join() } catch (cause: Throwable) { - // TODO: we don't need to send cancel if we have sent no frames nonCancellable { outbound.sendCancel() } throw cause } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/ResponderRequestChannelOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/ResponderRequestChannelOperation.kt index 9cb0ccba..c33b4177 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/ResponderRequestChannelOperation.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/ResponderRequestChannelOperation.kt @@ -72,7 +72,7 @@ internal class ResponderRequestChannelOperation( frameType === FrameType.Cancel || when { requestPayloads.isActive -> frameType === FrameType.Payload || frameType === FrameType.Error else -> false - } || frameType === FrameType.RequestN // TODO + } || frameType === FrameType.RequestN override fun receiveRequestNFrame(requestN: Int) { limiter.updateRequests(requestN) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue.kt index 062848aa..5f7881c4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue.kt @@ -51,7 +51,6 @@ public class PrioritizationFrameQueue { return null } - // TODO: recheck, that it works fine in case priority channel is closed, but normal channel has other frames to send public suspend fun dequeueFrame(): Buffer? { tryDequeueFrame()?.let { return it } return select { @@ -60,7 +59,6 @@ public class PrioritizationFrameQueue { }.getOrNull() } - // TODO: document public fun close() { priorityFrames.close() normalFrames.close() diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index 138b8957..3968de58 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -293,8 +293,6 @@ abstract class RSocketTest( fun testStreamRequestN() = test { start(RSocketRequestHandler { requestStream { - // TODO: we should really call close here - - it.close() (0..9).asFlow().map { payload(it.toString()) } } }) @@ -508,7 +506,7 @@ abstract class RSocketTest( delay(100) assertFalse(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=false") assertFalse(responderChannel.isClosedForReceive, "responderChannel.isClosedForReceive=false") - payloads.forEach { requesterChannel.send(it.copy()) } //TODO? + payloads.forEach { requesterChannel.send(it.copy()) } payloads.forEach { responderChannel.checkReceived(it) } } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperationTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperationTest.kt index 7f3de311..9c9c6899 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperationTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperationTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,17 +18,17 @@ package io.rsocket.kotlin.operation import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlinx.io.* import kotlin.test.* -// TODO: write better tests class RequesterRequestResponseOperationTest : SuspendTest { private val deferred = CompletableDeferred() private val operation = RequesterRequestResponseOperation(deferred) - private val handler = OperationFrameHandler(operation) + private val handler = OperationFrameHandler(operation, NoopLogger) @Test fun testCompleteOnPayloadReceive() = test { diff --git a/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt b/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt index b1f9ee19..bb074344 100644 --- a/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt +++ b/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ private val onUndeliveredBuffer: (Buffer) -> Unit = Buffer::clear public fun bufferChannel(capacity: Int): Channel = Channel(capacity, onUndeliveredElement = onUndeliveredBuffer) -// TODO: may be drop it? public fun channelForCloseable(capacity: Int): Channel = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable) diff --git a/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt index 3b66c7bb..4d7a123a 100644 --- a/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt +++ b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt @@ -30,7 +30,6 @@ import kotlin.time.* import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds -//TODO: need to somehow rework those tests, as now they are super flaky abstract class TransportTest : SuspendTest { override val testTimeout: Duration = 10.minutes diff --git a/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransport.kt b/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransport.kt index 164644e4..1f836f0e 100644 --- a/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransport.kt +++ b/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransport.kt @@ -36,7 +36,6 @@ public sealed interface KtorTcpClientTransport : RSocketTransport { public sealed interface KtorTcpClientTransportBuilder : RSocketTransportBuilder { public fun selectorManager(manager: SelectorManager, manage: Boolean) public fun socketOptions(block: SocketOptions.TCPClientSocketOptions.() -> Unit) - //TODO: TLS support } private class KtorTcpClientTransportBuilderImpl : KtorTcpClientTransportBuilder { diff --git a/rsocket-transports/netty-internal/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/internal/io.kt b/rsocket-transports/netty-internal/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/internal/io.kt index 4050e6c2..956e71df 100644 --- a/rsocket-transports/netty-internal/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/internal/io.kt +++ b/rsocket-transports/netty-internal/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/internal/io.kt @@ -37,7 +37,7 @@ public fun ByteBuf.toBuffer(): Buffer { @OptIn(UnsafeIoApi::class) public fun Buffer.toByteBuf(allocator: ByteBufAllocator): ByteBuf { - val nettyBuffer = allocator.directBuffer(size.toInt()) // TODO: length + val nettyBuffer = allocator.directBuffer(size.toInt()) while (!exhausted()) { UnsafeBufferOperations.readFromHead(this) { bytes, start, end -> nettyBuffer.writeBytes(bytes, start, end - start) diff --git a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt index a3450902..778721ee 100644 --- a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt +++ b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ internal class TcpConnection( } } - val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO + val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } socket.on( onData = frameAssembler::write, onError = { coroutineContext.job.cancel("Socket error", it) }, diff --git a/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt index bc219bcb..25d3b50f 100644 --- a/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt +++ b/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -17,7 +17,6 @@ package io.rsocket.kotlin.transport.nodejs.tcp import io.rsocket.kotlin.transport.tests.* -import kotlinx.coroutines.* import kotlin.test.* @Suppress("DEPRECATION_ERROR") @@ -32,7 +31,6 @@ class TcpTransportTest : TransportTest() { } override suspend fun after() { - delay(100) //TODO close race super.after() server.close() }