adapter: Expression cache design doc #29908
Conversation
This commit adds a design doc for an optimized expression cache. Works towards resolving #MaterializeInc/database-issues/issues/8384
Flagging that this needs close coordination with the compute team.
- deployment generation
- object global ID
- expression type (local MIR, global MIR, LIR, etc.)
Keep in mind we don't have a serialization format for MIR, and the one for LIR is not designed to be cached.
One thing I should have emphasized is the fact that this is keyed by deployment generation. That means that deployment generation `n` will never look at a serialized expression from deployment generation `m`, s.t. `n != m`. As a consequence, the serialized representation does not need to be stable across versions and can be wildly different. So we can invent any serialization specifically for the cache without needing to add any guarantees across versions.
Yes, this seems fine. (Just be prepared that this will be a lot of typing.)
Is it not as simple as using bincode and relying on the existing `#[derive(Serialize, Deserialize)]` derivations?
> Just be prepared that this will be a lot of typing
Implementing a serialization format will be a lot of typing?
Oh, ok! I thought it would need to be manual protobuf (`impl RustType ...`). If it can simply be `#[derive(Serialize, Deserialize)]`, then it's easy.
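For illustration, a minimal sketch of the bincode route, using a hypothetical `CachedExpression` stand-in for the real plan types (which already carry the serde derives):

```Rust
use serde::{Deserialize, Serialize};

// Hypothetical stand-in for a cached expression; the real value would be one
// of the optimizer's plan types with the existing Serialize/Deserialize derives.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct CachedExpression {
    global_id: u64,
    plan: Vec<String>,
}

fn roundtrip(expr: &CachedExpression) -> bincode::Result<CachedExpression> {
    // The encoding doesn't need to be stable across versions because entries
    // are never read across deploy generations.
    let bytes = bincode::serialize(expr)?;
    bincode::deserialize(&bytes)
}
```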
- No need to worry about coordination across K8s pods.
- Bulk deletion is a simple directory delete.

#### Cons
An idea that Aljoscha and I had in our 1:1 earlier today is to directly use persist's FileBlob for this. It's extremely well tested (most of CI uses it for persist) and solves at least some of these cons.
- Need to worry about mocking things in memory for tests.
- If we lose the pod, then we also lose the cache.

### Persist implementation
A second idea that Aljoscha and I had in our 1:1 today is to use persist, but not with the normal consensus and blob impls (e.g. use FileBlob, ??? on the Consensus impl). That could potentially get you a persist impl where you don't need to worry about coordination (e.g. if they're both pointed at local fs).
- Need to worry about flushing/fsync.
- Need to worry about concurrency.
- Need to worry about atomicity.
The standard approach here is to rely on the atomicity of `rename` to ensure that only a fully written/synced file will be considered part of the cache:

```Rust
use std::fs::{self, File};
use std::io::Write;

// Write to a temp file, sync it to disk, then atomically rename into place.
let mut f = File::create("path/to/entry.tmp")?;
f.write_all(contents)?;
f.sync_all()?;
fs::rename("path/to/entry.tmp", "path/to/entry")?;
```
- Need to worry about flushing/fsync.
- Need to worry about concurrency.
- Need to worry about atomicity.
- Need to worry about mocking things in memory for tests.
Tests can assume access to the filesystem, so I think this is as simple as creating a temporary scratch directory in the test harness.
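As a sketch of that, assuming the tempfile crate is available to the test harness (the file names here are purely illustrative):

```Rust
use tempfile::TempDir;

// Scratch directory that is removed when it goes out of scope at test end.
let cache_dir = TempDir::new()?;
let entry = cache_dir.path().join("entry");
std::fs::write(&entry, b"serialized expression")?;
assert!(entry.exists());
```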
- Need to worry about concurrency.
- Need to worry about atomicity.
- Need to worry about mocking things in memory for tests.
- If we lose the pod, then we also lose the cache.
This seems like the most substantive downside to me. Doesn't help you with a node failure.
Another potential implementation is via persist. Each cache entry would be keyed by `(deploy_generation, global_id, expression_type)` and the value would be a serialized version of the expression.
My worry with using a single persist shard here is that it seems like we might run into the same compaction problems that we did when we tried to remove the storage usage entries from the catalog shard. Do we have confidence that the usage pattern here won't result in a slow pileup of entries in the persist shard that have to be fetched/filtered through? The cache will be a lot less impactful if it winds up taking multiple seconds to read.
Another option here that uses persist is to mint a new shard for each deploy generation. That seems tricky to manage though, because you'd want to finalize the shards for any past/failed deploy generations, which is tricky to keep track of. You'd also slowly accumulate finalized shard tombstones in CRDB—one per deploy generation.
Added some text about this.
implementing.

```Rust
trait ExpressionCache {
```
What are the durability semantics of `insert_expression`? Does it guarantee an `fsync`/`compare_and_append`? That would be unusably slow if called sequentially.

I think what you probably want is best-effort semantics: `insert_expression` doesn't guarantee flushing the cache on its own, but there's a background task that periodically batches up writes/fsyncs to the cache. Should work well enough since the cache is just a perf optimization, and it's not the end of the world if not everything gets flushed to the cache before restart. In the common case for R/O there will be plenty of time for the cache to flush in the background while clusters are catching up.
What I was thinking is that insert_expression
could immediately update the in-memory cache and then return a future that completes once the insert has been made durable. Then it's up to the caller whether or not they want to wait or just send the future into the background.
I've also updated the method to accept multiple entries in a single call. I think to do additional batching in the background might be overkill since DDL will be fairly rare. Thoughts?
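A rough sketch of what that shape could look like; the names and types here are hypothetical placeholders, not the signature from the design doc:

```Rust
use std::future::Future;

// Placeholder types for the sketch; the real ones live in the adapter crate.
struct GlobalId(u64);
struct CacheEntry(Vec<u8>);

trait ExpressionCache {
    // Updates the in-memory cache immediately and returns a future that
    // completes once the new entries have been made durable; the caller can
    // either await it or push it into the background.
    fn insert_expressions(
        &mut self,
        entries: Vec<(GlobalId, CacheEntry)>,
    ) -> impl Future<Output = ()> + Send;
}
```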
Makes sense on all fronts! Whether additional batching happens or not is an implementation detail anyway that will depend on which cache implementation (persist vs files) you go with.
### DDL - Drop
1. Execute catalog transaction.
2. Invalidate cache entries via `ExpressionCache::invalidate_entries`.
3. Re-compute and repopulate cache entries that depended on dropped entries via
Is it worth the trouble of recomputing eagerly?
Heh, we had the same thought at exactly the same moment! #29908 (comment)
👯
I think we're going to leave this out for now, but my gut take is that it'll be worth it eventually. Re-calculating for a handful of objects will be cheap and it will help reduce recovery times in case of a crash.
Yes, agreed!
Below is a detailed set of steps that will happen in startup.

1. Call `ExpressionCache::reconcile` to remove any invalid entries.
2. While opening the catalog, for each object:
I'm a little confused by this. The design above, where `(generation, global_id, expr)` form a key that maps to a blob of optimized code, makes sense to drop in at various levels of calls to the `Optimizer`, which will, say, map HIR to one of the MIR newtypes, or MIR to LIR.

But the design here seems like we'd be working at a higher level, skipping the `Optimizer` entirely. In that case we could have a single, top-level cache from SQL straight to LIR or `FlatPlan`.

I think both designs are sensible. The former lets us make fewer changes; the latter reduces effort (if we have a cache hit from HIR to MIR, surely we can expect hits from the rest of the pipeline, too!).
> But the design here seems like we'd be working at a higher level, skipping the `Optimizer` entirely. In that case we could have a single, top-level cache from SQL straight to LIR or `FlatPlan`.

Yes, as I understand it the plan is to skip the `Optimizer` entirely. But we still need these intermediate plan types because the catalog hangs on to the intermediate plans:
materialize/src/adapter/src/catalog.rs
Lines 145 to 151 in bff2319
```Rust
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
```
I think it's for something with `EXPLAIN`.
I see. So we could think of the cache as mapping `(generation, global_id, sql_expression)` to a tuple `(optimized_mir, physical_plan, metainfo, notices)`, then?
> Yes, as I understand it the plan is to skip the `Optimizer` entirely. But we still need these intermediate plan types because the catalog hangs on to the intermediate plans:

Yes, exactly.

> I see. So we could think of the cache as mapping `(generation, global_id, sql_expression)` to a tuple `(optimized_mir, physical_plan, metainfo, notices)`, then?

Yes, but I don't think `sql_expression` is necessary here since within a deploy generation the SQL of a global ID can never change.
I pushed an update to the API to make this more explicit. The big change is that I've combined all the expression types into a single struct that will be stored as a single blob, instead of storing each expression type separately.
The benefit of the old approach was that when an index was dropped, we could re-write the global optimized expressions without having to also re-write the local optimized expressions. The benefit of the new approach is that the assumption that either all expression types for a given global ID exist or none exist becomes much more explicit.
I'm still thinking about which approach is better, but for now I think the new approach is.
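To illustrate the all-or-nothing shape (the field names and exact types below are guesses based on the `CatalogPlans` excerpt above, shown without imports; this is not the struct from the design doc), the combined blob could look roughly like:

```Rust
// Hypothetical combined cache value: all expression types for a global ID are
// written and invalidated together, so a cache hit implies every field exists.
#[derive(Serialize, Deserialize)]
struct Expressions {
    local_mir: OptimizedMirRelationExpr,
    global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    physical_plan: DataflowDescription<Plan>,
    dataflow_metainfo: DataflowMetainfo,
    notices: Vec<OptimizerNotice>,
}
```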
Great! Another point in favor of the all-or-nothing approach: no implicit invariant around the validity of the generated intermediate expressions.
I like the new approach better as well.
Overall, it looks fine! But I think we need some more cache invalidations, unfortunately.
Unless someone feels strongly, I'm fully committed to the persist implementation. I've just pushed an update to describe the implementation in more detail. The main reason is to eliminate downtime when we lose the environmentd pod. The main motivation for startup time perf improvements is to eliminate downtime during 0dt upgrades AND during envd failures, otherwise we would have gone with the graceful cut-over approach. So I think this is a good motivation. CC @danhhz, @benesch, @aljoscha in case any of you have strong opinions.

Ben just merged the better version of the persist force compaction tool, so I think the biggest drawback of the persist shard approach ("what if we end up in the same situation as the catalog shard") is now pretty de-risked.

If persist is up for us using `dangerously_force_compaction` on the cache shard (and fixing any bugs that might remain), works for me!
I didn't trace through the exact steps of each phase here, but the general shape of this now looks great. Thanks very much for writing this design doc up and iterating on it. Feel really good about where we landed.
- `o`.
- All compute objects that depend directly on `o`.
- All compute objects that would directly depend on `o`, if all views were inlined.
There should be an optional "re-compute and repopulate cache entries that could use the new index" step here, yeah?
Good point, added.