Skip to content

Commit 94f4b0b

Browse files
committed
batch payment manager:
The class TxBatcher handles the creation, broadcast and replacement of replaceable transactions. Callers (LNWatcher, SwapManager) use methods add_payment_output and add_sweep_info. Transactions created by TxBatcher may combine sweeps and outgoing payments. Transactions created by TxBatcher will have their fee bumped automatically (this was only the case for sweeps before). TxBatcher manages several TxBatches. TxBatches are created dynamically when needed. The GUI does not touch txbatcher transactions: - wallet.get_candidates_for_batching excludes txbatcher transactions - RBF dialogs do not work with txbatcher transactions wallet: - instead of reading config variables, make_unsigned_transaction takes new parameters: base_tx, send_change_to_lighting tests: - unit tests in test_txbatcher.py (replaces test_sswaps.py) - force all regtests to use MPP, so that we sweep transactions with several HTLCs. This forces the payment manager to aggregate first-stage HTLC tx inputs. second-stage are not batched for now.
1 parent c7dcfab commit 94f4b0b

11 files changed

+818
-312
lines changed

electrum/gui/qt/main_window.py

+5
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,11 @@ def on_event_channel(self, *args):
461461
def on_event_banner(self, *args):
462462
self.console.showMessage(args[0])
463463

464+
@qt_event_listener
465+
def on_event_adb_set_future_tx(self, adb, txid):
466+
if adb == self.wallet.adb:
467+
self.history_model.refresh('set_future_tx')
468+
464469
@qt_event_listener
465470
def on_event_verified(self, *args):
466471
wallet, tx_hash, tx_mined_status = args

electrum/lnsweep.py

+19-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class SweepInfo(NamedTuple):
4343
cltv_abs: Optional[int] # set to None only if the script has no cltv
4444
txin: PartialTxInput
4545
txout: Optional[PartialTxOutput] # only for first-stage htlc tx
46+
can_be_batched: bool # todo: this could be more fine-grained
47+
4648

4749
def sweep_their_ctx_watchtower(
4850
chan: 'Channel',
@@ -251,7 +253,8 @@ def justice_txin(output_idx):
251253
csv_delay=0,
252254
cltv_abs=None,
253255
txin=txin,
254-
txout=None
256+
txout=None,
257+
can_be_batched=False,
255258
)
256259
return index_to_sweepinfo
257260

@@ -329,6 +332,7 @@ def sweep_our_ctx(
329332
cltv_abs=None,
330333
txin=txin,
331334
txout=None,
335+
can_be_batched=True,
332336
)
333337

334338
# to_local
@@ -350,6 +354,7 @@ def sweep_our_ctx(
350354
cltv_abs=None,
351355
txin=txin,
352356
txout=None,
357+
can_be_batched=True,
353358
)
354359
we_breached = ctn < chan.get_oldest_unrevoked_ctn(LOCAL)
355360
if we_breached:
@@ -384,7 +389,11 @@ def txs_htlc(
384389
csv_delay=0,
385390
cltv_abs=htlc_tx.locktime,
386391
txin=htlc_tx.inputs()[0],
387-
txout=htlc_tx.outputs()[0])
392+
txout=htlc_tx.outputs()[0],
393+
can_be_batched=False, # both parties can spend
394+
# actually, we might want to batch depending on the context
395+
# f(amount in htlc, remaining_time, number of available utxos for anchors)
396+
)
388397
else:
389398
# second-stage
390399
address = bitcoin.script_to_p2wsh(htlctx_witness_script)
@@ -404,6 +413,7 @@ def txs_htlc(
404413
cltv_abs=0,
405414
txin=sweep_txin,
406415
txout=None,
416+
can_be_batched=True, # this is safe, we are the only ones who can spend (assuming we did not broadcast a revoked state)
407417
)
408418

409419
# offered HTLCs, in our ctx --> "timeout"
@@ -541,6 +551,7 @@ def sweep_their_ctx_to_remote_backup(
541551
cltv_abs=None,
542552
txin=txin,
543553
txout=None,
554+
can_be_batched=True,
544555
)
545556

546557
# to_remote
@@ -562,6 +573,7 @@ def sweep_their_ctx_to_remote_backup(
562573
cltv_abs=None,
563574
txin=txin,
564575
txout=None,
576+
can_be_batched=True,
565577
)
566578
return txs
567579

@@ -619,6 +631,7 @@ def sweep_their_ctx(
619631
cltv_abs=None,
620632
txin=txin,
621633
txout=None,
634+
can_be_batched=True,
622635
)
623636

624637
# to_local is handled by lnwatcher
@@ -631,6 +644,7 @@ def sweep_their_ctx(
631644
cltv_abs=None,
632645
txin=txin,
633646
txout=None,
647+
can_be_batched=False,
634648
)
635649

636650
# to_remote
@@ -656,12 +670,14 @@ def sweep_their_ctx(
656670
our_payment_privkey=our_payment_privkey,
657671
has_anchors=chan.has_anchors()
658672
):
673+
# todo: we might not want to sweep this at all, if we add it to the wallet addresses
659674
txs[prevout] = SweepInfo(
660675
name='their_ctx_to_remote',
661676
csv_delay=csv_delay,
662677
cltv_abs=None,
663678
txin=txin,
664679
txout=None,
680+
can_be_batched=True,
665681
)
666682

667683
# HTLCs
@@ -701,6 +717,7 @@ def tx_htlc(
701717
cltv_abs=cltv_abs,
702718
txin=txin,
703719
txout=None,
720+
can_be_batched=False,
704721
)
705722
# received HTLCs, in their ctx --> "timeout"
706723
# offered HTLCs, in their ctx --> "success"

electrum/lnwatcher.py

+25-129
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,14 @@
22
# Distributed under the MIT software license, see the accompanying
33
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
44

5-
from typing import NamedTuple, Iterable, TYPE_CHECKING
6-
import copy
7-
import asyncio
5+
from typing import TYPE_CHECKING
86
from enum import IntEnum, auto
9-
from typing import NamedTuple, Dict
107

11-
from . import util
12-
from .util import log_exceptions, ignore_exceptions, TxMinedInfo
8+
from .util import log_exceptions, ignore_exceptions, TxMinedInfo, BelowDustLimit
139
from .util import EventListener, event_listener
1410
from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_FUTURE
15-
from .transaction import Transaction, TxOutpoint, PartialTransaction
11+
from .transaction import Transaction, TxOutpoint
1612
from .logging import Logger
17-
from .bitcoin import dust_threshold
1813
from .fee_policy import FeePolicy
1914

2015

@@ -75,6 +70,13 @@ def add_callback(self, address, callback):
7570
async def on_event_blockchain_updated(self, *args):
7671
await self.trigger_callbacks()
7772

73+
@event_listener
74+
async def on_event_wallet_updated(self, wallet):
75+
# called if we add local tx
76+
if wallet.adb != self.adb:
77+
return
78+
await self.trigger_callbacks()
79+
7880
@event_listener
7981
async def on_event_adb_added_verified_tx(self, adb, tx_hash):
8082
if adb != self.adb:
@@ -141,6 +143,10 @@ def get_spender(self, outpoint) -> str:
141143
"""
142144
prev_txid, index = outpoint.split(':')
143145
spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index))
146+
# discard local spenders
147+
tx_mined_status = self.adb.get_tx_height(spender_txid)
148+
if tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
149+
spender_txid = None
144150
if not spender_txid:
145151
return
146152
spender_tx = self.adb.get_transaction(spender_txid)
@@ -212,18 +218,6 @@ async def update_channel_state(self, *, funding_outpoint: str, funding_txid: str
212218
keep_watching=keep_watching)
213219
await self.lnworker.handle_onchain_state(chan)
214220

215-
def is_dust(self, sweep_info):
216-
if sweep_info.name in ['local_anchor', 'remote_anchor']:
217-
return False
218-
if sweep_info.txout is not None:
219-
return False
220-
value = sweep_info.txin._trusted_value_sats
221-
witness_size = len(sweep_info.txin.make_witness(71*b'\x00'))
222-
tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh
223-
self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}')
224-
fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.network, allow_fallback_to_static_rates=True)
225-
return value - fee <= dust_threshold()
226-
227221
@log_exceptions
228222
async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool:
229223
"""This function is called when a channel was closed. In this case
@@ -236,19 +230,16 @@ async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bo
236230
return False
237231
# detect who closed and get information about how to claim outputs
238232
sweep_info_dict = chan.sweep_ctx(closing_tx)
239-
self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}")
233+
#self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}")
240234
keep_watching = False if sweep_info_dict else not self.is_deeply_mined(closing_tx.txid())
241-
242235
# create and broadcast transactions
243236
for prevout, sweep_info in sweep_info_dict.items():
244-
if self.is_dust(sweep_info):
245-
continue
246237
prev_txid, prev_index = prevout.split(':')
247238
name = sweep_info.name + ' ' + chan.get_id_for_log()
248239
self.lnworker.wallet.set_default_label(prevout, name)
249240
if not self.adb.get_transaction(prev_txid):
250241
# do not keep watching if prevout does not exist
251-
self.logger.info(f'prevout does not exist for {name}: {prev_txid}')
242+
self.logger.info(f'prevout does not exist for {name}: {prevout}')
252243
continue
253244
spender_txid = self.get_spender(prevout)
254245
spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
@@ -261,116 +252,21 @@ async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bo
261252
if htlc_tx_spender:
262253
keep_watching |= not self.is_deeply_mined(htlc_tx_spender)
263254
else:
264-
keep_watching = True
265-
await self.maybe_redeem(prevout2, htlc_sweep_info, name)
255+
keep_watching |= self.maybe_redeem(htlc_sweep_info)
266256
# extract preimage
267257
keep_watching |= not self.is_deeply_mined(spender_txid)
268258
txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout))
269259
assert txin_idx is not None
270260
spender_txin = spender_tx.inputs()[txin_idx]
271261
chan.extract_preimage_from_htlc_txin(spender_txin)
272262
else:
273-
keep_watching = True
274-
# broadcast or maybe update our own tx
275-
await self.maybe_redeem(prevout, sweep_info, name)
276-
263+
keep_watching |= self.maybe_redeem(sweep_info)
277264
return keep_watching
278265

279-
def get_redeem_tx(self, prevout: str, sweep_info: 'SweepInfo', name: str):
280-
# check if redeem tx needs to be updated
281-
# if it is in the mempool, we need to check fee rise
282-
txid = self.get_spender(prevout)
283-
old_tx = self.adb.get_transaction(txid)
284-
assert old_tx is not None or txid is None
285-
tx_depth = self.get_tx_mined_depth(txid) if txid else None
286-
if txid and tx_depth not in [TxMinedDepth.FREE, TxMinedDepth.MEMPOOL]:
287-
assert old_tx is not None
288-
return old_tx, None
289-
# fixme: deepcopy is needed because tx.serialize() is destructive
290-
inputs = [copy.deepcopy(sweep_info.txin)]
291-
outputs = [sweep_info.txout] if sweep_info.txout else []
292-
if sweep_info.name == 'first-stage-htlc':
293-
new_tx = PartialTransaction.from_io(inputs, outputs, locktime=sweep_info.cltv_abs, version=2)
294-
self.lnworker.wallet.sign_transaction(new_tx, password=None, ignore_warnings=True)
295-
else:
296-
# password is needed for 1st stage htlc tx with anchors because we add inputs
297-
password = self.lnworker.wallet.get_unlocked_password()
298-
new_tx = self.lnworker.wallet.create_transaction(
299-
fee_policy = self.fee_policy,
300-
inputs = inputs,
301-
outputs = outputs,
302-
password = password,
303-
locktime = sweep_info.cltv_abs,
304-
BIP69_sort=False,
305-
)
306-
if new_tx is None:
307-
self.logger.info(f'{name} could not claim output: {prevout}, dust')
308-
assert old_tx is not None
309-
return old_tx, None
310-
if txid is None:
311-
return None, new_tx
312-
elif tx_depth == TxMinedDepth.MEMPOOL:
313-
delta = new_tx.get_fee() - self.adb.get_tx_fee(txid)
314-
if delta > 1:
315-
self.logger.info(f'increasing fee of mempool tx {name}: {prevout}')
316-
return old_tx, new_tx
317-
else:
318-
assert old_tx is not None
319-
return old_tx, None
320-
elif tx_depth == TxMinedDepth.FREE:
321-
# return new tx, even if it is equal to old_tx,
322-
# because we need to test if it can be broadcast
323-
return old_tx, new_tx
324-
else:
325-
assert old_tx is not None
326-
return old_tx, None
327-
328-
async def maybe_redeem(self, prevout, sweep_info: 'SweepInfo', name: str) -> None:
329-
old_tx, new_tx = self.get_redeem_tx(prevout, sweep_info, name)
330-
if new_tx is None:
331-
return
332-
prev_txid, prev_index = prevout.split(':')
333-
can_broadcast = True
334-
local_height = self.network.get_local_height()
335-
if sweep_info.cltv_abs:
336-
wanted_height = sweep_info.cltv_abs
337-
if wanted_height - local_height > 0:
338-
can_broadcast = False
339-
# self.logger.debug(f"pending redeem for {prevout}. waiting for {name}: CLTV ({local_height=}, {wanted_height=})")
340-
if sweep_info.csv_delay:
341-
prev_height = self.adb.get_tx_height(prev_txid)
342-
if prev_height.height > 0:
343-
wanted_height = prev_height.height + sweep_info.csv_delay - 1
344-
else:
345-
wanted_height = local_height + sweep_info.csv_delay
346-
if wanted_height - local_height > 0:
347-
can_broadcast = False
348-
# self.logger.debug(
349-
# f"pending redeem for {prevout}. waiting for {name}: CSV "
350-
# f"({local_height=}, {wanted_height=}, {prev_height.height=}, {sweep_info.csv_delay=})")
351-
if can_broadcast:
352-
self.logger.info(f'we can broadcast: {name}')
353-
if await self.network.try_broadcasting(new_tx, name):
354-
tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None))
355-
else:
356-
tx_was_added = False
357-
else:
358-
# we may have a tx with a different fee, in which case it will be replaced
359-
if not old_tx or (old_tx and old_tx.txid() != new_tx.txid()):
360-
try:
361-
tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None))
362-
except Exception as e:
363-
self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}')
364-
tx_was_added = False
365-
if tx_was_added:
366-
self.logger.info(f'added redeem tx: {name}. prevout: {prevout}')
367-
else:
368-
tx_was_added = False
369-
# set future tx regardless of tx_was_added, because it is not persisted
370-
# (and wanted_height can change if input of CSV was not mined before)
371-
self.adb.set_future_tx(new_tx.txid(), wanted_height=wanted_height)
372-
if tx_was_added:
373-
self.lnworker.wallet.set_label(new_tx.txid(), name)
374-
if old_tx and old_tx.txid() != new_tx.txid():
375-
self.lnworker.wallet.set_label(old_tx.txid(), None)
376-
util.trigger_callback('wallet_updated', self.lnworker.wallet)
266+
def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool:
267+
""" returns whether it was added """
268+
try:
269+
self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info, self.fee_policy)
270+
except BelowDustLimit:
271+
return False
272+
return True

0 commit comments

Comments
 (0)