Skip to content

Commit 5ae4456

Browse files
authored
Merge pull request #30114 from teskje/compute-sequence-readhold-downgrades
controller/compute: sequence external read hold changes
2 parents 42af451 + f8bf695 commit 5ae4456

File tree

9 files changed

+184
-208
lines changed

9 files changed

+184
-208
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/tests/timestamp_selection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl TimestampProvider for Frontiers {
119119

120120
let mock_read_hold = |id, frontier| {
121121
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
122-
ReadHold::new(id, frontier, tx)
122+
ReadHold::with_channel(id, frontier, tx)
123123
};
124124

125125
for (instance_id, ids) in id_bundle.compute_ids.iter() {

src/compute-client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ bytes = "1.3.0"
1717
bytesize = "1.1.0"
1818
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
1919
crossbeam-channel = "0.5.8"
20+
derivative = "2.2.0"
2021
differential-dataflow = "0.13.2"
2122
futures = "0.3.25"
2223
http = "1.1.0"

src/compute-client/src/as_of_selection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -944,7 +944,7 @@ mod tests {
944944
.get(&id)
945945
.ok_or(ReadHoldError::CollectionMissing(id))?;
946946
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
947-
holds.push(ReadHold::new(id, read.clone(), tx));
947+
holds.push(ReadHold::with_channel(id, read.clone(), tx));
948948
}
949949
Ok(holds)
950950
}

src/compute-client/src/controller.rs

+23-34
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use mz_storage_types::read_policy::ReadPolicy;
5858
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
5959
use prometheus::proto::LabelPair;
6060
use serde::{Deserialize, Serialize};
61-
use timely::progress::{Antichain, ChangeBatch, Timestamp};
61+
use timely::progress::{Antichain, Timestamp};
6262
use timely::PartialOrder;
6363
use tokio::sync::{mpsc, oneshot};
6464
use tokio::time::{self, MissedTickBehavior};
@@ -539,8 +539,6 @@ where
539539
logs.push((log, id, shared));
540540
}
541541

542-
let (read_holds_tx, read_holds_rx) = mpsc::unbounded_channel();
543-
544542
let client = instance::Client::spawn(
545543
id,
546544
self.build_info,
@@ -553,11 +551,9 @@ where
553551
Arc::clone(&self.dyncfg),
554552
self.response_tx.clone(),
555553
self.introspection_tx.clone(),
556-
read_holds_tx.clone(),
557-
read_holds_rx,
558554
);
559555

560-
let instance = InstanceState::new(client, collections, read_holds_tx);
556+
let instance = InstanceState::new(client, collections);
561557
self.instances.insert(id, instance);
562558

563559
self.instance_workload_classes
@@ -608,7 +604,7 @@ where
608604
/// Panics if the identified `instance` still has active replicas.
609605
pub fn drop_instance(&mut self, id: ComputeInstanceId) {
610606
if let Some(instance) = self.instances.remove(&id) {
611-
instance.call(|i| i.check_empty());
607+
instance.call(|i| i.shutdown());
612608
}
613609

614610
self.instance_workload_classes
@@ -1103,58 +1099,52 @@ struct InstanceState<T: ComputeControllerTimestamp> {
11031099
client: instance::Client<T>,
11041100
replicas: BTreeSet<ReplicaId>,
11051101
collections: BTreeMap<GlobalId, Collection<T>>,
1106-
/// Sender for updates to collection read holds.
1107-
///
1108-
/// Copies of this sender are given to [`ReadHold`]s that are created in
1109-
/// [`InstanceState::acquire_read_hold`].
1110-
read_holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
11111102
}
11121103

11131104
impl<T: ComputeControllerTimestamp> InstanceState<T> {
1114-
fn new(
1115-
client: instance::Client<T>,
1116-
collections: BTreeMap<GlobalId, Collection<T>>,
1117-
read_holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
1118-
) -> Self {
1105+
fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
11191106
Self {
11201107
client,
11211108
replicas: Default::default(),
11221109
collections,
1123-
read_holds_tx,
11241110
}
11251111
}
11261112

11271113
fn collection(&self, id: GlobalId) -> Result<&Collection<T>, CollectionMissing> {
11281114
self.collections.get(&id).ok_or(CollectionMissing(id))
11291115
}
11301116

1131-
pub fn call<F>(&self, f: F)
1117+
fn call<F>(&self, f: F)
11321118
where
11331119
F: FnOnce(&mut Instance<T>) + Send + 'static,
11341120
{
11351121
let otel_ctx = OpenTelemetryContext::obtain();
1136-
self.client.send(Box::new(move |instance| {
1137-
let _span = debug_span!("instance::call").entered();
1138-
otel_ctx.attach_as_parent();
1139-
1140-
f(instance)
1141-
}));
1122+
self.client
1123+
.send(Box::new(move |instance| {
1124+
let _span = debug_span!("instance::call").entered();
1125+
otel_ctx.attach_as_parent();
1126+
1127+
f(instance)
1128+
}))
1129+
.expect("instance not dropped");
11421130
}
11431131

1144-
pub async fn call_sync<F, R>(&self, f: F) -> R
1132+
async fn call_sync<F, R>(&self, f: F) -> R
11451133
where
11461134
F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
11471135
R: Send + 'static,
11481136
{
11491137
let (tx, rx) = oneshot::channel();
11501138
let otel_ctx = OpenTelemetryContext::obtain();
1151-
self.client.send(Box::new(move |instance| {
1152-
let _span = debug_span!("instance::call_sync").entered();
1153-
otel_ctx.attach_as_parent();
1139+
self.client
1140+
.send(Box::new(move |instance| {
1141+
let _span = debug_span!("instance::call_sync").entered();
1142+
otel_ctx.attach_as_parent();
11541143

1155-
let result = f(instance);
1156-
let _ = tx.send(result);
1157-
}));
1144+
let result = f(instance);
1145+
let _ = tx.send(result);
1146+
}))
1147+
.expect("instance not dropped");
11581148

11591149
rx.await.expect("instance not dropped")
11601150
}
@@ -1177,7 +1167,7 @@ impl<T: ComputeControllerTimestamp> InstanceState<T> {
11771167
since
11781168
});
11791169

1180-
let hold = ReadHold::new(id, since, self.read_holds_tx.clone());
1170+
let hold = ReadHold::new(id, since, self.client.read_hold_tx());
11811171
Ok(hold)
11821172
}
11831173

@@ -1196,7 +1186,6 @@ impl<T: ComputeControllerTimestamp> InstanceState<T> {
11961186
client: _,
11971187
replicas,
11981188
collections,
1199-
read_holds_tx: _,
12001189
} = self;
12011190

12021191
let instance = self.call_sync(|i| i.dump()).await?;

0 commit comments

Comments
 (0)