Skip to content

Commit 7fede49

Browse files
committed
sweep: add monitor loop to TxPublisher
This commit finishes the implementation of `TxPublisher` by adding the monitor process. Whevenr a new block arrives, the publisher will check all its monitored records and attempt fee bumping them if necessary.
1 parent 7386fa1 commit 7fede49

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed

sweep/fee_bumper.go

+202
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync/atomic"
88

99
"github.com/btcsuite/btcd/btcutil"
10+
"github.com/btcsuite/btcd/chaincfg/chainhash"
1011
"github.com/btcsuite/btcd/wire"
1112
"github.com/lightningnetwork/lnd/chainntnfs"
1213
"github.com/lightningnetwork/lnd/input"
@@ -518,6 +519,207 @@ type monitorRecord struct {
518519
fee btcutil.Amount
519520
}
520521

522+
// Start starts the publisher by subscribing to block epoch updates and kicking
523+
// off the monitor loop.
524+
func (t *TxPublisher) Start() error {
525+
log.Info("TxPublisher starting...")
526+
527+
blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
528+
if err != nil {
529+
return fmt.Errorf("register block epoch ntfn: %v", err)
530+
}
531+
532+
select {
533+
case bestBlock := <-blockEvent.Epochs:
534+
t.currentHeight = bestBlock.Height
535+
536+
case <-t.quit:
537+
log.Debugf("TxPublisher shutting down, exit start process")
538+
return nil
539+
}
540+
541+
t.wg.Add(1)
542+
go t.monitor(blockEvent)
543+
544+
return nil
545+
}
546+
547+
// Stop stops the publisher and waits for the monitor loop to exit.
548+
func (t *TxPublisher) Stop() {
549+
log.Info("TxPublisher stopping...")
550+
551+
t.wg.Wait()
552+
close(t.quit)
553+
}
554+
555+
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
556+
// it will examine all the txns being monitored, and check if any of them needs
557+
// to be bumped. If so, it will attempt to bump the fee of the tx.
558+
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
559+
defer blockEvent.Cancel()
560+
defer t.wg.Done()
561+
562+
select {
563+
case epoch, ok := <-blockEvent.Epochs:
564+
if !ok {
565+
// We should stop the publisher before stopping the
566+
// chain service. Otherwise it indicates an error.
567+
log.Error("Block epoch channel closed, exit monitor")
568+
569+
return
570+
}
571+
572+
// Update the best known height for the publisher.
573+
t.currentHeight = epoch.Height
574+
575+
// Check all monitored txns to see if any of them needs to be
576+
// bumped.
577+
t.processRecords()
578+
579+
case <-t.quit:
580+
log.Debug("Fee bumper stopped, exit monitor")
581+
return
582+
}
583+
}
584+
585+
// processRecords checks all the txns being monitored, and check if any of them
586+
// needs to be bumped. If so, it will attempt to bump the fee of the tx.
587+
func (t *TxPublisher) processRecords() {
588+
// confirmed is a helper closure that returns a boolean indicating
589+
// whether the tx is confirmed.
590+
confirmed := func(txid chainhash.Hash,
591+
event *chainntnfs.ConfirmationEvent) bool {
592+
593+
select {
594+
case event := <-event.Confirmed:
595+
log.Debugf("Sweep tx %v confirmed in block %v", txid,
596+
event.BlockHeight)
597+
598+
return true
599+
600+
default:
601+
return false
602+
}
603+
}
604+
605+
// visitor is a helper closure that visits each record and performs an
606+
// RBF if necessary.
607+
visitor := func(requestID uint64, r *monitorRecord) error {
608+
oldTxid := r.tx.TxHash()
609+
log.Tracef("Checking monitor record=%v for tx=%v", requestID,
610+
oldTxid)
611+
612+
// If the tx is already confirmed, we can stop monitoring it.
613+
if confirmed(oldTxid, r.confEvent) {
614+
// Create a result that will be sent to the resultChan
615+
// which is listened by the caller.
616+
result := &BumpResult{
617+
CurrentTx: r.tx,
618+
Confirmed: true,
619+
requestID: requestID,
620+
}
621+
622+
// Notify that this tx is confirmed and remove the
623+
// record from the map.
624+
t.notifyAndRemove(result)
625+
626+
// Move to the next record.
627+
return nil
628+
}
629+
630+
// Get the current conf target for this record.
631+
confTarget := t.calcCurrentConfTarget(r.set.DeadlineHeight())
632+
633+
// Ask the fee function whether a bump is needed. We expect the
634+
// fee function to increase its returned fee rate after calling
635+
// this method.
636+
if r.feeFunction.SkipFeeBump(confTarget) {
637+
log.Debugf("Skip bumping tx %v at height=%v", oldTxid,
638+
t.currentHeight)
639+
640+
return nil
641+
}
642+
643+
// The fee function now has a new fee rate, we will use it to
644+
// bump the fee of the tx.
645+
result, err := t.createAndPublishTx(requestID, r)
646+
if err != nil {
647+
log.Errorf("Failed to bump tx %v: %v", oldTxid, err)
648+
649+
// Return nil so the visitor can continue to the next
650+
// record.
651+
return nil
652+
}
653+
654+
// Notify the new result.
655+
t.notifyAndRemove(result)
656+
657+
return nil
658+
}
659+
660+
// TODO(yy): need to double check as the above `visitor` will put data
661+
// in the sync map, so we may need to do a read first.
662+
t.records.ForEach(visitor)
663+
}
664+
665+
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
666+
// to the network. It will update the record with the new tx and fee rate if
667+
// successfully created, and return the result when published successfully.
668+
func (t *TxPublisher) createAndPublishTx(requestID uint64,
669+
r *monitorRecord) (*BumpResult, error) {
670+
671+
// Fetch the old tx.
672+
oldTx := r.tx
673+
674+
// Create a new tx with the new fee rate.
675+
tx, fee, err := t.createAndCheckTx(r.set, r.changeScript, r.feeFunction)
676+
677+
// If the tx doesn't not have enought budget, we will return a result
678+
// so the sweeper can handle it by re-clustering the utxos.
679+
if errors.Is(err, ErrNotEnoughBudget) {
680+
return &BumpResult{
681+
PreviousTx: oldTx,
682+
Err: err,
683+
requestID: requestID,
684+
}, nil
685+
686+
}
687+
688+
// If the error is not budget related, we will return an error and let
689+
// the fee bumper retry it at next block.
690+
//
691+
// NOTE: we can check the RBF error here and ask the fee function to
692+
// recalculate the fee rate. However, this would defeat the purpose of
693+
// using a deadline based fee function:
694+
// - if the deadline is far away, there's no rush to RBF the tx.
695+
// - if the deadline is close, we expect the fee function to give us a
696+
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
697+
// means the budget is not enough.
698+
if err != nil {
699+
return nil, err
700+
}
701+
702+
// Register a new record by overwriting the same requestID.
703+
t.records.Store(requestID, &monitorRecord{
704+
tx: tx,
705+
set: r.set,
706+
changeScript: r.changeScript,
707+
feeFunction: r.feeFunction,
708+
fee: fee,
709+
})
710+
711+
// Attempt to broadcast this new tx.
712+
result, err := t.broadcast(requestID)
713+
if err != nil {
714+
return nil, err
715+
}
716+
717+
// Attach the old tx and return.
718+
result.PreviousTx = oldTx
719+
720+
return result, nil
721+
}
722+
521723
// TODO(yy): temp, remove this placeholder once the TestMempoolAccept PR is
522724
// merged.
523725
func (t *TxPublisher) TestMempoolAccept(*wire.MsgTx,

0 commit comments

Comments
 (0)