From 8c6c9a63c1eb1bb29cf2689e0643df9e386b44ef Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 22 Oct 2024 10:51:55 +0200 Subject: [PATCH 1/2] Update expiration design Signed-off-by: Moritz Hoffmann --- .../design/20240919_dataflow_expiration.md | 79 ++++++++++++++++++- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/doc/developer/design/20240919_dataflow_expiration.md b/doc/developer/design/20240919_dataflow_expiration.md index 585e0c6740112..e696295b4915b 100644 --- a/doc/developer/design/20240919_dataflow_expiration.md +++ b/doc/developer/design/20240919_dataflow_expiration.md @@ -7,7 +7,9 @@ Temporal filters currently require Materialize to maintain all future retractions of data that is currently visible. For long windows, the retractions could be at timestamps beyond our next scheduled restart, which is typically our weekly -DB release. +DB release. Materialize needs to revisit the data at every tick, which causes it +to spend CPU time linear in the number of outstanding updates. This is prohibitively +expensive to maintain. For instance, in the example below, the temporal filter in the `last_30_days` view causes two diffs to be generated for every row inserted into `events`, the @@ -40,6 +42,9 @@ such retractions can be dropped. ## Success Criteria +The observed output of Materialize with expiration enabled is indistinguishable +from expiration disabled. + When temporal filters are in use, retraction diffs associated with timestamps beyond a set expiration time can be dropped without affecting correctness, resulting in lower memory and CPU utilization from halving the number of @@ -47,6 +52,47 @@ processed diffs. ## Solution Proposal +We define an _expiration offset_, which allows a replica to determine which +data cannot be revealed to the user while maintaining correctness invariants. A +implementation is correct iff it produces exactly the same data with or without +replica expiration enabled or disabled. + +Objects in Materialize exist in a specific timeline. For this effort, we only +focus on objects in the epoch timeline, but exclude all others. This will cover +the majority of objects. + +At any point in time, a collection has a lower and an upper bound of times that +we can distinguish, forming a range of data that we can surface to the user. As +time advances, the lower and upper bounds can advance, too. Inserting new data +moves the upper bound, compacting old data moves the lower bound. Relating the +wall-clock time to future frontiers allows us to specify the semantics for +expiring data. + +Specifically, we're interested to determine how far the upper frontier of a +collection will advance at the point of expiration. We can drop all data past +the upper frontier at the time of expiration, but never any data before. +* Sources and tables tick forward in relation to the system's wall-clock time. + They never jump forward unless dropped. +* Load generators tick forward in accordance with their implementation, + allowing all valid frontier movements. +* Indexes, tick with respect to their inputs. +* Materialized views tick with respect to their inputs, and adhere to a refresh + schedule. +* Subscribes tick forward with respect to their inputs up to a user-defined + time. +* Selects query a single point in time, and thus are not affected by this + feature. +* Constant collections are valid from the beginning to the end of time. + +An object that depends on multiple inputs ticks forward at the rate of the +slowest input, i.e., the meet of all its input's uppers. + +The expiration offset is a global setting. When creating a new replica, we +capture the value and never update it later. Replicas convert the offset into +an absolute time stamp depending on their wall-clock time. When rendering a +dataflow, each replica determines an appropriate expiration time based on the +dataflow's upper at the time of expiration. + A new LaunchDarkly feature flag is introduced that specifies an _expiration offset_ (a `Duration`). The _replica expiration_ time is computed as the offset added to the start time of the replica. Dataflows matching certain @@ -56,6 +102,34 @@ the dataflow expiration are dropped. To ensure correctness, panic checks are added to these dataflows that ensure that the frontier does not exceed the dataflow expiration before the replica is restarted. +Environment and replica only have partial information on the per-dataflow +expiration time. The environment knows the dependency tree and properties per +object. Only the replica knows the local expiration time. Thus, a dataflow +description needs to encode enough breadcrumbs for the replica to make a +correct decision on if and when to expire a dataflow. + +**An object is _definite up to_ a frontier as a function of an expiration +time:** +* Sources are definite up to the expiration time. +* Load generators are definit up to the minimum frontier. +* Indexes are definite up to the meet of their inputs. +* Constant collections are definite up to the empty frontier. +* Tables are definite up to the expiration time. +* Materialized views are definite up to the meet of their inputs, rounded up to + the refresh schedule. +* Subscribes are definite up to the meet of their up-to and their inputs. + +### Distinguishing expiration from dataflow shutdown + +When a dataflow shuts down, it frontier advances to the empty frontier. The +persist source assumes that once the input frontiers reach the `until`, it is +free to shut down the dataflow. This is correct if the until is specified by +the user, but incorrect if the until is set to expire future updates. + +TODO: +* Checking frontiers at the input vs. output. +* Do we need a token to distinguish shutdown variants? Ideally, we know from the structure of the dataflow and objects it depends on what advancing to the empty frontier means. + An overview of the logic used for these features is as follows: ``` # Consider the `upper` for different dataflows @@ -128,6 +202,3 @@ Dataflow expiration is disabled for the following cases: - Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the frontier timestamp being comparable to wall clock time of the replica. -- Dataflows that transitively depend on a materialized view with a non-default - refresh schedule. Handling such materialized views would require additional - logic to track the refresh schedules and ensure that the dataflow expiration From 1b6ced36744732953236f4f30ca6daa1c08e698e Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 23 Oct 2024 15:31:12 +0200 Subject: [PATCH 2/2] wip Signed-off-by: Moritz Hoffmann --- .../design/20240919_dataflow_expiration.md | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/doc/developer/design/20240919_dataflow_expiration.md b/doc/developer/design/20240919_dataflow_expiration.md index e696295b4915b..8af37dbdd0320 100644 --- a/doc/developer/design/20240919_dataflow_expiration.md +++ b/doc/developer/design/20240919_dataflow_expiration.md @@ -108,16 +108,16 @@ object. Only the replica knows the local expiration time. Thus, a dataflow description needs to encode enough breadcrumbs for the replica to make a correct decision on if and when to expire a dataflow. -**An object is _definite up to_ a frontier as a function of an expiration +**An object is at least _indefinite up to_ a frontier as a function of an expiration time:** -* Sources are definite up to the expiration time. -* Load generators are definit up to the minimum frontier. -* Indexes are definite up to the meet of their inputs. -* Constant collections are definite up to the empty frontier. -* Tables are definite up to the expiration time. -* Materialized views are definite up to the meet of their inputs, rounded up to +* Sources are indefinite up to the expiration time. +* Load generators are indefinit up to the minimum frontier. +* Indexes are indefinite up to the meet of their inputs. +* Constant collections are indefinite up to the empty frontier. +* Tables are indefinite up to the expiration time. +* Materialized views are indefinite up to the meet of their inputs, rounded up to the refresh schedule. -* Subscribes are definite up to the meet of their up-to and their inputs. +* Subscribes are indefinite up to the meet of their up-to and their inputs. ### Distinguishing expiration from dataflow shutdown @@ -128,7 +128,12 @@ the user, but incorrect if the until is set to expire future updates. TODO: * Checking frontiers at the input vs. output. -* Do we need a token to distinguish shutdown variants? Ideally, we know from the structure of the dataflow and objects it depends on what advancing to the empty frontier means. +* Do we need a token to distinguish shutdown variants? Ideally, we know from + the structure of the dataflow and objects it depends on what advancing to the + empty frontier means. +* Do we need to panic on expiration? +* Can we retain a capability at the expiration time and thus prevent unwanted + progress? An overview of the logic used for these features is as follows: ```