Added metric for raft leaders (#525)
purplefox authored Aug 16, 2022
1 parent 3b74921 commit 92b882a
Showing 1 changed file with 35 additions and 9 deletions.
cluster/dragon/proc_mgr.go: 44 changes (35 additions & 9 deletions)
@@ -1,22 +1,32 @@
 package dragon
 
 import (
+    "fmt"
     "github.com/lni/dragonboat/v3/raftio"
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
     log "github.com/sirupsen/logrus"
     "github.com/squareup/pranadb/cluster"
     "github.com/squareup/pranadb/errors"
     "github.com/squareup/pranadb/interruptor"
     "github.com/squareup/pranadb/protos/squareup/cash/pranadb/v1/clustermsgs"
     "github.com/squareup/pranadb/remoting"
     "sync"
+    "sync/atomic"
     "time"
 )
 
+var numLeadersVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
+    Name: "number_of_raft_leaders",
+    Help: "counter for number of raft leaders on the node, segmented by node_id",
+}, []string{"node_id"})
+
 func newProcManager(d *Dragon, serverAddresses []string) *procManager {
     return &procManager{
         nodeID:          uint64(d.cnf.NodeID),
         dragon:          d,
         serverAddresses: serverAddresses,
+        numLeadersGuage: numLeadersVec.WithLabelValues(fmt.Sprintf("node-%04d", d.cnf.NodeID)),
     }
 }

@@ -32,6 +42,8 @@ type procManager struct {
     serverAddresses []string
     leaders         sync.Map
     fillInterruptor *interruptor.Interruptor
+    leadersCount    int64
+    numLeadersGuage prometheus.Gauge
 }
 
 // returns leaders for all *data* shards
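Aside, not part of the commit: the gauge vector above is registered through promauto, which hooks it into the default Prometheus registry when the package initialises; the diff does not show how that registry is served over HTTP, which PranaDB presumably wires up elsewhere. A minimal standalone sketch of the same pattern, with an illustrative endpoint, port, and node id:

    package main

    import (
        "net/http"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
        "github.com/prometheus/client_golang/prometheus/promhttp"
    )

    // Same shape as numLeadersVec: one child gauge per node_id label value.
    var leadersVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
        Name: "number_of_raft_leaders",
        Help: "counter for number of raft leaders on the node, segmented by node_id",
    }, []string{"node_id"})

    func main() {
        g := leadersVec.WithLabelValues("node-0001") // label format mirrors fmt.Sprintf("node-%04d", nodeID)
        g.Set(3)                                     // e.g. this node currently leads 3 shards
        // promauto registered the vector with prometheus.DefaultRegisterer,
        // so the default handler exposes it without further wiring.
        http.Handle("/metrics", promhttp.Handler())
        _ = http.ListenAndServe(":2112", nil) // illustrative port, not from the commit
    }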
@@ -91,17 +103,13 @@ func (p *procManager) getLeaderNode(shardID uint64) (uint64, bool) {
     if !ok {
         return 0, false
     }
-    leader, ok := l.(uint64)
-    if !ok {
-        panic("not a uint64")
-    }
+    leader := l.(uint64) //nolint:forcetypeassert
     return leader, true
 }
 
 func (p *procManager) setLeaderNode(shardID uint64, nodeID uint64) {
     p.leaders.Store(shardID, nodeID)
 }
 
 func (p *procManager) LeaderUpdated(info raftio.LeaderInfo) {
     p.lock.Lock()
     defer p.lock.Unlock()
@@ -124,21 +132,34 @@ func (p *procManager) LeaderUpdated(info raftio.LeaderInfo) {
     // When a later node joins, it won't receive info for membership changes before it joined - but it will
     // know if it becomes the leader for a node, or some other node takes over leadership
 
-    if info.LeaderID == p.nodeID+1 && info.ClusterID >= cluster.DataShardIDBase {
+    newLeaderNodeID := info.LeaderID - 1 // Dragon node ids start at 1, our node ids start at zero
+    if newLeaderNodeID == p.nodeID && info.ClusterID >= cluster.DataShardIDBase {
         // We've become leader for a data shard
         p.setLeaderChannel <- info
     }
     if info.LeaderID > 0 {
-        p.setLeaderNode(info.ClusterID, info.LeaderID-1)
+        if newLeaderNodeID == p.nodeID {
+            prev, ok := p.getLeaderNode(info.ClusterID)
+            if !ok || prev != p.nodeID {
+                p.addNumberLeadersCount(1)
+            }
+        }
+        p.leaders.Store(info.ClusterID, newLeaderNodeID)
     } else {
         // 0 represents no leader
-        p.leaders.Delete(info.ClusterID)
+        pr, ok := p.leaders.LoadAndDelete(info.ClusterID)
+        if ok {
+            prev := pr.(uint64) //nolint:forcetypeassert
+            if prev == p.nodeID {
+                p.addNumberLeadersCount(-1)
+            }
+        }
     }
 }
 
 func (p *procManager) handleLeaderInfosMessage(msg *clustermsgs.LeaderInfosMessage) {
     for _, leaderInfo := range msg.LeaderInfos {
-        p.setLeaderNode(uint64(leaderInfo.ShardId), uint64(leaderInfo.NodeId))
+        p.leaders.Store(uint64(leaderInfo.ShardId), uint64(leaderInfo.NodeId))
     }
 }
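Aside, not part of the commit: the bookkeeping above only moves the count on a transition. The gauge goes up when this node becomes leader of a shard it was not already recorded as leading (the prev check stops repeated callbacks from double counting), and goes down only when a shard this node was leading drops to "no leader". A rough standalone illustration of the same rules, with invented names (leaderUpdate and the shard ids exist only for the example):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // leaderUpdate mimics the transitions in LeaderUpdated; leaderNodeID == 0 means "no leader".
    func leaderUpdate(leaders *sync.Map, self, shardID, leaderNodeID uint64, count *int64) {
        if leaderNodeID != 0 {
            if leaderNodeID == self {
                prev, ok := leaders.Load(shardID)
                if !ok || prev.(uint64) != self {
                    atomic.AddInt64(count, 1) // became leader of a shard we weren't leading
                }
            }
            leaders.Store(shardID, leaderNodeID)
        } else if prev, ok := leaders.LoadAndDelete(shardID); ok && prev.(uint64) == self {
            atomic.AddInt64(count, -1) // a shard we were leading now has no leader
        }
    }

    func main() {
        var leaders sync.Map
        var count int64
        const self = uint64(1)
        leaderUpdate(&leaders, self, 1000, 1, &count) // become leader of shard 1000 -> count 1
        leaderUpdate(&leaders, self, 1000, 1, &count) // repeated callback for same shard -> still 1
        leaderUpdate(&leaders, self, 1001, 2, &count) // another node leads shard 1001 -> still 1
        leaderUpdate(&leaders, self, 1000, 0, &count) // shard 1000 loses its leader -> back to 0
        fmt.Println(count)                            // prints 0
    }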

@@ -215,3 +236,8 @@ func (p *procManager) broadcastInfo() {
     defer p.lock.Unlock()
     p.scheduleBroadcast()
 }
+
+func (p *procManager) addNumberLeadersCount(delta int64) {
+    count := atomic.AddInt64(&p.leadersCount, delta)
+    p.numLeadersGuage.Set(float64(count))
+}
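Aside, not part of the commit: addNumberLeadersCount keeps the authoritative count in an atomic int64 and mirrors every change into the gauge with Set, rather than calling Inc or Dec on the gauge directly. A small sketch of that pattern, with an illustrative way of reading the value back (the testutil call is for demonstration only and is not something the commit adds):

    package main

    import (
        "fmt"
        "sync/atomic"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    func main() {
        gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "example_raft_leaders"})
        var leadersCount int64

        // Same pattern as addNumberLeadersCount: bump the counter, mirror it into the gauge.
        add := func(delta int64) {
            gauge.Set(float64(atomic.AddInt64(&leadersCount, delta)))
        }

        add(1)  // gained leadership of a shard
        add(1)  // and another
        add(-1) // lost one again

        fmt.Println(testutil.ToFloat64(gauge)) // prints 1
    }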
