diff --git a/src/coordinator.rs b/src/coordinator.rs index cd4bcaf..0747deb 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -1,7 +1,7 @@ use crate::{ config::{CoordinatorSettings, CoordinatorSettingsConfig}, errors::{BitcoinBroadcastErrorKind, BitcoinCoordinatorError}, - settings::CPFP_TRANSACTION_CONTEXT, + settings::{CPFP_TRANSACTION_CONTEXT, FUNDING_TRANSACTION_CONTEXT, RBF_TRANSACTION_CONTEXT}, speedup::SpeedupStore, storage::{BitcoinCoordinatorStore, BitcoinCoordinatorStoreApi}, types::{ @@ -9,7 +9,8 @@ use crate::{ TransactionState, }, }; -use bitcoin::{Network, Transaction, Txid}; +use bitcoin::{consensus::encode::serialize_hex, Network, Transaction, Txid}; +use bitcoincore_rpc::RpcApi; use bitvmx_bitcoin_rpc::{bitcoin_client::BitcoinClient, rpc_config::RpcConfig}; use bitvmx_bitcoin_rpc::{bitcoin_client::BitcoinClientApi, types::BlockHeight}; use bitvmx_transaction_monitor::{ @@ -155,7 +156,10 @@ impl BitcoinCoordinator { txs_to_dispatch: &mut Vec, ) -> Result<(), BitcoinCoordinatorError> { for tx in txs_to_review { - let tx_status = self.monitor.get_tx_status(&tx.tx_id)?; + // If the transaction was broadcasted, we need to search it in the mempool otherwise we just search on chain. + let search_in_mempool = tx.broadcast_block_height.is_some(); + + let tx_status = self.monitor.get_tx_status(&tx.tx_id, search_in_mempool)?; if tx_status.is_in_mempool() { // Check if transaction is stuck in mempool @@ -283,32 +287,29 @@ impl BitcoinCoordinator { ); // Check if we can send transactions or we stop the process until CPFP transactions start to be confirmed. - if self.store.can_speedup()? { + if self.can_speedup()? { self.speedup_and_dispatch_in_batch(txs_to_dispatch_with_speedup)?; - } else { - let is_funding_available = self.store.is_funding_available()?; - let is_enough_unconfirmed_txs = self.store.has_enough_unconfirmed_txs_for_cpfp()?; - - if !is_enough_unconfirmed_txs { - warn!( - "{} Can not speedup, waiting for more unconfirmed transactions", - style("Coordinator").green() - ); - } - - if !is_funding_available { - warn!( - "{} Can not speedup, waiting for funding", - style("Coordinator").green() - ); - self.notify_funding_not_found()?; - } } } Ok(()) } + fn process_boost_speedup(&self) -> Result<(), BitcoinCoordinatorError> { + if !self.should_boost_last_speedup()? { + return Ok(()); + } + + if self.should_replace_last_speedup()? { + self.replace_last_speedup()?; + return Ok(()); + } + + self.perform_speedup()?; + + return Ok(()); + } + /// Process all active transactions (ToDispatch, InMempool, Confirmed) until they are finalized. /// This method: /// 1. Updates transaction states based on monitor status (confirmed, finalized, orphan) @@ -359,62 +360,168 @@ impl BitcoinCoordinator { Ok(()) } + /// Runs testmempoolaccept on each candidate transaction (one at a time). + /// Rejected or error transactions are marked as Failed; returns only the allowed ones. + fn filter_txs_allowed_by_mempool( + &self, + txs: Vec, + ) -> Result, BitcoinCoordinatorError> { + if txs.is_empty() { + return Ok(Vec::new()); + } + + let mut allowed_txs: Vec = Vec::new(); + + for tx in txs { + let raw_hex = serialize_hex(&tx.tx); + let raw_refs: [&str; 1] = [raw_hex.as_str()]; + + match self.client.client.test_mempool_accept(&raw_refs) { + Ok(results) => { + let allowed = results.first().map(|res| res.allowed).unwrap_or(false); + if allowed { + allowed_txs.push(tx); + } else { + let reject_reason = results + .first() + .and_then(|res| res.reject_reason.clone()) + .unwrap_or_else(|| "rejected by testmempoolaccept".to_string()); + warn!( + "{} Excluding Transaction({}) from batch due to testmempoolaccept rejection: {:?}", + style("Coordinator").green(), + style(tx.tx_id).yellow(), + reject_reason + ); + self.store + .update_tx_state(tx.tx_id, TransactionState::Failed)?; + self.update_news(CoordinatorNews::DispatchTransactionError( + tx.tx_id, + tx.context, + reject_reason, + ))?; + } + } + Err(e) => { + warn!( + "{} Excluding Transaction({}) due to testmempoolaccept error: {:?}", + style("Coordinator").green(), + style(tx.tx_id).yellow(), + e + ); + self.store + .update_tx_state(tx.tx_id, TransactionState::Failed)?; + + self.update_news(CoordinatorNews::DispatchTransactionError( + tx.tx_id, + tx.context, + e.to_string(), + ))?; + } + } + } + + Ok(allowed_txs) + } + + /// Dispatches transactions with speedup in batches. Validates before sending: + /// 1. testmempoolaccept – rejected txs are marked Failed and excluded; only allowed ones are batched. + /// 2. can_speedup (funding, unconfirmed limit, min amount) – skip batch if speedup not possible. + /// 3. Sufficient funding for CPFP fee – skip batch and report news if insufficient. + /// Only if all pass do we send the batch and then create the CPFP. fn speedup_and_dispatch_in_batch( &self, txs: Vec, ) -> Result<(), BitcoinCoordinatorError> { - // Attempt to dispatch as many transactions as possible in a single CPFP (Child Pays For Parent) transaction, - // while ensuring the resulting transaction does not exceed Bitcoin's standardness limits. - // We have two policies to dispatch the transactions: - // 1. Maximum transaction size: MAX_TX_WEIGHT weight units. Exceeding these limits will result in the transaction - // being considered non-standard and rejected by most mempools. - // 2. Maximum number of unconfirmed transactions is 25 (MAX_LIMIT_UNCONFIRMED_PARENTS) - // If the set of transactions exceeds these limits, will fail the dispatch. + if txs.is_empty() { + return Ok(()); + } + + // 1. Test mempool acceptance; keep only transactions allowed by testmempoolaccept. + let allowed_txs = self.filter_txs_allowed_by_mempool(txs)?; + + if allowed_txs.is_empty() { + return Ok(()); + } + // 2. Apply batching policies only over transactions that passed testmempoolaccept. let txs_in_batch_by_policies: Vec> = - self.batch_txs_by_weight_limit(txs)?; + self.batch_txs_by_weight_limit(allowed_txs)?; for txs_batch in txs_in_batch_by_policies { - // For each batch, attempt to broadcast all transactions individually. After determining which transactions were successfully sent, - // construct and broadcast a single CPFP transaction to pay for the entire batch. - let txs_sent: Vec = self.dispatch_txs(txs_batch)?; + // 3. Validate speedup is possible (funding, unconfirmed limit, min amount). + if !self.can_speedup()? { + debug!( + "{} Batch skipped: speedup not possible", + style("Coordinator").green() + ); + continue; + } - // Only create a CPFP (Child Pays For Parent) transaction if there are transactions that were successfully sent in this batch. - // If no transactions were sent, skip CPFP creation for this batch. - if !txs_sent.is_empty() { - info!( - "{} Sending batch of {} transactions", + let funding = self.get_funding_or_error()?; + + // 4. Pre-compute CPFP fee for this (already filtered) batch; if funding is insufficient, + // do not send any tx. + let txs_data: Vec<(SpeedupData, Transaction, String)> = txs_batch + .iter() + .map(|t| { + ( + t.speedup_data.clone().unwrap(), + t.tx.clone(), + t.context.clone(), + ) + }) + .collect(); + + let speedup_fee = self.compute_speedup_fee_for_batch( + &txs_data, + &funding, + self.settings.base_fee_multiplier, + None, + )?; + + if speedup_fee > funding.amount { + let news = + CoordinatorNews::InsufficientFunds(funding.txid, funding.amount, speedup_fee); + self.update_news(news)?; + warn!( + "{} Batch skipped: insufficient funds for speedup | Funding({}) | Required({})", style("Coordinator").green(), - txs_sent.len() + style(funding.amount).red(), + style(speedup_fee).blue(), ); - - let txs_data = txs_sent - .iter() - .map(|coordinated_tx| { - ( - coordinated_tx.speedup_data.clone().unwrap(), - coordinated_tx.tx.clone(), - coordinated_tx.context.clone(), - ) - }) - .collect(); - // Up to here we have funding and we are sure we have funding. - let funding = self.store.get_funding()?.unwrap(); - self.create_and_send_cpfp_tx( - txs_data, - funding, - self.settings.base_fee_multiplier, - None, - )?; + continue; } - } - Ok(()) - } + // 5. All validations passed: send all remaining transactions, then create CPFP + // for the ones that were successfully sent. + // If there is an error in a transaction, we can skip it. This will reduce the required fee, so there is no issue with funding. + let txs_sent: Vec = self.dispatch_txs(txs_batch)?; + + info!( + "{} Sending batch of {} transactions", + style("Coordinator").green(), + txs_sent.len() + ); + + let txs_data_sent = txs_sent + .iter() + .map(|coordinated_tx| { + ( + coordinated_tx.speedup_data.clone().unwrap(), + coordinated_tx.tx.clone(), + coordinated_tx.context.clone(), + ) + }) + .collect(); + + self.create_cpfp_tx( + txs_data_sent, + funding, + self.settings.base_fee_multiplier, + None, + )?; + } - fn notify_funding_not_found(&self) -> Result<(), BitcoinCoordinatorError> { - let news = CoordinatorNews::FundingNotFound; - self.update_news(news)?; Ok(()) } @@ -432,7 +539,7 @@ impl BitcoinCoordinator { // It achieves this by creating an additional CPFP transaction to provide further funding to the previous one. // It is ensured that funding is available before invoking this function. fn speedup_cpfp_tx(&self) -> Result<(), BitcoinCoordinatorError> { - let funding = self.store.get_funding()?.unwrap(); + let funding = self.get_funding_or_error()?; let last_speedup = self.store.get_last_pending_speedup()?; @@ -441,16 +548,26 @@ impl BitcoinCoordinator { self.get_bump_fee_percentage_strategy(speedup.bump_fee_percentage_used)?; info!( - "{} Boosting CPFP Transaction({})", + "{} Boosting CPFP({})", style("Coordinator").green(), style(speedup.tx_id).yellow() ); - self.create_and_send_cpfp_tx(vec![], funding, bump_fee_percentage, None)?; + self.create_cpfp_tx(vec![], funding, bump_fee_percentage, None)?; } Ok(()) } + fn get_funding_or_error(&self) -> Result { + let funding = self.store.get_funding()?; + match funding { + Some(f) => Ok(f), + None => Err(BitcoinCoordinatorError::BitcoinCoordinatorError( + "Funding not found".to_string(), + )), + } + } + fn inform_dispatch_speedup_error( &self, txs_info: (Vec, Vec), @@ -459,7 +576,7 @@ impl BitcoinCoordinator { dispatch_error: String, ) -> Result<(), BitcoinCoordinatorError> { error!( - "{} Error Sending {} Transaction({})", + "{} Error Sending {}({})", style("Coordinator").green(), speedup_type, style(speedup_tx_id).yellow(), @@ -477,28 +594,60 @@ impl BitcoinCoordinator { Ok(()) } - fn dispatch_speedup( + fn dispatch_speedup_tx( &self, - tx: Transaction, speedup_data: CoordinatedSpeedUpTransaction, ) -> Result<(), BitcoinCoordinatorError> { let speedup_type = speedup_data.get_tx_name(); info!( - "{} Send {} Transaction({})", + "{} Send {}({})", style("Coordinator").green(), speedup_type, style(speedup_data.tx_id).yellow(), ); - let speedup_tx_id = tx.compute_txid(); - let txs_info: (Vec, Vec) = speedup_data .speedup_tx_data .iter() .map(|(_, tx, context)| (tx.compute_txid(), context.clone())) .collect(); + // Extract the transaction to be dispatched. + // If it's missing or its computed txid doesn't match the recorded tx_id, return an error. + let tx = match speedup_data.tx.clone() { + Some(tx) => { + let actual_txid = tx.compute_txid(); + if actual_txid != speedup_data.tx_id { + error!( + "{} Speedup record tx_id ({}) does not match tx.compute_txid() ({}).", + style("Coordinator").green(), + style(speedup_data.tx_id).yellow(), + style(actual_txid).yellow(), + ); + + return Err(BitcoinCoordinatorError::BitcoinCoordinatorError(format!( + "Speedup transaction id mismatch: {} != {}", + speedup_data.tx_id, actual_txid + ))); + } + tx + } + None => { + error!( + "{} Speedup({}) has no stored tx to dispatch", + style("Coordinator").green(), + style(speedup_data.tx_id).yellow() + ); + + return Err(BitcoinCoordinatorError::BitcoinCoordinatorError(format!( + "Missing transaction for speedup {}", + speedup_data.tx_id + ))); + } + }; + + let speedup_tx_id = tx.compute_txid(); let dispatch_result = self.client.send_transaction(&tx); match dispatch_result { @@ -508,18 +657,13 @@ impl BitcoinCoordinator { // Update broadcast_block_height with the block where the transaction was dispatched let mut speedup = speedup_data; speedup.broadcast_block_height = dispatch_block; - - self.monitor.monitor(TypesToMonitor::Transactions( - vec![speedup.tx_id], - CPFP_TRANSACTION_CONTEXT.to_string(), - None, - ))?; + speedup.state = TransactionState::InMempool; // Update state after successful dispatch info!( - "{} Successfully sent {} Transaction({}) dispatched at block height {}", + "{} Successfully sent {}({}) dispatched at block height {}", style("Coordinator").green(), speedup_type, - style(speedup.tx_id).yellow(), + style(speedup_tx_id).yellow(), style(dispatch_block).blue(), ); @@ -533,73 +677,114 @@ impl BitcoinCoordinator { } } + // Allow the monitor to track the speedup transaction in mempool. + self.allow_search_in_mempool( + speedup_tx_id, + speedup.context.clone(), + speedup.confirmation_trigger, + )?; + self.store.save_speedup(speedup)?; } Err(e) => { let error_msg = e.to_string(); let error_kind = BitcoinBroadcastErrorKind::from_error_message(&error_msg); + let tx_id = speedup_data.tx_id; match error_kind { BitcoinBroadcastErrorKind::AlreadyKnown => { // The speedup transaction is already known by the node (mempool or blockchain), // So we just acknowledge it, and warn the user. + warn!( - "{} {} Speedup Transaction({}) already known by node: {}", + "{} {} Speedup({}) already known by node: {}", style("Coordinator").green(), speedup_type, - style(speedup_data.tx_id).yellow(), + style(tx_id).yellow(), error_msg ); + // Ask the monitor/indexer for the current status of this transaction. + let tx_status = self.monitor.get_tx_status(&tx_id, true)?; let dispatch_block = self.client.get_best_block()?; let mut speedup_data_with_block = speedup_data; speedup_data_with_block.broadcast_block_height = dispatch_block; + // Set state according to the actual chain status. + speedup_data_with_block.state = tx_status.status.clone().into(); - self.monitor.monitor(TypesToMonitor::Transactions( - vec![speedup_data_with_block.tx_id], - CPFP_TRANSACTION_CONTEXT.to_string(), - None, - ))?; - self.store.update_speedup_state( - speedup_data_with_block.tx_id, - TransactionState::InMempool, + self.store.save_speedup(speedup_data_with_block.clone())?; + + // Allow the monitor to track the speedup transaction in mempool. + self.allow_search_in_mempool( + speedup_tx_id, + speedup_data_with_block.context.clone(), + speedup_data_with_block.confirmation_trigger, )?; } - BitcoinBroadcastErrorKind::MempoolRejection - | BitcoinBroadcastErrorKind::NetworkError => { - // Retryable errors (mempool policy / infrastructure). - // Keep in InMempool state - will be retried when not found in next tick + BitcoinBroadcastErrorKind::MempoolRejection => { + // Retryable error - keep in ToDispatch state to retry on next tick + error!( + "{} Error sending {}({}): {} (will retry)", + style("Coordinator").green(), + speedup_type, + style(tx_id).yellow(), + error_msg + ); + self.inform_dispatch_speedup_error( txs_info.clone(), speedup_type.clone(), - speedup_data.tx_id, + tx_id, error_msg, )?; - // Keep in InMempool state - process_active_speedups() will retry when not found - self.store.update_speedup_state( - speedup_data.tx_id, - TransactionState::InMempool, + // Keep in ToDispatch state - will be retried on next tick + self.store + .update_speedup_state(tx_id, TransactionState::ToDispatch)?; + } + BitcoinBroadcastErrorKind::NetworkError => { + // Retryable error - keep in ToDispatch state to retry on next tick + error!( + "{} Error sending {}({}): {} (will retry)", + style("Coordinator").green(), + speedup_type, + style(tx_id).yellow(), + error_msg + ); + + self.inform_dispatch_speedup_error( + txs_info.clone(), + speedup_type.clone(), + tx_id, + error_msg, )?; + + // Keep in ToDispatch state - will be retried on next tick + self.store + .update_speedup_state(tx_id, TransactionState::ToDispatch)?; } BitcoinBroadcastErrorKind::Other => { // Non-retryable error (malformed transaction, invalid inputs, etc.) - // Don't retry, just report the error + // Mark as Failed, which means the speedup will no longer be dispatched error!( - "{} Fatal error sending {} Transaction({}): {} (not retrying)", + "{} Fatal error sending {}({}): {} (not retrying)", style("Coordinator").green(), speedup_type, - style(speedup_data.tx_id).yellow(), + style(tx_id).yellow(), error_msg ); self.inform_dispatch_speedup_error( txs_info.clone(), speedup_type.clone(), - speedup_data.tx_id, + tx_id, error_msg, )?; + + // Mark as Failed - will not be retried + self.store + .update_speedup_state(tx_id, TransactionState::Failed)?; } } } @@ -608,6 +793,20 @@ impl BitcoinCoordinator { Ok(()) } + /// Registers a transaction with the monitor, allowing search in mempool. + fn allow_search_in_mempool( + &self, + tx_id: Txid, + context: impl Into, + number_confirmation_trigger: Option, + ) -> Result<(), BitcoinCoordinatorError> { + self.monitor.monitor( + TypesToMonitor::Transactions(vec![tx_id], context.into(), number_confirmation_trigger), + true, + )?; + Ok(()) + } + fn dispatch_txs( &self, txs: Vec, @@ -637,6 +836,12 @@ impl BitcoinCoordinator { self.store .update_tx_to_dispatched(tx.tx_id, dispatch_block)?; + self.allow_search_in_mempool( + tx.tx_id, + tx.context.clone(), + tx.number_confirmation_trigger, + )?; + txs_sent.push(tx); } Err(e) => { @@ -653,10 +858,29 @@ impl BitcoinCoordinator { let news = match error_kind { BitcoinBroadcastErrorKind::AlreadyKnown => { - let deliver_block_height = self.monitor.get_monitor_height()?; - - self.store - .update_tx_to_dispatched(tx.tx_id, deliver_block_height)?; + // Transaction is already known by the node (mempool or blockchain). + // Ask the monitor/indexer for the current status and set the state accordingly. + let tx_status = self.monitor.get_tx_status(&tx.tx_id, true)?; + let mapped_state = tx_status.status.clone().into(); + + match mapped_state { + TransactionState::Finalized | TransactionState::Confirmed => { + self.store.update_tx_state(tx.tx_id, mapped_state)?; + } + + _ => { + // Other states are not expected here; keep previous behavior. + let deliver_block_height = self.monitor.get_monitor_height()?; + self.store + .update_tx_to_dispatched(tx.tx_id, deliver_block_height)?; + + self.allow_search_in_mempool( + tx.tx_id, + tx.context.clone(), + tx.number_confirmation_trigger, + )?; + } + } // The transaction is already in mempool or blockchain, so we acknowledge it. let news = CoordinatorNews::TransactionAlreadyInMempool( @@ -735,7 +959,7 @@ impl BitcoinCoordinator { // for example, the 26th ancestor in the mempool's view. // Therefore, we must decrement the available unconfirmed CPFP slots, // since each batch will require a new CPFP transaction and further extend the ancestry chain. - if allow_unconfirmed_txs - 1 > 0 { + if allow_unconfirmed_txs > 1 { allow_unconfirmed_txs -= 1; } else { batches.push(current_batch); @@ -786,7 +1010,8 @@ impl BitcoinCoordinator { for speedup in active_speedups { match speedup.state { TransactionState::ToDispatch => { - // Speedups in ToDispatch state should be dispatched immediately + // Speedups in ToDispatch state should be dispatched in order + // This includes new speedups and speedups that need retry speedups_to_dispatch.push(speedup); } // Failed speedups are not active - they represent fatal errors @@ -801,7 +1026,7 @@ impl BitcoinCoordinator { // Update states for dispatched and confirmed speedups for speedup in speedups_to_review { - let tx_status = self.monitor.get_tx_status(&speedup.tx_id)?; + let tx_status = self.monitor.get_tx_status(&speedup.tx_id, true)?; if tx_status.is_in_mempool() { // Skip speedups that are still in the mempool, @@ -833,7 +1058,7 @@ impl BitcoinCoordinator { let tx_name = speedup.get_tx_name(); debug!( - "{} {} Transaction({}) was replaced by another RBF", + "{} {}({}) was replaced by another RBF", style("Coordinator").green(), tx_name, style(speedup.tx_id).blue(), @@ -854,20 +1079,22 @@ impl BitcoinCoordinator { // CPFP CASE: // ---------- // For regular CPFP speedups, NotFound typically means the transaction was dropped - // from the mempool (e.g. due to eviction). In that case we should recreate and - // re-dispatch a new speedup transaction. + // from the mempool (e.g. due to eviction). In that case we should mark it as ToDispatch + // to recreate and re-dispatch it in the next tick. debug!( - "{} CPFP speedup transaction not found, scheduling redispatch: {}", + "{} Speedup({}) not found, mark for redispatch", style("Coordinator").green(), style(speedup.tx_id).blue(), ); - speedups_to_dispatch.push(speedup); + // Mark as ToDispatch - will be recreated and dispatched in next tick + self.store + .update_speedup_state(speedup.tx_id, TransactionState::ToDispatch)?; continue; } debug!( - "{} {} Transaction({}) | Confirmations({})", + "{} {}({}) | Confirmations({})", style("Coordinator").green(), speedup.get_tx_name(), style(speedup.tx_id).blue(), @@ -915,70 +1142,32 @@ impl BitcoinCoordinator { } } - // Re-dispatch speedups that need to be resent - if !speedups_to_dispatch.is_empty() { - debug!( - "{} Total number of speedups to be dispatched {} [{:?}]", - style("Coordinator").green(), - style(speedups_to_dispatch.len()).yellow(), - style( - speedups_to_dispatch - .iter() - .map(|s| s.tx_id) - .collect::>() - ) - .blue(), - ); - - for speedup in speedups_to_dispatch { - let can_speedup = self.store.can_speedup()?; + // Dispatch speedups in order (important for funding chain) + // If one fails, we stop to preserve the order + // dispatch_speedup will handle state updates (InMempool on success, ToDispatch for retry, Failed for fatal errors) + if speedups_to_dispatch.is_empty() { + return Ok(()); + } - if !can_speedup { - warn!( - "{} Cannot speedup, waiting for funding or confirmations", - style("Coordinator").green() - ); - break; - } + debug!( + "{} Total number of speedups to be dispatched {}", + style("Coordinator").green(), + style(speedups_to_dispatch.len()).yellow(), + ); - let funding = match self.store.get_funding()? { - Some(funding) => funding, - None => { - warn!( - "{} No funding available for speedup retry", - style("Coordinator").green() - ); - break; - } - }; + for speedup in speedups_to_dispatch { + // TODO: Review this logic. + // Skip funding transactions - they are already finalized and don't need to be dispatched + if speedup.is_funding() { + continue; + } - // Determine if we should use RBF (if the speedup was already replacing another, use its replaces_tx_id, otherwise use its own tx_id for replacement) - let replace_cpfp_txid = if speedup.is_replacing() { - // If this speedup was already replacing another one, continue replacing that one - // Otherwise, replace this speedup itself - speedup.replaces_tx_id.or(Some(speedup.tx_id)) - } else { - None - }; - - let txs_data: Vec<(SpeedupData, Transaction, String)> = speedup - .speedup_tx_data - .iter() - .map(|(speedup_data, tx, _)| { - (speedup_data.clone(), tx.clone(), speedup.context.clone()) - }) - .collect(); - - // Use the existing bump_fee_percentage_used, or increase it for retries - let bump_fee = if speedup.state == TransactionState::Failed { - // For error retries, increase the bump fee - self.get_bump_fee_percentage_strategy(speedup.bump_fee_percentage_used)? - } else { - speedup.bump_fee_percentage_used - }; + // Dispatch the speedup using the existing dispatch_speedup method + // dispatch_speedup will handle state updates (InMempool on success, ToDispatch for retry, Failed for fatal errors) + self.dispatch_speedup_tx(speedup.clone())?; - self.create_and_send_cpfp_tx(txs_data, funding, bump_fee, replace_cpfp_txid)?; - } + // Speedups that were marked as ToDispatch (from NotFound case) will be handled in the next tick + // when they are retrieved again in get_active_speedups() and processed in the ToDispatch branch above } Ok(()) @@ -1016,34 +1205,13 @@ impl BitcoinCoordinator { Ok(current_block_height >= pending_tx.target_block_height.unwrap()) } - fn create_and_send_cpfp_tx( + fn create_cpfp_tx( &self, txs_data: Vec<(SpeedupData, Transaction, String)>, funding: Utxo, bump_fee: f64, replaces_tx_id: Option, ) -> Result<(), BitcoinCoordinatorError> { - // Check if the funding amount is below the minimum required for a speedup. - // If so, notify via CoordinatorNews and exit early. - if funding.amount < self.settings.min_funding_amount_sats { - let news = CoordinatorNews::InsufficientFunds( - funding.txid, - funding.amount, - self.settings.min_funding_amount_sats, - ); - self.update_news(news)?; - - warn!( - "{} Insufficient funds for speedup | FundingTx({}) | Amount({}) | MinRequired({})", - style("Coordinator").green(), - style(funding.txid).yellow(), - style(funding.amount).red(), - style(self.settings.min_funding_amount_sats).blue(), - ); - - return Ok(()); - } - let txs_speedup_data = txs_data .iter() .map(|(speedup_data, tx, _)| (speedup_data.clone(), tx.vsize())) @@ -1089,7 +1257,7 @@ impl BitcoinCoordinator { let previous_txid = speedup_tx.input[0].previous_output.txid; info!( - "{} New {} Transaction({}) | Tx2Speedup({:#?}) | Fee({}) | Transactions#({}) | FundingTx({}) | Vout({}) {} | BumpFee({})", + "{} New {}({}) | Tx2Speedup({:#?}) | Fee({}) | Transactions#({}) | FundingTx({}) | Vout({}) {} | BumpFee({})", style("Coordinator").green(), speedup_type, style(speedup_tx_id).yellow(), @@ -1110,22 +1278,56 @@ impl BitcoinCoordinator { ); let speedup_data = CoordinatedSpeedUpTransaction::new( - speedup_tx_id, + speedup_tx.compute_txid(), + Some(speedup_tx.clone()), funding, new_funding_utxo, replaces_tx_id, - 0, // Temporary value, will be updated after send_transaction - TransactionState::InMempool, + 0, // Will be updated after dispatch + TransactionState::ToDispatch, // Mark for dispatch instead of sending immediately bump_fee, txs_data, new_network_fee_rate, + None, ); - self.dispatch_speedup(speedup_tx, speedup_data)?; + self.dispatch_speedup(speedup_data)?; Ok(()) } + /// Computes the speedup fee that would be required for a batch (without creating the CPFP). + /// Used to validate funding before sending any transaction in the batch. + fn compute_speedup_fee_for_batch( + &self, + txs_data: &[(SpeedupData, Transaction, String)], + funding: &Utxo, + bump_fee: f64, + replaces_tx_id: Option, + ) -> Result { + let txs_speedup_data = txs_data + .iter() + .map(|(speedup_data, tx, _)| (speedup_data.clone(), tx.vsize())) + .collect(); + + let new_network_fee_rate = self.get_network_fee_rate()?; + let (diff_fee_for_unconfirmed_chain, chain_vsize) = + self.get_diff_fee_for_unconfirmed_chain(new_network_fee_rate)?; + let is_rbf = replaces_tx_id.is_some(); + + let (_speedup_tx, speedup_fee) = self.get_speedup_tx( + &txs_speedup_data, + funding, + bump_fee, + is_rbf, + new_network_fee_rate, + diff_fee_for_unconfirmed_chain, + chain_vsize, + )?; + + Ok(speedup_fee) + } + fn get_diff_fee_for_unconfirmed_chain( &self, new_network_fee_rate: u64, @@ -1267,9 +1469,7 @@ impl BitcoinCoordinator { // When this function is called, we know that the last speedup exists to be replaced. let (speedup, rbf_tx) = self.store.get_last_pending_speedup()?.unwrap(); - let replaces_tx_id = rbf_tx - .as_ref() - .map_or_else(|| speedup.tx_id, |rbf| rbf.tx_id); + let rbf_tx_id = rbf_tx.as_ref().map_or(speedup.tx_id, |rbf| rbf.tx_id); let mut txs_to_speedup: Vec = Vec::new(); @@ -1281,25 +1481,26 @@ impl BitcoinCoordinator { // The new_bump_fee will increase the previous bump fee from the CPFP used by adding the number of RBF operations performed + 1. let mut increase_last_bump_fee = speedup.bump_fee_percentage_used; - if let Some(rbf_tx) = rbf_tx { - increase_last_bump_fee = rbf_tx.bump_fee_percentage_used; + if let Some(rbf) = rbf_tx { + increase_last_bump_fee = rbf.bump_fee_percentage_used; } let new_bump_fee = self.get_bump_fee_percentage_strategy(increase_last_bump_fee)?; info!( - "{} RBF last CPFP | SpeedupTxId({}) | PrevBumpFee({}) | NewBumpFee({})", + "{} RBF last CPFP | CPFP({}) | PrevBumpFee({}) | NewBumpFee({})", style("Coordinator").green(), style(speedup.tx_id).yellow(), style(increase_last_bump_fee).blue(), style(new_bump_fee).red() ); - self.create_and_send_cpfp_tx( + // Mark for dispatch instead of sending immediately + self.create_cpfp_tx( speedup.speedup_tx_data, speedup.prev_funding, new_bump_fee, - Some(replaces_tx_id), + Some(rbf_tx_id), )?; Ok(()) @@ -1307,19 +1508,63 @@ impl BitcoinCoordinator { fn perform_speedup(&self) -> Result<(), BitcoinCoordinatorError> { // Check if we can send transactions or we stop the process until CPFP transactions start to be confirmed. - if self.store.can_speedup()? { + if self.can_speedup()? { self.speedup_cpfp_tx()?; - } else { - warn!("{} Can not speedup", style("Coordinator").green()); + } - let is_funding_available = self.store.is_funding_available()?; + Ok(()) + } - if !is_funding_available { - self.notify_funding_not_found()?; - } + fn can_speedup(&self) -> Result { + let is_enough_unconfirmed_txs = self.store.has_enough_unconfirmed_txs_for_cpfp()?; + if !is_enough_unconfirmed_txs { + warn!( + "{} Can not speedup, waiting for release of unconfirmed transactions", + style("Coordinator").green() + ); + + return Ok(false); } - Ok(()) + let is_funding_available = self.store.is_funding_available()?; + if !is_funding_available { + warn!( + "{} Can not speedup, waiting for funding", + style("Coordinator").green() + ); + + let news = CoordinatorNews::FundingNotFound; + self.update_news(news)?; + + return Ok(false); + } + + // Funding exists, validate minimum funding amount + let funding = match self.store.get_funding()? { + Some(funding) => funding, + None => return Ok(false), + }; + + if funding.amount < self.settings.min_funding_amount_sats { + let news = CoordinatorNews::InsufficientFunds( + funding.txid, + funding.amount, + self.settings.min_funding_amount_sats, + ); + self.update_news(news)?; + + warn!( + "{} Insufficient funds for speedup | FundingTx({}) | Amount({}) | MinRequired({})", + style("Coordinator").green(), + style(funding.txid).yellow(), + style(funding.amount).red(), + style(self.settings.min_funding_amount_sats).blue(), + ); + + return Ok(false); + } + + Ok(true) } fn calculate_speedup_fee( @@ -1460,10 +1705,23 @@ impl BitcoinCoordinator { Ok(false) } - fn should_boost_speedup(&self) -> Result { + fn should_boost_last_speedup(&self) -> Result { let last_speedup = self.store.get_last_pending_speedup()?; if let Some((speedup, rbf_tx)) = last_speedup { + // If the last speedup (or its latest RBF replacement) hasn't been broadcast yet, + // do NOT attempt to boost/replace it. It will be dispatched in a later tick. + if speedup.state == TransactionState::ToDispatch { + return Ok(false); + } + + // If the last RBF transaction hasn't been broadcast yet, do NOT attempt to boost/replace it. It will be dispatched in a later tick. + if let Some(rbf) = &rbf_tx { + if rbf.state == TransactionState::ToDispatch { + return Ok(false); + } + } + let current_block_height = self.monitor.get_monitor_height()?; // This block checks if the last speedup transaction should be replaced-by-fee. // It retrieves the last speedup transaction and the number of times it has already been replaced (replace_speedup_count). @@ -1495,32 +1753,43 @@ impl BitcoinCoordinator { Ok(false) } + + fn dispatch_speedup( + &self, + speedup_data: CoordinatedSpeedUpTransaction, + ) -> Result<(), BitcoinCoordinatorError> { + self.allow_search_in_mempool(speedup_data.tx_id, speedup_data.context.clone(), None)?; + + self.store.save_speedup(speedup_data.clone())?; + + info!( + "{} Mark Speedup({}) to dispatch", + style("Coordinator").green(), + style(speedup_data.tx_id).yellow() + ); + + Ok(()) + } } impl BitcoinCoordinatorApi for BitcoinCoordinator { fn tick(&self) -> Result<(), BitcoinCoordinatorError> { self.monitor.tick()?; - if self.is_ready()? { - debug!("{} Ready", style("Coordinator").green()); - } else { - debug!("{} Not Ready", style("Coordinator").green()); - return Ok(()); - } - - self.process_active_transactions()?; - self.process_active_speedups()?; + let is_ready = self.is_ready()?; - if !self.should_boost_speedup()? { + if !is_ready { + debug!("{} Not Ready", style("Coordinator").green()); return Ok(()); } - if self.should_replace_last_speedup()? { - self.replace_last_speedup()?; - return Ok(()); - } + debug!("{} Ready", style("Coordinator").green()); - self.perform_speedup()?; + // IMPORTANT: process speedups first so speedups created during this tick are only dispatched + // in the *next* tick. This preserves ordering guarantees and avoids same-tick dispatch. + self.process_active_speedups()?; + self.process_active_transactions()?; + self.process_boost_speedup()?; Ok(()) } @@ -1534,7 +1803,7 @@ impl BitcoinCoordinatorApi for BitcoinCoordinator { } } - self.monitor.monitor(data)?; + self.monitor.monitor(data, false)?; Ok(()) } @@ -1559,11 +1828,18 @@ impl BitcoinCoordinatorApi for BitcoinCoordinator { context.clone(), number_confirmation_trigger, ); - self.monitor.monitor(to_monitor)?; + + self.monitor.monitor(to_monitor, false)?; // Save the transaction to be dispatched. - self.store - .save_tx(tx.clone(), speedup_data, target_block_height, context, None)?; + self.store.save_tx( + tx.clone(), + speedup_data, + target_block_height, + context, + None, + number_confirmation_trigger, + )?; info!( "{} Mark Transaction({}) to dispatch", @@ -1587,7 +1863,7 @@ impl BitcoinCoordinatorApi for BitcoinCoordinator { context.clone(), number_confirmation_trigger, ); - self.monitor.monitor(to_monitor)?; + self.monitor.monitor(to_monitor, false)?; self.store.save_tx( tx.clone(), @@ -1595,6 +1871,7 @@ impl BitcoinCoordinatorApi for BitcoinCoordinator { target_block_height, context, Some(stuck_in_mempool_blocks), + number_confirmation_trigger, )?; info!( @@ -1619,7 +1896,7 @@ impl BitcoinCoordinatorApi for BitcoinCoordinator { } fn get_transaction(&self, txid: Txid) -> Result { - let tx_status = self.monitor.get_tx_status(&txid)?; + let tx_status = self.monitor.get_tx_status(&txid, true)?; Ok(tx_status) } @@ -1647,6 +1924,8 @@ impl BitcoinCoordinatorApi for BitcoinCoordinator { .filter(|tx| { if let MonitorNews::Transaction(_, _, context_data) = tx { !context_data.contains(CPFP_TRANSACTION_CONTEXT) + && !context_data.contains(RBF_TRANSACTION_CONTEXT) + && !context_data.contains(FUNDING_TRANSACTION_CONTEXT) } else { true } diff --git a/src/speedup.rs b/src/speedup.rs index b82913b..3aea89c 100644 --- a/src/speedup.rs +++ b/src/speedup.rs @@ -33,8 +33,6 @@ pub trait SpeedupStore { txid: &Txid, ) -> Result; - fn can_speedup(&self) -> Result; - fn is_funding_available(&self) -> Result; fn has_enough_unconfirmed_txs_for_cpfp(&self) -> Result; @@ -88,14 +86,16 @@ impl SpeedupStore for BitcoinCoordinatorStore { // The broadcast block height is set to 0 and Finalized because funding should be confirmed on chain. let funding_to_speedup = CoordinatedSpeedUpTransaction::new( next_funding.txid, - next_funding.clone(), - next_funding, - None, // Funding is not an RBF replacement + None, // Funding transactions don't have an associated speedup tx + next_funding.clone(), // prev_funding + next_funding, // next_funding + None, // Funding is not an RBF replacement 0, TransactionState::Finalized, 1.0, vec![], 1, + None, ); self.save_speedup(funding_to_speedup)?; @@ -281,19 +281,6 @@ impl SpeedupStore for BitcoinCoordinatorStore { Ok(active_speedups) } - /// Determines if a speedup (CPFP) transaction can be created and dispatched. - /// - /// Returns `true` if: - /// - There is a funding transaction available to pay for the speedup. - /// - There are enough available unconfirmed transaction slots to satisfy Bitcoin's mempool chain limit policy. - /// (At least `MIN_UNCONFIRMED_TXS_FOR_CPFP` unconfirmed transactions are required: one for the CPFP itself and at least one unconfirmed output to spend.) - fn can_speedup(&self) -> Result { - let is_funding_available = self.is_funding_available()?; - let is_enough_unconfirmed_txs = self.has_enough_unconfirmed_txs_for_cpfp()?; - - Ok(is_funding_available && is_enough_unconfirmed_txs) - } - fn is_funding_available(&self) -> Result { let funding = self.get_funding()?; let is_funding_available = funding.is_some(); diff --git a/src/storage.rs b/src/storage.rs index 49b3fa1..cc25e9e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -33,6 +33,7 @@ pub trait BitcoinCoordinatorStoreApi { target_block_height: Option, context: String, stuck_in_mempool_blocks: Option, + number_confirmation_trigger: Option, ) -> Result<(), BitcoinCoordinatorStoreError>; fn remove_tx(&self, tx_id: Txid) -> Result<(), BitcoinCoordinatorStoreError>; @@ -186,6 +187,7 @@ impl BitcoinCoordinatorStoreApi for BitcoinCoordinatorStore { target_block_height: Option, context: String, stuck_in_mempool_blocks: Option, + number_confirmation_trigger: Option, ) -> Result<(), BitcoinCoordinatorStoreError> { let key = self.get_key(StoreKey::Transaction(tx.compute_txid())); @@ -196,6 +198,7 @@ impl BitcoinCoordinatorStoreApi for BitcoinCoordinatorStore { target_block_height, context, stuck_in_mempool_blocks, + number_confirmation_trigger, ); self.store.set(&key, &tx_info, None)?; @@ -258,10 +261,19 @@ impl BitcoinCoordinatorStoreApi for BitcoinCoordinatorStore { // Validate state transitions let valid_transition = match (&tx.state, &new_state) { - // Valid transitions (TransactionState::ToDispatch, TransactionState::InMempool) => true, (TransactionState::ToDispatch, TransactionState::Failed) => true, + + // From ToDispatch to Confirmed, Finalized is possible if the client crash and in a + // new tick detects that the transaction was already in the chain and is confirmed or finalized. + // Same happens with InMempool to Confirmed, Finalized. + (TransactionState::ToDispatch, TransactionState::Confirmed) => true, + (TransactionState::ToDispatch, TransactionState::Finalized) => true, + (TransactionState::InMempool, TransactionState::Confirmed) => true, + (TransactionState::InMempool, TransactionState::ToDispatch) => true, + (TransactionState::InMempool, TransactionState::Finalized) => true, + (TransactionState::Confirmed, TransactionState::Finalized) => true, // Allow transition from Confirmed to InMempool when transaction becomes orphan (reorg) (TransactionState::Confirmed, TransactionState::InMempool) => true, diff --git a/src/types.rs b/src/types.rs index 18b8923..06149cf 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,7 @@ use bitcoin::{Transaction, Txid}; use bitvmx_bitcoin_rpc::types::BlockHeight; use bitvmx_transaction_monitor::types::{AckMonitorNews, MonitorNews}; +use bitvmx_transaction_monitor::TransactionBlockchainStatus; use protocol_builder::types::{output::SpeedupData, Utxo}; use serde::{Deserialize, Serialize}; @@ -39,6 +40,7 @@ pub struct CoordinatedTransaction { // Number of blocks to wait before considering the transaction stuck in mempool // None means this transaction doesn't have a stuck threshold pub stuck_in_mempool_blocks: Option, + pub number_confirmation_trigger: Option, } impl CoordinatedTransaction { @@ -49,6 +51,7 @@ impl CoordinatedTransaction { target_block_height: Option, context: String, stuck_in_mempool_blocks: Option, + number_confirmation_trigger: Option, ) -> Self { Self { tx_id: tx.compute_txid(), @@ -59,6 +62,7 @@ impl CoordinatedTransaction { target_block_height, context, stuck_in_mempool_blocks, + number_confirmation_trigger, } } } @@ -72,6 +76,9 @@ pub struct CoordinatedSpeedUpTransaction { pub tx_id: Txid, + // The speedup transaction itself (needed for dispatch, None for funding transactions) + pub tx: Option, + // The previous funding utxo. pub prev_funding: Utxo, @@ -97,6 +104,8 @@ pub struct CoordinatedSpeedUpTransaction { pub speedup_tx_data: Vec<(SpeedupData, Transaction, String)>, pub network_fee_rate_used: u64, + + pub confirmation_trigger: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -124,6 +133,7 @@ impl RetryInfo { impl CoordinatedSpeedUpTransaction { pub fn new( tx_id: Txid, + tx: Option, prev_funding: Utxo, next_funding: Utxo, replaces_tx_id: Option, @@ -132,6 +142,7 @@ impl CoordinatedSpeedUpTransaction { bump_fee_percentage_used: f64, speedup_tx_data: Vec<(SpeedupData, Transaction, String)>, network_fee_rate_used: u64, + confirmation_trigger: Option, ) -> Self { let is_rbf = replaces_tx_id.is_some(); let mut context = if is_rbf { @@ -154,6 +165,7 @@ impl CoordinatedSpeedUpTransaction { SpeedupType::CPFP }, tx_id, + tx, prev_funding, next_funding, replaced_by_tx_id: None, // Initially, no transaction replaces this one @@ -164,6 +176,7 @@ impl CoordinatedSpeedUpTransaction { bump_fee_percentage_used, speedup_tx_data, network_fee_rate_used, + confirmation_trigger, } } } @@ -196,6 +209,19 @@ impl CoordinatedSpeedUpTransaction { } } +/// Direct, enum-to-enum mapping between the indexer blockchain status and our internal TransactionState. +impl From for TransactionState { + fn from(status: TransactionBlockchainStatus) -> Self { + match status { + TransactionBlockchainStatus::InMempool => TransactionState::InMempool, + TransactionBlockchainStatus::Confirmed => TransactionState::Confirmed, + TransactionBlockchainStatus::Finalized => TransactionState::Finalized, + TransactionBlockchainStatus::NotFound => TransactionState::ToDispatch, + TransactionBlockchainStatus::Orphan => TransactionState::InMempool, + } + } +} + #[derive(Deserialize, Serialize, Debug, Clone)] pub struct TransactionFullInfo { pub tx: Transaction, diff --git a/tests/batch_txs_test.rs b/tests/batch_txs_test.rs index 26a3e6c..c75c79b 100644 --- a/tests/batch_txs_test.rs +++ b/tests/batch_txs_test.rs @@ -17,7 +17,10 @@ fn batch_txs_regtest_test() -> Result<(), anyhow::Error> { config_trace_aux(); let mut blocks_mined = 102; - info!("Starting batch_txs_regtest_test with {} initial blocks mined", blocks_mined); + info!( + "Starting batch_txs_regtest_test with {} initial blocks mined", + blocks_mined + ); let setup = create_test_setup(TestSetupConfig { blocks_mined, bitcoind_flags: None, @@ -151,6 +154,12 @@ fn batch_txs_regtest_test() -> Result<(), anyhow::Error> { info!("Ticking coordinator after second block mined"); coordinator.tick()?; + coordinator.tick()?; + setup + .bitcoin_client + .mine_blocks_to_address(1, &setup.funding_wallet)?; + coordinator.tick()?; + coordinator.tick()?; let news = coordinator.get_news()?; info!( diff --git a/tests/reorg_rbf_test.rs b/tests/reorg_rbf_test.rs index 8dcb38e..08455ca 100644 --- a/tests/reorg_rbf_test.rs +++ b/tests/reorg_rbf_test.rs @@ -12,7 +12,9 @@ use protocol_builder::types::Utxo; use std::rc::Rc; use tracing::info; -use crate::utils::{config_trace_aux, coordinate_tx, create_test_setup, TestSetupConfig}; +use crate::utils::{ + config_trace_aux, coordinate_tx, create_test_setup, tick_until_coordinator_ready, TestSetupConfig, +}; mod utils; #[test] @@ -109,7 +111,7 @@ fn replace_speedup_regtest_test() -> Result<(), anyhow::Error> { // Mine 3 blocks to confirm tx1 and its speedup transaction // Each block mined advances the blockchain, and each tick processes the new blocks - for _ in 0..3 { + for _ in 0..6 { info!("{} Mine and Tick", style("Test").green()); setup .bitcoin_client @@ -120,6 +122,7 @@ fn replace_speedup_regtest_test() -> Result<(), anyhow::Error> { // Verify that tx1 has been confirmed (1 confirmation) let news = coordinator.get_news()?; + assert_eq!( news.monitor_news.len(), 1, @@ -224,9 +227,7 @@ fn replace_speedup_regtest_test() -> Result<(), anyhow::Error> { .unwrap(); // Wait for coordinator to be ready (indexer synced with blockchain) - while !coordinator.is_ready()? { - coordinator.tick()?; - } + tick_until_coordinator_ready(&coordinator)?; // After mining a new block, tx1 should be confirmed again (re-mined in the new chain) let news = coordinator.get_news()?; diff --git a/tests/reorg_test.rs b/tests/reorg_test.rs index 461121e..fb1bb10 100644 --- a/tests/reorg_test.rs +++ b/tests/reorg_test.rs @@ -82,7 +82,7 @@ fn replace_speedup_regtest_test() -> Result<(), anyhow::Error> { coordinator.tick()?; - for _ in 0..4 { + for _ in 0..8 { info!("{} Mine and Tick", style("Test").green()); // Mine a block to confirm tx1 and its speedup transaction setup diff --git a/tests/replace_speedup_regtest_test.rs b/tests/replace_speedup_regtest_test.rs index 3fe8997..060dc42 100644 --- a/tests/replace_speedup_regtest_test.rs +++ b/tests/replace_speedup_regtest_test.rs @@ -7,7 +7,9 @@ use protocol_builder::types::Utxo; use std::rc::Rc; use tracing::info; -use crate::utils::{config_trace_aux, coordinate_tx, create_test_setup, TestSetupConfig}; +use crate::utils::{ + config_trace_aux, coordinate_tx, create_test_setup, tick_until_coordinator_ready, TestSetupConfig, +}; mod utils; // Almost every transaction sent in the protocol uses a CPFP (Child Pays For Parent) transaction for broadcasting. @@ -66,12 +68,8 @@ fn replace_speedup_regtest_test() -> Result<(), anyhow::Error> { None, )?); - // Since we've already mined 102 blocks, we need to advance the coordinator by 102 ticks - // so the indexer can catch up with the current blockchain height. // Tick coordinator until it is ready (indexer is caught up with the current blockchain height) - while !coordinator.is_ready()? { - coordinator.tick()?; - } + tick_until_coordinator_ready(&coordinator)?; // Add funding for speed up transaction coordinator.add_funding(Utxo::new( diff --git a/tests/speedup_chain_recompute_fee_test.rs b/tests/speedup_chain_recompute_fee_test.rs index 6e6e6dc..f7ca562 100644 --- a/tests/speedup_chain_recompute_fee_test.rs +++ b/tests/speedup_chain_recompute_fee_test.rs @@ -129,10 +129,12 @@ fn speedup_chain_recompute_fee_test() -> Result<(), anyhow::Error> { coordinator.tick()?; } - setup - .bitcoin_client - .mine_blocks_to_address(1, &setup.funding_wallet)?; - coordinator.tick()?; + for _ in 0..4 { + setup + .bitcoin_client + .mine_blocks_to_address(1, &setup.funding_wallet)?; + coordinator.tick()?; + } let news = coordinator.get_news()?; assert_eq!(news.monitor_news.len(), 1); diff --git a/tests/speedup_prevalidation_test.rs b/tests/speedup_prevalidation_test.rs new file mode 100644 index 0000000..bb304a1 --- /dev/null +++ b/tests/speedup_prevalidation_test.rs @@ -0,0 +1,160 @@ +use bitcoin::Amount; +use bitcoin_coordinator::{ + coordinator::{BitcoinCoordinator, BitcoinCoordinatorApi}, + storage::{BitcoinCoordinatorStore, BitcoinCoordinatorStoreApi}, + types::TransactionState, +}; +use bitvmx_bitcoin_rpc::bitcoin_client::BitcoinClientApi; +use protocol_builder::types::Utxo; +use std::rc::Rc; +use tracing::info; + +mod utils; +use crate::utils::{ + config_trace_aux, coordinate_tx, create_test_setup, tick_until_coordinator_ready, + TestSetupConfig, +}; + +/// Indices (0-based) of the 4 intentionally invalid transactions among 40 (not consecutive). +const INVALID_TX_INDICES: [usize; 4] = [3, 11, 19, 27]; + +/// Integration test for the speedup pre‑validation + batching path: +/// - Creates 40 coordinated transactions; 4 of them use fee 0 so `testmempoolaccept` rejects them. +/// - `filter_txs_allowed_by_mempool` must mark those 4 as `Failed` and only keep the 36 valid ones. +/// - The 36 valid txs are dispatched in 2 CPFP batches and later reported as monitor news. +#[test] +fn speedup_prevalidation_40_txs_4_invalid_two_batches() -> Result<(), anyhow::Error> { + config_trace_aux(); + + let mut blocks_mined = 102; + let setup = create_test_setup(TestSetupConfig { + blocks_mined, + bitcoind_flags: None, + })?; + + let amount = Amount::from_sat(23450000); + + let _ = setup + .bitcoin_client + .fund_address(&setup.funding_wallet, amount)?; + blocks_mined += 1; + + let (funding_speedup, funding_speedup_vout) = setup + .bitcoin_client + .fund_address(&setup.funding_wallet, amount)?; + blocks_mined += 1; + + let coordinator = Rc::new(BitcoinCoordinator::new_with_paths( + &setup.config_bitcoin_client, + setup.storage.clone(), + setup.key_manager.clone(), + None, + )?); + + for _ in 0..blocks_mined { + coordinator.tick()?; + } + + coordinator.add_funding(Utxo::new( + funding_speedup.compute_txid(), + funding_speedup_vout, + amount.to_sat(), + &setup.public_key, + ))?; + + // Build 40 coordinated txs; at indices in INVALID_TX_INDICES we force fee = 0 so + // `testmempoolaccept` rejects them during pre‑validation. The rest use the default fee. + let mut invalid_tx_ids = Vec::new(); + info!( + "Submitting 40 transactions (4 invalid at indices {:?})", + INVALID_TX_INDICES + ); + for i in 0..40 { + let is_invalid = INVALID_TX_INDICES.contains(&i); + let fee = if is_invalid { Some(0) } else { None }; + let tx = coordinate_tx( + coordinator.clone(), + amount, + setup.network, + setup.key_manager.clone(), + setup.bitcoin_client.clone(), + fee, + )?; + if is_invalid { + invalid_tx_ids.push(tx.compute_txid()); + } + } + + tick_until_coordinator_ready(&coordinator)?; + + // First batch: + // Mine + tick twice so the coordinator first processes parent txs and then their CPFP speedups, + // and the monitor accumulates news for the first batch of valid transactions. + setup + .bitcoin_client + .mine_blocks_to_address(1, &setup.funding_wallet)?; + + coordinator.tick()?; + + setup + .bitcoin_client + .mine_blocks_to_address(1, &setup.funding_wallet)?; + + coordinator.tick()?; + + let news = coordinator.get_news()?; + let first_batch_count = news.monitor_news.len(); + info!( + "After first block: monitor_news.len() = {}", + first_batch_count + ); + assert!( + first_batch_count == 24, + "expected first batch 24 monitor notifications, got {}", + first_batch_count + ); + + tick_until_coordinator_ready(&coordinator)?; + + // Second batch: + // Again, mine + tick twice so the remaining valid transactions are confirmed and reported. + setup + .bitcoin_client + .mine_blocks_to_address(1, &setup.funding_wallet)?; + + coordinator.tick()?; + + setup + .bitcoin_client + .mine_blocks_to_address(1, &setup.funding_wallet)?; + + coordinator.tick()?; + + let news = coordinator.get_news()?; + info!( + "After second block: monitor_news.len() = {}", + news.monitor_news.len() + ); + assert_eq!( + news.monitor_news.len(), + 36, + "expected 36 monitor notifications (only valid txs), got {}", + news.monitor_news.len() + ); + + // 4 invalid txs must be Failed in store (not active). + let store = BitcoinCoordinatorStore::new(setup.storage.clone(), 10)?; + for tx_id in &invalid_tx_ids { + let coordinated = store.get_tx(tx_id)?; + assert_eq!( + coordinated.state, + TransactionState::Failed, + "invalid tx {} should be Failed", + tx_id + ); + } + + setup.bitcoind.stop()?; + + Ok(()) +} diff --git a/tests/storage_news_test.rs b/tests/storage_news_test.rs index 4609b39..7fc2e9d 100644 --- a/tests/storage_news_test.rs +++ b/tests/storage_news_test.rs @@ -520,7 +520,14 @@ fn test_transaction_state_failed_on_fatal_error() -> Result<(), anyhow::Error> { let tx_id = tx.compute_txid(); // Save the transaction - store.save_tx(tx.clone(), None, None, "test_context".to_string(), None)?; + store.save_tx( + tx.clone(), + None, + None, + "test_context".to_string(), + None, + None, + )?; // Mark transaction as failed (simulating fatal error handling) store.update_tx_state(tx_id, TransactionState::Failed)?; diff --git a/tests/storage_speedup_test.rs b/tests/storage_speedup_test.rs index 38c3b0b..f88b441 100644 --- a/tests/storage_speedup_test.rs +++ b/tests/storage_speedup_test.rs @@ -43,6 +43,7 @@ fn dummy_speedup_tx( CoordinatedSpeedUpTransaction::new( *txid, + None, dummy_utxo(&txid), dummy_utxo(&txid), if is_replace { Some(*txid) } else { None }, // If is_replace, use the same txid as the replaced transaction @@ -55,6 +56,7 @@ fn dummy_speedup_tx( (speedup_data_3, tx_3, "Context 3".to_string()), ], 1, + None, ) } @@ -127,8 +129,8 @@ fn test_save_and_get_speedup() -> Result<(), anyhow::Error> { assert_eq!(pending.len(), 1); assert_eq!(pending[0].tx_id, tx.compute_txid()); - // can_speedup should be true (funding exists) - assert!(store.can_speedup()?); + assert!(store.is_funding_available()?); + assert!(store.has_enough_unconfirmed_txs_for_cpfp()?); clear_output(); Ok(()) @@ -242,23 +244,6 @@ fn test_get_funding_with_replace_speedup_dispatched_and_no_confirmed() -> Result Ok(()) } -#[test] -fn test_can_speedup_none() -> Result<(), anyhow::Error> { - let store = create_store(); - assert!(!store.can_speedup()?); - - // Add 10 dispatched speedups (none are finalized or confirmed) - for _ in 0..10 { - let tx = generate_random_tx(); - let s = dummy_speedup_tx(&tx.compute_txid(), SpeedupState::InMempool, false, 0); - store.save_speedup(s)?; - } - // After only dispatched speedups, can_speedup should still be false - assert!(!store.can_speedup()?); - clear_output(); - Ok(()) -} - #[test] fn test_update_speedup_state_and_remove_from_pending() -> Result<(), anyhow::Error> { let store = create_store(); diff --git a/tests/storage_tx_test.rs b/tests/storage_tx_test.rs index daebc54..e3297f7 100644 --- a/tests/storage_tx_test.rs +++ b/tests/storage_tx_test.rs @@ -33,7 +33,7 @@ fn test_save_and_get_tx() -> Result<(), anyhow::Error> { let tx_id = tx.compute_txid(); // Save transaction - store.save_tx(tx.clone(), None, None, "context_tx".to_string(), None)?; + store.save_tx(tx.clone(), None, None, "context_tx".to_string(), None, None)?; // Get transactions by state let txs = store.get_txs_in_progress()?; @@ -82,7 +82,7 @@ fn test_multiple_transactions() -> Result<(), anyhow::Error> { let tx_id = tx.compute_txid(); // Save transaction - store.save_tx(tx.clone(), None, None, "context_tx".to_string(), None)?; + store.save_tx(tx.clone(), None, None, "context_tx".to_string(), None, None)?; // Test adding multiple transactions and verifying transaction list @@ -105,8 +105,22 @@ fn test_multiple_transactions() -> Result<(), anyhow::Error> { let tx3_id = tx3.compute_txid(); // Save additional transactions - store.save_tx(tx2.clone(), None, None, "context_tx2".to_string(), None)?; - store.save_tx(tx3.clone(), None, None, "context_tx3".to_string(), None)?; + store.save_tx( + tx2.clone(), + None, + None, + "context_tx2".to_string(), + None, + None, + )?; + store.save_tx( + tx3.clone(), + None, + None, + "context_tx3".to_string(), + None, + None, + )?; // Get all transactions in ReadyToSend state (should be all three) let ready_txs = store.get_txs_in_progress()?; @@ -168,8 +182,22 @@ fn test_cancel_monitor() -> Result<(), anyhow::Error> { let tx_id_2 = tx2.compute_txid(); // Save transaction to be monitored, this will be mark as pending dispatch - coordinator.save_tx(tx1.clone(), None, None, "context_tx1".to_string(), None)?; - coordinator.save_tx(tx2.clone(), None, None, "context_tx2".to_string(), None)?; + coordinator.save_tx( + tx1.clone(), + None, + None, + "context_tx1".to_string(), + None, + None, + )?; + coordinator.save_tx( + tx2.clone(), + None, + None, + "context_tx2".to_string(), + None, + None, + )?; // Remove one of the transactions coordinator.remove_tx(tx_id_1)?; diff --git a/tests/stuck_in_mempool_test.rs b/tests/stuck_in_mempool_test.rs index d9d0cc3..6f025e0 100644 --- a/tests/stuck_in_mempool_test.rs +++ b/tests/stuck_in_mempool_test.rs @@ -6,7 +6,9 @@ use bitcoind::bitcoind::BitcoindFlags; use bitvmx_bitcoin_rpc::bitcoin_client::BitcoinClientApi; use std::rc::Rc; -use crate::utils::{config_trace_aux, create_test_setup, TestSetupConfig}; +use crate::utils::{ + config_trace_aux, create_test_setup, tick_until_coordinator_ready, TestSetupConfig, +}; mod utils; #[test] @@ -29,9 +31,7 @@ fn stuck_in_mempool_test() -> Result<(), anyhow::Error> { None, )?); - while !coordinator.is_ready()? { - coordinator.tick()?; - } + tick_until_coordinator_ready(&coordinator)?; let amount = Amount::from_sat(1000000); let (funding_tx, funding_vout) = setup diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index 83e1ce7..80f4dbc 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -170,15 +170,14 @@ pub fn create_store() -> BitcoinCoordinatorStore { } pub fn config_trace_aux() { - use tracing_subscriber::util::SubscriberInitExt; - let default_modules = [ "info", "libp2p=off", - "bitvmx_transaction_monitor=info", - "bitcoin_indexer=off", + "bitvmx_transaction_monitor=debug", + "bitcoin_indexer=debug", "bitcoin_coordinator=debug", - "bitcoin_client=off", + "bitcoin_rpc=debug", + "bitcoin_client=debug", "p2p_protocol=off", "p2p_handler=off", "tarpc=off", @@ -198,6 +197,20 @@ pub fn config_trace_aux() { .try_init(); } +pub fn tick_until_coordinator_ready( + coordinator: &Rc, +) -> Result<(), anyhow::Error> { + info!( + "{} Waiting for coordinator to be ready", + style("Test").green() + ); + while !coordinator.is_ready()? { + coordinator.tick()?; + } + + Ok(()) +} + pub fn coordinate_tx( coordinator: Rc, amount: Amount,