Skip to content

graph: move graph cache out of CRUD layer #9544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ed8e10e
graph/db: rename graph.go file
ellemouton Feb 18, 2025
1ee4bb8
graph/db: rename ChannelGraph and introduce the new ChannelGraph layer
ellemouton Feb 18, 2025
ae3961b
graph/db: fix linter issues of old code
ellemouton Feb 19, 2025
81e0608
graph/db: rename Options to KVStoreOptions
ellemouton Feb 18, 2025
00432e4
multi: add ChannelGraph Config struct
ellemouton Feb 18, 2025
88398e3
graph/db: let ChannelGraph init the graphCache
ellemouton Feb 18, 2025
9fe9e32
graph/db: move cache read checks to ChannelGraph.
ellemouton Feb 18, 2025
f75e6a1
graph/db: move various cache write calls to ChannelGraph
ellemouton Feb 18, 2025
081c9dc
graph/db: refactor delChannelEdgeUnsafe to return edge info
ellemouton Feb 17, 2025
71e5ab6
graph/db: move some cache writes to ChannelGraph.
ellemouton Feb 17, 2025
941e7bf
graph/db: move cache update out of pruneGraphNodes
ellemouton Feb 17, 2025
cc4fcbf
graph/db: move cache writes for Prune methods
ellemouton Feb 17, 2025
4d00eb2
graph/db: move FilterKnownChanIDs zombie logic up one layer
ellemouton Feb 19, 2025
9d0b9f9
graph/db: move cache write for MarkEdgeZombie
ellemouton Feb 25, 2025
ba1d21d
graph/db: move cache write for UpdateEdgePolicy
ellemouton Feb 17, 2025
bb3839e
graph/db: completely remove cache from KVStore
ellemouton Feb 18, 2025
b497c46
multi: add Start and Stop methods for ChannelGraph
ellemouton Feb 19, 2025
4545088
graph/db: populate the graph cache in Start instead of during constru…
ellemouton Feb 19, 2025
8c11ca9
itest: rename closure for clarity
ellemouton Mar 3, 2025
caf69cc
docs: update release notes
ellemouton Mar 3, 2025
4131b3f
graph/db: adjust TestPartialNode
ellemouton Mar 6, 2025
2221aaa
graph/db: move Topology client management to ChannelGraph
ellemouton Feb 19, 2025
878746c
graph/db: refactor to group all topology notification fields
ellemouton Mar 5, 2025
947ca93
docs: update release notes
ellemouton Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
Expand Down Expand Up @@ -36,7 +36,7 @@ type ManagerCfg struct {

// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*graph.TopologyClient, error)
SubscribeTopology func() (*graphdb.TopologyClient, error)
}

// Manager is struct that manages an autopilot agent, making it possible to
Expand Down
7 changes: 6 additions & 1 deletion autopilot/prefattach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ func newDiskChanGraph(t *testing.T) (testGraph, error) {
})
require.NoError(t, err)

graphDB, err := graphdb.NewChannelGraph(backend)
graphDB, err := graphdb.NewChannelGraph(&graphdb.Config{KVDB: backend})
require.NoError(t, err)

require.NoError(t, graphDB.Start())
t.Cleanup(func() {
require.NoError(t, graphDB.Stop())
})

return &testDBGraph{
db: graphDB,
databaseChannelGraph: databaseChannelGraph{
Expand Down
16 changes: 10 additions & 6 deletions config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,26 +1026,30 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
"instances")
}

graphDBOptions := []graphdb.OptionModifier{
graphDBOptions := []graphdb.KVStoreOptionModifier{
graphdb.WithRejectCacheSize(cfg.Caches.RejectCacheSize),
graphdb.WithChannelCacheSize(cfg.Caches.ChannelCacheSize),
graphdb.WithBatchCommitInterval(cfg.DB.BatchCommitInterval),
}

chanGraphOpts := []graphdb.ChanGraphOption{
graphdb.WithUseGraphCache(!cfg.DB.NoGraphCache),
}

// We want to pre-allocate the channel graph cache according to what we
// expect for mainnet to speed up memory allocation.
if cfg.ActiveNetParams.Name == chaincfg.MainNetParams.Name {
graphDBOptions = append(
graphDBOptions, graphdb.WithPreAllocCacheNumNodes(
chanGraphOpts = append(
chanGraphOpts, graphdb.WithPreAllocCacheNumNodes(
graphdb.DefaultPreAllocCacheNumNodes,
),
)
}

dbs.GraphDB, err = graphdb.NewChannelGraph(
databaseBackends.GraphDB, graphDBOptions...,
)
dbs.GraphDB, err = graphdb.NewChannelGraph(&graphdb.Config{
KVDB: databaseBackends.GraphDB,
KVStoreOpts: graphDBOptions,
}, chanGraphOpts...)
if err != nil {
cleanUp()

Expand Down
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ The underlying functionality between those two options remain the same.
- [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480)
- [Abstract invoicerpc server access](https://github.com/lightningnetwork/lnd/pull/9516)
- [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513)
- Move the [graph cache out of the graph
CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
- Move [topology
subscription](https://github.com/lightningnetwork/lnd/pull/9577) and
notification handling from the graph.Builder to the ChannelGraph.

* [Golang was updated to
`v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462).
Expand Down
117 changes: 6 additions & 111 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ type Builder struct {
started atomic.Bool
stopped atomic.Bool

ntfnClientCounter atomic.Uint64
bestHeight atomic.Uint32
bestHeight atomic.Uint32

cfg *Config

Expand All @@ -123,22 +122,6 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock

// topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
topologyUpdates chan any

// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]

// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the Builder. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate

// channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses.
Expand All @@ -163,14 +146,11 @@ var _ ChannelGraphSource = (*Builder)(nil)
// NewBuilder constructs a new Builder.
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
cfg: cfg,
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error {
return nil
}

// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()

topChange := &TopologyChange{}
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}

if topChange.isEmpty() {
return
}

b.notifyTopologyChange(topChange)
}

// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
Expand All @@ -701,16 +659,6 @@ func (b *Builder) networkHandler() {
}

select {
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdates:
b.wg.Add(1)
go b.handleTopologyUpdate(update)

// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.

case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
Expand Down Expand Up @@ -783,31 +731,6 @@ func (b *Builder) networkHandler() {
" processed.", chainUpdate.Height)
}

// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-b.ntfnClientUpdates:
clientID := ntfnUpdate.clientID

if ntfnUpdate.cancel {
client, ok := b.topologyClients.LoadAndDelete(
clientID,
)
if ok {
close(client.exit)
client.wg.Wait()

close(client.ntfnChan)
}

continue
}

b.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
})

// The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels
// for pruning.
Expand Down Expand Up @@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels(
log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
blockHeight, len(chansClosed))

if len(chansClosed) == 0 {
return err
}

// Notify all currently registered clients of the newly closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
b.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})

return nil
}

Expand Down Expand Up @@ -1067,12 +980,6 @@ func (b *Builder) AddNode(node *models.LightningNode,
return err
}

select {
case b.topologyUpdates <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down Expand Up @@ -1117,12 +1024,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
return err
}

select {
case b.topologyUpdates <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down Expand Up @@ -1224,12 +1125,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
return err
}

select {
case b.topologyUpdates <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down
Loading
Loading