Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.ktor.http.cio

import io.ktor.http.*
import io.ktor.http.cio.internals.*
import io.ktor.utils.io.InternalAPI
import io.ktor.utils.io.core.*

/**
Expand Down
6 changes: 6 additions & 0 deletions ktor-io/api/ktor-io.api
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ public final class io/ktor/utils/io/ConcurrentIOException : java/lang/IllegalSta
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/utils/io/ConnectionClosedException : java/io/IOException {
public fun <init> ()V
public fun <init> (Ljava/lang/String;)V
public synthetic fun <init> (Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/utils/io/CountedByteReadChannel : io/ktor/utils/io/ByteReadChannel {
public fun <init> (Lio/ktor/utils/io/ByteReadChannel;)V
public fun awaitContent (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
4 changes: 4 additions & 0 deletions ktor-io/api/ktor-io.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ final class io.ktor.utils.io/ConcurrentIOException : kotlin/IllegalStateExceptio
constructor <init>(kotlin/String, kotlin/Throwable? = ...) // io.ktor.utils.io/ConcurrentIOException.<init>|<init>(kotlin.String;kotlin.Throwable?){}[0]
}

final class io.ktor.utils.io/ConnectionClosedException : kotlinx.io/IOException { // io.ktor.utils.io/ConnectionClosedException|null[0]
constructor <init>(kotlin/String = ...) // io.ktor.utils.io/ConnectionClosedException.<init>|<init>(kotlin.String){}[0]
}

final class io.ktor.utils.io/CountedByteReadChannel : io.ktor.utils.io/ByteReadChannel { // io.ktor.utils.io/CountedByteReadChannel|null[0]
constructor <init>(io.ktor.utils.io/ByteReadChannel) // io.ktor.utils.io/CountedByteReadChannel.<init>|<init>(io.ktor.utils.io.ByteReadChannel){}[0]

Expand Down
8 changes: 8 additions & 0 deletions ktor-io/common/src/io/ktor/utils/io/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ public class ClosedWriteChannelException(cause: Throwable? = null) : ClosedByteC
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.utils.io.ClosedReadChannelException)
*/
public class ClosedReadChannelException(cause: Throwable? = null) : ClosedByteChannelException(cause)

/**
* Exception thrown when a network connection is closed or reset by peer.
* This exception is used to signal that the underlying connection was terminated.
*
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.utils.io.ConnectionClosedException)
*/
public class ConnectionClosedException(message: String = "Connection was closed") : IOException(message)
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package io.ktor.server.cio

import io.ktor.events.*
import io.ktor.http.*
import io.ktor.http.cio.Request
import io.ktor.server.application.*
import io.ktor.server.cio.backend.*
import io.ktor.server.cio.internal.*
import io.ktor.server.engine.*
import io.ktor.server.http.HttpRequestCloseHandlerKey
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.util.pipeline.*
Expand Down Expand Up @@ -169,7 +171,15 @@ public class CIOApplicationEngine(
return transferEncoding != null || (contentLength != null && contentLength > 0)
}

private suspend fun ServerRequestScope.handleRequest(request: io.ktor.http.cio.Request) {
@OptIn(InternalAPI::class)
private fun ServerRequestScope.setCloseHandler(call: CIOApplicationCall) {
onClose = {
val requestCloseHandler = call.attributes.getOrNull(HttpRequestCloseHandlerKey)
requestCloseHandler?.invoke()
}
}

private suspend fun ServerRequestScope.handleRequest(request: Request) {
withContext(userDispatcher) requestContext@{
val call = CIOApplicationCall(
applicationProvider(),
Expand All @@ -186,6 +196,7 @@ public class CIOApplicationEngine(

try {
addHandlerForExpectedHeader(output, call)
setCloseHandler(call)
pipeline.execute(call)
} catch (error: Throwable) {
handleFailure(call, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public fun CoroutineScope.startServerConnectionPipeline(

val requestContext = RequestHandlerCoroutine + Dispatchers.Unconfined

var handlerScope: ServerRequestScope? = null
try {
while (true) { // parse requests loop
val request = try {
parseRequest(connection.input) ?: break
} catch (cause: TooLongLineException) {
} catch (_: TooLongLineException) {
respondBadRequest(actorChannel)
break // end pipeline loop
} catch (io: IOException) {
Expand Down Expand Up @@ -113,7 +114,7 @@ public fun CoroutineScope.startServerConnectionPipeline(
contentType
)
expectedHttpUpgrade = !expectedHttpBody && expectHttpUpgrade(request.method, upgrade, connectionOptions)
} catch (cause: Throwable) {
} catch (_: Throwable) {
request.release()
response.writePacket(BadRequestPacket.copy())
response.close()
Expand All @@ -129,7 +130,7 @@ public fun CoroutineScope.startServerConnectionPipeline(
val upgraded = if (expectedHttpUpgrade) CompletableDeferred<Boolean>() else null

launch(requestContext, start = CoroutineStart.UNDISPATCHED) {
val handlerScope = ServerRequestScope(
handlerScope = ServerRequestScope(
coroutineContext,
requestBody,
response,
Expand Down Expand Up @@ -181,10 +182,11 @@ public fun CoroutineScope.startServerConnectionPipeline(

if (isLastHttpRequest(version, connectionOptions)) break
}
} catch (cause: IOException) {
} catch (_: IOException) {
// already handled
coroutineContext.cancel()
} finally {
handlerScope?.onClose?.invoke()
actorChannel.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public class ServerRequestScope internal constructor(
localAddress,
upgraded
)

internal var onClose: (() -> Unit)? = null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, could you check if using invokeOnCompletion for the job is viable here instead of introducing the field?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey! Sorry, I forgot to mention it here. invokeOnCompletion doesn't help in this case.

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,11 @@ class CIOHooksTest : HooksTestSuite<CIOApplicationEngine, CIOApplicationEngine.C
enableSsl = false
}
}

class CIOHttpRequestLifecycleTest :
HttpRequestLifecycleTest<CIOApplicationEngine, CIOApplicationEngine.Configuration>(CIO) {
init {
enableSsl = false
enableHttp2 = false
}
}
10 changes: 10 additions & 0 deletions ktor-server/ktor-server-core/api/ktor-server-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,16 @@ public final class io/ktor/server/http/HttpDateJvmKt {
public static final fun toHttpDateString (Ljava/time/temporal/Temporal;)Ljava/lang/String;
}

public final class io/ktor/server/http/HttpRequestLifecycleConfig {
public final fun getCancelCallOnClose ()Z
public final fun setCancelCallOnClose (Z)V
}

public final class io/ktor/server/http/HttpRequestLifecycleKt {
public static final fun getHttpRequestCloseHandlerKey ()Lio/ktor/util/AttributeKey;
public static final fun getHttpRequestLifecycle ()Lio/ktor/server/application/RouteScopedPlugin;
}

public final class io/ktor/server/http/LinkHeaderKt {
public static final fun link (Lio/ktor/server/response/ApplicationResponse;Lio/ktor/http/LinkHeader;)V
public static final fun link (Lio/ktor/server/response/ApplicationResponse;Ljava/lang/String;[Ljava/lang/String;)V
Expand Down
10 changes: 10 additions & 0 deletions ktor-server/ktor-server-core/api/ktor-server-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,12 @@ final class io.ktor.server.http.content/HttpStatusCodeContent : io.ktor.http.con
final fun toString(): kotlin/String // io.ktor.server.http.content/HttpStatusCodeContent.toString|toString(){}[0]
}

final class io.ktor.server.http/HttpRequestLifecycleConfig { // io.ktor.server.http/HttpRequestLifecycleConfig|null[0]
final var cancelCallOnClose // io.ktor.server.http/HttpRequestLifecycleConfig.cancelCallOnClose|{}cancelCallOnClose[0]
final fun <get-cancelCallOnClose>(): kotlin/Boolean // io.ktor.server.http/HttpRequestLifecycleConfig.cancelCallOnClose.<get-cancelCallOnClose>|<get-cancelCallOnClose>(){}[0]
final fun <set-cancelCallOnClose>(kotlin/Boolean) // io.ktor.server.http/HttpRequestLifecycleConfig.cancelCallOnClose.<set-cancelCallOnClose>|<set-cancelCallOnClose>(kotlin.Boolean){}[0]
}

final class io.ktor.server.plugins/CannotTransformContentToTypeException : io.ktor.server.plugins/ContentTransformationException, kotlinx.coroutines/CopyableThrowable<io.ktor.server.plugins/CannotTransformContentToTypeException> { // io.ktor.server.plugins/CannotTransformContentToTypeException|null[0]
constructor <init>(kotlin.reflect/KType) // io.ktor.server.plugins/CannotTransformContentToTypeException.<init>|<init>(kotlin.reflect.KType){}[0]

Expand Down Expand Up @@ -1709,6 +1715,10 @@ final val io.ktor.server.http.content/isCompressionSuppressed // io.ktor.server.
final fun (io.ktor.server.application/ApplicationCall).<get-isCompressionSuppressed>(): kotlin/Boolean // io.ktor.server.http.content/isCompressionSuppressed.<get-isCompressionSuppressed>|<get-isCompressionSuppressed>@io.ktor.server.application.ApplicationCall(){}[0]
final val io.ktor.server.http.content/isDecompressionSuppressed // io.ktor.server.http.content/isDecompressionSuppressed|@io.ktor.server.application.ApplicationCall{}isDecompressionSuppressed[0]
final fun (io.ktor.server.application/ApplicationCall).<get-isDecompressionSuppressed>(): kotlin/Boolean // io.ktor.server.http.content/isDecompressionSuppressed.<get-isDecompressionSuppressed>|<get-isDecompressionSuppressed>@io.ktor.server.application.ApplicationCall(){}[0]
final val io.ktor.server.http/HttpRequestCloseHandlerKey // io.ktor.server.http/HttpRequestCloseHandlerKey|{}HttpRequestCloseHandlerKey[0]
final fun <get-HttpRequestCloseHandlerKey>(): io.ktor.util/AttributeKey<kotlin/Function0<kotlin/Unit>> // io.ktor.server.http/HttpRequestCloseHandlerKey.<get-HttpRequestCloseHandlerKey>|<get-HttpRequestCloseHandlerKey>(){}[0]
final val io.ktor.server.http/HttpRequestLifecycle // io.ktor.server.http/HttpRequestLifecycle|{}HttpRequestLifecycle[0]
final fun <get-HttpRequestLifecycle>(): io.ktor.server.application/RouteScopedPlugin<io.ktor.server.http/HttpRequestLifecycleConfig> // io.ktor.server.http/HttpRequestLifecycle.<get-HttpRequestLifecycle>|<get-HttpRequestLifecycle>(){}[0]
final val io.ktor.server.logging/mdcProvider // io.ktor.server.logging/mdcProvider|@io.ktor.server.application.Application{}mdcProvider[0]
final fun (io.ktor.server.application/Application).<get-mdcProvider>(): io.ktor.server.logging/MDCProvider // io.ktor.server.logging/mdcProvider.<get-mdcProvider>|<get-mdcProvider>@io.ktor.server.application.Application(){}[0]
final val io.ktor.server.plugins/MutableOriginConnectionPointKey // io.ktor.server.plugins/MutableOriginConnectionPointKey|{}MutableOriginConnectionPointKey[0]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.server.http

import io.ktor.server.application.*
import io.ktor.server.application.hooks.*
import io.ktor.util.*
import io.ktor.utils.io.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel

/**
* Configuration for the [HttpRequestLifecycle] plugin.
*/
public class HttpRequestLifecycleConfig internal constructor() {
/**
* When `true`, cancels the call coroutine context if the other peer resets the client connection.
* When `false` (default), request processing continues even if the connection is closed.
*
* **When to use this property: **
* - Set to `true` for long-running or resource-intensive requests where you want to stop processing
* immediately when the client disconnects (e.g., streaming, batch processing, heavy computations)
* - Keep as `false` (default) for short requests, or when you need to complete processing regardless
* of client connection status (e.g., important side effects, database transactions)
*
* Example:
* ```kotlin
* install(HttpRequestLifecycle) {
* cancelCallOnClose = true
* }
* ```
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear from the documentation where the cancellation exception will be thrown. Could you please add an example of handling the exception as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also unclear what the use case for the new property is - when should someone use it?

public var cancelCallOnClose: Boolean = false
}

/**
* Internal attribute key for storing the connection close handler callback.
*/
@InternalAPI
public val HttpRequestCloseHandlerKey: AttributeKey<() -> Unit> = AttributeKey<() -> Unit>("HttpRequestCloseHandler")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make a class to store the config instead of using a callback type. It will have name with clear semantic

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e5l
We can have the HttpRequestLifecycleHandler class with the onClose method.
I'm not sure about this. Moreover, it's internal.


/**
* A plugin that manages the HTTP request lifecycle, particularly handling client disconnections.
*
* The [HttpRequestLifecycle] plugin allows you to detect and respond to client connection closures
* during request processing. When configured with [HttpRequestLifecycleConfig.cancelCallOnClose] set to `true`,
* the plugin will automatically cancel the request handling coroutine if the client disconnects,
* preventing unnecessary processing and freeing up resources.
*
* Remember, when the coroutine context is canceled, the next suspension point will throw [CancellationException], but until
* that moment it doesn't stop any blocking operations, so call `call.coroutineContext.ensureActive` if needed.
* Plugin only works for CIO and Netty engines. Other implementations fail on closed connection only when trying to write some response.
*
* This is particularly useful for:
* - Long-running requests where the client may disconnect before completion
* - Streaming responses where detecting disconnection allows early cleanup
* - Resource-intensive operations that should be canceled when the client is no longer waiting
*
* ## Example
*
* ```kotlin
* install(HttpRequestLifecycle) {
* cancelCallOnClose = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should be the default, and we don't need an option to change it. @bjhham, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the initial idea, but iirc @zibet27 mentioned it broke quite a few tests so it appears it could be quite impactful. There's not many scenarios for keeping the call coroutine alive after the connection is lost, but they do exist. Maybe the best approach would be to log a ticket to enable it by default in 4.0.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e5l I think the cancellation is needed only for computation-heavy requests because it can cancel a DB insert, which is usually not what you want, even if the client is not waiting for a response.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be pretty uncommon for a client to hang up before receiving a response like this though. Maybe if the DB is under load and the client has a short timeout though... I agree though, it's probably safer to assume that there could be sensitive work being done in the call coroutine. The main use-case for cancellation would be long-polling connections I guess.

* }
*
* routing {
* get("/long-process") {
* try {
* // Long-running operation
* repeat(100) {
* // throws an exception if the client disconnects during processing
* call.coroutineContext.ensureActive()
* // Process more data...
* logger.info("Very important work.")
* }
* call.respond("Completed")
* } catch (e: CancellationException) {
* // Handle client disconnected, clean up resources
* }
* }
* }
* ```
*/
@OptIn(InternalAPI::class)
public val HttpRequestLifecycle: RouteScopedPlugin<HttpRequestLifecycleConfig> = createRouteScopedPlugin(
name = "HttpRequestLifecycle",
createConfiguration = ::HttpRequestLifecycleConfig
) {
on(CallSetup) { call ->
if (
[email protected] ||
call.attributes.contains(HttpRequestCloseHandlerKey)
) {
return@on
}
call.attributes.put(HttpRequestCloseHandlerKey) {
val cause = CancellationException(
"Call context was cancelled by `HttpRequestLifecycle` plugin",
ConnectionClosedException()
)
call.coroutineContext.cancel(cause)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.ktor.http.*
import io.ktor.http.HttpHeaders
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.http.HttpRequestCloseHandlerKey
import io.ktor.server.netty.http1.*
import io.ktor.util.pipeline.*
import io.ktor.utils.io.*
Expand All @@ -25,6 +26,7 @@ internal class NettyApplicationCallHandler(
private val enginePipeline: EnginePipeline
) : ChannelInboundHandlerAdapter(), CoroutineScope {
private var currentJob: Job? = null
private var currentCall: PipelineCall? = null

override val coroutineContext: CoroutineContext = userCoroutineContext

Expand All @@ -35,9 +37,21 @@ internal class NettyApplicationCallHandler(
}
}

internal fun onConnectionClose(context: ChannelHandlerContext) {
if (context.channel().isActive) {
return
}
currentCall?.let {
currentCall = null
@OptIn(InternalAPI::class)
it.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke()
}
}

private fun handleRequest(context: ChannelHandlerContext, call: PipelineCall) {
val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context)

currentCall = call
currentJob = launch(callContext, start = CoroutineStart.UNDISPATCHED) {
when {
call is NettyHttp1ApplicationCall && !call.request.isValid() -> {
Expand Down Expand Up @@ -67,6 +81,11 @@ internal class NettyApplicationCallHandler(
}
}

override fun channelInactive(ctx: ChannelHandlerContext) {
onConnectionClose(ctx)
ctx.fireChannelInactive()
}

private fun respond408RequestTimeout(ctx: ChannelHandlerContext) {
val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT)
response.headers().add(HttpHeaders.ContentLength, "0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public abstract class NettyApplicationResponse(

internal var responseChannel: ByteReadChannel = ByteReadChannel.Empty

private val canRespond: Boolean
get() = !responseMessageSent && context.channel().isActive

override suspend fun respondOutgoingContent(content: OutgoingContent) {
try {
super.respondOutgoingContent(content)
Expand All @@ -51,7 +54,7 @@ public abstract class NettyApplicationResponse(
// because it should've been set by commitHeaders earlier
val chunked = headers[HttpHeaders.TransferEncoding] == "chunked"

if (responseMessageSent) return
if (!canRespond) return

val message = responseMessage(chunked, bytes)
responseChannel = when (message) {
Expand Down Expand Up @@ -111,7 +114,7 @@ public abstract class NettyApplicationResponse(
}

internal fun sendResponse(chunked: Boolean = true, content: ByteReadChannel) {
if (responseMessageSent) return
if (!canRespond) return

responseChannel = content
responseMessage = when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ internal class NettyHttp1Handler(
}

override fun channelInactive(context: ChannelHandlerContext) {
context.pipeline().remove(NettyApplicationCallHandler::class.java)
val handler = context.pipeline().remove(NettyApplicationCallHandler::class.java)
handler?.onConnectionClose(context)
context.fireChannelInactive()
}

Expand Down
Loading