diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index fe70f468b5b5d..8d35a8889085e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -220,9 +220,13 @@ impl TurboTasksBackendInner { Self { options, start_time: Instant::now(), - session_id: backing_storage.next_session_id(), + session_id: backing_storage + .next_session_id() + .expect("Failed get session id"), persisted_task_id_factory: IdFactoryWithReuse::new( - backing_storage.next_free_task_id(), + backing_storage + .next_free_task_id() + .expect("Failed to get task id"), TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(), ), transient_task_id_factory: IdFactoryWithReuse::new( @@ -780,6 +784,7 @@ impl TurboTasksBackendInner { if let Some(task_type) = unsafe { self.backing_storage .reverse_lookup_task_cache(None, task_id) + .expect("Failed to lookup task type") } { let _ = self.task_cache.try_insert(task_type.clone(), task_id); return Some(task_type); @@ -969,7 +974,10 @@ impl TurboTasksBackendInner { // Continue all uncompleted operations // They can't be interrupted by a snapshot since the snapshotting job has not been // scheduled yet. - let uncompleted_operations = self.backing_storage.uncompleted_operations(); + let uncompleted_operations = self + .backing_storage + .uncompleted_operations() + .expect("Failed to get uncompleted operations"); if !uncompleted_operations.is_empty() { let mut ctx = self.execute_context(turbo_tasks); for op in uncompleted_operations { @@ -1057,6 +1065,7 @@ impl TurboTasksBackendInner { if let Some(task_id) = unsafe { self.backing_storage .forward_lookup_task_cache(tx.as_ref(), &task_type) + .expect("Failed to lookup task id") } { let _ = self.task_cache.try_insert(Arc::new(task_type), task_id); task_id diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 0160c50bc9b77..4fdfcc98c843f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -165,10 +165,20 @@ where category: TaskDataCategory, ) -> Vec { // Safety: `transaction` is a valid transaction from `self.backend.backing_storage`. - unsafe { + let result = unsafe { self.backend .backing_storage .lookup_data(self.transaction(), task_id, category) + }; + match result { + Ok(data) => data, + Err(e) => { + let task_name = self.backend.get_task_description(task_id); + panic!( + "Failed to restore task data (corrupted database or bug): {:?}", + e.context(format!("{category:?} for {task_name} ({task_id}))")) + ) + } } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index 7603c02201951..8708e028fbf44 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -15,9 +15,9 @@ pub trait BackingStorage: 'static + Send + Sync { fn lower_read_transaction<'l: 'i + 'r, 'i: 'r, 'r>( tx: &'r Self::ReadTransaction<'l>, ) -> &'r Self::ReadTransaction<'i>; - fn next_free_task_id(&self) -> TaskId; - fn next_session_id(&self) -> SessionId; - fn uncompleted_operations(&self) -> Vec; + fn next_free_task_id(&self) -> Result; + fn next_session_id(&self) -> Result; + fn uncompleted_operations(&self) -> Result>; #[allow(clippy::ptr_arg)] fn serialize(task: TaskId, data: &Vec) -> Result>; fn save_snapshot( @@ -44,7 +44,7 @@ pub trait BackingStorage: 'static + Send + Sync { &self, tx: Option<&Self::ReadTransaction<'_>>, key: &CachedTaskType, - ) -> Option; + ) -> Result>; /// # Safety /// /// `tx` must be a transaction from this BackingStorage instance. @@ -52,7 +52,7 @@ pub trait BackingStorage: 'static + Send + Sync { &self, tx: Option<&Self::ReadTransaction<'_>>, task_id: TaskId, - ) -> Option>; + ) -> Result>>; /// # Safety /// /// `tx` must be a transaction from this BackingStorage instance. @@ -61,7 +61,7 @@ pub trait BackingStorage: 'static + Send + Sync { tx: Option<&Self::ReadTransaction<'_>>, task_id: TaskId, category: TaskDataCategory, - ) -> Vec; + ) -> Result>; fn shutdown(&self) -> Result<()> { Ok(()) diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 43b0304f9afaf..5705e67aba956 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -2,7 +2,7 @@ use std::{borrow::Borrow, cmp::max, sync::Arc}; use anyhow::{Context, Result, anyhow}; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use tracing::Span; use turbo_tasks::{SessionId, TaskId, backend::CachedTaskType, turbo_tasks_scope}; @@ -53,7 +53,6 @@ fn pot_ser_symbol_map() -> pot::ser::SymbolMap { pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4) } -#[cfg(feature = "verify_serialization")] fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> { pot::de::SymbolList::new() } @@ -106,14 +105,12 @@ impl KeyValueDatabaseBackingStorage { } } -fn get_infra_u32(database: &impl KeyValueDatabase, key: u32) -> Option { - let tx = database.begin_read_transaction().ok()?; - let value = database - .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref()) - .ok()? - .map(as_u32)? - .ok()?; - Some(value) +fn get_infra_u32(database: &impl KeyValueDatabase, key: u32) -> Result> { + let tx = database.begin_read_transaction()?; + database + .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())? + .map(as_u32) + .transpose() } impl BackingStorage @@ -127,17 +124,24 @@ impl BackingStorage T::lower_read_transaction(tx) } - fn next_free_task_id(&self) -> TaskId { - TaskId::try_from(get_infra_u32(&self.database, META_KEY_NEXT_FREE_TASK_ID).unwrap_or(1)) - .unwrap() + fn next_free_task_id(&self) -> Result { + Ok(TaskId::try_from( + get_infra_u32(&self.database, META_KEY_NEXT_FREE_TASK_ID) + .context("Unable to read next free task id from database")? + .unwrap_or(1), + )?) } - fn next_session_id(&self) -> SessionId { - SessionId::try_from(get_infra_u32(&self.database, META_KEY_SESSION_ID).unwrap_or(0) + 1) - .unwrap() + fn next_session_id(&self) -> Result { + Ok(SessionId::try_from( + get_infra_u32(&self.database, META_KEY_SESSION_ID) + .context("Unable to read session id from database")? + .unwrap_or(0) + + 1, + )?) } - fn uncompleted_operations(&self) -> Vec { + fn uncompleted_operations(&self) -> Result> { fn get(database: &impl KeyValueDatabase) -> Result> { let tx = database.begin_read_transaction()?; let Some(operations) = database.get( @@ -148,10 +152,10 @@ impl BackingStorage else { return Ok(Vec::new()); }; - let operations = POT_CONFIG.deserialize(operations.borrow())?; + let operations = deserialize_with_good_error(operations.borrow())?; Ok(operations) } - get(&self.database).unwrap_or_default() + get(&self.database).context("Unable to read uncompleted operations from database") } fn serialize(task: TaskId, data: &Vec) -> Result> { @@ -360,7 +364,7 @@ impl BackingStorage &self, tx: Option<&T::ReadTransaction<'_>>, task_type: &CachedTaskType, - ) -> Option { + ) -> Result> { fn lookup( database: &D, tx: &D::ReadTransaction<'_>, @@ -377,20 +381,17 @@ impl BackingStorage if self.database.is_empty() { // Checking if the database is empty is a performance optimization // to avoid serializing the task type. - return None; + return Ok(None); } - let id = self - .with_tx(tx, |tx| lookup(&self.database, tx, task_type)) - .inspect_err(|err| println!("Looking up task id for {task_type:?} failed: {err:?}")) - .ok()??; - Some(id) + self.with_tx(tx, |tx| lookup(&self.database, tx, task_type)) + .with_context(|| format!("Looking up task id for {task_type:?} from database failed")) } unsafe fn reverse_lookup_task_cache( &self, tx: Option<&T::ReadTransaction<'_>>, task_id: TaskId, - ) -> Option> { + ) -> Result>> { fn lookup( database: &D, tx: &D::ReadTransaction<'_>, @@ -404,13 +405,10 @@ impl BackingStorage else { return Ok(None); }; - Ok(Some(POT_CONFIG.deserialize(bytes.borrow())?)) + Ok(Some(deserialize_with_good_error(bytes.borrow())?)) } - let result = self - .with_tx(tx, |tx| lookup(&self.database, tx, task_id)) - .inspect_err(|err| println!("Looking up task type for {task_id} failed: {err:?}")) - .ok()??; - Some(result) + self.with_tx(tx, |tx| lookup(&self.database, tx, task_id)) + .with_context(|| format!("Looking up task type for {task_id} from database failed")) } unsafe fn lookup_data( @@ -418,7 +416,7 @@ impl BackingStorage tx: Option<&T::ReadTransaction<'_>>, task_id: TaskId, category: TaskDataCategory, - ) -> Vec { + ) -> Result> { fn lookup( database: &D, tx: &D::ReadTransaction<'_>, @@ -437,12 +435,11 @@ impl BackingStorage else { return Ok(Vec::new()); }; - let result: Vec = POT_CONFIG.deserialize(bytes.borrow())?; + let result: Vec = deserialize_with_good_error(bytes.borrow())?; Ok(result) } self.with_tx(tx, |tx| lookup(&self.database, tx, task_id, category)) - .inspect_err(|err| println!("Looking up data for {task_id} failed: {err:?}")) - .unwrap_or_default() + .with_context(|| format!("Looking up data for {task_id} from database failed")) } fn shutdown(&self) -> Result<()> { @@ -647,3 +644,15 @@ fn serialize(task: TaskId, data: &Vec) -> Result>(data: &'de [u8]) -> Result { + match POT_CONFIG.deserialize(data) { + Ok(value) => Ok(value), + Err(error) => serde_path_to_error::deserialize::<'_, _, T>( + &mut pot_de_symbol_list().deserializer_for_slice(data)?, + ) + .map_err(anyhow::Error::from) + .and(Err(error.into())) + .context("Deserialization failed"), + } +}