Skip to content

Commit d6c5764

Browse files
authored
Merge pull request #25184 from ggevay/precise_acquire_read_holds2
Acquire precise read holds for `REFRESH AT`s
2 parents a6d529d + d3feba8 commit d6c5764

File tree

11 files changed

+238
-45
lines changed

11 files changed

+238
-45
lines changed

src/adapter/src/coord.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -1253,7 +1253,9 @@ pub struct Coordinator {
12531253
///
12541254
/// Upon completing a transaction, this timestamp should be removed from the holds
12551255
/// in `self.read_capability[id]`, using the `release_read_holds` method.
1256-
txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1256+
///
1257+
/// We use a Vec because `ReadHolds` doesn't have a way of tracking multiplicity.
1258+
txn_read_holds: BTreeMap<ConnectionId, Vec<read_policy::ReadHolds<Timestamp>>>,
12571259

12581260
/// Access to the peek fields should be restricted to methods in the [`peek`] API.
12591261
/// A map from pending peek ids to the queue into which responses are sent, and

src/adapter/src/coord/command_handler.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,8 @@ impl Coordinator {
843843
}
844844

845845
if acquire_read_holds {
846-
self.acquire_read_holds_auto_cleanup(session, timestamp, &ids);
846+
self.acquire_read_holds_auto_cleanup(session, timestamp, &ids, false)
847+
.expect("precise==false, so acquiring read holds always succeeds");
847848
}
848849

849850
Ok(Some(timestamp))

src/adapter/src/coord/id_bundle.rs

+43
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,49 @@ impl CollectionIdBundle {
5454
}
5555
}
5656

57+
/// Returns a new bundle with the identifiers that are present in both `self` and `other`.
58+
pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
59+
// Attend to storage ids.
60+
let storage_ids = self
61+
.storage_ids
62+
.intersection(&other.storage_ids)
63+
.cloned()
64+
.collect();
65+
66+
// Intersect ComputeInstanceIds.
67+
let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
68+
let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
69+
let compute_instances = self_compute_instances
70+
.intersection(&other_compute_instances)
71+
.cloned();
72+
73+
// For each ComputeInstanceId, intersect `self` with `other`.
74+
let compute_ids = compute_instances
75+
.map(|compute_instance_id| {
76+
let self_compute_ids = self
77+
.compute_ids
78+
.get(compute_instance_id)
79+
.expect("id is in intersection, so should be found");
80+
let other_compute_ids = other
81+
.compute_ids
82+
.get(compute_instance_id)
83+
.expect("id is in intersection, so should be found");
84+
(
85+
compute_instance_id.clone(),
86+
self_compute_ids
87+
.intersection(other_compute_ids)
88+
.cloned()
89+
.collect(),
90+
)
91+
})
92+
.collect();
93+
94+
CollectionIdBundle {
95+
storage_ids,
96+
compute_ids,
97+
}
98+
}
99+
57100
/// Extends a `CollectionIdBundle` with the contents of another `CollectionIdBundle`.
58101
pub fn extend(&mut self, other: &CollectionIdBundle) {
59102
self.storage_ids.extend(&other.storage_ids);

src/adapter/src/coord/read_policy.rs

+79-37
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,15 @@ impl<T: Eq + Hash + Ord> ReadHolds<T> {
110110
}
111111

112112
/// Extends a `ReadHolds` with the contents of another `ReadHolds`.
113-
pub fn extend(&mut self, other: ReadHolds<T>) {
114-
for (time, id_bundle) in other.holds {
115-
self.holds.entry(time).or_default().extend(&id_bundle);
113+
/// Asserts that the newly added read holds don't coincide with any of the existing read holds in self.
114+
pub fn extend_with_new(&mut self, other: ReadHolds<T>) {
115+
for (time, other_id_bundle) in other.holds {
116+
let self_id_bundle = self.holds.entry(time).or_default();
117+
assert!(
118+
self_id_bundle.intersection(&other_id_bundle).is_empty(),
119+
"extend_with_new encountered duplicate read holds",
120+
);
121+
self_id_bundle.extend(&other_id_bundle);
116122
}
117123
}
118124

@@ -214,7 +220,7 @@ impl crate::coord::Coordinator {
214220
.or_default()
215221
.extend(id_bundle);
216222
}
217-
read_holds.extend(new_read_holds);
223+
read_holds.extend_with_new(new_read_holds);
218224
}
219225
TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent => {
220226
id_bundles.entry(None).or_default().extend(&id_bundle);
@@ -382,14 +388,34 @@ impl crate::coord::Coordinator {
382388

383389
/// Attempt to acquire read holds on the indicated collections at the indicated `time`.
384390
///
385-
/// If we are unable to acquire a read hold at the provided `time` for a specific id, then we
386-
/// will acquire a read hold at the lowest possible time for that id.
391+
/// If we are unable to acquire a read hold at the provided `time` for a specific id, then
392+
/// depending on the `precise` argument, we either fall back to acquiring a read hold at
393+
/// the lowest possible time for that id, or return an error. The returned error contains
394+
/// those collection sinces that were later than the specified time.
387395
pub(crate) fn acquire_read_holds(
388396
&mut self,
389-
time: mz_repr::Timestamp,
397+
time: Timestamp,
390398
id_bundle: &CollectionIdBundle,
391-
) -> ReadHolds<mz_repr::Timestamp> {
399+
precise: bool,
400+
) -> Result<ReadHolds<Timestamp>, Vec<(Antichain<Timestamp>, CollectionIdBundle)>> {
392401
let read_holds = self.initialize_read_holds(time, id_bundle);
402+
if precise {
403+
// If we are not able to acquire read holds precisely at the specified time (only later), then error out.
404+
let too_late = read_holds
405+
.holds
406+
.iter()
407+
.filter_map(|(antichain, ids)| {
408+
if antichain.iter().all(|hold_time| *hold_time == time) {
409+
None
410+
} else {
411+
Some((antichain.clone(), ids.clone()))
412+
}
413+
})
414+
.collect_vec();
415+
if !too_late.is_empty() {
416+
return Err(too_late);
417+
}
418+
}
393419
// Update STORAGE read policies.
394420
let mut policy_changes = Vec::new();
395421
for (time, id) in read_holds.storage_ids() {
@@ -418,26 +444,30 @@ impl crate::coord::Coordinator {
418444
.unwrap_or_terminate("cannot fail to set read policy");
419445
}
420446

421-
read_holds
447+
Ok(read_holds)
422448
}
423449

424450
/// Attempt to acquire read holds on the indicated collections at the indicated `time`.
425451
/// This is similar to [Self::acquire_read_holds], but instead of returning the read holds,
426452
/// it arranges for them to be automatically released at the end of the transaction.
427453
///
428-
/// If we are unable to acquire a read hold at the provided `time` for a specific id, then we
429-
/// will acquire a read hold at the lowest possible time for that id.
454+
/// If we are unable to acquire a read hold at the provided `time` for a specific id, then
455+
/// depending on the `precise` argument, we either fall back to acquiring a read hold at
456+
/// the lowest possible time for that id, or return an error. The returned error contains
457+
/// those collection sinces that were later than the specified time.
430458
pub(crate) fn acquire_read_holds_auto_cleanup(
431459
&mut self,
432460
session: &Session,
433461
time: Timestamp,
434462
id_bundle: &CollectionIdBundle,
435-
) {
436-
let read_holds = self.acquire_read_holds(time, id_bundle);
463+
precise: bool,
464+
) -> Result<(), Vec<(Antichain<Timestamp>, CollectionIdBundle)>> {
465+
let read_holds = self.acquire_read_holds(time, id_bundle, precise)?;
437466
self.txn_read_holds
438467
.entry(session.conn_id().clone())
439-
.or_insert_with(ReadHolds::new)
440-
.extend(read_holds);
468+
.or_insert_with(Vec::new)
469+
.push(read_holds);
470+
Ok(())
441471
}
442472

443473
/// Attempt to update the timestamp of the read holds on the indicated collections from the
@@ -446,10 +476,10 @@ impl crate::coord::Coordinator {
446476
/// If we are unable to update a read hold at the provided `time` for a specific id, then we
447477
/// leave it unchanged.
448478
///
449-
/// This method relies on a previous call to `acquire_read_holds` with the same
450-
/// `read_holds` argument or a previous call to `update_read_hold` that returned
479+
/// This method relies on a previous call to
480+
/// `initialize_read_holds`, `acquire_read_holds`, or `update_read_holds` that returned
451481
/// `read_holds`, and its behavior will be erratic if called on anything else.
452-
pub(super) fn update_read_hold(
482+
pub(super) fn update_read_holds(
453483
&mut self,
454484
read_holds: ReadHolds<mz_repr::Timestamp>,
455485
new_time: mz_repr::Timestamp,
@@ -547,34 +577,46 @@ impl crate::coord::Coordinator {
547577

548578
new_read_holds
549579
}
550-
/// Release read holds on the indicated collections at the indicated times.
580+
581+
/// Release the given read holds.
551582
///
552-
/// This method relies on a previous call to `acquire_read_holds` with the same
553-
/// argument, or a previous call to `update_read_hold` that returned
554-
/// `read_holds`, and its behavior will be erratic if called on anything else,
583+
/// This method relies on a previous call to
584+
/// `initialize_read_holds`, `acquire_read_holds`, or `update_read_holds` that returned
585+
/// `ReadHolds`, and its behavior will be erratic if called on anything else,
555586
/// or if called more than once on the same bundle of read holds.
556-
pub(super) fn release_read_hold(&mut self, read_holds: &ReadHolds<mz_repr::Timestamp>) {
587+
pub(super) fn release_read_holds(&mut self, read_holdses: Vec<ReadHolds<Timestamp>>) {
557588
// Update STORAGE read policies.
558-
let mut policy_changes = Vec::new();
559-
for (time, id) in read_holds.storage_ids() {
560-
// It's possible that a concurrent DDL statement has already dropped this GlobalId
561-
if let Some(read_needs) = self.storage_read_capabilities.get_mut(id) {
562-
read_needs.holds.update_iter(time.iter().map(|t| (*t, -1)));
563-
policy_changes.push((*id, read_needs.policy()));
589+
let mut storage_policy_changes = Vec::new();
590+
for read_holds in read_holdses.iter() {
591+
for (time, id) in read_holds.storage_ids() {
592+
// It's possible that a concurrent DDL statement has already dropped this GlobalId
593+
if let Some(read_needs) = self.storage_read_capabilities.get_mut(id) {
594+
read_needs.holds.update_iter(time.iter().map(|t| (*t, -1)));
595+
storage_policy_changes.push((*id, read_needs.policy()));
596+
}
564597
}
565598
}
566-
self.controller.storage.set_read_policy(policy_changes);
599+
self.controller
600+
.storage
601+
.set_read_policy(storage_policy_changes);
567602
// Update COMPUTE read policies
568603
let mut compute = self.controller.active_compute();
569-
for (compute_instance, compute_ids) in read_holds.compute_ids() {
570-
let mut policy_changes = Vec::new();
571-
for (time, id) in compute_ids {
572-
// It's possible that a concurrent DDL statement has already dropped this GlobalId
573-
if let Some(read_needs) = self.compute_read_capabilities.get_mut(id) {
574-
read_needs.holds.update_iter(time.iter().map(|t| (*t, -1)));
575-
policy_changes.push((*id, read_needs.policy()));
604+
let mut policy_changes_per_instance = BTreeMap::new();
605+
for read_holds in read_holdses.iter() {
606+
for (compute_instance, compute_ids) in read_holds.compute_ids() {
607+
let policy_changes = policy_changes_per_instance
608+
.entry(compute_instance)
609+
.or_insert_with(Vec::new);
610+
for (time, id) in compute_ids {
611+
// It's possible that a concurrent DDL statement has already dropped this GlobalId
612+
if let Some(read_needs) = self.compute_read_capabilities.get_mut(id) {
613+
read_needs.holds.update_iter(time.iter().map(|t| (*t, -1)));
614+
policy_changes.push((*id, read_needs.policy()));
615+
}
576616
}
577617
}
618+
}
619+
for (compute_instance, policy_changes) in policy_changes_per_instance {
578620
if compute.instance_exists(*compute_instance) {
579621
compute
580622
.set_read_policy(*compute_instance, policy_changes)

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,10 @@ impl Coordinator {
206206
let plan::CreateMaterializedViewPlan {
207207
materialized_view:
208208
plan::MaterializedView {
209-
expr, cluster_id, ..
209+
expr,
210+
cluster_id,
211+
refresh_schedule,
212+
..
210213
},
211214
ambiguous_columns,
212215
..
@@ -243,6 +246,31 @@ impl Coordinator {
243246
role_metadata: session.role_metadata().clone(),
244247
};
245248

249+
// Acquire read holds at all the REFRESH AT times.
250+
// Note that we already acquired a possibly non-precise read hold at mz_now() in the purification,
251+
// if any of the REFRESH options involve mz_now(). But now we can acquire precise read holds, because by now
252+
// the REFRESH AT expressions have been evaluated, so we can handle something like
253+
// `mz_now()::text::int8 + 10000`;
254+
if let Some(refresh_schedule) = refresh_schedule {
255+
if !refresh_schedule.ats.is_empty() {
256+
let ids = self
257+
.index_oracle(*cluster_id)
258+
.sufficient_collections(resolved_ids.0.iter());
259+
for refresh_at_ts in &refresh_schedule.ats {
260+
match self.acquire_read_holds_auto_cleanup(session, *refresh_at_ts, &ids, true)
261+
{
262+
Ok(()) => {}
263+
Err(earliest_possible) => {
264+
return Err(AdapterError::InputNotReadableAtRefreshAtTime(
265+
*refresh_at_ts,
266+
earliest_possible,
267+
));
268+
}
269+
};
270+
}
271+
}
272+
}
273+
246274
Ok(CreateMaterializedViewStage::Optimize(
247275
CreateMaterializedViewOptimize {
248276
validity,

src/adapter/src/coord/sequencer/inner/peek.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,9 @@ impl Coordinator {
984984
// we must acquire read holds here so they are held until the off-thread work
985985
// returns to the coordinator.
986986
if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
987+
// Transactions involving peeks will acquire read holds at most once.
988+
assert_eq!(txn_reads.len(), 1);
989+
let txn_reads = &txn_reads[0];
987990
// Find referenced ids not in the read hold. A reference could be caused by a
988991
// user specifying an object in a different schema than the first query. An
989992
// index could be caused by a CREATE INDEX after the transaction started.
@@ -1000,7 +1003,8 @@ impl Coordinator {
10001003
});
10011004
}
10021005
} else if let Some((timestamp, bundle)) = potential_read_holds {
1003-
self.acquire_read_holds_auto_cleanup(session, timestamp, bundle);
1006+
self.acquire_read_holds_auto_cleanup(session, timestamp, bundle, true)
1007+
.expect("able to acquire read holds at the time that we just got from `determine_timestamp`");
10041008
}
10051009

10061010
// TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems

src/adapter/src/coord/sql.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ impl Coordinator {
205205

206206
// Release this transaction's compaction hold on collections.
207207
if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
208-
self.release_read_hold(&txn_reads);
208+
self.release_read_holds(txn_reads);
209209
}
210210
}
211211

src/adapter/src/coord/timeline.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ impl Coordinator {
723723
};
724724
let read_ts = oracle.read_ts().await;
725725
if read_holds.times().any(|time| time.less_than(&read_ts)) {
726-
read_holds = self.update_read_hold(read_holds, read_ts);
726+
read_holds = self.update_read_holds(read_holds, read_ts);
727727
}
728728
self.global_timelines
729729
.insert(timeline, TimelineState { oracle, read_holds });

0 commit comments

Comments
 (0)