Implement HappyEyeballs for NettyConnectionPool (#1996) #3838
base: main
Conversation
zio-http/jvm/src/main/scala/zio/http/netty/client/NettyConnectionPool.scala
    private[netty] object NettyConnectionPool {

      private val HappyEyeballsDelay: Duration = 250.millis
Maybe add a comment that this is according to spec recommendations
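For reference, RFC 8305 ("Happy Eyeballs Version 2") recommends 250 ms as the default Connection Attempt Delay, so the comment could look roughly like this (a sketch that keeps the constant name from the diff, not the PR's actual wording):

```scala
package zio.http.netty.client

import zio.{Duration, durationInt}

private[netty] object NettyConnectionPool {
  // RFC 8305 ("Happy Eyeballs Version 2"), Section 5, recommends a default
  // Connection Attempt Delay of 250 milliseconds between staggered attempts.
  private val HappyEyeballsDelay: Duration = 250.millis
}
```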
      localAddress: Option[InetSocketAddress],
    )(implicit trace: Trace): ZIO[Scope, Throwable, JChannel] = {

      if (resolvedHosts.isEmpty) {
Very minor nit (feel free to ignore): to make this a bit more readable, you can pattern-match on resolvedHosts.size instead of using if statements, e.g.

    resolvedHosts.size match {
      case 0 => ...
      case 1 => ...
      case _ => ...
    }

        queue   <- Queue.bounded[Unit](requestedCapacity = 1)
        channel <- ZIO.raceAll(
                     connectToAddress(
                       addresses.head,
If we're going to be using head and tail, it might be better for sortAddresses to return a List instead. Creating a list with a ListBuffer is very cheap.
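We don't see sortAddresses itself in this hunk, but as a rough illustration of the suggestion (the signature and the interleaving logic are assumptions, not the PR's actual code), it could accumulate into a ListBuffer and return a List, which then gives .head and .tail directly:

```scala
import java.net.{Inet6Address, InetAddress}
import scala.collection.mutable.ListBuffer

// Assumed sketch: interleave IPv6 and IPv4 addresses (as Happy Eyeballs
// suggests) and return a List built via a ListBuffer.
def sortAddresses(addrs: Seq[InetAddress]): List[InetAddress] = {
  val (v6, v4) = addrs.partition(_.isInstanceOf[Inet6Address])
  val buf      = new ListBuffer[InetAddress]
  val it6      = v6.iterator
  val it4      = v4.iterator
  while (it6.hasNext || it4.hasNext) {
    if (it6.hasNext) buf += it6.next()
    if (it4.hasNext) buf += it4.next()
  }
  buf.result()
}
```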
      } else {
        val addresses = sortAddresses(resolvedHosts)
        for {
          queue <- Queue.bounded[Unit](requestedCapacity = 1)
A Promise seems to be a better fit for this use case.
Follow-up: No, I made a mistake; there is a line of code missing. But there can potentially be multiple errors, so it needs to be fulfilled multiple times. So a queue is right.
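A simplified, hypothetical sketch of why a Queue fits here (the names and shape are made up for illustration, this is not the PR's code): each failed attempt offers a wake-up signal so the next staggered attempt can start early, and that can happen once per failure, which a Promise (completable at most once) cannot express:

```scala
import zio._

// Hypothetical Happy-Eyeballs-style race (assumes `attempts` is non-empty):
// attempt i normally waits i * delay, but starts immediately if an earlier
// attempt has already failed and offered a signal to the queue. Failures can
// signal the queue multiple times, hence a Queue rather than a Promise.
def raceAttempts[R, E, A](attempts: List[ZIO[R, E, A]], delay: Duration): ZIO[R, E, A] =
  Queue.bounded[Unit](1).flatMap { wakeUp =>
    def attempt(connect: ZIO[R, E, A], index: Int): ZIO[R, E, A] = {
      val wait =
        if (index == 0) ZIO.unit
        else wakeUp.take.timeout(delay.multipliedBy(index.toLong)).unit
      wait *> connect.tapError(_ => wakeUp.offer(()))
    }

    ZIO.raceAll(
      attempt(attempts.head, 0),
      attempts.tail.zipWithIndex.map { case (c, i) => attempt(c, i + 1) },
    )
  }
```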
              )
            },
          )
        } yield channel
I feel like there is a race condition / connection leak here. In the case that two attempts connect at the same time (within the netty connection pool), we won't be closing the second connection until the Scope is closed down. We need to guarantee that only the "winner" connection remains open by the end of this operation.
Co-authored-by: kyri-petrou <[email protected]>
            connectionTimeout,
            localAddress,
          ).onExit {
            case e: Exit.Success[JChannel] => successful.update(channels => channels :+ e.value)
I don't think this will work; interruption might occur right between the moment that we create the channel and the evaluation of the next effect, before we reach this point.
We probably need to pass the ref into connectToAddress itself and add the channel to the list (potentially unsafely) at some uninterruptible point.
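One possible shape for that, as a hedged sketch (connectToAddress's real parameters are not shown here, and `connect` stands in for the actual Netty connect effect): take the shared Ref as a parameter and record the channel in the same uninterruptible region that produces it, so interruption cannot land between creation and registration:

```scala
import zio._
import io.netty.channel.{Channel => JChannel}

// Sketch: once `connect` succeeds, the channel is added to `opened` before
// interruption can be observed again, so a later cleanup pass always sees
// every channel that was actually created.
def connectToAddress(
  connect: Task[JChannel],
  opened: Ref[List[JChannel]],
): Task[JChannel] =
  ZIO.uninterruptibleMask { restore =>
    restore(connect).tap(ch => opened.update(ch :: _))
  }
```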
          channel <- channels.headOption match {
                       case ch: Some[JChannel] => ZIO.succeed(ch.value)
                       case None               => ZIO.fail(new RuntimeException("All connection attempts failed"))
Isn't it better to partition by open channels and pick the first one? And then close all other ones?
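Roughly like this, as a hedged sketch (pickWinner and its signature are invented for illustration): keep the first still-open channel as the winner and close everything else, ignoring failures on close:

```scala
import zio._
import io.netty.channel.{Channel => JChannel}

// Sketch: partition by open channels, keep the first open one, close the rest.
// `.ignore` on close prevents one failing close from leaking the remaining ones.
def pickWinner(channels: List[JChannel]): Task[JChannel] =
  channels.partition(_.isOpen) match {
    case (winner :: losers, closed) =>
      ZIO.foreachDiscard(losers ++ closed)(ch => ZIO.attempt(ch.close()).ignore).as(winner)
    case (Nil, _) =>
      ZIO.fail(new RuntimeException("All connection attempts failed"))
  }
```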
                       case ch: Some[JChannel] => ZIO.succeed(ch.value)
                       case None               => ZIO.fail(new RuntimeException("All connection attempts failed"))
                     }
          _       <- ZIO.foreachDiscard(channels.tail)(ch => ZIO.attempt(ch.close()))
There's a potential leak here again in case closing one of them fails. I think this is better:

Suggested change:

    - _ <- ZIO.foreachDiscard(channels.tail)(ch => ZIO.attempt(ch.close()))
    + _ <- ZIO.foreachDiscard(channels.tail)(ch => ZIO.attempt(ch.close()).ignore)
fixes #1996
/claim #1996