-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathloadbalancer.go
128 lines (110 loc) · 3.48 KB
/
loadbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"errors"
log "github.com/sirupsen/logrus"
"sync/atomic"
"time"
)
// Sentinel errors handed to response callbacks by the load balancer.
var (
	// noBackendAvailable is reported when a request is sent while the
	// balancer holds no backend at all.
	noBackendAvailable = errors.New("No backend is available")

	// failedAllBackends is reported once every attempt allowed for a
	// request has been used up without a successful response.
	failedAllBackends = errors.New("Failed on all the backends")
)
// LoadBalancer is the contract for components that keep a set of thrift
// backend servers and forward messages to them.
type LoadBalancer interface {
	// AddBackend registers the backend described by backendInfo.
	AddBackend(backendInfo *BackendInfo)

	// RemoveBackend drops a backend that was previously added under addr.
	// A non-nil error means the backend could not be removed.
	RemoveBackend(addr string) error

	// GetAllBackends returns the backends currently held by the balancer.
	GetAllBackends() []Backend

	// Send forwards msg to a thrift server. The outcome (response or error)
	// is delivered through callback; requestTimeoutTime is passed along as
	// the point in time at which the request times out.
	Send(msg *Message, requestTimeoutTime time.Time, callback ResponseCallback)
}
// Roundrobin is a LoadBalancer implementation that rotates through its
// backends, starting each request at the next position in turn.
type Roundrobin struct {
	// resolver tracks backends that were added by hostname instead of by
	// IP address and reports which addresses appear or disappear.
	resolver *Resolver
	// backends holds the backends requests can be sent to.
	backends *BackendMgr
	// nextBackend is a counter bumped atomically on every Send; taken
	// modulo the number of backends it yields the first backend to try.
	nextBackend uint32
}
// NewRoundrobin builds an empty Roundrobin balancer with its own resolver
// and backend manager. The rotation counter starts at its zero value.
func NewRoundrobin() *Roundrobin {
	// NOTE(review): the meaning of the 10 handed to NewResolver is not
	// visible in this file — confirm against the Resolver implementation.
	balancer := &Roundrobin{
		resolver: NewResolver(10),
		backends: NewBackendMgr(),
	}
	return balancer
}
// AddBackend registers a thrift backend server.
//
// An address that cannot be split is logged and ignored. If the host part is
// an IP address the backend is created and stored unless that address is
// already present. Otherwise the address is handed to the resolver, and the
// addresses it reports are added or removed through resolvedAddrs.
func (r *Roundrobin) AddBackend(backendInfo *BackendInfo) {
	entry := log.WithFields(log.Fields{"address": backendInfo.Addr})

	host, _, err := splitAddr(backendInfo.Addr)
	if err != nil {
		entry.Error("Backend address is invalid")
		return
	}
	entry.Info("Add backend")

	if isIPAddress(host) {
		// Plain IP: register directly, skipping addresses we already hold.
		if !r.backends.Exists(backendInfo.Addr) {
			r.backends.Add(NewBackend(backendInfo))
		}
		return
	}

	// Hostname: let the resolver drive additions and removals. The readiness
	// settings of the original backend are applied to every resolved address.
	onResolved := func(hostname string, newAddrs []string, removedAddrs []string) {
		r.resolvedAddrs(hostname, newAddrs, removedAddrs, backendInfo.Readiness)
	}
	r.resolver.ResolveHost(backendInfo.Addr, onResolved)
}
// resolvedAddrs applies a resolver update for hostname: every address in
// newAddrs is added as a backend using readinessConf, and every address in
// removedAddrs is removed.
//
// Removal stays best-effort: a failure does not stop the remaining removals,
// but it is logged instead of being silently dropped.
func (r *Roundrobin) resolvedAddrs(hostname string, newAddrs []string, removedAddrs []string, readinessConf *ReadinessConf) {
	for _, addr := range newAddrs {
		r.AddBackend(&BackendInfo{Addr: addr, Readiness: readinessConf})
	}
	for _, addr := range removedAddrs {
		if err := r.RemoveBackend(addr); err != nil {
			log.WithFields(log.Fields{"hostname": hostname, "address": addr, "error": err}).Warn("Fail to remove backend")
		}
	}
}
// RemoveBackend removes a previously added thrift backend server.
//
// It returns the split error when addr cannot be split. For an IP address
// the backend is removed from the manager and stopped, and the manager's
// error (if any) is returned. For a hostname, resolution is stopped and each
// address the resolver held for it is removed; errors from those individual
// removals are not reported and nil is returned.
func (r *Roundrobin) RemoveBackend(addr string) error {
	host, _, splitErr := splitAddr(addr)
	if splitErr != nil {
		return splitErr
	}

	if isIPAddress(host) {
		removed, removeErr := r.backends.Remove(addr)
		if removeErr == nil {
			removed.Stop()
		}
		return removeErr
	}

	// Snapshot the resolved addresses before stopping the resolution.
	resolved := r.resolver.GetAddrsOfHost(addr)
	r.resolver.StopResolve(addr)
	for i := range resolved {
		r.RemoveBackend(resolved[i])
	}
	return nil
}
// GetAllBackends returns the backends currently held by the backend manager.
func (r *Roundrobin) GetAllBackends() []Backend {
	all := r.backends.GetAll()
	return all
}
// Send forwards request to one of the thrift backend servers.
//
// With no backend registered, callback is invoked with noBackendAvailable.
// Otherwise the rotation counter is advanced atomically to pick the first
// backend, and sendTo is allowed one attempt per backend counted at this
// moment.
func (r *Roundrobin) Send(request *Message, requestTimeoutTime time.Time, callback ResponseCallback) {
	count := uint32(r.backends.Size())
	if count == 0 {
		callback(nil, noBackendAvailable)
		return
	}

	first := atomic.AddUint32(&r.nextBackend, 1) % count
	r.sendTo(request, requestTimeoutTime, first, count, count, callback)
}
// sendTo tries to deliver request to the backend at position index.
//
// leftTimes is the number of attempts still allowed and total is the modulus
// used to wrap to the next position. When no attempt is left, callback gets
// failedAllBackends. If the backend cannot be looked up, or its Send reports
// an error (which is logged), the next position is tried with one attempt
// fewer. A successful response is passed straight to callback.
func (r *Roundrobin) sendTo(request *Message, requestTimeoutTime time.Time, index uint32, leftTimes uint32, total uint32, callback ResponseCallback) {
	if leftTimes == 0 {
		callback(nil, failedAllBackends)
		return
	}

	// tryNext moves on to the following backend; the next index is only
	// computed when a retry is actually needed.
	tryNext := func() {
		r.sendTo(request, requestTimeoutTime, (index+1)%total, leftTimes-1, total, callback)
	}

	target, lookupErr := r.backends.GetIndex(int(index))
	if lookupErr != nil {
		tryNext()
		return
	}

	target.Send(request, requestTimeoutTime, func(response *Message, sendErr error) {
		if sendErr != nil {
			log.WithFields(log.Fields{"backend": target.GetAddr(), "error": sendErr}).Error("Fail to send request")
			tryNext()
			return
		}
		callback(response, sendErr)
	})
}