Skip to content

Commit 795e76b

Browse files
authored
Merge pull request #33713 from ggevay/peek-frontend
New peek sequencing in Adapter Frontend
2 parents c72746f + 7340287 commit 795e76b

File tree

50 files changed

+1779
-302
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1779
-302
lines changed

misc/python/materialize/checks/all_checks/statement_logging.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@ class StatementLogging(Check):
1717
def initialize(self) -> Testdrive:
1818
return Testdrive(
1919
dedent(
20+
# TODO(peek-seq): enable_frontend_peek_sequencing when it supports statement logging.
2021
"""
2122
$ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
2223
ALTER SYSTEM SET statement_logging_max_sample_rate TO 1.0
24+
25+
$[version>=2600100] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
26+
ALTER SYSTEM SET enable_frontend_peek_sequencing = false;
2327
"""
2428
)
2529
)

misc/python/materialize/mzcompose/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ def get_variable_system_parameters(
138138
version: MzVersion,
139139
force_source_table_syntax: bool,
140140
) -> list[VariableSystemParameter]:
141+
"""Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
142+
These defaults are applied _after_ applying the settings from `get_minimal_system_parameters`.
143+
"""
144+
141145
return [
142146
# -----
143147
# To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
@@ -190,6 +194,11 @@ def get_variable_system_parameters(
190194
"true",
191195
["true", "false"],
192196
),
197+
VariableSystemParameter(
198+
"enable_frontend_peek_sequencing",
199+
"false",
200+
["true", "false"],
201+
),
193202
VariableSystemParameter(
194203
"kafka_default_metadata_fetch_interval",
195204
"1s",

misc/python/materialize/parallel_workload/action.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ def errors_to_ignore(self, exe: Executor) -> list[str]:
172172
"real-time source dropped before ingesting the upstream system's visible frontier", # Expected, see https://buildkite.com/materialize/nightly/builds/9399#0191be17-1f4c-4321-9b51-edc4b08b71c5
173173
"object state changed while transaction was in progress", # Old error msg, can remove this ignore later
174174
"another session modified the catalog while this DDL transaction was open",
175+
"was dropped while executing a statement",
175176
]
176177
)
177178
if exe.db.scenario == Scenario.Cancel:
@@ -1319,6 +1320,10 @@ def __init__(
13191320
"314572800", # 300 MiB, the production value
13201321
]
13211322
self.flags_with_values["cluster"] = ["quickstart", "dont_exist"]
1323+
self.flags_with_values["enable_frontend_peek_sequencing"] = [
1324+
"true",
1325+
"false",
1326+
]
13221327

13231328
# If you are adding a new config flag in Materialize, consider using it
13241329
# here instead of just marking it as uninteresting to silence the

src/adapter/src/client.rs

Lines changed: 82 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ use mz_sql::catalog::{EnvironmentId, SessionCatalog};
3939
use mz_sql::session::hint::ApplicationNameHint;
4040
use mz_sql::session::metadata::SessionMetadata;
4141
use mz_sql::session::user::SUPPORT_USER;
42-
use mz_sql::session::vars::{CLUSTER, OwnedVarInput, SystemVars, Var};
42+
use mz_sql::session::vars::{
43+
CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
44+
};
4345
use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
4446
use prometheus::Histogram;
4547
use serde_json::json;
4648
use tokio::sync::{mpsc, oneshot};
47-
use tracing::error;
49+
use tracing::{debug, error};
4850
use uuid::Uuid;
4951

5052
use crate::catalog::Catalog;
@@ -63,7 +65,7 @@ use crate::session::{
6365
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
6466
use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
6567
use crate::webhook::AppendWebhookResponse;
66-
use crate::{AdapterNotice, AppendWebhookError, PeekResponseUnary, StartupResponse};
68+
use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
6769

6870
/// A handle to a running coordinator.
6971
///
@@ -252,21 +254,36 @@ impl Client {
252254

253255
// Create the client as soon as startup succeeds (before any await points) so its `Drop` can
254256
// handle termination.
257+
// Build the PeekClient with controller handles returned from startup.
258+
let StartupResponse {
259+
role_id,
260+
write_notify,
261+
session_defaults,
262+
catalog,
263+
storage_collections,
264+
transient_id_gen,
265+
optimizer_metrics,
266+
persist_client,
267+
} = response;
268+
269+
let peek_client = PeekClient::new(
270+
self.clone(),
271+
storage_collections,
272+
transient_id_gen,
273+
optimizer_metrics,
274+
persist_client,
275+
);
276+
255277
let mut client = SessionClient {
256278
inner: Some(self.clone()),
257279
session: Some(session),
258280
timeouts: Timeout::new(),
259281
environment_id: self.environment_id.clone(),
260282
segment_client: self.segment_client.clone(),
283+
peek_client,
284+
enable_frontend_peek_sequencing: false, // initialized below, once we have a ConnCatalog
261285
};
262286

263-
let StartupResponse {
264-
role_id,
265-
write_notify,
266-
session_defaults,
267-
catalog,
268-
} = response;
269-
270287
let session = client.session();
271288
session.initialize_role_metadata(role_id);
272289
let vars_mut = session.vars_mut();
@@ -396,6 +413,10 @@ Issue a SQL query to get started. Need help?
396413
}
397414
}
398415

416+
client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
417+
.require(catalog.system_vars())
418+
.is_ok();
419+
399420
Ok(client)
400421
}
401422

@@ -412,7 +433,7 @@ Issue a SQL query to get started. Need help?
412433
pub async fn support_execute_one(
413434
&self,
414435
sql: &str,
415-
) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send + Sync>>, anyhow::Error> {
436+
) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
416437
// Connect to the coordinator.
417438
let conn_id = self.new_conn_id()?;
418439
let session = self.new_session(SessionConfig {
@@ -503,7 +524,7 @@ Issue a SQL query to get started. Need help?
503524
}
504525

505526
#[instrument(level = "debug")]
506-
fn send(&self, cmd: Command) {
527+
pub(crate) fn send(&self, cmd: Command) {
507528
self.inner_cmd_tx
508529
.send((OpenTelemetryContext::obtain(), cmd))
509530
.expect("coordinator unexpectedly gone");
@@ -524,6 +545,13 @@ pub struct SessionClient {
524545
timeouts: Timeout,
525546
segment_client: Option<mz_segment::Client>,
526547
environment_id: EnvironmentId,
548+
/// Client for frontend peek sequencing; populated at connection startup.
549+
peek_client: PeekClient,
550+
/// Whether frontend peek sequencing is enabled; initialized at connection startup.
551+
// TODO(peek-seq): Currently, this is initialized only at session startup. We'll be able to
552+
// check the actual feature flag value at every peek (without a Coordinator call) once we'll
553+
// always have a catalog snapshot at hand.
554+
pub enable_frontend_peek_sequencing: bool,
527555
}
528556

529557
impl SessionClient {
@@ -672,6 +700,17 @@ impl SessionClient {
672700
outer_ctx_extra: Option<ExecuteContextExtra>,
673701
) -> Result<(ExecuteResponse, Instant), AdapterError> {
674702
let execute_started = Instant::now();
703+
704+
// Attempt peek sequencing in the session task.
705+
// If unsupported, fall back to the Coordinator path.
706+
// TODO(peek-seq): wire up cancel_future
707+
if let Some(resp) = self.try_frontend_peek(&portal_name).await? {
708+
debug!("frontend peek succeeded");
709+
return Ok((resp, execute_started));
710+
} else {
711+
debug!("frontend peek did not happen");
712+
}
713+
675714
let response = self
676715
.send_with_cancel(
677716
|tx, session| Command::Execute {
@@ -973,7 +1012,9 @@ impl SessionClient {
9731012
| Command::Terminate { .. }
9741013
| Command::RetireExecute { .. }
9751014
| Command::CheckConsistency { .. }
976-
| Command::Dump { .. } => {}
1015+
| Command::Dump { .. }
1016+
| Command::GetComputeInstanceClient { .. }
1017+
| Command::GetOracle { .. } => {}
9771018
};
9781019
cmd
9791020
});
@@ -1045,6 +1086,34 @@ impl SessionClient {
10451086
pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
10461087
self.timeouts.recv().await
10471088
}
1089+
1090+
/// Returns a reference to the PeekClient used for frontend peek sequencing.
1091+
pub fn peek_client(&self) -> &PeekClient {
1092+
&self.peek_client
1093+
}
1094+
1095+
/// Returns a reference to the PeekClient used for frontend peek sequencing.
1096+
pub fn peek_client_mut(&mut self) -> &mut PeekClient {
1097+
&mut self.peek_client
1098+
}
1099+
1100+
/// Attempt to sequence a peek from the session task.
1101+
///
1102+
/// Returns Some(response) if we handled the peek, or None to fall back to the Coordinator's
1103+
/// peek sequencing.
1104+
pub(crate) async fn try_frontend_peek(
1105+
&mut self,
1106+
portal_name: &str,
1107+
) -> Result<Option<ExecuteResponse>, AdapterError> {
1108+
if self.enable_frontend_peek_sequencing {
1109+
let session = self.session.as_mut().expect("SessionClient invariant");
1110+
self.peek_client
1111+
.try_frontend_peek_inner(portal_name, session)
1112+
.await
1113+
} else {
1114+
Ok(None)
1115+
}
1116+
}
10481117
}
10491118

10501119
impl Drop for SessionClient {

src/adapter/src/command.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@ use mz_compute_types::ComputeInstanceId;
2121
use mz_ore::collections::CollectionExt;
2222
use mz_ore::soft_assert_no_log;
2323
use mz_ore::tracing::OpenTelemetryContext;
24+
use mz_persist_client::PersistClient;
2425
use mz_pgcopy::CopyFormatParams;
26+
use mz_repr::global_id::TransientIdGen;
2527
use mz_repr::role_id::RoleId;
2628
use mz_repr::{CatalogItemId, ColumnIndex, RowIterator};
2729
use mz_sql::ast::{FetchDirection, Raw, Statement};
2830
use mz_sql::catalog::ObjectType;
31+
use mz_sql::optimizer_metrics::OptimizerMetrics;
2932
use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
3033
use mz_sql::session::user::User;
3134
use mz_sql::session::vars::{OwnedVarInput, SystemVars};
3235
use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
36+
use mz_storage_types::sources::Timeline;
37+
use mz_timestamp_oracle::TimestampOracle;
3338
use tokio::sync::{mpsc, oneshot};
3439
use uuid::Uuid;
3540

@@ -153,6 +158,23 @@ pub enum Command {
153158
Dump {
154159
tx: oneshot::Sender<Result<serde_json::Value, anyhow::Error>>,
155160
},
161+
162+
GetComputeInstanceClient {
163+
instance_id: ComputeInstanceId,
164+
tx: oneshot::Sender<
165+
Result<
166+
mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
167+
mz_compute_client::controller::error::InstanceMissing,
168+
>,
169+
>,
170+
},
171+
172+
GetOracle {
173+
timeline: Timeline,
174+
tx: oneshot::Sender<
175+
Result<Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync>, AdapterError>,
176+
>,
177+
},
156178
}
157179

158180
impl Command {
@@ -172,7 +194,9 @@ impl Command {
172194
| Command::SetSystemVars { .. }
173195
| Command::RetireExecute { .. }
174196
| Command::CheckConsistency { .. }
175-
| Command::Dump { .. } => None,
197+
| Command::Dump { .. }
198+
| Command::GetComputeInstanceClient { .. }
199+
| Command::GetOracle { .. } => None,
176200
}
177201
}
178202

@@ -192,7 +216,9 @@ impl Command {
192216
| Command::SetSystemVars { .. }
193217
| Command::RetireExecute { .. }
194218
| Command::CheckConsistency { .. }
195-
| Command::Dump { .. } => None,
219+
| Command::Dump { .. }
220+
| Command::GetComputeInstanceClient { .. }
221+
| Command::GetOracle { .. } => None,
196222
}
197223
}
198224
}
@@ -216,6 +242,15 @@ pub struct StartupResponse {
216242
/// Map of (name, VarInput::Flat) tuples of session default variables that should be set.
217243
pub session_defaults: BTreeMap<String, OwnedVarInput>,
218244
pub catalog: Arc<Catalog>,
245+
pub storage_collections: Arc<
246+
dyn mz_storage_client::storage_collections::StorageCollections<
247+
Timestamp = mz_repr::Timestamp,
248+
> + Send
249+
+ Sync,
250+
>,
251+
pub transient_id_gen: Arc<TransientIdGen>,
252+
pub optimizer_metrics: OptimizerMetrics,
253+
pub persist_client: PersistClient,
219254
}
220255

221256
/// The response to [`Client::authenticate`](crate::Client::authenticate).

src/adapter/src/coord.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,26 +208,26 @@ use crate::util::{ClientTransmitter, ResultExt};
208208
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
209209
use crate::{AdapterNotice, ReadHolds, flags};
210210

211+
pub(crate) mod appends;
212+
pub(crate) mod catalog_serving;
213+
pub(crate) mod cluster_scheduling;
214+
pub(crate) mod consistency;
211215
pub(crate) mod id_bundle;
212216
pub(crate) mod in_memory_oracle;
213217
pub(crate) mod peek;
218+
pub(crate) mod read_policy;
219+
pub(crate) mod sequencer;
214220
pub(crate) mod statement_logging;
215221
pub(crate) mod timeline;
216222
pub(crate) mod timestamp_selection;
217223

218-
pub mod appends;
219-
mod catalog_serving;
220224
mod caught_up;
221-
pub mod cluster_scheduling;
222225
mod command_handler;
223-
pub mod consistency;
224226
mod ddl;
225227
mod indexes;
226228
mod introspection;
227229
mod message_handler;
228230
mod privatelink_status;
229-
pub mod read_policy;
230-
mod sequencer;
231231
mod sql;
232232
mod validity;
233233

@@ -361,6 +361,8 @@ impl Message {
361361
Command::AuthenticatePassword { .. } => "command-auth_check",
362362
Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
363363
Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
364+
Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
365+
Command::GetOracle { .. } => "get-oracle",
364366
},
365367
Message::ControllerReady {
366368
controller: ControllerReadiness::Compute,

src/adapter/src/coord/appends.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,8 @@ pub struct GroupCommitPermit {
926926
/// So as an optimization we do not wait for these writes to complete. But if a [`Session`] tries
927927
/// to query any of these builtin objects, we need to block that query on the writes completing to
928928
/// maintain linearizability.
929+
///
930+
/// Warning: this already clears the wait flag (i.e., it calls `clear_builtin_table_updates`).
929931
pub(crate) fn waiting_on_startup_appends(
930932
catalog: &Catalog,
931933
session: &mut Session,

0 commit comments

Comments
 (0)