diff --git a/cluster/dragon/proc_mgr.go b/cluster/dragon/proc_mgr.go
index 98ae3899..3bc8a569 100644
--- a/cluster/dragon/proc_mgr.go
+++ b/cluster/dragon/proc_mgr.go
@@ -1,7 +1,10 @@
 package dragon
 
 import (
+	"fmt"
 	"github.com/lni/dragonboat/v3/raftio"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	log "github.com/sirupsen/logrus"
 	"github.com/squareup/pranadb/cluster"
 	"github.com/squareup/pranadb/errors"
@@ -9,14 +12,21 @@ import (
 	"github.com/squareup/pranadb/protos/squareup/cash/pranadb/v1/clustermsgs"
 	"github.com/squareup/pranadb/remoting"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
+var numLeadersVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
+	Name: "number_of_raft_leaders",
+	Help: "Number of raft leaders on the node, segmented by node_id",
+}, []string{"node_id"})
+
 func newProcManager(d *Dragon, serverAddresses []string) *procManager {
 	return &procManager{
 		nodeID:          uint64(d.cnf.NodeID),
 		dragon:          d,
 		serverAddresses: serverAddresses,
+		numLeadersGauge: numLeadersVec.WithLabelValues(fmt.Sprintf("node-%04d", d.cnf.NodeID)),
 	}
 }
 
@@ -32,6 +42,8 @@ type procManager struct {
 	serverAddresses []string
 	leaders         sync.Map
 	fillInterruptor *interruptor.Interruptor
+	leadersCount    int64
+	numLeadersGauge prometheus.Gauge
 }
 
 // returns leaders for all *data* shards
@@ -91,17 +103,10 @@ func (p *procManager) getLeaderNode(shardID uint64) (uint64, bool) {
 	if !ok {
 		return 0, false
 	}
-	leader, ok := l.(uint64)
-	if !ok {
-		panic("not a uint64")
-	}
+	leader := l.(uint64) //nolint:forcetypeassert
 	return leader, true
 }
 
-func (p *procManager) setLeaderNode(shardID uint64, nodeID uint64) {
-	p.leaders.Store(shardID, nodeID)
-}
-
 func (p *procManager) LeaderUpdated(info raftio.LeaderInfo) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
@@ -124,21 +129,34 @@ func (p *procManager) LeaderUpdated(info raftio.LeaderInfo) {
 
 	// When a later node joins, it won't receive info for membership changes before it joined - but it will
 	// know if it becomes the leader for a node, or some other node takes over leadership
-	if info.LeaderID == p.nodeID+1 && info.ClusterID >= cluster.DataShardIDBase {
+	newLeaderNodeID := info.LeaderID - 1 // Dragon node ids start at 1, our node ids start at zero
+	if newLeaderNodeID == p.nodeID && info.ClusterID >= cluster.DataShardIDBase {
 		// We've become leader for a data shard
 		p.setLeaderChannel <- info
 	}
 	if info.LeaderID > 0 {
-		p.setLeaderNode(info.ClusterID, info.LeaderID-1)
+		if newLeaderNodeID == p.nodeID {
+			prev, ok := p.getLeaderNode(info.ClusterID)
+			if !ok || prev != p.nodeID {
+				p.addNumberLeadersCount(1)
+			}
+		}
+		p.leaders.Store(info.ClusterID, newLeaderNodeID)
 	} else {
 		// 0 represents no leader
-		p.leaders.Delete(info.ClusterID)
+		pr, ok := p.leaders.LoadAndDelete(info.ClusterID)
+		if ok {
+			prev := pr.(uint64) //nolint:forcetypeassert
+			if prev == p.nodeID {
+				p.addNumberLeadersCount(-1)
+			}
+		}
 	}
 }
 
 func (p *procManager) handleLeaderInfosMessage(msg *clustermsgs.LeaderInfosMessage) {
 	for _, leaderInfo := range msg.LeaderInfos {
-		p.setLeaderNode(uint64(leaderInfo.ShardId), uint64(leaderInfo.NodeId))
+		p.leaders.Store(uint64(leaderInfo.ShardId), uint64(leaderInfo.NodeId))
 	}
 }
 
@@ -215,3 +233,8 @@ func (p *procManager) broadcastInfo() {
 	defer p.lock.Unlock()
 	p.scheduleBroadcast()
 }
+
+func (p *procManager) addNumberLeadersCount(delta int64) {
+	count := atomic.AddInt64(&p.leadersCount, delta)
+	p.numLeadersGauge.Set(float64(count))
+}
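
For illustration, here is a minimal, self-contained sketch of the bookkeeping pattern the patch introduces: an atomic per-node leader count mirrored into a Prometheus gauge child obtained from a GaugeVec labelled by node_id. The leaderTracker type, the "node-0001" label value, and the use of a private registry plus the testutil package are assumptions made for this example only; the patch itself registers the vec via promauto against the default registry and updates it from procManager.addNumberLeadersCount.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// leaderTracker is a hypothetical stand-in for the bookkeeping added to
// procManager: an atomic count of shards this node currently leads,
// mirrored into a per-node gauge.
type leaderTracker struct {
	count int64            // updated atomically, like procManager.leadersCount
	gauge prometheus.Gauge // one child of the "number_of_raft_leaders" vec
}

// add mirrors addNumberLeadersCount in the diff: bump the atomic counter
// and publish the new value to the gauge.
func (t *leaderTracker) add(delta int64) {
	t.gauge.Set(float64(atomic.AddInt64(&t.count, delta)))
}

func main() {
	// A private registry keeps the example self-contained; the patch relies
	// on promauto, which registers with the default registry instead.
	reg := prometheus.NewRegistry()
	vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "number_of_raft_leaders",
		Help: "Number of raft leaders on the node, segmented by node_id",
	}, []string{"node_id"})
	reg.MustRegister(vec)

	t := &leaderTracker{gauge: vec.WithLabelValues("node-0001")}

	t.add(1)  // became leader of one data shard
	t.add(1)  // and another
	t.add(-1) // lost leadership of one shard

	// testutil.ToFloat64 reads the current value of a single-metric collector.
	fmt.Println(testutil.ToFloat64(t.gauge)) // prints 1
}

Keeping the authoritative count in an int64 updated with atomic.AddInt64 and treating the gauge as a mirror avoids reading the metric back: prometheus.Gauge offers Inc/Dec/Set but no read-modify-write that returns the new value.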