-
Notifications
You must be signed in to change notification settings - Fork 464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
controller/compute: sequence external read hold changes #30114
base: main
Are you sure you want to change the base?
controller/compute: sequence external read hold changes #30114
Conversation
assert!( | ||
self.holds_tx.same_channel(&other.holds_tx), | ||
"can only merge ReadHolds that come from the same issuer" | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If anybody feels strongly about keeping this check, we could make ChangeTx
a custom trait, e.g.:
trait ChangeTx {
fn send(&self, id: GlobalId, change: ChangeBatch) -> Result;
fn same_recipient(&self, other: &Self) -> bool;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do this without resorting to a trait. Effectively what you want to do is create a single prototypical Arc<sender closure>
in the controller that gets cloned into all read holds that are given out. Then this assert can be replaced with Arc::ptr_eq
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, true. That would still allow people to create ReadHold
s that write to the same channel but use different closures for it, but we can hope that CI would catch this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, actually I think we should probably not do this. With this scheme, two ReadHold
s created with with_channel
for the same channel won't be mergeable. So if you request two read holds for the same collection from the storage controller, then try to merge them, Mz will panic. CI suggests that we don't try to do such things today, but we might in edge-cases that are not covered by CI, in which case the assert would be a time bomb waiting to explode in production.
src/compute-client/src/controller.rs
Outdated
let client = self.client.clone(); | ||
let tx = Box::new(move |id, change| { | ||
client.call(move |i| i.report_read_hold_change(id, change)); | ||
Ok(()) | ||
}); | ||
let hold = ReadHold::new(id, since, tx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that ReadHold
s issued by the compute controller now contain a copy of instance::Client
and therefore keep the connected Instance
alive. I don't see any immediate issue with that, but I might be missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I'll take less concurrency over more code any day of the week!
assert!( | ||
self.holds_tx.same_channel(&other.holds_tx), | ||
"can only merge ReadHolds that come from the same issuer" | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do this without resorting to a trait. Effectively what you want to do is create a single prototypical Arc<sender closure>
in the controller that gets cloned into all read holds that are given out. Then this assert can be replaced with Arc::ptr_eq
afc7242
to
8ecb8b2
Compare
c8a5d6d
to
dde8d9a
Compare
This commit changes `ReadHold` to contain sender closure, rather than an `UnboundedSender` directly, for transmitting read hold changes. This provides the abililty to construct `ReadHolds` that are connected to channels with an item type different from `(GlobalId, ChangeBatch)`, performing the necessary transformation inside the closure.
Changes to external `ReadHold`s on compute collections are now sequenced through the `command_{tx,rx}` channels of the respective `Instance` controllers. This ensures that `Instance`s observe read hold changes always after they have observed the creation of the affected collection, which wasn't the case previously. As a side effect of this change, each external compute `ReadHold` now contains a copy of an instance `Client`. These copies keep the connected `Instance`s alive, even when the `ComputeController` has already dropped them. This seems fine in that arguably an instance shouldn't shut down if somebody still has read holds on its collections. But it is a change to the previous behavior.
…ges" This reverts commit 6f2214e.
Now that some of the read holds held by `Instance` are sequencing their changes through the command channel, it is no longer guaranteed that an `apply_read_hold_changes` call also applies cascading changes on dependencies. The previous `check_empty` check expected this to be the case, so it would incorrectly fail its asserts. This is fixed by having the check also drain and apply the contents of the command channel, until no new read capability changes are produced. To clarify the semantics, `check_empty` is also renamed to `shutdown` and explicitly terminates the execution of the `Instance` task.
This commit changes the compute controller to also sequence internal read hold changes through `command_tx`. This simplifies things as now all read hold changes flow through the same channels and we don't need to worry about having two kinds of `ChangeTx` that secretly point to different destinations. The commit removes the separate `read_holds_{tx,rx}` channel in the compute controller and moves to applying received read hold changes directly, rather than batching them. This removes a bit of complexity but also imposes a risk of a performance regression. My hope is that the performance impact will be negligible.
The changes made in the previous commits make it so the compute controller is slower at applying read hold downgrades. In tests that spawn two envd instances in sequence that can lead to issues because the first instance might still try to apply read hold downgrades while the second instance is already running, causing `PersistEpoch` conflicts. This race condition existed before but apparently wasn't hit because read hold downgrades by the first instance didn't get delayed long enough. The fix chosen here is to convert the tests that want to run two envd instances in sequence from using `TestHarness::start` to using `TestHarness::start_blocking`. The latter runs the `TestServer` in a separate tokio runtime that gets dropped when the `TestServer` is dropped, ensuring that no controller tasks can still be alive when the second envd instance is started.
dde8d9a
to
f8bf695
Compare
Finally had time to look at the test issues. Should be ready for review now! |
Looks like the adapter changes are minimal enough that you can proceed without an adapter approval. |
Changes to
ReadHold
s on compute collections are now sequenced through thecommand_{tx,rx}
channels of the respectiveInstance
controllers. This ensures thatInstance
s observe read hold changes always after they have observed the creation of the affected collection, which wasn't the case previously.The implementation includes a change to the
ReadHold
type, which now contains a sender closure, rather than a channel sender directly. The closure enables the construction ofReadHold
s that communicate changes on a channel with an item type different from(GlobalId, ChangeBatch)
.Motivation
Fixes https://github.com/MaterializeInc/database-issues/issues/8679
Tips for reviewer
This PR required touching a lot more parts than I initially expected, so it ended up quite busy. I managed to split out sensible commits though, looking at those separately makes the reviewing experience much better!
The above mentioned bug has already been mitigated in #30109. This PR delivers an alternative fix that requires more restructuring but lets us end up in a saner state with fewer things moving concurrently. For example, with the other fix we have to always assume read hold changes for unknown collections are for future collections, which would make us miss bugs where we wrongly issue read hold changes for already dropped collections.
Checklist
$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way), then it is tagged with aT-proto
label.