diff --git a/Cargo.lock b/Cargo.lock index c7524cbcf36..207d26745b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6186,18 +6186,31 @@ dependencies = [ name = "nexus-mgs-updates" version = "0.1.0" dependencies = [ + "chrono", "futures", "gateway-client", "gateway-messages", "gateway-test-utils", "hubtools", + "id-map", + "internal-dns-resolver", + "internal-dns-types", + "nexus-types", + "omicron-common", "omicron-workspace-hack", + "qorb", "rand 0.8.5", + "repo-depot-client", "reqwest", + "sha2", "slog", + "slog-error-chain", "sp-sim", "thiserror 1.0.69", "tokio", + "tokio-stream", + "tokio-util", + "tufaceous-artifact", "uuid", ] @@ -6286,6 +6299,7 @@ dependencies = [ "nexus-db-queries", "nexus-db-schema", "nexus-inventory", + "nexus-mgs-updates", "nexus-networking", "nexus-reconfigurator-planning", "nexus-reconfigurator-preparation", @@ -7247,6 +7261,7 @@ dependencies = [ "nexus-internal-api", "nexus-inventory", "nexus-metrics-producer-gc", + "nexus-mgs-updates", "nexus-networking", "nexus-reconfigurator-execution", "nexus-reconfigurator-planning", @@ -7481,14 +7496,19 @@ dependencies = [ "clap", "dropshot 0.16.0", "internal-dns-resolver", + "internal-dns-types", "nexus-db-model", "nexus-db-queries", + "nexus-mgs-updates", "nexus-reconfigurator-execution", "nexus-types", + "omicron-common", "omicron-rpaths", "omicron-uuid-kinds", "omicron-workspace-hack", "pq-sys", + "qorb", + "repo-depot-client", "serde_json", "slog", "supports-color", @@ -10049,6 +10069,30 @@ dependencies = [ "uuid", ] +[[package]] +name = "reconfigurator-sp-updater" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "dropshot 0.16.0", + "futures", + "gateway-client", + "humantime", + "internal-dns-resolver", + "internal-dns-types", + "nexus-mgs-updates", + "nexus-types", + "omicron-repl-utils", + "omicron-workspace-hack", + "qorb", + "serde_json", + "slog", + "tokio", + "tufaceous-artifact", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -12637,6 +12681,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 6c67ccc345b..d722cbe9c4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "dev-tools/pins", "dev-tools/reconfigurator-cli", "dev-tools/reconfigurator-exec-unsafe", + "dev-tools/reconfigurator-sp-updater", "dev-tools/releng", "dev-tools/repl-utils", "dev-tools/repo-depot-standalone", @@ -188,6 +189,7 @@ default-members = [ "dev-tools/pins", "dev-tools/reconfigurator-cli", "dev-tools/reconfigurator-exec-unsafe", + "dev-tools/reconfigurator-sp-updater", "dev-tools/releng", "dev-tools/repl-utils", "dev-tools/repo-depot-standalone", @@ -688,7 +690,7 @@ tokio = "1.43.0" tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4", "with-uuid-1" ] } tokio-stream = "0.1.17" tokio-tungstenite = "0.23.1" -tokio-util = { version = "0.7.13", features = ["io", "io-util"] } +tokio-util = { version = "0.7.13", features = ["io", "io-util", "time"] } toml = "0.8.20" toml_edit = "0.22.24" tough = { version = "0.20.0", features = [ "http" ] } diff --git a/dev-tools/ls-apis/tests/api_dependencies.out b/dev-tools/ls-apis/tests/api_dependencies.out index 861fc12437f..00e0ed2400b 100644 --- a/dev-tools/ls-apis/tests/api_dependencies.out +++ b/dev-tools/ls-apis/tests/api_dependencies.out @@ -48,7 +48,7 @@ ereport (client: ereport-client) Management Gateway Service (client: gateway-client) consumed by: dpd (dendrite/dpd) via 1 path - consumed by: omicron-nexus (omicron/nexus) via 3 paths + consumed by: omicron-nexus (omicron/nexus) via 4 paths consumed by: omicron-sled-agent (omicron/sled-agent) via 1 path consumed by: wicketd (omicron/wicketd) via 3 paths @@ -83,6 +83,7 @@ Crucible Repair (client: repair-client) consumed by: crucible-downstairs (crucible/downstairs) via 1 path Repo Depot API (client: repo-depot-client) + consumed by: omicron-nexus (omicron/nexus) via 1 path consumed by: omicron-sled-agent (omicron/sled-agent) via 1 path Sled Agent (client: sled-agent-client) diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index d1c2108b44a..e8638aa2aa6 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -1423,6 +1423,7 @@ parent: internal DNS version: 1 external DNS version: 2 + PENDING MGS-MANAGED UPDATES: 0 --------------------------------------------- stderr: @@ -1519,6 +1520,7 @@ parent: internal DNS version: 1 external DNS version: 2 + PENDING MGS-MANAGED UPDATES: 0 --------------------------------------------- stderr: diff --git a/dev-tools/reconfigurator-cli/tests/output/cmd-example-stdout b/dev-tools/reconfigurator-cli/tests/output/cmd-example-stdout index a2639cd2db7..20cc5fee234 100644 --- a/dev-tools/reconfigurator-cli/tests/output/cmd-example-stdout +++ b/dev-tools/reconfigurator-cli/tests/output/cmd-example-stdout @@ -358,6 +358,7 @@ parent: 02697f74-b14a-4418-90f0-c28b2a3a6aa9 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > @@ -453,5 +454,6 @@ parent: 02697f74-b14a-4418-90f0-c28b2a3a6aa9 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 diff --git a/dev-tools/reconfigurator-cli/tests/output/cmd-expunge-newly-added-stdout b/dev-tools/reconfigurator-cli/tests/output/cmd-expunge-newly-added-stdout index d7791c3baa4..3dc26155c76 100644 --- a/dev-tools/reconfigurator-cli/tests/output/cmd-expunge-newly-added-stdout +++ b/dev-tools/reconfigurator-cli/tests/output/cmd-expunge-newly-added-stdout @@ -302,6 +302,7 @@ parent: 06c88262-f435-410e-ba98-101bed41ec27 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > blueprint-edit 3f00b694-1b16-4aaa-8f78-e6b3a527b434 expunge-zone 9995de32-dd52-4eb1-b0eb-141eb84bc739 @@ -605,6 +606,7 @@ parent: 3f00b694-1b16-4aaa-8f78-e6b3a527b434 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > blueprint-plan 366b0b68-d80e-4bc1-abd3-dc69837847e0 @@ -922,6 +924,7 @@ parent: 366b0b68-d80e-4bc1-abd3-dc69837847e0 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > blueprint-edit 9c998c1d-1a7b-440a-ae0c-40f781dea6e2 expunge-zone d786ef4a-5acb-4f5d-a732-a00addf986b5 diff --git a/dev-tools/reconfigurator-cli/tests/output/cmd-set-zone-images-stdout b/dev-tools/reconfigurator-cli/tests/output/cmd-set-zone-images-stdout index 737ac8bcdba..efb997be6d5 100644 --- a/dev-tools/reconfigurator-cli/tests/output/cmd-set-zone-images-stdout +++ b/dev-tools/reconfigurator-cli/tests/output/cmd-set-zone-images-stdout @@ -94,6 +94,7 @@ parent: 1b013011-2062-4b48-b544-a32b23bce83a internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > @@ -198,6 +199,7 @@ parent: 9766ca20-38d4-4380-b005-e7c43c797e7c internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > blueprint-diff 971eeb12-1830-4fa0-a699-98ea0164505c f714e6ea-e85a-4d7d-93c2-a018744fe176 @@ -470,6 +472,7 @@ parent: bb128f06-a2e1-44c1-8874-4f789d0ff896 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 > blueprint-diff f714e6ea-e85a-4d7d-93c2-a018744fe176 d9c572a1-a68c-4945-b1ec-5389bd588fe9 diff --git a/dev-tools/reconfigurator-exec-unsafe/Cargo.toml b/dev-tools/reconfigurator-exec-unsafe/Cargo.toml index 3efe7d92923..f4259730820 100644 --- a/dev-tools/reconfigurator-exec-unsafe/Cargo.toml +++ b/dev-tools/reconfigurator-exec-unsafe/Cargo.toml @@ -16,13 +16,18 @@ camino.workspace = true clap.workspace = true dropshot.workspace = true internal-dns-resolver.workspace = true +internal-dns-types.workspace = true nexus-db-model.workspace = true nexus-db-queries.workspace = true +nexus-mgs-updates.workspace = true nexus-reconfigurator-execution.workspace = true nexus-types.workspace = true +omicron-common.workspace = true omicron-uuid-kinds.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" +qorb.workspace = true +repo-depot-client.workspace = true serde_json.workspace = true slog.workspace = true supports-color.workspace = true diff --git a/dev-tools/reconfigurator-exec-unsafe/src/main.rs b/dev-tools/reconfigurator-exec-unsafe/src/main.rs index 0e4f818e15f..bd881d212ac 100644 --- a/dev-tools/reconfigurator-exec-unsafe/src/main.rs +++ b/dev-tools/reconfigurator-exec-unsafe/src/main.rs @@ -5,20 +5,34 @@ //! Execute blueprints from the command line use anyhow::Context; +use anyhow::anyhow; use anyhow::bail; use camino::Utf8PathBuf; use clap::ColorChoice; use clap::Parser; +use dropshot::PaginationOrder; +use internal_dns_types::names::ServiceName; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; use nexus_db_queries::db::DataStore; +use nexus_mgs_updates::ArtifactCache; +use nexus_mgs_updates::MgsUpdateDriver; use nexus_reconfigurator_execution::{RequiredRealizeArgs, realize_blueprint}; use nexus_types::deployment::Blueprint; +use nexus_types::deployment::PendingMgsUpdates; +use nexus_types::deployment::SledFilter; +use omicron_common::api::external::DataPageParams; use omicron_uuid_kinds::GenericUuid; use omicron_uuid_kinds::OmicronZoneUuid; -use slog::info; +use qorb::resolver::Resolver; +use qorb::resolvers::fixed::FixedResolver; +use slog::{debug, info}; use std::net::SocketAddr; +use std::net::SocketAddrV6; +use std::num::NonZeroU32; use std::sync::Arc; +use std::time::Duration; +use tokio::sync::watch; use update_engine::EventBuffer; use update_engine::NestedError; use update_engine::display::LineDisplay; @@ -62,6 +76,11 @@ struct ReconfiguratorExec { #[arg(long, default_value = "[fd00:1122:3344:3::1]:53")] dns_server: SocketAddr, + /// enable MGS-managed updates (will keep the process running after + /// blueprint execution completes) + #[arg(long, default_value_t = false)] + mgs_updates: bool, + /// Color output #[arg(long, value_enum, default_value_t)] color: ColorChoice, @@ -96,7 +115,7 @@ impl ReconfiguratorExec { .context("creating datastore")?, ); - let result = self.do_exec(log, &datastore).await; + let result = self.do_exec(log, &datastore, &qorb_resolver).await; datastore.terminate().await; result } @@ -105,6 +124,7 @@ impl ReconfiguratorExec { &self, log: slog::Logger, datastore: &Arc, + qorb_resolver: &internal_dns_resolver::QorbResolver, ) -> Result<(), anyhow::Error> { info!(&log, "setting up arguments for execution"); let opctx = OpContext::for_tests(log.clone(), datastore.clone()); @@ -156,6 +176,60 @@ impl ReconfiguratorExec { event_buffer }); + // If requested, set up a driver for MGS-managed updates. + let (mgs_updates, mgs) = if self.mgs_updates { + info!(&log, "setting up MGS update driver"); + let mut mgs_resolver = qorb_resolver + .for_service(ServiceName::ManagementGatewayService); + let mgs_rx = mgs_resolver.monitor(); + // Pick an arbitrary in-service sled to act as our repo depot + // server. + let repo_depot_sled = datastore + .sled_list( + &opctx, + &DataPageParams { + marker: None, + direction: PaginationOrder::Ascending, + limit: NonZeroU32::new(1).expect("1 is not 0"), + }, + SledFilter::TufArtifactReplication, + ) + .await + .context("listing sleds")? + .into_iter() + .next() + .ok_or_else(|| anyhow!("found no sleds with TUF artifacts"))?; + let repo_depot_addr = SocketAddrV6::new( + *repo_depot_sled.ip, + *repo_depot_sled.repo_depot_port, + 0, + 0, + ); + let mut repo_depot_resolver = + FixedResolver::new([SocketAddr::from(repo_depot_addr)]); + let artifact_cache = Arc::new(ArtifactCache::new( + log.clone(), + repo_depot_resolver.monitor(), + )); + let (requests_tx, requests_rx) = + watch::channel(PendingMgsUpdates::new()); + let driver = MgsUpdateDriver::new( + log.clone(), + artifact_cache, + requests_rx, + mgs_rx, + Duration::from_secs(20), + ); + let status_rx = driver.status_rx(); + let driver_task = tokio::spawn(async move { + driver.run().await; + }); + (requests_tx, Some((driver_task, status_rx, repo_depot_resolver))) + } else { + let (mgs_updates, _rx) = watch::channel(PendingMgsUpdates::new()); + (mgs_updates, None) + }; + // This uuid uses similar conventions as the DB fixed data. It's // intended to be recognizable by a human (maybe just as "a little // strange looking") and ideally evoke this tool. @@ -171,6 +245,7 @@ impl ReconfiguratorExec { blueprint: &blueprint, creator: OmicronZoneUuid::from_untyped_uuid(creator), sender, + mgs_updates, } .into(), ) @@ -193,6 +268,37 @@ impl ReconfiguratorExec { } line_display.write_event_buffer(&event_buffer)?; + if let Some((driver_task, status_rx, mut repo_depot_resolver)) = mgs { + let nupdates = blueprint.pending_mgs_updates.len(); + info!( + &log, + "waiting for requested SP updates to complete"; + "nupdates" => nupdates + ); + + loop { + let status = status_rx.borrow(); + debug!(&log, "MGS update status"; "status" => ?status); + if !status.recent.is_empty() && status.in_progress.is_empty() { + break; + } + debug!( + &log, + "waiting for more updates"; + "nwaiting" => nupdates, + "nrecent" => status.recent.len(), + "nin_progress" => status.in_progress.len(), + ); + drop(status); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } + + info!(&log, "waiting for repo depot resolver to stop"); + repo_depot_resolver.terminate().await; + info!(&log, "waiting for driver to stop"); + driver_task.await.context("waiting for driver to stop")?; + } + rv.map(|_| ()) } } diff --git a/dev-tools/reconfigurator-sp-updater/Cargo.toml b/dev-tools/reconfigurator-sp-updater/Cargo.toml new file mode 100644 index 00000000000..08f71d561c0 --- /dev/null +++ b/dev-tools/reconfigurator-sp-updater/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "reconfigurator-sp-updater" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[lints] +workspace = true + +[dependencies] +anyhow.workspace = true +clap.workspace = true +chrono.workspace = true +dropshot.workspace = true +futures.workspace = true +gateway-client.workspace = true +humantime.workspace = true +internal-dns-resolver.workspace = true +internal-dns-types.workspace = true +nexus-mgs-updates.workspace = true +nexus-types.workspace = true +omicron-repl-utils.workspace = true +qorb.workspace = true +serde_json.workspace = true +slog.workspace = true +tokio = { workspace = true, features = [ "full" ] } +tufaceous-artifact.workspace = true +omicron-workspace-hack.workspace = true + +[[bin]] +name = "reconfigurator-sp-updater" +path = "src/main.rs" diff --git a/dev-tools/reconfigurator-sp-updater/src/main.rs b/dev-tools/reconfigurator-sp-updater/src/main.rs new file mode 100644 index 00000000000..1099ae2d8c8 --- /dev/null +++ b/dev-tools/reconfigurator-sp-updater/src/main.rs @@ -0,0 +1,481 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Interactively manage SP updates from the command line + +use anyhow::Context; +use anyhow::anyhow; +use chrono::SecondsFormat; +use clap::Args; +use clap::ColorChoice; +use clap::Parser; +use clap::Subcommand; +use futures::StreamExt; +use gateway_client::types::SpIgnition; +use gateway_client::types::SpType; +use internal_dns_types::names::ServiceName; +use nexus_mgs_updates::ArtifactCache; +use nexus_mgs_updates::DriverStatus; +use nexus_mgs_updates::MgsUpdateDriver; +use nexus_types::deployment::ExpectedVersion; +use nexus_types::deployment::PendingMgsUpdate; +use nexus_types::deployment::PendingMgsUpdateDetails; +use nexus_types::deployment::PendingMgsUpdates; +use nexus_types::inventory::BaseboardId; +use omicron_repl_utils::run_repl_on_stdin; +use qorb::resolver::Resolver; +use qorb::resolvers::fixed::FixedResolver; +use slog::{info, o, warn}; +use std::collections::BTreeMap; +use std::fmt::Write; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::watch; +use tufaceous_artifact::ArtifactHash; +use tufaceous_artifact::ArtifactHashId; +use tufaceous_artifact::ArtifactKind; +use tufaceous_artifact::ArtifactVersion; +use tufaceous_artifact::KnownArtifactKind; + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let args = ReconfiguratorSpUpdater::parse(); + + if let Err(error) = args.exec().await { + eprintln!("error: {:#}", error); + std::process::exit(1); + } + + Ok(()) +} + +/// Execute blueprints from the command line +#[derive(Debug, Parser)] +struct ReconfiguratorSpUpdater { + /// log level filter + #[arg( + env, + long, + value_parser = parse_dropshot_log_level, + default_value = "info", + )] + log_level: dropshot::ConfigLoggingLevel, + + /// an internal DNS server in this deployment + // This default value is currently appropriate for all deployed systems. + // That relies on two assumptions: + // + // 1. The internal DNS servers' underlay addresses are at a fixed location + // from the base of the AZ subnet. This is unlikely to change, since the + // DNS servers must be discoverable with virtually no other information. + // 2. The AZ subnet used for all deployments today is fixed. + // + // For simulated systems (e.g., `cargo xtask omicron-dev run-all`), or if + // these assumptions change in the future, we may need to adjust this. + #[arg(long, default_value = "[fd00:1122:3344:3::1]:53")] + dns_server: SocketAddr, + + /// HOST:PORT for a TUF repo depot server + repo_depot_addr: SocketAddr, + + /// Color output + #[arg(long, value_enum, default_value_t)] + color: ColorChoice, +} + +fn parse_dropshot_log_level( + s: &str, +) -> Result { + serde_json::from_str(&format!("{:?}", s)).context("parsing log level") +} + +impl ReconfiguratorSpUpdater { + async fn exec(self) -> Result<(), anyhow::Error> { + let log = dropshot::ConfigLogging::StderrTerminal { + level: self.log_level.clone(), + } + .to_logger("reconfigurator-sp-updater") + .context("failed to create logger")?; + + info!(&log, "setting up resolver"); + let qorb_resolver = + internal_dns_resolver::QorbResolver::new(vec![self.dns_server]); + + let mut mgs_resolver = + qorb_resolver.for_service(ServiceName::ManagementGatewayService); + let mut mgs_rx = mgs_resolver.monitor(); + + // Fetch an initial inventory from MGS. We'll use this to allow users + // to specify just a serial number and have us look up the rest of the + // information that we need. + info!(&log, "resolve MGS in DNS"); + let mgs_url = { + let mgs_backends = mgs_rx + .wait_for(|all_backends| !all_backends.is_empty()) + .await + .context("waiting to resolve MGS in DNS")?; + let mgs_backend = mgs_backends + .values() + .next() + .expect("we just waited for this condition"); + format!("http://{}", mgs_backend.address) + }; + let mgs_client = gateway_client::Client::new( + &mgs_url, + log.new(o!("mgs_url" => mgs_url.clone())), + ); + let inventory = Inventory::load(&log, mgs_client) + .await + .context("loading inventory")?; + info!(&log, "loaded inventory from MGS"); + + let mut repo_depot_resolver = + FixedResolver::new([self.repo_depot_addr]); + let artifact_cache = Arc::new(ArtifactCache::new( + log.clone(), + repo_depot_resolver.monitor(), + )); + + let (requests_tx, requests_rx) = + watch::channel(PendingMgsUpdates::new()); + + let driver = MgsUpdateDriver::new( + log.clone(), + artifact_cache, + requests_rx, + mgs_rx, + Duration::from_secs(20), + ); + let status_rx = driver.status_rx(); + let driver_task = tokio::spawn(async move { driver.run().await }); + + let mut updater_state = + UpdaterState { requests_tx, status_rx, inventory }; + + run_repl_on_stdin(&mut |cmd: TopLevelArgs| { + process_cmd(&mut updater_state, cmd) + })?; + + info!(&log, "waiting for qorb to shut down"); + mgs_resolver.terminate().await; + repo_depot_resolver.terminate().await; + info!(&log, "waiting for driver task to stop"); + drop(updater_state); + driver_task.await.context("waiting for driver task")?; + + Ok(()) + } +} + +struct UpdaterState { + requests_tx: watch::Sender, + status_rx: watch::Receiver, + inventory: Inventory, +} + +struct Inventory { + sps_by_serial: BTreeMap, +} + +impl Inventory { + fn info_for_serial(&self, serial: &str) -> anyhow::Result<&SpInfo> { + self.sps_by_serial.get(serial).ok_or_else(|| { + anyhow!("did not find serial number in inventory: {:?}", serial) + }) + } +} + +struct SpInfo { + baseboard_id: Arc, + sp_type: SpType, + sp_slot_id: u32, +} + +impl Inventory { + pub async fn load( + log: &slog::Logger, + mgs_client: gateway_client::Client, + ) -> anyhow::Result { + let sp_list_ignition = mgs_client + .ignition_list() + .await + .context("listing ignition")? + .into_inner(); + + let c = &mgs_client; + let sp_infos = futures::stream::iter( + sp_list_ignition.iter().filter_map(|ignition| { + if matches!(ignition.details, SpIgnition::Yes { .. }) { + Some(ignition.id) + } else { + None + } + }), + ) + .then(async move |sp_id| { + c.sp_get(sp_id.type_, sp_id.slot) + .await + .with_context(|| format!("fetching info about SP {:?}", sp_id)) + .map(|s| (sp_id, s)) + }) + .collect::>>() + .await + .into_iter() + .filter_map(|r| match r { + Ok((sp_id, v)) => Some((sp_id, v.into_inner())), + Err(error) => { + warn!( + log, + "error getting SP state"; + "error" => #?error, + ); + None + } + }) + .collect::>(); + + let sps_by_serial = sp_infos + .into_iter() + .map(|(sp_id, sp_state)| { + let baseboard_id = Arc::new(BaseboardId { + serial_number: sp_state.serial_number, + part_number: sp_state.model, + }); + let serial_number = baseboard_id.serial_number.clone(); + let sp_info = SpInfo { + baseboard_id, + sp_type: sp_id.type_, + sp_slot_id: sp_id.slot, + }; + (serial_number, sp_info) + }) + .collect(); + + Ok(Inventory { sps_by_serial }) + } +} + +/// Processes one "line" of user input. +fn process_cmd( + updater_state: &mut UpdaterState, + cmd: TopLevelArgs, +) -> anyhow::Result> { + let TopLevelArgs { command } = cmd; + match command { + Commands::Config => cmd_config(updater_state), + Commands::Status => cmd_status(updater_state), + Commands::Set(args) => cmd_set(updater_state, args), + Commands::Delete(args) => cmd_delete(updater_state, args), + } +} + +// clap configuration for the REPL commands + +/// reconfigurator-sp-updater: interactively manage SP updates +#[derive(Debug, Parser)] +struct TopLevelArgs { + #[command(subcommand)] + command: Commands, +} + +#[derive(Debug, Subcommand)] +enum Commands { + /// Show configured updates + Config, + /// Show status of recent and in-progress updates + Status, + /// Configure an update + Set(SetArgs), + /// Delete a configured update + Delete(DeleteArgs), +} + +fn cmd_config( + updater_state: &mut UpdaterState, +) -> anyhow::Result> { + let configured = updater_state.requests_tx.borrow(); + + let mut s = String::new(); + writeln!(&mut s, "configured updates ({}):", configured.len())?; + for update in &*configured { + let baseboard_id = &update.baseboard_id; + writeln!( + &mut s, + " part {} serial {} (type {:?} slot {}):", + baseboard_id.part_number, + baseboard_id.serial_number, + update.sp_type, + update.slot_id, + )?; + writeln!(&mut s, " artifact hash: {}", update.artifact_hash_id,)?; + writeln!( + &mut s, + " user-provided artifact version: {}", + update.artifact_version, + )?; + match &update.details { + PendingMgsUpdateDetails::Sp { + expected_active_version, + expected_inactive_version, + } => { + writeln!( + &mut s, + " preconditions: active slot {:?}, inactive slot {:?}", + expected_active_version, expected_inactive_version, + )?; + } + } + + writeln!(&mut s)?; + } + + Ok(Some(s)) +} + +fn cmd_status( + updater_state: &mut UpdaterState, +) -> anyhow::Result> { + let status = updater_state.status_rx.borrow(); + + let mut s = String::new(); + writeln!(&mut s, "recent completed attempts:")?; + for r in &status.recent { + // Ignore units smaller than a millisecond. + let elapsed = Duration::from_millis( + u64::try_from(r.elapsed.as_millis()) + .context("elapsed time too large")?, + ); + writeln!( + &mut s, + " {} to {} (took {}): serial {}", + r.time_started.to_rfc3339_opts(SecondsFormat::Millis, true), + r.time_done.to_rfc3339_opts(SecondsFormat::Millis, true), + humantime::format_duration(elapsed), + r.request.baseboard_id.serial_number, + )?; + writeln!(&mut s, " attempt#: {}", r.nattempts_done)?; + writeln!(&mut s, " version: {}", r.request.artifact_version)?; + writeln!( + &mut s, + " hash: {}", + r.request.artifact_hash_id.hash + )?; + writeln!(&mut s, " result: {:?}", r.result)?; + } + + writeln!(&mut s, "\ncurrently in progress:")?; + for (baseboard_id, status) in &status.in_progress { + // Ignore units smaller than a millisecond. + let elapsed = Duration::from_millis( + u64::try_from(status.instant_started.elapsed().as_millis()) + .context("total runtime was too large")?, + ); + writeln!( + &mut s, + " {}: serial {}: {:?} (attempt {}, running {})", + status.time_started.to_rfc3339_opts(SecondsFormat::Millis, true), + baseboard_id.serial_number, + status.status, + status.nattempts_done + 1, + humantime::format_duration(elapsed), + )?; + } + + writeln!(&mut s, "\nwaiting for retry:")?; + for (baseboard_id, wait_info) in &status.waiting { + writeln!( + &mut s, + " serial {}: will try again at {} (attempt {})", + baseboard_id.serial_number, + wait_info.next_attempt_time, + wait_info.nattempts_done + 1, + )?; + } + + Ok(Some(s)) +} + +#[derive(Debug, Args)] +struct SetArgs { + /// serial number to update + serial: String, + /// artifact hash id + artifact_hash: ArtifactHash, + /// version + version: String, + /// component to update + #[command(subcommand)] + component: Component, +} + +#[derive(Clone, Debug, Subcommand)] +enum Component { + Sp { + expected_active_version: ArtifactVersion, + expected_inactive_version: ExpectedVersion, + }, +} + +fn cmd_set( + updater_state: &mut UpdaterState, + args: SetArgs, +) -> anyhow::Result> { + let serial = &args.serial; + let info = updater_state.inventory.info_for_serial(serial)?; + let known_artifact_kind = match (&args.component, info.sp_type) { + (Component::Sp { .. }, SpType::Sled) => KnownArtifactKind::GimletSp, + (Component::Sp { .. }, SpType::Power) => KnownArtifactKind::PscSp, + (Component::Sp { .. }, SpType::Switch) => KnownArtifactKind::SwitchSp, + }; + let artifact_kind = ArtifactKind::from_known(known_artifact_kind); + let artifact_hash_id = + ArtifactHashId { kind: artifact_kind, hash: args.artifact_hash }; + let request = PendingMgsUpdate { + baseboard_id: info.baseboard_id.clone(), + sp_type: info.sp_type, + slot_id: info.sp_slot_id, + details: match args.component { + Component::Sp { + expected_active_version, + expected_inactive_version, + } => PendingMgsUpdateDetails::Sp { + expected_active_version, + expected_inactive_version, + }, + }, + artifact_hash_id, + artifact_version: ArtifactVersion::new(args.version) + .context("parsing artifact version")?, + }; + + updater_state.requests_tx.send_modify(|requests| { + requests.insert(request); + }); + + Ok(Some(format!("updated configuration for {serial}"))) +} + +#[derive(Debug, Args)] +struct DeleteArgs { + /// serial number of SP with update to delete + serial: String, +} + +fn cmd_delete( + updater_state: &mut UpdaterState, + args: DeleteArgs, +) -> anyhow::Result> { + let serial = &args.serial; + let baseboard_id = + &updater_state.inventory.info_for_serial(serial)?.baseboard_id; + let changed = updater_state + .requests_tx + .send_if_modified(|requests| requests.remove(&baseboard_id).is_some()); + if changed { + Ok(Some(format!("deleted configured update for serial {serial}"))) + } else { + Err(anyhow!("no update was configured for serial {serial}")) + } +} diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 4e8c0b56187..0d6478790ab 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -57,6 +57,7 @@ macaddr.workspace = true nexus-config.workspace = true nexus-external-api.workspace = true nexus-internal-api.workspace = true +nexus-mgs-updates.workspace = true nexus-networking.workspace = true nexus-saga-recovery.workspace = true nexus-test-interface.workspace = true diff --git a/nexus/db-queries/src/db/datastore/deployment.rs b/nexus/db-queries/src/db/datastore/deployment.rs index 122852e6f48..88d46a13570 100644 --- a/nexus/db-queries/src/db/datastore/deployment.rs +++ b/nexus/db-queries/src/db/datastore/deployment.rs @@ -59,6 +59,7 @@ use nexus_types::deployment::BlueprintSledConfig; use nexus_types::deployment::BlueprintTarget; use nexus_types::deployment::ClickhouseClusterConfig; use nexus_types::deployment::CockroachDbPreserveDowngrade; +use nexus_types::deployment::PendingMgsUpdates; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; @@ -892,6 +893,9 @@ impl DataStore { Ok(Blueprint { id: blueprint_id, + // TODO these need to be serialized to the database. + // See oxidecomputer/omicron#7981. + pending_mgs_updates: PendingMgsUpdates::new(), sleds: sled_configs, parent_blueprint_id, internal_dns_version, diff --git a/nexus/db-queries/src/db/datastore/rack.rs b/nexus/db-queries/src/db/datastore/rack.rs index 23e6e3a3fd6..c3138f94eda 100644 --- a/nexus/db-queries/src/db/datastore/rack.rs +++ b/nexus/db-queries/src/db/datastore/rack.rs @@ -1012,6 +1012,7 @@ mod test { use nexus_sled_agent_shared::inventory::OmicronZoneDataset; use nexus_types::deployment::BlueprintSledConfig; use nexus_types::deployment::CockroachDbPreserveDowngrade; + use nexus_types::deployment::PendingMgsUpdates; use nexus_types::deployment::{ BlueprintZoneConfig, OmicronZoneExternalFloatingAddr, OmicronZoneExternalFloatingIp, @@ -1054,6 +1055,7 @@ mod test { blueprint: Blueprint { id: BlueprintUuid::new_v4(), sleds: BTreeMap::new(), + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: None, @@ -1539,6 +1541,7 @@ mod test { let blueprint = Blueprint { id: BlueprintUuid::new_v4(), sleds: make_sled_config_only_zones(blueprint_zones), + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: None, @@ -1796,6 +1799,7 @@ mod test { let blueprint = Blueprint { id: BlueprintUuid::new_v4(), sleds: make_sled_config_only_zones(blueprint_zones), + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: None, @@ -2002,6 +2006,7 @@ mod test { let blueprint = Blueprint { id: BlueprintUuid::new_v4(), sleds: make_sled_config_only_zones(blueprint_zones), + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: None, @@ -2138,6 +2143,7 @@ mod test { let blueprint = Blueprint { id: BlueprintUuid::new_v4(), sleds: make_sled_config_only_zones(blueprint_zones), + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: None, diff --git a/nexus/mgs-updates/Cargo.toml b/nexus/mgs-updates/Cargo.toml index 265350e95c4..e3b753d1fdc 100644 --- a/nexus/mgs-updates/Cargo.toml +++ b/nexus/mgs-updates/Cargo.toml @@ -7,12 +7,25 @@ edition = "2021" workspace = true [dependencies] +chrono.workspace = true futures.workspace = true gateway-client.workspace = true +id-map.workspace = true +internal-dns-resolver.workspace = true +internal-dns-types.workspace = true +omicron-common.workspace = true +nexus-types.workspace = true +qorb.workspace = true +repo-depot-client.workspace = true reqwest.workspace = true +sha2.workspace = true slog.workspace = true +slog-error-chain.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-stream.workspace = true +tokio-util.workspace = true +tufaceous-artifact.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true diff --git a/nexus/mgs-updates/src/artifacts.rs b/nexus/mgs-updates/src/artifacts.rs new file mode 100644 index 00000000000..f8dfef42a41 --- /dev/null +++ b/nexus/mgs-updates/src/artifacts.rs @@ -0,0 +1,118 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Makes artifact contents available for use in updates + +use futures::TryStreamExt; +use sha2::{Digest, Sha256}; +use slog::o; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use thiserror::Error; +use tokio::{io::AsyncWriteExt, sync::watch}; +use tufaceous_artifact::ArtifactHash; + +type RepoDepotError = repo_depot_client::Error; + +/// Makes update artifact contents available to consumers that need it +// This implementation is currently very minimal. It doesn't actually cache and +// doesn't avoid concurrent fetches for the same object. +pub struct ArtifactCache { + log: slog::Logger, + repo_depot_backends: watch::Receiver, + next: AtomicUsize, +} + +impl ArtifactCache { + pub fn new( + log: slog::Logger, + repo_depot_backends: watch::Receiver, + ) -> ArtifactCache { + ArtifactCache { log, repo_depot_backends, next: AtomicUsize::new(0) } + } + + /// Retrieve the entire contents of the artifact identified by `hash` + /// + /// Since this will buffer the whole artifact in memory, this should only be + /// used for artifacts known to be relatively small. + pub async fn artifact_contents( + &self, + hash: &ArtifactHash, + ) -> Result, ArtifactCacheError> { + let client = self.client()?; + let writer = std::io::Cursor::new(Vec::new()); + let byte_stream = client + .artifact_get_by_sha256(&hash.to_string()) + .await? + .into_inner() + .into_inner(); + let mut sha256 = Sha256::new(); + let mut nbytes = 0; + let writer = byte_stream + .map_err(ArtifactCacheError::Read) + .try_fold(writer, |mut writer, chunk| { + nbytes += chunk.len(); + sha256.update(&chunk); + async move { + writer + .write_all(&chunk) + .await + .map_err(ArtifactCacheError::Buffer)?; + Ok(writer) + } + }) + .await?; + let buffer = writer.into_inner(); + let digest = sha256.finalize(); + if digest.as_slice() != hash.as_ref() { + return Err(ArtifactCacheError::HashMismatch { + nbytes, + found: ArtifactHash(digest.into()), + expected: *hash, + }); + } + + Ok(buffer) + } + + fn client(&self) -> Result { + // It's important that we drop the borrowed value before returning so + // that we don't keep the watch channel locked. + // + // "next" is used to try to avoid re-using the same client every time. + // But it's not critical that we go in any particular order. + let idx = self.next.fetch_add(1, Ordering::SeqCst); + let clients = self.repo_depot_backends.borrow(); + if clients.is_empty() { + Err(ArtifactCacheError::NoClients) + } else { + let addresses: Vec<_> = clients.values().collect(); + let addr = addresses[idx % addresses.len()]; + let url = format!("http://{}", addr.address); + let log = self.log.new(o!("repo_depot_url" => url.clone())); + Ok(repo_depot_client::Client::new(&url, log)) + } + } +} + +#[derive(Debug, Error)] +pub enum ArtifactCacheError { + #[error("no repo depot clients available")] + NoClients, + + #[error("failed to fetch artifact")] + Fetch(#[from] RepoDepotError), + + #[error("reading artifact")] + Read(reqwest::Error), + + #[error("buffering data for artifact")] + Buffer(std::io::Error), + + #[error( + "artifact hash mismatch (read {nbytes} bytes, expected {expected}, \ + found {found})" + )] + HashMismatch { expected: ArtifactHash, found: ArtifactHash, nbytes: usize }, +} diff --git a/nexus/mgs-updates/src/common_sp_update.rs b/nexus/mgs-updates/src/common_sp_update.rs index 7cfbacc29d1..9e68055cb7f 100644 --- a/nexus/mgs-updates/src/common_sp_update.rs +++ b/nexus/mgs-updates/src/common_sp_update.rs @@ -7,14 +7,22 @@ use super::MgsClients; use super::UpdateProgress; +use futures::future::BoxFuture; use gateway_client::types::SpType; use gateway_client::types::SpUpdateStatus; +use nexus_types::deployment::ExpectedVersion; +use nexus_types::deployment::PendingMgsUpdate; use slog::Logger; use slog::{debug, error, info, warn}; use std::time::Duration; +use thiserror::Error; use tokio::sync::watch; +use tufaceous_artifact::ArtifactVersion; use uuid::Uuid; +/// How frequently do we poll MGS for the update progress? +pub(crate) const STATUS_POLL_INTERVAL: Duration = Duration::from_secs(3); + type GatewayClientError = gateway_client::Error; /// Error type returned when an update to a component managed by the SP fails. @@ -45,7 +53,12 @@ pub enum SpComponentUpdateError { UpdateFailedWithMessage(String), } -pub(super) trait SpComponentUpdater { +/// Describes an update to a component for which the SP drives the update +/// +/// This trait is essentially historical at this point. We maintain impls so +/// that we have tested reference implementations. But these will eventually be +/// migrated to `ReconfiguratorSpComponentUpdater` instead. +pub trait SpComponentUpdater { /// The target component. /// /// Should be produced via `SpComponent::const_as_str()`. @@ -81,9 +94,6 @@ pub(super) async fn deliver_update( updater: &(dyn SpComponentUpdater + Send + Sync), mgs_clients: &mut MgsClients, ) -> Result<(), SpComponentUpdateError> { - // How frequently do we poll MGS for the update progress? - const STATUS_POLL_INTERVAL: Duration = Duration::from_secs(3); - // Start the update. mgs_clients .try_all_serially(updater.logger(), |client| async move { @@ -238,3 +248,66 @@ fn status_is_complete( } } } + +/// Provides helper functions used while updating a particular SP component +pub trait SpComponentUpdateHelper { + /// Checks if the component is already updated or ready for update + fn precheck<'a>( + &'a self, + log: &'a slog::Logger, + mgs_clients: &'a mut MgsClients, + update: &'a PendingMgsUpdate, + ) -> BoxFuture<'a, Result>; + + /// Attempts once to perform any post-update actions (e.g., reset the + /// device) + fn post_update<'a>( + &'a self, + log: &'a slog::Logger, + mgs_clients: &'a mut MgsClients, + update: &'a PendingMgsUpdate, + ) -> BoxFuture<'a, Result<(), GatewayClientError>>; +} + +/// Describes the live state of the component before the update begins +#[derive(Debug)] +pub enum PrecheckStatus { + UpdateComplete, + ReadyForUpdate, +} + +#[derive(Debug, Error)] +pub enum PrecheckError { + #[error("communicating with MGS")] + GatewayClientError(#[from] GatewayClientError), + + #[error( + "in {sp_type} slot {slot_id}, expected to find + part {expected_part:?} serial {expected_serial:?}, but found + part {found_part:?} serial {found_serial:?}" + )] + WrongDevice { + sp_type: SpType, + slot_id: u32, + expected_part: String, + expected_serial: String, + found_part: String, + found_serial: String, + }, + + #[error( + "expected to find active version {expected:?}, but found {found:?}" + )] + WrongActiveVersion { expected: ArtifactVersion, found: String }, + + #[error( + "expected to find inactive version {expected:?}, but found {found:?}" + )] + WrongInactiveVersion { expected: ExpectedVersion, found: FoundVersion }, +} + +#[derive(Debug)] +pub enum FoundVersion { + MissingVersion, + Version(String), +} diff --git a/nexus/mgs-updates/src/driver.rs b/nexus/mgs-updates/src/driver.rs new file mode 100644 index 00000000000..c563c4b9094 --- /dev/null +++ b/nexus/mgs-updates/src/driver.rs @@ -0,0 +1,612 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Drive one or more in-progress MGS-managed updates + +use crate::ArtifactCache; +use crate::driver_update::ApplyUpdateError; +use crate::driver_update::SpComponentUpdate; +use crate::driver_update::UpdateCompletedHow; +use crate::driver_update::apply_update; +use crate::sp_updater::ReconfiguratorSpUpdater; +use futures::FutureExt; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; +use gateway_client::SpComponent; +use id_map::IdMap; +use id_map::IdMappable; +use nexus_types::deployment::PendingMgsUpdate; +use nexus_types::deployment::PendingMgsUpdates; +use nexus_types::inventory::BaseboardId; +use qorb::resolver::AllBackends; +use slog::{error, info, o, warn}; +use slog_error_chain::InlineErrorChain; +use std::collections::BTreeMap; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use tokio::sync::watch; +use tokio_stream::StreamExt; +use tokio_util::time::DelayQueue; +use tokio_util::time::delay_queue; +use uuid::Uuid; + +/// How many recent completions to keep track of (for debugging) +const N_RECENT_COMPLETIONS: usize = 16; + +/// Drive one or more MGS-managed updates +/// +/// Use [`MgsUpdateDriver::new()`] to create a new one of these. You configure +/// the set of updates that should be driven by writing to its watch channel. +/// Use [`MgsUpdateDriver::status_rx()`] to get a `watch::Receiver` where you +/// can check on the status of updates being managed by this driver. Use +/// [`MgsUpdateDriver::run()`] to drive updates. +/// +/// - If a requested update is added to the channel, an attempt will be made to +/// apply the update promptly. +/// - For each update attempt: +/// - If the live system state reflects that the update has been completed, +/// then no action is taken. +/// - If the live system state reflects that the preconditions aren't true +/// (e.g., the device is currently running from a different slot than +/// expected), then no action is taken. +/// - If the live system state reflects that an update is already in progress, +/// we'll wait for that one to complete. +/// - If the process appears to be stuck (whether this driver is running it or +/// a different one is), it will be aborted. This is really only intended +/// to catch pathological cases like a partitioned Nexus or a failure of the +/// MGS through which we're sending the update image. +/// - Once the update attempt completes, regardless of the outcome, it will be +/// tried again later. +/// - If an update is ongoing when it gets removed from the channel, it is *not* +/// cancelled. It will run to completion. But it will not be restarted again +/// even if it fails. +/// +/// It's assumed that other instances may exist running the same process. We +/// may find their updates in progress, we may need to cancel them if they +/// appear stuck, and ours may get cancelled at any point. These are presumed +/// to be very unlikely. +pub struct MgsUpdateDriver { + // helpers + log: slog::Logger, + /// source of artifacts used for updates + artifacts: Arc, + /// dynamically-changing set of MGS backends (provided by qorb) + mgs_rx: watch::Receiver, + + // inputs + /// set of updates requested by our consumer + requests_rx: watch::Receiver, + /// how long to wait between attempts (successful or otherwise) + retry_timeout: Duration, + + // outputs + /// status of updates we're working on or recently finished + status_tx: watch::Sender, + + // internal state tracking + /// holds the futures that are each performing one update attempt + futures: FuturesUnordered>, + /// tracks the next timer we're waiting on for retries + delayq: DelayQueue>, + + /// tracks update attempts that are in-progress right now + in_progress: IdMap, + /// tracks update attempts that are not running right now + /// (but waiting for a retry) + waiting: IdMap, +} + +impl MgsUpdateDriver { + pub fn new( + log: slog::Logger, + artifacts: Arc, + requests_rx: watch::Receiver, + mgs_rx: watch::Receiver, + retry_timeout: Duration, + ) -> MgsUpdateDriver { + let (status_tx, _) = watch::channel(DriverStatus { + recent: VecDeque::with_capacity(N_RECENT_COMPLETIONS), + in_progress: BTreeMap::new(), + waiting: BTreeMap::new(), + }); + + MgsUpdateDriver { + log, + artifacts, + mgs_rx, + requests_rx, + retry_timeout, + status_tx, + futures: FuturesUnordered::new(), + delayq: DelayQueue::new(), + in_progress: IdMap::new(), + waiting: IdMap::new(), + } + } + + /// Returns a `watch::Receiver` that you can use to inspect the state of + /// in-progress, waiting, and recently completed update attempts. + pub fn status_rx(&self) -> watch::Receiver { + self.status_tx.subscribe() + } + + /// Runs the driver + /// + /// You generally want to run this in its own tokio task. This will not + /// return until one of the input channels has closed. + pub async fn run(mut self) { + info!(&self.log, "starting MgsUpdateDriver"); + loop { + tokio::select! { + // See if we've received an updated configuration. + // Per the docs on [`tokio::select!], + // `watch::Receiver::changed()` is cancel-safe. + maybe_update = self.requests_rx.changed() => { + match maybe_update { + Ok(()) => { + self.on_config_changed(); + } + Err(error) => { + info!( + &self.log, + "shutting down \ + (failed to read from input channel)"; + InlineErrorChain::new(&error) + ); + break; + } + } + } + + // See if any update attempts have completed. + // + // Avoid waiting on an empty FuturesUnordered. Doing so would + // cause it to immediately return None, terminating the Stream + // altogether. + // + // tokio_stream::StreamExt::next() is documented to be + // cancel-safe. + maybe_work_done = self.futures.next(), + if !self.futures.is_empty() => { + match maybe_work_done { + Some(result) => self.on_attempt_done(result), + None => { + error!( + &self.log, + "FutureUnordered unexpectedly ended" + ); + break; + } + }; + }, + + // See if the timer has fired for any update awaiting retry. + // tokio_stream::StreamExt::next() is documented to be + // cancel-safe. + maybe_timer_expired = self.delayq.next(), + if !self.delayq.is_empty() => { + match maybe_timer_expired { + Some(expired) => { + let baseboard_id = expired.into_inner(); + self.on_retry_timer_expired(baseboard_id); + }, + None => { + error!( + &self.log, + "DelayQueue unexpectedly ended" + ); + break; + } + } + } + } + } + } + + /// Examines the configuration and decides what work needs to be kicked off. + fn on_config_changed(&mut self) { + // We'll take two passes: + // + // 1. Look at each request in the configuration and decide what to do + // with each one. + // 2. Take the appropriate action for each one. + // + // Importantly, we can drop the config (unblocking the "watch" channel + // and also dropping a shared reference on `self`) after pass 1. + + let (to_stop_waiting, to_dispatch) = { + let new_config = self.requests_rx.borrow_and_update(); + + // Stop waiting to retry any requests whose config has changed or + // been removed, + let to_stop_waiting: Vec<_> = self + .waiting + .iter() + .filter_map(|waiting| { + let baseboard_id = + &waiting.internal_request.request.baseboard_id; + match new_config.get(baseboard_id) { + None => true, + Some(new_request) => { + *new_request != waiting.internal_request.request + } + } + .then(|| baseboard_id.clone()) + }) + .collect(); + + // Dispatch new requests if either: + // + // - we're waiting to retry and the config has changed + // (overlaps with the case above) + // - we're not waiting to retry and not already running an attempt + let to_dispatch: Vec<_> = new_config + .iter() + .filter_map(|new_request| { + let baseboard_id = &new_request.baseboard_id; + let do_dispatch = + if let Some(waiting) = self.waiting.get(baseboard_id) { + *new_request != waiting.internal_request.request + } else { + !self.in_progress.contains_key(baseboard_id) + }; + do_dispatch.then(|| InternalRequest { + request: (*new_request).clone(), + nattempts_done: 0, + }) + }) + .collect(); + + (to_stop_waiting, to_dispatch) + }; + + // Process the requests for which we've decided to stop waiting. + for baseboard_id in &to_stop_waiting { + // Update our bookkeeping. + // unwrap(): we filtered on this condition above. + let waiting = self.waiting.remove(baseboard_id).unwrap(); + + // Stop tracking this timeout. + self.delayq.remove(&waiting.delay_key); + } + // Update the status to reflect that. + self.status_tx.send_modify(|driver_status| { + for baseboard_id in &to_stop_waiting { + driver_status.waiting.remove(baseboard_id); + } + }); + + // Now dispatch new update attempts. + for internal_request in to_dispatch { + self.start_attempt(internal_request); + } + } + + fn start_attempt(&mut self, internal_request: InternalRequest) { + let request = &internal_request.request; + let baseboard_id = &request.baseboard_id; + assert!(!self.in_progress.contains_key(baseboard_id)); + + let update_id = Uuid::new_v4(); + let log = self.log.new(o!( + request.clone(), + "update_id" => update_id.to_string() + )); + info!(&log, "begin update attempt for baseboard"); + + let (sp_update, updater) = match &request.details { + nexus_types::deployment::PendingMgsUpdateDetails::Sp { .. } => { + let sp_update = SpComponentUpdate { + log: log.clone(), + component: SpComponent::SP_ITSELF, + target_sp_type: request.sp_type, + target_sp_slot: request.slot_id, + // The SP has two firmware slots, but they're aren't + // individually labeled. We always request an update to slot + // 0, which (confusingly in this context) means "the + // inactive slot". + firmware_slot: 0, + update_id, + }; + + (sp_update, Box::new(ReconfiguratorSpUpdater {})) + } + }; + + let baseboard_id = baseboard_id.clone(); + let nattempts_done = internal_request.nattempts_done; + let request = internal_request.request.clone(); + let in_progress = InProgressUpdate { + log: log.clone(), + time_started: chrono::Utc::now(), + instant_started: Instant::now(), + internal_request, + }; + + // Update status. We do this before starting the future because it will + // update this status and expects to find it. + self.status_tx.send_modify(|driver_status| { + driver_status.in_progress.insert( + baseboard_id.clone(), + InProgressUpdateStatus { + time_started: in_progress.time_started, + instant_started: in_progress.instant_started, + status: UpdateAttemptStatus::NotStarted, + nattempts_done, + }, + ); + }); + + let status_updater = UpdateAttemptStatusUpdater { + tx: self.status_tx.clone(), + baseboard_id: baseboard_id.clone(), + }; + let artifacts = self.artifacts.clone(); + let mgs_rx = self.mgs_rx.clone(); + let future = async move { + let result = apply_update( + artifacts, + &sp_update, + &*updater, + mgs_rx, + &request, + status_updater, + ) + .await; + UpdateAttemptResult { baseboard_id: request.baseboard_id, result } + } + .boxed(); + + // Keep track of the work. + self.futures.push(future); + + // Update our bookkeeping. + assert!(self.in_progress.insert(in_progress).is_none()); + } + + /// Invoked when an in-progress update attempt has completed. + fn on_attempt_done(&mut self, result: UpdateAttemptResult) { + // Load the in-progress state and generate a CompletedAttempt status for + // this attempt. + let in_progress = self + .in_progress + .remove(&result.baseboard_id) + .expect("in-progress record for attempt that just completed"); + let nattempts_done = in_progress.internal_request.nattempts_done + 1; + let completed = CompletedAttempt { + time_started: in_progress.time_started, + time_done: chrono::Utc::now(), + elapsed: in_progress.instant_started.elapsed(), + request: in_progress.internal_request.request.clone(), + result: result + .result + .map_err(|error| InlineErrorChain::new(&error).to_string()), + nattempts_done, + }; + let internal_request = InternalRequest { + request: in_progress.internal_request.request, + nattempts_done, + }; + + // Log the result. + match &completed.result { + Ok(success) => { + info!( + &in_progress.log, + "update attempt done"; + "elapsed_millis" => completed.elapsed.as_millis(), + "result" => ?success, + ); + } + Err(error) => { + info!( + &in_progress.log, + "update attempt done"; + "elapsed_millis" => completed.elapsed.as_millis(), + "error" => error, + ) + } + }; + + // Regardless of the result, set a timer for retrying. Our job is to + // ensure reality matches our configuration, so even if we succeeded, we + // want to check again in a little while to see if anything has changed. + let baseboard_id = completed.request.baseboard_id.clone(); + let retry_timeout = self.retry_timeout; + let status_time_next = chrono::Utc::now() + retry_timeout; + let delay_key = self.delayq.insert(baseboard_id.clone(), retry_timeout); + self.waiting.insert(WaitingAttempt { delay_key, internal_request }); + + // Update the overall status to reflect all these changes. + self.status_tx.send_modify(|driver_status| { + // Remove this item from the list of in-progress attempts. + let found = driver_status.in_progress.remove(&baseboard_id); + assert!(found.is_some()); + + // Add this item to the list of requests waiting to be retried. + driver_status.waiting.insert( + baseboard_id.clone(), + WaitingStatus { + next_attempt_time: status_time_next, + nattempts_done, + }, + ); + + // Report this recently-completed attempt. + // This is a ringbuffer of recent attempts. Make space if we're + // already at the capacity. + let recent = &mut driver_status.recent; + if recent.len() == recent.capacity() { + let _ = recent.pop_front(); + } + recent.push_back(completed); + }); + } + + /// Invoked when the timer fires to retry a particular request. + fn on_retry_timer_expired(&mut self, baseboard_id: Arc) { + // Remove this request from the set of requests waiting to be retried. + let waiting = self + .waiting + .remove(&baseboard_id) + .expect("waiting request for expired retry timer"); + // Update the external status to reflect that. + self.status_tx.send_modify(|driver_status| { + driver_status.waiting.remove(&baseboard_id); + }); + + // Find the current configuration for this request. + // + // It generally ought to be here because if it got removed, we'd have + // gotten a notification and cancelled the retry timer. But it's + // conceivable that the retry timer fired, then it was removed from the + // configuration (concurrently), and we just noticed that before having + // processed the change notification. + let my_request = { + let current_config = self.requests_rx.borrow(); + let Some(my_request) = current_config.get(&baseboard_id).cloned() + else { + warn!( + &self.log, + "attempted retry of baseboard whose update is no longer \ + configured"; + &*baseboard_id + ); + return; + }; + + my_request + }; + + // Dispatch another attempt. + info!( + &self.log, + "dispatching new attempt (retry timer expired)"; &*baseboard_id + ); + self.start_attempt(InternalRequest { + request: my_request, + nattempts_done: waiting.internal_request.nattempts_done, + }); + } +} + +/// information tracked for each request +struct InternalRequest { + request: PendingMgsUpdate, + nattempts_done: u32, +} + +/// internal bookkeeping for each in-progress update +struct InProgressUpdate { + log: slog::Logger, + time_started: chrono::DateTime, + instant_started: Instant, + internal_request: InternalRequest, +} + +impl IdMappable for InProgressUpdate { + type Id = Arc; + fn id(&self) -> Self::Id { + self.internal_request.request.baseboard_id.clone() + } +} + +/// internal result of a completed update attempt +struct UpdateAttemptResult { + baseboard_id: Arc, + result: Result, +} + +/// internal bookkeeping for an update that's awaiting retry +struct WaitingAttempt { + delay_key: delay_queue::Key, + internal_request: InternalRequest, +} + +impl WaitingAttempt { + fn baseboard_id(&self) -> &Arc { + &self.internal_request.request.baseboard_id + } +} + +impl IdMappable for WaitingAttempt { + type Id = Arc; + fn id(&self) -> Self::Id { + self.baseboard_id().clone() + } +} + +/// Interface used by update attempts to update just their part of the overall +/// `DriverStatus` +pub(crate) struct UpdateAttemptStatusUpdater { + tx: watch::Sender, + baseboard_id: Arc, +} + +impl UpdateAttemptStatusUpdater { + pub(crate) fn update(&self, new_status: UpdateAttemptStatus) { + self.tx.send_modify(|driver_status| { + // unwrap(): this UpdateAttemptStatusUpdater's lifetime is bound by + // the future that owns it. The status entry for this future lives + // in the `in_progress` struct until it completes. Thus, we should + // always have a value here. + let my_status = + driver_status.in_progress.get_mut(&self.baseboard_id).unwrap(); + my_status.status = new_status; + }); + } +} + +// Externally-visible status + +/// Status of ongoing update attempts, recently completed attempts, and update +/// requests that are waiting for retry. +#[derive(Debug)] +pub struct DriverStatus { + pub recent: VecDeque, + pub in_progress: BTreeMap, InProgressUpdateStatus>, + pub waiting: BTreeMap, WaitingStatus>, +} + +/// externally-exposed status for a completed attempt +#[derive(Debug)] +pub struct CompletedAttempt { + pub time_started: chrono::DateTime, + pub time_done: chrono::DateTime, + pub elapsed: Duration, + pub request: PendingMgsUpdate, + pub result: Result, + pub nattempts_done: u32, +} + +/// externally-exposed status for each in-progress update +#[derive(Debug)] +pub struct InProgressUpdateStatus { + pub time_started: chrono::DateTime, + pub instant_started: std::time::Instant, + pub status: UpdateAttemptStatus, + pub nattempts_done: u32, +} + +/// status of a single update attempt +#[derive(Clone, Debug)] +pub enum UpdateAttemptStatus { + NotStarted, + FetchingArtifact, + Precheck, + Updating, + UpdateWaiting, + PostUpdate, + PostUpdateWait, + Done, +} + +/// externally-exposed status for waiting updates +#[derive(Debug)] +pub struct WaitingStatus { + pub next_attempt_time: chrono::DateTime, + pub nattempts_done: u32, +} diff --git a/nexus/mgs-updates/src/driver_update.rs b/nexus/mgs-updates/src/driver_update.rs new file mode 100644 index 00000000000..d695ab3aa98 --- /dev/null +++ b/nexus/mgs-updates/src/driver_update.rs @@ -0,0 +1,576 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Concurrent-safe facilities for doing MGS-managed upates + +use crate::common_sp_update::PrecheckError; +use crate::common_sp_update::PrecheckStatus; +use crate::common_sp_update::STATUS_POLL_INTERVAL; +use crate::common_sp_update::SpComponentUpdateHelper; +use crate::driver::UpdateAttemptStatus; +use crate::driver::UpdateAttemptStatusUpdater; +use crate::mgs_clients::GatewayClientError; +use crate::{ArtifactCache, ArtifactCacheError, MgsClients}; +use gateway_client::SpComponent; +use gateway_client::types::UpdateAbortBody; +use gateway_client::types::{SpType, SpUpdateStatus}; +use nexus_types::deployment::PendingMgsUpdate; +use qorb::resolver::AllBackends; +use slog::{debug, error, info, o, warn}; +use slog_error_chain::InlineErrorChain; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use thiserror::Error; +use tokio::sync::watch; +use uuid::Uuid; + +/// How long may the status remain unchanged without us treating this as a +/// problem? +const PROGRESS_TIMEOUT: Duration = Duration::from_secs(120); + +/// How long to wait between failed attempts to reset the device +const RESET_DELAY_INTERVAL: Duration = Duration::from_secs(10); + +/// How long to wait between poll attempts on update status +const PROGRESS_POLL_INTERVAL: Duration = Duration::from_secs(10); + +/// Timeout for repeat attempts +pub const DEFAULT_RETRY_TIMEOUT: Duration = Duration::from_secs(60); + +/// How long to wait after resetting the device before expecting it to come up +const RESET_TIMEOUT: Duration = Duration::from_secs(60); + +/// Parameters describing a request to update one SP-managed component +/// +/// This is similar in spirit to the `SpComponentUpdater` trait but uses a +/// struct-based interface instead. +pub struct SpComponentUpdate { + pub log: slog::Logger, + pub component: SpComponent, + pub target_sp_type: SpType, + pub target_sp_slot: u32, + pub firmware_slot: u16, + pub update_id: Uuid, +} + +impl SpComponentUpdate { + fn component(&self) -> &str { + self.component.const_as_str() + } +} + +#[derive(Clone, Debug)] +pub enum UpdateCompletedHow { + FoundNoChangesNeeded, + CompletedUpdate, + WaitedForConcurrentUpdate, + TookOverConcurrentUpdate, +} + +#[derive(Debug, Error)] +pub enum ApplyUpdateError { + #[error("found no MGS backends in DNS")] + NoMgsBackends, + #[error("failed to fetch artifact")] + FetchArtifact(#[from] ArtifactCacheError), + #[error("preconditions were not met")] + PreconditionFailed(#[source] PrecheckError), + #[error("SP reports update {0} was aborted")] + SpUpdateAborted(Uuid), + #[error("SP reports update {0} failed: {1:?}")] + SpUpdateFailed(Uuid, String), + #[error("SP not knowing about our update attempt")] + SpUpdateLost, + #[error( + "gave up after {}ms waiting for update {0} to finish", + .1.as_millis()) + ] + StuckUpdating(Uuid, Duration), + #[error("failed to abort in-progress SP update")] + SpUpdateAbortFailed(#[from] AbortError), + #[error("SP reports that reset failed: {0:?}")] + SpResetFailed(String), + + #[error("failed waiting for artifact delivery")] + DeliveryWaitError(#[from] DeliveryWaitError), + #[error("error communicating with MGS")] + UpdateStartError(#[source] GatewayClientError), + #[error( + "timed out after {}ms waiting for update to finish", + .0.as_millis() + )] + ResetTimeoutError(Duration), + #[error("waiting for update to finish")] + WaitError(#[source] PrecheckError), +} + +/// Makes one complete attempt to apply the specified software update to an SP +/// component. +/// +/// `sp_update` specifies the target component, expected previous version, +/// expected new version, etc. +/// +/// This operation is made up of multiple steps that have to happen in order: +/// upload the artifact to MGS, wait for the SP to finish writing it, +/// potentially change the active slot, reset the device, etc. At any point, +/// it's possible that a different Nexus instance decides we're out to lunch and +/// aborts the process and potentially even starts its own. While this function +/// will poll forever as long as it looks like things are working, it will +/// return early if not (e.g., if we find that somebody else has aborted our +/// update or if there's been no progress for too long). +/// +/// On success, `UpdateCompletedHow` describes what was needed to complete the +/// update. +pub(crate) async fn apply_update( + artifacts: Arc, + sp_update: &SpComponentUpdate, + update_helper: &(dyn SpComponentUpdateHelper + Send + Sync), + mgs_rx: watch::Receiver, + update: &PendingMgsUpdate, + status: UpdateAttemptStatusUpdater, +) -> Result { + // Set up an instance of `MgsClients` to talk to MGS for the duration of + // this attempt. For each call to `try_serially()`, `MgsClients` will try + // the request against each MGS client that it has. That makes it possible + // to survive transient failure of MGS as long as one is working. If all + // are offline, these operations will fail. We rely on the higher-level + // operation retry to deal with that. + status.update(UpdateAttemptStatus::FetchingArtifact); + let log = &sp_update.log; + let mut mgs_clients = { + let backends = mgs_rx.borrow(); + if backends.is_empty() { + return Err(ApplyUpdateError::NoMgsBackends); + } + MgsClients::from_clients(backends.iter().map( + |(backend_name, backend)| { + gateway_client::Client::new( + &format!("http://{}", backend.address), + log.new(o!( + "mgs_backend_name" => backend_name.0.to_string(), + "mgs_backend_addr" => backend.address.to_string(), + )), + ) + }, + )) + + // It's important that `backends` is dropped at this point. Otherwise, + // we'll hold the watch channel lock while we do the long-running + // operations below. + }; + + // Obtain the contents of the artifact that we need. + let data = + artifacts.artifact_contents(&update.artifact_hash_id.hash).await?; + debug!(log, "loaded artifact contents"); + + // Check the live state first to see if: + // - this update has already been completed, or + // - if not, then if our required preconditions are met + status.update(UpdateAttemptStatus::Precheck); + match update_helper.precheck(log, &mut mgs_clients, update).await { + Ok(PrecheckStatus::ReadyForUpdate) => (), + Ok(PrecheckStatus::UpdateComplete) => { + return Ok(UpdateCompletedHow::FoundNoChangesNeeded); + } + Err(error) => { + return Err(ApplyUpdateError::PreconditionFailed(error)); + } + }; + + // Start the update. + debug!(log, "ready to start update"); + status.update(UpdateAttemptStatus::Updating); + let sp_type = sp_update.target_sp_type; + let sp_slot = sp_update.target_sp_slot; + let component = sp_update.component(); + let my_update_id = sp_update.update_id; + + let update_start = mgs_clients + .try_all_serially(log, |client| { + let data = data.clone(); + async move { + client + .sp_component_update( + sp_type, + sp_slot, + component, + sp_update.firmware_slot, + &sp_update.update_id, + reqwest::Body::from(data.clone()), + ) + .await?; + info!( + log, + "update started"; + "mgs_addr" => client.baseurl(), + ); + Ok(()) + } + }) + .await; + + match update_start { + Ok(()) => { + debug!(log, "started update"); + } + Err(error) => { + // TODO We need a better way to identify this error. + let chain = InlineErrorChain::new(&error); + let message = chain.to_string(); + if !message.contains("update still in progress") { + error!(log, "failed to start update"; chain); + return Err(ApplyUpdateError::UpdateStartError(error)); + } + + // There's another one ongoing. That's fine. + // We'll handle this below. + debug!(log, "watching existing update"); + } + }; + + status.update(UpdateAttemptStatus::UpdateWaiting); + let our_update = + match wait_for_delivery(&mut mgs_clients, sp_update).await? { + DeliveryWaitStatus::Completed(id) => id == my_update_id, + DeliveryWaitStatus::Aborted(id) => { + warn!( + log, + "SP reports update was aborted"; + "aborted_update_id" => id.to_string() + ); + return Err(ApplyUpdateError::SpUpdateAborted(id)); + } + DeliveryWaitStatus::NotRunning => { + // This is a little weird. The SP has likely been reset. + warn!(log, "SP unexpectedly reports no update in progress"); + return Err(ApplyUpdateError::SpUpdateLost); + } + + // For any of the following cases: something went wrong with the + // update. It needs to be explicitly aborted before anybody can try + // again. We'll attempt the abort and then report the specific + // error. The caller will have to do the retry if they want it. + DeliveryWaitStatus::StuckUpdating(id, timeout) => { + abort_update(&mut mgs_clients, sp_update, id, "stuck").await?; + return Err(ApplyUpdateError::StuckUpdating(id, timeout)); + } + + DeliveryWaitStatus::Failed(id, message) => { + abort_update(&mut mgs_clients, sp_update, id, "failed").await?; + return Err(ApplyUpdateError::SpUpdateFailed(id, message)); + } + }; + + // If we were the one doing the update, then we're responsible for + // any post-update action (generally, resetting the device). + // + // Regardless of whether we were the one doing the update, we want to wait + // up to PROGRESS_TIMEOUT for it to come back on the new version. + // + // If we were *not* the one doing the update, and it doesn't come back on + // the new version after PROGRESS_TIMEOUT, then we assume the Nexus doing + // the update may be stuck or crashed and we *do* want to issue the reset + // ourselves. + let try_reset = if our_update { + true + } else { + match wait_for_update_done( + log, + update_helper, + &mut mgs_clients, + update, + PROGRESS_TIMEOUT, + ) + .await + { + Ok(_) => false, + Err(UpdateWaitError::Timeout(_)) => { + warn!(log, "update takeover: sending reset"); + true + } + Err(UpdateWaitError::Indeterminate(error)) => { + return Err(ApplyUpdateError::WaitError(error)); + } + } + }; + + debug!(log, "delivered artifact"); + status.update(UpdateAttemptStatus::PostUpdate); + + if try_reset { + // We retry this until we get some error *other* than a communication + // error. There is intentionally no timeout here. If we've staged an + // update but not managed to reset the device, there's no point where + // we'd want to stop trying to do so. + while let Err(error) = + update_helper.post_update(log, &mut mgs_clients, update).await + { + if !matches!(error, gateway_client::Error::CommunicationError(_)) { + let error = InlineErrorChain::new(&error); + error!(log, "post_update failed"; &error); + return Err(ApplyUpdateError::SpResetFailed(error.to_string())); + } + + tokio::time::sleep(RESET_DELAY_INTERVAL).await; + } + } + + // Regardless of whether it was our job to reset the device, wait for it to + // come back on the new version. + status.update(UpdateAttemptStatus::PostUpdateWait); + let rv = match wait_for_update_done( + log, + update_helper, + &mut mgs_clients, + update, + RESET_TIMEOUT, + ) + .await + { + Ok(()) => { + let how = match (our_update, try_reset) { + (true, _) => UpdateCompletedHow::CompletedUpdate, + (false, false) => UpdateCompletedHow::WaitedForConcurrentUpdate, + (false, true) => UpdateCompletedHow::TookOverConcurrentUpdate, + }; + Ok(how) + } + Err(UpdateWaitError::Timeout(error)) => { + Err(ApplyUpdateError::ResetTimeoutError(error)) + } + Err(UpdateWaitError::Indeterminate(error)) => { + Err(ApplyUpdateError::WaitError(error)) + } + }; + + status.update(UpdateAttemptStatus::Done); + rv +} + +enum DeliveryWaitStatus { + /// the SP does not know about the update we're waiting for + /// + /// This can happen if the SP was reset, possibly because the update + /// completed successfully. + NotRunning, + /// the SP reports that this update was aborted + Aborted(Uuid), + /// the SP reports that this update was completed + Completed(Uuid), + /// the SP reports that this update failed + Failed(Uuid, String), + /// we gave up because the update stopped making forward progress for too + /// long + StuckUpdating(Uuid, Duration), +} + +#[derive(Debug, Error)] +pub enum DeliveryWaitError { + #[error("error communicating with MGS")] + MgsCommunication(#[from] GatewayClientError), +} + +/// Waits for the delivery (upload) phase of the specified update to complete +/// +/// This is used both when we're the one doing the update and when some other +/// component is doing it. +/// +/// This returns early if the update stops for any reason or stops making +/// forward progress for too long. +/// +/// This returns an error only when the state is indeterminate. Otherwise, it +/// returns a description of the resting state. +async fn wait_for_delivery( + mgs_clients: &mut MgsClients, + update: &SpComponentUpdate, +) -> Result { + let mut last_status = None; + let mut last_progress = Instant::now(); + let log = &update.log; + let sp_type = update.target_sp_type; + let sp_slot = update.target_sp_slot; + let component = update.component(); + + loop { + let status = mgs_clients + .try_all_serially(log, |client| async move { + let update_status = client + .sp_component_update_status(sp_type, sp_slot, component) + .await?; + + debug!( + log, + "got update status"; + "mgs_addr" => client.baseurl(), + "status" => ?update_status, + ); + + Ok(update_status) + }) + .await? + .into_inner(); + + match status { + SpUpdateStatus::None => { + return Ok(DeliveryWaitStatus::NotRunning); + } + SpUpdateStatus::Preparing { id, .. } + | SpUpdateStatus::InProgress { id, .. } => { + if let Some(last) = last_status.replace(status.clone()) { + if last == status { + if last_progress.elapsed() > PROGRESS_TIMEOUT { + error!( + log, + "progress timeout"; + "status" => ?status, + "timeout_ms" => PROGRESS_TIMEOUT.as_millis(), + ); + + return Ok(DeliveryWaitStatus::StuckUpdating( + id, + PROGRESS_TIMEOUT, + )); + } + } else { + last_progress = Instant::now(); + } + } + } + SpUpdateStatus::Complete { id } => { + return Ok(DeliveryWaitStatus::Completed(id)); + } + SpUpdateStatus::Aborted { id } => { + return Ok(DeliveryWaitStatus::Aborted(id)); + } + SpUpdateStatus::Failed { code, id } => { + return Ok(DeliveryWaitStatus::Failed( + id, + format!("code {code}"), + )); + } + SpUpdateStatus::RotError { id, message } => { + return Ok(DeliveryWaitStatus::Failed( + id, + format!("RoT error: {message}"), + )); + } + } + + tokio::time::sleep(STATUS_POLL_INTERVAL).await; + } +} + +#[derive(Debug, Error)] +#[error( + "error aborting update {update_id} (reason: {reason}): error \ + communicating with MGS" +)] +pub struct AbortError { + update_id: Uuid, + reason: String, + #[source] + error: GatewayClientError, +} + +async fn abort_update( + mgs_clients: &mut MgsClients, + sp_update: &SpComponentUpdate, + update_id: Uuid, + reason: &str, +) -> Result<(), AbortError> { + let log = &sp_update.log; + let sp_type = sp_update.target_sp_type; + let sp_slot = sp_update.target_sp_slot; + let component = sp_update.component(); + + warn!( + log, + "aborting in-progress SP component update"; + "update_id" => update_id.to_string(), + "reason" => reason, + ); + + mgs_clients + .try_all_serially(log, |mgs_client| async move { + let arg = UpdateAbortBody { id: update_id }; + mgs_client + .sp_component_update_abort(sp_type, sp_slot, component, &arg) + .await + }) + .await + .map_err(|error| AbortError { + update_id, + reason: reason.to_string(), + error, + })?; + Ok(()) +} + +/// Errors returned from `wait_for_update_done()`. +#[derive(Debug, Error)] +enum UpdateWaitError { + #[error("timed out after {0:?}")] + Timeout(Duration), + #[error("found unexpected state while waiting for update")] + Indeterminate(#[source] PrecheckError), +} + +/// Waits for the specified update to completely finish (by polling) +/// +/// "Finish" here means that the component is online in the final state +/// reflected by the update (e.g., with the expected software in the active +/// slot). If a reset was required as part of the update, then the component +/// will have come back online on the new software. +/// +/// This is called after the caller has determined than a particular update +/// (from one specific version to another) is ongoing, potentially driven by a +/// different Nexus instance. +/// +/// Returns early with an error if it's determined that the target component is +/// running neither the old software (prior to the update) nor the new one +/// (after the update). +async fn wait_for_update_done( + log: &slog::Logger, + updater: &(dyn SpComponentUpdateHelper + Send + Sync), + mgs_clients: &mut MgsClients, + update: &PendingMgsUpdate, + timeout: Duration, +) -> Result<(), UpdateWaitError> { + let before = Instant::now(); + + // We retry this until we get some error *other* than a communication + // error or the caller wants to give up due to a timeout. + + loop { + let precheck = updater.precheck(log, mgs_clients, update).await; + debug!(log, "precheck result"; "precheck" => ?precheck); + match updater.precheck(log, mgs_clients, update).await { + // Check if we're done. + Ok(PrecheckStatus::UpdateComplete) => return Ok(()), + + // An incorrect version in the "inactive" slot is normal during the + // upgrade. We have no reason to think this won't converge so we + // proceed with waiting. + Err(PrecheckError::GatewayClientError(_)) + | Err(PrecheckError::WrongInactiveVersion { .. }) + | Ok(PrecheckStatus::ReadyForUpdate) => { + if before.elapsed() >= timeout { + return Err(UpdateWaitError::Timeout(timeout)); + } + + tokio::time::sleep(PROGRESS_POLL_INTERVAL).await; + continue; + } + + Err(error @ PrecheckError::WrongDevice { .. }) + | Err(error @ PrecheckError::WrongActiveVersion { .. }) => { + // Stop trying to make this update happen. It's not going to + // happen. + return Err(UpdateWaitError::Indeterminate(error)); + } + } + } +} diff --git a/nexus/mgs-updates/src/lib.rs b/nexus/mgs-updates/src/lib.rs index 18508a1451f..2a69dfb7846 100644 --- a/nexus/mgs-updates/src/lib.rs +++ b/nexus/mgs-updates/src/lib.rs @@ -11,13 +11,23 @@ //! - root of trust bootloader (bootleby) //! - host phase 1 image (Helios phase 1) +mod artifacts; mod common_sp_update; +mod driver; +mod driver_update; mod host_phase1_updater; mod mgs_clients; mod rot_updater; mod sp_updater; +pub use artifacts::ArtifactCache; +pub use artifacts::ArtifactCacheError; pub use common_sp_update::SpComponentUpdateError; +pub use common_sp_update::SpComponentUpdateHelper; +pub use common_sp_update::SpComponentUpdater; +pub use driver::DriverStatus; +pub use driver::MgsUpdateDriver; +pub use driver_update::DEFAULT_RETRY_TIMEOUT; pub use host_phase1_updater::HostPhase1Updater; pub use mgs_clients::MgsClients; pub use rot_updater::RotUpdater; diff --git a/nexus/mgs-updates/src/sp_updater.rs b/nexus/mgs-updates/src/sp_updater.rs index 355ac551636..b66e8f94c64 100644 --- a/nexus/mgs-updates/src/sp_updater.rs +++ b/nexus/mgs-updates/src/sp_updater.rs @@ -4,15 +4,24 @@ //! Module containing types for updating SPs via MGS. -use super::MgsClients; -use super::SpComponentUpdateError; -use super::UpdateProgress; -use super::common_sp_update::SpComponentUpdater; -use super::common_sp_update::deliver_update; +use crate::MgsClients; +use crate::SpComponentUpdateError; +use crate::SpComponentUpdateHelper; +use crate::UpdateProgress; +use crate::common_sp_update::FoundVersion; +use crate::common_sp_update::PrecheckError; +use crate::common_sp_update::PrecheckStatus; +use crate::common_sp_update::SpComponentUpdater; +use crate::common_sp_update::deliver_update; +use futures::FutureExt; +use futures::future::BoxFuture; use gateway_client::SpComponent; use gateway_client::types::SpType; +use nexus_types::deployment::ExpectedVersion; +use nexus_types::deployment::PendingMgsUpdate; +use nexus_types::deployment::PendingMgsUpdateDetails; use slog::Logger; -use slog::info; +use slog::{debug, info}; use tokio::sync::watch; use uuid::Uuid; @@ -140,3 +149,160 @@ impl SpComponentUpdater for SpUpdater { &self.log } } + +pub struct ReconfiguratorSpUpdater; +impl SpComponentUpdateHelper for ReconfiguratorSpUpdater { + /// Checks if the component is already updated or ready for update + fn precheck<'a>( + &'a self, + log: &'a slog::Logger, + mgs_clients: &'a mut MgsClients, + update: &'a PendingMgsUpdate, + ) -> BoxFuture<'a, Result> { + async move { + // Verify that the device is the one we think it is. + let state = mgs_clients + .try_all_serially(log, move |mgs_client| async move { + mgs_client.sp_get(update.sp_type, update.slot_id).await + }) + .await? + .into_inner(); + debug!(log, "found SP state"; "state" => ?state); + if state.model != update.baseboard_id.part_number + || state.serial_number != update.baseboard_id.serial_number + { + return Err(PrecheckError::WrongDevice { + sp_type: update.sp_type, + slot_id: update.slot_id, + expected_part: update.baseboard_id.part_number.clone(), + expected_serial: update.baseboard_id.serial_number.clone(), + found_part: state.model, + found_serial: state.serial_number, + }); + } + + // Fetch the caboose from the currently active slot. + let caboose = mgs_clients + .try_all_serially(log, move |mgs_client| async move { + mgs_client + .sp_component_caboose_get( + update.sp_type, + update.slot_id, + &SpComponent::SP_ITSELF.to_string(), + 0, + ) + .await + }) + .await? + .into_inner(); + debug!(log, "found active slot caboose"; "caboose" => ?caboose); + + // If the version in the currently active slot matches the one we're + // trying to set, then there's nothing to do. + if caboose.version == update.artifact_version.as_str() { + return Ok(PrecheckStatus::UpdateComplete); + } + + // Otherwise, if the version in the currently active slot does not + // match what we expect to find, bail out. It may be that somebody + // else has come along and completed a subsequent update and we + // don't want to roll that back. (If for some reason we *do* want + // to do this update, the planner will have to notice that what's + // here is wrong and update the blueprint.) + let PendingMgsUpdateDetails::Sp { + expected_active_version, + expected_inactive_version, + } = &update.details; + if caboose.version != expected_active_version.to_string() { + return Err(PrecheckError::WrongActiveVersion { + expected: expected_active_version.clone(), + found: caboose.version, + }); + } + + // For the same reason, check that the version in the inactive slot + // matches what we expect to find. + // TODO It's important for us to detect the condition that a caboose + // is invalid because this can happen when devices are programmed + // with a bad image. Unfortunately, MGS currently reports this as a + // 503. Besides being annoying for us to look for, this causes + // `try_all_serially()` to try the other MGS. That's pointless + // here, but not a big deal. + let found_inactive_caboose_result = mgs_clients + .try_all_serially(log, move |mgs_client| async move { + mgs_client + .sp_component_caboose_get( + update.sp_type, + update.slot_id, + &SpComponent::SP_ITSELF.to_string(), + 1, + ) + .await + }) + .await; + let found_version = match found_inactive_caboose_result { + Ok(version) => { + FoundVersion::Version(version.into_inner().version) + } + Err(error) => { + let message = format!("{error:?}"); + if message.contains("the image caboose does not contain") + || message + .contains("the image does not include a caboose") + { + FoundVersion::MissingVersion + } else { + return Err(PrecheckError::from(error)); + } + } + }; + match (&expected_inactive_version, &found_version) { + // expected garbage, found garbage + ( + ExpectedVersion::NoValidVersion, + FoundVersion::MissingVersion, + ) => (), + // expected a specific version and found it + ( + ExpectedVersion::Version(artifact_version), + FoundVersion::Version(found_version), + ) if artifact_version.to_string() == *found_version => (), + // anything else is a mismatch + (ExpectedVersion::NoValidVersion, FoundVersion::Version(_)) + | (ExpectedVersion::Version(_), FoundVersion::MissingVersion) + | (ExpectedVersion::Version(_), FoundVersion::Version(_)) => { + return Err(PrecheckError::WrongInactiveVersion { + expected: expected_inactive_version.clone(), + found: found_version, + }); + } + }; + + Ok(PrecheckStatus::ReadyForUpdate) + } + .boxed() + } + + /// Attempts once to perform any post-update actions (e.g., reset the + /// device) + fn post_update<'a>( + &'a self, + log: &'a slog::Logger, + mgs_clients: &'a mut MgsClients, + update: &'a PendingMgsUpdate, + ) -> BoxFuture<'a, Result<(), GatewayClientError>> { + mgs_clients + .try_all_serially(log, move |mgs_client| async move { + debug!(log, "attempting to reset device"); + mgs_client + .sp_component_reset( + update.sp_type, + update.slot_id, + &SpComponent::SP_ITSELF.to_string(), + ) + .await?; + Ok(()) + }) + .boxed() + } +} diff --git a/nexus/reconfigurator/execution/Cargo.toml b/nexus/reconfigurator/execution/Cargo.toml index 2f13bc0962e..9439adcc8f5 100644 --- a/nexus/reconfigurator/execution/Cargo.toml +++ b/nexus/reconfigurator/execution/Cargo.toml @@ -26,6 +26,7 @@ newtype-uuid.workspace = true nexus-config.workspace = true nexus-db-model.workspace = true nexus-db-queries.workspace = true +nexus-mgs-updates.workspace = true nexus-networking.workspace = true nexus-sled-agent-shared.workspace = true nexus-types.workspace = true diff --git a/nexus/reconfigurator/execution/src/dns.rs b/nexus/reconfigurator/execution/src/dns.rs index 35e72328ae8..c704b65a27b 100644 --- a/nexus/reconfigurator/execution/src/dns.rs +++ b/nexus/reconfigurator/execution/src/dns.rs @@ -339,6 +339,7 @@ mod test { pub use nexus_types::deployment::OmicronZoneExternalFloatingAddr; pub use nexus_types::deployment::OmicronZoneExternalFloatingIp; pub use nexus_types::deployment::OmicronZoneExternalSnatIp; + use nexus_types::deployment::PendingMgsUpdates; use nexus_types::deployment::SledFilter; use nexus_types::deployment::blueprint_zone_type; use nexus_types::external_api::params; @@ -688,6 +689,7 @@ mod test { let mut blueprint = Blueprint { id: BlueprintUuid::new_v4(), sleds: blueprint_sleds, + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: None, diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index 2cc5ec46148..875c5421ec2 100644 --- a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -11,8 +11,15 @@ use internal_dns_resolver::Resolver; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; use nexus_types::deployment::Blueprint; +use nexus_types::deployment::PendingMgsUpdates; use nexus_types::deployment::SledFilter; -use nexus_types::deployment::execution::*; +use nexus_types::deployment::execution::StepSkipped; +use nexus_types::deployment::execution::overridables; +use nexus_types::deployment::execution::{ + ComponentRegistrar, Event, ExecutionComponent, ExecutionStepId, + Overridables, ReconfiguratorExecutionSpec, SharedStepHandle, Sled, + StepHandle, StepResult, UpdateEngine, +}; use nexus_types::identity::Asset; use omicron_uuid_kinds::GenericUuid; use omicron_uuid_kinds::OmicronZoneUuid; @@ -22,6 +29,7 @@ use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; use std::sync::Arc; use tokio::sync::mpsc; +use tokio::sync::watch; use update_engine::StepSuccess; use update_engine::StepWarning; use update_engine::merge_anyhow_list; @@ -50,6 +58,7 @@ pub struct RealizeArgs<'a> { pub creator: OmicronZoneUuid, pub sender: mpsc::Sender, pub overrides: Option<&'a Overridables>, + pub mgs_updates: watch::Sender, } impl<'a> RealizeArgs<'a> { @@ -96,6 +105,7 @@ pub struct RequiredRealizeArgs<'a> { pub creator: OmicronZoneUuid, pub blueprint: &'a Blueprint, pub sender: mpsc::Sender, + pub mgs_updates: watch::Sender, } impl<'a> From> for RealizeArgs<'a> { @@ -109,6 +119,7 @@ impl<'a> From> for RealizeArgs<'a> { nexus_id: None, sender: value.sender, overrides: None, + mgs_updates: value.mgs_updates, } } } @@ -153,6 +164,7 @@ pub async fn realize_blueprint( creator, sender, overrides, + mgs_updates, } = exec_ctx; let opctx = opctx.child(BTreeMap::from([( @@ -262,6 +274,12 @@ pub async fn realize_blueprint( blueprint, ); + register_mgs_update_step( + &engine.for_component(ExecutionComponent::MgsUpdates), + blueprint, + mgs_updates, + ); + // All steps are registered, so execute the engine. let result = engine.execute().await?; @@ -632,3 +650,23 @@ fn register_cockroachdb_settings_step<'a>( ) .register(); } + +fn register_mgs_update_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + blueprint: &'a Blueprint, + sender: watch::Sender, +) { + registrar + .new_step( + ExecutionStepId::Ensure, + "Kick off MGS-managed updates", + move |_cx| async move { + let result = + sender.send(blueprint.pending_mgs_updates.clone()).context( + "failed to send to MgsUpdateDriver on watch channel", + ); + Ok(map_err_to_step_warning(result)) + }, + ) + .register(); +} diff --git a/nexus/reconfigurator/execution/src/test_utils.rs b/nexus/reconfigurator/execution/src/test_utils.rs index e678b5acb76..ad91890eb5a 100644 --- a/nexus/reconfigurator/execution/src/test_utils.rs +++ b/nexus/reconfigurator/execution/src/test_utils.rs @@ -9,13 +9,14 @@ use std::net::Ipv6Addr; use internal_dns_resolver::Resolver; use nexus_db_queries::{context::OpContext, db::DataStore}; use nexus_types::deployment::{ - Blueprint, + Blueprint, PendingMgsUpdates, execution::{EventBuffer, Overridables}, }; use omicron_uuid_kinds::OmicronZoneUuid; use update_engine::TerminalKind; use crate::{RealizeBlueprintOutput, RequiredRealizeArgs}; +use tokio::sync::watch; pub(crate) async fn realize_blueprint_and_expect( opctx: &OpContext, @@ -34,6 +35,8 @@ pub(crate) async fn realize_blueprint_and_expect( buffer }); + // This helper function does not support MGS-managed updates. + let (mgs_updates, _rx) = watch::channel(PendingMgsUpdates::new()); let nexus_id = OmicronZoneUuid::new_v4(); let output = crate::realize_blueprint( RequiredRealizeArgs { @@ -43,6 +46,7 @@ pub(crate) async fn realize_blueprint_and_expect( creator: nexus_id, blueprint, sender, + mgs_updates, } .with_overrides(overrides) .as_nexus(OmicronZoneUuid::new_v4()), diff --git a/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs b/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs index 710da6ab5c2..4780a2aef4c 100644 --- a/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs +++ b/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs @@ -83,6 +83,7 @@ use thiserror::Error; use super::ClickhouseZonesThatShouldBeRunning; use super::clickhouse::ClickhouseAllocator; +use nexus_types::deployment::PendingMgsUpdates; /// Errors encountered while assembling blueprints #[derive(Debug, Error)] @@ -454,6 +455,7 @@ impl<'a> BlueprintBuilder<'a> { Blueprint { id: rng.next_blueprint(), sleds, + pending_mgs_updates: PendingMgsUpdates::new(), parent_blueprint_id: None, internal_dns_version: Generation::new(), external_dns_version: Generation::new(), @@ -688,6 +690,10 @@ impl<'a> BlueprintBuilder<'a> { Blueprint { id: blueprint_id, sleds, + pending_mgs_updates: self + .parent_blueprint + .pending_mgs_updates + .clone(), parent_blueprint_id: Some(self.parent_blueprint.id), internal_dns_version: self.input.internal_dns_version(), external_dns_version: self.input.external_dns_version(), diff --git a/nexus/reconfigurator/planning/tests/output/example_builder_zone_counts_blueprint.txt b/nexus/reconfigurator/planning/tests/output/example_builder_zone_counts_blueprint.txt index 66d230fca31..414972e916b 100644 --- a/nexus/reconfigurator/planning/tests/output/example_builder_zone_counts_blueprint.txt +++ b/nexus/reconfigurator/planning/tests/output/example_builder_zone_counts_blueprint.txt @@ -487,3 +487,4 @@ parent: e35b2fdd-354d-48d9-acb5-703b2c269a54 internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 diff --git a/nexus/reconfigurator/planning/tests/output/planner_decommissions_sleds_bp2.txt b/nexus/reconfigurator/planning/tests/output/planner_decommissions_sleds_bp2.txt index 9e995b8a5fe..44c6265ab0e 100644 --- a/nexus/reconfigurator/planning/tests/output/planner_decommissions_sleds_bp2.txt +++ b/nexus/reconfigurator/planning/tests/output/planner_decommissions_sleds_bp2.txt @@ -294,3 +294,4 @@ parent: 516e80a3-b362-4fac-bd3c-4559717120dd internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 diff --git a/nexus/reconfigurator/planning/tests/output/planner_nonprovisionable_bp2.txt b/nexus/reconfigurator/planning/tests/output/planner_nonprovisionable_bp2.txt index ced3527bbb4..e00b1ed5064 100644 --- a/nexus/reconfigurator/planning/tests/output/planner_nonprovisionable_bp2.txt +++ b/nexus/reconfigurator/planning/tests/output/planner_nonprovisionable_bp2.txt @@ -466,3 +466,4 @@ parent: 4d4e6c38-cd95-4c4e-8f45-6af4d686964b internal DNS version: 1 external DNS version: 1 + PENDING MGS-MANAGED UPDATES: 0 diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index e6feebd7a4a..026f85fca3a 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -133,6 +133,7 @@ use nexus_config::DnsTasksConfig; use nexus_db_model::DnsGroup; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; +use nexus_types::deployment::PendingMgsUpdates; use omicron_uuid_kinds::OmicronZoneUuid; use oximeter::types::ProducerRegistry; use std::collections::BTreeMap; @@ -481,6 +482,7 @@ impl BackgroundTasksInitializer { rx_blueprint.clone(), nexus_id, task_saga_recovery.clone(), + args.mgs_updates_tx, ); let rx_blueprint_exec = blueprint_executor.watcher(); driver.register(TaskDefinition { @@ -942,6 +944,8 @@ pub struct BackgroundTasksData { pub saga_recovery: saga_recovery::SagaRecoveryHelpers>, /// Channel for TUF repository artifacts to be replicated out to sleds pub tuf_artifact_replication_rx: mpsc::Receiver, + /// Channel for configuring pending MGS updates + pub mgs_updates_tx: watch::Sender, } /// Starts the three DNS-propagation-related background tasks for either diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index fccba57a0bf..53b450400d1 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -14,7 +14,7 @@ use nexus_reconfigurator_execution::{ RealizeBlueprintOutput, RequiredRealizeArgs, }; use nexus_types::deployment::{ - Blueprint, BlueprintTarget, execution::EventBuffer, + Blueprint, BlueprintTarget, PendingMgsUpdates, execution::EventBuffer, }; use omicron_uuid_kinds::OmicronZoneUuid; use serde_json::json; @@ -31,6 +31,7 @@ pub struct BlueprintExecutor { nexus_id: OmicronZoneUuid, tx: watch::Sender, saga_recovery: Activator, + mgs_update_tx: watch::Sender, } impl BlueprintExecutor { @@ -42,6 +43,7 @@ impl BlueprintExecutor { >, nexus_id: OmicronZoneUuid, saga_recovery: Activator, + mgs_update_tx: watch::Sender, ) -> BlueprintExecutor { let (tx, _) = watch::channel(0); BlueprintExecutor { @@ -51,6 +53,7 @@ impl BlueprintExecutor { nexus_id, tx, saga_recovery, + mgs_update_tx, } } @@ -109,6 +112,7 @@ impl BlueprintExecutor { creator: self.nexus_id, blueprint, sender, + mgs_updates: self.mgs_update_tx.clone(), } .as_nexus(self.nexus_id), ) @@ -190,7 +194,7 @@ mod test { use nexus_types::deployment::{ Blueprint, BlueprintSledConfig, BlueprintTarget, BlueprintZoneConfig, BlueprintZoneDisposition, BlueprintZoneImageSource, BlueprintZoneType, - CockroachDbPreserveDowngrade, blueprint_zone_type, + CockroachDbPreserveDowngrade, PendingMgsUpdates, blueprint_zone_type, }; use nexus_types::external_api::views::SledState; use omicron_common::api::external; @@ -253,6 +257,7 @@ mod test { let blueprint = Blueprint { id, sleds: blueprint_sleds, + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: Some(current_target.target_id), @@ -357,12 +362,14 @@ mod test { } let (blueprint_tx, blueprint_rx) = watch::channel(None); + let (dummy_tx, _dummy_rx) = watch::channel(PendingMgsUpdates::new()); let mut task = BlueprintExecutor::new( datastore.clone(), resolver.clone(), blueprint_rx, OmicronZoneUuid::new_v4(), Activator::new(), + dummy_tx, ); // Now we're ready. diff --git a/nexus/src/app/background/tasks/blueprint_load.rs b/nexus/src/app/background/tasks/blueprint_load.rs index da2bc6dee2b..ba0dc248cd7 100644 --- a/nexus/src/app/background/tasks/blueprint_load.rs +++ b/nexus/src/app/background/tasks/blueprint_load.rs @@ -195,6 +195,7 @@ mod test { use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::{ Blueprint, BlueprintTarget, CockroachDbPreserveDowngrade, + PendingMgsUpdates, }; use omicron_common::api::external::Generation; use omicron_uuid_kinds::BlueprintUuid; @@ -217,6 +218,7 @@ mod test { Blueprint { id, sleds: BTreeMap::new(), + pending_mgs_updates: PendingMgsUpdates::new(), cockroachdb_setting_preserve_downgrade: CockroachDbPreserveDowngrade::DoNotModify, parent_blueprint_id: Some(parent_blueprint_id), diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 63e6628237a..7d83b76b03b 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -24,6 +24,9 @@ use nexus_db_queries::authn; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; +use nexus_mgs_updates::ArtifactCache; +use nexus_mgs_updates::MgsUpdateDriver; +use nexus_types::deployment::PendingMgsUpdates; use omicron_common::address::DENDRITE_PORT; use omicron_common::address::MGD_PORT; use omicron_common::address::MGS_PORT; @@ -41,6 +44,7 @@ use std::net::{IpAddr, Ipv6Addr}; use std::sync::Arc; use std::sync::OnceLock; use tokio::sync::mpsc; +use tokio::sync::watch; use update_common::artifacts::ArtifactsWithPlan; use uuid::Uuid; @@ -103,6 +107,7 @@ pub(crate) mod sagas; pub(crate) use nexus_db_model::MAX_NICS_PER_INSTANCE; pub(crate) use nexus_db_queries::db::queries::disk::MAX_DISKS_PER_INSTANCE; +use nexus_mgs_updates::DEFAULT_RETRY_TIMEOUT; use sagas::demo::CompletingDemoSagas; // XXX: Might want to recast as max *floating* IPs, we have at most one @@ -165,7 +170,7 @@ pub struct Nexus { internal_server: std::sync::Mutex>, /// Status of background task to populate database - populate_status: tokio::sync::watch::Receiver, + populate_status: watch::Receiver, /// The metric producer server from which oximeter collects metric data. producer_server: std::sync::Mutex>, @@ -231,6 +236,11 @@ pub struct Nexus { /// Sender for TUF repository artifacts temporarily stored in this zone to /// be replicated out to sleds in the background tuf_artifact_replication_tx: mpsc::Sender, + + /// reports status of pending MGS-managed updates + // This will be used in the future to expose driver state via the internal + // API. + _mgs_update_status_rx: watch::Receiver, } impl Nexus { @@ -371,6 +381,26 @@ impl Nexus { )) }; + let mut mgs_resolver = + qorb_resolver.for_service(ServiceName::ManagementGatewayService); + let mut repo_depot_resolver = + qorb_resolver.for_service(ServiceName::RepoDepot); + let (mgs_updates_tx, mgs_updates_rx) = + watch::channel(PendingMgsUpdates::new()); + let artifact_cache = Arc::new(ArtifactCache::new( + log.new(o!("component" => "ArtifactCache")), + repo_depot_resolver.monitor(), + )); + let mgs_update_driver = MgsUpdateDriver::new( + log.new(o!("component" => "MgsUpdateDriver")), + artifact_cache, + mgs_updates_rx, + mgs_resolver.monitor(), + DEFAULT_RETRY_TIMEOUT, + ); + let _mgs_update_status_rx = mgs_update_driver.status_rx(); + let _mgs_driver_task = tokio::spawn(mgs_update_driver.run()); + let nexus = Nexus { id: config.deployment.id, rack_id, @@ -419,6 +449,7 @@ impl Nexus { CompletingDemoSagas::new(), )), tuf_artifact_replication_tx, + _mgs_update_status_rx, }; // TODO-cleanup all the extra Arcs here seems wrong @@ -474,6 +505,7 @@ impl Nexus { sagas_started_rx: saga_recovery_rx, }, tuf_artifact_replication_rx, + mgs_updates_tx, }, ); diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index ac6d98506bd..bb33ab5f0be 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -98,6 +98,7 @@ use std::sync::Arc; use std::time::Duration; use uuid::Uuid; +use nexus_types::deployment::PendingMgsUpdates; pub use sim::TEST_HARDWARE_THREADS; pub use sim::TEST_RESERVOIR_RAM; @@ -940,6 +941,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { Blueprint { id: BlueprintUuid::new_v4(), sleds: blueprint_sleds, + pending_mgs_updates: PendingMgsUpdates::new(), parent_blueprint_id: None, internal_dns_version: dns_config.generation, external_dns_version: Generation::new(), diff --git a/nexus/types/src/deployment.rs b/nexus/types/src/deployment.rs index c84b88fe862..7017f392e59 100644 --- a/nexus/types/src/deployment.rs +++ b/nexus/types/src/deployment.rs @@ -45,6 +45,7 @@ use omicron_uuid_kinds::ZpoolUuid; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; +use slog::Key; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::fmt; @@ -52,7 +53,9 @@ use std::net::Ipv6Addr; use std::net::SocketAddrV6; use strum::EnumIter; use tufaceous_artifact::ArtifactHash; +use tufaceous_artifact::ArtifactHashId; use tufaceous_artifact::ArtifactVersion; +use tufaceous_artifact::ArtifactVersionError; mod blueprint_diff; mod blueprint_display; @@ -63,7 +66,11 @@ mod planning_input; mod tri_map; mod zone_type; +use crate::inventory::BaseboardId; +pub use blueprint_diff::BlueprintDiffSummary; +use blueprint_display::BpPendingMgsUpdates; pub use clickhouse::ClickhouseClusterConfig; +use gateway_client::types::SpType; pub use network_resources::AddNetworkResourceError; pub use network_resources::OmicronZoneExternalFloatingAddr; pub use network_resources::OmicronZoneExternalFloatingIp; @@ -91,6 +98,7 @@ pub use planning_input::SledLookupError; pub use planning_input::SledLookupErrorKind; pub use planning_input::SledResources; pub use planning_input::ZpoolFilter; +use std::sync::Arc; pub use zone_type::BlueprintZoneType; pub use zone_type::DurableDataset; pub use zone_type::blueprint_zone_type; @@ -100,8 +108,7 @@ use blueprint_display::{ BpTable, BpTableData, BpTableRow, KvListWithHeading, constants::*, }; use id_map::{IdMap, IdMappable}; - -pub use blueprint_diff::BlueprintDiffSummary; +use std::str::FromStr; /// Describes a complete set of software and configuration for the system // Blueprints are a fundamental part of how the system modifies itself. Each @@ -151,6 +158,9 @@ pub struct Blueprint { /// A map of sled id -> desired configuration of the sled. pub sleds: BTreeMap, + /// List of pending MGS-mediated updates + pub pending_mgs_updates: PendingMgsUpdates, + /// which blueprint this blueprint is based on pub parent_blueprint_id: Option, @@ -488,6 +498,7 @@ impl fmt::Display for BlueprintDisplay<'_> { let Blueprint { id, sleds, + pending_mgs_updates, parent_blueprint_id, // These two cockroachdb_* fields are handled by // `make_cockroachdb_table()`, called below. @@ -569,6 +580,38 @@ impl fmt::Display for BlueprintDisplay<'_> { writeln!(f, "{}", self.make_cockroachdb_table())?; writeln!(f, "{}", self.make_metadata_table())?; + writeln!( + f, + " PENDING MGS-MANAGED UPDATES: {}", + pending_mgs_updates.len() + )?; + if !pending_mgs_updates.is_empty() { + writeln!( + f, + "{}", + BpTable::new( + BpPendingMgsUpdates {}, + None, + pending_mgs_updates + .iter() + .map(|pu| { + BpTableRow::from_strings( + BpDiffState::Unchanged, + vec![ + pu.sp_type.to_string(), + pu.slot_id.to_string(), + pu.baseboard_id.part_number.clone(), + pu.baseboard_id.serial_number.clone(), + pu.artifact_hash_id.kind.to_string(), + pu.artifact_hash_id.hash.to_string(), + ], + ) + }) + .collect() + ) + )?; + } + Ok(()) } } @@ -1000,6 +1043,198 @@ impl fmt::Display for BlueprintZoneImageVersion { } } +#[derive( + Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Serialize, Diffable, +)] +pub struct PendingMgsUpdates { + // The IdMap key is the baseboard_id. Only one outstanding MGS-managed + // update is allowed for a given baseboard. + by_baseboard: IdMap, +} + +impl PendingMgsUpdates { + pub fn new() -> PendingMgsUpdates { + PendingMgsUpdates { by_baseboard: IdMap::new() } + } + + pub fn iter(&self) -> impl Iterator { + self.into_iter() + } + + pub fn len(&self) -> usize { + self.by_baseboard.len() + } + + pub fn is_empty(&self) -> bool { + self.by_baseboard.is_empty() + } + + pub fn contains_key(&self, key: &Arc) -> bool { + self.by_baseboard.contains_key(key) + } + + pub fn get( + &self, + baseboard_id: &Arc, + ) -> Option<&PendingMgsUpdate> { + self.by_baseboard.get(baseboard_id) + } + + pub fn remove( + &mut self, + baseboard_id: &Arc, + ) -> Option { + self.by_baseboard.remove(baseboard_id) + } + + pub fn insert( + &mut self, + update: PendingMgsUpdate, + ) -> Option { + self.by_baseboard.insert(update) + } +} + +impl<'a> IntoIterator for &'a PendingMgsUpdates { + type Item = &'a PendingMgsUpdate; + type IntoIter = std::collections::btree_map::Values< + 'a, + Arc, + PendingMgsUpdate, + >; + fn into_iter(self) -> Self::IntoIter { + self.by_baseboard.iter() + } +} + +#[derive( + Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Serialize, Diffable, +)] +pub struct PendingMgsUpdate { + // identity of the baseboard + /// id of the baseboard that we're going to update + pub baseboard_id: Arc, + + // location of the baseboard (that we'd pass to MGS) + /// what type of baseboard this is + pub sp_type: SpType, + /// last known MGS slot (cubby number) of the baseboard + pub slot_id: u32, + + /// component-specific details of the pending update + pub details: PendingMgsUpdateDetails, + + /// which artifact to apply to this device + /// (implies which component is being updated) + pub artifact_hash_id: ArtifactHashId, + pub artifact_version: ArtifactVersion, +} + +impl slog::KV for PendingMgsUpdate { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::KV::serialize(&self.baseboard_id, record, serializer)?; + serializer + .emit_str(Key::from("sp_type"), &format!("{:?}", self.sp_type))?; + serializer.emit_u32(Key::from("sp_slot"), self.slot_id)?; + slog::KV::serialize(&self.details, record, serializer)?; + serializer.emit_str( + Key::from("artifact_kind"), + &self.artifact_hash_id.kind.as_str(), + )?; + serializer.emit_str( + Key::from("artifact_hash"), + &self.artifact_hash_id.hash.to_string(), + ) + } +} + +impl IdMappable for PendingMgsUpdate { + type Id = Arc; + fn id(&self) -> Self::Id { + self.baseboard_id.clone() + } +} + +/// Describes the component-specific details of a PendingMgsUpdate +// This needs to specify: +// +// - the "component" (that we provide to the SP) +// - the slot that needs to be updated +// - any preconditions we expect to be true. These generally specify what we +// think is in each slot. This is intended to reduce the chance that an +// update operation using outdated configuration winds up rolling back the +// deployed version. +// +// Much of this may be implicit. See comments below. +#[derive( + Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Serialize, Diffable, +)] +#[serde(tag = "component", rename_all = "snake_case")] +pub enum PendingMgsUpdateDetails { + /// the SP itself is being updated + Sp { + // implicit: component = SP_ITSELF + // implicit: firmware slot id = 0 (always 0 for SP itself) + /// expected contents of the active slot + expected_active_version: ArtifactVersion, + /// expected contents of the inactive slot + expected_inactive_version: ExpectedVersion, + }, +} + +impl slog::KV for PendingMgsUpdateDetails { + fn serialize( + &self, + _record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + match self { + PendingMgsUpdateDetails::Sp { + expected_active_version, + expected_inactive_version, + } => { + serializer.emit_str(Key::from("component"), "sp")?; + serializer.emit_str( + Key::from("expected_active_version"), + &expected_active_version.to_string(), + )?; + serializer.emit_str( + Key::from("expected_inactive_version"), + &format!("{:?}", expected_inactive_version), + ) + } + } + } +} + +/// Describes the version that we expect to find in some firmware slot +#[derive( + Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Serialize, Diffable, +)] +#[serde(tag = "kind", content = "version", rename_all = "snake_case")] +pub enum ExpectedVersion { + /// We expect to find _no_ valid caboose in this slot + NoValidVersion, + /// We expect to find the specified version in this slot + Version(ArtifactVersion), +} + +impl FromStr for ExpectedVersion { + type Err = ArtifactVersionError; + + fn from_str(s: &str) -> Result { + if s == "invalid" { + Ok(ExpectedVersion::NoValidVersion) + } else { + Ok(ExpectedVersion::Version(s.parse()?)) + } + } +} + /// The desired state of an Omicron-managed physical disk in a blueprint. #[derive( Debug, diff --git a/nexus/types/src/deployment/blueprint_diff.rs b/nexus/types/src/deployment/blueprint_diff.rs index ae3541f5a38..3d7df967d1d 100644 --- a/nexus/types/src/deployment/blueprint_diff.rs +++ b/nexus/types/src/deployment/blueprint_diff.rs @@ -58,6 +58,7 @@ impl<'a> BlueprintDiffSummary<'a> { let BlueprintDiff { // Fields in which changes are meaningful. sleds, + pending_mgs_updates, clickhouse_cluster_config, // Metadata fields for which changes don't reflect semantic // changes from one blueprint to the next. @@ -79,6 +80,14 @@ impl<'a> BlueprintDiffSummary<'a> { return true; } + // Did we modify, add, or remove any pending MGS updates? + if pending_mgs_updates.by_baseboard.modified().next().is_some() + || !pending_mgs_updates.by_baseboard.added.is_empty() + || !pending_mgs_updates.by_baseboard.removed.is_empty() + { + return true; + } + // Did the clickhouse config change? if clickhouse_cluster_config.before != clickhouse_cluster_config.after { return true; diff --git a/nexus/types/src/deployment/blueprint_display.rs b/nexus/types/src/deployment/blueprint_display.rs index ee11559cb35..77754ae7342 100644 --- a/nexus/types/src/deployment/blueprint_display.rs +++ b/nexus/types/src/deployment/blueprint_display.rs @@ -400,6 +400,25 @@ impl BpTableSchema for BpClickhouseServersTableSchema { } } +/// The [`BpTable`] schema for pending MGS updates +pub struct BpPendingMgsUpdates {} +impl BpTableSchema for BpPendingMgsUpdates { + fn table_name(&self) -> &'static str { + "Pending MGS-managed updates" + } + + fn column_names(&self) -> &'static [&'static str] { + &[ + "sp_type", + "slot", + "part_number", + "serial_number", + "artifact_kind", + "artifact_hash", + ] + } +} + // An entry in a [`KvListWithHeading`] #[derive(Debug)] pub struct KvPair { diff --git a/nexus/types/src/deployment/execution/spec.rs b/nexus/types/src/deployment/execution/spec.rs index 6e1b6dbacc8..915e295f345 100644 --- a/nexus/types/src/deployment/execution/spec.rs +++ b/nexus/types/src/deployment/execution/spec.rs @@ -37,6 +37,7 @@ pub enum ExecutionComponent { Dns, Cockroach, Clickhouse, + MgsUpdates, } /// Steps for reconfigurator execution. diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 40d2c88c942..f1f63c2ffc0 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -1571,6 +1571,25 @@ } ] }, + "ArtifactHashId": { + "description": "A hash-based identifier for an artifact or deployment unit: the kind and hash.", + "type": "object", + "properties": { + "hash": { + "description": "The hash of the artifact.", + "type": "string", + "format": "hex string (32 bytes)" + }, + "kind": { + "description": "The kind of artifact this is.", + "type": "string" + } + }, + "required": [ + "hash", + "kind" + ] + }, "ArtifactVersion": { "description": "An artifact version.\n\nThis is a freeform identifier with some basic validation. It may be the serialized form of a semver version, or a custom identifier that uses the same character set as a semver, plus `_`.\n\nThe exact pattern accepted is `^[a-zA-Z0-9._+-]{1,63}$`.\n\n# Ord implementation\n\n`ArtifactVersion`s are not intended to be sorted, just compared for equality. `ArtifactVersion` implements `Ord` only for storage within sorted collections.", "type": "string", @@ -1659,6 +1678,24 @@ "serial" ] }, + "BaseboardId": { + "description": "A unique baseboard id found during a collection\n\nBaseboard ids are the keys used to link up information from disparate sources (like a service processor and a sled agent).\n\nThese are normalized in the database. Each distinct baseboard id is assigned a uuid and shared across the many possible collections that reference it.\n\nUsually, the part number and serial number are combined with a revision number. We do not include that here. If we ever did find a baseboard with the same part number and serial number but a new revision number, we'd want to treat that as the same baseboard as one with a different revision number.", + "type": "object", + "properties": { + "part_number": { + "description": "Oxide Part Number", + "type": "string" + }, + "serial_number": { + "description": "Serial number (unique for a given part number)", + "type": "string" + } + }, + "required": [ + "part_number", + "serial_number" + ] + }, "BfdMode": { "description": "BFD connection mode.", "type": "string", @@ -1947,6 +1984,14 @@ } ] }, + "pending_mgs_updates": { + "description": "List of pending MGS-mediated updates", + "allOf": [ + { + "$ref": "#/components/schemas/PendingMgsUpdates" + } + ] + }, "sleds": { "description": "A map of sled id -> desired configuration of the sled.", "type": "object", @@ -1968,6 +2013,7 @@ "external_dns_version", "id", "internal_dns_version", + "pending_mgs_updates", "sleds", "time_created" ] @@ -3747,6 +3793,45 @@ "request_id" ] }, + "ExpectedVersion": { + "description": "Describes the version that we expect to find in some firmware slot", + "oneOf": [ + { + "description": "We expect to find _no_ valid caboose in this slot", + "type": "object", + "properties": { + "kind": { + "type": "string", + "enum": [ + "no_valid_version" + ] + } + }, + "required": [ + "kind" + ] + }, + { + "description": "We expect to find the specified version in this slot", + "type": "object", + "properties": { + "kind": { + "type": "string", + "enum": [ + "version" + ] + }, + "version": { + "$ref": "#/components/schemas/ArtifactVersion" + } + }, + "required": [ + "kind", + "version" + ] + } + ] + }, "ExternalPortDiscovery": { "oneOf": [ { @@ -3814,6 +3899,12 @@ "$ref": "#/components/schemas/BlueprintZoneConfig" } }, + "IdMapPendingMgsUpdate": { + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/PendingMgsUpdate" + } + }, "ImportExportPolicy": { "description": "Define policy relating to the import and export of prefixes from a BGP peer.", "oneOf": [ @@ -4664,6 +4755,109 @@ "collector_id" ] }, + "PendingMgsUpdate": { + "type": "object", + "properties": { + "artifact_hash_id": { + "description": "which artifact to apply to this device (implies which component is being updated)", + "allOf": [ + { + "$ref": "#/components/schemas/ArtifactHashId" + } + ] + }, + "artifact_version": { + "$ref": "#/components/schemas/ArtifactVersion" + }, + "baseboard_id": { + "description": "id of the baseboard that we're going to update", + "allOf": [ + { + "$ref": "#/components/schemas/BaseboardId" + } + ] + }, + "details": { + "description": "component-specific details of the pending update", + "allOf": [ + { + "$ref": "#/components/schemas/PendingMgsUpdateDetails" + } + ] + }, + "slot_id": { + "description": "last known MGS slot (cubby number) of the baseboard", + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "sp_type": { + "description": "what type of baseboard this is", + "allOf": [ + { + "$ref": "#/components/schemas/SpType" + } + ] + } + }, + "required": [ + "artifact_hash_id", + "artifact_version", + "baseboard_id", + "details", + "slot_id", + "sp_type" + ] + }, + "PendingMgsUpdateDetails": { + "description": "Describes the component-specific details of a PendingMgsUpdate", + "oneOf": [ + { + "description": "the SP itself is being updated", + "type": "object", + "properties": { + "component": { + "type": "string", + "enum": [ + "sp" + ] + }, + "expected_active_version": { + "description": "expected contents of the active slot", + "allOf": [ + { + "$ref": "#/components/schemas/ArtifactVersion" + } + ] + }, + "expected_inactive_version": { + "description": "expected contents of the inactive slot", + "allOf": [ + { + "$ref": "#/components/schemas/ExpectedVersion" + } + ] + } + }, + "required": [ + "component", + "expected_active_version", + "expected_inactive_version" + ] + } + ] + }, + "PendingMgsUpdates": { + "type": "object", + "properties": { + "by_baseboard": { + "$ref": "#/components/schemas/IdMapPendingMgsUpdate" + } + }, + "required": [ + "by_baseboard" + ] + }, "PhysicalDiskKind": { "description": "Describes the form factor of physical disks.", "type": "string", @@ -5807,6 +6001,15 @@ "last_port" ] }, + "SpType": { + "description": "SpType\n\n
JSON schema\n\n```json { \"type\": \"string\", \"enum\": [ \"sled\", \"power\", \"switch\" ] } ```
", + "type": "string", + "enum": [ + "sled", + "power", + "switch" + ] + }, "Srv": { "type": "object", "properties": { diff --git a/sled-agent/src/rack_setup/service.rs b/sled-agent/src/rack_setup/service.rs index b0ec33d001f..b1d4d550e2e 100644 --- a/sled-agent/src/rack_setup/service.rs +++ b/sled-agent/src/rack_setup/service.rs @@ -91,11 +91,11 @@ use nexus_client::{ use nexus_sled_agent_shared::inventory::{ OmicronSledConfig, OmicronZoneConfig, OmicronZoneType, OmicronZonesConfig, }; -use nexus_types::deployment::BlueprintSledConfig; use nexus_types::deployment::{ Blueprint, BlueprintDatasetConfig, BlueprintDatasetDisposition, BlueprintZoneType, CockroachDbPreserveDowngrade, blueprint_zone_type, }; +use nexus_types::deployment::{BlueprintSledConfig, PendingMgsUpdates}; use nexus_types::external_api::views::SledState; use omicron_common::address::get_sled_address; use omicron_common::api::external::Generation; @@ -1540,6 +1540,7 @@ pub(crate) fn build_initial_blueprint_from_sled_configs( Ok(Blueprint { id: BlueprintUuid::new_v4(), sleds: blueprint_sleds, + pending_mgs_updates: PendingMgsUpdates::new(), parent_blueprint_id: None, internal_dns_version, // We don't configure external DNS during RSS, so set it to an initial diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index b626b2e25b8..6abef0da383 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -124,7 +124,7 @@ time = { version = "0.3.36", features = ["formatting", "local-offset", "macros", tokio = { version = "1.43.1", features = ["full", "test-util"] } tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } tokio-stream = { version = "0.1.17", features = ["net", "sync"] } -tokio-util = { version = "0.7.13", features = ["codec", "io-util"] } +tokio-util = { version = "0.7.13", features = ["codec", "io-util", "time"] } toml = { version = "0.7.8" } toml_datetime = { version = "0.6.8", default-features = false, features = ["serde"] } toml_edit-3c51e837cfc5589a = { package = "toml_edit", version = "0.22.24", features = ["serde"] } @@ -249,7 +249,7 @@ time-macros = { version = "0.2.18", default-features = false, features = ["forma tokio = { version = "1.43.1", features = ["full", "test-util"] } tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } tokio-stream = { version = "0.1.17", features = ["net", "sync"] } -tokio-util = { version = "0.7.13", features = ["codec", "io-util"] } +tokio-util = { version = "0.7.13", features = ["codec", "io-util", "time"] } toml = { version = "0.7.8" } toml_datetime = { version = "0.6.8", default-features = false, features = ["serde"] } toml_edit-3c51e837cfc5589a = { package = "toml_edit", version = "0.22.24", features = ["serde"] }