compute: spillable MV correction buffer #30083
base: main
Conversation
The feature benchmarks report a bunch of regressions. Some increased CPU and memory usage is expected, but the memory regressions here are worryingly large.
I have a suspicion that this isn't an actual regression but a result of the usage of lgalloc, which holds onto allocated memory and only releases it slowly over time.
I retried the feature benchmarks with lgalloc disabled and things do look better.
There's still a higher-than-expected memory regression for some of them; I want to look into those.
let mut heap = MergeHeap::from_iter(cursors);
let mut merged = Chain::default();
while let Some(cursor1) = heap.pop() {
    let (data, time, mut diff) = cursor1.get();
Here we'd have the option to reuse whole chunks, if all updates in the current chunk are less than the first update in the next cursor on the heap. However, if we did this naively we could end up with a lot of empty space in our chunks, and therefore chains with a lot more chunks than they'd need if their updates were tightly packed.
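For illustration, a minimal sketch of that reuse condition, on simplified `(time, data, diff)` tuples whose derived lexicographic order is already time-first; the function and types are assumptions for this sketch, not the PR's actual chunk or cursor API:

```rust
/// A chunk can be moved into the merge output wholesale iff its last update
/// sorts strictly before the first update of the next cursor on the heap;
/// no interleaving or consolidation with that cursor is then possible.
fn can_reuse_chunk<D: Ord>(chunk: &[(u64, D, i64)], next_first: &(u64, D, i64)) -> bool {
    chunk.last().is_some_and(|last| last < next_first)
}
```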
//! Unfortunately, performing consolidation as described above can break the chain invariant and we
//! might need to restore it by merging chains, including ones containing future updates. This is
//! something that would be great to fix! In the meantime the hope is that in steady state it
//! doesn't matter too much because either there are no future retractions and U is approximately
//! equal to N, or the amount of future retractions is much larger than the amount of current
//! changes, in which case removing the current changes has a good chance of leaving the chain
//! invariant intact.
This is, I think, the main wrinkle in the current implementation. It'd be nice to never have to look at future updates when consolidating the current ones. All alternatives I've come up with so far are not great:
- Don't consolidate the correction updates at all, just provide a read-only merging iterator (a minimal sketch of such an iterator follows this list). That's sufficient for the MV sink to work, but it leaves unclear when the correction contents would ever get consolidated. Inserts will trigger merges, but if few inserts happen, we can't rely on those. We'd need some form of idle merging, but that adds significant complexity.
- Skip restoring the chain invariant and rely on subsequent inserts to do so. It's not clear to me whether that actually improves anything or just moves work from one operation to the other. It definitely makes the data structure harder to reason about: the chain invariant isn't an invariant anymore, so we can't make crisp statements about the complexity of inserts.
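For concreteness, a minimal sketch of the first alternative, with chains simplified to sorted `Vec<(u64, i64)>` runs of (time, diff) updates in place of the PR's chunked `Chain` type; it merges by time but never consolidates:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Read-only k-way merge over sorted runs, yielding updates in time order
/// without consolidating them or mutating the underlying storage.
fn merging_iter(chains: Vec<Vec<(u64, i64)>>) -> impl Iterator<Item = (u64, i64)> {
    // Min-heap seeded with each chain's first update, tagged with the chain index.
    let mut heap: BinaryHeap<Reverse<(u64, i64, usize)>> = chains
        .iter()
        .enumerate()
        .filter_map(|(idx, c)| c.first().map(|&(t, d)| Reverse((t, d, idx))))
        .collect();
    let mut offsets = vec![1usize; chains.len()];
    std::iter::from_fn(move || {
        // Pop the least update, then refill the heap from the same chain.
        let Reverse((time, diff, idx)) = heap.pop()?;
        if let Some(&(t, d)) = chains[idx].get(offsets[idx]) {
            offsets[idx] += 1;
            heap.push(Reverse((t, d, idx)));
        }
        Some((time, diff))
    })
}
```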
@@ -94,6 +94,7 @@ def get_default_system_parameters(
     "enable_alter_swap": "true",
     "enable_columnation_lgalloc": "true",
     "enable_compute_chunked_stack": "true",
+    "enable_compute_correction_v2": "true",
I plan to remove this before merging. Currently the new correction implementation regresses some of the feature benchmarks, and while some of that might be expected, I should probably do some profiling before deciding to accept the regressions.
This commit introduces a "correction v2" buffer that differs from the existing one in that it stores data in columnated regions that can be spilled to disk. It follows the general design of the arrangement merge batcher, with the major difference that it sorts updates by time first, in an attempt to more efficiently deal with the presence of updates in the future (commonly introduced by temporal filters).
This commit adds a new dyncfg, `enable_compute_correction_v2`, that controls whether the MV sink v2 should use the old or the new implementation of the correction buffer.
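As a rough illustration of that time-first ordering, a sketch on plain `(data, time, diff)` triples rather than the PR's columnated regions:

```rust
use std::cmp::Ordering;

/// Order updates by time before data, so that updates with future
/// timestamps (e.g. introduced by temporal filters) cluster at the end of
/// each sorted chain instead of being interleaved with current updates.
fn time_first_cmp<D: Ord>(a: &(D, u64, i64), b: &(D, u64, i64)) -> Ordering {
    a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0))
}
```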
Some comments, but I'm still reading.
}

/// Merge the given chains, advancing times by the given `since` in the process.
fn merge_chains<D: Data>(chains: Vec<Chain<D>>, since: &Antichain<Timestamp>) -> Chain<D> {
Suggested change:
fn merge_chains<D: Data>(chains: Vec<Chain<D>>, since: &Antichain<Timestamp>) -> Chain<D> {
fn merge_chains<D: Data>(chains: impl Iterator<Item = Chain<D>>, since: &Antichain<Timestamp>) -> Chain<D> {
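If the suggestion is adopted, call sites no longer need to materialize a `Vec` first; a hypothetical call site (the `chains` and `since` fields are assumptions for illustration):

```rust
// Merge away all currently buffered chains in one pass:
let merged = merge_chains(self.chains.drain(..), &self.since);
```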
fn update_metrics(&mut self) {
    let mut new_size = LengthAndCapacity::default();
    for chain in &mut self.chains {
        new_size += chain.get_size();
    }

    let old_size = self.total_size;
    let len_delta = UpdateDelta::new(new_size.length, old_size.length);
    let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
    self.metrics
        .report_correction_update_deltas(len_delta, cap_delta);
    self.worker_metrics
        .report_correction_update_totals(new_size.length, new_size.capacity);

    self.total_size = new_size;
}
We should use something similar to the arrangement size logging instead. This only exposes it through metrics. Can fix as a separate PR!
/// in spirit.
struct Chunk<D: Data> {
    /// The contained updates.
    data: StackWrapper<(D, Timestamp, Diff)>,
Since `data` has a static capacity, you probably want a different type here. `Array` might be that.
}

/// Return the chunk capacity implied by [`CHUNK_SIZE_BYTES`] and the update size.
fn capacity() -> usize {
When using `Array`, this should be `desired_capacity`, and all capacity checks should be against the array's capacity. Lgalloc returns memory with at least the requested capacity.
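A minimal sketch of that desired-capacity computation, assuming a fixed chunk byte budget and `(data, time, diff)` update tuples; the constant's value here is made up:

```rust
use std::mem::size_of;

/// Assumed chunk byte budget; the PR defines its own `CHUNK_SIZE_BYTES`.
const CHUNK_SIZE_BYTES: usize = 64 << 10;

/// Capacity implied by the byte budget and the size of one update. With
/// lgalloc-backed allocations this is only a *desired* capacity: lgalloc
/// returns memory with at least the requested capacity, so actual checks
/// should consult the allocation's real capacity.
fn desired_capacity<D>() -> usize {
    std::cmp::max(1, CHUNK_SIZE_BYTES / size_of::<(D, u64, i64)>())
}
```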
This PR introduces a "correction v2" buffer that differs from the existing one in that it stores data in columnated regions that can be spilled to disk. It follows the general design of the arrangement merge batcher, with the major difference that it sorts updates by time first, in an attempt to more efficiently deal with the presence of updates in the future (commonly introduced by temporal filters).
The new correction buffer can be switched on through a feature flag, `enable_compute_correction_v2`, and is switched off by default. The plan is to keep it disabled in production but have it available for emergencies where replicas fail to hydrate due to the MV memory spike. Eventually we'll want to make the new correction buffer the default, but we should do more performance testing before that.

Motivation
Part of https://github.com/MaterializeInc/database-issues/issues/8464
Tips for reviewer
Checklist
If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.