Skip to content

Commit d879eb8

Browse files
committed
fix: Avoid turning all stream timeouts to TcpIdleTimeoutException
1 parent 1c8573b commit d879eb8

File tree

5 files changed

+21
-21
lines changed

5 files changed

+21
-21
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Internal API changed
2+
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.Timers#IdleTimeoutBidi.this")

akka-stream/src/main/scala/akka/stream/StreamTimeoutException.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import akka.annotation.DoNotInherit
1515
* Not for user extension
1616
*/
1717
@DoNotInherit
18-
sealed class StreamTimeoutException(msg: String) extends TimeoutException(msg) with NoStackTrace
18+
class StreamTimeoutException(msg: String) extends TimeoutException(msg) with NoStackTrace
1919

2020
final class InitialTimeoutException(msg: String) extends StreamTimeoutException(msg)
2121

akka-stream/src/main/scala/akka/stream/impl/Timers.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,14 @@ import akka.stream.stage._
151151

152152
}
153153

154-
final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
154+
private object IdleTimeoutBidi {
155+
val defaultFailureCreator: FiniteDuration => Throwable = timeout =>
156+
new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}.")
157+
}
158+
final class IdleTimeoutBidi[I, O](
159+
val timeout: FiniteDuration,
160+
failureCreator: FiniteDuration => Throwable = IdleTimeoutBidi.defaultFailureCreator)
161+
extends GraphStage[BidiShape[I, I, O, O]] {
155162
val in1 = Inlet[I]("in1")
156163
val in2 = Inlet[O]("in2")
157164
val out1 = Outlet[I]("out1")
@@ -170,7 +177,7 @@ import akka.stream.stage._
170177

171178
final override def onTimer(key: Any): Unit =
172179
if (nextDeadline - System.nanoTime < 0)
173-
failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))
180+
failStage(failureCreator(timeout))
174181

175182
override def preStart(): Unit =
176183
scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))

akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
package akka.stream.impl.io
66

77
import java.net.InetSocketAddress
8-
import java.util.concurrent.TimeoutException
98
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
10-
119
import scala.annotation.nowarn
1210
import scala.collection.immutable
1311
import scala.concurrent.{ Future, Promise }
1412
import scala.concurrent.duration.{ Duration, FiniteDuration }
15-
1613
import akka.{ Done, NotUsed }
1714
import akka.actor.{ ActorRef, Terminated }
1815
import akka.annotation.InternalApi
@@ -22,6 +19,7 @@ import akka.io.Tcp
2219
import akka.io.Tcp._
2320
import akka.stream._
2421
import akka.stream.impl.ReactiveStreamsCompliance
22+
import akka.stream.impl.Timers
2523
import akka.stream.impl.fusing.GraphStages.detacher
2624
import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp => StreamTcp }
2725
import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding }
@@ -593,19 +591,13 @@ private[stream] object ConnectionSourceStage {
593591
case Some(address) => s" on connection to [$address]"
594592
case _ => ""
595593
}
594+
BidiFlow.fromGraph(
595+
new Timers.IdleTimeoutBidi(
596+
idleTimeout,
597+
failureCreator = _ =>
598+
new TcpIdleTimeoutException(
599+
s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout",
600+
idleTimeout)))
596601

597-
val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
598-
BidiFlow.fromFlows(
599-
Flow[ByteString].mapError {
600-
case _: TimeoutException =>
601-
new TcpIdleTimeoutException(
602-
s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout",
603-
idleTimeout)
604-
},
605-
Flow[ByteString])
606-
val fromNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
607-
toNetTimeout.reversed // now the bottom flow transforms the exception, the top one doesn't (since that one is "fromNet")
608-
609-
fromNetTimeout.atop(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](idleTimeout)).atop(toNetTimeout)
610602
}
611603
}

akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package akka.stream.scaladsl
66

77
import java.net.InetSocketAddress
8-
import java.util.concurrent.TimeoutException
98
import javax.net.ssl.SSLEngine
109
import javax.net.ssl.SSLSession
1110

@@ -394,7 +393,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
394393
}
395394

396395
final class TcpIdleTimeoutException(msg: String, @unused timeout: Duration)
397-
extends TimeoutException(msg: String)
396+
extends StreamTimeoutException(msg: String)
398397
with NoStackTrace // only used from a single stage
399398

400399
object TcpAttributes {

0 commit comments

Comments
 (0)