This repository has been archived by the owner on Feb 14, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
serverset.go
152 lines (125 loc) · 4.16 KB
/
serverset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package serversets
import (
"fmt"
"path"
"strings"
"time"
"github.com/samuel/go-zookeeper/zk"
)
var (
// BaseDirectory is the Zookeeper namespace that all nodes made by this package will live.
// This path must begin with '/'
BaseDirectory = "/discovery"
// MemberPrefix is prefix for the Zookeeper sequential ephemeral nodes.
// member_ is used by Finagle server sets.
MemberPrefix = "member_"
)
// BaseZnodePath allows for a custom Zookeeper directory structure.
// This function should return the path where you want the service's members to live.
// Default is `BaseDirectory + "/" + environment + "/" + service` where the default base directory is `/discovery`
var BaseZnodePath = func(environment Environment, service string) string {
return BaseDirectory + "/" + string(environment) + "/" + service
}
// An Environment is the test/staging/production state of the service.
type Environment string
// Typically used environments
const (
Local Environment = "local"
Production Environment = "prod"
Test Environment = "test"
Staging Environment = "staging"
)
// DefaultZKTimeout is the zookeeper timeout used if it is not overwritten.
var DefaultZKTimeout = 5 * time.Second
// A ServerSet represents a service with a set of servers that may change over time.
// The master lists of servers is kept as ephemeral nodes in Zookeeper.
type ServerSet struct {
ZKTimeout time.Duration
environment Environment
service string
zkServers []string
}
// New creates a new ServerSet object that can then be watched
// or have an endpoint added to. The service name must not contain
// any slashes. Will panic if it does.
func New(environment Environment, service string, zookeepers []string) *ServerSet {
if strings.Contains(service, "/") {
panic(fmt.Errorf("service name (%s) must not contain slashes", service))
}
ss := &ServerSet{
ZKTimeout: DefaultZKTimeout,
environment: environment,
service: service,
zkServers: zookeepers,
}
return ss
}
// ZookeeperServers returns the Zookeeper servers this set is using.
// Useful to check if everything is configured correctly.
func (ss *ServerSet) ZookeeperServers() []string {
return ss.zkServers
}
func (ss *ServerSet) connectToZookeeper() (*zk.Conn, <-chan zk.Event, error) {
return zk.Connect(ss.zkServers, ss.ZKTimeout)
}
// directoryPath returns the base path of where all the ephemeral nodes will live.
func (ss *ServerSet) directoryPath() string {
return BaseZnodePath(ss.environment, ss.service)
}
func splitPaths(fullPath string) []string {
var parts []string
var last string
for fullPath != "/" {
fullPath, last = path.Split(path.Clean(fullPath))
parts = append(parts, last)
}
// parts are in reverse order, put back together
// into set of subdirectory paths
result := make([]string, 0, len(parts))
base := ""
for i := len(parts) - 1; i >= 0; i-- {
base += "/" + parts[i]
result = append(result, base)
}
return result
}
// createFullPath makes sure all the znodes are created for the parent directories
func (ss *ServerSet) createFullPath(connection *zk.Conn) error {
paths := splitPaths(ss.directoryPath())
// TODO: can't we just create all? ie. mkdir -p
for _, key := range paths {
_, err := connection.Create(key, nil, 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
}
return nil
}
// structure of the data in each member znode
// Mimics finagle serverset structure.
type entity struct {
ServiceEndpoint endpoint `json:"serviceEndpoint"`
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"` // unused
Status string `json:"status"`
}
type endpoint struct {
Host string `json:"host"`
Port int `json:"port"`
}
func newEntity(host string, port int) *entity {
return &entity{
ServiceEndpoint: endpoint{host, port},
AdditionalEndpoints: make(map[string]endpoint),
Status: statusAlive,
}
}
// possible endpoint statuses. Currently only concerned with ALIVE.
const (
statusDead = "DEAD"
statusStarting = "STARTING"
statusAlive = "ALIVE"
statusStopping = "STOPPING"
statusStopped = "STOPPED"
statusWarning = "WARNING"
statusUnknown = "UNKNOWN"
)