-
-
Notifications
You must be signed in to change notification settings - Fork 63
Add flow control support on client and server. #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
In order to support gRPC's flow control, the service has to handle `call.isReady` and the `onReady` callback. When `isReady` is false, the code should wait for `onReady` to be true. To avoid race conditions where we may miss a wakeup, the `isReady` condition is checked twice; once before, and once after assigning the wakeup `Deferred` value. Fixes #38
|
@rossabaker @ahjohannessen |
|
Thinking about it, there are enough subtleties here that it might be best to release without this, and bake it for a while a later version. |
|
This is a great addition, but perhaps, as you say, best to let it bake for a while and add tests. |
|
My big itch was to have a release on fs2-1.0. We've achieved that with the milestones, so I don't have strong opinions on whether this is makes it into 0.4.0 or a subsequent release. |
|
This has fallen a bit behind. Is this still something we want? |
|
@rossabaker When @fiadliel says this
it makes me a bit nervous with respect to stability and correctness. Perhaps tests and some canaries in the wild are needed. |
|
@fiadliel I think this is important to support. What is needed to move this forward and what are the subtleties? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fiadliel, first of all thank you for the great library and this PR!
I just want to throw in my two cents, maybe these comments would be useful.
| def sendMessage(message: Response)(implicit F: Sync[F]): F[Unit] = | ||
| F.delay(call.sendMessage(message)) | ||
|
|
||
| def sendMessageOrDelay(message: Response)(implicit F: Concurrent[F]): F[Unit] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this method be called inside Fs2ServerCallListener#handleStreamResponse instead of Fs2ServerCall#sendMessage? It looks like current implementation of server streaming doesn't take backpressure into account.
| sendMessage(message), { | ||
| Deferred[F, Unit].flatMap { wakeup => | ||
| wakeOnReady.set(wakeup.some) *> | ||
| isReady.ifM(sendMessage(message), wakeup.get *> sendMessage(message)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we check isReady after wakeup.get completes to deal with missed wakeup?
|
I'd love to see some progress on this work, any help needed? It seems like the code isn't huge, is the main remaining thing perhaps some extra tests? |
|
@timbertson The code in this PR is outdated. There is also the question if #503 changes the status around flow control if it gets merged. |
|
Thanks @ahjohannessen . Based on #503 (comment) it sounds like backpressure will be unchanged in that branch. I've opened #544 as an updated version of this PR, but targeted at v0.x. |
|
Fixed by @timbertson in #552 #553 |
In order to support gRPC's flow control, the service has to handle
call.isReadyand theonReadycallback. WhenisReadyis false,the code should wait for
onReadyto be true. To avoid race conditionswhere we may miss a wakeup, the
isReadycondition is checked twice;once before, and once after assigning the wakeup
Deferredvalue.Fixes #38