Turbopack: improve error handling when DB read/deserialization fails #79545

Merged 2 commits on May 27, 2025
15 changes: 12 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -220,9 +220,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
Self {
options,
start_time: Instant::now(),
session_id: backing_storage.next_session_id(),
session_id: backing_storage
.next_session_id()
.expect("Failed get session id"),
persisted_task_id_factory: IdFactoryWithReuse::new(
backing_storage.next_free_task_id(),
backing_storage
.next_free_task_id()
.expect("Failed to get task id"),
TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
),
transient_task_id_factory: IdFactoryWithReuse::new(
@@ -780,6 +784,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if let Some(task_type) = unsafe {
self.backing_storage
.reverse_lookup_task_cache(None, task_id)
.expect("Failed to lookup task type")
} {
let _ = self.task_cache.try_insert(task_type.clone(), task_id);
return Some(task_type);
@@ -969,7 +974,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// Continue all uncompleted operations
// They can't be interrupted by a snapshot since the snapshotting job has not been
// scheduled yet.
let uncompleted_operations = self.backing_storage.uncompleted_operations();
let uncompleted_operations = self
.backing_storage
.uncompleted_operations()
.expect("Failed to get uncompleted operations");
if !uncompleted_operations.is_empty() {
let mut ctx = self.execute_context(turbo_tasks);
for op in uncompleted_operations {
@@ -1057,6 +1065,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if let Some(task_id) = unsafe {
self.backing_storage
.forward_lookup_task_cache(tx.as_ref(), &task_type)
.expect("Failed to lookup task id")
} {
let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
task_id
@@ -165,10 +165,20 @@ where
category: TaskDataCategory,
) -> Vec<CachedDataItem> {
// Safety: `transaction` is a valid transaction from `self.backend.backing_storage`.
unsafe {
let result = unsafe {
self.backend
.backing_storage
.lookup_data(self.transaction(), task_id, category)
};
match result {
Ok(data) => data,
Err(e) => {
let task_name = self.backend.get_task_description(task_id);
panic!(
"Failed to restore task data (corrupted database or bug): {:?}",
e.context(format!("{category:?} for {task_name} ({task_id})"))
)
}
}
}
}

[Review comment on the panic! above (Contributor)]
should we instruct the user to delete the cache and report an issue?

bgw referenced a plan to automatically invalidate the db at this point... add a reference?

[Reply (Member Author)]
He is probably doing that on any panic...
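As a side illustration (hypothetical, not part of this diff), this is roughly how anyhow's context layering renders once the panic above formats the error with {:?}; the inner message, task name, and id are invented:

use anyhow::anyhow;

fn main() {
    // Build an error shaped like a failed lookup_data call: innermost cause first,
    // then the layers added via .context() on the way up.
    let err = anyhow!("unexpected end of input")
        .context("Deserialization failed")
        .context("Data for read task (42)");
    // anyhow's Debug output prints the outermost context, then a "Caused by:" chain;
    // that chain is what ends up inside the panic message above.
    println!("{err:?}");
}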
12 changes: 6 additions & 6 deletions turbopack/crates/turbo-tasks-backend/src/backing_storage.rs
@@ -15,9 +15,9 @@ pub trait BackingStorage: 'static + Send + Sync {
fn lower_read_transaction<'l: 'i + 'r, 'i: 'r, 'r>(
tx: &'r Self::ReadTransaction<'l>,
) -> &'r Self::ReadTransaction<'i>;
fn next_free_task_id(&self) -> TaskId;
fn next_session_id(&self) -> SessionId;
fn uncompleted_operations(&self) -> Vec<AnyOperation>;
fn next_free_task_id(&self) -> Result<TaskId>;
fn next_session_id(&self) -> Result<SessionId>;
fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>>;
#[allow(clippy::ptr_arg)]
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>>;
fn save_snapshot<I>(
@@ -44,15 +44,15 @@ pub trait BackingStorage: 'static + Send + Sync {
&self,
tx: Option<&Self::ReadTransaction<'_>>,
key: &CachedTaskType,
) -> Option<TaskId>;
) -> Result<Option<TaskId>>;
/// # Safety
///
/// `tx` must be a transaction from this BackingStorage instance.
unsafe fn reverse_lookup_task_cache(
&self,
tx: Option<&Self::ReadTransaction<'_>>,
task_id: TaskId,
) -> Option<Arc<CachedTaskType>>;
) -> Result<Option<Arc<CachedTaskType>>>;
/// # Safety
///
/// `tx` must be a transaction from this BackingStorage instance.
@@ -61,7 +61,7 @@ pub trait BackingStorage: 'static + Send + Sync {
tx: Option<&Self::ReadTransaction<'_>>,
task_id: TaskId,
category: TaskDataCategory,
) -> Vec<CachedDataItem>;
) -> Result<Vec<CachedDataItem>>;

fn shutdown(&self) -> Result<()> {
Ok(())
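To sketch what these trait changes mean for call sites (a hypothetical helper, not code from this PR): every accessor is now fallible, so callers either propagate the error with ? or turn it into a panic with a descriptive message, as backend/mod.rs does above.

use anyhow::{Context, Result};
use turbo_tasks::{SessionId, TaskId};

// Hypothetical helper; `B` is the BackingStorage trait declared above.
fn restore_ids<B: BackingStorage>(storage: &B) -> Result<(TaskId, SessionId)> {
    // Propagate the database error, attaching context for the caller...
    let next_task = storage
        .next_free_task_id()
        .context("Failed to get task id")?;
    // ...or let it bubble up unchanged.
    let session = storage.next_session_id()?;
    Ok((next_task, session))
}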
83 changes: 46 additions & 37 deletions turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs
@@ -2,7 +2,7 @@ use std::{borrow::Borrow, cmp::max, sync::Arc};

use anyhow::{Context, Result, anyhow};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tracing::Span;
use turbo_tasks::{SessionId, TaskId, backend::CachedTaskType, turbo_tasks_scope};
@@ -53,7 +53,6 @@ fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

#[cfg(feature = "verify_serialization")]
fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
pot::de::SymbolList::new()
}
@@ -106,14 +105,12 @@ impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
}
}

fn get_infra_u32(database: &impl KeyValueDatabase, key: u32) -> Option<u32> {
let tx = database.begin_read_transaction().ok()?;
let value = database
.get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())
.ok()?
.map(as_u32)?
.ok()?;
Some(value)
fn get_infra_u32(database: &impl KeyValueDatabase, key: u32) -> Result<Option<u32>> {
let tx = database.begin_read_transaction()?;
database
.get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
.map(as_u32)
.transpose()
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
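The `.map(as_u32).transpose()` step in the new get_infra_u32 above is the standard Option/Result inversion; a minimal, standalone illustration (not from this PR):

// A missing value stays Ok(None); a present-but-corrupt value becomes Err(..).
fn parse_optional(raw: Option<&str>) -> Result<Option<u32>, std::num::ParseIntError> {
    raw.map(|s| s.parse::<u32>()).transpose()
}

fn main() {
    assert_eq!(parse_optional(None), Ok(None));
    assert_eq!(parse_optional(Some("42")), Ok(Some(42)));
    assert!(parse_optional(Some("oops")).is_err());
}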
@@ -127,17 +124,24 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
T::lower_read_transaction(tx)
}

fn next_free_task_id(&self) -> TaskId {
TaskId::try_from(get_infra_u32(&self.database, META_KEY_NEXT_FREE_TASK_ID).unwrap_or(1))
.unwrap()
fn next_free_task_id(&self) -> Result<TaskId> {
Ok(TaskId::try_from(
get_infra_u32(&self.database, META_KEY_NEXT_FREE_TASK_ID)
.context("Unable to read next free task id from database")?
.unwrap_or(1),
)?)
}

fn next_session_id(&self) -> SessionId {
SessionId::try_from(get_infra_u32(&self.database, META_KEY_SESSION_ID).unwrap_or(0) + 1)
.unwrap()
fn next_session_id(&self) -> Result<SessionId> {
Ok(SessionId::try_from(
get_infra_u32(&self.database, META_KEY_SESSION_ID)
.context("Unable to read session id from database")?
.unwrap_or(0)
+ 1,
)?)
}

fn uncompleted_operations(&self) -> Vec<AnyOperation> {
fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
let tx = database.begin_read_transaction()?;
let Some(operations) = database.get(
Expand All @@ -148,10 +152,10 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
else {
return Ok(Vec::new());
};
let operations = POT_CONFIG.deserialize(operations.borrow())?;
let operations = deserialize_with_good_error(operations.borrow())?;
Ok(operations)
}
get(&self.database).unwrap_or_default()
get(&self.database).context("Unable to read uncompleted operations from database")
}

fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
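One detail worth noting in this file (the sketch below is illustrative, not from the PR): the new code uses .context("...") where the message is a fixed string and .with_context(|| format!(...)) where it interpolates values, so the format! only runs on the error path.

use anyhow::{Context, Result, anyhow};

// Stand-in for a fallible database read; always fails here for illustration.
fn read_value(_key: &str) -> Result<u32> {
    Err(anyhow!("key not found"))
}

fn main() {
    // Eager: the &str is passed as-is, which is fine for a constant message.
    let a = read_value("session_id").context("Unable to read session id from database");
    // Lazy: the closure (and its format!) only runs because the read failed.
    let task_type = "ExampleTask";
    let b = read_value(task_type)
        .with_context(|| format!("Looking up task id for {task_type:?} from database failed"));
    println!("{:?}\n{:?}", a.unwrap_err(), b.unwrap_err());
}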
@@ -360,7 +364,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
&self,
tx: Option<&T::ReadTransaction<'_>>,
task_type: &CachedTaskType,
) -> Option<TaskId> {
) -> Result<Option<TaskId>> {
fn lookup<D: KeyValueDatabase>(
database: &D,
tx: &D::ReadTransaction<'_>,
@@ -377,20 +381,17 @@
if self.database.is_empty() {
// Checking if the database is empty is a performance optimization
// to avoid serializing the task type.
return None;
return Ok(None);
}
let id = self
.with_tx(tx, |tx| lookup(&self.database, tx, task_type))
.inspect_err(|err| println!("Looking up task id for {task_type:?} failed: {err:?}"))
.ok()??;
Some(id)
self.with_tx(tx, |tx| lookup(&self.database, tx, task_type))
.with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
}

unsafe fn reverse_lookup_task_cache(
&self,
tx: Option<&T::ReadTransaction<'_>>,
task_id: TaskId,
) -> Option<Arc<CachedTaskType>> {
) -> Result<Option<Arc<CachedTaskType>>> {
fn lookup<D: KeyValueDatabase>(
database: &D,
tx: &D::ReadTransaction<'_>,
Expand All @@ -404,21 +405,18 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
else {
return Ok(None);
};
Ok(Some(POT_CONFIG.deserialize(bytes.borrow())?))
Ok(Some(deserialize_with_good_error(bytes.borrow())?))
}
let result = self
.with_tx(tx, |tx| lookup(&self.database, tx, task_id))
.inspect_err(|err| println!("Looking up task type for {task_id} failed: {err:?}"))
.ok()??;
Some(result)
self.with_tx(tx, |tx| lookup(&self.database, tx, task_id))
.with_context(|| format!("Looking up task type for {task_id} from database failed"))
}

unsafe fn lookup_data(
&self,
tx: Option<&T::ReadTransaction<'_>>,
task_id: TaskId,
category: TaskDataCategory,
) -> Vec<CachedDataItem> {
) -> Result<Vec<CachedDataItem>> {
fn lookup<D: KeyValueDatabase>(
database: &D,
tx: &D::ReadTransaction<'_>,
Expand All @@ -437,12 +435,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
else {
return Ok(Vec::new());
};
let result: Vec<CachedDataItem> = POT_CONFIG.deserialize(bytes.borrow())?;
let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
Ok(result)
}
self.with_tx(tx, |tx| lookup(&self.database, tx, task_id, category))
.inspect_err(|err| println!("Looking up data for {task_id} failed: {err:?}"))
.unwrap_or_default()
.with_context(|| format!("Looking up data for {task_id} from database failed"))
}

fn shutdown(&self) -> Result<()> {
@@ -647,3 +644,15 @@ fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
}
})
}

fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
match POT_CONFIG.deserialize(data) {
Ok(value) => Ok(value),
Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
&mut pot_de_symbol_list().deserializer_for_slice(data)?,
)
.map_err(anyhow::Error::from)
.and(Err(error.into()))
.context("Deserialization failed"),
}
}
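For context on the fallback above: serde_path_to_error re-runs deserialization through a wrapper that records the path to the failing field. A small standalone illustration (using serde_json instead of pot purely for readability; the derive types are invented):

use std::collections::BTreeMap;

use serde::Deserialize;

#[derive(Deserialize)]
#[allow(dead_code)]
struct Package {
    name: String,
    dependencies: BTreeMap<String, Dependency>,
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct Dependency {
    version: String,
}

fn main() {
    // `version` should be a string; the number makes deserialization fail.
    let json = r#"{ "name": "demo", "dependencies": { "serde": { "version": 1 } } }"#;
    let mut de = serde_json::Deserializer::from_str(json);
    let result: Result<Package, _> = serde_path_to_error::deserialize(&mut de);
    match result {
        Ok(_) => println!("ok"),
        // err.path() reads `dependencies.serde.version`, pointing at the exact
        // field whose bytes could not be decoded.
        Err(err) => println!("error at {}: {}", err.path(), err),
    }
}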