Skip to content

Commit 39e40c0

Browse files
committed
batch payment manager:
The class TxBatcher handles the creation, broadcast and replacement of replaceable transactions. Callers (LNWatcher, SwapManager) use methods add_payment_output and add_sweep_info. Transactions created by TxBatcher may combine sweeps and outgoing payments. Transactions created by TxBatcher will have their fee bumped automatically (this was only the case for sweeps before). TxBatcher manages several TxBatches. TxBatches are created dynamically when needed. The GUI does not touch txbatcher transactions: - wallet.get_candidates_for_batching excludes txbatcher transactions - RBF dialogs do not work with txbatcher transactions wallet: - instead of reading config variables, make_unsigned_transaction takes new parameters: base_tx, send_change_to_lighting tests: - unit tests in test_txbatcher.py (replaces test_sswaps.py) - force all regtests to use MPP, so that we sweep transactions with several HTLCs. This forces the payment manager to aggregate first-stage HTLC tx inputs. second-stage are not batched for now.
1 parent 8011ae0 commit 39e40c0

12 files changed

+786
-289
lines changed

electrum/address_synchronizer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ def add_value_from_prev_output():
360360
self.db.add_transaction(tx_hash, tx)
361361
self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs()))
362362
if is_new:
363-
util.trigger_callback('adb_added_tx', self, tx_hash, tx)
363+
util.trigger_callback('adb_added_tx', self, tx_hash, tx, conflicting_txns)
364364
return True
365365

366366
def remove_transaction(self, tx_hash: str) -> None:

electrum/gui/qt/main_window.py

+5
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,11 @@ def on_event_channel(self, *args):
461461
def on_event_banner(self, *args):
462462
self.console.showMessage(args[0])
463463

464+
@qt_event_listener
465+
def on_event_adb_set_future_tx(self, adb, txid):
466+
if adb == self.wallet.adb:
467+
self.history_model.refresh('set_future_tx')
468+
464469
@qt_event_listener
465470
def on_event_verified(self, *args):
466471
wallet, tx_hash, tx_mined_status = args

electrum/lnsweep.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class SweepInfo(NamedTuple):
4343
cltv_abs: Optional[int] # set to None only if the script has no cltv
4444
txin: PartialTxInput
4545
txout: Optional[PartialTxOutput] # only for first-stage htlc tx
46+
can_be_batched: bool # todo: this could be more fine-grained
47+
4648

4749
def sweep_their_ctx_watchtower(
4850
chan: 'Channel',
@@ -251,7 +253,8 @@ def justice_txin(output_idx):
251253
csv_delay=0,
252254
cltv_abs=None,
253255
txin=txin,
254-
txout=None
256+
txout=None,
257+
can_be_batched=False,
255258
)
256259
return index_to_sweepinfo
257260

@@ -329,6 +332,7 @@ def sweep_our_ctx(
329332
cltv_abs=None,
330333
txin=txin,
331334
txout=None,
335+
can_be_batched=True,
332336
)
333337

334338
# to_local
@@ -350,6 +354,7 @@ def sweep_our_ctx(
350354
cltv_abs=None,
351355
txin=txin,
352356
txout=None,
357+
can_be_batched=True,
353358
)
354359
we_breached = ctn < chan.get_oldest_unrevoked_ctn(LOCAL)
355360
if we_breached:
@@ -384,7 +389,9 @@ def txs_htlc(
384389
csv_delay=0,
385390
cltv_abs=htlc_tx.locktime,
386391
txin=htlc_tx.inputs()[0],
387-
txout=htlc_tx.outputs()[0])
392+
txout=htlc_tx.outputs()[0],
393+
can_be_batched=True,
394+
)
388395
else:
389396
# second-stage
390397
address = bitcoin.script_to_p2wsh(htlctx_witness_script)
@@ -404,6 +411,7 @@ def txs_htlc(
404411
cltv_abs=0,
405412
txin=sweep_txin,
406413
txout=None,
414+
can_be_batched=False, # todo: we could batch htlcs of the same channel
407415
)
408416

409417
# offered HTLCs, in our ctx --> "timeout"
@@ -541,6 +549,7 @@ def sweep_their_ctx_to_remote_backup(
541549
cltv_abs=None,
542550
txin=txin,
543551
txout=None,
552+
can_be_batched=True,
544553
)
545554

546555
# to_remote
@@ -562,6 +571,7 @@ def sweep_their_ctx_to_remote_backup(
562571
cltv_abs=None,
563572
txin=txin,
564573
txout=None,
574+
can_be_batched=True,
565575
)
566576
return txs
567577

@@ -619,6 +629,7 @@ def sweep_their_ctx(
619629
cltv_abs=None,
620630
txin=txin,
621631
txout=None,
632+
can_be_batched=True,
622633
)
623634

624635
# to_local is handled by lnwatcher
@@ -631,6 +642,7 @@ def sweep_their_ctx(
631642
cltv_abs=None,
632643
txin=txin,
633644
txout=None,
645+
can_be_batched=False,
634646
)
635647

636648
# to_remote
@@ -662,6 +674,7 @@ def sweep_their_ctx(
662674
cltv_abs=None,
663675
txin=txin,
664676
txout=None,
677+
can_be_batched=True,
665678
)
666679

667680
# HTLCs
@@ -701,6 +714,7 @@ def tx_htlc(
701714
cltv_abs=cltv_abs,
702715
txin=txin,
703716
txout=None,
717+
can_be_batched=False,
704718
)
705719
# received HTLCs, in their ctx --> "timeout"
706720
# offered HTLCs, in their ctx --> "success"

electrum/lnwatcher.py

+35-105
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ def add_callback(self, address, callback):
8080
async def on_event_blockchain_updated(self, *args):
8181
await self.trigger_callbacks()
8282

83+
@event_listener
84+
async def on_event_wallet_updated(self, wallet):
85+
# called if we add local tx
86+
if wallet.adb != self.adb:
87+
return
88+
await self.trigger_callbacks()
89+
8390
@event_listener
8491
async def on_event_adb_added_verified_tx(self, adb, tx_hash):
8592
if adb != self.adb:
@@ -146,6 +153,10 @@ def get_spender(self, outpoint) -> str:
146153
"""
147154
prev_txid, index = outpoint.split(':')
148155
spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index))
156+
# discard local spenders
157+
tx_mined_status = self.adb.get_tx_height(spender_txid)
158+
if tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
159+
spender_txid = None
149160
if not spender_txid:
150161
return
151162
spender_tx = self.adb.get_transaction(spender_txid)
@@ -241,9 +252,8 @@ async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bo
241252
return False
242253
# detect who closed and get information about how to claim outputs
243254
sweep_info_dict = chan.sweep_ctx(closing_tx)
244-
self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}")
255+
#self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}")
245256
keep_watching = False if sweep_info_dict else not self.is_deeply_mined(closing_tx.txid())
246-
247257
# create and broadcast transactions
248258
for prevout, sweep_info in sweep_info_dict.items():
249259
if self.is_dust(sweep_info):
@@ -253,7 +263,7 @@ async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bo
253263
self.lnworker.wallet.set_default_label(prevout, name)
254264
if not self.adb.get_transaction(prev_txid):
255265
# do not keep watching if prevout does not exist
256-
self.logger.info(f'prevout does not exist for {name}: {prev_txid}')
266+
self.logger.info(f'prevout does not exist for {name}: {prevout}')
257267
continue
258268
spender_txid = self.get_spender(prevout)
259269
spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
@@ -267,7 +277,7 @@ async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bo
267277
keep_watching |= not self.is_deeply_mined(htlc_tx_spender)
268278
else:
269279
keep_watching = True
270-
await self.maybe_redeem(prevout2, htlc_sweep_info, name)
280+
await self.maybe_redeem(htlc_sweep_info)
271281
# extract preimage
272282
keep_watching |= not self.is_deeply_mined(spender_txid)
273283
txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout))
@@ -276,111 +286,31 @@ async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bo
276286
chan.extract_preimage_from_htlc_txin(spender_txin)
277287
else:
278288
keep_watching = True
279-
# broadcast or maybe update our own tx
280-
await self.maybe_redeem(prevout, sweep_info, name)
281-
289+
# broadcast or maybe update our own tx
290+
await self.maybe_redeem(sweep_info)
282291
return keep_watching
283292

284-
def get_redeem_tx(self, prevout: str, sweep_info: 'SweepInfo', name: str):
285-
# check if redeem tx needs to be updated
286-
# if it is in the mempool, we need to check fee rise
287-
txid = self.get_spender(prevout)
288-
old_tx = self.adb.get_transaction(txid)
289-
assert old_tx is not None or txid is None
290-
tx_depth = self.get_tx_mined_depth(txid) if txid else None
291-
if txid and tx_depth not in [TxMinedDepth.FREE, TxMinedDepth.MEMPOOL]:
292-
assert old_tx is not None
293-
return old_tx, None
294-
# fixme: deepcopy is needed because tx.serialize() is destructive
295-
inputs = [copy.deepcopy(sweep_info.txin)]
296-
outputs = [sweep_info.txout] if sweep_info.txout else []
297-
if sweep_info.name == 'first-stage-htlc':
298-
new_tx = PartialTransaction.from_io(inputs, outputs, locktime=sweep_info.cltv_abs, version=2)
299-
self.lnworker.wallet.sign_transaction(new_tx, password=None, ignore_warnings=True)
300-
else:
301-
# password is needed for 1st stage htlc tx with anchors because we add inputs
302-
password = self.lnworker.wallet.get_unlocked_password()
303-
new_tx = self.lnworker.wallet.create_transaction(
304-
fee_policy = self.fee_policy,
305-
inputs = inputs,
306-
outputs = outputs,
307-
password = password,
308-
locktime = sweep_info.cltv_abs,
309-
BIP69_sort=False,
310-
)
311-
if new_tx is None:
312-
self.logger.info(f'{name} could not claim output: {prevout}, dust')
313-
assert old_tx is not None
314-
return old_tx, None
315-
if txid is None:
316-
return None, new_tx
317-
elif tx_depth == TxMinedDepth.MEMPOOL:
318-
delta = new_tx.get_fee() - self.adb.get_tx_fee(txid)
319-
if delta > 1:
320-
self.logger.info(f'increasing fee of mempool tx {name}: {prevout}')
321-
return old_tx, new_tx
322-
else:
323-
assert old_tx is not None
324-
return old_tx, None
325-
elif tx_depth == TxMinedDepth.FREE:
326-
# return new tx, even if it is equal to old_tx,
327-
# because we need to test if it can be broadcast
328-
return old_tx, new_tx
329-
else:
330-
assert old_tx is not None
331-
return old_tx, None
332-
333-
async def maybe_redeem(self, prevout, sweep_info: 'SweepInfo', name: str) -> None:
334-
old_tx, new_tx = self.get_redeem_tx(prevout, sweep_info, name)
335-
if new_tx is None:
336-
return
337-
prev_txid, prev_index = prevout.split(':')
338-
can_broadcast = True
339-
local_height = self.network.get_local_height()
340-
if sweep_info.cltv_abs:
341-
wanted_height = sweep_info.cltv_abs
342-
if wanted_height - local_height > 0:
343-
can_broadcast = False
344-
# self.logger.debug(f"pending redeem for {prevout}. waiting for {name}: CLTV ({local_height=}, {wanted_height=})")
345-
if sweep_info.csv_delay:
346-
prev_height = self.adb.get_tx_height(prev_txid)
347-
if prev_height.height > 0:
348-
wanted_height = prev_height.height + sweep_info.csv_delay - 1
349-
else:
350-
wanted_height = local_height + sweep_info.csv_delay
351-
if wanted_height - local_height > 0:
352-
can_broadcast = False
353-
# self.logger.debug(
354-
# f"pending redeem for {prevout}. waiting for {name}: CSV "
355-
# f"({local_height=}, {wanted_height=}, {prev_height.height=}, {sweep_info.csv_delay=})")
293+
async def maybe_redeem(self, sweep_info: 'SweepInfo') -> None:
356294
if not (sweep_info.cltv_abs or sweep_info.csv_delay):
357295
# used to control settling of htlcs onchain for testing purposes
358296
# careful, this prevents revocation as well
359297
if not self.lnworker.enable_htlc_settle_onchain:
360298
return
361-
if can_broadcast:
362-
self.logger.info(f'we can broadcast: {name}')
363-
if await self.network.try_broadcasting(new_tx, name):
364-
tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None))
365-
else:
366-
tx_was_added = False
367-
else:
368-
# we may have a tx with a different fee, in which case it will be replaced
369-
if not old_tx or (old_tx and old_tx.txid() != new_tx.txid()):
370-
try:
371-
tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None))
372-
except Exception as e:
373-
self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}')
374-
tx_was_added = False
375-
if tx_was_added:
376-
self.logger.info(f'added redeem tx: {name}. prevout: {prevout}')
377-
else:
378-
tx_was_added = False
379-
# set future tx regardless of tx_was_added, because it is not persisted
380-
# (and wanted_height can change if input of CSV was not mined before)
381-
self.adb.set_future_tx(new_tx.txid(), wanted_height=wanted_height)
382-
if tx_was_added:
383-
self.lnworker.wallet.set_label(new_tx.txid(), name)
384-
if old_tx and old_tx.txid() != new_tx.txid():
385-
self.lnworker.wallet.set_label(old_tx.txid(), None)
386-
util.trigger_callback('wallet_updated', self.lnworker.wallet)
299+
# pre-anchor htlc txs cannot be batched
300+
if sweep_info.name == 'first-stage-htlc':
301+
await self._maybe_redeem_legacy_htlc_tx(sweep_info)
302+
return
303+
# if it cannot be batched, we still send it to txbatcher, so that the fee will be bumped
304+
batch_key = 'lnwatcher' if sweep_info.can_be_batched else sweep_info.txin.prevout.to_str()
305+
self.lnworker.wallet.txbatcher.add_sweep_input(batch_key, sweep_info, self.fee_policy)
306+
307+
async def _maybe_redeem_legacy_htlc_tx(self, sweep_info):
308+
local_height = self.network.get_local_height()
309+
wanted_height = sweep_info.cltv_abs
310+
if wanted_height - local_height > 0:
311+
return
312+
self.logger.info('legacy first-stage htlc tx')
313+
tx = PartialTransaction.from_io([sweep_info.txin], [sweep_info.txout], locktime=sweep_info.cltv_abs, version=2)
314+
self.lnworker.wallet.sign_transaction(tx, password=None, ignore_warnings=True)
315+
if await self.network.try_broadcasting(tx, sweep_info.name):
316+
self.lnworker.wallet.adb.add_transaction(tx)

0 commit comments

Comments
 (0)