Skip to content

Commit b71b11f

Browse files
committed
wallet: RBF batch payments manager
1 parent 589dc87 commit b71b11f

File tree

2 files changed

+134
-21
lines changed

2 files changed

+134
-21
lines changed

electrum/submarine_swaps.py

+9-14
Original file line numberDiff line numberDiff line change
@@ -464,19 +464,11 @@ async def hold_invoice_callback(self, payment_hash: bytes) -> None:
464464
if key in self.swaps:
465465
swap = self.swaps[key]
466466
if swap.funding_txid is None:
467-
password = self.wallet.get_unlocked_password()
468-
for batch_rbf in [False]:
469-
# FIXME: tx batching is disabled, because extra logic is needed to handle
470-
# the case where the base tx gets mined.
471-
tx = self.create_funding_tx(swap, None, password=password, batch_rbf=batch_rbf)
472-
self.logger.info(f'adding funding_tx {tx.txid()}')
473-
self.wallet.adb.add_transaction(tx)
474-
try:
475-
await self.broadcast_funding_tx(swap, tx)
476-
except TxBroadcastError:
477-
self.wallet.adb.remove_transaction(tx.txid())
478-
continue
479-
break
467+
output = self.create_funding_output(swap)
468+
self.wallet.add_batch_payment(output)
469+
swap.funding_txid = True
470+
else:
471+
self.logger.info(f'key not in swaps {key}')
480472

481473
def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None):
482474
""" server method """
@@ -773,6 +765,9 @@ async def callback(payment_hash):
773765
await asyncio.sleep(0.1)
774766
return swap.funding_txid
775767

768+
def create_funding_output(self, swap):
769+
return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
770+
776771
def create_funding_tx(
777772
self,
778773
swap: SwapData,
@@ -785,7 +780,7 @@ def create_funding_tx(
785780
# note: rbf must not decrease payment
786781
# this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output
787782
if tx is None:
788-
funding_output = PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
783+
funding_output = self.create_funding_output(swap)
789784
tx = self.wallet.create_transaction(
790785
outputs=[funding_output],
791786
rbf=True,

electrum/wallet.py

+125-7
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
WalletFileException, BitcoinException,
6060
InvalidPassword, format_time, timestamp_to_datetime, Satoshis,
6161
Fiat, bfh, TxMinedInfo, quantize_feerate, OrderedDictWithIndex)
62+
from .util import log_exceptions
6263
from .simple_config import SimpleConfig, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE
6364
from .bitcoin import COIN, TYPE_ADDRESS
6465
from .bitcoin import is_address, address_to_script, is_minikey, relayfee, dust_threshold
@@ -88,6 +89,7 @@
8889
from .util import EventListener, event_listener
8990
from . import descriptor
9091
from .descriptor import Descriptor
92+
from .network import TxBroadcastError
9193

9294
if TYPE_CHECKING:
9395
from .network import Network
@@ -458,6 +460,7 @@ async def main_loop(self):
458460
async with self.taskgroup as group:
459461
await group.spawn(asyncio.Event().wait) # run forever (until cancel)
460462
await group.spawn(self.do_synchronize_loop())
463+
await group.spawn(self.manage_batch_payments())
461464
except Exception as e:
462465
self.logger.exception("taskgroup died.")
463466
finally:
@@ -1812,21 +1815,26 @@ def make_unsigned_transaction(
18121815
self, *,
18131816
coins: Sequence[PartialTxInput],
18141817
outputs: List[PartialTxOutput],
1818+
inputs: Optional[List[PartialTxInput]] = None,
18151819
fee=None,
18161820
change_addr: str = None,
18171821
is_sweep: bool = False, # used by Wallet_2fa subclass
18181822
rbf: bool = True,
1819-
batch_rbf: Optional[bool] = None,
1823+
base_tx = None,
18201824
send_change_to_lightning: Optional[bool] = None,
18211825
) -> PartialTransaction:
18221826
"""Can raise NotEnoughFunds or NoDynamicFeeEstimates."""
18231827

1828+
assert base_tx is None or inputs is None
1829+
18241830
if not coins: # any bitcoin tx must have at least 1 input by consensus
18251831
raise NotEnoughFunds()
18261832
if any([c.already_has_some_signatures() for c in coins]):
18271833
raise Exception("Some inputs already contain signatures!")
1828-
if batch_rbf is None:
1829-
batch_rbf = self.config.WALLET_BATCH_RBF
1834+
if inputs is None:
1835+
inputs = []
1836+
if base_tx is None and self.config.WALLET_BATCH_RBF:
1837+
base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins)
18301838
if send_change_to_lightning is None:
18311839
send_change_to_lightning = self.config.WALLET_SEND_CHANGE_TO_LIGHTNING
18321840

@@ -1865,7 +1873,6 @@ def make_unsigned_transaction(
18651873
# Let the coin chooser select the coins to spend
18661874
coin_chooser = coinchooser.get_coin_chooser(self.config)
18671875
# If there is an unconfirmed RBF tx, merge with it
1868-
base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins) if batch_rbf else None
18691876
if base_tx:
18701877
# make sure we don't try to spend change from the tx-to-be-replaced:
18711878
coins = [c for c in coins if c.prevout.txid.hex() != base_tx.txid()]
@@ -1887,7 +1894,7 @@ def fee_estimator(size: Union[int, float, Decimal]) -> int:
18871894
old_change_addrs = [o.address for o in base_tx.outputs() if self.is_change(o.address)]
18881895
rbf_merge_txid = base_tx.txid()
18891896
else:
1890-
txi = []
1897+
txi = list(inputs)
18911898
txo = list(outputs)
18921899
old_change_addrs = []
18931900
# change address. if empty, coin_chooser will set it
@@ -3057,7 +3064,8 @@ def create_transaction(
30573064
password=None,
30583065
locktime=None,
30593066
tx_version: Optional[int] = None,
3060-
batch_rbf: Optional[bool] = None,
3067+
base_tx=None,
3068+
inputs=None,
30613069
send_change_to_lightning: Optional[bool] = None,
30623070
nonlocal_only: bool = False,
30633071
) -> PartialTransaction:
@@ -3074,10 +3082,11 @@ def create_transaction(
30743082
fee_estimator = fee
30753083
tx = self.make_unsigned_transaction(
30763084
coins=coins,
3085+
inputs=inputs,
30773086
outputs=outputs,
30783087
fee=fee_estimator,
30793088
change_addr=change_addr,
3080-
batch_rbf=batch_rbf,
3089+
base_tx=base_tx,
30813090
send_change_to_lightning=send_change_to_lightning,
30823091
rbf=rbf,
30833092
)
@@ -3284,6 +3293,115 @@ def unlock(self, password):
32843293
def get_unlocked_password(self):
32853294
return self._password_in_memory
32863295

3296+
def add_batch_payment(self, output: 'PartialTxOutput'):
3297+
# todo: raise InsufficientFunds if needed
3298+
self.batch_payments.append(output)
3299+
3300+
def find_confirmed_base_tx(self):
3301+
for tx in self.batch_txs:
3302+
tx_mined_status = self.adb.get_tx_height(tx.txid())
3303+
if tx_mined_status.conf > 0:
3304+
return tx
3305+
3306+
@log_exceptions
3307+
async def manage_batch_payments(self):
3308+
# batch rbf, and add it to adb before we broadcast it
3309+
# TODO: we should keep track of the transactions that have been replaced (base_tx)
3310+
# if a replaced transaction gets mined, we should ensure the payment is broadcast in a new tx
3311+
#
3312+
# output1 : tx1(o1) -----
3313+
# \
3314+
# output 2: tx1'(o1,o2) ---> tx2(tx1|o2) -----
3315+
# \ \
3316+
# output 3: tx1''(o1,o2,o3) --> tx2'(tx1|o2,o3) ---> tx3(tx2|o3)
3317+
# tx3(tx1'|o3) (if tx1'cannot be replaced)
3318+
#
3319+
# self.batch_txs = [tx1, tx1', tx1'']
3320+
#
3321+
# if tx1 gets mined:
3322+
# - use its output, batch all remaining payments: tx2(mined, o2,o3)
3323+
#
3324+
# if tx1' gets mined: tx3(mined, o3)
3325+
#
3326+
# what if we cannot RBF? -> we must add a child tx
3327+
# if cannot_rbf(tx1) -> broadcast tx2(tx1,o2) and remove first row: neww base is now tx2(tx,o2)
3328+
# if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3)
3329+
#
3330+
# that's the same strategy as if it was mined
3331+
#
3332+
#
3333+
# TODO: make this reorg-safe.
3334+
#
3335+
#
3336+
# TODO: persist batch_payments and batch_txs in wallet file.
3337+
# Note that it is probably fine not to persist them, but it is dangerous
3338+
# to persist one and not the other, as it might result in a double send.
3339+
self.batch_payments = [] # list of payments we need to make
3340+
self.batch_txs = [] # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined.
3341+
3342+
3343+
while True:
3344+
await asyncio.sleep(1)
3345+
password = self.get_unlocked_password()
3346+
if self.has_keystore_encryption() and not password:
3347+
continue
3348+
tx = self.find_confirmed_base_tx()
3349+
if tx:
3350+
# one of the batch_txs has been confirmed
3351+
# find which outputs still need to be paid
3352+
to_pay = [x for x in self.batch_payments if x not in tx.outputs()]
3353+
self.logger.info(f'base tx confirmed. to_pay={to_pay}')
3354+
if to_pay:
3355+
await self.create_new_base_tx(tx, to_pay, password)
3356+
else:
3357+
self.batch_txs = []
3358+
self.batch_payments = []
3359+
else:
3360+
base_tx = self.batch_txs[-1] if self.batch_txs else None
3361+
base_tx_outputs = base_tx.outputs() if base_tx else []
3362+
# check if all payments are in that tx
3363+
to_pay = [o for o in self.batch_payments if o not in base_tx_outputs]
3364+
if not to_pay:
3365+
continue
3366+
self.logger.info(f'manage_batch_payments {to_pay}')
3367+
tx = self.create_transaction(
3368+
outputs=to_pay,
3369+
rbf=True,
3370+
password=password,
3371+
base_tx=base_tx,
3372+
)
3373+
try:
3374+
#self.adb.add_transaction(tx)
3375+
await self.network.broadcast_transaction(tx)
3376+
self.batch_txs.append(tx)
3377+
except TxBroadcastError:
3378+
# base_tx is not replaceable, probabaly because it has children
3379+
#self.adb.remove_transaction(tx.txid())
3380+
await self.create_new_base_tx(base_tx, to_pay, password)
3381+
3382+
async def create_new_base_tx(self, tx, to_pay, password):
3383+
inputs = []
3384+
for o in tx.get_change_outputs():
3385+
coins = self.adb.get_addr_utxo(o.address)
3386+
inputs += list(coins.values())
3387+
self.logger.info(f'create_new_base_tx: inputs={inputs} outputs={to_pay}')
3388+
tx2 = self.create_transaction(
3389+
inputs=inputs,
3390+
outputs=to_pay,
3391+
password=password,
3392+
)
3393+
#self.adb.add_transaction(tx2)
3394+
try:
3395+
await self.network.broadcast_transaction(tx2)
3396+
except TxBroadcastError:
3397+
# we will retry later, because we have not changed batch_payments
3398+
self.logger.info(f'create_new_base_tx: failed to broadcast')
3399+
return
3400+
3401+
self.logger.info(f'create_new_base_tx: success {tx2.txid()}')
3402+
self.batch_txs = [tx2]
3403+
self.batch_payments = to_pay # this removes payments in the old tx
3404+
32873405

32883406
class Simple_Wallet(Abstract_Wallet):
32893407
# wallet with a single keystore

0 commit comments

Comments
 (0)