Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def get_variable_system_parameters(
),
VariableSystemParameter(
"enable_frontend_peek_sequencing",
"false",
"true",
["true", "false"],
),
VariableSystemParameter(
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,8 @@ impl SessionClient {
| Command::CheckConsistency { .. }
| Command::Dump { .. }
| Command::GetComputeInstanceClient { .. }
| Command::GetOracle { .. } => {}
| Command::GetOracle { .. }
| Command::DetermineRealTimeRecentTimestamp { .. } => {}
};
cmd
});
Expand Down
15 changes: 12 additions & 3 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use derivative::Derivative;
use enum_kinds::EnumKind;
Expand All @@ -25,7 +26,7 @@ use mz_persist_client::PersistClient;
use mz_pgcopy::CopyFormatParams;
use mz_repr::global_id::TransientIdGen;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator};
use mz_sql::ast::{FetchDirection, Raw, Statement};
use mz_sql::catalog::ObjectType;
use mz_sql::optimizer_metrics::OptimizerMetrics;
Expand Down Expand Up @@ -175,6 +176,12 @@ pub enum Command {
Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
>,
},

DetermineRealTimeRecentTimestamp {
source_ids: BTreeSet<GlobalId>,
real_time_recency_timeout: Duration,
tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
},
}

impl Command {
Expand All @@ -196,7 +203,8 @@ impl Command {
| Command::CheckConsistency { .. }
| Command::Dump { .. }
| Command::GetComputeInstanceClient { .. }
| Command::GetOracle { .. } => None,
| Command::GetOracle { .. }
| Command::DetermineRealTimeRecentTimestamp { .. } => None,
}
}

Expand All @@ -218,7 +226,8 @@ impl Command {
| Command::CheckConsistency { .. }
| Command::Dump { .. }
| Command::GetComputeInstanceClient { .. }
| Command::GetOracle { .. } => None,
| Command::GetOracle { .. }
| Command::DetermineRealTimeRecentTimestamp { .. } => None,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ impl Message {
Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
Command::GetOracle { .. } => "get-oracle",
Command::DetermineRealTimeRecentTimestamp { .. } => {
"determine-real-time-recent-timestamp"
}
},
Message::ControllerReady {
controller: ControllerReadiness::Compute,
Expand Down
28 changes: 28 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,34 @@ impl Coordinator {
));
let _ = tx.send(oracle);
}

Command::DetermineRealTimeRecentTimestamp {
source_ids,
real_time_recency_timeout,
tx,
} => {
let result = self
.determine_real_time_recent_timestamp(
source_ids.iter().copied(),
real_time_recency_timeout,
)
.await;

match result {
Ok(Some(fut)) => {
task::spawn(|| "determine real time recent timestamp", async move {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This corresponds to the spawning in the old peek sequencing's peek_real_time_recency.

let result = fut.await.map(Some).map_err(AdapterError::from);
let _ = tx.send(result);
});
}
Ok(None) => {
let _ = tx.send(Ok(None));
}
Err(e) => {
let _ = tx.send(Err(e));
}
}
}
}
}
.instrument(debug_span!("handle_command"))
Expand Down
104 changes: 55 additions & 49 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2524,67 +2524,73 @@ impl Coordinator {
}
}

/// Inner method that performs the actual real-time recency timestamp determination.
/// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
/// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
pub(crate) async fn determine_real_time_recent_timestamp(
&self,
source_ids: impl Iterator<Item = GlobalId>,
real_time_recency_timeout: Duration,
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This big green block is mostly just code movement from the old determine_real_time_recent_timestamp's inner part.

{
let item_ids = source_ids.map(|gid| self.catalog.resolve_item_id(&gid));

// Find all dependencies transitively because we need to ensure that
// RTR queries determine the timestamp from the sources' (i.e.
// storage objects that ingest data from external systems) remap
// data. We "cheat" a little bit and filter out any IDs that aren't
// user objects because we know they are not a RTR source.
let mut to_visit = VecDeque::from_iter(item_ids.filter(CatalogItemId::is_user));
// If none of the sources are user objects, we don't need to provide
// a RTR timestamp.
if to_visit.is_empty() {
return Ok(None);
}

let mut timestamp_objects = BTreeSet::new();

while let Some(id) = to_visit.pop_front() {
timestamp_objects.insert(id);
to_visit.extend(
self.catalog()
.get_entry(&id)
.uses()
.into_iter()
.filter(|id| !timestamp_objects.contains(id) && id.is_user()),
);
}
let timestamp_objects = timestamp_objects
.into_iter()
.flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
.collect();

let r = self
.controller
.determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
.await?;

Ok(Some(r))
}

/// Checks to see if the session needs a real time recency timestamp and if so returns
/// a future that will return the timestamp.
pub(super) async fn determine_real_time_recent_timestamp(
pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
&self,
session: &Session,
source_ids: impl Iterator<Item = CatalogItemId>,
source_ids: impl Iterator<Item = GlobalId>,
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
{
let vars = session.vars();

// Ideally this logic belongs inside of
// `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
// logic in there would make it extremely difficult and inconvenient to pull the waiting off
// of the main coord thread.
let r = if vars.real_time_recency()
if vars.real_time_recency()
&& vars.transaction_isolation() == &IsolationLevel::StrictSerializable
&& !session.contains_read_timestamp()
{
// Find all dependencies transitively because we need to ensure that
// RTR queries determine the timestamp from the sources' (i.e.
// storage objects that ingest data from external systems) remap
// data. We "cheat" a little bit and filter out any IDs that aren't
// user objects because we know they are not a RTR source.
let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
// If none of the sources are user objects, we don't need to provide
// a RTR timestamp.
if to_visit.is_empty() {
return Ok(None);
}

let mut timestamp_objects = BTreeSet::new();

while let Some(id) = to_visit.pop_front() {
timestamp_objects.insert(id);
to_visit.extend(
self.catalog()
.get_entry(&id)
.uses()
.into_iter()
.filter(|id| !timestamp_objects.contains(id) && id.is_user()),
);
}
let timestamp_objects = timestamp_objects
.into_iter()
.flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
.collect();

let r = self
.controller
.determine_real_time_recent_timestamp(
timestamp_objects,
*vars.real_time_recency_timeout(),
)
.await?;

Some(r)
self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
.await
} else {
None
};

Ok(r)
Ok(None)
}
}

#[instrument]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,8 @@ impl Coordinator {
}: ExplainTimestampRealTimeRecency,
) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
let source_ids = optimized_plan.depends_on();
let source_items: Vec<_> = source_ids
.iter()
.map(|gid| self.catalog().resolve_item_id(gid))
.collect();
let fut = self
.determine_real_time_recent_timestamp(session, source_items.into_iter())
.determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
.await?;

match fut {
Expand Down
6 changes: 1 addition & 5 deletions src/adapter/src/coord/sequencer/inner/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,12 +756,8 @@ impl Coordinator {
explain_ctx,
}: PeekStageRealTimeRecency,
) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
let item_ids: Vec<_> = source_ids
.iter()
.map(|gid| self.catalog.resolve_item_id(gid))
.collect();
let fut = self
.determine_real_time_recent_timestamp(session, item_ids.into_iter())
.determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
.await?;

match fut {
Expand Down
19 changes: 12 additions & 7 deletions src/adapter/src/frontend_peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tracing::{Span, debug};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::CatalogState;
use crate::command::Command;
use crate::coord::peek::PeekPlan;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{Coordinator, ExplainContext, TargetCluster};
Expand Down Expand Up @@ -286,17 +287,21 @@ impl PeekClient {

// # From peek_real_time_recency

// TODO(peek-seq): Real-time recency is slow anyhow, so we don't handle it in frontend peek
// sequencing for now.
let vars = session.vars();
if vars.real_time_recency()
let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
&& vars.transaction_isolation() == &IsolationLevel::StrictSerializable
&& !session.contains_read_timestamp()
{
debug!("Bailing out from try_frontend_peek_inner, because of real time recency");
return Ok(None);
}
let real_time_recency_ts: Option<mz_repr::Timestamp> = None;
// Only call the coordinator when we actually need real-time recency
self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
source_ids: source_ids.clone(),
real_time_recency_timeout: *vars.real_time_recency_timeout(),
tx,
})
.await?
} else {
None
};

// # From peek_timestamp_read_hold

Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/peek_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl PeekClient {
catalog
}

async fn call_coordinator<T, F>(&self, f: F) -> T
pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
where
F: FnOnce(oneshot::Sender<T>) -> Command,
{
Expand Down
4 changes: 4 additions & 0 deletions test/sqllogictest/transactions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ ALTER SYSTEM SET allow_real_time_recency = true
----
COMPLETE 0

# TODO(ggevay): I think this is not actually testing real-time recency, because tables don't participate in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈

# RTR machinery at all. (We do get the latest contents of a table in STRICT SERIALIZABLE mode even without RTR
# being turned on.) We do have other tests in Testdrive, though.

statement ok
SET REAL_TIME_RECENCY TO TRUE

Expand Down