Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3/5]sweep: make pending inputs stateful #8423

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/btcsuite/btcwallet/chain"
"github.com/lightningnetwork/lnd/blockcache"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/queue"
)

Expand Down Expand Up @@ -1070,3 +1071,26 @@ func (b *BitcoindNotifier) CancelMempoolSpendEvent(

b.memNotifier.UnsubscribeEvent(sub)
}

// LookupInputMempoolSpend takes an outpoint and queries the mempool to find
// its spending tx. Returns the tx if found, otherwise fn.None.
//
// NOTE: part of the MempoolWatcher interface.
func (b *BitcoindNotifier) LookupInputMempoolSpend(
op wire.OutPoint) fn.Option[wire.MsgTx] {

// Find the spending txid.
txid, found := b.chainConn.LookupInputMempoolSpend(op)
if !found {
return fn.None[wire.MsgTx]()
}

// Query the spending tx using the id.
tx, err := b.chainConn.GetRawTransaction(&txid)
if err != nil {
// TODO(yy): enable logging errors in this package.
return fn.None[wire.MsgTx]()
}

return fn.Some(*tx.MsgTx().Copy())
}
48 changes: 41 additions & 7 deletions chainntnfs/btcdnotify/btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
"github.com/lightningnetwork/lnd/blockcache"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/queue"
)

Expand Down Expand Up @@ -58,7 +60,7 @@ type BtcdNotifier struct {
active int32 // To be used atomically.
stopped int32 // To be used atomically.

chainConn *rpcclient.Client
chainConn *chain.RPCClient
chainParams *chaincfg.Params

notificationCancels chan interface{}
Expand Down Expand Up @@ -127,21 +129,30 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params,
quit: make(chan struct{}),
}

// Disable connecting to btcd within the rpcclient.New method. We
// defer establishing the connection to our .Start() method.
config.DisableConnectOnNew = true
config.DisableAutoReconnect = false

ntfnCallbacks := &rpcclient.NotificationHandlers{
OnBlockConnected: notifier.onBlockConnected,
OnBlockDisconnected: notifier.onBlockDisconnected,
OnRedeemingTx: notifier.onRedeemingTx,
}

// Disable connecting to btcd within the rpcclient.New method. We
// defer establishing the connection to our .Start() method.
config.DisableConnectOnNew = true
config.DisableAutoReconnect = false
chainConn, err := rpcclient.New(config, ntfnCallbacks)
rpcCfg := &chain.RPCClientConfig{
ReconnectAttempts: 20,
Conn: config,
Chain: chainParams,
NotificationHandlers: ntfnCallbacks,
}

chainRPC, err := chain.NewRPCClientWithConfig(rpcCfg)
if err != nil {
return nil, err
}
notifier.chainConn = chainConn

notifier.chainConn = chainRPC

return notifier, nil
}
Expand Down Expand Up @@ -1127,3 +1138,26 @@ func (b *BtcdNotifier) CancelMempoolSpendEvent(

b.memNotifier.UnsubscribeEvent(sub)
}

// LookupInputMempoolSpend takes an outpoint and queries the mempool to find
// its spending tx. Returns the tx if found, otherwise fn.None.
//
// NOTE: part of the MempoolWatcher interface.
func (b *BtcdNotifier) LookupInputMempoolSpend(
op wire.OutPoint) fn.Option[wire.MsgTx] {

// Find the spending txid.
txid, found := b.chainConn.LookupInputMempoolSpend(op)
if !found {
return fn.None[wire.MsgTx]()
}

// Query the spending tx using the id.
tx, err := b.chainConn.GetRawTransaction(&txid)
if err != nil {
// TODO(yy): enable logging errors in this package.
return fn.None[wire.MsgTx]()
}

return fn.Some(*tx.MsgTx().Copy())
}
6 changes: 6 additions & 0 deletions chainntnfs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
)

var (
Expand Down Expand Up @@ -848,4 +849,9 @@ type MempoolWatcher interface {
// CancelMempoolSpendEvent allows the caller to cancel a subscription to
// watch for a spend of an outpoint in the mempool.
CancelMempoolSpendEvent(sub *MempoolSpendEvent)

// LookupInputMempoolSpend looks up the mempool to find a spending tx
// which spends the given outpoint. A fn.None is returned if it's not
// found.
LookupInputMempoolSpend(op wire.OutPoint) fn.Option[wire.MsgTx]
}
52 changes: 52 additions & 0 deletions chainntnfs/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package chainntnfs

import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/stretchr/testify/mock"
)

// MockMempoolWatcher is a mock implementation of the MempoolWatcher interface.
// This is used by other subsystems to mock the behavior of the mempool
// watcher.
type MockMempoolWatcher struct {
mock.Mock
}

// NewMockMempoolWatcher returns a new instance of a mock mempool watcher.
func NewMockMempoolWatcher() *MockMempoolWatcher {
return &MockMempoolWatcher{}
}

// Compile-time check to ensure MockMempoolWatcher implements MempoolWatcher.
var _ MempoolWatcher = (*MockMempoolWatcher)(nil)

// SubscribeMempoolSpent implements the MempoolWatcher interface.
func (m *MockMempoolWatcher) SubscribeMempoolSpent(
op wire.OutPoint) (*MempoolSpendEvent, error) {

args := m.Called(op)

if args.Get(0) == nil {
return nil, args.Error(1)
}

return args.Get(0).(*MempoolSpendEvent), args.Error(1)
}

// CancelMempoolSpendEvent implements the MempoolWatcher interface.
func (m *MockMempoolWatcher) CancelMempoolSpendEvent(
sub *MempoolSpendEvent) {

m.Called(sub)
}

// LookupInputMempoolSpend looks up the mempool to find a spending tx which
// spends the given outpoint.
func (m *MockMempoolWatcher) LookupInputMempoolSpend(
op wire.OutPoint) fn.Option[wire.MsgTx] {

args := m.Called(op)

return args.Get(0).(fn.Option[wire.MsgTx])
}
19 changes: 10 additions & 9 deletions cmd/lncli/walletrpc_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
// PendingSweep is a CLI-friendly type of the walletrpc.PendingSweep proto. We
// use this to show more useful string versions of byte slices and enums.
type PendingSweep struct {
OutPoint OutPoint `json:"outpoint"`
WitnessType string `json:"witness_type"`
AmountSat uint32 `json:"amount_sat"`
SatPerVByte uint32 `json:"sat_per_vbyte"`
BroadcastAttempts uint32 `json:"broadcast_attempts"`
NextBroadcastHeight uint32 `json:"next_broadcast_height"`
RequestedSatPerVByte uint32 `json:"requested_sat_per_vbyte"`
RequestedConfTarget uint32 `json:"requested_conf_target"`
Force bool `json:"force"`
OutPoint OutPoint `json:"outpoint"`
WitnessType string `json:"witness_type"`
AmountSat uint32 `json:"amount_sat"`
SatPerVByte uint32 `json:"sat_per_vbyte"`
BroadcastAttempts uint32 `json:"broadcast_attempts"`
// TODO(yy): deprecate.
NextBroadcastHeight uint32 `json:"next_broadcast_height"`
RequestedSatPerVByte uint32 `json:"requested_sat_per_vbyte"`
RequestedConfTarget uint32 `json:"requested_conf_target"`
Force bool `json:"force"`
}

// NewPendingSweepFromProto converts the walletrpc.PendingSweep proto type into
Expand Down
11 changes: 0 additions & 11 deletions contractcourt/anchor_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,6 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) {
c.log.Warnf("our anchor spent by someone else")
outcome = channeldb.ResolverOutcomeUnclaimed

// The sweeper gave up on sweeping the anchor. This happens
// after the maximum number of sweep attempts has been reached.
// See sweep.DefaultMaxSweepAttempts. Sweep attempts are
// interspaced with random delays picked from a range that
// increases exponentially.
//
// We consider the anchor as being lost.
case sweep.ErrTooManyAttempts:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

c.log.Warnf("anchor sweep abandoned")
outcome = channeldb.ResolverOutcomeUnclaimed

// An unexpected error occurred.
default:
c.log.Errorf("unable to sweep anchor: %v", sweepRes.Err)
Expand Down
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ bitcoin peers' feefilter values into account](https://github.com/lightningnetwor
* Bump sqlite version to [fix a data
race](https://github.com/lightningnetwork/lnd/pull/8567).

* The pending inputs in the sweeper is now
[stateful](https://github.com/lightningnetwork/lnd/pull/8423) to better
manage the lifecycle of the inputs.

## Breaking Changes
## Performance Improvements

Expand Down
22 changes: 14 additions & 8 deletions itest/lnd_open_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,12 +829,19 @@ func testSimpleTaprootChannelActivation(ht *lntest.HarnessTest) {
// up as locked balance in the WalletBalance response.
func testOpenChannelLockedBalance(ht *lntest.HarnessTest) {
var (
alice = ht.Alice
bob = ht.Bob
req *lnrpc.ChannelAcceptRequest
err error
bob = ht.Bob
req *lnrpc.ChannelAcceptRequest
err error
)

// Create a new node so we can assert exactly how much fund has been
// locked later.
alice := ht.NewNode("alice", nil)
ht.FundCoins(btcutil.SatoshiPerBitcoin, alice)

// Connect the nodes.
ht.EnsureConnected(alice, bob)

// We first make sure Alice has no locked wallet balance.
balance := alice.RPC.WalletBalance()
require.EqualValues(ht, 0, balance.LockedBalance)
Expand All @@ -851,6 +858,7 @@ func testOpenChannelLockedBalance(ht *lntest.HarnessTest) {
openChannelReq := &lnrpc.OpenChannelRequest{
NodePubkey: bob.PubKey[:],
LocalFundingAmount: int64(funding.MaxBtcFundingAmount),
TargetConf: 6,
}
_ = alice.RPC.OpenChannel(openChannelReq)

Expand All @@ -862,8 +870,7 @@ func testOpenChannelLockedBalance(ht *lntest.HarnessTest) {
}, defaultTimeout)
require.NoError(ht, err)

balance = alice.RPC.WalletBalance()
require.NotEqualValues(ht, 0, balance.LockedBalance)
ht.AssertWalletLockedBalance(alice, btcutil.SatoshiPerBitcoin)

// Next, we let Bob deny the request.
resp := &lnrpc.ChannelAcceptResponse{
Expand All @@ -876,6 +883,5 @@ func testOpenChannelLockedBalance(ht *lntest.HarnessTest) {
require.NoError(ht, err)

// Finally, we check to make sure the balance is unlocked again.
balance = alice.RPC.WalletBalance()
require.EqualValues(ht, 0, balance.LockedBalance)
ht.AssertWalletLockedBalance(alice, 0)
}
2 changes: 0 additions & 2 deletions lnrpc/walletrpc/walletkit_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,6 @@ func (w *WalletKit) PendingSweeps(ctx context.Context,
amountSat := uint32(pendingInput.Amount)
satPerVbyte := uint64(pendingInput.LastFeeRate.FeePerVByte())
broadcastAttempts := uint32(pendingInput.BroadcastAttempts)
nextBroadcastHeight := uint32(pendingInput.NextBroadcastHeight)

feePref := pendingInput.Params.Fee
requestedFee, ok := feePref.(sweep.FeeEstimateInfo)
Expand All @@ -892,7 +891,6 @@ func (w *WalletKit) PendingSweeps(ctx context.Context,
AmountSat: amountSat,
SatPerVbyte: satPerVbyte,
BroadcastAttempts: broadcastAttempts,
NextBroadcastHeight: nextBroadcastHeight,
RequestedSatPerVbyte: requestedFeeRate,
RequestedConfTarget: requestedFee.ConfTarget,
Force: pendingInput.Params.Force,
Expand Down
19 changes: 19 additions & 0 deletions lntest/harness_assertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -2572,3 +2572,22 @@ func (h *HarnessTest) MineClosingTx(cp *lnrpc.ChannelPoint,

return closeTx
}

// AssertWalletLockedBalance asserts the expected amount has been marked as
// locked in the node's WalletBalance response.
func (h *HarnessTest) AssertWalletLockedBalance(hn *node.HarnessNode,
balance int64) {

err := wait.NoError(func() error {
balanceResp := hn.RPC.WalletBalance()
got := balanceResp.LockedBalance

if got != balance {
return fmt.Errorf("want %d, got %d", balance, got)
}

return nil
}, wait.DefaultTimeout)
require.NoError(h, err, "%s: timeout checking locked balance",
hn.Name())
}
24 changes: 12 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,18 +1068,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
)

s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
FeeEstimator: cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Signer: cc.Wallet.Cfg.Signer,
Wallet: newSweeperWallet(cc.Wallet),
TickerDuration: cfg.Sweeper.BatchWindowDuration,
Notifier: cc.ChainNotifier,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,
MaxSweepAttempts: sweep.DefaultMaxSweepAttempts,
NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc,
MaxFeeRate: cfg.Sweeper.MaxFeeRate,
Aggregator: aggregator,
FeeEstimator: cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Signer: cc.Wallet.Cfg.Signer,
Wallet: newSweeperWallet(cc.Wallet),
TickerDuration: cfg.Sweeper.BatchWindowDuration,
Mempool: cc.MempoolNotifier,
Notifier: cc.ChainNotifier,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,
MaxSweepAttempts: sweep.DefaultMaxSweepAttempts,
MaxFeeRate: cfg.Sweeper.MaxFeeRate,
Aggregator: aggregator,
})

s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
Expand Down
Loading
Loading