Skip to content

Commit 45c7543

Browse files
committed
wallet: RBF batch payments manager
1 parent fef6fc5 commit 45c7543

File tree

2 files changed

+134
-21
lines changed

2 files changed

+134
-21
lines changed

electrum/submarine_swaps.py

+9-14
Original file line numberDiff line numberDiff line change
@@ -464,19 +464,11 @@ async def hold_invoice_callback(self, payment_hash: bytes) -> None:
464464
if key in self.swaps:
465465
swap = self.swaps[key]
466466
if swap.funding_txid is None:
467-
password = self.wallet.get_unlocked_password()
468-
for batch_rbf in [False]:
469-
# FIXME: tx batching is disabled, because extra logic is needed to handle
470-
# the case where the base tx gets mined.
471-
tx = self.create_funding_tx(swap, None, password=password, batch_rbf=batch_rbf)
472-
self.logger.info(f'adding funding_tx {tx.txid()}')
473-
self.wallet.adb.add_transaction(tx)
474-
try:
475-
await self.broadcast_funding_tx(swap, tx)
476-
except TxBroadcastError:
477-
self.wallet.adb.remove_transaction(tx.txid())
478-
continue
479-
break
467+
output = self.create_funding_output(swap)
468+
self.wallet.add_batch_payment(output)
469+
swap.funding_txid = True
470+
else:
471+
self.logger.info(f'key not in swaps {key}')
480472

481473
def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None):
482474
""" server method """
@@ -773,6 +765,9 @@ async def callback(payment_hash):
773765
await asyncio.sleep(0.1)
774766
return swap.funding_txid
775767

768+
def create_funding_output(self, swap):
769+
return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
770+
776771
def create_funding_tx(
777772
self,
778773
swap: SwapData,
@@ -785,7 +780,7 @@ def create_funding_tx(
785780
# note: rbf must not decrease payment
786781
# this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output
787782
if tx is None:
788-
funding_output = PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
783+
funding_output = self.create_funding_output(swap)
789784
tx = self.wallet.create_transaction(
790785
outputs=[funding_output],
791786
rbf=True,

electrum/wallet.py

+125-7
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
WalletFileException, BitcoinException,
6060
InvalidPassword, format_time, timestamp_to_datetime, Satoshis,
6161
Fiat, bfh, TxMinedInfo, quantize_feerate, OrderedDictWithIndex)
62+
from .util import log_exceptions
6263
from .simple_config import SimpleConfig, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE
6364
from .bitcoin import COIN, TYPE_ADDRESS
6465
from .bitcoin import is_address, address_to_script, is_minikey, relayfee, dust_threshold
@@ -88,6 +89,7 @@
8889
from .util import EventListener, event_listener
8990
from . import descriptor
9091
from .descriptor import Descriptor
92+
from .network import TxBroadcastError
9193

9294
if TYPE_CHECKING:
9395
from .network import Network
@@ -459,6 +461,7 @@ async def main_loop(self):
459461
async with self.taskgroup as group:
460462
await group.spawn(asyncio.Event().wait) # run forever (until cancel)
461463
await group.spawn(self.do_synchronize_loop())
464+
await group.spawn(self.manage_batch_payments())
462465
except Exception as e:
463466
self.logger.exception("taskgroup died.")
464467
finally:
@@ -1813,21 +1816,26 @@ def make_unsigned_transaction(
18131816
self, *,
18141817
coins: Sequence[PartialTxInput],
18151818
outputs: List[PartialTxOutput],
1819+
inputs: Optional[List[PartialTxInput]] = None,
18161820
fee=None,
18171821
change_addr: str = None,
18181822
is_sweep: bool = False, # used by Wallet_2fa subclass
18191823
rbf: bool = True,
1820-
batch_rbf: Optional[bool] = None,
1824+
base_tx = None,
18211825
send_change_to_lightning: Optional[bool] = None,
18221826
) -> PartialTransaction:
18231827
"""Can raise NotEnoughFunds or NoDynamicFeeEstimates."""
18241828

1829+
assert base_tx is None or inputs is None
1830+
18251831
if not coins: # any bitcoin tx must have at least 1 input by consensus
18261832
raise NotEnoughFunds()
18271833
if any([c.already_has_some_signatures() for c in coins]):
18281834
raise Exception("Some inputs already contain signatures!")
1829-
if batch_rbf is None:
1830-
batch_rbf = self.config.WALLET_BATCH_RBF
1835+
if inputs is None:
1836+
inputs = []
1837+
if base_tx is None and self.config.WALLET_BATCH_RBF:
1838+
base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins)
18311839
if send_change_to_lightning is None:
18321840
send_change_to_lightning = self.config.WALLET_SEND_CHANGE_TO_LIGHTNING
18331841

@@ -1866,7 +1874,6 @@ def make_unsigned_transaction(
18661874
# Let the coin chooser select the coins to spend
18671875
coin_chooser = coinchooser.get_coin_chooser(self.config)
18681876
# If there is an unconfirmed RBF tx, merge with it
1869-
base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins) if batch_rbf else None
18701877
if base_tx:
18711878
# make sure we don't try to spend change from the tx-to-be-replaced:
18721879
coins = [c for c in coins if c.prevout.txid.hex() != base_tx.txid()]
@@ -1888,7 +1895,7 @@ def fee_estimator(size: Union[int, float, Decimal]) -> int:
18881895
old_change_addrs = [o.address for o in base_tx.outputs() if self.is_change(o.address)]
18891896
rbf_merge_txid = base_tx.txid()
18901897
else:
1891-
txi = []
1898+
txi = list(inputs)
18921899
txo = list(outputs)
18931900
old_change_addrs = []
18941901
# change address. if empty, coin_chooser will set it
@@ -3065,7 +3072,8 @@ def create_transaction(
30653072
password=None,
30663073
locktime=None,
30673074
tx_version: Optional[int] = None,
3068-
batch_rbf: Optional[bool] = None,
3075+
base_tx=None,
3076+
inputs=None,
30693077
send_change_to_lightning: Optional[bool] = None,
30703078
nonlocal_only: bool = False,
30713079
) -> PartialTransaction:
@@ -3082,10 +3090,11 @@ def create_transaction(
30823090
fee_estimator = fee
30833091
tx = self.make_unsigned_transaction(
30843092
coins=coins,
3093+
inputs=inputs,
30853094
outputs=outputs,
30863095
fee=fee_estimator,
30873096
change_addr=change_addr,
3088-
batch_rbf=batch_rbf,
3097+
base_tx=base_tx,
30893098
send_change_to_lightning=send_change_to_lightning,
30903099
rbf=rbf,
30913100
)
@@ -3292,6 +3301,115 @@ def unlock(self, password):
32923301
def get_unlocked_password(self):
32933302
return self._password_in_memory
32943303

3304+
def add_batch_payment(self, output: 'PartialTxOutput'):
3305+
# todo: raise InsufficientFunds if needed
3306+
self.batch_payments.append(output)
3307+
3308+
def find_confirmed_base_tx(self):
3309+
for tx in self.batch_txs:
3310+
tx_mined_status = self.adb.get_tx_height(tx.txid())
3311+
if tx_mined_status.conf > 0:
3312+
return tx
3313+
3314+
@log_exceptions
3315+
async def manage_batch_payments(self):
3316+
# batch rbf, and add it to adb before we broadcast it
3317+
# TODO: we should keep track of the transactions that have been replaced (base_tx)
3318+
# if a replaced transaction gets mined, we should ensure the payment is broadcast in a new tx
3319+
#
3320+
# output1 : tx1(o1) -----
3321+
# \
3322+
# output 2: tx1'(o1,o2) ---> tx2(tx1|o2) -----
3323+
# \ \
3324+
# output 3: tx1''(o1,o2,o3) --> tx2'(tx1|o2,o3) ---> tx3(tx2|o3)
3325+
# tx3(tx1'|o3) (if tx1'cannot be replaced)
3326+
#
3327+
# self.batch_txs = [tx1, tx1', tx1'']
3328+
#
3329+
# if tx1 gets mined:
3330+
# - use its output, batch all remaining payments: tx2(mined, o2,o3)
3331+
#
3332+
# if tx1' gets mined: tx3(mined, o3)
3333+
#
3334+
# what if we cannot RBF? -> we must add a child tx
3335+
# if cannot_rbf(tx1) -> broadcast tx2(tx1,o2) and remove first row: neww base is now tx2(tx,o2)
3336+
# if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3)
3337+
#
3338+
# that's the same strategy as if it was mined
3339+
#
3340+
#
3341+
# TODO: make this reorg-safe.
3342+
#
3343+
#
3344+
# TODO: persist batch_payments and batch_txs in wallet file.
3345+
# Note that it is probably fine not to persist them, but it is dangerous
3346+
# to persist one and not the other, as it might result in a double send.
3347+
self.batch_payments = [] # list of payments we need to make
3348+
self.batch_txs = [] # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined.
3349+
3350+
3351+
while True:
3352+
await asyncio.sleep(1)
3353+
password = self.get_unlocked_password()
3354+
if self.has_keystore_encryption() and not password:
3355+
continue
3356+
tx = self.find_confirmed_base_tx()
3357+
if tx:
3358+
# one of the batch_txs has been confirmed
3359+
# find which outputs still need to be paid
3360+
to_pay = [x for x in self.batch_payments if x not in tx.outputs()]
3361+
self.logger.info(f'base tx confirmed. to_pay={to_pay}')
3362+
if to_pay:
3363+
await self.create_new_base_tx(tx, to_pay, password)
3364+
else:
3365+
self.batch_txs = []
3366+
self.batch_payments = []
3367+
else:
3368+
base_tx = self.batch_txs[-1] if self.batch_txs else None
3369+
base_tx_outputs = base_tx.outputs() if base_tx else []
3370+
# check if all payments are in that tx
3371+
to_pay = [o for o in self.batch_payments if o not in base_tx_outputs]
3372+
if not to_pay:
3373+
continue
3374+
self.logger.info(f'manage_batch_payments {to_pay}')
3375+
tx = self.create_transaction(
3376+
outputs=to_pay,
3377+
rbf=True,
3378+
password=password,
3379+
base_tx=base_tx,
3380+
)
3381+
try:
3382+
#self.adb.add_transaction(tx)
3383+
await self.network.broadcast_transaction(tx)
3384+
self.batch_txs.append(tx)
3385+
except TxBroadcastError:
3386+
# base_tx is not replaceable, probabaly because it has children
3387+
#self.adb.remove_transaction(tx.txid())
3388+
await self.create_new_base_tx(base_tx, to_pay, password)
3389+
3390+
async def create_new_base_tx(self, tx, to_pay, password):
3391+
inputs = []
3392+
for o in tx.get_change_outputs():
3393+
coins = self.adb.get_addr_utxo(o.address)
3394+
inputs += list(coins.values())
3395+
self.logger.info(f'create_new_base_tx: inputs={inputs} outputs={to_pay}')
3396+
tx2 = self.create_transaction(
3397+
inputs=inputs,
3398+
outputs=to_pay,
3399+
password=password,
3400+
)
3401+
#self.adb.add_transaction(tx2)
3402+
try:
3403+
await self.network.broadcast_transaction(tx2)
3404+
except TxBroadcastError:
3405+
# we will retry later, because we have not changed batch_payments
3406+
self.logger.info(f'create_new_base_tx: failed to broadcast')
3407+
return
3408+
3409+
self.logger.info(f'create_new_base_tx: success {tx2.txid()}')
3410+
self.batch_txs = [tx2]
3411+
self.batch_payments = to_pay # this removes payments in the old tx
3412+
32953413

32963414
class Simple_Wallet(Abstract_Wallet):
32973415
# wallet with a single keystore

0 commit comments

Comments
 (0)