-
Notifications
You must be signed in to change notification settings - Fork 2.2k
graph/db: add zombie channels cleanup routine #10015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package graphdb | |
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/btcsuite/btcd/btcutil" | ||
"github.com/lightningnetwork/lnd/graph/db/models" | ||
|
@@ -66,7 +67,18 @@ type GraphCache struct { | |
nodeChannels map[route.Vertex]map[uint64]*DirectedChannel | ||
nodeFeatures map[route.Vertex]*lnwire.FeatureVector | ||
|
||
mtx sync.RWMutex | ||
// zombieIndex tracks channels that may be leaked during the removal | ||
// process. Since the remover could not have the node ID, these channels | ||
// are stored here and will be removed later in a separate loop. | ||
zombieIndex map[uint64]struct{} | ||
|
||
// zombieCleanerInterval is the interval at which the zombie cleaner | ||
// runs to clean up channels that are still missing their nodes. | ||
zombieCleanerInterval time.Duration | ||
|
||
mtx sync.RWMutex | ||
quit chan struct{} | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
wg sync.WaitGroup | ||
} | ||
|
||
// NewGraphCache creates a new graphCache. | ||
|
@@ -83,6 +95,9 @@ func NewGraphCache(preAllocNumNodes int) *GraphCache { | |
map[route.Vertex]*lnwire.FeatureVector, | ||
preAllocNumNodes, | ||
), | ||
zombieIndex: make(map[uint64]struct{}), | ||
zombieCleanerInterval: time.Hour, | ||
quit: make(chan struct{}), | ||
} | ||
} | ||
|
||
|
@@ -100,6 +115,81 @@ func (c *GraphCache) Stats() string { | |
numChannels) | ||
} | ||
|
||
// Start launches the background goroutine that periodically cleans up zombie | ||
// channels. | ||
func (c *GraphCache) Start() { | ||
c.wg.Add(1) | ||
go c.zombieCleaner() | ||
} | ||
|
||
// Stop signals the background cleaner to shut down and waits for it to exit. | ||
func (c *GraphCache) Stop() { | ||
close(c.quit) | ||
c.wg.Wait() | ||
} | ||
|
||
// zombieCleaner periodically iterates over the zombie index and removes | ||
// channels that are still missing their nodes. | ||
// | ||
// NOTE: must be run as a goroutine. | ||
func (c *GraphCache) zombieCleaner() { | ||
GustavoStingelin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer c.wg.Done() | ||
|
||
ticker := time.NewTicker(c.zombieCleanerInterval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ticker.C: | ||
c.cleanupZombies() | ||
case <-c.quit: | ||
return | ||
} | ||
} | ||
} | ||
|
||
// cleanupZombies attempts to prune channels tracked in the zombie index. If the | ||
// nodes for a channel still cannot be resolved, the channel is deleted from the | ||
// cache. | ||
func (c *GraphCache) cleanupZombies() { | ||
c.mtx.Lock() | ||
defer c.mtx.Unlock() | ||
|
||
if len(c.zombieIndex) == 0 { | ||
log.Debug("no zombie channels to clean up") | ||
return | ||
GustavoStingelin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Go through all nodes and their channels once to check if any are | ||
// marked as zombies. This is faster than checking every node for each | ||
// zombie channel, since there are usually many more nodes than zombie | ||
// channels. | ||
GustavoStingelin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for node, chans := range c.nodeChannels { | ||
for cid, ch := range chans { | ||
// if the channel isn't a zombie, we can skip it. | ||
if _, ok := c.zombieIndex[cid]; !ok { | ||
continue | ||
} | ||
|
||
// delete peer's side channel if it exists. | ||
c.removeChannelIfFound(ch.OtherNode, cid) | ||
|
||
// delete the channel from our side. | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
delete(chans, cid) | ||
} | ||
|
||
// If all channels were deleted for this node, clean up the map | ||
// entry entirely. | ||
if len(chans) == 0 { | ||
delete(c.nodeChannels, node) | ||
} | ||
} | ||
|
||
// Now that we have removed all channels that were zombies, we can | ||
// clear the zombie index. | ||
c.zombieIndex = make(map[uint64]struct{}) | ||
} | ||
|
||
// AddNodeFeatures adds a graph node and its features to the cache. | ||
func (c *GraphCache) AddNodeFeatures(node route.Vertex, | ||
features *lnwire.FeatureVector) { | ||
|
@@ -251,6 +341,15 @@ func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) { | |
// Remove that one channel from both sides. | ||
c.removeChannelIfFound(node1, chanID) | ||
c.removeChannelIfFound(node2, chanID) | ||
|
||
zeroVertex := route.Vertex{} | ||
if node1 == zeroVertex || node2 == zeroVertex { | ||
// If one of the nodes is the zero vertex, it means that we will | ||
// leak the channel in the memory cache, since we don't have the | ||
// node ID to remove, so we add it to the zombie index to post | ||
// removal. | ||
c.zombieIndex[chanID] = struct{}{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we know it's a zombie why can't we just remove it here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. related to
the reason is that we only have the channel ID, not the node ID. To find the node ID, we need to traverse the nodeChannels map, which is more expansive. If there is a way to access the node ID directly, we might be able to avoid this extra There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How much more expensive? I think it's about tradeoffs here - like how often we call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cost is loop over all nodes and retrieve their channels. It's about 16k nodes and requires a read-write lock on the map for 50-150ms, depending on the node's infrastructure (guessing based on benchmark with 50k nodes). Without this PR, zombie channels could be returned during pathfinding, leading to incorrect route attempts. This PR introduces a low cost validation during node-channel memory retrieval (used in graph traversal) that filters out zombie channels from the returned slice. As a result, pathfinding is no longer affected. The |
||
} | ||
} | ||
|
||
// removeChannelIfFound removes a single channel from one side. | ||
|
@@ -290,7 +389,13 @@ func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel { | |
|
||
i := 0 | ||
channelsCopy := make([]*DirectedChannel, len(channels)) | ||
for _, channel := range channels { | ||
for cid, channel := range channels { | ||
if _, ok := c.zombieIndex[cid]; ok { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason not to delete the channel at this point? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's bc at this point we’re holding the mutex in read mode, so multiple reads can happen at once. But if we want to delete the channel, we’ll need to switch to a write lock, which might slow things down a bit. Another idea is to send the deletion request to a channel and have a separate goroutine handle it when it can grab the write lock. have you thought of any other ways we could handle this? |
||
// If this channel is a zombie, we don't want to return | ||
// it, so we skip it. | ||
continue | ||
} | ||
|
||
// We need to copy the channel and policy to avoid it being | ||
// updated in the cache if the path finding algorithm sets | ||
// fields on it (currently only the ToNodeFeatures of the | ||
|
@@ -305,6 +410,9 @@ func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel { | |
i++ | ||
} | ||
|
||
// Copy the slice to clean up the unused pre allocated tail entries. | ||
copy(channelsCopy, channelsCopy[:i]) | ||
|
||
return channelsCopy | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.