diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 2a2362cf24431..827f0fef2aa7d 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -196,7 +196,7 @@ def get_variable_system_parameters( ), VariableSystemParameter( "enable_frontend_peek_sequencing", - "false", + "true", ["true", "false"], ), VariableSystemParameter( diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 5e0777f3fa2a7..13420b42b37ad 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -1014,7 +1014,8 @@ impl SessionClient { | Command::CheckConsistency { .. } | Command::Dump { .. } | Command::GetComputeInstanceClient { .. } - | Command::GetOracle { .. } => {} + | Command::GetOracle { .. } + | Command::DetermineRealTimeRecentTimestamp { .. } => {} }; cmd }); diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 1bd86a123dcba..9576dd1415571 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::net::IpAddr; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use derivative::Derivative; use enum_kinds::EnumKind; @@ -25,7 +26,7 @@ use mz_persist_client::PersistClient; use mz_pgcopy::CopyFormatParams; use mz_repr::global_id::TransientIdGen; use mz_repr::role_id::RoleId; -use mz_repr::{CatalogItemId, ColumnIndex, RowIterator}; +use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator}; use mz_sql::ast::{FetchDirection, Raw, Statement}; use mz_sql::catalog::ObjectType; use mz_sql::optimizer_metrics::OptimizerMetrics; @@ -175,6 +176,12 @@ pub enum Command { Result + Send + Sync>, AdapterError>, >, }, + + DetermineRealTimeRecentTimestamp { + source_ids: BTreeSet, + real_time_recency_timeout: Duration, + tx: oneshot::Sender, AdapterError>>, + }, } impl Command { @@ -196,7 +203,8 @@ impl Command { | Command::CheckConsistency 
{ .. } | Command::Dump { .. } | Command::GetComputeInstanceClient { .. } - | Command::GetOracle { .. } => None, + | Command::GetOracle { .. } + | Command::DetermineRealTimeRecentTimestamp { .. } => None, } } @@ -218,7 +226,8 @@ impl Command { | Command::CheckConsistency { .. } | Command::Dump { .. } | Command::GetComputeInstanceClient { .. } - | Command::GetOracle { .. } => None, + | Command::GetOracle { .. } + | Command::DetermineRealTimeRecentTimestamp { .. } => None, } } } diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 7cac2c64de6bc..602db4ffe9117 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -363,6 +363,9 @@ impl Message { Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof", Command::GetComputeInstanceClient { .. } => "get-compute-instance-client", Command::GetOracle { .. } => "get-oracle", + Command::DetermineRealTimeRecentTimestamp { .. } => { + "determine-real-time-recent-timestamp" + } }, Message::ControllerReady { controller: ControllerReadiness::Compute, diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 03b6a50b7d9e4..69a1fd8c6889c 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -285,6 +285,34 @@ impl Coordinator { )); let _ = tx.send(oracle); } + + Command::DetermineRealTimeRecentTimestamp { + source_ids, + real_time_recency_timeout, + tx, + } => { + let result = self + .determine_real_time_recent_timestamp( + source_ids.iter().copied(), + real_time_recency_timeout, + ) + .await; + + match result { + Ok(Some(fut)) => { + task::spawn(|| "determine real time recent timestamp", async move { + let result = fut.await.map(Some).map_err(AdapterError::from); + let _ = tx.send(result); + }); + } + Ok(None) => { + let _ = tx.send(Ok(None)); + } + Err(e) => { + let _ = tx.send(Err(e)); + } + } + } } } .instrument(debug_span!("handle_command")) diff --git 
a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ee8bf567291cb..2a910c525c776 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -2524,67 +2524,73 @@ impl Coordinator { } } + /// Inner method that performs the actual real-time recency timestamp determination. + /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp_if_needed`) + /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`. + pub(crate) async fn determine_real_time_recent_timestamp( + &self, + source_ids: impl Iterator, + real_time_recency_timeout: Duration, + ) -> Result>>>, AdapterError> + { + let item_ids = source_ids.map(|gid| self.catalog.resolve_item_id(&gid)); + + // Find all dependencies transitively because we need to ensure that + // RTR queries determine the timestamp from the sources' (i.e. + // storage objects that ingest data from external systems) remap + // data. We "cheat" a little bit and filter out any IDs that aren't + // user objects because we know they are not a RTR source. + let mut to_visit = VecDeque::from_iter(item_ids.filter(CatalogItemId::is_user)); + // If none of the sources are user objects, we don't need to provide + // a RTR timestamp. 
+ if to_visit.is_empty() { + return Ok(None); + } + + let mut timestamp_objects = BTreeSet::new(); + + while let Some(id) = to_visit.pop_front() { + timestamp_objects.insert(id); + to_visit.extend( + self.catalog() + .get_entry(&id) + .uses() + .into_iter() + .filter(|id| !timestamp_objects.contains(id) && id.is_user()), + ); + } + let timestamp_objects = timestamp_objects + .into_iter() + .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids()) + .collect(); + + let r = self + .controller + .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout) + .await?; + + Ok(Some(r)) + } + /// Checks to see if the session needs a real time recency timestamp and if so returns /// a future that will return the timestamp. - pub(super) async fn determine_real_time_recent_timestamp( + pub(crate) async fn determine_real_time_recent_timestamp_if_needed( &self, session: &Session, - source_ids: impl Iterator, + source_ids: impl Iterator, ) -> Result>>>, AdapterError> { let vars = session.vars(); - // Ideally this logic belongs inside of - // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the - // logic in there would make it extremely difficult and inconvenient to pull the waiting off - // of the main coord thread. - let r = if vars.real_time_recency() + if vars.real_time_recency() && vars.transaction_isolation() == &IsolationLevel::StrictSerializable && !session.contains_read_timestamp() { - // Find all dependencies transitively because we need to ensure that - // RTR queries determine the timestamp from the sources' (i.e. - // storage objects that ingest data from external systems) remap - // data. We "cheat" a little bit and filter out any IDs that aren't - // user objects because we know they are not a RTR source. - let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user)); - // If none of the sources are user objects, we don't need to provide - // a RTR timestamp. 
- if to_visit.is_empty() { - return Ok(None); - } - - let mut timestamp_objects = BTreeSet::new(); - - while let Some(id) = to_visit.pop_front() { - timestamp_objects.insert(id); - to_visit.extend( - self.catalog() - .get_entry(&id) - .uses() - .into_iter() - .filter(|id| !timestamp_objects.contains(id) && id.is_user()), - ); - } - let timestamp_objects = timestamp_objects - .into_iter() - .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids()) - .collect(); - - let r = self - .controller - .determine_real_time_recent_timestamp( - timestamp_objects, - *vars.real_time_recency_timeout(), - ) - .await?; - - Some(r) + self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout()) + .await } else { - None - }; - - Ok(r) + Ok(None) + } } #[instrument] diff --git a/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs b/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs index 40357650f5134..ab1fa9335707c 100644 --- a/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs +++ b/src/adapter/src/coord/sequencer/inner/explain_timestamp.rs @@ -173,12 +173,8 @@ impl Coordinator { }: ExplainTimestampRealTimeRecency, ) -> Result>, AdapterError> { let source_ids = optimized_plan.depends_on(); - let source_items: Vec<_> = source_ids - .iter() - .map(|gid| self.catalog().resolve_item_id(gid)) - .collect(); let fut = self - .determine_real_time_recent_timestamp(session, source_items.into_iter()) + .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied()) .await?; match fut { diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 4616cd440e1a1..e285dd4fa3a88 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -756,12 +756,8 @@ impl Coordinator { explain_ctx, }: PeekStageRealTimeRecency, ) -> Result>, AdapterError> { - let item_ids: Vec<_> = source_ids - .iter() - .map(|gid| 
self.catalog.resolve_item_id(gid)) - .collect(); let fut = self - .determine_real_time_recent_timestamp(session, item_ids.into_iter()) + .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied()) .await?; match fut { diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 3861763494827..6745edb77e574 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -28,6 +28,7 @@ use tracing::{Span, debug}; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::catalog::CatalogState; +use crate::command::Command; use crate::coord::peek::PeekPlan; use crate::coord::timestamp_selection::TimestampDetermination; use crate::coord::{Coordinator, ExplainContext, TargetCluster}; @@ -286,17 +287,21 @@ impl PeekClient { // # From peek_real_time_recency - // TODO(peek-seq): Real-time recency is slow anyhow, so we don't handle it in frontend peek - // sequencing for now. let vars = session.vars(); - if vars.real_time_recency() + let real_time_recency_ts: Option = if vars.real_time_recency() && vars.transaction_isolation() == &IsolationLevel::StrictSerializable && !session.contains_read_timestamp() { - debug!("Bailing out from try_frontend_peek_inner, because of real time recency"); - return Ok(None); - } - let real_time_recency_ts: Option = None; + // Only call the coordinator when we actually need real-time recency + self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp { + source_ids: source_ids.clone(), + real_time_recency_timeout: *vars.real_time_recency_timeout(), + tx, + }) + .await? 
+ } else { + None + }; // # From peek_timestamp_read_hold diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index bf3efd9e4a759..8ebb939d62a7c 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -132,7 +132,7 @@ impl PeekClient { catalog } - async fn call_coordinator(&self, f: F) -> T + pub(crate) async fn call_coordinator(&self, f: F) -> T where F: FnOnce(oneshot::Sender) -> Command, { diff --git a/test/sqllogictest/transactions.slt b/test/sqllogictest/transactions.slt index d62073994136c..1ed816c5e7b38 100644 --- a/test/sqllogictest/transactions.slt +++ b/test/sqllogictest/transactions.slt @@ -689,6 +689,10 @@ ALTER SYSTEM SET allow_real_time_recency = true ---- COMPLETE 0 +# TODO(ggevay): I think this is not actually testing real-time recency, because tables don't participate in the +# RTR machinery at all. (We do get the latest contents of a table in STRICT SERIALIZABLE mode even without RTR +# being turned on.) We do have other tests in Testdrive, though. + statement ok SET REAL_TIME_RECENCY TO TRUE