Skip to content

Commit b20f32f

Browse files
committed
sweep: add method handleBumpEventError and fix markInputFailed
Previously in `markInputFailed`, we'd remove all inputs under the same group via `removeExclusiveGroup`. This is wrong as when the current sweep fails for this input, it shouldn't affect other inputs.
1 parent 9b8bd6e commit b20f32f

File tree

2 files changed

+234
-6
lines changed

2 files changed

+234
-6
lines changed

sweep/sweeper.go

+58-6
Original file line numberDiff line numberDiff line change
@@ -1433,11 +1433,6 @@ func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
14331433

14341434
pi.state = Failed
14351435

1436-
// Remove all other inputs in this exclusive group.
1437-
if pi.params.ExclusiveGroup != nil {
1438-
s.removeExclusiveGroup(*pi.params.ExclusiveGroup)
1439-
}
1440-
14411436
s.signalResult(pi, Result{Err: err})
14421437
}
14431438

@@ -1701,6 +1696,60 @@ func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error {
17011696
return nil
17021697
}
17031698

1699+
// handleBumpEventError handles the case where there's an unexpected error when
1700+
// creating or publishing the sweeping tx. In this case, the tx will be removed
1701+
// from the sweeper store and the inputs will be marked as `Failed`.
1702+
func (s *UtxoSweeper) handleBumpEventError(r *BumpResult) error {
1703+
txid := r.Tx.TxHash()
1704+
log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
1705+
1706+
// Remove the tx from the sweeper db if it exists.
1707+
if err := s.cfg.Store.DeleteTx(txid); err != nil {
1708+
return fmt.Errorf("delete tx record for %v: %w", txid, err)
1709+
}
1710+
1711+
// Mark the inputs as failed.
1712+
s.markInputsFailed(r.Tx, r.Err)
1713+
1714+
return nil
1715+
}
1716+
1717+
// markInputsFailed marks all inputs found in the tx as failed. It will also
1718+
// notify all the subscribers of these inputs.
1719+
func (s *UtxoSweeper) markInputsFailed(tx *wire.MsgTx, err error) {
1720+
for _, txIn := range tx.TxIn {
1721+
outpoint := txIn.PreviousOutPoint
1722+
1723+
input, ok := s.inputs[outpoint]
1724+
if !ok {
1725+
// It's very likely that a spending tx contains inputs
1726+
// that we don't know.
1727+
log.Tracef("Skipped marking input as failed: %v not "+
1728+
"found in pending inputs", outpoint)
1729+
1730+
continue
1731+
}
1732+
1733+
// If the input is already in a terminal state, we don't want
1734+
// to rewrite it, which also indicates an error as we only get
1735+
// an error event during the initial broadcast.
1736+
if input.terminated() {
1737+
log.Errorf("Skipped marking input=%v as failed due to "+
1738+
"unexpected state=%v", outpoint, input.state)
1739+
1740+
continue
1741+
}
1742+
1743+
input.state = Failed
1744+
1745+
// Signal result channels.
1746+
s.signalResult(input, Result{
1747+
Tx: tx,
1748+
Err: err,
1749+
})
1750+
}
1751+
}
1752+
17041753
// handleBumpEvent handles the result sent from the bumper based on its event
17051754
// type.
17061755
//
@@ -1724,8 +1773,11 @@ func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error {
17241773
case TxReplaced:
17251774
return s.handleBumpEventTxReplaced(r)
17261775

1776+
// There's an unexpected error in creating or publishing the tx, we
1777+
// will remove the tx from the sweeper db and mark the inputs as
1778+
// failed.
17271779
case TxError:
1728-
// TODO(yy): create a method to remove this input.
1780+
return s.handleBumpEventError(r)
17291781
}
17301782

17311783
return nil

sweep/sweeper_test.go

+176
Original file line numberDiff line numberDiff line change
@@ -1104,3 +1104,179 @@ func TestMonitorFeeBumpResult(t *testing.T) {
11041104
})
11051105
}
11061106
}
1107+
1108+
// TestMarkInputsFailed checks that given a list of inputs with different
1109+
// states, the method `markInputsFailed` correctly marks the inputs as failed.
1110+
func TestMarkInputsFailed(t *testing.T) {
1111+
t.Parallel()
1112+
1113+
require := require.New(t)
1114+
1115+
// Create a test sweeper.
1116+
s := New(&UtxoSweeperConfig{})
1117+
1118+
// Create a mock input.
1119+
mockInput := &input.MockInput{}
1120+
defer mockInput.AssertExpectations(t)
1121+
1122+
// Mock the `OutPoint` to return a dummy outpoint.
1123+
mockInput.On("OutPoint").Return(wire.OutPoint{Hash: chainhash.Hash{1}})
1124+
1125+
// Create testing inputs for each state.
1126+
//
1127+
// inputNotExist specifies an input that's not found in the sweeper's
1128+
// `inputs` map.
1129+
inputNotExist := &wire.TxIn{
1130+
PreviousOutPoint: wire.OutPoint{Index: 1},
1131+
}
1132+
1133+
// inputInit specifies a newly created input. When marking this as
1134+
// published, we should see an error log as this input hasn't been
1135+
// published yet.
1136+
inputInit := &wire.TxIn{
1137+
PreviousOutPoint: wire.OutPoint{Index: 2},
1138+
}
1139+
s.inputs[inputInit.PreviousOutPoint] = &SweeperInput{
1140+
state: Init,
1141+
Input: mockInput,
1142+
}
1143+
1144+
// inputPendingPublish specifies an input that's about to be published.
1145+
inputPendingPublish := &wire.TxIn{
1146+
PreviousOutPoint: wire.OutPoint{Index: 3},
1147+
}
1148+
s.inputs[inputPendingPublish.PreviousOutPoint] = &SweeperInput{
1149+
state: PendingPublish,
1150+
Input: mockInput,
1151+
}
1152+
1153+
// inputPublished specifies an input that's published.
1154+
inputPublished := &wire.TxIn{
1155+
PreviousOutPoint: wire.OutPoint{Index: 4},
1156+
}
1157+
s.inputs[inputPublished.PreviousOutPoint] = &SweeperInput{
1158+
state: Published,
1159+
Input: mockInput,
1160+
}
1161+
1162+
// inputPublishFailed specifies an input that's failed to be published.
1163+
inputPublishFailed := &wire.TxIn{
1164+
PreviousOutPoint: wire.OutPoint{Index: 5},
1165+
}
1166+
s.inputs[inputPublishFailed.PreviousOutPoint] = &SweeperInput{
1167+
state: PublishFailed,
1168+
Input: mockInput,
1169+
}
1170+
1171+
// inputSwept specifies an input that's swept.
1172+
inputSwept := &wire.TxIn{
1173+
PreviousOutPoint: wire.OutPoint{Index: 6},
1174+
}
1175+
s.inputs[inputSwept.PreviousOutPoint] = &SweeperInput{
1176+
state: Swept,
1177+
Input: mockInput,
1178+
}
1179+
1180+
// inputExcluded specifies an input that's excluded.
1181+
inputExcluded := &wire.TxIn{
1182+
PreviousOutPoint: wire.OutPoint{Index: 7},
1183+
}
1184+
s.inputs[inputExcluded.PreviousOutPoint] = &SweeperInput{
1185+
state: Excluded,
1186+
Input: mockInput,
1187+
}
1188+
1189+
// inputFailed specifies an input that's failed.
1190+
inputFailed := &wire.TxIn{
1191+
PreviousOutPoint: wire.OutPoint{Index: 8},
1192+
}
1193+
s.inputs[inputFailed.PreviousOutPoint] = &SweeperInput{
1194+
state: Failed,
1195+
Input: mockInput,
1196+
}
1197+
1198+
// Create a test tx.
1199+
tx := &wire.MsgTx{
1200+
TxIn: []*wire.TxIn{
1201+
inputNotExist,
1202+
inputInit,
1203+
inputPendingPublish,
1204+
inputPublished,
1205+
inputPublishFailed,
1206+
inputSwept,
1207+
inputExcluded,
1208+
inputFailed,
1209+
},
1210+
}
1211+
1212+
// Mark the test inputs. We expect the non-exist input and
1213+
// inputSwept/inputExcluded/inputFailed to be skipped.
1214+
s.markInputsFailed(tx, errDummy)
1215+
1216+
// We expect unchanged number of pending inputs.
1217+
require.Len(s.inputs, 7)
1218+
1219+
// We expect the init input's to be marked as failed.
1220+
require.Equal(Failed, s.inputs[inputInit.PreviousOutPoint].state)
1221+
1222+
// We expect the pending-publish input to be marked as failed.
1223+
require.Equal(Failed,
1224+
s.inputs[inputPendingPublish.PreviousOutPoint].state)
1225+
1226+
// We expect the published input to be marked as failed.
1227+
require.Equal(Failed, s.inputs[inputPublished.PreviousOutPoint].state)
1228+
1229+
// We expect the publish failed input to be markd as failed.
1230+
require.Equal(Failed,
1231+
s.inputs[inputPublishFailed.PreviousOutPoint].state)
1232+
1233+
// We expect the swept input to stay unchanged.
1234+
require.Equal(Swept, s.inputs[inputSwept.PreviousOutPoint].state)
1235+
1236+
// We expect the excluded input to stay unchanged.
1237+
require.Equal(Excluded, s.inputs[inputExcluded.PreviousOutPoint].state)
1238+
1239+
// We expect the failed input to stay unchanged.
1240+
require.Equal(Failed, s.inputs[inputFailed.PreviousOutPoint].state)
1241+
}
1242+
1243+
// TestHandleBumpEventError checks that `handleBumpEventError` correctly
1244+
// handles a `TxError` event.
1245+
func TestHandleBumpEventError(t *testing.T) {
1246+
t.Parallel()
1247+
1248+
rt := require.New(t)
1249+
1250+
// Create a mock store.
1251+
store := &MockSweeperStore{}
1252+
defer store.AssertExpectations(t)
1253+
1254+
// Create a test sweeper.
1255+
s := New(&UtxoSweeperConfig{
1256+
Store: store,
1257+
})
1258+
1259+
// Create a testing tx.
1260+
//
1261+
// We are not testing `markInputFailed` here, so the actual tx doesn't
1262+
// matter.
1263+
tx := &wire.MsgTx{}
1264+
result := &BumpResult{
1265+
Tx: tx,
1266+
Err: errDummy,
1267+
}
1268+
1269+
// Mock the store to return an error.
1270+
store.On("DeleteTx", mock.Anything).Return(errDummy).Once()
1271+
1272+
// Call the method under test and assert the error is returned.
1273+
err := s.handleBumpEventError(result)
1274+
rt.ErrorIs(err, errDummy)
1275+
1276+
// Mock the store to return nil.
1277+
store.On("DeleteTx", mock.Anything).Return(nil).Once()
1278+
1279+
// Call the method under test and assert no error is returned.
1280+
err = s.handleBumpEventError(result)
1281+
rt.NoError(err)
1282+
}

0 commit comments

Comments
 (0)