Skip to content

Drop or rename TODOs (issues will be created) and use logging for unexpected frame #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ internal class ConnectionInbound(
}

private fun receiveError(cause: Throwable) {
throw cause // TODO?
throw cause
}

fun createOperation(type: FrameType, requestJob: Job): ResponderOperation = when (type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*

@RSocketLoggingApi
@RSocketTransportApi
internal abstract class ConnectionInitializer(
private val isClient: Boolean,
private val frameCodec: FrameCodec,
private val frameLogger: Logger,
private val connectionAcceptor: ConnectionAcceptor,
private val interceptors: Interceptors,
) {
Expand All @@ -42,11 +45,11 @@ internal abstract class ConnectionInitializer(
else -> connection.acceptStream() ?: error("Initial stream should be received")
}
initialStream.setSendPriority(0)
MultiplexedConnection(isClient, frameCodec, connection, initialStream, requestsScope)
MultiplexedConnection(isClient, frameCodec, frameLogger, connection, initialStream, requestsScope)
}

is RSocketSequentialConnection -> {
SequentialConnection(isClient, frameCodec, connection, requestsScope)
SequentialConnection(isClient, frameCodec, frameLogger, connection, requestsScope)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@

package io.rsocket.kotlin.connection

import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.operation.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.io.*

@RSocketLoggingApi
@RSocketTransportApi
internal class MultiplexedConnection(
isClient: Boolean,
frameCodec: FrameCodec,
private val frameLogger: Logger,
private val connection: RSocketMultiplexedConnection,
private val initialStream: RSocketMultiplexedConnection.Stream,
private val requestsScope: CoroutineScope,
Expand Down Expand Up @@ -146,7 +150,7 @@ internal class MultiplexedConnection(
// request is cancelled during fragmentation
is CancelFrame -> error("Request was cancelled by remote party")
is RequestFrame -> {
// TODO: extract assembly logic?
// TODO[fragmentation]: extract assembly logic?
when {
// for RC, it could contain the complete flag
// complete+follows=complete, "complete" overrides "follows" flag
Expand Down Expand Up @@ -194,7 +198,7 @@ internal class MultiplexedConnection(
): Unit = coroutineScope {
val outbound = Outbound(streamId, stream)
val receiveJob = launch {
val handler = OperationFrameHandler(operation)
val handler = OperationFrameHandler(operation, frameLogger)
try {
while (true) {
val frame = frameCodec.decodeFrame(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

package io.rsocket.kotlin.connection

import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.operation.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.io.*

@RSocketLoggingApi
@RSocketTransportApi
internal class SequentialConnection(
isClient: Boolean,
frameCodec: FrameCodec,
private val frameLogger: Logger,
private val connection: RSocketSequentialConnection,
private val requestsScope: CoroutineScope,
) : ConnectionOutbound(frameCodec) {
Expand Down Expand Up @@ -60,7 +64,7 @@ internal class SequentialConnection(
): Job = requestsScope.launch(start = CoroutineStart.ATOMIC) {
operation.handleExecutionFailure(requestPayload) {
ensureActive() // because of atomic start
val streamId = storage.createStream(OperationFrameHandler(operation))
val streamId = storage.createStream(OperationFrameHandler(operation, frameLogger))
try {
operation.execute(Outbound(streamId), requestPayload)
} finally {
Expand Down Expand Up @@ -116,7 +120,8 @@ internal class SequentialConnection(
when {
frame.follows -> ResponderInboundWrapper(connectionInbound, operationData)
else -> acceptRequest(connectionInbound, operationData)
}
},
frameLogger
)
if (storage.acceptStream(streamId, handler)) {
// for fragmentation
Expand Down Expand Up @@ -159,7 +164,7 @@ internal class SequentialConnection(
)
)
// close old handler
storage.replaceStream(operationData.streamId, OperationFrameHandler(operation))?.close()
storage.replaceStream(operationData.streamId, OperationFrameHandler(operation, frameLogger))?.close()
} else {
// should not happen really
storage.removeStream(operationData.streamId)?.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class RSocketConnector internal constructor(
private inner class SetupConnection() : ConnectionInitializer(
isClient = true,
frameCodec = FrameCodec(maxFragmentSize),
frameLogger = frameLogger,
connectionAcceptor = acceptor,
interceptors = interceptors
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class RSocketServer internal constructor(
private inner class AcceptConnection(acceptor: ConnectionAcceptor) : ConnectionInitializer(
isClient = false,
frameCodec = FrameCodec(maxFragmentSize),
frameLogger = frameLogger,
connectionAcceptor = acceptor,
interceptors = interceptors
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,8 @@ internal sealed class Frame : AutoCloseable {
}

internal fun dump(length: Long): String = buildString {
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId).append(" Length: ").append(length)
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId)
if (length != -1L) append(" Length: ").append(length)
append("\nFlags: 0b").append(flags.toBinaryString()).append(" (").apply { appendFlags() }.append(")")
appendSelf()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,5 +32,5 @@ internal class FrameCodec(

fun encodeFrame(frame: Frame): Buffer = frame.toBuffer()

// TODO: move fragmentation logic here or into separate class?
// TODO[fragmentation]: move fragmentation logic here or into separate class?
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,7 +78,7 @@ internal fun Source.readRequest(
return RequestFrame(type, streamId, fragmentFollows, complete, next, initialRequest, payload)
}

//TODO rename or remove on fragmentation implementation
//TODO[fragmentation] rename or remove on fragmentation implementation
internal fun RequestFireAndForgetFrame(streamId: Int, payload: Payload): RequestFrame =
RequestFrame(FrameType.RequestFnF, streamId, false, false, false, 0, payload)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@ private const val HonorLeaseFlag = 64
private const val ResumeEnabledFlag = 128

internal class SetupFrame(
val version: Version, //TODO check
val version: Version,
val honorLease: Boolean,
val keepAlive: KeepAlive,
val resumeToken: Buffer?,
Expand Down Expand Up @@ -104,7 +104,7 @@ private fun Source.readStringMimeType(): String {
}

private fun Sink.writeStringMimeType(mimeType: String) {
val bytes = mimeType.encodeToByteArray() //TODO check
val bytes = mimeType.encodeToByteArray()
writeByte(bytes.size.toByte())
write(bytes)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,11 +24,6 @@ private fun safeFindNextPositivePowerOfTwo(value: Int): Int = when {
else -> 1 shl 32 - (value - 1).countLeadingZeroBits()
}

// TODO: may be move to `internal-io` (and rename to just `rsocket-internal`)
// and use in prioritization queue to support more granular prioritization for streams
//
// TODO decide, is it needed, or can be replaced by simple map, or concurrent map on JVM?
// do benchmarks
/**
* IntMap implementation based on Netty IntObjectHashMap.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,9 +20,8 @@ import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.io.*

// TODO: make metadata should be fully transmitted before data
// TODO[fragmentation]: make metadata should be fully transmitted before data
internal class PayloadAssembler : AutoCloseable {
// TODO: better name
val hasPayload: Boolean
get() = data != null

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

internal class PayloadChannel {
// TODO: capacity should be configurable
private val payloads = channelForCloseable<Payload>(Channel.UNLIMITED)
private val requestNs = Channel<Int>(Channel.UNLIMITED)

Expand All @@ -39,7 +38,6 @@ internal class PayloadChannel {

@ExperimentalStreamsApi
suspend fun consumeInto(collector: FlowCollector<Payload>, strategy: RequestStrategy.Element): Throwable? {
// TODO: requestNs should be cancelled on success path?
payloads.consume {
while (true) {
payloads
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@

package io.rsocket.kotlin.operation

import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.payload.*

internal interface OperationInbound {
Expand All @@ -33,8 +35,11 @@ internal interface OperationInbound {
fun receiveDone() {}
}

// TODO: merge into OperationInbound?
internal class OperationFrameHandler(private val inbound: OperationInbound) {
@RSocketLoggingApi
internal class OperationFrameHandler(
private val inbound: OperationInbound,
private val frameLogger: Logger,
) {
private val assembler = PayloadAssembler()

fun close() {
Expand All @@ -47,8 +52,7 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) {

fun handleFrame(frame: Frame) {
if (!inbound.shouldReceiveFrame(frame.type)) {
// TODO: replace with logging
println("unexpected frame: $frame")
frameLogger.debug { "Received unexpected frame: ${frame.dump(-1)}" }
return frame.close()
}

Expand All @@ -57,14 +61,13 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) {
is ErrorFrame -> inbound.receiveErrorFrame(frame.throwable)
is RequestNFrame -> inbound.receiveRequestNFrame(frame.requestN)
is RequestFrame -> {
// TODO: split frames
if (frame.initialRequest != 0) inbound.receiveRequestNFrame(frame.initialRequest)

val payload = when {
// complete+follows=complete
frame.complete -> when {
frame.next -> assembler.assemblePayload(frame.payload)
// TODO - what if we previously received fragment?
// TODO[fragmentation] - what if we previously received fragment?
else -> {
check(!assembler.hasPayload) { "wrong combination of frames" }
null
Expand All @@ -85,11 +88,9 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) {
}

inbound.receivePayloadFrame(payload, frame.complete)
//
//
// // TODO: recheck notes
// // TODO: if there are no fragments saved and there are no following - we can ignore going through buffer
// // TODO: really, fragment could be NULL when `complete` is true, but `next` is false

// // TODO[fragmentation]: if there are no fragments saved and there are no following - we can ignore going through buffer
// // TODO[fragmentation]: really, fragment could be NULL when `complete` is true, but `next` is false
// if (frame.next || frame.type.isRequestType) appendFragment(frame.payload)
// if (frame.complete) inbound.receivePayloadFrame(assemblePayload(), complete = true)
// else if (!frame.follows) inbound.receivePayloadFrame(assemblePayload(), complete = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal abstract class OperationOutbound(
protected val streamId: Int,
private val frameCodec: FrameCodec,
) {
// TODO: decide on it
// TODO[fragmentation]: decide on it
// private var firstRequestFrameSent: Boolean = false

protected abstract suspend fun sendFrame(frame: Buffer)
Expand Down Expand Up @@ -71,8 +71,7 @@ internal abstract class OperationOutbound(
return sendRequestPayload(type, payload, complete, initialRequest)
}

// TODO rework/simplify later
// TODO release on fail ?
// TODO[fragmentation] rework/simplify later, release on fail ?
private suspend fun sendRequestPayload(type: FrameType, payload: Payload, complete: Boolean, initialRequest: Int) {
if (!payload.isFragmentable(type.hasInitialRequest)) {
return sendFrame(RequestFrame(type, streamId, false, complete, true, initialRequest, payload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ internal class RequesterRequestChannelOperation(
responsePayloads.isActive -> frameType == FrameType.Payload || frameType == FrameType.Error
else -> false
} || when {
// TODO: handle cancel, when `senderJob` is not started
senderJob == null || senderJob?.isActive == true -> frameType == FrameType.RequestN || frameType == FrameType.Cancel
else -> false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ internal class RequesterRequestResponseOperation(
)
responseDeferred.join()
} catch (cause: Throwable) {
// TODO: we don't need to send cancel if we have sent no frames
nonCancellable { outbound.sendCancel() }
throw cause
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ internal class ResponderRequestChannelOperation(
frameType === FrameType.Cancel || when {
requestPayloads.isActive -> frameType === FrameType.Payload || frameType === FrameType.Error
else -> false
} || frameType === FrameType.RequestN // TODO
} || frameType === FrameType.RequestN

override fun receiveRequestNFrame(requestN: Int) {
limiter.updateRequests(requestN)
Expand Down
Loading
Loading