Skip to content

Commit f007f14

Browse files
committed
taprpc+universe: implement sparse universe sync
1 parent 2e54ee5 commit f007f14

File tree

2 files changed

+176
-29
lines changed

2 files changed

+176
-29
lines changed

Diff for: taprpc/taprpc_utils.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package taprpc
22

3-
import "google.golang.org/protobuf/encoding/protojson"
3+
import (
4+
"google.golang.org/grpc/codes"
5+
"google.golang.org/grpc/status"
6+
"google.golang.org/protobuf/encoding/protojson"
7+
)
48

59
var (
610
// ProtoJSONMarshalOpts is a struct that holds the default marshal
@@ -40,3 +44,14 @@ var (
4044
UseHexForBytes: true,
4145
}
4246
)
47+
48+
// IsUnimplemented returns true if the error is a gRPC error with the code
49+
// Unimplemented.
50+
func IsUnimplemented(err error) bool {
51+
s, ok := status.FromError(err)
52+
if !ok {
53+
return false
54+
}
55+
56+
return s.Code() == codes.Unimplemented
57+
}

Diff for: universe/syncer.go

+160-28
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/lightninglabs/taproot-assets/fn"
1313
"github.com/lightninglabs/taproot-assets/mssmt"
1414
"github.com/lightninglabs/taproot-assets/proof"
15+
"github.com/lightninglabs/taproot-assets/taprpc"
1516
"golang.org/x/sync/errgroup"
1617
)
1718

@@ -113,6 +114,19 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
113114
// only fetch roots for those universes. We won't filter out any
114115
// Universes here as we assume that the caller has already done so.
115116
case len(idsToSync) != 0:
117+
// We attempt to bisect the set of IDs we really need to sync by
118+
// using ephemeral multiverse trees and a bisect algorithm to
119+
// find the diffs in the root nodes. This allows us to more
120+
// efficiently find out which roots we need to sync compared to
121+
// querying the remote server for each root individually.
122+
idsToSync, err = s.bisectOutdatedRoots(
123+
ctx, idsToSync, diffEngine,
124+
)
125+
if err != nil {
126+
return nil, fmt.Errorf("unable to bisect outdated "+
127+
"roots: %w", err)
128+
}
129+
116130
targetRoots, err = fetchRootsForIDs(ctx, idsToSync, diffEngine)
117131
if err != nil {
118132
return nil, err
@@ -123,6 +137,10 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
123137
case globalInsertEnabled:
124138
log.Infof("Fetching all roots for remote Universe server...")
125139

140+
// Since we're also interested in learning about _new_ universes
141+
// in this case, we can't use the bisect algorithm to find the
142+
// diffs in the root nodes. Instead, we'll just fetch all the
143+
// roots from the remote server.
126144
targetRoots, err = s.fetchAllRoots(ctx, diffEngine)
127145
if err != nil {
128146
return nil, err
@@ -140,18 +158,26 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
140158
// configs.
141159
default:
142160
var uniIDs []Identifier
143-
144161
for _, uniSyncCfg := range syncConfigs.UniSyncConfigs {
145162
// Check with the filter to ensure that the universe is
146163
// applicable for syncing. If not, we would have
147164
// retrieved the corresponding root in vain.
148165
if uniIdSyncFilter(uniSyncCfg.UniverseID) {
149-
uniIDs = append(
150-
uniIDs, uniSyncCfg.UniverseID,
151-
)
166+
uniIDs = append(uniIDs, uniSyncCfg.UniverseID)
152167
}
153168
}
154169

170+
// We attempt to bisect the set of IDs we really need to sync by
171+
// using ephemeral multiverse trees and a bisect algorithm to
172+
// find the diffs in the root nodes. This allows us to more
173+
// efficiently find out which roots we need to sync compared to
174+
// querying the remote server for each root individually.
175+
uniIDs, err = s.bisectOutdatedRoots(ctx, uniIDs, diffEngine)
176+
if err != nil {
177+
return nil, fmt.Errorf("unable to bisect outdated "+
178+
"roots: %w", err)
179+
}
180+
155181
// Retrieve roots for the gathered set of universes.
156182
targetRoots, err = fetchRootsForIDs(ctx, uniIDs, diffEngine)
157183
if err != nil {
@@ -190,8 +216,7 @@ func fetchRootsForIDs(ctx context.Context, idsToSync []Identifier,
190216
// as a series of parallel requests backed by a worker pool.
191217
rootsToSync := make(chan Root, len(idsToSync))
192218
err := fn.ParSlice(
193-
ctx, idsToSync,
194-
func(ctx context.Context, id Identifier) error {
219+
ctx, idsToSync, func(ctx context.Context, id Identifier) error {
195220
root, err := diffEngine.RootNode(ctx, id)
196221
if err != nil {
197222
return err
@@ -209,6 +234,117 @@ func fetchRootsForIDs(ctx context.Context, idsToSync []Identifier,
209234
return fn.Collect(rootsToSync), nil
210235
}
211236

237+
// bisectOutdatedRoots attempts to bisect the set of IDs we need to sync by
238+
// using ephemeral multiverse trees and a bisect algorithm to find the diffs in
239+
// the root nodes. This allows us to more efficiently find out which roots we
240+
// need to sync compared to querying the remote server for each root
241+
// individually. If the server doesn't yet implement the MultiverseRoot RPC, we
242+
// simply return the original set of IDs and the "legacy" sync algorithm will be
243+
// used.
244+
func (s *SimpleSyncer) bisectOutdatedRoots(ctx context.Context,
245+
idsToSync []Identifier, diffEngine DiffEngine) ([]Identifier, error) {
246+
247+
issuanceIDs := make([]Identifier, 0, len(idsToSync))
248+
transferIDs := make([]Identifier, 0, len(idsToSync))
249+
for _, id := range idsToSync {
250+
switch id.ProofType {
251+
case ProofTypeIssuance:
252+
issuanceIDs = append(issuanceIDs, id)
253+
254+
case ProofTypeTransfer:
255+
transferIDs = append(transferIDs, id)
256+
257+
case ProofTypeUnspecified:
258+
issuanceIDs = append(issuanceIDs, id)
259+
transferIDs = append(transferIDs, id)
260+
}
261+
}
262+
263+
targetIDs := make([]Identifier, 0, len(idsToSync))
264+
265+
// Compare the local and remote issuance trees.
266+
if len(issuanceIDs) > 0 {
267+
var localIssuanceNode, remoteIssuanceNode mssmt.Node
268+
localIssuanceTree, err := s.cfg.LocalDiffEngine.MultiverseRoot(
269+
ctx, ProofTypeIssuance, issuanceIDs,
270+
)
271+
if err != nil {
272+
return nil, fmt.Errorf("unable to fetch local "+
273+
"issuance multiverse root: %w", err)
274+
}
275+
localIssuanceTree.WhenSome(func(root MultiverseRoot) {
276+
localIssuanceNode = root.Node
277+
})
278+
279+
// Now fetch the remote trees.
280+
remoteIssuanceTree, err := diffEngine.MultiverseRoot(
281+
ctx, ProofTypeIssuance, issuanceIDs,
282+
)
283+
284+
// Special case for when the server doesn't yet implement the
285+
// MultiverseRoot RPC. In this case, we simply return the
286+
// original set of IDs and the "legacy" sync algorithm will be
287+
// used.
288+
if err != nil && taprpc.IsUnimplemented(err) {
289+
return idsToSync, nil
290+
} else if err != nil {
291+
return nil, fmt.Errorf("unable to fetch remote "+
292+
"issuance multiverse root: %w", err)
293+
}
294+
remoteIssuanceTree.WhenSome(func(root MultiverseRoot) {
295+
remoteIssuanceNode = root.Node
296+
})
297+
298+
// Compare the local and remote issuance trees. If they differ,
299+
// we need to sync all the issuance proofs.
300+
if !mssmt.IsEqualNode(localIssuanceNode, remoteIssuanceNode) {
301+
targetIDs = append(targetIDs, issuanceIDs...)
302+
}
303+
}
304+
305+
// Compare the local and remote transfer trees.
306+
if len(transferIDs) > 0 {
307+
var localTransferNode, remoteTransferNode mssmt.Node
308+
localTransferTree, err := s.cfg.LocalDiffEngine.MultiverseRoot(
309+
ctx, ProofTypeTransfer, transferIDs,
310+
)
311+
// Special case for when the server doesn't yet implement the
312+
// MultiverseRoot RPC. In this case, we simply return the
313+
// original set of IDs and the "legacy" sync algorithm will be
314+
// used.
315+
if err != nil && taprpc.IsUnimplemented(err) {
316+
return idsToSync, nil
317+
} else if err != nil {
318+
return nil, fmt.Errorf("unable to fetch local "+
319+
"transfer multiverse root: %w", err)
320+
}
321+
localTransferTree.WhenSome(func(root MultiverseRoot) {
322+
localTransferNode = root.Node
323+
})
324+
325+
remoteTransferTree, err := diffEngine.MultiverseRoot(
326+
ctx, ProofTypeTransfer, issuanceIDs,
327+
)
328+
if err != nil {
329+
return nil, fmt.Errorf("unable to fetch remote "+
330+
"transfer multiverse root: %w", err)
331+
}
332+
333+
// Compare the local and remote transfer trees. If they differ,
334+
// we need to sync all the transfer proofs.
335+
remoteTransferTree.WhenSome(func(root MultiverseRoot) {
336+
remoteTransferNode = root.Node
337+
})
338+
if !mssmt.IsEqualNode(localTransferNode, remoteTransferNode) {
339+
targetIDs = append(targetIDs, transferIDs...)
340+
}
341+
}
342+
343+
// TODO(guggero): Do an actual bi-sect here.
344+
345+
return targetIDs, nil
346+
}
347+
212348
// syncRoot attempts to sync the local Universe with the remote diff engine for
213349
// a specific base root.
214350
func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
@@ -336,7 +472,7 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
336472
return err
337473
}
338474

339-
// If this is a tranfer tree, then we'll collect all the items as we
475+
// If this is a transfer tree, then we'll collect all the items as we
340476
// need to sort them to ensure we can validate them in dep order.
341477
if !isIssuanceTree {
342478
transferLeaves := fn.Collect(transferLeafProofs)
@@ -348,17 +484,13 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
348484
iRecord := proof.BlockHeightRecord(&iBlockHeight)
349485
jRecord := proof.BlockHeightRecord(&jBlockHeight)
350486

351-
_ = proof.SparseDecode(
352-
//nolint:lll
353-
bytes.NewReader(transferLeaves[i].Leaf.RawProof),
354-
iRecord,
355-
)
487+
_ = proof.SparseDecode(bytes.NewReader(
488+
transferLeaves[i].Leaf.RawProof,
489+
), iRecord)
356490

357-
_ = proof.SparseDecode(
358-
//nolint:lll
359-
bytes.NewReader(transferLeaves[j].Leaf.RawProof),
360-
jRecord,
361-
)
491+
_ = proof.SparseDecode(bytes.NewReader(
492+
transferLeaves[j].Leaf.RawProof,
493+
), jRecord)
362494

363495
return iBlockHeight < jBlockHeight
364496
})
@@ -468,22 +600,22 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr,
468600
// fetchAllRoots fetches all the roots from the remote Universe. This function
469601
// is used in order to isolate any logic related to the specifics of how we
470602
// fetch the data from the universe server.
471-
func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, diffEngine DiffEngine) ([]Root, error) {
603+
func (s *SimpleSyncer) fetchAllRoots(ctx context.Context,
604+
diffEngine DiffEngine) ([]Root, error) {
605+
472606
offset := int32(0)
473607
pageSize := defaultPageSize
474-
roots := make([]Root, 0)
475608

609+
var roots []Root
476610
for {
477611
log.Debugf("Fetching roots in range: %v to %v", offset,
478612
offset+pageSize)
479-
tempRoots, err := diffEngine.RootNodes(
480-
ctx, RootNodesQuery{
481-
WithAmountsById: false,
482-
SortDirection: SortAscending,
483-
Offset: offset,
484-
Limit: pageSize,
485-
},
486-
)
613+
tempRoots, err := diffEngine.RootNodes(ctx, RootNodesQuery{
614+
WithAmountsById: false,
615+
SortDirection: SortAscending,
616+
Offset: offset,
617+
Limit: pageSize,
618+
})
487619

488620
if err != nil {
489621
return nil, err
@@ -509,8 +641,8 @@ func (s *SimpleSyncer) fetchAllLeafKeys(ctx context.Context,
509641
// Initialize the offset to be used for the pages.
510642
offset := int32(0)
511643
pageSize := defaultPageSize
512-
leafKeys := make([]LeafKey, 0)
513644

645+
var leafKeys []LeafKey
514646
for {
515647
tempRemoteKeys, err := diffEngine.UniverseLeafKeys(
516648
ctx, UniverseLeafKeysQuery{

0 commit comments

Comments
 (0)