From bee73bcd6968835aa146df3b4868d1f1bbba17ee Mon Sep 17 00:00:00 2001 From: Jille Timmermans Date: Tue, 8 Sep 2020 20:55:17 +0100 Subject: [PATCH] Make Raft.LeaderCh() return a new channel for each invocation .. and immediately send the current leadership state over the channel. This way it can be used by multiple pieces of code with disrupting another. Sending the value immediately avoids strange race conditions when RaftState gets updated at a slightly different moment. fixes #426 --- api.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/api.go b/api.go index 1348453f5..3f878bd8e 100644 --- a/api.go +++ b/api.go @@ -111,6 +111,12 @@ type Raft struct { // leaderCh is used to notify of leadership changes leaderCh chan bool + leaderChs []chan bool + leaderChsLock sync.Mutex + // leaderChLastMessage is the last message sent over leaderCh that has been processed. + // It is sent to new channels created by LeaderCh(). + leaderChLastMessage bool + // leaderState used only while state is leader leaderState leaderState @@ -957,14 +963,36 @@ func (r *Raft) State() RaftState { // lose it. // // Receivers can expect to receive a notification only if leadership -// transition has occured. +// transition has occured and immediately after LeaderCh() returns with the +// current state. // // If receivers aren't ready for the signal, signals may drop and only the // latest leadership transition. For example, if a receiver receives subsequent // `true` values, they may deduce that leadership was lost and regained while -// the the receiver was processing first leadership transition. +// the receiver was processing first leadership transition. func (r *Raft) LeaderCh() <-chan bool { - return r.leaderCh + ch := make(chan bool, 1) + r.leaderChsLock.Lock() + if len(r.leaderChs) == 0 { + select { + case v := <-r.leaderCh: + r.leaderChLastMessage = v + default: + } + go func() { + for v := range r.leaderCh { + r.leaderChsLock.Lock() + for _, c := range r.leaderChs { + overrideNotifyBool(c, v) + } + r.leaderChsLock.Unlock() + } + }() + } + r.leaderChs = append(r.leaderChs, ch) + ch <- r.leaderChLastMessage + r.leaderChsLock.Unlock() + return ch } // String returns a string representation of this Raft node.