Skip to content

Commit 7f6dc28

Browse files
authored
Merge pull request #34238 from ggevay/frontend-peek-rtr
Frontend peek sequencing -- RTR
2 parents e3304a8 + f0b69ff commit 7f6dc28

File tree

11 files changed

+120
-72
lines changed

11 files changed

+120
-72
lines changed

misc/python/materialize/mzcompose/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def get_variable_system_parameters(
196196
),
197197
VariableSystemParameter(
198198
"enable_frontend_peek_sequencing",
199-
"false",
199+
"true",
200200
["true", "false"],
201201
),
202202
VariableSystemParameter(

src/adapter/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1014,7 +1014,8 @@ impl SessionClient {
10141014
| Command::CheckConsistency { .. }
10151015
| Command::Dump { .. }
10161016
| Command::GetComputeInstanceClient { .. }
1017-
| Command::GetOracle { .. } => {}
1017+
| Command::GetOracle { .. }
1018+
| Command::DetermineRealTimeRecentTimestamp { .. } => {}
10181019
};
10191020
cmd
10201021
});

src/adapter/src/command.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet};
1111
use std::net::IpAddr;
1212
use std::pin::Pin;
1313
use std::sync::Arc;
14+
use std::time::Duration;
1415

1516
use derivative::Derivative;
1617
use enum_kinds::EnumKind;
@@ -25,7 +26,7 @@ use mz_persist_client::PersistClient;
2526
use mz_pgcopy::CopyFormatParams;
2627
use mz_repr::global_id::TransientIdGen;
2728
use mz_repr::role_id::RoleId;
28-
use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
29+
use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator};
2930
use mz_sql::ast::{FetchDirection, Raw, Statement};
3031
use mz_sql::catalog::ObjectType;
3132
use mz_sql::optimizer_metrics::OptimizerMetrics;
@@ -175,6 +176,12 @@ pub enum Command {
175176
Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
176177
>,
177178
},
179+
180+
DetermineRealTimeRecentTimestamp {
181+
source_ids: BTreeSet<GlobalId>,
182+
real_time_recency_timeout: Duration,
183+
tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
184+
},
178185
}
179186

180187
impl Command {
@@ -196,7 +203,8 @@ impl Command {
196203
| Command::CheckConsistency { .. }
197204
| Command::Dump { .. }
198205
| Command::GetComputeInstanceClient { .. }
199-
| Command::GetOracle { .. } => None,
206+
| Command::GetOracle { .. }
207+
| Command::DetermineRealTimeRecentTimestamp { .. } => None,
200208
}
201209
}
202210

@@ -218,7 +226,8 @@ impl Command {
218226
| Command::CheckConsistency { .. }
219227
| Command::Dump { .. }
220228
| Command::GetComputeInstanceClient { .. }
221-
| Command::GetOracle { .. } => None,
229+
| Command::GetOracle { .. }
230+
| Command::DetermineRealTimeRecentTimestamp { .. } => None,
222231
}
223232
}
224233
}

src/adapter/src/coord.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,9 @@ impl Message {
363363
Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
364364
Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
365365
Command::GetOracle { .. } => "get-oracle",
366+
Command::DetermineRealTimeRecentTimestamp { .. } => {
367+
"determine-real-time-recent-timestamp"
368+
}
366369
},
367370
Message::ControllerReady {
368371
controller: ControllerReadiness::Compute,

src/adapter/src/coord/command_handler.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,34 @@ impl Coordinator {
285285
));
286286
let _ = tx.send(oracle);
287287
}
288+
289+
Command::DetermineRealTimeRecentTimestamp {
290+
source_ids,
291+
real_time_recency_timeout,
292+
tx,
293+
} => {
294+
let result = self
295+
.determine_real_time_recent_timestamp(
296+
source_ids.iter().copied(),
297+
real_time_recency_timeout,
298+
)
299+
.await;
300+
301+
match result {
302+
Ok(Some(fut)) => {
303+
task::spawn(|| "determine real time recent timestamp", async move {
304+
let result = fut.await.map(Some).map_err(AdapterError::from);
305+
let _ = tx.send(result);
306+
});
307+
}
308+
Ok(None) => {
309+
let _ = tx.send(Ok(None));
310+
}
311+
Err(e) => {
312+
let _ = tx.send(Err(e));
313+
}
314+
}
315+
}
288316
}
289317
}
290318
.instrument(debug_span!("handle_command"))

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2526,67 +2526,73 @@ impl Coordinator {
25262526
}
25272527
}
25282528

2529+
/// Inner method that performs the actual real-time recency timestamp determination.
2530+
/// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp_if_needed`)
2531+
/// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2532+
pub(crate) async fn determine_real_time_recent_timestamp(
2533+
&self,
2534+
source_ids: impl Iterator<Item = GlobalId>,
2535+
real_time_recency_timeout: Duration,
2536+
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2537+
{
2538+
let item_ids = source_ids.map(|gid| self.catalog.resolve_item_id(&gid));
2539+
2540+
// Find all dependencies transitively because we need to ensure that
2541+
// RTR queries determine the timestamp from the sources' (i.e.
2542+
// storage objects that ingest data from external systems) remap
2543+
// data. We "cheat" a little bit and filter out any IDs that aren't
2544+
// user objects because we know they are not a RTR source.
2545+
let mut to_visit = VecDeque::from_iter(item_ids.filter(CatalogItemId::is_user));
2546+
// If none of the sources are user objects, we don't need to provide
2547+
// a RTR timestamp.
2548+
if to_visit.is_empty() {
2549+
return Ok(None);
2550+
}
2551+
2552+
let mut timestamp_objects = BTreeSet::new();
2553+
2554+
while let Some(id) = to_visit.pop_front() {
2555+
timestamp_objects.insert(id);
2556+
to_visit.extend(
2557+
self.catalog()
2558+
.get_entry(&id)
2559+
.uses()
2560+
.into_iter()
2561+
.filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2562+
);
2563+
}
2564+
let timestamp_objects = timestamp_objects
2565+
.into_iter()
2566+
.flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2567+
.collect();
2568+
2569+
let r = self
2570+
.controller
2571+
.determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2572+
.await?;
2573+
2574+
Ok(Some(r))
2575+
}
2576+
25292577
/// Checks to see if the session needs a real time recency timestamp and if so returns
25302578
/// a future that will return the timestamp.
2531-
pub(super) async fn determine_real_time_recent_timestamp(
2579+
pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
25322580
&self,
25332581
session: &Session,
2534-
source_ids: impl Iterator<Item = CatalogItemId>,
2582+
source_ids: impl Iterator<Item = GlobalId>,
25352583
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
25362584
{
25372585
let vars = session.vars();
25382586

2539-
// Ideally this logic belongs inside of
2540-
// `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2541-
// logic in there would make it extremely difficult and inconvenient to pull the waiting off
2542-
// of the main coord thread.
2543-
let r = if vars.real_time_recency()
2587+
if vars.real_time_recency()
25442588
&& vars.transaction_isolation() == &IsolationLevel::StrictSerializable
25452589
&& !session.contains_read_timestamp()
25462590
{
2547-
// Find all dependencies transitively because we need to ensure that
2548-
// RTR queries determine the timestamp from the sources' (i.e.
2549-
// storage objects that ingest data from external systems) remap
2550-
// data. We "cheat" a little bit and filter out any IDs that aren't
2551-
// user objects because we know they are not a RTR source.
2552-
let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2553-
// If none of the sources are user objects, we don't need to provide
2554-
// a RTR timestamp.
2555-
if to_visit.is_empty() {
2556-
return Ok(None);
2557-
}
2558-
2559-
let mut timestamp_objects = BTreeSet::new();
2560-
2561-
while let Some(id) = to_visit.pop_front() {
2562-
timestamp_objects.insert(id);
2563-
to_visit.extend(
2564-
self.catalog()
2565-
.get_entry(&id)
2566-
.uses()
2567-
.into_iter()
2568-
.filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2569-
);
2570-
}
2571-
let timestamp_objects = timestamp_objects
2572-
.into_iter()
2573-
.flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2574-
.collect();
2575-
2576-
let r = self
2577-
.controller
2578-
.determine_real_time_recent_timestamp(
2579-
timestamp_objects,
2580-
*vars.real_time_recency_timeout(),
2581-
)
2582-
.await?;
2583-
2584-
Some(r)
2591+
self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2592+
.await
25852593
} else {
2586-
None
2587-
};
2588-
2589-
Ok(r)
2594+
Ok(None)
2595+
}
25902596
}
25912597

25922598
#[instrument]

src/adapter/src/coord/sequencer/inner/explain_timestamp.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,8 @@ impl Coordinator {
173173
}: ExplainTimestampRealTimeRecency,
174174
) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
175175
let source_ids = optimized_plan.depends_on();
176-
let source_items: Vec<_> = source_ids
177-
.iter()
178-
.map(|gid| self.catalog().resolve_item_id(gid))
179-
.collect();
180176
let fut = self
181-
.determine_real_time_recent_timestamp(session, source_items.into_iter())
177+
.determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
182178
.await?;
183179

184180
match fut {

src/adapter/src/coord/sequencer/inner/peek.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -756,12 +756,8 @@ impl Coordinator {
756756
explain_ctx,
757757
}: PeekStageRealTimeRecency,
758758
) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
759-
let item_ids: Vec<_> = source_ids
760-
.iter()
761-
.map(|gid| self.catalog.resolve_item_id(gid))
762-
.collect();
763759
let fut = self
764-
.determine_real_time_recent_timestamp(session, item_ids.into_iter())
760+
.determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
765761
.await?;
766762

767763
match fut {

src/adapter/src/frontend_peek.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tracing::{Span, debug};
2828
use tracing_opentelemetry::OpenTelemetrySpanExt;
2929

3030
use crate::catalog::CatalogState;
31+
use crate::command::Command;
3132
use crate::coord::peek::PeekPlan;
3233
use crate::coord::timestamp_selection::TimestampDetermination;
3334
use crate::coord::{Coordinator, ExplainContext, TargetCluster};
@@ -286,17 +287,21 @@ impl PeekClient {
286287

287288
// # From peek_real_time_recency
288289

289-
// TODO(peek-seq): Real-time recency is slow anyhow, so we don't handle it in frontend peek
290-
// sequencing for now.
291290
let vars = session.vars();
292-
if vars.real_time_recency()
291+
let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
293292
&& vars.transaction_isolation() == &IsolationLevel::StrictSerializable
294293
&& !session.contains_read_timestamp()
295294
{
296-
debug!("Bailing out from try_frontend_peek_inner, because of real time recency");
297-
return Ok(None);
298-
}
299-
let real_time_recency_ts: Option<mz_repr::Timestamp> = None;
295+
// Only call the coordinator when we actually need real-time recency
296+
self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
297+
source_ids: source_ids.clone(),
298+
real_time_recency_timeout: *vars.real_time_recency_timeout(),
299+
tx,
300+
})
301+
.await?
302+
} else {
303+
None
304+
};
300305

301306
// # From peek_timestamp_read_hold
302307

src/adapter/src/peek_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl PeekClient {
132132
catalog
133133
}
134134

135-
async fn call_coordinator<T, F>(&self, f: F) -> T
135+
pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
136136
where
137137
F: FnOnce(oneshot::Sender<T>) -> Command,
138138
{

0 commit comments

Comments
 (0)