compute: handle future epochs in channel adapter #32017
Conversation
Previously the compute command channel adapter assumed that it was impossible to receive responses with epoch values greater than the channel adapter's current epoch. This assumption is wrong because the cluster-side Timely workers learn about new connection epochs by observing commands from worker 0, whose epoch can be ahead of the other workers' epochs. This commit removes the assumption and instead makes the channel adapter handle responses with future epochs by stashing them until sufficiently many client reconnects have been observed.
// Serve this connection until we see any of the channels disconnect.
loop {
    let (resp, resp_epoch) = match stashed_response.take() {
        Some(stashed) => stashed,
        None => match serve_rx_channels() {
            Ok(response) => response,
            Err(()) => break,
        },
    };

    if resp_epoch < epoch {
        // Response for a previous connection; discard it.
        continue;
    } else if resp_epoch > epoch {
        // Response for a future connection; stash it and reconnect.
        stashed_response = Some((resp, resp_epoch));
        break;
    } else {
        // Response for the current connection; forward it.
        if response_tx.send(resp).is_err() {
            break;
        }
    }
}
This would all be much nicer if we could have a crossbeam_channel::Receiver wrapper with a builtin stash that you could push_front to. Unfortunately crossbeam_channel::select doesn't support receiver types other than its own, so I had to move the stash outside and ended up with this somewhat convoluted control flow.
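For illustration, the kind of wrapper the comment wishes for might look like the sketch below. The StashingReceiver name and API are hypothetical, not an existing crossbeam_channel type. The catch is that crossbeam_channel::select! only accepts plain Receivers as its recv operands, so a wrapper like this could not participate in the select loop, which is why the stash ended up living alongside the loop instead.

```rust
use std::collections::VecDeque;

use crossbeam_channel::{Receiver, RecvError};

/// Hypothetical receiver-with-stash wrapper (not part of crossbeam_channel).
struct StashingReceiver<T> {
    stash: VecDeque<T>,
    rx: Receiver<T>,
}

impl<T> StashingReceiver<T> {
    fn new(rx: Receiver<T>) -> Self {
        Self { stash: VecDeque::new(), rx }
    }

    /// Put a message back so that the next `recv` returns it first.
    fn push_front(&mut self, msg: T) {
        self.stash.push_front(msg);
    }

    /// Drain the stash before blocking on the underlying channel.
    fn recv(&mut self) -> Result<T, RecvError> {
        match self.stash.pop_front() {
            Some(msg) => Ok(msg),
            None => self.rx.recv(),
        }
    }
}
```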
Looks good, thank you!
TFTR!
Previously the compute command channel adapter assumed that it was impossible to receive responses with epoch values greater than the channel adapter's current epoch. This assumption is wrong because the cluster-side Timely workers learn about new connection epochs by observing commands from worker 0, whose epoch can be ahead of the other workers' epochs.
This PR removes the assumption and instead makes the channel adapter handle responses with future epochs by stashing them until sufficiently many client reconnects have been observed.
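For illustration, the rule the adapter now applies to each response reduces to a three-way comparison between the response's epoch and the adapter's current connection epoch. The sketch below is illustrative only; the epoch is shown as a plain u64 and the names are placeholders, not the actual Materialize types:

```rust
use std::cmp::Ordering;

/// What to do with a response, based on its epoch (illustrative names).
enum ResponseFate {
    /// From a previous connection: discard.
    Discard,
    /// For the current connection: forward to the client.
    Forward,
    /// From a connection we have not caught up to yet (e.g. worker 0 is
    /// ahead): stash it and re-examine it after the next reconnect(s).
    Stash,
}

fn classify(resp_epoch: u64, current_epoch: u64) -> ResponseFate {
    match resp_epoch.cmp(&current_epoch) {
        Ordering::Less => ResponseFate::Discard,
        Ordering::Equal => ResponseFate::Forward,
        Ordering::Greater => ResponseFate::Stash,
    }
}
```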
Motivation
Fixes https://github.com/MaterializeInc/database-issues/issues/9124
Tips for reviewer
I've tried to write a regression test using mzcompose and toxiproxy, spawning a multi-process cluster and resetting its controller connection in a loop, but I didn't manage to reproduce the race that leads to the "epoch from the future" panic.
Checklist
If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.