From 63d4874a9a5431d17c4c119ff11bba5da4c0cd52 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Thu, 6 Nov 2025 18:20:48 +0500 Subject: [PATCH 1/7] implement persistence --- Cargo.lock | 3 + Cargo.toml | 7 + migrations/sqlite/01_init_schema.sql | 68 ++ src/error.rs | 12 + src/input/l1.rs | 13 +- src/input/l1/testing.rs | 2 + src/main.rs | 15 +- src/persistence/sql.rs | 984 ++++++++++++++++++++++++++- src/types/common.rs | 11 + 9 files changed, 1089 insertions(+), 26 deletions(-) create mode 100644 migrations/sqlite/01_init_schema.sql diff --git a/Cargo.lock b/Cargo.lock index d558ce0..7f2237a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9985,6 +9985,7 @@ name = "staking-ui-service" version = "0.1.0" dependencies = [ "alloy", + "anyhow", "async-lock 3.4.1", "bitvec", "clap 4.5.51", @@ -10004,6 +10005,8 @@ dependencies = [ "portpicker", "rand 0.8.5", "serde", + "serde_json", + "sqlx", "staking-cli", "staking-ui-service", "surf-disco", diff --git a/Cargo.toml b/Cargo.toml index a7a45b6..5beb761 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ alloy = { version = "1.0", features = [ "json-rpc", "node-bindings", ] } +anyhow = "1" async-lock = "3.0" bitvec = "1" clap = { version = "4.4", features = ["derive", "env"] } @@ -36,6 +37,12 @@ im = { version = "15", features = ["serde"] } log-panics = "2" parking_lot = "0.12" serde = { version = "1.0", features = ["derive"] } +serde_json = "1" +sqlx = { version = "0.8", features = [ + "sqlite", + "runtime-tokio", + "migrate", +] } surf-disco = "0.9" tagged-base64 = "0.4" tide-disco = "0.9" diff --git a/migrations/sqlite/01_init_schema.sql b/migrations/sqlite/01_init_schema.sql new file mode 100644 index 0000000..d4223aa --- /dev/null +++ b/migrations/sqlite/01_init_schema.sql @@ -0,0 +1,68 @@ + +-- L1 Block tracking +-- only one row, updated for each finalized block +CREATE TABLE l1_block ( + hash TEXT PRIMARY KEY, + number BIGINT NOT NULL UNIQUE, + parent_hash TEXT NOT NULL, + `timestamp` BIGINT NOT NULL, + exit_escrow_period BIGINT NOT NULL +); + +-- Full node Set +-- Stores the finalized node set according to L1 state +CREATE TABLE node ( + address TEXT PRIMARY KEY, + staking_key TEXT NOT NULL UNIQUE, + state_key TEXT NOT NULL UNIQUE, + commission REAL NOT NULL +); + +-- Wallet State +-- Stores the latest finalized state for each wallet +CREATE TABLE wallet ( + address TEXT PRIMARY KEY, + claimed_rewards TEXT NOT NULL +); + +-- Delegations, pending withdrawals, and exits +CREATE TABLE delegation ( + delegator TEXT NOT NULL REFERENCES wallet (address), + node TEXT NOT NULL, + -- Store as string to preserve precision for U256 + amount TEXT NOT NULL, + unlocks_at BIGINT NOT NULL, + -- Store as string to preserve precision for U256 + withdrawal_amount TEXT NOT NULL, + PRIMARY KEY (delegator, node) +); + +CREATE INDEX delegation_by_node ON delegation (node); +CREATE INDEX delegation_by_status ON delegation (delegator, unlocks_at, withdrawal_amount); + +-- -- Espresso Block tracking +-- -- Stores information about the current Espresso epoch +-- CREATE TABLE espresso_block ( +-- number BIGINT PRIMARY KEY, +-- view BIGINT NOT NULL, +-- epoch BIGINT NOT NULL, +-- epoch_first_block BIGINT NOT NULL +-- ); + +-- -- Active Validator Set +-- -- Stores statistics for validators active in the current epoch +-- CREATE TABLE active_node ( +-- staking_key TEXT PRIMARY KEY, +-- address TEXT NOT NULL UNIQUE, +-- votes INTEGER NOT NULL, +-- proposals INTEGER NOT NULL, +-- slots INTEGER NOT NULL +-- ); + +-- Rewards +-- Stores total accrued rewards for each account +CREATE TABLE lifetime_rewards ( + address TEXT PRIMARY KEY, + -- Store as string to preserve precision for U256 + amount TEXT NOT NULL +); diff --git a/src/error.rs b/src/error.rs index 9e9d39f..8cf93d7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -96,6 +96,18 @@ impl From for Error { } } +impl From for Error { + fn from(err: anyhow::Error) -> Self { + Self::internal().context(err) + } +} + +impl From for Error { + fn from(err: sqlx::Error) -> Self { + Self::internal().context(err) + } +} + pub type Result = core::result::Result; /// Extension functions for converting other result types into [`Result`]. diff --git a/src/input/l1.rs b/src/input/l1.rs index 25098c0..61876fc 100644 --- a/src/input/l1.rs +++ b/src/input/l1.rs @@ -13,6 +13,7 @@ use futures::stream::{Stream, StreamExt}; use hotshot_contract_adapter::sol_types::{ RewardClaim::RewardClaimEvents, StakeTableV2::StakeTableV2Events, }; +use hotshot_types::light_client::StateVerKey; use tracing::instrument; use crate::{ @@ -445,13 +446,13 @@ impl L1BlockSnapshot { #[derive(Clone, Debug, PartialEq)] pub struct Snapshot { /// The L1 block. - block: L1BlockSnapshot, + pub block: L1BlockSnapshot, /// The full node set as of this L1 block. - node_set: NodeSet, + pub node_set: NodeSet, /// The state of each wallet as of this L1 block. - wallets: Wallets, + pub wallets: Wallets, } impl Snapshot { @@ -530,6 +531,7 @@ impl Snapshot { let diff = FullNodeSetDiff::NodeUpdate(NodeSetEntry { address: ev.account, staking_key: PubKey::from(ev.blsVK).into(), + state_key: StateVerKey::from(ev.schnorrVK).into(), commission: Ratio::new( ev.commission.into(), COMMISSION_BASIS_POINTS.into(), @@ -545,6 +547,7 @@ impl Snapshot { let diff = FullNodeSetDiff::NodeUpdate(NodeSetEntry { address: ev.account, staking_key: PubKey::from(ev.blsVk).into(), + state_key: StateVerKey::from(ev.schnorrVk).into(), commission: Ratio::new( ev.commission.into(), COMMISSION_BASIS_POINTS.into(), @@ -657,7 +660,7 @@ impl Wallets { #[derive(Clone, Debug, Default, PartialEq)] pub struct Wallet { /// Nodes that this user is delegating to. - nodes: im::Vector, + pub nodes: im::Vector, /// Stake that has been undelegated but not yet withdrawn. pub pending_undelegations: im::Vector, @@ -1382,6 +1385,7 @@ mod test { let expected = NodeSetEntry { address: event.account, staking_key: PubKey::from(event.blsVK).into(), + state_key: StateVerKey::from(event.schnorrVK).into(), commission: Ratio::new(event.commission as usize, COMMISSION_BASIS_POINTS as usize), stake: Default::default(), }; @@ -1478,6 +1482,7 @@ mod test { FullNodeSetDiff::NodeUpdate(NodeSetEntry { address: node.account, staking_key: PubKey::from(node.blsVK).into(), + state_key: StateVerKey::from(node.schnorrVK).into(), stake: Default::default(), commission: Ratio::new( node.commission as usize, diff --git a/src/input/l1/testing.rs b/src/input/l1/testing.rs index 1ea5065..4b94dc2 100644 --- a/src/input/l1/testing.rs +++ b/src/input/l1/testing.rs @@ -220,9 +220,11 @@ pub fn block_snapshot(number: u64) -> L1BlockSnapshot { pub fn make_node(i: usize) -> NodeSetEntry { let address = Address::random(); let staking_key = TaggedBase64::new("KEY", &i.to_le_bytes()).unwrap(); + let state_key = TaggedBase64::new("STATEKEY", &i.to_le_bytes()).unwrap(); NodeSetEntry { address, staking_key, + state_key, stake: i.try_into().unwrap(), commission: Ratio::new(5, 100), } diff --git a/src/main.rs b/src/main.rs index a920115..fc99243 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, process::exit, sync::Arc}; +use std::{process::exit, sync::Arc}; use async_lock::RwLock; use clap::{Parser, ValueEnum}; @@ -36,9 +36,9 @@ struct Options { #[clap(flatten)] l1_options: L1ClientOptions, - /// Location for persistent storage. - #[clap(long, env = "ESPRESSO_STAKING_SERVICE_STORAGE")] - storage: PathBuf, + /// Persistence options. + #[clap(flatten)] + persistence: sql::PersistenceOptions, /// Port for the HTTP server. #[clap( @@ -65,7 +65,7 @@ impl Options { let genesis = Snapshot::empty(genesis_block); let l1_input = RpcStream::new(self.l1_options).await?; - let storage = sql::Persistence::new(&self.storage).await?; + let storage = sql::Persistence::new(&self.persistence).await?; // Create server state. let l1 = Arc::new(RwLock::new(l1::State::new(storage, genesis).await?)); @@ -161,7 +161,10 @@ mod test { ..Default::default() }, port, - storage: tmp.path().join("staking-ui-storage"), + persistence: sql::PersistenceOptions { + path: tmp.path().join("temp.db"), + max_connections: 5, + }, log_format: Some(LogFormat::Json), }; diff --git a/src/persistence/sql.rs b/src/persistence/sql.rs index 431badf..0321095 100644 --- a/src/persistence/sql.rs +++ b/src/persistence/sql.rs @@ -1,37 +1,989 @@ -//! SQL-based persistent storage. - +//! SQL-based persistent storage use crate::{ Result, - input::l1::{L1BlockSnapshot, L1Persistence, Snapshot}, - types::{common::Address, global::FullNodeSetDiff, wallet::WalletDiff}, + input::l1::{L1BlockSnapshot, L1Persistence, NodeSet, Snapshot, Wallet, Wallets}, + types::{ + common::{Address, Delegation, L1BlockId, NodeSetEntry, PendingWithdrawal, Ratio}, + global::FullNodeSetDiff, + wallet::WalletDiff, + }, }; -use std::path::Path; +use alloy::primitives::U256; +use anyhow::Context; +use clap::Parser; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; +use std::{collections::HashMap, path::PathBuf, str::FromStr}; +use tracing::instrument; + +/// Options for persistence. +#[derive(Parser, Clone, Debug)] +pub struct PersistenceOptions { + /// Path to the SQLite database file. + /// + /// If the file does not exist, it will be created. + /// The parent directory must exist. + #[clap(long, env = "ESPRESSO_STAKING_SERVICE_DB_PATH")] + pub path: PathBuf, + + /// Maximum number of connections in the connection pool. + #[clap( + long, + env = "ESPRESSO_STAKING_SERVICE_DB_MAX_CONNECTIONS", + default_value = "5" + )] + pub max_connections: u32, +} -#[derive(Debug)] -pub struct Persistence; +#[derive(Debug, Clone)] +pub struct Persistence { + pool: SqlitePool, +} + +#[derive(Default)] +struct DelegationEntry { + amount: U256, + unlocks_at: u64, + withdrawal_amount: U256, +} impl Persistence { - /// Create a new SQLite database at the given file location. - pub async fn new(_file: &Path) -> Result { - Ok(Self) + /// Create a new SQLite database with the given options. + pub async fn new(options: &PersistenceOptions) -> Result { + let connect_options = SqliteConnectOptions::from_str( + options + .path + .to_str() + .ok_or_else(|| anyhow::anyhow!("invalid path"))?, + )? + .create_if_missing(true); + + // Create connection pool + let pool = SqlitePoolOptions::new() + .max_connections(options.max_connections) + .connect_with(connect_options) + .await?; + + Self::run_migrations(&pool).await?; + + tracing::info!("SQLite persistence initialized successfully"); + Ok(Self { pool }) + } + + /// Run database migrations using SQLx's migration system + async fn run_migrations(pool: &SqlitePool) -> Result<()> { + tracing::warn!("running database migrations"); + + sqlx::migrate!("./migrations/sqlite") + .run(pool) + .await + .context("failed to run migrations")?; + + tracing::warn!("migrations completed"); + Ok(()) + } + + /// Load the finalized snapshot from the database. + /// + /// Returns `None` if no snapshot exists (i.e., database is empty). + /// Otherwise returns the complete state including: + /// - L1 block info (hash, number, timestamp, exit escrow period) + /// - Full node set (all registered validators) + /// - All wallet states (delegations, pending withdrawals, claimed rewards) + async fn load_finalized_snapshot(&self) -> Result> { + tracing::info!("loading finalized snapshot from database"); + + // The l1_block table has only one row, representing the latest finalized block + let block_row = sqlx::query_as::<_, (String, i64, String, i64, i64)>( + "SELECT hash, number, parent_hash, timestamp, exit_escrow_period FROM l1_block LIMIT 1", + ) + .fetch_optional(&self.pool) + .await?; + + let Some((hash, number, parent_hash, timestamp, exit_escrow_period)) = block_row else { + tracing::info!("no finalized snapshot found in database"); + return Ok(None); + }; + + let block = L1BlockSnapshot { + id: L1BlockId { + hash: hash.parse().context("failed to parse block hash")?, + number: number as u64, + parent: parent_hash.parse().context("failed to parse parent hash")?, + }, + timestamp: timestamp as u64, + exit_escrow_period: exit_escrow_period as u64, + }; + + // Load all registered nodes + let node_rows = sqlx::query_as::<_, (String, String, String, f64)>( + "SELECT address, staking_key, state_key, commission FROM node", + ) + .fetch_all(&self.pool) + .await?; + + let mut node_set = NodeSet::default(); + for (address, staking_key, state_key, commission) in node_rows { + let node = NodeSetEntry { + address: address.parse().context("failed to parse node address")?, + staking_key: staking_key.parse().context("failed to parse staking key")?, + state_key: state_key.parse().context("failed to parse state key")?, + stake: U256::ZERO, // Stake is computed from delegations, not stored + commission: Ratio::from_f32(commission as f32), + }; + node_set.push(node); + } + + // Load all wallets and their delegations + let wallet_rows = + sqlx::query_as::<_, (String, String)>("SELECT address, claimed_rewards FROM wallet") + .fetch_all(&self.pool) + .await?; + + let mut wallets = Wallets::default(); + for (wallet_address, claimed_rewards_str) in wallet_rows { + let address: Address = wallet_address + .parse() + .context("failed to parse wallet address")?; + let claimed_rewards = + U256::from_str(&claimed_rewards_str).context("failed to parse claimed rewards")?; + + let delegation_rows = sqlx::query_as::<_, (String, String, i64, String)>( + "SELECT node, amount, unlocks_at, withdrawal_amount + FROM delegation + WHERE delegator = $1 + ORDER BY unlocks_at ASC, withdrawal_amount ASC", + ) + .bind(&wallet_address) + .fetch_all(&self.pool) + .await?; + + let mut nodes = im::Vector::new(); + let mut pending_undelegations = im::Vector::new(); + let mut pending_exits = im::Vector::new(); + + // We use unlocks_at and withdrawal_amount to determine the delegation status + for (node_str, amount_str, unlocks_at, withdrawal_amount_str) in delegation_rows { + let node: Address = node_str.parse().context("failed to parse node address")?; + let amount = + U256::from_str(&amount_str).context("failed to parse delegation amount")?; + let withdrawal_amount = U256::from_str(&withdrawal_amount_str) + .context("failed to parse withdrawal amount")?; + if withdrawal_amount.is_zero() && unlocks_at != 0 { + // Node exit + pending_exits.push_back(PendingWithdrawal { + delegator: address, + node, + amount, + available_time: unlocks_at as u64, + }); + } else if unlocks_at != 0 && !withdrawal_amount.is_zero() { + // Partial undelegation + if !amount.is_zero() { + nodes.push_back(Delegation { + delegator: address, + node, + amount, + }); + } + pending_undelegations.push_back(PendingWithdrawal { + delegator: address, + node, + amount: withdrawal_amount, + available_time: unlocks_at as u64, + }); + } else if !amount.is_zero() { + nodes.push_back(Delegation { + delegator: address, + node, + amount, + }); + } + } + + let wallet = Wallet { + nodes, + pending_undelegations, + pending_exits, + claimed_rewards, + }; + wallets.insert(address, wallet); + } + + let snapshot = Snapshot { + block, + node_set, + wallets, + }; + + tracing::info!(block = ?snapshot.block, "loaded finalized snapshot"); + Ok(Some(snapshot)) + } + + async fn save_snapshot(&self, snapshot: &Snapshot) -> Result<()> { + tracing::info!(block = ?snapshot.block, "saving snapshot"); + + let mut tx = self.pool.begin().await?; + sqlx::query("DELETE FROM delegation") + .execute(&mut *tx) + .await?; + sqlx::query("DELETE FROM wallet").execute(&mut *tx).await?; + sqlx::query("DELETE FROM node").execute(&mut *tx).await?; + sqlx::query("DELETE FROM l1_block") + .execute(&mut *tx) + .await?; + + // Insert new block info + sqlx::query( + "INSERT INTO l1_block (hash, number, parent_hash, timestamp, exit_escrow_period) + VALUES ($1, $2, $3, $4, $5)", + ) + .bind(snapshot.block.id().hash.to_string()) + .bind(snapshot.block.id().number as i64) + .bind(snapshot.block.id().parent.to_string()) + .bind(snapshot.block.timestamp() as i64) + .bind(snapshot.block.exit_escrow_period as i64) + .execute(&mut *tx) + .await?; + + if !snapshot.node_set.is_empty() { + let mut query_builder = sqlx::QueryBuilder::new( + "INSERT INTO node (address, staking_key, state_key, commission) ", + ); + + query_builder.push_values(snapshot.node_set.iter(), |mut b, (_address, node)| { + b.push_bind(node.address.to_string()) + .push_bind(node.staking_key.to_string()) + .push_bind(node.state_key.to_string()) + .push_bind(node.commission.as_f32() as f64); + }); + + query_builder.build().execute(&mut *tx).await?; + } + + if !snapshot.wallets.is_empty() { + let mut wallet_builder = + sqlx::QueryBuilder::new("INSERT INTO wallet (address, claimed_rewards) "); + + wallet_builder.push_values(snapshot.wallets.iter(), |mut b, (address, wallet)| { + b.push_bind(address.to_string()) + .push_bind(wallet.claimed_rewards.to_string()); + }); + + wallet_builder.build().execute(&mut *tx).await?; + } + + // Collect all delegations into a single batch + let mut all_delegations = Vec::new(); + + for (address, wallet) in snapshot.wallets.iter() { + let mut delegation_map: HashMap = HashMap::new(); + + // Add active delegations + for delegation in &wallet.nodes { + delegation_map.insert( + delegation.node, + DelegationEntry { + amount: delegation.amount, + ..Default::default() + }, + ); + } + + for withdrawal in &wallet.pending_undelegations { + let entry = delegation_map.get_mut(&withdrawal.node).ok_or_else(|| { + anyhow::anyhow!( + "Invalid state: pending undelegation for node {} but no delegation exists for wallet {address}", + withdrawal.node + ) + })?; + entry.unlocks_at = withdrawal.available_time; + entry.withdrawal_amount = withdrawal.amount; + entry.amount = entry.amount.checked_sub(withdrawal.amount).ok_or_else(|| { + anyhow::anyhow!( + "Underflow: withdrawal amount {} exceeds delegated amount {}", + withdrawal.amount, + entry.amount + ) + })?; + } + + for withdrawal in &wallet.pending_exits { + let entry = delegation_map.get_mut(&withdrawal.node).ok_or_else(|| { + anyhow::anyhow!( + "Invalid state: pending exit for node {} but no delegation exists for wallet {address}", + withdrawal.node + ) + })?; + entry.amount = withdrawal.amount; + entry.unlocks_at = withdrawal.available_time; + entry.withdrawal_amount = U256::ZERO; + } + + for (node, state) in delegation_map { + all_delegations.push((address, node, state)); + } + } + + // Batch insert all delegations + if !all_delegations.is_empty() { + let mut delegation_builder = sqlx::QueryBuilder::new( + "INSERT INTO delegation (delegator, node, amount, unlocks_at, withdrawal_amount) ", + ); + + delegation_builder.push_values( + all_delegations.iter(), + |mut b, (address, node, state)| { + b.push_bind(address.to_string()) + .push_bind(node.to_string()) + .push_bind(state.amount.to_string()) + .push_bind(state.unlocks_at as i64) + .push_bind(state.withdrawal_amount.to_string()); + }, + ); + + delegation_builder.build().execute(&mut *tx).await?; + } + + tx.commit().await?; + tracing::info!("snapshot saved"); + Ok(()) + } + + /// Apply a full node set diff to the database. + async fn apply_node_set_diff( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + diff: &FullNodeSetDiff, + ) -> Result<()> { + match diff { + FullNodeSetDiff::NodeUpdate(node) => { + sqlx::query( + "INSERT INTO node (address, staking_key, state_key, commission) + VALUES ($1, $2, $3, $4) + ON CONFLICT(address) DO UPDATE SET + staking_key = excluded.staking_key, + state_key = excluded.state_key, + commission = excluded.commission", + ) + .bind(node.address.to_string()) + .bind(node.staking_key.to_string()) + .bind(node.state_key.to_string()) + .bind(node.commission.as_f32() as f64) + .execute(&mut **tx) + .await?; + } + FullNodeSetDiff::NodeExit(exit) => { + let result = sqlx::query("DELETE FROM node WHERE address = $1") + .bind(exit.address.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to delete 1 node row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } + } + } + Ok(()) + } + + /// Apply a wallet diff to the database. + async fn apply_wallet_diff( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + address: Address, + diff: &WalletDiff, + ) -> Result<()> { + sqlx::query( + "INSERT INTO wallet (address, claimed_rewards) + VALUES ($1, '0') + ON CONFLICT(address) DO NOTHING", + ) + .bind(address.to_string()) + .execute(&mut **tx) + .await?; + + match diff { + WalletDiff::ClaimedRewards(amount) => { + // Read current claimed rewards + let (claimed_rewards,) = sqlx::query_as::<_, (String,)>( + "SELECT claimed_rewards FROM wallet WHERE address = $1", + ) + .bind(address.to_string()) + .fetch_one(&mut **tx) + .await?; + + let current_amount = U256::from_str(&claimed_rewards).unwrap_or(U256::ZERO); + let new_claimed_rewards = current_amount + .checked_add(U256::from(*amount)) + .ok_or_else(|| { + anyhow::anyhow!( + "Overflow: adding {amount} to claimed rewards {current_amount}" + ) + })?; + + // Update claimed rewards with the computed value + let result = sqlx::query( + "UPDATE wallet + SET claimed_rewards = $1 + WHERE address = $2", + ) + .bind(new_claimed_rewards.to_string()) + .bind(address.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to update 1 wallet row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } + } + WalletDiff::DelegatedToNode(delegation) => { + // Check if delegation exists + let existing = sqlx::query_as::<_, (String,)>( + "SELECT amount FROM delegation WHERE delegator = $1 AND node = $2", + ) + .bind(delegation.delegator.to_string()) + .bind(delegation.node.to_string()) + .fetch_optional(&mut **tx) + .await?; + + let new_amount = if let Some((amount,)) = existing { + let current_amount = U256::from_str(&amount).unwrap_or(U256::ZERO); + current_amount.checked_add(delegation.amount) + .ok_or_else(|| anyhow::anyhow!( + "Overflow: adding delegation {} to existing amount {current_amount}", + delegation.amount + ))? + } else { + delegation.amount + }; + + // upsert + sqlx::query( + "INSERT INTO delegation (delegator, node, amount, unlocks_at, withdrawal_amount) + VALUES ($1, $2, $3, 0, '0') + ON CONFLICT(delegator, node) DO UPDATE SET + amount = excluded.amount", + ) + .bind(delegation.delegator.to_string()) + .bind(delegation.node.to_string()) + .bind(new_amount.to_string()) + .execute(&mut **tx) + .await?; + } + WalletDiff::UndelegatedFromNode(withdrawal) => { + // Read current amount to calculate new amount after undelegation + let (amount,) = sqlx::query_as::<_, (String,)>( + "SELECT amount FROM delegation WHERE delegator = $1 AND node = $2", + ) + .bind(withdrawal.delegator.to_string()) + .bind(withdrawal.node.to_string()) + .fetch_one(&mut **tx) + .await?; + + let current_amount = U256::from_str(&amount).unwrap_or(U256::ZERO); + let new_amount = current_amount.checked_sub(withdrawal.amount).ok_or_else( + || { + anyhow::anyhow!( + "Underflow: withdrawal {} exceeds delegation amount {current_amount}", + withdrawal.amount + ) + }, + )?; + + // Update delegation: decrement amount and set unlocks_at and withdrawal_amount + let result = sqlx::query( + "UPDATE delegation + SET amount = $1, + unlocks_at = $2, + withdrawal_amount = $3 + WHERE delegator = $4 AND node = $5", + ) + .bind(new_amount.to_string()) + .bind(withdrawal.available_time as i64) + .bind(withdrawal.amount.to_string()) + .bind(withdrawal.delegator.to_string()) + .bind(withdrawal.node.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to update 1 delegation row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } + } + WalletDiff::NodeExited(withdrawal) => { + // Mark all delegations to this node as exits + let result = sqlx::query( + "UPDATE delegation + SET unlocks_at = $1, withdrawal_amount = '0' + WHERE delegator = $2 AND node = $3", + ) + .bind(withdrawal.available_time as i64) + .bind(withdrawal.delegator.to_string()) + .bind(withdrawal.node.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to update 1 delegation row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } + } + WalletDiff::UndelegationWithdrawal(withdrawal) => { + let result = sqlx::query( + "UPDATE delegation + SET unlocks_at = 0, + withdrawal_amount = '0' + WHERE delegator = $1 AND node = $2", + ) + .bind(withdrawal.delegator.to_string()) + .bind(withdrawal.node.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to update 1 delegation row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } + } + WalletDiff::NodeExitWithdrawal(withdrawal) => { + let result = sqlx::query( + "DELETE FROM delegation + WHERE delegator = $1 AND node = $2", + ) + .bind(withdrawal.delegator.to_string()) + .bind(withdrawal.node.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to delete 1 delegation row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } + } + } + Ok(()) } } impl L1Persistence for Persistence { async fn finalized_snapshot(&self) -> Result> { - Ok(None) + self.load_finalized_snapshot().await } - async fn save_genesis(&self, _snapshot: Snapshot) -> Result<()> { - Ok(()) + async fn save_genesis(&self, snapshot: Snapshot) -> Result<()> { + self.save_snapshot(&snapshot).await } + #[instrument(skip(self, node_set_diff, wallets_diff))] async fn apply_events( &self, - _block: L1BlockSnapshot, - _node_set_diff: impl IntoIterator + Send, - _wallets_diff: impl IntoIterator + Send, + block: L1BlockSnapshot, + node_set_diff: impl IntoIterator + Send, + wallets_diff: impl IntoIterator + Send, ) -> Result<()> { + tracing::debug!(block_number = block.number(), "applying events to database"); + + // Collect iterators to ensure Send safety + let node_set_diff: Vec<_> = node_set_diff.into_iter().collect(); + let wallets_diff: Vec<_> = wallets_diff.into_iter().collect(); + + let mut tx = self.pool.begin().await?; + + // Update L1 block info + sqlx::query( + "UPDATE l1_block SET + hash = $1, + number = $2, + parent_hash = $3, + timestamp = $4, + exit_escrow_period = $5", + ) + .bind(block.hash().to_string()) + .bind(block.number() as i64) + .bind(block.parent().to_string()) + .bind(block.timestamp() as i64) + .bind(block.exit_escrow_period as i64) + .execute(&mut *tx) + .await?; + + // Apply node set diffs + for diff in node_set_diff { + self.apply_node_set_diff(&mut tx, &diff).await?; + } + + // Apply wallet diffs + for (address, diff) in wallets_diff { + self.apply_wallet_diff(&mut tx, address, &diff).await?; + } + + tx.commit().await?; + tracing::debug!("events applied successfully"); Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::input::l1::testing::{block_snapshot, make_node, validator_registered_event}; + use crate::input::l1::{NodeSet, Wallet, Wallets}; + use crate::types::common::{ESPTokenAmount, NodeExit, NodeSetEntry, PendingWithdrawal}; + use crate::types::global::FullNodeSetDiff; + use espresso_types::{PubKey, v0_3::COMMISSION_BASIS_POINTS}; + use hotshot_types::light_client::StateVerKey; + use rand::SeedableRng; + use rand::rngs::StdRng; + use tempfile::TempDir; + + #[tokio::test] + async fn test_snapshot_save_apply_load() { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("test.db"); + + let options = PersistenceOptions { + path: db_path, + max_connections: 5, + }; + + let persistence = Persistence::new(&options).await.unwrap(); + + let node1 = make_node(1); + let node2 = make_node(2); + let node3 = make_node(3); + let node4 = make_node(4); + + let mut initial_node_set = NodeSet::default(); + initial_node_set.apply(&FullNodeSetDiff::NodeUpdate(node1.clone())); + initial_node_set.apply(&FullNodeSetDiff::NodeUpdate(node2.clone())); + initial_node_set.apply(&FullNodeSetDiff::NodeUpdate(node3.clone())); + + let delegator1 = Address::random(); + let delegator2 = Address::random(); + let delegator3 = Address::random(); + + let mut initial_wallets = Wallets::default(); + + let wallet1 = Wallet { + nodes: im::Vector::from(vec![ + Delegation { + delegator: delegator1, + node: node1.address, + amount: U256::from(5000000u64), + }, + Delegation { + delegator: delegator1, + node: node2.address, + amount: U256::from(3000000u64), + }, + ]), + pending_undelegations: im::Vector::new(), + pending_exits: im::Vector::new(), + claimed_rewards: U256::ZERO, + }; + initial_wallets.insert(delegator1, wallet1); + + let wallet2 = Wallet { + nodes: im::Vector::from(vec![ + Delegation { + delegator: delegator2, + node: node1.address, + amount: U256::from(500000u64), + }, + Delegation { + delegator: delegator2, + node: node2.address, + amount: U256::from(10000000u64), + }, + Delegation { + delegator: delegator2, + node: node3.address, + amount: U256::from(2000000u64), + }, + ]), + pending_undelegations: im::Vector::from(vec![PendingWithdrawal { + delegator: delegator2, + node: node1.address, + amount: U256::from(500000u64), + available_time: 900, + }]), + pending_exits: im::Vector::new(), + claimed_rewards: U256::ZERO, + }; + initial_wallets.insert(delegator2, wallet2); + + let initial_snapshot = Snapshot::new( + block_snapshot(100), + initial_node_set.clone(), + initial_wallets.clone(), + ); + + persistence + .save_genesis(initial_snapshot.clone()) + .await + .unwrap(); + + let loaded_genesis = persistence + .load_finalized_snapshot() + .await + .unwrap() + .unwrap(); + assert_eq!(loaded_genesis.block.number(), 100); + assert_eq!(loaded_genesis.node_set.len(), 3); + assert!( + loaded_genesis + .node_set + .iter() + .any(|(_, n)| n.address == node1.address) + ); + assert!( + loaded_genesis + .node_set + .iter() + .any(|(_, n)| n.address == node2.address) + ); + assert!( + loaded_genesis + .node_set + .iter() + .any(|(_, n)| n.address == node3.address) + ); + assert_eq!(loaded_genesis.wallets.len(), 2); + let loaded_wallet1 = loaded_genesis.wallets.get(&delegator1).unwrap(); + assert_eq!(loaded_wallet1.nodes.len(), 2); + assert_eq!(loaded_wallet1.pending_exits.len(), 0); + assert_eq!(loaded_wallet1.claimed_rewards, U256::ZERO); + let loaded_wallet2 = loaded_genesis.wallets.get(&delegator2).unwrap(); + assert_eq!(loaded_wallet2.nodes.len(), 2); + assert!(loaded_wallet2.nodes.iter().any(|d| d.node == node2.address)); + assert!(loaded_wallet2.nodes.iter().any(|d| d.node == node3.address)); + assert!(!loaded_wallet2.nodes.iter().any(|d| d.node == node1.address)); + assert_eq!(loaded_wallet2.pending_undelegations.len(), 1); + assert_eq!( + loaded_wallet2.pending_undelegations[0].amount, + ESPTokenAmount::from(U256::from(500000u64)) + ); + assert_eq!(loaded_wallet2.pending_exits.len(), 0); + assert_eq!(loaded_wallet2.claimed_rewards, U256::ZERO); + + let mut rng = StdRng::from_seed([42; 32]); + let validator_event = validator_registered_event(&mut rng); + let node5 = NodeSetEntry { + address: validator_event.account, + staking_key: PubKey::from(validator_event.blsVK).into(), + state_key: StateVerKey::from(validator_event.schnorrVK).into(), + commission: Ratio::new( + validator_event.commission.into(), + COMMISSION_BASIS_POINTS.into(), + ), + stake: U256::ZERO, + }; + + let node_set_diffs = vec![ + FullNodeSetDiff::NodeUpdate(node4.clone()), + FullNodeSetDiff::NodeUpdate(node5.clone()), + FullNodeSetDiff::NodeExit(NodeExit { + address: node2.address, + exit_time: 1200, + }), + ]; + + let wallet_diffs = vec![ + ( + delegator1, + WalletDiff::UndelegatedFromNode(PendingWithdrawal { + delegator: delegator1, + node: node1.address, + amount: U256::from(1000000u64), + available_time: 1100, + }), + ), + ( + delegator1, + WalletDiff::NodeExited(PendingWithdrawal { + delegator: delegator1, + node: node2.address, + amount: U256::from(3000000u64), + available_time: 1200, + }), + ), + ( + delegator2, + WalletDiff::NodeExited(PendingWithdrawal { + delegator: delegator2, + node: node2.address, + amount: U256::from(10000000u64), + available_time: 1200, + }), + ), + (delegator1, WalletDiff::ClaimedRewards(1500u64)), + (delegator2, WalletDiff::ClaimedRewards(750u64)), + ( + delegator3, + WalletDiff::DelegatedToNode(Delegation { + delegator: delegator3, + node: node5.address, + amount: U256::from(15000000u64), + }), + ), + ( + delegator3, + WalletDiff::DelegatedToNode(Delegation { + delegator: delegator3, + node: node4.address, + amount: U256::from(8000000u64), + }), + ), + ]; + + let updated_block = block_snapshot(101); + + persistence + .apply_events(updated_block, node_set_diffs, wallet_diffs) + .await + .unwrap(); + + let loaded_snapshot = persistence + .load_finalized_snapshot() + .await + .unwrap() + .unwrap(); + + assert_eq!(loaded_snapshot.block, updated_block); + assert_eq!(loaded_snapshot.node_set.len(), 4); + + assert!( + loaded_snapshot + .node_set + .iter() + .any(|(_, n)| n.address == node1.address) + ); + assert!( + !loaded_snapshot + .node_set + .iter() + .any(|(_, n)| n.address == node2.address) + ); + assert!( + loaded_snapshot + .node_set + .iter() + .any(|(_, n)| n.address == node3.address) + ); + assert!( + loaded_snapshot + .node_set + .iter() + .any(|(_, n)| n.address == node4.address) + ); + assert!( + loaded_snapshot + .node_set + .iter() + .any(|(_, n)| n.address == node5.address) + ); + + let wallet1 = loaded_snapshot + .wallets + .get(&delegator1) + .expect("wallet1 should exist"); + assert_eq!(wallet1.claimed_rewards, U256::from(1500u64)); + assert_eq!(wallet1.nodes.len(), 1); + assert_eq!(wallet1.nodes[0].node, node1.address); + assert_eq!( + wallet1.nodes[0].amount, + ESPTokenAmount::from(U256::from(4000000u64)) + ); + assert_eq!(wallet1.pending_undelegations.len(), 1); + assert_eq!( + wallet1.pending_undelegations[0].amount, + ESPTokenAmount::from(U256::from(1000000u64)) + ); + assert_eq!(wallet1.pending_exits.len(), 1); + assert_eq!(wallet1.pending_exits[0].node, node2.address); + assert_eq!( + wallet1.pending_exits[0].amount, + ESPTokenAmount::from(U256::from(3000000u64)) + ); + + let wallet2 = loaded_snapshot + .wallets + .get(&delegator2) + .expect("wallet2 should exist"); + assert_eq!(wallet2.claimed_rewards, U256::from(750u64)); + assert_eq!(wallet2.nodes.len(), 1); + + assert!(!wallet2.nodes.iter().any(|d| d.node == node1.address)); + + let node3_delegation = wallet2 + .nodes + .iter() + .find(|d| d.node == node3.address) + .expect("should have delegation to node3"); + assert_eq!( + node3_delegation.amount, + ESPTokenAmount::from(U256::from(2000000u64)) + ); + + assert_eq!(wallet2.pending_undelegations.len(), 1); + assert_eq!(wallet2.pending_undelegations[0].node, node1.address); + assert_eq!( + wallet2.pending_undelegations[0].amount, + ESPTokenAmount::from(U256::from(500000u64)) + ); + + assert_eq!(wallet2.pending_exits.len(), 1); + assert_eq!(wallet2.pending_exits[0].node, node2.address); + assert_eq!( + wallet2.pending_exits[0].amount, + ESPTokenAmount::from(U256::from(10000000u64)) + ); + + let wallet3 = loaded_snapshot + .wallets + .get(&delegator3) + .expect("wallet3 should exist"); + assert_eq!(wallet3.claimed_rewards, U256::ZERO); + assert_eq!(wallet3.nodes.len(), 2); + let node5_delegation = wallet3 + .nodes + .iter() + .find(|d| d.node == node5.address) + .expect("should have delegation to node5"); + assert_eq!( + node5_delegation.amount, + ESPTokenAmount::from(U256::from(15000000u64)) + ); + let node4_delegation = wallet3 + .nodes + .iter() + .find(|d| d.node == node4.address) + .expect("should have delegation to node4"); + assert_eq!( + node4_delegation.amount, + ESPTokenAmount::from(U256::from(8000000u64)) + ); + assert_eq!(wallet3.pending_undelegations.len(), 0); + assert_eq!(wallet3.pending_exits.len(), 0); + } +} diff --git a/src/types/common.rs b/src/types/common.rs index 91a4bfe..f024071 100644 --- a/src/types/common.rs +++ b/src/types/common.rs @@ -19,6 +19,14 @@ impl Ratio { pub fn new(num: usize, den: usize) -> Self { Self((num as f32) / (den as f32)) } + + pub fn from_f32(value: f32) -> Self { + Self(value) + } + + pub fn as_f32(self) -> f32 { + self.0 + } } /// An entry in the full node set. @@ -30,6 +38,9 @@ pub struct NodeSetEntry { /// The key used for the node for signing consensus messages. pub staking_key: TaggedBase64, + /// state verifying key + pub state_key: TaggedBase64, + /// Total stake currently attributed to the node. pub stake: ESPTokenAmount, From 3cc700cf47e11a25e140964f748275bf462bf0c8 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Thu, 6 Nov 2025 18:25:29 +0500 Subject: [PATCH 2/7] cleanup --- src/persistence/sql.rs | 38 ++++++++++---------------------------- 1 file changed, 10 insertions(+), 28 deletions(-) diff --git a/src/persistence/sql.rs b/src/persistence/sql.rs index 0321095..4f85c5d 100644 --- a/src/persistence/sql.rs +++ b/src/persistence/sql.rs @@ -648,7 +648,7 @@ mod tests { use super::*; use crate::input::l1::testing::{block_snapshot, make_node, validator_registered_event}; use crate::input::l1::{NodeSet, Wallet, Wallets}; - use crate::types::common::{ESPTokenAmount, NodeExit, NodeSetEntry, PendingWithdrawal}; + use crate::types::common::{NodeExit, NodeSetEntry, PendingWithdrawal}; use crate::types::global::FullNodeSetDiff; use espresso_types::{PubKey, v0_3::COMMISSION_BASIS_POINTS}; use hotshot_types::light_client::StateVerKey; @@ -781,7 +781,7 @@ mod tests { assert_eq!(loaded_wallet2.pending_undelegations.len(), 1); assert_eq!( loaded_wallet2.pending_undelegations[0].amount, - ESPTokenAmount::from(U256::from(500000u64)) + U256::from(500000u64) ); assert_eq!(loaded_wallet2.pending_exits.len(), 0); assert_eq!(loaded_wallet2.claimed_rewards, U256::ZERO); @@ -910,21 +910,15 @@ mod tests { assert_eq!(wallet1.claimed_rewards, U256::from(1500u64)); assert_eq!(wallet1.nodes.len(), 1); assert_eq!(wallet1.nodes[0].node, node1.address); - assert_eq!( - wallet1.nodes[0].amount, - ESPTokenAmount::from(U256::from(4000000u64)) - ); + assert_eq!(wallet1.nodes[0].amount, U256::from(4000000u64)); assert_eq!(wallet1.pending_undelegations.len(), 1); assert_eq!( wallet1.pending_undelegations[0].amount, - ESPTokenAmount::from(U256::from(1000000u64)) + U256::from(1000000u64) ); assert_eq!(wallet1.pending_exits.len(), 1); assert_eq!(wallet1.pending_exits[0].node, node2.address); - assert_eq!( - wallet1.pending_exits[0].amount, - ESPTokenAmount::from(U256::from(3000000u64)) - ); + assert_eq!(wallet1.pending_exits[0].amount, U256::from(3000000u64)); let wallet2 = loaded_snapshot .wallets @@ -940,24 +934,18 @@ mod tests { .iter() .find(|d| d.node == node3.address) .expect("should have delegation to node3"); - assert_eq!( - node3_delegation.amount, - ESPTokenAmount::from(U256::from(2000000u64)) - ); + assert_eq!(node3_delegation.amount, U256::from(2000000u64)); assert_eq!(wallet2.pending_undelegations.len(), 1); assert_eq!(wallet2.pending_undelegations[0].node, node1.address); assert_eq!( wallet2.pending_undelegations[0].amount, - ESPTokenAmount::from(U256::from(500000u64)) + U256::from(500000u64) ); assert_eq!(wallet2.pending_exits.len(), 1); assert_eq!(wallet2.pending_exits[0].node, node2.address); - assert_eq!( - wallet2.pending_exits[0].amount, - ESPTokenAmount::from(U256::from(10000000u64)) - ); + assert_eq!(wallet2.pending_exits[0].amount, U256::from(10000000u64)); let wallet3 = loaded_snapshot .wallets @@ -970,19 +958,13 @@ mod tests { .iter() .find(|d| d.node == node5.address) .expect("should have delegation to node5"); - assert_eq!( - node5_delegation.amount, - ESPTokenAmount::from(U256::from(15000000u64)) - ); + assert_eq!(node5_delegation.amount, U256::from(15000000u64)); let node4_delegation = wallet3 .nodes .iter() .find(|d| d.node == node4.address) .expect("should have delegation to node4"); - assert_eq!( - node4_delegation.amount, - ESPTokenAmount::from(U256::from(8000000u64)) - ); + assert_eq!(node4_delegation.amount, U256::from(8000000u64)); assert_eq!(wallet3.pending_undelegations.len(), 0); assert_eq!(wallet3.pending_exits.len(), 0); } From 1a633b4f5c24ec4c1947311017e0865fcd246647 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Thu, 6 Nov 2025 18:35:12 +0500 Subject: [PATCH 3/7] instrumentation --- src/persistence/sql.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/persistence/sql.rs b/src/persistence/sql.rs index 4f85c5d..ef03183 100644 --- a/src/persistence/sql.rs +++ b/src/persistence/sql.rs @@ -89,6 +89,7 @@ impl Persistence { /// - L1 block info (hash, number, timestamp, exit escrow period) /// - Full node set (all registered validators) /// - All wallet states (delegations, pending withdrawals, claimed rewards) + #[instrument(skip(self))] async fn load_finalized_snapshot(&self) -> Result> { tracing::info!("loading finalized snapshot from database"); @@ -219,6 +220,7 @@ impl Persistence { Ok(Some(snapshot)) } + #[instrument(skip(self, snapshot))] async fn save_snapshot(&self, snapshot: &Snapshot) -> Result<()> { tracing::info!(block = ?snapshot.block, "saving snapshot"); From baca3f6f35c6191300b38183be8fe25d84a40e31 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Thu, 6 Nov 2025 18:39:02 +0500 Subject: [PATCH 4/7] instrument apply_wallet_diff --- src/persistence/sql.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/persistence/sql.rs b/src/persistence/sql.rs index ef03183..8482b4b 100644 --- a/src/persistence/sql.rs +++ b/src/persistence/sql.rs @@ -393,6 +393,7 @@ impl Persistence { } /// Apply a wallet diff to the database. + #[instrument(skip(self, tx, diff))] async fn apply_wallet_diff( &self, tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, From 7fb2d6594a0850a1c820ef9260494ab8ad116392 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 7 Nov 2025 01:31:44 +0500 Subject: [PATCH 5/7] only store l1block in save_snapshot --- Cargo.lock | 18 +- src/persistence/sql.rs | 522 ++++++++++++++++++++++------------------- 2 files changed, 287 insertions(+), 253 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f2237a..a4f56e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3483,7 +3483,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3732,7 +3732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4686,11 +4686,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.12" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7212,7 +7212,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -8920,7 +8920,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -10409,7 +10409,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -11702,7 +11702,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/src/persistence/sql.rs b/src/persistence/sql.rs index 8482b4b..eb04fa3 100644 --- a/src/persistence/sql.rs +++ b/src/persistence/sql.rs @@ -12,7 +12,7 @@ use alloy::primitives::U256; use anyhow::Context; use clap::Parser; use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; -use std::{collections::HashMap, path::PathBuf, str::FromStr}; +use std::{path::PathBuf, str::FromStr}; use tracing::instrument; /// Options for persistence. @@ -39,13 +39,6 @@ pub struct Persistence { pool: SqlitePool, } -#[derive(Default)] -struct DelegationEntry { - amount: U256, - unlocks_at: u64, - withdrawal_amount: U256, -} - impl Persistence { /// Create a new SQLite database with the given options. pub async fn new(options: &PersistenceOptions) -> Result { @@ -220,137 +213,6 @@ impl Persistence { Ok(Some(snapshot)) } - #[instrument(skip(self, snapshot))] - async fn save_snapshot(&self, snapshot: &Snapshot) -> Result<()> { - tracing::info!(block = ?snapshot.block, "saving snapshot"); - - let mut tx = self.pool.begin().await?; - sqlx::query("DELETE FROM delegation") - .execute(&mut *tx) - .await?; - sqlx::query("DELETE FROM wallet").execute(&mut *tx).await?; - sqlx::query("DELETE FROM node").execute(&mut *tx).await?; - sqlx::query("DELETE FROM l1_block") - .execute(&mut *tx) - .await?; - - // Insert new block info - sqlx::query( - "INSERT INTO l1_block (hash, number, parent_hash, timestamp, exit_escrow_period) - VALUES ($1, $2, $3, $4, $5)", - ) - .bind(snapshot.block.id().hash.to_string()) - .bind(snapshot.block.id().number as i64) - .bind(snapshot.block.id().parent.to_string()) - .bind(snapshot.block.timestamp() as i64) - .bind(snapshot.block.exit_escrow_period as i64) - .execute(&mut *tx) - .await?; - - if !snapshot.node_set.is_empty() { - let mut query_builder = sqlx::QueryBuilder::new( - "INSERT INTO node (address, staking_key, state_key, commission) ", - ); - - query_builder.push_values(snapshot.node_set.iter(), |mut b, (_address, node)| { - b.push_bind(node.address.to_string()) - .push_bind(node.staking_key.to_string()) - .push_bind(node.state_key.to_string()) - .push_bind(node.commission.as_f32() as f64); - }); - - query_builder.build().execute(&mut *tx).await?; - } - - if !snapshot.wallets.is_empty() { - let mut wallet_builder = - sqlx::QueryBuilder::new("INSERT INTO wallet (address, claimed_rewards) "); - - wallet_builder.push_values(snapshot.wallets.iter(), |mut b, (address, wallet)| { - b.push_bind(address.to_string()) - .push_bind(wallet.claimed_rewards.to_string()); - }); - - wallet_builder.build().execute(&mut *tx).await?; - } - - // Collect all delegations into a single batch - let mut all_delegations = Vec::new(); - - for (address, wallet) in snapshot.wallets.iter() { - let mut delegation_map: HashMap = HashMap::new(); - - // Add active delegations - for delegation in &wallet.nodes { - delegation_map.insert( - delegation.node, - DelegationEntry { - amount: delegation.amount, - ..Default::default() - }, - ); - } - - for withdrawal in &wallet.pending_undelegations { - let entry = delegation_map.get_mut(&withdrawal.node).ok_or_else(|| { - anyhow::anyhow!( - "Invalid state: pending undelegation for node {} but no delegation exists for wallet {address}", - withdrawal.node - ) - })?; - entry.unlocks_at = withdrawal.available_time; - entry.withdrawal_amount = withdrawal.amount; - entry.amount = entry.amount.checked_sub(withdrawal.amount).ok_or_else(|| { - anyhow::anyhow!( - "Underflow: withdrawal amount {} exceeds delegated amount {}", - withdrawal.amount, - entry.amount - ) - })?; - } - - for withdrawal in &wallet.pending_exits { - let entry = delegation_map.get_mut(&withdrawal.node).ok_or_else(|| { - anyhow::anyhow!( - "Invalid state: pending exit for node {} but no delegation exists for wallet {address}", - withdrawal.node - ) - })?; - entry.amount = withdrawal.amount; - entry.unlocks_at = withdrawal.available_time; - entry.withdrawal_amount = U256::ZERO; - } - - for (node, state) in delegation_map { - all_delegations.push((address, node, state)); - } - } - - // Batch insert all delegations - if !all_delegations.is_empty() { - let mut delegation_builder = sqlx::QueryBuilder::new( - "INSERT INTO delegation (delegator, node, amount, unlocks_at, withdrawal_amount) ", - ); - - delegation_builder.push_values( - all_delegations.iter(), - |mut b, (address, node, state)| { - b.push_bind(address.to_string()) - .push_bind(node.to_string()) - .push_bind(state.amount.to_string()) - .push_bind(state.unlocks_at as i64) - .push_bind(state.withdrawal_amount.to_string()); - }, - ); - - delegation_builder.build().execute(&mut *tx).await?; - } - - tx.commit().await?; - tracing::info!("snapshot saved"); - Ok(()) - } - /// Apply a full node set diff to the database. async fn apply_node_set_diff( &self, @@ -595,7 +457,25 @@ impl L1Persistence for Persistence { } async fn save_genesis(&self, snapshot: Snapshot) -> Result<()> { - self.save_snapshot(&snapshot).await + tracing::info!(block = ?snapshot.block, "saving genesis L1 block"); + + let mut tx = self.pool.begin().await?; + + sqlx::query( + "INSERT INTO l1_block (hash, number, parent_hash, timestamp, exit_escrow_period) + VALUES ($1, $2, $3, $4, $5)", + ) + .bind(snapshot.block.id().hash.to_string()) + .bind(snapshot.block.id().number as i64) + .bind(snapshot.block.id().parent.to_string()) + .bind(snapshot.block.timestamp() as i64) + .bind(snapshot.block.exit_escrow_period as i64) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + tracing::info!("genesis L1 block saved"); + Ok(()) } #[instrument(skip(self, node_set_diff, wallets_diff))] @@ -649,16 +529,12 @@ impl L1Persistence for Persistence { #[cfg(test)] mod tests { use super::*; - use crate::input::l1::testing::{block_snapshot, make_node, validator_registered_event}; - use crate::input::l1::{NodeSet, Wallet, Wallets}; - use crate::types::common::{NodeExit, NodeSetEntry, PendingWithdrawal}; + use crate::input::l1::testing::{block_snapshot, make_node}; + use crate::types::common::{NodeExit, PendingWithdrawal, Withdrawal}; use crate::types::global::FullNodeSetDiff; - use espresso_types::{PubKey, v0_3::COMMISSION_BASIS_POINTS}; - use hotshot_types::light_client::StateVerKey; - use rand::SeedableRng; - use rand::rngs::StdRng; use tempfile::TempDir; + /// Tests the complete persistence lifecycle #[tokio::test] async fn test_snapshot_save_apply_load() { let temp_dir = TempDir::new().unwrap(); @@ -676,111 +552,251 @@ mod tests { let node3 = make_node(3); let node4 = make_node(4); - let mut initial_node_set = NodeSet::default(); - initial_node_set.apply(&FullNodeSetDiff::NodeUpdate(node1.clone())); - initial_node_set.apply(&FullNodeSetDiff::NodeUpdate(node2.clone())); - initial_node_set.apply(&FullNodeSetDiff::NodeUpdate(node3.clone())); - let delegator1 = Address::random(); let delegator2 = Address::random(); let delegator3 = Address::random(); - let mut initial_wallets = Wallets::default(); + let genesis_snapshot = Snapshot::empty(block_snapshot(100)); + persistence + .save_genesis(genesis_snapshot.clone()) + .await + .unwrap(); + + let snapshot = persistence + .load_finalized_snapshot() + .await + .unwrap() + .unwrap(); + assert_eq!(snapshot.block.number(), 100); + assert_eq!(snapshot.node_set.len(), 0); + assert_eq!(snapshot.wallets.len(), 0); + + // Block 100: Register 4 nodes, set up initial delegations, delegator1 withdraws and claims rewards + let initial_node_set_diffs = vec![ + FullNodeSetDiff::NodeUpdate(node1.clone()), + FullNodeSetDiff::NodeUpdate(node2.clone()), + FullNodeSetDiff::NodeUpdate(node3.clone()), + FullNodeSetDiff::NodeUpdate(node4.clone()), + ]; - let wallet1 = Wallet { - nodes: im::Vector::from(vec![ - Delegation { + let initial_wallet_diffs = vec![ + // Delegator1 delegates to node1 + ( + delegator1, + WalletDiff::DelegatedToNode(Delegation { delegator: delegator1, node: node1.address, - amount: U256::from(5000000u64), - }, - Delegation { + amount: U256::from(6000000u64), // Initial amount before undelegation + }), + ), + // Delegator1 undelegates from node1 + ( + delegator1, + WalletDiff::UndelegatedFromNode(PendingWithdrawal { + delegator: delegator1, + node: node1.address, + amount: U256::from(1000000u64), + available_time: 800, + }), + ), + // Delegator1 withdraws the undelegation and then claims rewards + ( + delegator1, + WalletDiff::UndelegationWithdrawal(Withdrawal { + delegator: delegator1, + node: node1.address, + amount: U256::from(1000000u64), + }), + ), + // Delegator1 claims rewards after withdrawal + (delegator1, WalletDiff::ClaimedRewards(500u64)), + // Delegator1 delegates to node2 + ( + delegator1, + WalletDiff::DelegatedToNode(Delegation { delegator: delegator1, node: node2.address, amount: U256::from(3000000u64), - }, - ]), - pending_undelegations: im::Vector::new(), - pending_exits: im::Vector::new(), - claimed_rewards: U256::ZERO, - }; - initial_wallets.insert(delegator1, wallet1); - - let wallet2 = Wallet { - nodes: im::Vector::from(vec![ - Delegation { + }), + ), + // Delegator2 delegates to node1 + ( + delegator2, + WalletDiff::DelegatedToNode(Delegation { + delegator: delegator2, + node: node1.address, + amount: U256::from(1000000u64), + }), + ), + // Delegator2 undelegates from node1 + ( + delegator2, + WalletDiff::UndelegatedFromNode(PendingWithdrawal { delegator: delegator2, node: node1.address, amount: U256::from(500000u64), - }, - Delegation { + available_time: 900, + }), + ), + // Delegator2 delegates to node2 + ( + delegator2, + WalletDiff::DelegatedToNode(Delegation { delegator: delegator2, node: node2.address, amount: U256::from(10000000u64), - }, - Delegation { + }), + ), + // Delegator2 delegates to node3 + ( + delegator2, + WalletDiff::DelegatedToNode(Delegation { delegator: delegator2, node: node3.address, amount: U256::from(2000000u64), - }, - ]), - pending_undelegations: im::Vector::from(vec![PendingWithdrawal { - delegator: delegator2, - node: node1.address, - amount: U256::from(500000u64), - available_time: 900, - }]), - pending_exits: im::Vector::new(), - claimed_rewards: U256::ZERO, - }; - initial_wallets.insert(delegator2, wallet2); + }), + ), + // Delegator3 delegates to node4 + ( + delegator3, + WalletDiff::DelegatedToNode(Delegation { + delegator: delegator3, + node: node4.address, + amount: U256::from(8000000u64), + }), + ), + ]; - let initial_snapshot = Snapshot::new( - block_snapshot(100), - initial_node_set.clone(), - initial_wallets.clone(), - ); + persistence + .apply_events( + block_snapshot(100), + initial_node_set_diffs, + initial_wallet_diffs, + ) + .await + .unwrap(); + // Verify state after block 100 + let snapshot_after_block_100 = persistence + .load_finalized_snapshot() + .await + .unwrap() + .unwrap(); + + assert_eq!(snapshot_after_block_100.block.number(), 100); + assert_eq!(snapshot_after_block_100.node_set.len(), 4); + assert_eq!(snapshot_after_block_100.wallets.len(), 3); + + // Verify delegator1 after block 100 + let wallet1_block100 = snapshot_after_block_100.wallets.get(&delegator1).unwrap(); + assert_eq!(wallet1_block100.nodes.len(), 2); + let node1_del = wallet1_block100 + .nodes + .iter() + .find(|d| d.node == node1.address) + .unwrap(); + assert_eq!(node1_del.amount, U256::from(5000000u64)); + assert_eq!(wallet1_block100.pending_undelegations.len(), 0); + assert_eq!(wallet1_block100.claimed_rewards, U256::from(500u64)); + + // Verify delegator2 after block 100 + let wallet2_block100 = snapshot_after_block_100.wallets.get(&delegator2).unwrap(); + assert_eq!(wallet2_block100.nodes.len(), 3); + assert_eq!(wallet2_block100.pending_undelegations.len(), 1); + assert_eq!(wallet2_block100.claimed_rewards, U256::ZERO); + + // Verify delegator3 after block 100 + let wallet3_block100 = snapshot_after_block_100.wallets.get(&delegator3).unwrap(); + assert_eq!(wallet3_block100.nodes.len(), 1); + assert_eq!(wallet3_block100.pending_exits.len(), 0); + + // Block 101: Node4 exits, triggering NodeExited for delegator3 + let node_exit_diffs = vec![FullNodeSetDiff::NodeExit(NodeExit { + address: node4.address, + exit_time: 1500, + })]; + + let node_exit_wallet_diffs = vec![( + delegator3, + WalletDiff::NodeExited(PendingWithdrawal { + delegator: delegator3, + node: node4.address, + amount: U256::from(8000000u64), + available_time: 1500, + }), + )]; + + // Apply node4 exit in block 101 persistence - .save_genesis(initial_snapshot.clone()) + .apply_events(block_snapshot(101), node_exit_diffs, node_exit_wallet_diffs) .await .unwrap(); - let loaded_genesis = persistence + // Verify state after block 101 + let snapshot = persistence .load_finalized_snapshot() .await .unwrap() .unwrap(); - assert_eq!(loaded_genesis.block.number(), 100); - assert_eq!(loaded_genesis.node_set.len(), 3); + assert_eq!(snapshot.block.number(), 101); + assert_eq!(snapshot.node_set.len(), 3); // node4 exited assert!( - loaded_genesis + snapshot .node_set .iter() .any(|(_, n)| n.address == node1.address) ); assert!( - loaded_genesis + snapshot .node_set .iter() .any(|(_, n)| n.address == node2.address) ); assert!( - loaded_genesis + snapshot .node_set .iter() .any(|(_, n)| n.address == node3.address) ); - assert_eq!(loaded_genesis.wallets.len(), 2); - let loaded_wallet1 = loaded_genesis.wallets.get(&delegator1).unwrap(); + // node4 should NOT be in the node set anymore since it exited + assert!( + !snapshot + .node_set + .iter() + .any(|(_, n)| n.address == node4.address) + ); + assert_eq!(snapshot.wallets.len(), 3); + + // Verify delegator1 + let loaded_wallet1 = snapshot.wallets.get(&delegator1).unwrap(); assert_eq!(loaded_wallet1.nodes.len(), 2); + + // Delegator1 had 6M, undelegated 1M + let node1_delegation = loaded_wallet1 + .nodes + .iter() + .find(|d| d.node == node1.address) + .expect("should have delegation to node1"); + assert_eq!(node1_delegation.amount, U256::from(5000000u64)); + assert_eq!(loaded_wallet1.pending_exits.len(), 0); - assert_eq!(loaded_wallet1.claimed_rewards, U256::ZERO); - let loaded_wallet2 = loaded_genesis.wallets.get(&delegator2).unwrap(); - assert_eq!(loaded_wallet2.nodes.len(), 2); + assert_eq!(loaded_wallet1.pending_undelegations.len(), 0); + assert_eq!(loaded_wallet1.claimed_rewards, U256::from(500u64)); + + // Verify delegator2 + let loaded_wallet2 = snapshot.wallets.get(&delegator2).unwrap(); + assert_eq!(loaded_wallet2.nodes.len(), 3); // node1, node2, node3 + + // Check node1 still has remaining delegation after undelegation + let node1_delegation = loaded_wallet2 + .nodes + .iter() + .find(|d| d.node == node1.address) + .expect("should have remaining delegation to node1"); + assert_eq!(node1_delegation.amount, U256::from(500000u64)); + assert!(loaded_wallet2.nodes.iter().any(|d| d.node == node2.address)); assert!(loaded_wallet2.nodes.iter().any(|d| d.node == node3.address)); - assert!(!loaded_wallet2.nodes.iter().any(|d| d.node == node1.address)); assert_eq!(loaded_wallet2.pending_undelegations.len(), 1); assert_eq!( loaded_wallet2.pending_undelegations[0].amount, @@ -789,21 +805,23 @@ mod tests { assert_eq!(loaded_wallet2.pending_exits.len(), 0); assert_eq!(loaded_wallet2.claimed_rewards, U256::ZERO); - let mut rng = StdRng::from_seed([42; 32]); - let validator_event = validator_registered_event(&mut rng); - let node5 = NodeSetEntry { - address: validator_event.account, - staking_key: PubKey::from(validator_event.blsVK).into(), - state_key: StateVerKey::from(validator_event.schnorrVK).into(), - commission: Ratio::new( - validator_event.commission.into(), - COMMISSION_BASIS_POINTS.into(), - ), - stake: U256::ZERO, - }; + // Verify delegator3 + let loaded_wallet3 = snapshot.wallets.get(&delegator3).unwrap(); + assert_eq!(loaded_wallet3.nodes.len(), 0); + assert_eq!(loaded_wallet3.pending_undelegations.len(), 0); + assert_eq!(loaded_wallet3.pending_exits.len(), 1); + assert_eq!(loaded_wallet3.pending_exits[0].node, node4.address); + assert_eq!( + loaded_wallet3.pending_exits[0].amount, + U256::from(8000000u64) + ); + assert_eq!(loaded_wallet3.pending_exits[0].available_time, 1500); + assert_eq!(loaded_wallet3.claimed_rewards, U256::ZERO); + + // Block 102: Register node5, node2 exits, withdrawals and claim rewards + let node5 = make_node(5); let node_set_diffs = vec![ - FullNodeSetDiff::NodeUpdate(node4.clone()), FullNodeSetDiff::NodeUpdate(node5.clone()), FullNodeSetDiff::NodeExit(NodeExit { address: node2.address, @@ -812,6 +830,7 @@ mod tests { ]; let wallet_diffs = vec![ + // Delegator1 undelegates 1M from node1 ( delegator1, WalletDiff::UndelegatedFromNode(PendingWithdrawal { @@ -821,6 +840,8 @@ mod tests { available_time: 1100, }), ), + // Node2 exits + // delegator1's delegation moves to pending_exits ( delegator1, WalletDiff::NodeExited(PendingWithdrawal { @@ -830,6 +851,8 @@ mod tests { available_time: 1200, }), ), + // Node2 exits + // delegator2 delegation moves to pending_exits ( delegator2, WalletDiff::NodeExited(PendingWithdrawal { @@ -839,33 +862,47 @@ mod tests { available_time: 1200, }), ), - (delegator1, WalletDiff::ClaimedRewards(1500u64)), - (delegator2, WalletDiff::ClaimedRewards(750u64)), + // Delegator2 completes withdrawal of undelegation from block 100 + ( + delegator2, + WalletDiff::UndelegationWithdrawal(Withdrawal { + delegator: delegator2, + node: node1.address, + amount: U256::from(500000u64), + }), + ), + // Delegator3 withdrawal from exited node4 ( delegator3, - WalletDiff::DelegatedToNode(Delegation { + WalletDiff::NodeExitWithdrawal(Withdrawal { delegator: delegator3, - node: node5.address, - amount: U256::from(15000000u64), + node: node4.address, + amount: U256::from(8000000u64), }), ), + // Delegator1 claims rewards + (delegator1, WalletDiff::ClaimedRewards(1500u64)), + // Delegator2 claims rewards after withdrawal + (delegator2, WalletDiff::ClaimedRewards(750u64)), + // Delegator3 delegates to new node5 ( delegator3, WalletDiff::DelegatedToNode(Delegation { delegator: delegator3, - node: node4.address, - amount: U256::from(8000000u64), + node: node5.address, + amount: U256::from(15000000u64), }), ), ]; - let updated_block = block_snapshot(101); + let updated_block = block_snapshot(102); persistence .apply_events(updated_block, node_set_diffs, wallet_diffs) .await .unwrap(); + // Verify final state after block 102 let loaded_snapshot = persistence .load_finalized_snapshot() .await @@ -873,7 +910,7 @@ mod tests { .unwrap(); assert_eq!(loaded_snapshot.block, updated_block); - assert_eq!(loaded_snapshot.node_set.len(), 4); + assert_eq!(loaded_snapshot.node_set.len(), 3); // node1, node3, node5 (node2 and node4 exited) assert!( loaded_snapshot @@ -881,6 +918,7 @@ mod tests { .iter() .any(|(_, n)| n.address == node1.address) ); + // node2 exited in block 102 assert!( !loaded_snapshot .node_set @@ -893,8 +931,9 @@ mod tests { .iter() .any(|(_, n)| n.address == node3.address) ); + // node4 exited in block 101 assert!( - loaded_snapshot + !loaded_snapshot .node_set .iter() .any(|(_, n)| n.address == node4.address) @@ -910,7 +949,7 @@ mod tests { .wallets .get(&delegator1) .expect("wallet1 should exist"); - assert_eq!(wallet1.claimed_rewards, U256::from(1500u64)); + assert_eq!(wallet1.claimed_rewards, U256::from(2000u64)); assert_eq!(wallet1.nodes.len(), 1); assert_eq!(wallet1.nodes[0].node, node1.address); assert_eq!(wallet1.nodes[0].amount, U256::from(4000000u64)); @@ -927,10 +966,15 @@ mod tests { .wallets .get(&delegator2) .expect("wallet2 should exist"); - assert_eq!(wallet2.claimed_rewards, U256::from(750u64)); - assert_eq!(wallet2.nodes.len(), 1); + assert_eq!(wallet2.claimed_rewards, U256::from(750u64)); // Claimed in block 102 + assert_eq!(wallet2.nodes.len(), 2); // node1 and node3 - assert!(!wallet2.nodes.iter().any(|d| d.node == node1.address)); + let node1_delegation = wallet2 + .nodes + .iter() + .find(|d| d.node == node1.address) + .expect("should still have delegation to node1"); + assert_eq!(node1_delegation.amount, U256::from(500000u64)); let node3_delegation = wallet2 .nodes @@ -939,12 +983,7 @@ mod tests { .expect("should have delegation to node3"); assert_eq!(node3_delegation.amount, U256::from(2000000u64)); - assert_eq!(wallet2.pending_undelegations.len(), 1); - assert_eq!(wallet2.pending_undelegations[0].node, node1.address); - assert_eq!( - wallet2.pending_undelegations[0].amount, - U256::from(500000u64) - ); + assert_eq!(wallet2.pending_undelegations.len(), 0); assert_eq!(wallet2.pending_exits.len(), 1); assert_eq!(wallet2.pending_exits[0].node, node2.address); @@ -955,20 +994,15 @@ mod tests { .get(&delegator3) .expect("wallet3 should exist"); assert_eq!(wallet3.claimed_rewards, U256::ZERO); - assert_eq!(wallet3.nodes.len(), 2); + assert_eq!(wallet3.nodes.len(), 1); // Only node5 delegation remains let node5_delegation = wallet3 .nodes .iter() .find(|d| d.node == node5.address) .expect("should have delegation to node5"); assert_eq!(node5_delegation.amount, U256::from(15000000u64)); - let node4_delegation = wallet3 - .nodes - .iter() - .find(|d| d.node == node4.address) - .expect("should have delegation to node4"); - assert_eq!(node4_delegation.amount, U256::from(8000000u64)); assert_eq!(wallet3.pending_undelegations.len(), 0); + // NodeExitWithdrawal completed, so no more pending_exits assert_eq!(wallet3.pending_exits.len(), 0); } } From 9acf2f30dbb1fb24d4031874d198907953e17744 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 7 Nov 2025 01:32:28 +0500 Subject: [PATCH 6/7] remove commented out table --- migrations/sqlite/01_init_schema.sql | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/migrations/sqlite/01_init_schema.sql b/migrations/sqlite/01_init_schema.sql index d4223aa..5fcb729 100644 --- a/migrations/sqlite/01_init_schema.sql +++ b/migrations/sqlite/01_init_schema.sql @@ -40,25 +40,6 @@ CREATE TABLE delegation ( CREATE INDEX delegation_by_node ON delegation (node); CREATE INDEX delegation_by_status ON delegation (delegator, unlocks_at, withdrawal_amount); --- -- Espresso Block tracking --- -- Stores information about the current Espresso epoch --- CREATE TABLE espresso_block ( --- number BIGINT PRIMARY KEY, --- view BIGINT NOT NULL, --- epoch BIGINT NOT NULL, --- epoch_first_block BIGINT NOT NULL --- ); - --- -- Active Validator Set --- -- Stores statistics for validators active in the current epoch --- CREATE TABLE active_node ( --- staking_key TEXT PRIMARY KEY, --- address TEXT NOT NULL UNIQUE, --- votes INTEGER NOT NULL, --- proposals INTEGER NOT NULL, --- slots INTEGER NOT NULL --- ); - -- Rewards -- Stores total accrued rewards for each account CREATE TABLE lifetime_rewards ( From 8f7d07bacd78ddb1a98d203bfc93e7e4847a741b Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Mon, 10 Nov 2025 17:29:56 +0500 Subject: [PATCH 7/7] address comments --- migrations/sqlite/01_init_schema.sql | 1 + src/persistence/sql.rs | 217 ++++++++++++++++----------- src/types/common.rs | 10 +- 3 files changed, 136 insertions(+), 92 deletions(-) diff --git a/migrations/sqlite/01_init_schema.sql b/migrations/sqlite/01_init_schema.sql index 5fcb729..37e13a6 100644 --- a/migrations/sqlite/01_init_schema.sql +++ b/migrations/sqlite/01_init_schema.sql @@ -15,6 +15,7 @@ CREATE TABLE node ( address TEXT PRIMARY KEY, staking_key TEXT NOT NULL UNIQUE, state_key TEXT NOT NULL UNIQUE, + stake TEXT NOT NULL, commission REAL NOT NULL ); diff --git a/src/persistence/sql.rs b/src/persistence/sql.rs index eb04fa3..18b6c1c 100644 --- a/src/persistence/sql.rs +++ b/src/persistence/sql.rs @@ -11,6 +11,7 @@ use crate::{ use alloy::primitives::U256; use anyhow::Context; use clap::Parser; +use futures::future::try_join_all; use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; use std::{path::PathBuf, str::FromStr}; use tracing::instrument; @@ -109,97 +110,111 @@ impl Persistence { }; // Load all registered nodes - let node_rows = sqlx::query_as::<_, (String, String, String, f64)>( - "SELECT address, staking_key, state_key, commission FROM node", + let node_rows = sqlx::query_as::<_, (String, String, String, f64, String)>( + "SELECT address, staking_key, state_key, commission, stake FROM node", ) .fetch_all(&self.pool) .await?; let mut node_set = NodeSet::default(); - for (address, staking_key, state_key, commission) in node_rows { + for (address, staking_key, state_key, commission, stake_str) in node_rows { let node = NodeSetEntry { address: address.parse().context("failed to parse node address")?, staking_key: staking_key.parse().context("failed to parse staking key")?, state_key: state_key.parse().context("failed to parse state key")?, - stake: U256::ZERO, // Stake is computed from delegations, not stored - commission: Ratio::from_f32(commission as f32), + stake: U256::from_str(&stake_str).context("failed to parse node stake")?, + commission: Ratio::from(commission as f32), }; node_set.push(node); } - // Load all wallets and their delegations + // Load all wallets and their delegations in parallel let wallet_rows = sqlx::query_as::<_, (String, String)>("SELECT address, claimed_rewards FROM wallet") .fetch_all(&self.pool) .await?; - let mut wallets = Wallets::default(); - for (wallet_address, claimed_rewards_str) in wallet_rows { - let address: Address = wallet_address - .parse() - .context("failed to parse wallet address")?; - let claimed_rewards = - U256::from_str(&claimed_rewards_str).context("failed to parse claimed rewards")?; - - let delegation_rows = sqlx::query_as::<_, (String, String, i64, String)>( - "SELECT node, amount, unlocks_at, withdrawal_amount - FROM delegation - WHERE delegator = $1 - ORDER BY unlocks_at ASC, withdrawal_amount ASC", - ) - .bind(&wallet_address) - .fetch_all(&self.pool) - .await?; - - let mut nodes = im::Vector::new(); - let mut pending_undelegations = im::Vector::new(); - let mut pending_exits = im::Vector::new(); - - // We use unlocks_at and withdrawal_amount to determine the delegation status - for (node_str, amount_str, unlocks_at, withdrawal_amount_str) in delegation_rows { - let node: Address = node_str.parse().context("failed to parse node address")?; - let amount = - U256::from_str(&amount_str).context("failed to parse delegation amount")?; - let withdrawal_amount = U256::from_str(&withdrawal_amount_str) - .context("failed to parse withdrawal amount")?; - if withdrawal_amount.is_zero() && unlocks_at != 0 { - // Node exit - pending_exits.push_back(PendingWithdrawal { - delegator: address, - node, - amount, - available_time: unlocks_at as u64, - }); - } else if unlocks_at != 0 && !withdrawal_amount.is_zero() { - // Partial undelegation - if !amount.is_zero() { - nodes.push_back(Delegation { - delegator: address, - node, - amount, - }); + let wallet_futures = + wallet_rows + .into_iter() + .map(|(wallet_address, claimed_rewards_str)| { + let pool = self.pool.clone(); + async move { + let address: Address = wallet_address + .parse() + .context("failed to parse wallet address")?; + let claimed_rewards = U256::from_str(&claimed_rewards_str) + .context("failed to parse claimed rewards")?; + + let delegation_rows = sqlx::query_as::<_, (String, String, i64, String)>( + "SELECT node, amount, unlocks_at, withdrawal_amount + FROM delegation + WHERE delegator = $1 + ORDER BY unlocks_at ASC, withdrawal_amount ASC", + ) + .bind(&wallet_address) + .fetch_all(&pool) + .await?; + + let mut nodes = im::Vector::new(); + let mut pending_undelegations = im::Vector::new(); + let mut pending_exits = im::Vector::new(); + + // We use unlocks_at and withdrawal_amount to determine the delegation status + for (node_str, amount_str, unlocks_at, withdrawal_amount_str) in + delegation_rows + { + let node: Address = + node_str.parse().context("failed to parse node address")?; + let amount = U256::from_str(&amount_str) + .context("failed to parse delegation amount")?; + let withdrawal_amount = U256::from_str(&withdrawal_amount_str) + .context("failed to parse withdrawal amount")?; + if withdrawal_amount.is_zero() && unlocks_at != 0 { + // Node exit + pending_exits.push_back(PendingWithdrawal { + delegator: address, + node, + amount, + available_time: unlocks_at as u64, + }); + } else if unlocks_at != 0 && !withdrawal_amount.is_zero() { + // Partial undelegation + if !amount.is_zero() { + nodes.push_back(Delegation { + delegator: address, + node, + amount, + }); + } + pending_undelegations.push_back(PendingWithdrawal { + delegator: address, + node, + amount: withdrawal_amount, + available_time: unlocks_at as u64, + }); + } else if !amount.is_zero() { + nodes.push_back(Delegation { + delegator: address, + node, + amount, + }); + } + } + + let wallet = Wallet { + nodes, + pending_undelegations, + pending_exits, + claimed_rewards, + }; + Ok::<_, anyhow::Error>((address, wallet)) } - pending_undelegations.push_back(PendingWithdrawal { - delegator: address, - node, - amount: withdrawal_amount, - available_time: unlocks_at as u64, - }); - } else if !amount.is_zero() { - nodes.push_back(Delegation { - delegator: address, - node, - amount, - }); - } - } + }); - let wallet = Wallet { - nodes, - pending_undelegations, - pending_exits, - claimed_rewards, - }; + let wallet_results = try_join_all(wallet_futures).await?; + let mut wallets = Wallets::default(); + for (address, wallet) in wallet_results { wallets.insert(address, wallet); } @@ -222,17 +237,19 @@ impl Persistence { match diff { FullNodeSetDiff::NodeUpdate(node) => { sqlx::query( - "INSERT INTO node (address, staking_key, state_key, commission) - VALUES ($1, $2, $3, $4) + "INSERT INTO node (address, staking_key, state_key, commission, stake) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT(address) DO UPDATE SET staking_key = excluded.staking_key, state_key = excluded.state_key, - commission = excluded.commission", + commission = excluded.commission, + stake = excluded.stake", ) .bind(node.address.to_string()) .bind(node.staking_key.to_string()) .bind(node.state_key.to_string()) - .bind(node.commission.as_f32() as f64) + .bind(f32::from(node.commission) as f64) + .bind(node.stake.to_string()) .execute(&mut **tx) .await?; } @@ -344,15 +361,24 @@ impl Persistence { .await?; } WalletDiff::UndelegatedFromNode(withdrawal) => { - // Read current amount to calculate new amount after undelegation - let (amount,) = sqlx::query_as::<_, (String,)>( - "SELECT amount FROM delegation WHERE delegator = $1 AND node = $2", + let (amount, unlocks_at, withdrawal_amount) = sqlx::query_as::<_, (String, i64, String)>( + "SELECT amount, unlocks_at, withdrawal_amount FROM delegation WHERE delegator = $1 AND node = $2", ) .bind(withdrawal.delegator.to_string()) .bind(withdrawal.node.to_string()) .fetch_one(&mut **tx) .await?; + // Sanity check: ensure there's no pending withdrawal already + if unlocks_at != 0 || withdrawal_amount != "0" { + return Err(anyhow::anyhow!( + "pending withdrawal already exists for delegator {} to node {}", + withdrawal.delegator, + withdrawal.node + )) + .map_err(Into::into); + } + let current_amount = U256::from_str(&amount).unwrap_or(U256::ZERO); let new_amount = current_amount.checked_sub(withdrawal.amount).ok_or_else( || { @@ -409,23 +435,36 @@ impl Persistence { } } WalletDiff::UndelegationWithdrawal(withdrawal) => { - let result = sqlx::query( + let (amount,) = sqlx::query_as::<_, (String,)>( "UPDATE delegation - SET unlocks_at = 0, - withdrawal_amount = '0' - WHERE delegator = $1 AND node = $2", + SET unlocks_at = 0, + withdrawal_amount = '0' + WHERE delegator = $1 AND node = $2 + RETURNING amount", ) .bind(withdrawal.delegator.to_string()) .bind(withdrawal.node.to_string()) - .execute(&mut **tx) + .fetch_one(&mut **tx) .await?; - if result.rows_affected() != 1 { - return Err(anyhow::anyhow!( - "Expected to update 1 delegation row, but {} were affected", - result.rows_affected() - )) - .map_err(Into::into); + let remaining_amount = U256::from_str(&amount) + .context("failed to parse remaining delegation amount")?; + + if remaining_amount.is_zero() { + let result = + sqlx::query("DELETE FROM delegation WHERE delegator = $1 AND node = $2") + .bind(withdrawal.delegator.to_string()) + .bind(withdrawal.node.to_string()) + .execute(&mut **tx) + .await?; + + if result.rows_affected() != 1 { + return Err(anyhow::anyhow!( + "Expected to delete 1 delegation row, but {} were affected", + result.rows_affected() + )) + .map_err(Into::into); + } } } WalletDiff::NodeExitWithdrawal(withdrawal) => { diff --git a/src/types/common.rs b/src/types/common.rs index f024071..1dcf450 100644 --- a/src/types/common.rs +++ b/src/types/common.rs @@ -19,13 +19,17 @@ impl Ratio { pub fn new(num: usize, den: usize) -> Self { Self((num as f32) / (den as f32)) } +} - pub fn from_f32(value: f32) -> Self { +impl From for Ratio { + fn from(value: f32) -> Self { Self(value) } +} - pub fn as_f32(self) -> f32 { - self.0 +impl From for f32 { + fn from(val: Ratio) -> Self { + val.0 } }