Skip to content

Support group keys for RFQ negotiation flows #1382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,16 @@ func (s *Specifier) UnwrapToPtr() (*ID, *btcec.PublicKey) {
return s.UnwrapIdToPtr(), s.UnwrapGroupKeyToPtr()
}

// AssertNotEmpty checks whether the specifier is empty, returning an error if
// so.
func (s *Specifier) AssertNotEmpty() error {
if !s.HasId() && !s.HasGroupPubKey() {
return fmt.Errorf("asset specifier is empty")
}

return nil
}

// Type denotes the asset types supported by the Taproot Asset protocol.
type Type uint8

Expand Down
106 changes: 106 additions & 0 deletions itest/rfq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,112 @@ func testRfqAssetSellHtlcIntercept(t *harnessTest) {
require.NoError(t.t, err)
}

// testRfqNegotiationGroupKey checks that two nodes can negotiate and register
// quotes based on a specifier that only uses a group key.
func testRfqNegotiationGroupKey(t *harnessTest) {
// Initialize a new test scenario.
ts := newRfqTestScenario(t)

// Mint an asset with Alice's tapd node.
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner().Client, ts.AliceTapd,
[]*mintrpc.MintAssetRequest{issuableAssets[0]},
)

mintedAssetGroupKey := rpcAssets[0].AssetGroup.TweakedGroupKey

ctxb := context.Background()
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
defer cancel()

// Subscribe to Alice's RFQ events stream.
aliceEventNtfns, err := ts.AliceTapd.SubscribeRfqEventNtfns(
ctxb, &rfqrpc.SubscribeRfqEventNtfnsRequest{},
)
require.NoError(t.t, err)

// Alice sends a sell order to Bob for some amount of the newly minted
// asset.
askAmt := uint64(42000)
sellOrderExpiry := uint64(time.Now().Add(24 * time.Hour).Unix())

// We first try to add a sell order without specifying the asset skip
// flag. That should result in an error, since we only have a normal
// channel and not an asset channel.
sellReq := &rfqrpc.AddAssetSellOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_GroupKey{
GroupKey: mintedAssetGroupKey,
},
},
PaymentMaxAmt: askAmt,
Expiry: sellOrderExpiry,

// Here we explicitly specify Bob as the destination
// peer for the sell order. This will prompt Alice's
// tapd node to send a request for quote message to
// Bob's node.
PeerPubKey: ts.BobLnd.PubKey[:],

TimeoutSeconds: uint32(rfqTimeout.Seconds()),
}
_, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq)
require.ErrorContains(
t.t, err, "no asset channel balance found",
)

// Now we set the skip flag and we shouldn't get an error anymore.
sellReq.SkipAssetChannelCheck = true
_, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq)
require.NoError(t.t, err, "unable to upsert asset sell order")

// Wait until Alice receives an incoming sell quote accept message (sent
// from Bob) RFQ event notification.
BeforeTimeout(t.t, func() {
event, err := aliceEventNtfns.Recv()
require.NoError(t.t, err)

_, ok := event.Event.(*rfqrpc.RfqEvent_PeerAcceptedSellQuote)
require.True(t.t, ok, "unexpected event: %v", event)
}, rfqTimeout)

// We now repeat the same flow, where Alice is making a BuyOrderRequest.
assetMaxAmt := uint64(1000)
buyOrderExpiry := sellOrderExpiry

buyReq := &rfqrpc.AddAssetBuyOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_GroupKey{
GroupKey: mintedAssetGroupKey,
},
},
AssetMaxAmt: assetMaxAmt,
Expiry: buyOrderExpiry,
PeerPubKey: ts.BobLnd.PubKey[:],
TimeoutSeconds: uint32(rfqTimeout.Seconds()),
}

_, err = ts.AliceTapd.AddAssetBuyOrder(ctxt, buyReq)
require.ErrorContains(
t.t, err, "no asset channel balance found",
)

// Now we set the skip flag and we shouldn't get an error anymore.
buyReq.SkipAssetChannelCheck = true
_, err = ts.AliceTapd.AddAssetBuyOrder(ctxt, buyReq)
require.NoError(t.t, err)

// Wait until Alice receives an incoming buy quote accept message (sent
// from Bob) RFQ event notification.
BeforeTimeout(t.t, func() {
event, err := aliceEventNtfns.Recv()
require.NoError(t.t, err)

_, ok := event.Event.(*rfqrpc.RfqEvent_PeerAcceptedBuyQuote)
require.True(t.t, ok, "unexpected event: %v", event)
}, rfqTimeout)
}

// rfqTestScenario is a struct which holds test scenario helper infra.
type rfqTestScenario struct {
testHarness *harnessTest
Expand Down
5 changes: 4 additions & 1 deletion itest/test_list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ var testCases = []*testCase{
name: "rfq asset sell htlc intercept",
test: testRfqAssetSellHtlcIntercept,
},

{
name: "rfq negotiation group key",
test: testRfqNegotiationGroupKey,
},
{
name: "multi signature on all levels",
test: testMultiSignature,
Expand Down
149 changes: 129 additions & 20 deletions rfq/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rfq

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -61,6 +62,14 @@ type (
SellAcceptMap map[SerialisedScid]rfqmsg.SellAccept
)

// GroupLookup is an interface that helps us look up a group of an asset based
// on the asset ID.
type GroupLookup interface {
// QueryAssetGroup fetches the group information of an asset, if it
// belongs in a group.
QueryAssetGroup(context.Context, asset.ID) (*asset.AssetGroup, error)
}

// ManagerCfg is a struct that holds the configuration parameters for the RFQ
// manager.
type ManagerCfg struct {
Expand All @@ -84,6 +93,10 @@ type ManagerCfg struct {
// determine the available channels for routing.
ChannelLister ChannelLister

// GroupLookup is an interface that helps us querry asset groups by
// asset IDs.
GroupLookup GroupLookup

// AliasManager is the SCID alias manager. This component is injected
// into the manager once lnd and tapd are hooked together.
AliasManager ScidAliasManager
Expand Down Expand Up @@ -165,6 +178,12 @@ type Manager struct {
SerialisedScid, rfqmsg.SellAccept,
]

// groupKeyLookupCache is a map that helps us quickly perform an
// in-memory look up of the group an asset belongs to. Since this
// information is static and generated during minting, it is not
// possible for an asset to change groups.
groupKeyLookupCache lnutils.SyncMap[asset.ID, *btcec.PublicKey]

// subscribers is a map of components that want to be notified on new
// events, keyed by their subscription ID.
subscribers lnutils.SyncMap[uint64, *fn.EventReceiver[fn.Event]]
Expand Down Expand Up @@ -539,18 +558,7 @@ func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier,
return c.PubKeyBytes == peer
}, localChans)

// Identify the correct channel to use as the base SCID for the alias
// by inspecting the asset data in the custom channel data.
assetID, err := assetSpecifier.UnwrapIdOrErr()
if err != nil {
return fmt.Errorf("asset ID must be specified when adding "+
"alias: %w", err)
}

var (
assetIDStr = assetID.String()
baseSCID uint64
)
var baseSCID uint64
for _, localChan := range peerChannels {
if len(localChan.CustomChannelData) == 0 {
continue
Expand All @@ -564,12 +572,20 @@ func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier,
continue
}

for _, channelAsset := range assetData.Assets {
gen := channelAsset.AssetInfo.AssetGenesis
if gen.AssetID == assetIDStr {
baseSCID = localChan.ChannelID
break
}
match, err := m.ChannelCompatible(
ctxb, assetData.Assets, assetSpecifier,
)
if err != nil {
return err
}

// TODO(george): Instead of returning the first result,
// try to pick the best channel for what we're trying to
// do (receive/send). Binding a baseSCID means we're
// also binding the asset liquidity on that channel.
if match {
baseSCID = localChan.ChannelID
break
}
}

Expand All @@ -583,8 +599,8 @@ func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier,
// At this point, if the base SCID is still not found, we return an
// error. We can't map the SCID alias to a base SCID.
if baseSCID == 0 {
return fmt.Errorf("add alias: base SCID not found for asset: "+
"%v", assetID)
return fmt.Errorf("add alias: base SCID not found for %s",
&assetSpecifier)
}

log.Debugf("Adding SCID alias %d for base SCID %d", scidAlias, baseSCID)
Expand Down Expand Up @@ -917,6 +933,99 @@ func (m *Manager) RemoveSubscriber(
return nil
}

// getAssetGroupKey retrieves the group key of an asset based on its ID.
func (m *Manager) getAssetGroupKey(ctx context.Context,
id asset.ID) (fn.Option[btcec.PublicKey], error) {

// First, see if we have already queried our DB for this ID.
v, ok := m.groupKeyLookupCache.Load(id)
if ok {
return fn.Some(*v), nil
}

// Perform the DB query.
group, err := m.cfg.GroupLookup.QueryAssetGroup(ctx, id)
if err != nil {
return fn.None[btcec.PublicKey](), err
}

// If the asset does not belong to a group, return early with no error
// or response.
if group == nil || group.GroupKey == nil {
return fn.None[btcec.PublicKey](), nil
}

// Store the result for future calls.
m.groupKeyLookupCache.Store(id, &group.GroupPubKey)

return fn.Some(group.GroupPubKey), nil
}

// AssetMatchesSpecifier checks if the provided asset satisfies the provided
// specifier. If the specifier includes a group key, we will check if the asset
// belongs to that group.
func (m *Manager) AssetMatchesSpecifier(ctx context.Context,
specifier asset.Specifier, id asset.ID) (bool, error) {

switch {
case specifier.HasGroupPubKey():
group, err := m.getAssetGroupKey(ctx, id)
if err != nil {
return false, err
}

if group.IsNone() {
return false, nil
}

specifierGK := specifier.UnwrapGroupKeyToPtr()

return group.UnwrapToPtr().IsEqual(specifierGK), nil

case specifier.HasId():
specifierID := specifier.UnwrapIdToPtr()

return *specifierID == id, nil

default:
return false, fmt.Errorf("specifier is empty")
}
}

// ChannelCompatible checks a channel's assets against an asset specifier. If
// the specifier is an asset ID, then all assets must be of that specific ID,
// if the specifier is a group key, then all assets in the channel must belong
// to that group.
func (m *Manager) ChannelCompatible(ctx context.Context,
jsonAssets []rfqmsg.JsonAssetChanInfo, specifier asset.Specifier) (bool,
error) {

for _, chanAsset := range jsonAssets {
gen := chanAsset.AssetInfo.AssetGenesis
assetIDBytes, err := hex.DecodeString(
gen.AssetID,
)
if err != nil {
return false, fmt.Errorf("error decoding asset ID: %w",
err)
}

var assetID asset.ID
copy(assetID[:], assetIDBytes)

match, err := m.AssetMatchesSpecifier(ctx, specifier, assetID)
if err != nil {
return false, err
}

if !match {
return false, err
}
}

return true, nil
}

// publishSubscriberEvent publishes an event to all subscribers.
func (m *Manager) publishSubscriberEvent(event fn.Event) {
// Iterate over the subscribers and deliver the event to each one.
Expand Down
10 changes: 4 additions & 6 deletions rfqmsg/buy_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,9 @@ func NewBuyRequestFromWire(wireMsg WireMessage,
// Validate ensures that the buy request is valid.
func (q *BuyRequest) Validate() error {
// Ensure that the asset specifier is set.
//
// TODO(ffranr): For now, the asset ID must be set. We do not currently
// support group keys.
if !q.AssetSpecifier.HasId() {
return fmt.Errorf("asset id not specified in BuyRequest")
err := q.AssetSpecifier.AssertNotEmpty()
if err != nil {
return err
}

// Ensure that the message version is supported.
Expand All @@ -173,7 +171,7 @@ func (q *BuyRequest) Validate() error {
}

// Ensure that the suggested asset rate has not expired.
err := fn.MapOptionZ(q.AssetRateHint, func(rate AssetRate) error {
err = fn.MapOptionZ(q.AssetRateHint, func(rate AssetRate) error {
if rate.Expiry.Before(time.Now()) {
return fmt.Errorf("suggested asset rate has expired")
}
Expand Down
10 changes: 4 additions & 6 deletions rfqmsg/sell_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,10 @@ func NewSellRequestFromWire(wireMsg WireMessage,

// Validate ensures that the quote request is valid.
func (q *SellRequest) Validate() error {
// Ensure that the asset specifier is set.
//
// TODO(ffranr): For now, the asset ID must be set. We do not currently
// support group keys.
if !q.AssetSpecifier.HasId() {
return fmt.Errorf("asset id not specified in SellRequest")
// Ensure that the asset specifier is not empty.
err := q.AssetSpecifier.AssertNotEmpty()
if err != nil {
return err
}

// Ensure that the message version is supported.
Expand Down
Loading
Loading