Skip to content

Commit

Permalink
Merge pull request #24906 from teskje/operator-hydration-logging
Browse files Browse the repository at this point in the history
Compute operator hydration status logging
  • Loading branch information
teskje authored Feb 19, 2024
2 parents b79e777 + 83dbd1c commit 067ae87
Show file tree
Hide file tree
Showing 25 changed files with 738 additions and 94 deletions.
16 changes: 16 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,22 @@ A compute object is hydrated on a given replica when it has fully processed the
| `replica_id` | [`text`] | The ID of a cluster replica. |
| `hydrated` | [`boolean`] | Whether the compute object is hydrated on the replica. |

### `mz_compute_operator_hydration_statuses`

The `mz_compute_operator_hydration_statuses` table describes the dataflow operator hydration status of compute objects (indexes or materialized views).

A dataflow operator is hydrated on a given replica when it has fully processed the initial snapshot of data available in its inputs.

<!-- RELATION_SPEC mz_internal.mz_compute_operator_hydration_statuses -->
| Field | Type | Meaning |
| ----------------------- | ----------- | -------- |
| `object_id` | [`text`] | The ID of a compute object. Corresponds to [`mz_catalog.mz_indexes.id`](../mz_catalog#mz_indexes) or [`mz_catalog.mz_materialized_views.id`](../mz_catalog#mz_materialized_views). |
| `physical_plan_node_id` | [`uint8`] | The ID of a node in the physical plan of the compute object. Corresponds to a `node_id` displayed in the output of `EXPLAIN PHYSICAL PLAN WITH (node_ids)`. |
| `replica_id` | [`text`] | The ID of a cluster replica. |
| `hydrated` | [`boolean`] | Whether the node is hydrated on the replica. |

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_operator_hydration_statuses_per_worker -->

### `mz_frontiers`

The `mz_frontiers` table describes the frontiers of each source, sink, table,
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
enable_mz_join_core: Some(config.enable_mz_join_core()),
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
enable_columnation_lgalloc: Some(config.enable_columnation_lgalloc()),
enable_operator_hydration_status_logging: Some(
config.enable_compute_operator_hydration_status_logging(),
),
persist: persist_config(config),
tracing: tracing_config(config),
grpc_client: grpc_client_config(config),
Expand Down
31 changes: 31 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,20 @@ pub static MZ_COMPUTE_HYDRATION_STATUSES: Lazy<BuiltinSource> = Lazy::new(|| Bui
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});
pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: Lazy<BuiltinSource> =
Lazy::new(|| BuiltinSource {
name: "mz_compute_operator_hydration_statuses_per_worker",
schema: MZ_INTERNAL_SCHEMA,
data_source: IntrospectionType::ComputeOperatorHydrationStatus,
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("physical_plan_node_id", ScalarType::UInt64.nullable(false))
.with_column("replica_id", ScalarType::String.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("hydrated", ScalarType::Bool.nullable(false)),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});

pub static MZ_DATABASES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_databases",
Expand Down Expand Up @@ -4178,6 +4192,21 @@ HAVING pg_catalog.sum(count) != 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES: Lazy<BuiltinView> = Lazy::new(|| BuiltinView {
name: "mz_compute_operator_hydration_statuses",
schema: MZ_INTERNAL_SCHEMA,
column_defs: None,
sql: "
SELECT
object_id,
physical_plan_node_id,
replica_id,
bool_and(hydrated) AS hydrated
FROM mz_internal.mz_compute_operator_hydration_statuses_per_worker
GROUP BY object_id, physical_plan_node_id, replica_id",
access: vec![PUBLIC_SELECT],
});

pub static MZ_MESSAGE_COUNTS_PER_WORKER: Lazy<BuiltinView> = Lazy::new(|| BuiltinView {
name: "mz_message_counts_per_worker",
schema: MZ_INTERNAL_SCHEMA,
Expand Down Expand Up @@ -6711,10 +6740,12 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::View(&MZ_GLOBAL_FRONTIERS),
Builtin::Source(&MZ_COMPUTE_DEPENDENCIES),
Builtin::Source(&MZ_COMPUTE_HYDRATION_STATUSES),
Builtin::Source(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER),
Builtin::View(&MZ_HYDRATION_STATUSES),
Builtin::View(&MZ_MATERIALIZATION_LAG),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS_PER_WORKER),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS),
Builtin::View(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES),
Builtin::Source(&MZ_CLUSTER_REPLICA_FRONTIERS),
Builtin::Source(&MZ_CLUSTER_REPLICA_HEARTBEATS),
Builtin::Index(&MZ_SHOW_DATABASES_IND),
Expand Down
18 changes: 12 additions & 6 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,11 @@ where
}

for (type_, updates) in updates_by_type {
self.storage
.record_introspection_updates(type_, updates)
.await;
if !updates.is_empty() {
self.storage
.record_introspection_updates(type_, updates)
.await;
}
}
}
}
Expand Down Expand Up @@ -704,13 +706,17 @@ where

#[tracing::instrument(level = "debug", skip(self))]
async fn maintain(&mut self) {
// Record pending introspection updates.
self.record_introspection_updates().await;

// Perform instance maintenance work.
for instance in self.compute.instances.values_mut() {
instance.activate(self.storage).maintain();
}

// Record pending introspection updates.
//
// It's beneficial to do this as the last maintenance step because previous steps can cause
// dropping of state, which can can cause introspection retractions, which lower the volume
// of data we have to record.
self.record_introspection_updates().await;
}
}

Expand Down
160 changes: 132 additions & 28 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures::{future, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::NodeId;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_expr::RowSetFinishing;
Expand All @@ -46,7 +47,8 @@ use crate::protocol::command::{
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
ComputeResponse, CopyToResponse, PeekResponse, SubscribeBatch, SubscribeResponse,
ComputeResponse, CopyToResponse, OperatorHydrationStatus, PeekResponse, StatusResponse,
SubscribeBatch, SubscribeResponse,
};
use crate::service::{ComputeClient, ComputeGrpcClient};

Expand Down Expand Up @@ -444,6 +446,34 @@ impl<T: Timestamp> Instance<T> {
}
}

/// Update the tracked hydration status for an operator according to a received status update.
fn update_operator_hydration_status(
&mut self,
replica_id: ReplicaId,
status: OperatorHydrationStatus,
) {
let Some(replica) = self.replicas.get_mut(&replica_id) else {
tracing::error!(
%replica_id, ?status,
"status update for an unknown replica"
);
return;
};
let Some(collection) = replica.collections.get_mut(&status.collection_id) else {
tracing::error!(
%replica_id, ?status,
"status update for an unknown collection"
);
return;
};

collection.hydration_state.operator_hydrated(
status.lir_id,
status.worker_id,
status.hydrated,
);
}

/// Clean up collection state that is not needed anymore.
///
/// Three conditions need to be true before we can remove state for a collection:
Expand Down Expand Up @@ -1500,6 +1530,10 @@ where
ComputeResponse::SubscribeResponse(id, response) => {
self.handle_subscribe_response(id, response, replica_id)
}
ComputeResponse::Status(response) => {
self.handle_status_response(response, replica_id);
None
}
}
}

Expand Down Expand Up @@ -1689,6 +1723,14 @@ where
}
}
}

fn handle_status_response(&mut self, response: StatusResponse, replica_id: ReplicaId) {
match response {
StatusResponse::OperatorHydration(status) => self
.compute
.update_operator_hydration_status(replica_id, status),
}
}
}

impl<'a, T> ActiveInstance<'a, T>
Expand Down Expand Up @@ -1854,12 +1896,12 @@ impl<T> ReplicaState<T> {
/// Add a collection to the replica state.
fn add_collection(&mut self, id: GlobalId, as_of: Antichain<T>) {
let metrics = self.metrics.for_collection(id);
let hydration_flag = HydrationFlag::new(self.id, id, self.introspection_tx.clone());
let hydration_state = HydrationState::new(self.id, id, self.introspection_tx.clone());
let mut state = ReplicaCollectionState {
metrics,
created_at: Instant::now(),
as_of,
hydration_flag,
hydration_state,
};

// We need to consider the edge case where the as-of is the empty frontier. Such an as-of
Expand Down Expand Up @@ -1897,14 +1939,14 @@ struct ReplicaCollectionState<T> {
created_at: Instant,
/// As-of frontier with which this collection was installed on the replica.
as_of: Antichain<T>,
/// Tracks whether this collection is hydrated, i.e., it has produced some initial output.
hydration_flag: HydrationFlag,
/// Tracks hydration state for this collection.
hydration_state: HydrationState,
}

impl<T> ReplicaCollectionState<T> {
/// Returns whether this collection is hydrated.
fn hydrated(&self) -> bool {
self.hydration_flag.hydrated
self.hydration_state.hydrated
}

/// Marks the collection as hydrated and updates metrics and introspection accordingly.
Expand All @@ -1914,22 +1956,29 @@ impl<T> ReplicaCollectionState<T> {
metrics.initial_output_duration_seconds.set(duration);
}

self.hydration_flag.set();
self.hydration_state.collection_hydrated();
}
}

/// A wrapper type that maintains hydration introspection for a given replica and collection, and
/// ensures that reported introspection data is retracted when the flag is dropped.
/// Maintains both global and operator-level hydration introspection for a given replica and
/// collection, and ensures that reported introspection data is retracted when the flag is dropped.
#[derive(Debug)]
struct HydrationFlag {
struct HydrationState {
/// The ID of the replica.
replica_id: ReplicaId,
/// The ID of the compute collection.
collection_id: GlobalId,
/// Whether the collection is hydrated.
hydrated: bool,
/// Operator-level hydration state.
/// (lir_id, worker_id) -> hydrated
operators: BTreeMap<(NodeId, usize), bool>,
/// A channel through which introspection updates are delivered.
introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
}

impl HydrationFlag {
/// Create a new unset `HydrationFlag` and update introspection.
impl HydrationState {
/// Create a new `HydrationState` and initialize introspection.
fn new(
replica_id: ReplicaId,
collection_id: GlobalId,
Expand All @@ -1939,55 +1988,110 @@ impl HydrationFlag {
replica_id,
collection_id,
hydrated: false,
operators: Default::default(),
introspection_tx,
};

let insertion = self_.row();
self_.send(vec![(insertion, 1)]);
let insertion = self_.row_for_collection();
self_.send(
IntrospectionType::ComputeHydrationStatus,
vec![(insertion, 1)],
);

self_
}

/// Mark the collection as hydrated and update introspection.
fn set(&mut self) {
/// Update the collection as hydrated.
fn collection_hydrated(&mut self) {
if self.hydrated {
return; // nothing to do
}

let retraction = self.row();
let retraction = self.row_for_collection();
self.hydrated = true;
let insertion = self.row();
let insertion = self.row_for_collection();

self.send(
IntrospectionType::ComputeHydrationStatus,
vec![(retraction, -1), (insertion, 1)],
);
}

/// Update the given (lir_id, worker_id) pair as hydrated.
fn operator_hydrated(&mut self, lir_id: NodeId, worker_id: usize, hydrated: bool) {
let retraction = self.row_for_operator(lir_id, worker_id);
self.operators.insert((lir_id, worker_id), hydrated);
let insertion = self.row_for_operator(lir_id, worker_id);

if retraction == insertion {
return; // no change
}

self.send(vec![(retraction, -1), (insertion, 1)]);
let updates = retraction
.map(|r| (r, -1))
.into_iter()
.chain(insertion.map(|r| (r, 1)))
.collect();
self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
}

fn row(&self) -> Row {
/// Return a `Row` reflecting the current collection hydration status.
fn row_for_collection(&self) -> Row {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
Datum::String(&self.replica_id.to_string()),
Datum::from(self.hydrated),
])
}

fn send(&self, updates: Vec<(Row, Diff)>) {
let result = self
.introspection_tx
.send((IntrospectionType::ComputeHydrationStatus, updates));
/// Return a `Row` reflecting the current hydration status of the identified operator.
///
/// Returns `None` if the identified operator is not tracked.
fn row_for_operator(&self, lir_id: NodeId, worker_id: usize) -> Option<Row> {
self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
Datum::UInt64(lir_id),
Datum::String(&self.replica_id.to_string()),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::from(*hydrated),
])
})
}

fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
let result = self.introspection_tx.send((introspection_type, updates));

if result.is_err() {
// The global controller holds on to the `introspection_rx`. So when we get here that
// probably means that the controller was dropped and the process is shutting down, in
// which case we don't care about introspection updates anymore.
tracing::info!(
"discarding `ComputeHydrationStatus` update because the receiver disconnected"
?introspection_type,
"discarding introspection update because the receiver disconnected"
);
}
}
}

impl Drop for HydrationFlag {
impl Drop for HydrationState {
fn drop(&mut self) {
let retraction = self.row();
self.send(vec![(retraction, -1)]);
// Retract collection hydration status.
let retraction = self.row_for_collection();
self.send(
IntrospectionType::ComputeHydrationStatus,
vec![(retraction, -1)],
);

// Retract operator-level hydration status.
let operators: Vec<_> = self.operators.keys().collect();
let updates: Vec<_> = operators
.into_iter()
.flat_map(|(lir_id, worker_id)| self.row_for_operator(*lir_id, *worker_id))
.map(|r| (r, -1))
.collect();
if !updates.is_empty() {
self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates)
}
}
}
Loading

0 comments on commit 067ae87

Please sign in to comment.