Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@
for SQL backends enabling new users to optionally use an experimental native
SQL invoices database.

* [Expanded SweeperStore](https://github.com/lightningnetwork/lnd/pull/8147) to
also store the feerate, fees paid, and whether it's published or not for a
given sweeping transaction.

## Code Health

* [Remove database pointers](https://github.com/lightningnetwork/lnd/pull/8117)
Comment thread
yyforyongyu marked this conversation as resolved.
Expand Down
193 changes: 184 additions & 9 deletions sweep/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"bytes"
"encoding/binary"
"errors"
"io"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)

var (
// txHashesBucketKey is the key that points to a bucket containing the
// hashes of all sweep txes that were published successfully.
//
// maps: txHash -> empty slice
// maps: txHash -> TxRecord
txHashesBucketKey = []byte("sweeper-tx-hashes")

// utxnChainPrefix is the bucket prefix for nursery buckets.
Expand All @@ -31,19 +33,108 @@ var (
byteOrder = binary.BigEndian

errNoTxHashesBucket = errors.New("tx hashes bucket does not exist")

// ErrTxNotFound is returned when querying using a txid that's not
// found in our db.
ErrTxNotFound = errors.New("tx not found")
)

// TxRecord specifies a record of a tx that's stored in the database.
type TxRecord struct {
// Txid is the sweeping tx's txid, which is used as the key to store
// the following values.
Txid chainhash.Hash
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use tlv.RecordT here now to declare the types inline. These are also all primitive records, so no extra code would be needed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not part of the serialized tlv stream but a comment which highlights this would be good

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the design could be improved, because we don't really need the Txid only as a key saving the record, so maybe supply it as a function parameter rather than having it as part of the record ?

Copy link
Copy Markdown
Member Author

@yyforyongyu yyforyongyu Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried tlv.RecordT and came up with something like this,

// TxRecord specifies a record of a tx that's stored in the database.
type TxRecord struct {
	// FeeRate is the fee rate of the sweeping tx, unit is sats/kw.
	feeRate tlv.RecordT[tlv.TlvType0, uint64]

	// Fee is the fee of the sweeping tx, unit is sat.
	fee tlv.RecordT[tlv.TlvType1, uint64]

	// Published indicates whether the tx has been published.
	published tlv.RecordT[tlv.TlvType2, bool]
}

func encode(w io.Writer, tr *TxRecord) error {
	recordProducers := []tlv.RecordProducer{
		&tr.feeRate,
		&tr.fee,
		&tr.published,
	}

	records := make([]tlv.Record, 0, len(recordProducers))
	for _, producer := range recordProducers {
		records = append(records, producer.Record())
	}

	tlvStream, err := tlv.NewStream(records...)
	if err != nil {
		return err
	}

	// Encode the tlv stream.
	var buf bytes.Buffer
	if err := tlvStream.Encode(&buf); err != nil {
		return err
	}

	// Write the tlv stream.
	if _, err = w.Write(buf.Bytes()); err != nil {
		return err
	}

	return nil
}

The problem is I cannot unpack the record producers easily like extraData.PackRecords(recordProducers...), which is used in other places that are more related to the wire messages. Also feel a bit weird to read and write via feeRate.Val. The core issue tho, is we are not using primitive types but bigsize here to save bytes, which we'll need to update the tlv package to have this type.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want to store historical information w.r.t the inputs as well? Given that during the course of fee bumping for a given input, it may have been bundled in several distinct transactions before confirmation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the historical info is implicitly stored, as each sweeping tx is stored, we can find the input by looking at the tx's inputs. Also we will not bump a given input, but always the tx, due to the reason(RBF tx, not inputs) mentioned in #8424.


// FeeRate is the fee rate of the sweeping tx, unit is sats/kw.
FeeRate uint64

// Fee is the fee of the sweeping tx, unit is sat.
Fee uint64

// Published indicates whether the tx has been published.
Published bool
}

// toTlvStream converts TxRecord into a tlv representation.
func (t *TxRecord) toTlvStream() (*tlv.Stream, error) {
const (
// A set of tlv type definitions used to serialize TxRecord.
// We define it here instead of the head of the file to avoid
// naming conflicts.
//
// NOTE: A migration should be added whenever the existing type
// changes.
//
// NOTE: Txid is stored as the key, so it's not included here.
feeRateType tlv.Type = 0
feeType tlv.Type = 1
boolType tlv.Type = 2
)

return tlv.NewStream(
tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate),
tlv.MakeBigSizeRecord(feeType, &t.Fee),
tlv.MakePrimitiveRecord(boolType, &t.Published),
)
}

// serializeTxRecord serializes a TxRecord based on tlv format.
func serializeTxRecord(w io.Writer, tx *TxRecord) error {
// Create the tlv stream.
tlvStream, err := tx.toTlvStream()
if err != nil {
return err
}

// Encode the tlv stream.
var buf bytes.Buffer
if err := tlvStream.Encode(&buf); err != nil {
return err
}

// Write the tlv stream.
if _, err = w.Write(buf.Bytes()); err != nil {
return err
}

return nil
}
Comment on lines 58 to 101
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialization and deserialization methods for TxRecord using TLV encoding are correctly implemented. These methods are crucial for ensuring data integrity and compatibility. It's important to add comprehensive tests for these methods to cover various edge cases and data values.

Suggest adding more tests for serializeTxRecord and deserializeTxRecord to cover edge cases and different data values.


// deserializeTxRecord deserializes a TxRecord based on tlv format.
func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
var tx TxRecord

// Create the tlv stream.
tlvStream, err := tx.toTlvStream()
if err != nil {
return nil, err
}

if err := tlvStream.Decode(r); err != nil {
return nil, err
}

return &tx, nil
}

// SweeperStore stores published txes.
type SweeperStore interface {
// IsOurTx determines whether a tx is published by us, based on its
// hash.
IsOurTx(hash chainhash.Hash) (bool, error)

// NotifyPublishTx signals that we are about to publish a tx.
NotifyPublishTx(*wire.MsgTx) error
// StoreTx stores a tx hash we are about to publish.
StoreTx(*TxRecord) error

// ListSweeps lists all the sweeps we have successfully published.
ListSweeps() ([]chainhash.Hash, error)

// GetTx queries the database to find the tx that matches the given
// txid. Returns ErrTxNotFound if it cannot be found.
GetTx(hash chainhash.Hash) (*TxRecord, error)

// DeleteTx removes a tx specified by the hash from the store.
DeleteTx(hash chainhash.Hash) error
Comment thread
ziggie1984 marked this conversation as resolved.
Outdated
}

type sweeperStore struct {
Expand Down Expand Up @@ -83,6 +174,8 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (

// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This
// is not implemented as a database migration, to keep the downgrade path open.
//
// TODO(yy): delete this function once nursery is removed.
func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
chainHash *chainhash.Hash) error {

Expand Down Expand Up @@ -138,7 +231,24 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
log.Debugf("Inserting nursery tx %v in hash list "+
"(height=%v)", hash, byteOrder.Uint32(k))

return txHashesBucket.Put(hash[:], []byte{})
// Create the transaction record. Since this is an old record,
// we can assume it's already been published. Although it's
// possible to calculate the fees and fee rate used here, we
// skip it as it's unlikely we'd perform RBF on these old
// sweeping transactions.
tr := &TxRecord{
Txid: hash,
Published: true,
}

// Serialize tx record.
var b bytes.Buffer
err = serializeTxRecord(&b, tr)
if err != nil {
return err
}

return txHashesBucket.Put(tr.Txid[:], b.Bytes())
})
if err != nil {
return err
Comment on lines 231 to 254
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [177-251]

The migrateTxHashes function is a thoughtful addition for transitioning old records to the new format, ensuring backward compatibility. However, consider adding more detailed logging for each migrated transaction to aid in debugging and verification.

+ log.Debugf("Migrating transaction: %v", hash)

Expand All @@ -147,18 +257,22 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
return nil
}

// NotifyPublishTx signals that we are about to publish a tx.
func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
// StoreTx stores that we are about to publish a tx.
func (s *sweeperStore) StoreTx(tr *TxRecord) error {
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {

txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
if txHashesBucket == nil {
return errNoTxHashesBucket
}

hash := sweepTx.TxHash()
// Serialize tx record.
var b bytes.Buffer
err := serializeTxRecord(&b, tr)
if err != nil {
return err
}

return txHashesBucket.Put(hash[:], []byte{})
return txHashesBucket.Put(tr.Txid[:], b.Bytes())
}, func() {})
}

Expand Down Expand Up @@ -215,5 +329,66 @@ func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) {
return sweepTxns, nil
}

// GetTx queries the database to find the tx that matches the given txid.
// Returns ErrTxNotFound if it cannot be found.
func (s *sweeperStore) GetTx(txid chainhash.Hash) (*TxRecord, error) {
// Create a record.
tr := &TxRecord{}

var err error
err = kvdb.View(s.db, func(tx kvdb.RTx) error {
txHashesBucket := tx.ReadBucket(txHashesBucketKey)
if txHashesBucket == nil {
return errNoTxHashesBucket
}

txBytes := txHashesBucket.Get(txid[:])
if txBytes == nil {
return ErrTxNotFound
}

// For old records, we'd get an empty byte slice here. We can
// assume it's already been published. Although it's possible
// to calculate the fees and fee rate used here, we skip it as
// it's unlikely we'd perform RBF on these old sweeping
// transactions.
//
// TODO(yy): remove this check once migration is added.
if len(txBytes) == 0 {
tr.Published = true
return nil
}

tr, err = deserializeTxRecord(bytes.NewReader(txBytes))
if err != nil {
return err
}

return nil
}, func() {
tr = &TxRecord{}
})
if err != nil {
return nil, err
}

// Attach the txid to the record.
tr.Txid = txid

return tr, nil
}

// DeleteTx removes the given tx from db.
func (s *sweeperStore) DeleteTx(txid chainhash.Hash) error {
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
if txHashesBucket == nil {
return errNoTxHashesBucket
}

return txHashesBucket.Delete(txid[:])
}, func() {})
}

// Compile-time constraint to ensure sweeperStore implements SweeperStore.
var _ SweeperStore = (*sweeperStore)(nil)
51 changes: 32 additions & 19 deletions sweep/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,58 @@ package sweep

import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/stretchr/testify/mock"
)

// MockSweeperStore is a mock implementation of sweeper store. This type is
// exported, because it is currently used in nursery tests too.
type MockSweeperStore struct {
ourTxes map[chainhash.Hash]struct{}
mock.Mock
Comment thread
yyforyongyu marked this conversation as resolved.
Outdated
}

// NewMockSweeperStore returns a new instance.
func NewMockSweeperStore() *MockSweeperStore {
Comment thread
ziggie1984 marked this conversation as resolved.
Outdated
return &MockSweeperStore{
ourTxes: make(map[chainhash.Hash]struct{}),
}
return &MockSweeperStore{}
}

// IsOurTx determines whether a tx is published by us, based on its
// hash.
// IsOurTx determines whether a tx is published by us, based on its hash.
func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
_, ok := s.ourTxes[hash]
return ok, nil
}
args := s.Called(hash)

// NotifyPublishTx signals that we are about to publish a tx.
func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error {
txHash := tx.TxHash()
s.ourTxes[txHash] = struct{}{}
return args.Bool(0), args.Error(1)
}

return nil
// StoreTx stores a tx we are about to publish.
func (s *MockSweeperStore) StoreTx(tr *TxRecord) error {
args := s.Called(tr)
return args.Error(0)
Comment thread
yyforyongyu marked this conversation as resolved.
Outdated
}

// ListSweeps lists all the sweeps we have successfully published.
func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) {
var txns []chainhash.Hash
for tx := range s.ourTxes {
txns = append(txns, tx)
args := s.Called()

return args.Get(0).([]chainhash.Hash), args.Error(1)
}

// GetTx queries the database to find the tx that matches the given txid.
// Returns ErrTxNotFound if it cannot be found.
func (s *MockSweeperStore) GetTx(hash chainhash.Hash) (*TxRecord, error) {
args := s.Called(hash)

tr := args.Get(0)
if tr != nil {
return args.Get(0).(*TxRecord), args.Error(1)
}

return txns, nil
return nil, args.Error(1)
}

// DeleteTx removes the given tx from db.
func (s *MockSweeperStore) DeleteTx(txid chainhash.Hash) error {
args := s.Called(txid)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these tests pass as is? Given there were no expectations declared, so in my exp, the tests fails as it declares that it got an unexpected call.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah exactly, I was confused too, then realized none of the sweeper store's methods were hit in the sweeper's unit tests...they are properly mocked in the following PRs tho.


return args.Error(0)
}

// Compile-time constraint to ensure MockSweeperStore implements SweeperStore.
Expand Down
Loading