Skip to content

Commit fa2c3cf

Browse files
committed
sweep: refactor attachAvailableRBFInfo to decideStateAndRBFInfo
Thus this method `decideStateAndRBFInfo` won't touch the state changes of a given input.
1 parent 041aecf commit fa2c3cf

File tree

2 files changed

+53
-58
lines changed

2 files changed

+53
-58
lines changed

sweep/sweeper.go

+32-25
Original file line numberDiff line numberDiff line change
@@ -1365,20 +1365,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
13651365
return
13661366
}
13671367

1368+
// This is a new input, and we want to query the mempool to see if this
1369+
// input has already been spent. If so, we'll start the input with
1370+
// state Published and attach the RBFInfo.
1371+
state, rbfInfo := s.decideStateAndRBFInfo(*input.input.OutPoint())
1372+
13681373
// Create a new pendingInput and initialize the listeners slice with
13691374
// the passed in result channel. If this input is offered for sweep
13701375
// again, the result channel will be appended to this slice.
13711376
pi = &pendingInput{
1372-
state: StateInit,
1377+
state: state,
13731378
listeners: []chan Result{input.resultChan},
13741379
Input: input.input,
13751380
params: input.params,
1381+
rbf: rbfInfo,
13761382
}
13771383

1378-
// Try to find fee info for possible RBF if this input has already been
1379-
// spent.
1380-
pi = s.attachAvailableRBFInfo(pi)
1381-
13821384
s.pendingInputs[outpoint] = pi
13831385
log.Tracef("input %v, state=%v, added to pendingInputs", outpoint,
13841386
pi.state)
@@ -1399,31 +1401,36 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
13991401
pi.ntfnRegCancel = cancel
14001402
}
14011403

1402-
// attachAvailableRBFInfo queries the mempool to see whether the given input
1403-
// has already been spent. If so, it will query the sweeper store to fetch the
1404-
// fee info of the spending transction, hence preparing for possible RBF.
1405-
func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput {
1404+
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1405+
// already been spent. If so, the state Published will be returned, otherwise
1406+
// state Init. When spent, it will query the sweeper store to fetch the fee
1407+
// info of the spending transction, and construct an RBFInfo based on it.
1408+
// Suppose an error occurs, fn.None is returned.
1409+
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1410+
SweepState, fn.Option[RBFInfo]) {
1411+
14061412
// Check if we can find the spending tx of this input in mempool.
1407-
txOption := s.mempoolLookup(*pi.OutPoint())
1413+
txOption := s.mempoolLookup(op)
1414+
1415+
// Extract the spending tx from the option.
1416+
var tx *wire.MsgTx
1417+
txOption.WhenSome(func(t wire.MsgTx) {
1418+
tx = &t
1419+
})
14081420

14091421
// Exit early if it's not found.
14101422
//
14111423
// NOTE: this is not accurate for backends that don't support mempool
14121424
// lookup:
14131425
// - for neutrino we don't have a mempool.
14141426
// - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1415-
if txOption.IsNone() {
1416-
return pi
1427+
if tx == nil {
1428+
return StateInit, fn.None[RBFInfo]()
14171429
}
14181430

1419-
// NOTE: we use UnsafeFromSome for here because we are sure this option
1420-
// is NOT none.
1421-
tx := txOption.UnsafeFromSome()
1422-
1423-
// Otherwise the input is already spent in the mempool, update its
1424-
// state to StatePublished.
1425-
pi.state = StatePublished
1426-
1431+
// Otherwise the input is already spent in the mempool, so eventually
1432+
// we will return StatePublished.
1433+
//
14271434
// We also need to update the RBF info for this input. If the sweeping
14281435
// transaction is broadcast by us, we can find the fee info in the
14291436
// sweeper store.
@@ -1436,25 +1443,25 @@ func (s *UtxoSweeper) attachAvailableRBFInfo(pi *pendingInput) *pendingInput {
14361443
// pendingInputs.
14371444
if errors.Is(err, ErrTxNotFound) {
14381445
log.Warnf("Spending tx %v not found in sweeper store", txid)
1439-
return pi
1446+
return StatePublished, fn.None[RBFInfo]()
14401447
}
14411448

14421449
// Exit if we get an db error.
14431450
if err != nil {
14441451
log.Errorf("Unable to get tx %v from sweeper store: %v",
14451452
txid, err)
14461453

1447-
return pi
1454+
return StatePublished, fn.None[RBFInfo]()
14481455
}
14491456

1450-
// Attach the fee info and return it.
1451-
pi.rbf = fn.Some(RBFInfo{
1457+
// Prepare the fee info and return it.
1458+
rbf := fn.Some(RBFInfo{
14521459
Txid: txid,
14531460
Fee: btcutil.Amount(tr.Fee),
14541461
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
14551462
})
14561463

1457-
return pi
1464+
return StatePublished, rbf
14581465
}
14591466

14601467
// handleExistingInput processes an input that is already known to the sweeper.

sweep/sweeper_test.go

+21-33
Original file line numberDiff line numberDiff line change
@@ -2399,27 +2399,17 @@ func TestUpdateSweeperInputs(t *testing.T) {
23992399
require.Equal(expectedInputs, s.pendingInputs)
24002400
}
24012401

2402-
// TestAttachAvailableRBFInfo checks that the RBF info is attached to the
2403-
// pending input, along with the state being marked as published, when this
2404-
// input can be found both in mempool and the sweeper store.
2405-
func TestAttachAvailableRBFInfo(t *testing.T) {
2402+
// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are
2403+
// returned based on whether this input can be found both in mempool and the
2404+
// sweeper store.
2405+
func TestDecideStateAndRBFInfo(t *testing.T) {
24062406
t.Parallel()
24072407

24082408
require := require.New(t)
24092409

24102410
// Create a test outpoint.
24112411
op := wire.OutPoint{Index: 1}
24122412

2413-
// Create a mock input.
2414-
testInput := &input.MockInput{}
2415-
defer testInput.AssertExpectations(t)
2416-
2417-
testInput.On("OutPoint").Return(&op)
2418-
pi := &pendingInput{
2419-
Input: testInput,
2420-
state: StateInit,
2421-
}
2422-
24232413
// Create a mock mempool watcher and a mock sweeper store.
24242414
mockMempool := chainntnfs.NewMockMempoolWatcher()
24252415
defer mockMempool.AssertExpectations(t)
@@ -2436,11 +2426,11 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
24362426
mockMempool.On("LookupInputMempoolSpend", op).Return(
24372427
fn.None[wire.MsgTx]()).Once()
24382428

2439-
// Since the mempool lookup failed, we exepect the original pending
2440-
// input to stay unchanged.
2441-
result := s.attachAvailableRBFInfo(pi)
2442-
require.True(result.rbf.IsNone())
2443-
require.Equal(StateInit, result.state)
2429+
// Since the mempool lookup failed, we exepect state Init and no
2430+
// RBFInfo.
2431+
state, rbf := s.decideStateAndRBFInfo(op)
2432+
require.True(rbf.IsNone())
2433+
require.Equal(StateInit, state)
24442434

24452435
// Mock the mempool lookup to return a tx three times as we are calling
24462436
// attachAvailableRBFInfo three times.
@@ -2451,21 +2441,19 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
24512441
// Mock the store to return an error saying the tx cannot be found.
24522442
mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
24532443

2454-
// Although the db lookup failed, the pending input should have been
2455-
// marked as published without attaching any RBF info.
2456-
result = s.attachAvailableRBFInfo(pi)
2457-
require.True(result.rbf.IsNone())
2458-
require.Equal(StatePublished, result.state)
2444+
// Although the db lookup failed, we expect the state to be Published.
2445+
state, rbf = s.decideStateAndRBFInfo(op)
2446+
require.True(rbf.IsNone())
2447+
require.Equal(StatePublished, state)
24592448

24602449
// Mock the store to return a db error.
24612450
dummyErr := errors.New("dummy error")
24622451
mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
24632452

2464-
// Although the db lookup failed, the pending input should have been
2465-
// marked as published without attaching any RBF info.
2466-
result = s.attachAvailableRBFInfo(pi)
2467-
require.True(result.rbf.IsNone())
2468-
require.Equal(StatePublished, result.state)
2453+
// Although the db lookup failed, we expect the state to be Published.
2454+
state, rbf = s.decideStateAndRBFInfo(op)
2455+
require.True(rbf.IsNone())
2456+
require.Equal(StatePublished, state)
24692457

24702458
// Mock the store to return a record.
24712459
tr := &TxRecord{
@@ -2475,18 +2463,18 @@ func TestAttachAvailableRBFInfo(t *testing.T) {
24752463
mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
24762464

24772465
// Call the method again.
2478-
result = s.attachAvailableRBFInfo(pi)
2466+
state, rbf = s.decideStateAndRBFInfo(op)
24792467

2480-
// Assert that the RBF info is attached to the pending input.
2468+
// Assert that the RBF info is returned.
24812469
rbfInfo := fn.Some(RBFInfo{
24822470
Txid: tx.TxHash(),
24832471
Fee: btcutil.Amount(tr.Fee),
24842472
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
24852473
})
2486-
require.Equal(rbfInfo, result.rbf)
2474+
require.Equal(rbfInfo, rbf)
24872475

24882476
// Assert the state is updated.
2489-
require.Equal(StatePublished, result.state)
2477+
require.Equal(StatePublished, state)
24902478
}
24912479

24922480
// TestMarkInputFailed checks that the input is marked as failed as expected.

0 commit comments

Comments
 (0)