-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate_follower.go
86 lines (76 loc) · 2.16 KB
/
state_follower.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Copyright (c) 2019 Meng Huang ([email protected])
// This package is licensed under a MIT license that can be found in the LICENSE file.
package raft
import (
"runtime"
"sync/atomic"
"time"
)
// followerState is the state implementation used while the node acts as a
// follower.
type followerState struct {
	// node is the node this state operates on.
	node *node
	// applying is a 0/1 flag manipulated with sync/atomic in Update so that
	// only one apply of committed entries is started at a time.
	applying int32
}
// newFollowerState creates the follower state for n. Before returning it
// clears the node's recorded vote, calls clearTar on the state machine's
// snapshot read-writer, and runs Start on the new state.
func newFollowerState(n *node) state {
	n.votedFor.Store("")
	n.stateMachine.snapshotReadWriter.clearTar()

	fs := &followerState{node: n}
	fs.Start()
	return fs
}
// Start (re)initialises follower bookkeeping on the node: it marks the node
// as not ready, clears the known leader, calls election.Random(true) followed
// by election.Reset, and traces the current term.
func (s *followerState) Start() {
	n := s.node

	n.ready = false
	n.leader.Store("")

	n.election.Random(true)
	n.election.Reset()

	n.logger.Tracef("%s followerState.Start Term :%d", n.address, n.currentTerm.Load())
}
// Update applies committed log entries that have not yet been applied.
//
// It returns true when the commit index is positive and ahead of
// stateMachine.lastApplied (there is apply work pending), and false
// otherwise. The apply runs in its own goroutine; Update waits for it for at
// most defaultCommandTimeout and then returns regardless.
//
// The applying flag guarantees that at most one apply goroutine exists at a
// time. It is released by the goroutine itself once applyCommited returns,
// not by Update: releasing it from Update would clear the guard on the
// timeout path while the apply is still running, letting the next Update
// start a second, concurrent apply.
func (s *followerState) Update() bool {
	// Read the commit index once so both comparisons see the same value.
	commitID := s.node.commitIndex.ID()
	if commitID > 0 && commitID > s.node.stateMachine.lastApplied {
		if !atomic.CompareAndSwapInt32(&s.applying, 0, 1) {
			// A previous apply is still in flight; do not start another.
			return true
		}

		done := make(chan struct{})
		go func() {
			s.node.log.applyCommited()
			// Clear the guard before signalling so that, when Update
			// returns after a completed apply, the flag is already free.
			atomic.StoreInt32(&s.applying, 0)
			// close never blocks, so this goroutine cannot leak even if
			// Update has already given up waiting.
			close(done)
		}()

		timer := time.NewTimer(defaultCommandTimeout)
		defer timer.Stop()
		// Yield so the apply goroutine gets a chance to run right away.
		runtime.Gosched()
		select {
		case <-done:
		case <-timer.C:
			// Timed out: the apply keeps running in the background and
			// releases the guard when it finishes.
		}
		return true
	}
	return false
}
// FixedUpdate checks for an election timeout. When the election timer
// reports a timeout and lastLogIndex is not behind the commit index, it clears
// the stored leader and vote, traces the event, and asks the node to move to
// its next state. Otherwise it does nothing.
func (s *followerState) FixedUpdate() {
	n := s.node
	if !n.election.Timeout() || n.lastLogIndex < n.commitIndex.ID() {
		return
	}

	n.leader.Store("")
	n.votedFor.Store("")
	n.logger.Tracef("%s followerState.FixedUpdate ElectionTimeout", n.address)
	n.nextState()
}
// String returns the follower identifier, the package-level value named
// follower.
func (s *followerState) String() string {
	return follower
}
// StepDown traces the call, re-runs Start on this state, and returns the same
// follower state.
func (s *followerState) StepDown() state {
	n := s.node
	n.logger.Tracef("%s followerState.StepDown", n.address)

	s.Start()
	return s
}
// NextState returns the state that follows follower. If node.voting() reports
// true, it traces the transition and returns a new candidate state for the
// node; otherwise the receiver itself is returned unchanged.
func (s *followerState) NextState() state {
	n := s.node
	if n.voting() {
		n.logger.Tracef("%s followerState.NextState", n.address)
		return newCandidateState(n)
	}
	return s
}