# Expression Cache

## The Problem

Optimization is a slow process that is on the critical path for `environmentd` startup times. Some
environments spend 30 seconds just on optimization. All optimization time spent in startup is
experienced downtime for users when `environmentd` restarts.

## Success Criteria

Startup spends less than 1 second optimizing expressions.

## Solution Proposal

The solution proposed in this document is a cache of optimized expressions. During startup,
`environmentd` will first look in the cache for optimized expressions and only compute a new
expression if it isn't present in the cache. If enough expressions are cached and the cache is fast
enough, then the time spent on this part of startup should be small.

The cache will present as a key-value store where the key is a composite of:

 - deployment generation
 - object global ID

The value will be a serialized version of the optimized expression. An `environmentd` process with
deploy generation `n` will never be expected to look at a serialized expression with a deploy
generation `m` such that `n != m`. Therefore, no forwards or backwards compatibility is needed for
the serialized representation of expressions. The cache will also be made durable so that it's
available after a restart, at least within the same deployment generation.
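
As a rough illustration, the composite key and value might look like the following sketch
(`CacheKey` and `CacheValue` are illustrative names, not a committed schema):

```rust
/// Hypothetical sketch of the composite cache key described above.
struct CacheKey {
    /// Deploy generation that produced the expression.
    deploy_generation: u64,
    /// Global ID of the cached object.
    global_id: GlobalId,
}

/// Values are opaque serialized expressions; they only need to be readable by
/// the same deploy generation (and therefore the same code version) that wrote them.
type CacheValue = Vec<u8>;
```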

Upgrading an environment will look something like this:

1. Start deploy generation `n` in read-only mode.
2. Populate the expression cache for generation `n`.
3. Start deploy generation `n` in read-write mode.
4. Read optimized expressions from cache.

Restarting an environment will look something like this:

1. Start deploy generation `n` in read-write mode.
2. Read optimized expressions from cache.

### Prior Art

The catalog currently has an in-memory expression cache.

 - [https://github.com/MaterializeInc/materialize/blob/bff231953f4bb97b70cae81bdd6dd1716dbf8cec/src/adapter/src/catalog.rs#L127](https://github.com/MaterializeInc/materialize/blob/bff231953f4bb97b70cae81bdd6dd1716dbf8cec/src/adapter/src/catalog.rs#L127)
 - [https://github.com/MaterializeInc/materialize/blob/bff231953f4bb97b70cae81bdd6dd1716dbf8cec/src/adapter/src/catalog.rs#L145-L345](https://github.com/MaterializeInc/materialize/blob/bff231953f4bb97b70cae81bdd6dd1716dbf8cec/src/adapter/src/catalog.rs#L145-L345)

This cache is used to serve `EXPLAIN` queries to ensure accurate and consistent responses. When an
index is dropped, it may change how an object _would_ be optimized, but it does not change how the
object is currently deployed in a cluster. This cache contains the expressions that are deployed in
a cluster, but not necessarily the expressions that would result from optimizing the current
catalog contents.

### Cache API

Below is the API that the cache will present. It may be further wrapped with typed methods that
take care of serializing and deserializing bytes. Additionally, we probably don't need a trait when
implementing.

```rust
/// All the cached expressions for a single `GlobalId`.
///
/// Note: This is just a placeholder for now, don't index too hard on the exact fields. I haven't
/// done the necessary research to figure out what they are.
struct Expressions {
    local_mir: OptimizedMirRelationExpr,
    global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
    optimizer_feature_overrides: OptimizerFeatures,
}

struct NewEntry {
    /// `GlobalId` of the new expression.
    id: GlobalId,
    /// New `Expressions` to cache.
    expressions: Expressions,
    /// `GlobalId`s to invalidate as a result of the new entry.
    invalidate_ids: BTreeSet<GlobalId>,
}

struct ExpressionCache {
    deploy_generation: u64,
    information_needed_to_connect_to_durable_store: _,
}

impl ExpressionCache {
    /// Creates a new [`ExpressionCache`] for `deploy_generation`.
    fn new(deploy_generation: u64, information_needed_to_connect_to_durable_store: _) -> Self;

    /// Reconciles all entries in the current deploy generation with the current objects,
    /// `current_ids`, and the current optimizer features, `optimizer_features`.
    ///
    /// If `remove_prior_gens` is `true`, all previous generations are durably removed from the
    /// cache.
    ///
    /// Returns all cached expressions in the current deploy generation, after reconciliation.
    fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_gens: bool) -> Vec<(GlobalId, Expressions)>;

    /// Durably inserts `expressions` into the current deploy generation. This may also invalidate
    /// entries given by `expressions`.
    ///
    /// Returns a [`Future`] that completes once the changes have been made durable.
    ///
    /// Panics if any `GlobalId` already exists in the cache.
    fn insert_expressions(&mut self, expressions: Vec<NewEntry>) -> impl Future<Output = ()>;

    /// Durably removes and returns all entries in the current deploy generation that depend on an
    /// ID in `dropped_ids`.
    ///
    /// Optional for v1.
    fn invalidate_entries(&mut self, dropped_ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, Expressions)>;
}
```
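
As a rough usage sketch (`connection_info`, `catalog_ids`, `features`, and `read_only_mode` are
placeholder bindings assumed for illustration, not existing names):

```rust
// Hypothetical usage sketch; the bindings below are placeholders.
let mut cache = ExpressionCache::new(deploy_generation, connection_info);

// Reconcile and load everything cached for this generation. Prior generations
// are only removed once this process is the read-write leader.
let cached: Vec<(GlobalId, Expressions)> =
    cache.open(&catalog_ids, &features, /* remove_prior_gens */ !read_only_mode);
```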

### Startup

Below is a detailed set of steps that will happen in startup. A sketch of the per-object loop
follows the list.

1. Call `ExpressionCache::open` to read the cache into memory and perform reconciliation (see
   [Startup Reconciliation](#startup-reconciliation)). When passing in the arguments,
   `remove_prior_gens == !read_only_mode`.
2. While opening the catalog, for each object:
   a. If the object is present in the cache, use the cached optimized expression.
   b. Else, generate the optimized expressions and insert them via
      `ExpressionCache::insert_expressions`. This will also perform any necessary invalidations if
      the new expression is an index (see [Create Invalidations](#create-invalidations)).
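
Building on the earlier sketch, step 2 might look roughly like this
(`objects_in_dependency_order`, `install_cached_expressions`, `optimize_object`, and
`index_invalidations` are placeholders, not existing functions):

```rust
// Hypothetical sketch of the per-object startup loop (step 2).
let cached: BTreeMap<GlobalId, Expressions> = cached.into_iter().collect();

for id in objects_in_dependency_order {
    match cached.get(&id) {
        // (2a) Cache hit: reuse the previously optimized expressions.
        Some(expressions) => install_cached_expressions(id, expressions),
        // (2b) Cache miss: optimize and durably record the result.
        None => {
            let expressions = optimize_object(id);
            // Non-empty only when the new object is an index.
            let invalidate_ids = index_invalidations(id);
            cache
                .insert_expressions(vec![NewEntry { id, expressions, invalidate_ids }])
                .await;
        }
    }
}
```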

#### Startup Reconciliation

When opening the cache for the first time, we need to perform the following reconciliation tasks
(a sketch follows the list):

 - Remove any entries that exist in the cache but not in the catalog.
 - If `remove_prior_gens` is true, then remove all prior generations.
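
A minimal sketch of these tasks inside `ExpressionCache::open` (`stored_entries` and
`durably_remove` are placeholders for the real durable-store operations):

```rust
// Hypothetical reconciliation sketch, as run inside `ExpressionCache::open`.
for (generation, id) in stored_entries() {
    if generation == self.deploy_generation {
        // Remove entries that exist in the cache but not in the catalog.
        if !current_ids.contains(&id) {
            durably_remove(generation, id);
        }
    } else if remove_prior_gens {
        // Remove all prior generations once we are the read-write leader.
        durably_remove(generation, id);
    }
}
```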

### DDL - Create

1. Execute catalog transaction.
2. Update cache via `ExpressionCache::insert_expressions`.
3. [Optional for v1] Recompute and insert any invalidated expressions.

#### Create Invalidations

When creating and inserting a new index, we need to invalidate some entries that may optimize to
new expressions. When creating index `i` on object `o`, we need to invalidate the following
objects (sketched below):

 - `o`.
 - All compute objects that depend directly on `o`.
 - All compute objects that would directly depend on `o`, if all views were inlined.
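
A sketch of computing this set (`direct_compute_dependants` and `dependants_with_views_inlined`
are hypothetical catalog dependency queries):

```rust
// Hypothetical sketch of the invalidation set for a new index `i` on object `o`.
fn create_invalidations(o: GlobalId) -> BTreeSet<GlobalId> {
    let mut invalidate_ids = BTreeSet::new();
    // The indexed object itself may now optimize differently.
    invalidate_ids.insert(o);
    // Compute objects that depend directly on `o`.
    invalidate_ids.extend(direct_compute_dependants(o));
    // Compute objects that would depend directly on `o` if all views were inlined.
    invalidate_ids.extend(dependants_with_views_inlined(o));
    invalidate_ids
}
```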

### DDL - Drop

This is optional for v1; on startup, `ExpressionCache::open` will update the cache to the
correct state.

1. Execute catalog transaction.
2. Invalidate cache entries via `ExpressionCache::invalidate_entries`.
3. Re-compute and repopulate cache entries that depended on dropped entries via
   `ExpressionCache::insert_expressions`.
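
Steps 2 and 3 might compose like this sketch (`reoptimize` is a placeholder for re-running the
optimizer on an object):

```rust
// Hypothetical sketch of the drop flow after the catalog transaction commits.
let invalidated = cache.invalidate_entries(dropped_ids);

// Recompute and durably repopulate everything that depended on a dropped entry.
let new_entries: Vec<NewEntry> = invalidated
    .into_iter()
    .map(|(id, _old)| NewEntry {
        id,
        expressions: reoptimize(id),
        invalidate_ids: BTreeSet::new(),
    })
    .collect();
cache.insert_expressions(new_entries).await;
```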

### Implementation

The implementation will use persist for durability. The cache will be a single dedicated shard.
Each cache entry will be keyed by `(deploy_generation, global_id)` and the value will be a
serialized version of the expression.

#### Conflict Resolution

It is possible and expected that multiple environments will be writing to the cache at the same
time. This would manifest as an upper mismatch error during an insert or invalidation. In case of
this error, the cache should read in all new updates, apply each update as described below, and
retry the operation from the beginning.

If the update is in a different deploy generation than the current cache, then ignore it. It is in
a different logical namespace and won't conflict with the operation.

If the update is in the same deploy generation, then we must be in a split-brain scenario where
both the current process and another process think they are the leader. We should still update any
in-memory state as if the current cache had made that change. This relies on the following
invariants:

 - Two processes with the same deploy generation MUST be running the same version of code.
 - A global ID only ever maps to a single object.
 - Optimization is deterministic.

Therefore, we can be sure that any new global IDs refer to the same objects that the current cache
thinks they refer to, and that the optimized expressions the other process produced are identical
to the optimized expressions that the current process would have produced. Eventually, one of the
processes will be fenced out on some other operation. The reason we don't panic immediately is
that the current process may actually be the leader, and panicking could enter a live-lock
scenario like the following:

1. Process `A` starts up and becomes the leader.
2. Process `B` starts up and becomes the leader.
3. Process `A` writes to the cache.
4. Process `B` panics.
5. Process `A` is fenced.
6. Go back to step (1).
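
The retry loop might look like the following sketch (`try_write`, `UpperMismatch`, and
`updates_between` are hypothetical names for the durable-store interaction, not an existing API):

```rust
// Hypothetical conflict-resolution sketch; `expected_upper` is the writer's
// current view of the shard upper.
loop {
    match try_write(expected_upper, &pending_updates) {
        Ok(()) => break,
        Err(UpperMismatch { current_upper }) => {
            // Another writer got in first: read everything we missed.
            for ((generation, id), value) in updates_between(expected_upper, current_upper) {
                if generation != self.deploy_generation {
                    // Different logical namespace; it cannot conflict with us.
                    continue;
                }
                // Same generation: split brain. Versions match and optimization
                // is deterministic, so adopt the entry as if we had written it.
                apply_to_in_memory_state(id, value);
            }
            // Retry the operation from the beginning against the new upper.
            expected_upper = current_upper;
        }
    }
}
```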

#### Pros
- Flushing, concurrency, atomicity, and mocking are already implemented by persist.

#### Cons
- We need to worry about coordinating access across multiple pods. It's expected that during
  upgrades at least two `environmentd`s will be communicating with the cache.
- We need to worry about compaction and read latency during startup.

## Alternatives

- For the persist implementation, we could mint a new shard for each deploy generation. This would
  require us to finalize old shards during startup, which would accumulate shard tombstones in CRDB.
- We could use persist's `FileBlob` for durability. It's extremely well tested (most of CI uses it
  for persist) and solves at least some of the file system cons.
- We could use persist for durability, but swap in the `FileBlob` as the blob store and some local
  consensus implementation.

### File System Implementation

One potential implementation is via the filesystem of durable storage attached to `environmentd`.
Each cache entry would be saved as a file of the format
`/path/to/cache/<deploy_generation>/<global_id>`.

#### Pros
- No need to worry about coordination across K8s pods.
- Bulk deletion is a simple directory delete.

#### Cons
- Need to worry about flushing/fsync.
- Need to worry about concurrency.
- Need to worry about atomicity.
- Need to worry about mocking things in memory for tests.
- If we lose the pod, then we also lose the cache.
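
The flushing and atomicity concerns could be addressed with the usual pattern of writing to a
temporary file and renaming it into place; a minimal sketch using only the standard library
(error handling elided):

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

/// Sketch of an atomic, durable write of one cache entry under
/// `/path/to/cache/<deploy_generation>/<global_id>`.
fn write_entry(cache_dir: &Path, deploy_generation: u64, global_id: u64, bytes: &[u8]) {
    let dir = cache_dir.join(deploy_generation.to_string());
    fs::create_dir_all(&dir).unwrap();
    let tmp = dir.join(format!("{global_id}.tmp"));
    let dst = dir.join(global_id.to_string());

    let mut file = File::create(&tmp).unwrap();
    file.write_all(bytes).unwrap();
    // Flush the file's contents to disk before the rename makes it visible.
    file.sync_all().unwrap();
    fs::rename(&tmp, &dst).unwrap();
    // Also fsync the directory so the rename itself survives a crash (Unix).
    File::open(&dir).unwrap().sync_all().unwrap();
}
```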

## Open questions

- Which implementation should we use?
- If we use the persist implementation, how do we coordinate writes across pods?
  - I haven't thought much about this, but here's one idea. The cache will maintain a subscribe on
    the persist shard. Every time it experiences an upper mismatch, it will listen for all new
    changes. If any of the changes contain the current deploy generation, then panic, else ignore
    them.