diff --git a/Cargo.lock b/Cargo.lock
index 6f32d14609087..a4fde5b2c4fd3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5102,6 +5102,7 @@ dependencies = [
  "mz-metrics",
  "mz-npm",
  "mz-orchestrator",
+ "mz-orchestrator-external",
  "mz-orchestrator-kubernetes",
  "mz-orchestrator-process",
  "mz-orchestrator-tracing",
@@ -5617,6 +5618,23 @@ dependencies = [
  "workspace-hack",
 ]
 
+[[package]]
+name = "mz-orchestrator-external"
+version = "0.0.0"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "futures",
+ "mz-adapter",
+ "mz-orchestrator",
+ "mz-ore",
+ "mz-repr",
+ "mz-sql",
+ "tokio",
+ "uuid",
+ "workspace-hack",
+]
+
 [[package]]
 name = "mz-orchestrator-kubernetes"
 version = "0.0.0"
diff --git a/Cargo.toml b/Cargo.toml
index 6f4e96c08f0f3..a58c0dda69ffc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,7 @@ members = [
     "src/npm",
     "src/orchestrator",
+    "src/orchestrator-external",
     "src/orchestrator-kubernetes",
     "src/orchestrator-process",
     "src/orchestrator-tracing",
     "src/orchestratord",
@@ -170,6 +171,7 @@ default-members = [
     "src/npm",
     "src/orchestrator",
+    "src/orchestrator-external",
     "src/orchestrator-kubernetes",
     "src/orchestrator-process",
     "src/orchestrator-tracing",
     "src/orchestratord",
diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs
index 395978cbc125d..2d9d32c439c38 100644
--- a/src/catalog/src/builtin.rs
+++ b/src/catalog/src/builtin.rs
@@ -9142,6 +9142,20 @@ pub const MZ_ANALYTICS_CLUSTER: BuiltinCluster = BuiltinCluster {
     replication_factor: 0,
 };
 
+pub static MZ_EXTERNAL_ORCHESTRATOR_SERVICES: LazyLock<BuiltinTable> =
+    LazyLock::new(|| BuiltinTable {
+        name: "mz_external_orchestrator_services",
+        schema: MZ_INTERNAL_SCHEMA,
+        oid: oid::TABLE_MZ_EXTERNAL_ORCHESTRATOR_SERVICES_OID,
+        desc: RelationDesc::builder()
+            .with_column("id", ScalarType::String.nullable(false))
+            .with_column("state", ScalarType::String.nullable(false))
+            .with_key(vec![0])
+            .finish(),
+        is_retained_metrics_object: false,
+        access: vec![PUBLIC_SELECT],
+    });
+
 /// List of all builtin objects sorted topologically by dependency.
 pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::new(|| {
     let mut builtins = vec![
diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml
index c884e6c9bf8ad..c55819e7170d6 100644
--- a/src/environmentd/Cargo.toml
+++ b/src/environmentd/Cargo.toml
@@ -55,6 +55,7 @@ mz-http-util = { path = "../http-util" }
 mz-interchange = { path = "../interchange" }
 mz-metrics = { path = "../metrics" }
 mz-orchestrator = { path = "../orchestrator" }
+mz-orchestrator-external = { path = "../orchestrator-external" }
 mz-orchestrator-kubernetes = { path = "../orchestrator-kubernetes" }
 mz-orchestrator-process = { path = "../orchestrator-process" }
 mz-orchestrator-tracing = { path = "../orchestrator-tracing" }
diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs
index 457ee83615cb1..93c2e2aca2b3d 100644
--- a/src/environmentd/src/environmentd/main.rs
+++ b/src/environmentd/src/environmentd/main.rs
@@ -39,6 +39,7 @@ use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
 use mz_controller::ControllerConfig;
 use mz_frontegg_auth::{Authenticator, FronteggCliArgs};
 use mz_orchestrator::Orchestrator;
+use mz_orchestrator_external::{ExternalOrchestrator, ExternalOrchestratorConfig};
 use mz_orchestrator_kubernetes::{
     KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
 };
@@ -317,6 +318,10 @@ pub struct Args {
     /// production, only testing.
#[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")] orchestrator_kubernetes_coverage: bool, + /// Template to use to generate the hostname for cluster replicas. Will + /// replace {name} with the replica name and {id} with the replica id. + #[structopt(long, env = "ORCHESTRATOR_EXTERNAL_HOSTNAME_TEMPLATE")] + orchestrator_external_hostname_template: Option, /// The secrets controller implementation to use. #[structopt( long, @@ -599,6 +604,7 @@ pub struct Args { enum OrchestratorKind { Kubernetes, Process, + External, } // TODO [Alex Hunt] move this to a shared function that can be imported by the @@ -851,6 +857,45 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { }; (orchestrator, secrets_controller, None) } + OrchestratorKind::External => { + if args.orchestrator_process_scratch_directory.is_some() { + bail!( + "--orchestrator-process-scratch-directory is \ + not currently usable with the external orchestrator" + ); + } + let Some(hostname_template) = args.orchestrator_external_hostname_template else { + bail!("--orchestrator-external-hostname-template is required when using Orchestrator::External") + }; + + let orchestrator = Arc::new(ExternalOrchestrator::new(ExternalOrchestratorConfig { + hostname_template, + })); + let secrets_controller: Arc = match args.secrets_controller { + SecretsControllerKind::Kubernetes => bail!( + "SecretsControllerKind::Kubernetes is not yet implemented for Orchestrator::External." + ), + SecretsControllerKind::AwsSecretsManager => { + Arc::new( + runtime.block_on(AwsSecretsController::new( + // TODO [Alex Hunt] move this to a shared function that can be imported by the + // region-controller. + &aws_secrets_controller_prefix(&args.environment_id), + &aws_secrets_controller_key_alias(&args.environment_id), + args.aws_secrets_controller_tags + .into_iter() + .map(|tag| (tag.key, tag.value)) + .collect(), + )), + ) + } + SecretsControllerKind::LocalFile => bail!( + "SecretsControllerKind::LocalFile is not compatible with Orchestrator::External." 
+                ),
+            };
+            // TODO: secrets and VPC endpoints need to be implemented here
+            (orchestrator, secrets_controller, None)
+        }
     };
     drop(entered);
     let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs
index af0f775306faf..4c8ffd041c3d2 100644
--- a/src/environmentd/src/lib.rs
+++ b/src/environmentd/src/lib.rs
@@ -36,6 +36,7 @@ use mz_catalog::durable::BootstrapArgs;
 use mz_cloud_resources::CloudResourceController;
 use mz_controller::ControllerConfig;
 use mz_frontegg_auth::Authenticator as FronteggAuthentication;
+use mz_orchestrator_external::ExternalOrchestrator;
 use mz_ore::future::OreFutureExt;
 use mz_ore::metrics::MetricsRegistry;
 use mz_ore::now::NowFn;
@@ -612,6 +613,8 @@ impl Listeners {
             connection_limiter.update_superuser_reserved(superuser_reserved);
         });
 
+        let orchestrator = Arc::clone(&config.controller.orchestrator);
+
         let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
         let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
             connection_context: config.controller.connection_context.clone(),
@@ -668,6 +671,13 @@ impl Listeners {
         let serve_postamble_start = Instant::now();
         info!("startup: envd serve: postamble beginning");
 
+        // Install an adapter client in the orchestrator.
+        if let Some(external_orchestrator) =
+            orchestrator.as_any().downcast_ref::<ExternalOrchestrator>()
+        {
+            external_orchestrator.set_adapter_client(adapter_client.clone());
+        }
+
         // Install an adapter client in the internal HTTP server.
         internal_http_adapter_client_tx
             .send(adapter_client.clone())
diff --git a/src/orchestrator-external/Cargo.toml b/src/orchestrator-external/Cargo.toml
new file mode 100644
index 0000000000000..698a9946d3883
--- /dev/null
+++ b/src/orchestrator-external/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "mz-orchestrator-external"
+description = "Service orchestration via an external orchestrator."
+version = "0.0.0" +edition.workspace = true +rust-version.workspace = true +publish = false + +[lints] +workspace = true + +[dependencies] +anyhow = "1.0.66" +async-trait = "0.1.68" +futures = "0.3.25" +mz-adapter = { path = "../adapter" } +mz-orchestrator = { path = "../orchestrator" } +mz-ore = { path = "../ore" } +mz-repr = { path = "../repr" } +mz-sql = { path = "../sql" } +tokio = "1.32.0" +uuid = { version = "1.7.0", features = ["v4"] } +workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } + +[features] +default = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] diff --git a/src/orchestrator-external/src/lib.rs b/src/orchestrator-external/src/lib.rs new file mode 100644 index 0000000000000..3b8bbc94d1f82 --- /dev/null +++ b/src/orchestrator-external/src/lib.rs @@ -0,0 +1,322 @@ +use std::{ + any::Any, + collections::{BTreeMap, VecDeque}, + sync::{Arc, Mutex}, +}; + +use anyhow::bail; +use async_trait::async_trait; +use futures::stream::BoxStream; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot, +}; +use uuid::Uuid; + +use mz_adapter::{ + session::SessionConfig, Client, ExecuteResponse, PeekResponseUnary, SessionClient, +}; +use mz_orchestrator::{ + scheduling_config, NamespacedOrchestrator, Orchestrator, Service, ServiceConfig, ServiceEvent, + ServiceProcessMetrics, +}; +use mz_ore::{collections::CollectionExt, task::AbortOnDropHandle}; +use mz_repr::RowIterator; +use mz_sql::{parse::StatementParseResult, session::user::SYSTEM_USER}; + +/// Configures an [`ExternalOrchestrator`]. +#[derive(Debug, Clone)] +pub struct ExternalOrchestratorConfig { + /// Template to use to generate service hostnames given a service name + /// and id. For instance: + /// "{name}-{i}.{name}.mz-environment.svc.cluster.local" + pub hostname_template: String, +} + +#[derive(Debug)] +pub struct ExternalOrchestrator { + config: ExternalOrchestratorConfig, + namespaces: Mutex>>, + adapter_client: Mutex>, +} + +impl ExternalOrchestrator { + pub fn new(config: ExternalOrchestratorConfig) -> Self { + Self { + config, + namespaces: Mutex::new(BTreeMap::new()), + adapter_client: Mutex::new(None), + } + } + + fn adapter_client(&self) -> Option { + self.adapter_client.lock().expect("lock poisoned").clone() + } + + pub fn set_adapter_client(&self, new_client: Client) { + *self.adapter_client.lock().expect("lock poisoned") = Some(new_client.clone()); + for namespace in self.namespaces.lock().expect("lock poisoned").values() { + if let Some(namespace) = Arc::clone(namespace) + .as_any() + .downcast_ref::() + { + namespace.set_adapter_client(new_client.clone()); + } + } + } +} + +impl Orchestrator for ExternalOrchestrator { + fn namespace(&self, namespace: &str) -> Arc { + let mut namespaces = self.namespaces.lock().expect("lock poisoned"); + Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| { + let (command_tx, command_rx) = mpsc::unbounded_channel(); + let worker = OrchestratorWorker::new(command_rx) + .spawn(format!("external-orchestrator-worker:{namespace}")); + let namespace = NamespacedExternalOrchestrator { + namespace: namespace.into(), + config: self.config.clone(), + command_tx, + _worker: worker, + }; + if let Some(client) = &self.adapter_client() { + namespace.set_adapter_client(client.clone()); + } + Arc::new(namespace) + })) + } + + fn as_any(self: Arc) -> Arc { + self + } +} + +#[derive(Debug)] +pub struct NamespacedExternalOrchestrator { + namespace: String, + config: 
+    command_tx: UnboundedSender<Command>,
+    _worker: AbortOnDropHandle<()>,
+}
+
+impl NamespacedExternalOrchestrator {
+    fn set_adapter_client(&self, client: Client) {
+        self.command_tx
+            .send(Command::SetAdapterClient { client })
+            .expect("worker task not dropped");
+    }
+}
+
+#[async_trait]
+impl NamespacedOrchestrator for NamespacedExternalOrchestrator {
+    fn ensure_service(
+        &self,
+        id: &str,
+        config: ServiceConfig,
+    ) -> Result<Box<dyn Service>, anyhow::Error> {
+        self.command_tx
+            .send(Command::Ensure {
+                id: format!("{}-{}", self.namespace, id),
+            })
+            .expect("worker task not dropped");
+        let hosts = (0..config.scale)
+            .map(|i| {
+                self.config
+                    .hostname_template
+                    .replace("{name}", &format!("{}-{}", self.namespace, id))
+                    .replace("{idx}", &i.to_string())
+            })
+            .collect::<Vec<_>>();
+        let ports = config
+            .ports
+            .into_iter()
+            .map(|p| (p.name, p.port_hint))
+            .collect();
+        Ok(Box::new(ExternalService { hosts, ports }))
+    }
+
+    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
+        self.command_tx
+            .send(Command::Drop {
+                id: format!("{}-{}", self.namespace, id),
+            })
+            .expect("worker task not dropped");
+        Ok(())
+    }
+
+    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
+        let (tx, rx) = oneshot::channel();
+        self.command_tx
+            .send(Command::List { reply: tx })
+            .expect("worker task not dropped");
+        Ok(rx.await?)
+    }
+
+    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
+        todo!()
+    }
+
+    async fn fetch_service_metrics(
+        &self,
+        _id: &str,
+    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
+        todo!()
+    }
+
+    fn update_scheduling_config(&self, _config: scheduling_config::ServiceSchedulingConfig) {
+        todo!()
+    }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
+}
+
+#[derive(Debug)]
+pub struct ExternalService {
+    hosts: Vec<String>,
+    ports: BTreeMap<String, u16>,
+}
+
+impl Service for ExternalService {
+    fn addresses(&self, port: &str) -> Vec<String> {
+        let port = self.ports[port];
+        self.hosts
+            .iter()
+            .map(|host| format!("{host}:{port}"))
+            .collect()
+    }
+}
+
+enum Command {
+    Ensure { id: String },
+    Drop { id: String },
+    List { reply: oneshot::Sender<Vec<String>> },
+    SetAdapterClient { client: Client },
+}
+
+struct OrchestratorWorker {
+    command_rx: UnboundedReceiver<Command>,
+    session_client: Option<SessionClient>,
+    init_queue: VecDeque<Command>,
+}
+
+impl OrchestratorWorker {
+    fn new(command_rx: UnboundedReceiver<Command>) -> Self {
+        Self {
+            command_rx,
+            session_client: None,
+            init_queue: VecDeque::new(),
+        }
+    }
+
+    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
+        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
+    }
+
+    async fn run(mut self) {
+        // Phase one: buffer commands until `SetAdapterClient` arrives, then
+        // drain the buffered commands in order.
+        while let Some(cmd) = self.command_rx.recv().await {
+            self.wait_for_adapter_client(cmd).await.unwrap();
+
+            if self.session_client.is_some() {
+                while let Some(cmd) = self.init_queue.pop_front() {
+                    self.handle_command(cmd).await.unwrap();
+                }
+                break;
+            }
+        }
+        // Phase two: a session is available, so handle commands directly.
+        while let Some(cmd) = self.command_rx.recv().await {
+            self.handle_command(cmd).await.unwrap();
+        }
+    }
+
+    async fn wait_for_adapter_client(&mut self, command: Command) -> Result<(), anyhow::Error> {
+        match command {
+            Command::SetAdapterClient { client } => {
+                self.session_client = Some(session_client(client).await?);
+            }
+            _ => {
+                self.init_queue.push_back(command);
+            }
+        }
+        Ok(())
+    }
+
+    async fn handle_command(&mut self, command: Command) -> Result<(), anyhow::Error> {
+        match command {
+            Command::Ensure { id } => {
+                // TODO: use placeholders here?
+                self.execute_sql(
+                    &format!(
+                        "INSERT INTO mz_internal.mz_external_orchestrator_services VALUES ('{id}', 'Pending')"
+                    ),
+                )
+                .await?;
+            }
+            Command::Drop { id } => {
+                // TODO: use placeholders here?
+                self.execute_sql(&format!(
+                    "DELETE FROM mz_internal.mz_external_orchestrator_services WHERE id = '{id}'"
+                ))
+                .await?;
+            }
+            Command::List { reply } => {
+                let rows = self
+                    .execute_sql("SELECT id FROM mz_internal.mz_external_orchestrator_services")
+                    .await?;
+                reply
+                    .send(
+                        rows.map(|row| row.into_iter().next().unwrap().to_string())
+                            .collect(),
+                    )
+                    .expect("channel closed");
+            }
+            Command::SetAdapterClient { .. } => {
+                // this can't currently happen
+                panic!("adapter client already set")
+            }
+        }
+        Ok(())
+    }
+
+    async fn execute_sql(&mut self, sql: &str) -> Result<Box<dyn RowIterator>, anyhow::Error> {
+        let session_client = self
+            .session_client
+            .as_mut()
+            .expect("adapter client not initialized");
+        let stmts = mz_sql::parse::parse(sql)?;
+        if stmts.len() != 1 {
+            bail!("must supply exactly one query");
+        }
+        let StatementParseResult { ast: stmt, sql } = stmts.into_element();
+
+        const EMPTY_PORTAL: &str = "";
+        session_client.start_transaction(Some(1))?;
+        session_client
+            .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
+            .await?;
+        match session_client
+            .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
+            .await?
+        {
+            (ExecuteResponse::SendingRows { future, .. }, _) => match future.await {
+                PeekResponseUnary::Rows(rows) => Ok(rows),
+                PeekResponseUnary::Canceled => bail!("query canceled"),
+                PeekResponseUnary::Error(e) => bail!(e),
+            },
+            r => bail!("unsupported response type: {r:?}"),
+        }
+    }
+}
+
+async fn session_client(client: Client) -> Result<SessionClient, anyhow::Error> {
+    let conn_id = client.new_conn_id()?;
+    let session = client.new_session(SessionConfig {
+        conn_id,
+        uuid: Uuid::new_v4(),
+        user: SYSTEM_USER.name.clone(),
+        client_ip: None,
+        external_metadata_rx: None,
+        helm_chart_version: None,
+    });
+    Ok(client.startup(session).await?)
+}
diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs
index 26274f64dc263..8cd91c1522f2e 100644
--- a/src/orchestrator-kubernetes/src/lib.rs
+++ b/src/orchestrator-kubernetes/src/lib.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::any::Any;
 use std::collections::BTreeMap;
 use std::future::Future;
 use std::sync::{Arc, Mutex};
@@ -206,6 +207,10 @@ impl Orchestrator for KubernetesOrchestrator {
             })
         }))
     }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
 }
 
 #[derive(Clone, Copy)]
@@ -1338,6 +1343,10 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
     fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
         *self.scheduling_config.write().expect("poisoned") = config;
     }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
 }
 
 impl OrchestratorWorker {
diff --git a/src/orchestrator-process/src/lib.rs b/src/orchestrator-process/src/lib.rs
index 0d0da50e97767..10c178dff36be 100644
--- a/src/orchestrator-process/src/lib.rs
+++ b/src/orchestrator-process/src/lib.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::any::Any;
 use std::collections::BTreeMap;
 use std::env;
 use std::ffi::OsStr;
@@ -290,6 +291,10 @@ impl Orchestrator for ProcessOrchestrator {
             })
         }))
     }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
 }
 
 /// Configuration for a [`NamespacedProcessOrchestrator`].
@@ -412,6 +417,10 @@ impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
     ) {
         // This orchestrator ignores scheduling constraints.
     }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
 }
 
 /// Commands sent from a [`NamespacedProcessOrchestrator`] to its
diff --git a/src/orchestrator-tracing/src/lib.rs b/src/orchestrator-tracing/src/lib.rs
index 4a5a36a9f9986..dc48f5d40b740 100644
--- a/src/orchestrator-tracing/src/lib.rs
+++ b/src/orchestrator-tracing/src/lib.rs
@@ -9,11 +9,11 @@
 
 //! Service orchestration for tracing-aware services.
 
-use std::collections::BTreeMap;
 use std::ffi::OsString;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
+use std::{any::Any, collections::BTreeMap};
 
 use async_trait::async_trait;
 use clap::{FromArgMatches, IntoApp};
@@ -374,6 +374,10 @@ impl Orchestrator for TracingOrchestrator {
             tracing_args: self.tracing_args.clone(),
         })
     }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
 }
 
 #[derive(Debug)]
@@ -524,6 +528,10 @@ impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
     ) {
         self.inner.update_scheduling_config(config)
     }
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self
+    }
 }
 
 /// Specifies the format of a stderr log message.
diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs
index aae9d3a31a377..2baa869f78366 100644
--- a/src/orchestrator/src/lib.rs
+++ b/src/orchestrator/src/lib.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::any::Any;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::str::FromStr;
@@ -47,6 +48,7 @@ use serde::{Deserialize, Deserializer, Serialize};
 pub trait Orchestrator: fmt::Debug + Send + Sync {
     /// Enter a namespace in the orchestrator.
     fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator>;
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 }
 
 /// An orchestrator restricted to a single namespace.
@@ -84,6 +86,8 @@ pub trait NamespacedOrchestrator: fmt::Debug + Send + Sync {
     ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error>;
 
     fn update_scheduling_config(&self, config: scheduling_config::ServiceSchedulingConfig);
+
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 }
 
 /// An event describing a status change of an orchestrated service.
diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs
index 0c01911126f98..5005ea6a1e816 100644
--- a/src/orchestratord/src/controller/materialize.rs
+++ b/src/orchestratord/src/controller/materialize.rs
@@ -8,17 +8,19 @@
 // by the Apache License, Version 2.0.
 use std::{
-    collections::BTreeSet,
+    collections::{BTreeMap, BTreeSet},
     fmt::Display,
     str::FromStr,
     sync::{Arc, Mutex},
+    time::Duration,
 };
 
 use http::HeaderValue;
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
 use kube::{api::PostParams, runtime::controller::Action, Api, Client, Resource, ResourceExt};
 use serde::Deserialize;
-use tracing::{debug, trace};
+use tokio_postgres::NoTls;
+use tracing::{debug, trace, warn};
 
 use crate::metrics::Metrics;
 use mz_cloud_provider::CloudProvider;
@@ -27,7 +29,12 @@ use mz_cloud_resources::crd::materialize::v1alpha1::{
 };
 use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
 use mz_orchestrator_tracing::TracingCliArgs;
-use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
+use mz_ore::{
+    cast::CastFrom,
+    cli::KeyValueArg,
+    instrument,
+    task::{spawn, AbortOnDropHandle},
+};
 
 pub mod balancer;
 pub mod console;
@@ -54,6 +61,8 @@ pub struct Args {
     enable_prometheus_scrape_annotations: bool,
     #[clap(long)]
     disable_authentication: bool,
+    #[clap(long)]
+    use_external_orchestrator: bool,
 
     #[clap(long)]
     segment_api_key: Option<String>,
@@ -142,6 +151,32 @@ pub struct Args {
     default_certificate_specs: DefaultCertificateSpecs,
 }
 
+impl Args {
+    fn environmentd_internal_hostname(&self, namespace: &str, service_name: &str) -> String {
+        if let Some(host_override) = &self.environmentd_internal_http_host_override {
+            host_override.to_string()
+        } else {
+            format!("{}.{}.svc.cluster.local", service_name, namespace)
+        }
+    }
+
+    fn environmentd_internal_http_address(&self, namespace: &str, service_name: &str) -> String {
+        format!(
+            "{}:{}",
+            self.environmentd_internal_hostname(namespace, service_name),
+            self.environmentd_internal_http_port
+        )
+    }
+
+    fn environmentd_internal_sql_address(&self, namespace: &str, service_name: &str) -> String {
+        format!(
+            "{}:{}",
+            self.environmentd_internal_hostname(namespace, service_name),
+            self.environmentd_internal_sql_port
+        )
+    }
+}
+
 #[derive(Deserialize, Default)]
 #[serde(rename_all = "camelCase")]
 pub struct DefaultCertificateSpecs {
@@ -205,12 +240,121 @@ impl Display for Error {
     }
 }
 
+struct EnvironmentWorker {
+    client: Client,
+    namespace: String,
+    name: String,
+    internal_pgwire_url: String,
+}
+
+impl EnvironmentWorker {
+    fn new(client: Client, namespace: String, name: String, internal_pgwire_url: String) -> Self {
+        Self {
+            client,
+            namespace,
+            name,
+            internal_pgwire_url,
+        }
+    }
+
+    async fn run(&self, initial_generation: u64) {
+        // This is required to break the bootstrapping loop, since we can't
+        // run subscribe queries until there is a system cluster available to
+        // run them on.
+        self.ensure_replica(
+            &format!("cluster-s1-replica-s1-gen-{}", initial_generation),
+            false,
+        )
+        .await;
+
+        let mut active_client = None;
+        loop {
+            if let Some(client) = active_client.as_mut() {
+                if let Err(e) = self.subscribe(client).await {
+                    warn!("lost subscribe connection: {e}");
+                    active_client = None;
+                }
+            } else if let Ok(client) = self.reconnect().await {
+                active_client = Some(client);
+            }
+        }
+    }
+
+    async fn ensure_replica(&self, replica_name: &str, write_status: bool) {
+        // ensure_service logic from orchestrator-kubernetes goes here
+        todo!()
+    }
+
+    async fn drop_replica(&self, replica_name: &str) {
+        // drop_service logic from orchestrator-kubernetes goes here
+        todo!()
+    }
+
+    async fn subscribe(&self, client: &mut tokio_postgres::Client) -> Result<(), anyhow::Error> {
+        let transaction = client.transaction().await?;
+        transaction
+            .execute(
+                "DECLARE c CURSOR FOR SUBSCRIBE (SELECT id, state FROM mz_internal.mz_external_orchestrator_services) ENVELOPE UPSERT (KEY (id));",
+                &[],
+            )
+            .await?;
+        loop {
+            let results = transaction.query("FETCH ALL c;", &[]).await?;
+            for row in results {
+                let id = row.get("id");
+                match row.get("mz_state") {
+                    "upsert" => {
+                        self.ensure_replica(id, true).await;
+                    }
+                    "delete" => {
+                        self.drop_replica(id).await;
+                    }
+                    _ => {}
+                }
+            }
+        }
+    }
+
+    async fn reconnect(&self) -> Result<tokio_postgres::Client, anyhow::Error> {
+        let (client, connection) = match tokio::time::timeout(
+            Duration::from_secs(5),
+            tokio_postgres::connect(&self.internal_pgwire_url, NoTls),
+        )
+        .await
+        {
+            Ok(Ok((client, connection))) => (client, connection),
+            Ok(Err(err)) => {
+                warn!("failed to connect to environmentd: {err}");
+                return Err(err.into());
+            }
+            Err(err) => {
+                warn!("timed out connecting to environmentd");
+                return Err(err.into());
+            }
+        };
+
+        mz_ore::task::spawn(
+            || format!("postgres connection for {}/{}", self.namespace, self.name),
+            async move {
+                if let Err(e) = connection.await {
+                    panic!("connection error: {}", e);
+                }
+            },
+        );
+
+        Ok(client)
+    }
+}
+
 pub struct Context {
     config: Args,
     tracing: TracingCliArgs,
     orchestratord_namespace: String,
     metrics: Arc<Metrics>,
     needs_update: Arc<Mutex<BTreeSet<String>>>,
+    environment_workers: Mutex<BTreeMap<String, AbortOnDropHandle<()>>>,
 }
 
 impl Context {
@@ -237,6 +381,7 @@ impl Context {
             orchestratord_namespace,
             metrics,
             needs_update: Default::default(),
+            environment_workers: Mutex::new(BTreeMap::new()),
         }
     }
 
@@ -279,6 +424,51 @@ impl Context {
         )
         .await
     }
+
+    fn start_environment_worker(&self, mz: &Materialize, client: Client) {
+        let namespace = mz.namespace();
+        let name = mz.name_unchecked();
+        let generation = mz
+            .status
+            .as_ref()
+            .map(|status| status.active_generation)
+            .unwrap_or(1);
+        let internal_pgwire_url = format!(
+            "postgres://mz_system@{}/materialize",
+            self.config.environmentd_internal_sql_address(
+                &mz.namespace(),
+                &mz.environmentd_service_name()
+            )
+        );
+        self.environment_workers
+            .lock()
+            .unwrap()
+            .entry(mz.metadata.uid.clone().unwrap())
+            .or_insert_with(|| {
+                spawn(
+                    || {
+                        format!(
+                            "environment worker for {}/{}",
+                            mz.namespace(),
+                            mz.name_unchecked()
+                        )
+                    },
+                    async move {
+                        EnvironmentWorker::new(client, namespace, name, internal_pgwire_url)
+                            .run(generation)
+                            .await
+                    },
+                )
+                .abort_on_drop()
+            });
+    }
+
+    fn stop_environment_worker(&self, mz: &Materialize) {
+        self.environment_workers
+            .lock()
+            .unwrap()
+            .remove(mz.metadata.uid.as_ref().unwrap());
+    }
 }
 
 #[async_trait::async_trait]
@@ -421,6 +611,9 @@ impl k8s_controller::Context for Context {
                         false,
                     )
                     .await?;
+                    if self.config.use_external_orchestrator {
+                        self.start_environment_worker(mz, client.clone());
+                    }
                     Ok(None)
                 }
                 Err(e) => {
@@ -591,6 +784,7 @@ impl k8s_controller::Context for Context {
         mz: &Self::Resource,
     ) -> Result<Option<Action>, Self::Error> {
         self.set_needs_update(mz, false);
+        self.stop_environment_worker(mz);
         Ok(None)
     }
 
diff --git a/src/orchestratord/src/controller/materialize/environmentd.rs b/src/orchestratord/src/controller/materialize/environmentd.rs
index 1a2620c24ce2e..9057369d8609d 100644
--- a/src/orchestratord/src/controller/materialize/environmentd.rs
+++ b/src/orchestratord/src/controller/materialize/environmentd.rs
@@ -192,8 +192,10 @@ impl Resources {
             return Ok(Some(retry_action));
         }
 
-        let environmentd_url =
-            environmentd_internal_http_address(args, namespace, &*self.generation_service);
+        let environmentd_url = args.environmentd_internal_http_address(
+            namespace,
+            &self.generation_service.name_unchecked(),
+        );
 
         let http_client =
             reqwest::Client::builder()
                 .timeout(std::time::Duration::from_secs(10))
@@ -1312,23 +1314,6 @@ enum BecomeLeaderResult {
     Failure { message: String },
 }
 
-fn environmentd_internal_http_address(
-    args: &super::Args,
-    namespace: &str,
-    generation_service: &Service,
-) -> String {
-    let host = if let Some(host_override) = &args.environmentd_internal_http_host_override {
-        host_override.to_string()
-    } else {
-        format!(
-            "{}.{}.svc.cluster.local",
-            generation_service.name_unchecked(),
-            namespace,
-        )
-    };
-    format!("{}:{}", host, args.environmentd_internal_http_port)
-}
-
 fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String {
     format!("{}-{}", statefulset.name_unchecked(), idx)
 }
diff --git a/src/pgrepr-consts/src/oid.rs b/src/pgrepr-consts/src/oid.rs
index 4d52d45c49186..65a03ce4bd6aa 100644
--- a/src/pgrepr-consts/src/oid.rs
+++ b/src/pgrepr-consts/src/oid.rs
@@ -770,3 +770,4 @@ pub const VIEW_MZ_DATAFLOW_GLOBAL_IDS_OID: u32 = 17047;
 pub const NETWORK_POLICIES_DEFAULT_POLICY_OID: u32 = 17048;
 pub const VIEW_MZ_CLUSTER_DEPLOYMENT_LINEAGE_OID: u32 = 17049;
 pub const INDEX_MZ_CLUSTER_DEPLOYMENT_LINEAGE_IND_OID: u32 = 17050;
+pub const TABLE_MZ_EXTERNAL_ORCHESTRATOR_SERVICES_OID: u32 = 17051;
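
For reference, the contract this diff establishes between environmentd and orchestratord is plain SQL spoken over the internal pgwire port, so any external agent could drive it the same way. The sketch below is illustrative only: the service id shown is hypothetical (real ids are produced by ensure_service as "{namespace}-{id}"), but every statement mirrors code in the diff above.

    -- Written by the OrchestratorWorker when environmentd ensures or drops a service:
    INSERT INTO mz_internal.mz_external_orchestrator_services VALUES ('cluster-u1-replica-u1-gen-1', 'Pending');
    DELETE FROM mz_internal.mz_external_orchestrator_services WHERE id = 'cluster-u1-replica-u1-gen-1';

    -- Run by the external agent (cf. EnvironmentWorker::subscribe) to watch for work.
    -- Each fetched row carries an mz_state column of 'upsert' or 'delete' in
    -- addition to the key column id:
    BEGIN;
    DECLARE c CURSOR FOR SUBSCRIBE (
        SELECT id, state FROM mz_internal.mz_external_orchestrator_services
    ) ENVELOPE UPSERT (KEY (id));
    FETCH ALL c;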