diff --git a/nexus/db-queries/src/db/datastore/fm.rs b/nexus/db-queries/src/db/datastore/fm.rs index 63876e1cdfc..c9e0f0c4684 100644 --- a/nexus/db-queries/src/db/datastore/fm.rs +++ b/nexus/db-queries/src/db/datastore/fm.rs @@ -27,6 +27,8 @@ use diesel::result::Error as DieselError; use diesel::sql_types; use dropshot::PaginationOrder; use nexus_db_errors::ErrorHandler; +use nexus_db_errors::OptionalError; +use nexus_db_errors::TransactionError; use nexus_db_errors::public_error_from_diesel; use nexus_db_lookup::DbConnection; use nexus_db_schema::schema::ereport::dsl as ereport_dsl; @@ -60,7 +62,8 @@ impl DataStore { let conn = self.pool_connection_authorized(opctx).await?; let version = self .fm_current_sitrep_version_on_conn(&conn) - .await? + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))? .map(Into::into); Ok(version) } @@ -68,14 +71,13 @@ impl DataStore { async fn fm_current_sitrep_version_on_conn( &self, conn: &async_bb8_diesel::Connection, - ) -> Result, Error> { + ) -> Result, DieselError> { history_dsl::fm_sitrep_history .order_by(history_dsl::version.desc()) .select(model::SitrepVersion::as_select()) .first_async(conn) .await .optional() - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } /// Reads the [`fm::SitrepMetadata`] describing the sitrep with the given @@ -122,13 +124,56 @@ impl DataStore { opctx: &OpContext, ) -> Result, Error> { let conn = self.pool_connection_authorized(opctx).await?; - let version: fm::SitrepVersion = - match self.fm_current_sitrep_version_on_conn(&conn).await? { + loop { + let version = + self.fm_current_sitrep_version_on_conn(&conn).await.map_err( + |e| public_error_from_diesel(e, ErrorHandler::Server), + )?; + let version: fm::SitrepVersion = match version { Some(version) => version.into(), + // If there is no current sitrep version, that means no sitreps + // exist; return `None`. 
None => return Ok(None), }; - let sitrep = self.fm_sitrep_read_on_conn(version.id, &conn).await?; - Ok(Some((version, sitrep))) + match self.fm_sitrep_read_on_conn(version.id, &conn).await { + Ok(sitrep) => return Ok(Some((version, sitrep))), + // If `fm_sitrep_read_on_conn` returns `NotFound` for a sitrep + // ID that was returned by `fm_current_sitrep_version_on_conn`, + // this means that the sitrep we were attempting to read is no + // longer the current sitrep. There must, therefore, be a *new* + // current sitrep. This is because: + // + // - The `fm_sitrep_delete_all` query does not permit the + // current sitrep to be deleted, and, + // - If a sitrep is the current sitrep, it will no longer be + // the current sitrep if and only if a new current sitrep has + // been inserted. + // + // Therefore, we can just retry, loading the current version + // again and trying to read the new current sitrep. + // + // This is a fairly unlikely situation, but it could occur if + // there is a particularly long delay between when we read the + // current version and when we attempt to actually load that + // sitrep, so we ought to handle it here. It would be incorrect + // to just return `None` in this case, as `None` means that *no + // current sitrep has ever been created*. + Err(e @ Error::NotFound { .. }) => { + slog::debug!( + opctx.log, + "attempted to read current sitrep {}, but it seems to + have been deleted out from under us! retrying...", + version.id; + "sitrep_id" => ?version.id, + "sitrep_version" => ?version.version, + "error" => %e, + ); + continue; + } + // Propagate any unanticipated errors. + Err(e) => return Err(e), + } + } } /// Reads the entire content of the sitrep with the provided ID, if one @@ -148,9 +193,6 @@ impl DataStore { id: SitrepUuid, conn: &async_bb8_diesel::Connection, ) -> Result { - let metadata = - self.fm_sitrep_metadata_read_on_conn(id, &conn).await?.into(); - // Fetch all ereports assigned to cases in this sitrep. 
We do this by // querying the `fm_ereport_in_case` table for all entries with this // sitrep ID, paginated by the ereport assignment's UUID. This query is @@ -289,6 +331,17 @@ impl DataStore { cases }; + // Finally, fetch the sitrep's metadata from the `fm_sitrep` table. We + // load this record last, because if a concurrent delete operation has + // started, we will observe that the top-level metadata record has been + // deleted, and return `NotFound`. This prevents us from returning a + // potentially torn sitrep where child records were deleted after + // loading the metadata record. + // + // See https://github.com/oxidecomputer/omicron/issues/9594 for details. + let metadata = + self.fm_sitrep_metadata_read_on_conn(id, &conn).await?.into(); + Ok(Sitrep { metadata, cases }) } @@ -759,47 +812,77 @@ impl DataStore { .map(|id| id.into_untyped_uuid()) .collect::>(); - // Delete case ereport assignments - let case_ereports_deleted = diesel::delete( - case_ereport_dsl::fm_ereport_in_case - .filter(case_ereport_dsl::sitrep_id.eq_any(ids.clone())), - ) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context("failed to delete case ereport assignments") - })?; - - // Delete case metadata records. - let cases_deleted = diesel::delete( - case_dsl::fm_case.filter(case_dsl::sitrep_id.eq_any(ids.clone())), - ) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context("failed to delete case metadata") - })?; - - // Delete the sitrep metadata entries *last*. This is necessary because - // the rest of the delete operation is unsynchronized, and it is - // possible for a Nexus to die before it has "fully deleted" a sitrep, - // but deleted some of its records. 
The `fm_sitrep` (metadata) table is - // the one that is used to determine whether a sitrep "exists" so that - // the sitrep GC task can determine if it needs to be deleted, so don't - // touch it until all the other records are gone. - let sitreps_deleted = diesel::delete( - sitrep_dsl::fm_sitrep.filter(sitrep_dsl::id.eq_any(ids.clone())), - ) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context("failed to delete sitrep metadata") - })?; - - slog::debug!( + struct SitrepDeleteResult { + sitreps_deleted: usize, + case_ereports_deleted: usize, + cases_deleted: usize, + } + + let err = OptionalError::new(); + let SitrepDeleteResult { + sitreps_deleted, + case_ereports_deleted, + cases_deleted, + } = self + // Sitrep deletion is transactional to prevent a sitrep from being + // left in a partially-deleted state should the Nexus instance + // attempting the delete operation die suddenly. + .transaction_retry_wrapper("fm_sitrep_delete_all") + .transaction(&conn, |conn| { + let ids = ids.clone(); + let err = err.clone(); + async move { + // First, ensure that we are not deleting the current + // sitrep, and bail out if we would. + if let Some(model::SitrepVersion { sitrep_id, .. }) = + self.fm_current_sitrep_version_on_conn(&conn).await? + { + if ids.contains(&sitrep_id.as_untyped_uuid()) { + return Err(err.bail(TransactionError::CustomError(Error::conflict(format!( + "cannot delete sitrep {sitrep_id}, as it is the current sitrep" + ))))); + } + } + + // Delete case ereport assignments + let case_ereports_deleted = diesel::delete( + case_ereport_dsl::fm_ereport_in_case.filter( + case_ereport_dsl::sitrep_id.eq_any(ids.clone()), + ), + ) + .execute_async(&conn) + .await?; + + // Delete case metadata records. + let cases_deleted = diesel::delete( + case_dsl::fm_case + .filter(case_dsl::sitrep_id.eq_any(ids.clone())), + ) + .execute_async(&conn) + .await?; + + // Delete sitrep metadata records. 
+ let sitreps_deleted = diesel::delete( + sitrep_dsl::fm_sitrep + .filter(sitrep_dsl::id.eq_any(ids.clone())), + ) + .execute_async(&conn) + .await?; + + Ok(SitrepDeleteResult { + sitreps_deleted, + cases_deleted, + case_ereports_deleted, + }) + } + }) + .await + .map_err(|e| match err.take() { + Some(err) => err.into_public_ignore_retries(), + None => public_error_from_diesel(e, ErrorHandler::Server), + })?; + + slog::info!( &opctx.log, "deleted {sitreps_deleted} of {} sitreps sitreps", ids.len(); "ids" => ?ids, @@ -1348,6 +1431,69 @@ mod tests { logctx.cleanup_successful(); } + /// Assert that two sitreps are equal, skipping timestamp fields in ereports + /// and sitrep metadata. These timestamps may lose precision when + /// round-tripping through cockroachdb and may no longer be "equal". + /// + /// NOTE FOR FUTURE GENERATIONS: If we add other top-level child records + /// other than cases, we should also assert that they match here. + #[track_caller] + fn assert_sitreps_eq(this: &Sitrep, that: &Sitrep) { + // Verify the sitrep metadata matches --- ignore the timestamp. + assert_eq!(this.id(), that.id()); + assert_eq!(this.metadata.creator_id, that.metadata.creator_id); + assert_eq!(this.metadata.comment, that.metadata.comment); + assert_eq!(this.metadata.parent_sitrep_id, None); + + // Verify all the expected cases exist in both sitreps + assert_eq!(this.cases.len(), that.cases.len()); + for case in &that.cases { + let fm::Case { + id, + created_sitrep_id, + closed_sitrep_id, + comment, + de, + ereports, + } = dbg!(case); + let Some(expected) = this.cases.get(&case.id) else { + panic!("expected case {id} to exist in the original sitrep") + }; + // N.B.: we must assert each bit of the case manually, as ereports + // contain `time_collected` timestamps which will lose a bit of + // precision when roundtripped through the database. 
+ // :( + assert_eq!(id, &expected.id); + assert_eq!(created_sitrep_id, &expected.created_sitrep_id); + assert_eq!(closed_sitrep_id, &expected.closed_sitrep_id); + assert_eq!(comment, &expected.comment); + assert_eq!(de, &expected.de); + + // Now, check that all the ereports are present in both cases. + assert_eq!(ereports.len(), expected.ereports.len()); + for expected in &expected.ereports { + let Some(ereport) = ereports.get(&expected.ereport.id()) else { + panic!( + "expected ereport {id} to exist in the original case" + ) + }; + let fm::case::CaseEreport { + id, + ereport, + assigned_sitrep_id, + comment, + } = dbg!(ereport); + assert_eq!(id, &expected.id); + // This is where we go out of our way to avoid the timestamp, + // btw. + assert_eq!(ereport.id(), expected.ereport.id()); + assert_eq!(assigned_sitrep_id, &expected.assigned_sitrep_id); + assert_eq!(comment, &expected.comment); + } + eprintln!(); + } + } + async fn list_orphans( datastore: &DataStore, opctx: &OpContext, @@ -1527,55 +1673,7 @@ mod tests { .await .expect("failed to read sitrep"); - // Verify the sitrep metadata matches --- ignore the timestamp. - assert_eq!(read_sitrep.id(), sitrep.id()); - assert_eq!(read_sitrep.metadata.creator_id, sitrep.metadata.creator_id); - assert_eq!(read_sitrep.metadata.comment, sitrep.metadata.comment); - assert_eq!(read_sitrep.metadata.parent_sitrep_id, None); - - // Verify all the expected cases were read back - for case in &read_sitrep.cases { - let fm::Case { - id, - created_sitrep_id, - closed_sitrep_id, - comment, - de, - ereports, - } = dbg!(case); - let Some(expected) = sitrep.cases.get(&case.id) else { - panic!("expected case {id} to exist in the original sitrep") - }; - // N.B.: we must assert each bit of the case manually, as ereports - // contain `time_collected` timestamps which will lose a bit of - // precision when roundtripped through the database. 
- // :( - assert_eq!(id, &expected.id); - assert_eq!(created_sitrep_id, &expected.created_sitrep_id); - assert_eq!(closed_sitrep_id, &expected.closed_sitrep_id); - assert_eq!(comment, &expected.comment); - assert_eq!(de, &expected.de); - for expected in &expected.ereports { - let Some(ereport) = ereports.get(&expected.ereport.id()) else { - panic!( - "expected ereport {id} to exist in the original case" - ) - }; - let fm::case::CaseEreport { - id, - ereport, - assigned_sitrep_id, - comment, - } = dbg!(ereport); - assert_eq!(id, &expected.id); - // This is where we go out of our way to avoid the timestamp, - // btw. - assert_eq!(ereport.id(), expected.ereport.id()); - assert_eq!(assigned_sitrep_id, &expected.assigned_sitrep_id); - assert_eq!(comment, &expected.comment); - } - eprintln!(); - } + assert_sitreps_eq(&sitrep, &read_sitrep); // Clean up db.terminate().await; @@ -1596,6 +1694,28 @@ mod tests { .await .expect("failed to insert sitrep"); + // Note that we must also insert a second sitrep which is a child of the + // sitrep we intend to delete, as the sitrep insert operation makes a + // sitrep the current sitrep, and a sitrep cannot be deleted if it is + // current. + datastore + .fm_sitrep_insert( + opctx, + fm::Sitrep { + metadata: fm::SitrepMetadata { + parent_sitrep_id: Some(sitrep_id), + id: SitrepUuid::new_v4(), + time_created: Utc::now(), + creator_id: OmicronZoneUuid::new_v4(), + comment: "my cool sitrep".to_string(), + inv_collection_id: CollectionUuid::new_v4(), + }, + cases: Default::default(), + }, + ) + .await + .expect("failed to insert second sitrep"); + // Verify the sitrep, cases, and ereport assignments exist let conn = db .datastore() @@ -1674,4 +1794,168 @@ mod tests { db.terminate().await; logctx.cleanup_successful(); } + + // Test that concurrent read and delete operations on sitreps collections do + // not result in torn reads. 
With the fix for issue #9594, fm_sitrep_read + // checks for the top-level sitrep record at the END of reading, so if a + // concurrent delete has already removed the top-level sitrep record, + // the read will fail rather than returning partial data. + // + // This test spawns concurrent readers and a deleter to exercise the race + // condition. Readers should either get the complete original sitrep + // OR an error - never partial/torn data. + #[tokio::test] + async fn test_concurrent_sitrep_read_delete() { + use std::sync::Arc; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + const TEST_NAME: &str = "test_concurrent_sitrep_read_delete"; + let logctx = dev::test_setup_log(TEST_NAME); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + // Create a sitrep and insert it. + let sitrep = make_sitrep_with_cases(opctx, datastore).await; + let sitrep_id = sitrep.metadata.id; + datastore + .fm_sitrep_insert(opctx, sitrep.clone()) + .await + .expect("failed to insert sitrep"); + + // Verify we can read it back correctly + let read_back = datastore + .fm_sitrep_read(&opctx, sitrep_id) + .await + .expect("failed to read sitrep"); + assert_sitreps_eq(&sitrep, &read_back); + + // Note that we must also insert a second sitrep which is a child of the + // sitrep we intend to delete, as the sitrep insert operation makes a + // sitrep the current sitrep, and a sitrep cannot be deleted if it is + // current.
+ datastore + .fm_sitrep_insert( + opctx, + fm::Sitrep { + metadata: fm::SitrepMetadata { + parent_sitrep_id: Some(sitrep_id), + id: SitrepUuid::new_v4(), + time_created: Utc::now(), + creator_id: OmicronZoneUuid::new_v4(), + comment: "my cool sitrep".to_string(), + inv_collection_id: CollectionUuid::new_v4(), + }, + cases: Default::default(), + }, + ) + .await + .expect("failed to insert second sitrep"); + + // Track results from concurrent readers + let successful_reads = Arc::new(AtomicUsize::new(0)); + let error_count = Arc::new(AtomicUsize::new(0)); + let delete_completed = Arc::new(AtomicBool::new(false)); + + // Signal when at least one read has completed, so we know readers are + // running before we start deleting + let (first_read_tx, first_read_rx) = + tokio::sync::oneshot::channel::<()>(); + let first_read_tx = + Arc::new(std::sync::Mutex::new(Some(first_read_tx))); + + // Spawn reader tasks that loop until deletion completes + const NUM_READERS: usize = 10; + let mut reader_handles = Vec::new(); + + for n in 0..NUM_READERS { + let datastore = datastore.clone(); + let opctx = opctx.child( + std::iter::once(("reader".to_string(), n.to_string())) + .collect(), + ); + let sitrep = sitrep.clone(); + let successful_reads = successful_reads.clone(); + let error_count = error_count.clone(); + let delete_completed = delete_completed.clone(); + let first_read_tx = first_read_tx.clone(); + + reader_handles.push(tokio::spawn(async move { + loop { + match datastore.fm_sitrep_read(&opctx, sitrep_id).await { + Ok(read_sitrep) => { + // If the read sitrep is not equal to the original, + // this indicates a torn read! 
+ assert_sitreps_eq(&sitrep, &read_sitrep); + successful_reads.fetch_add(1, Ordering::Relaxed); + + // Signal that at least one read completed (only + // the first sender to take the channel will send) + if let Some(tx) = + first_read_tx.lock().unwrap().take() + { + let _ = tx.send(()); + } + } + Err(_) => { + // Errors are expected after deletion - the + // sitrep no longer exists. The specific error + // varies depending on which query fails first. + error_count.fetch_add(1, Ordering::Relaxed); + } + } + + // Stop reading after delete completes + if delete_completed.load(Ordering::Relaxed) { + break; + } + } + })); + } + + // Wait for at least one successful read before deleting, so we know + // the reader tasks have started + first_read_rx.await.expect("no reader completed a read"); + + // Delete the sitrep while readers are running + datastore + .fm_sitrep_delete_all(&opctx, vec![sitrep_id]) + .await + .expect("failed to delete sitrep"); + delete_completed.store(true, Ordering::Relaxed); + + // Wait for all readers to complete + for handle in reader_handles { + handle.await.expect("reader task panicked"); + } + + // Log results for debugging + let successful = successful_reads.load(Ordering::Relaxed); + let errors = error_count.load(Ordering::Relaxed); + eprintln!( + "Results: {} successful reads, {} errors", + successful, errors + ); + + // Key invariant: at least one successful read (we waited for this + // before deleting). Successful reads are validated inside the reader + // loop - they must match the original sitrep exactly, or the assert_eq! + // fails indicating a torn read. Errors after deletion are expected and + // don't need to be categorized.
+ assert!( + successful > 0, + "Expected at least one successful read (we wait for this)" + ); + + // Verify the sitrep is fully deleted + match datastore.fm_sitrep_read(&opctx, sitrep_id).await { + Ok(_sitrep) => panic!("sitrep not deleted"), + Err(Error::NotFound { message: _ }) => {} + Err(e) => panic!("unexpected error: {e}"), + } + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/types/src/fm.rs b/nexus/types/src/fm.rs index eb8cfaaee3b..2dd6387c6d4 100644 --- a/nexus/types/src/fm.rs +++ b/nexus/types/src/fm.rs @@ -39,6 +39,18 @@ pub struct Sitrep { /// ID, and which Nexus produced it. pub metadata: SitrepMetadata, pub cases: IdOrdMap, + // + // NOTE FOR FUTURE GENERATIONS: If you add more database tables whose + // records are top-level children of a sitrep (i.e., like cases), please + // make sure to update the tests in `nexus_db_queries::db::datastore::fm` to + // also include those records. In particular, make sure to update the + // `assert_sitreps_eq` function to also assert that your new records are + // contained in both sitreps. Also, the tests for sitrep deletion and for + // roundtripping sitreps through the database should also + // create/delete/assert any new records added. + // + // Thank you for your cooperation! + // } impl Sitrep {