Update expiration design #30125

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions doc/developer/design/20240919_dataflow_expiration.md
@@ -7,7 +7,9 @@
Temporal filters currently require Materialize to maintain all future retractions
of data that is currently visible. For long windows, the retractions could be at
timestamps beyond our next scheduled restart, which is typically our weekly
DB release.
DB release. Materialize needs to revisit this data at every tick, spending CPU
time linear in the number of outstanding updates, which is prohibitively
expensive to maintain.

For instance, in the example below, the temporal filter in the `last_30_days`
view causes two diffs to be generated for every row inserted into `events`, the
@@ -40,13 +42,57 @@ such retractions can be dropped.

## Success Criteria

The observed output of Materialize with expiration enabled is indistinguishable
from its output with expiration disabled.

When temporal filters are in use, retraction diffs associated with timestamps
beyond a set expiration time can be dropped without affecting correctness,
resulting in lower memory and CPU utilization by halving the number of
processed diffs.

## Solution Proposal

We define an _expiration offset_, which allows a replica to determine which
data cannot be revealed to the user while maintaining correctness invariants.
An implementation is correct iff it produces exactly the same data whether
replica expiration is enabled or disabled.

Objects in Materialize exist in a specific timeline. For this effort, we focus
only on objects in the epoch timeline and exclude all others. This covers the
majority of objects.

At any point in time, a collection has a lower and an upper bound on the times
it can distinguish, forming a range of data that we can surface to the user. As
time advances, the lower and upper bounds can advance, too: inserting new data
advances the upper bound, and compacting old data advances the lower bound.
Relating the wall-clock time to future frontiers allows us to specify the
semantics for expiring data.
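
To make these bounds concrete, here is a minimal sketch (the type and field
names are ours, not Materialize's) of a collection's readable range in the
EpochMillis timeline, where timestamps are milliseconds since the Unix epoch:

```rust
// A minimal sketch, not Materialize's actual types: times in `[since, upper)`
// can be surfaced to the user.
#[derive(Debug, Clone, Copy)]
struct ReadableRange {
    since: u64, // lower bound, advanced by compaction
    upper: u64, // upper bound, advanced by new writes
}

impl ReadableRange {
    /// True if data at `time` can currently be surfaced to the user.
    fn can_surface(&self, time: u64) -> bool {
        self.since <= time && time < self.upper
    }
}

fn main() {
    let mut range = ReadableRange { since: 0, upper: 1_000 };
    assert!(range.can_surface(500));
    // Inserting new data advances the upper bound...
    range.upper = 2_000;
    // ...and compacting old data advances the lower bound.
    range.since = 1_000;
    assert!(range.can_surface(1_500));
    assert!(!range.can_surface(500));
}
```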

Specifically, we are interested in how far the upper frontier of a collection
will have advanced at the point of expiration. We can drop all data past the
upper frontier at the time of expiration, but never any data before it.

* Sources and tables tick forward in relation to the system's wall-clock time.
They never jump forward unless dropped.
* Load generators tick forward in accordance with their implementation,
allowing all valid frontier movements.
* Indexes tick with respect to their inputs.
* Materialized views tick with respect to their inputs, and adhere to a refresh
schedule.
* Subscribes tick forward with respect to their inputs up to a user-defined
time.
* Selects query a single point in time, and thus are not affected by this
feature.
* Constant collections are valid from the beginning to the end of time.

An object that depends on multiple inputs ticks forward at the rate of the
slowest input, i.e., the meet of all its inputs' uppers.
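
For the totally ordered EpochMillis timeline, this meet is simply the minimum of
the inputs' uppers, with the empty frontier acting as the identity. A minimal
sketch, using `Option<u64>` in place of the actual frontier types (`None` stands
for the empty frontier, i.e. a collection that is complete):

```rust
// The upper of a derived object is the meet of its inputs' uppers: the minimum
// timestamp, with the empty frontier (`None`) acting as "end of time".
fn meet_of_uppers(input_uppers: &[Option<u64>]) -> Option<u64> {
    input_uppers
        .iter()
        .copied()
        .fold(None, |acc, upper| match (acc, upper) {
            (None, f) | (f, None) => f,           // the empty frontier imposes no bound
            (Some(a), Some(b)) => Some(a.min(b)), // the slowest input wins
        })
}

fn main() {
    // An index over two inputs: one at 1_000 ms, one already complete.
    assert_eq!(meet_of_uppers(&[Some(1_000), None]), Some(1_000));
    // With no inputs (a constant collection), the result is the empty frontier.
    assert_eq!(meet_of_uppers(&[]), None);
}
```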

The expiration offset is a global setting. When creating a new replica, we
capture the value and never update it later. Replicas convert the offset into
an absolute timestamp based on their wall-clock time. When rendering a
dataflow, each replica determines an appropriate expiration time based on the
dataflow's upper at the time of expiration.
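
As a rough illustration of this conversion (the function name is hypothetical
and the configuration plumbing is omitted), assuming the offset arrives as a
`Duration` and the timeline is EpochMillis:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// A minimal sketch: the replica captures the globally configured offset once
// at startup and turns it into an absolute timestamp in milliseconds since the
// Unix epoch, based on its own wall-clock time.
fn replica_expiration_ms(offset: Duration) -> u64 {
    let start = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("wall clock before Unix epoch");
    (start + offset).as_millis() as u64
}

fn main() {
    // E.g., a three-day offset captured when the replica starts.
    let expiration = replica_expiration_ms(Duration::from_secs(3 * 24 * 60 * 60));
    println!("replica expiration (ms since epoch): {expiration}");
}
```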

A new LaunchDarkly feature flag is introduced that specifies an _expiration
offset_ (a `Duration`). The _replica expiration_ time is computed as the offset
added to the start time of the replica. Dataflows matching certain
Expand All @@ -56,6 +102,39 @@ the dataflow expiration are dropped. To ensure correctness, panic checks are
added to these dataflows that ensure that the frontier does not exceed the
dataflow expiration before the replica is restarted.
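
A minimal sketch of such a check, with hypothetical names and without the
actual operator plumbing; the empty frontier (dataflow shutdown) is the
subtlety discussed in the section below and is deliberately left out:

```rust
// Panic if a dataflow's frontier advances beyond its expiration time while the
// replica is still running; `None` (the empty frontier) is not handled here.
fn check_frontier_against_expiration(frontier: Option<u64>, expiration_ms: u64) {
    if let Some(time) = frontier {
        assert!(
            time <= expiration_ms,
            "dataflow frontier {time} advanced beyond expiration {expiration_ms}; \
             the replica should have restarted before this point"
        );
    }
}

fn main() {
    // Fine: the frontier is still below the expiration time.
    check_frontier_against_expiration(Some(1_000), 2_000);
    // This call would panic:
    // check_frontier_against_expiration(Some(3_000), 2_000);
}
```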

The environment and the replica each have only partial information about the
per-dataflow expiration time. The environment knows the dependency tree and the
properties of each object; only the replica knows the local expiration time.
Thus, a dataflow description needs to encode enough breadcrumbs for the replica
to make a correct decision on whether and when to expire a dataflow.
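
One way to picture these breadcrumbs, purely as a sketch with hypothetical
names rather than the actual dataflow description: the environment records
what it knows from the dependency tree, and the replica combines it with its
local expiration time when rendering the dataflow.

```rust
// Hypothetical breadcrumbs; none of these are Materialize's actual types.
struct ExpirationBreadcrumbs {
    /// Whether all transitive dependencies live in the EpochMillis timeline.
    epoch_millis_timeline: bool,
    /// Whether the dataflow transitively depends on a materialized view with a
    /// non-default refresh schedule, which currently disables expiration.
    has_refresh_schedule: bool,
}

impl ExpirationBreadcrumbs {
    /// The absolute expiration time the replica should apply, if any.
    fn dataflow_expiration(&self, replica_expiration_ms: u64) -> Option<u64> {
        if self.epoch_millis_timeline && !self.has_refresh_schedule {
            Some(replica_expiration_ms)
        } else {
            None // expiration disabled for this dataflow
        }
    }
}

fn main() {
    let breadcrumbs = ExpirationBreadcrumbs {
        epoch_millis_timeline: true,
        has_refresh_schedule: false,
    };
    assert_eq!(
        breadcrumbs.dataflow_expiration(1_700_000_000_000),
        Some(1_700_000_000_000)
    );
}
```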

**An object is _indefinite up to_ at least the following frontier, as a
function of the expiration time (see the sketch after this list):**
* Sources are indefinite up to the expiration time.
* Load generators are indefinite up to the minimum frontier.
* Indexes are indefinite up to the meet of their inputs.
* Constant collections are indefinite up to the empty frontier.
* Tables are indefinite up to the expiration time.
* Materialized views are indefinite up to the meet of their inputs, rounded up to
the refresh schedule.
* Subscribes are indefinite up to the meet of their up-to and their inputs.
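
The sketch referenced above, restricted to the totally ordered EpochMillis
timeline and using hypothetical names; `None` again stands for the empty
frontier, and materialized views and subscribes are omitted for brevity:

```rust
enum ObjectKind {
    Source,
    Table,
    LoadGenerator,
    ConstantCollection,
    /// For simplicity, the index carries its inputs' already-computed frontiers.
    Index { input_uppers: Vec<Option<u64>> },
}

/// The frontier an object is at least indefinite up to, for a given expiration time.
fn indefinite_up_to(kind: &ObjectKind, expiration_ms: u64) -> Option<u64> {
    match kind {
        // Sources and tables tick with wall-clock time: indefinite up to the
        // expiration time itself.
        ObjectKind::Source | ObjectKind::Table => Some(expiration_ms),
        // Load generators may make arbitrary valid frontier movements, so we
        // can only assume the minimum frontier.
        ObjectKind::LoadGenerator => Some(0),
        // Constant collections are complete: indefinite up to the empty frontier.
        ObjectKind::ConstantCollection => None,
        // Indexes: the meet of their inputs, as in the earlier sketch.
        ObjectKind::Index { input_uppers } => input_uppers
            .iter()
            .copied()
            .fold(None, |acc, upper| match (acc, upper) {
                (None, f) | (f, None) => f,
                (Some(a), Some(b)) => Some(a.min(b)),
            }),
    }
}

fn main() {
    let index = ObjectKind::Index { input_uppers: vec![Some(1_000), None] };
    assert_eq!(indefinite_up_to(&index, 5_000), Some(1_000));
    assert_eq!(indefinite_up_to(&ObjectKind::Table, 5_000), Some(5_000));
}
```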

### Distinguishing expiration from dataflow shutdown

When a dataflow shuts down, its frontier advances to the empty frontier. The
persist source assumes that once the input frontiers reach the `until`, it is
free to shut down the dataflow. This is correct if the `until` is specified by
the user, but incorrect if the `until` is set to expire future updates.

TODO:
* Checking frontiers at the input vs. output.
* Do we need a token to distinguish shutdown variants? Ideally, we know from
the structure of the dataflow and the objects it depends on what advancing to
the empty frontier means.
* Do we need to panic on expiration?
* Can we retain a capability at the expiration time and thus prevent unwanted
progress?

An overview of the logic used for these features is as follows:
```
# Consider the `upper` for different dataflows
@@ -128,6 +207,3 @@ Dataflow expiration is disabled for the following cases:

- Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the
frontier timestamp being comparable to the wall-clock time of the replica.
- Dataflows that transitively depend on a materialized view with a non-default
refresh schedule. Handling such materialized views would require additional
logic to track the refresh schedules and ensure that the dataflow expiration