Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
VAL-467: Multi-Region Support (1 of 2) (#48)
Browse files Browse the repository at this point in the history
* Added Behavior RESET_REMAINING to reset any hits recorded in the cache for the specified rate limit

* Behavior is now a flag; this should be a backward-compatible change for
  anyone using GLOBAL or NO_BATCHING, but it will break anyone using
  DURATION_IS_GREGORIAN. Use the `HasBehavior()` function to check for
  behavior flags.

* WIP: Add support for multi dc hash rings

rebased and got tests working

* updated docker/compose config, added memberlist for clustering, partial multi-region syncing

* added memberlist metadata, got multi-region sending hits, cleaned up code

* RegionPicker is now configurable

* pulled derrick's commit, some refactoring

* more refactoring, improved documentation, split out region manager

* refactored memberlist, added health check errors, modified config to use etcd as default

fixed tests after rebase

* updated health check to cache all errors in the past 5 minutes, added functional test

* rebased

* revert cosistant -> consistent

* health check now includes region peers, some refactoring

* GubernatorListenAddress -> GubernatorPort for memberlist config

* exposed logging in memberlist

* added configuration of node name in memberlist

* removed debug log line

Co-authored-by: Derrick J. Wippler <[email protected]>
  • Loading branch information
mkbond and thrawn01 authored Jun 17, 2020
1 parent cd5588d commit 07e238f
Show file tree
Hide file tree
Showing 31 changed files with 1,610 additions and 477 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ ENTRYPOINT ["/gubernator"]

EXPOSE 80
EXPOSE 81
EXPOSE 7946
11 changes: 4 additions & 7 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
b.Errorf("SetDefaults err: %s", err)
}

client, err := guber.NewPeerClient(conf.Behaviors, cluster.GetPeer())
if err != nil {
b.Errorf("NewPeerClient err: %s", err)
}
client := guber.NewPeerClient(conf.Behaviors, cluster.GetRandomPeer())

b.Run("GetPeerRateLimitNoBatching", func(b *testing.B) {
for n := 0; n < b.N; n++ {
Expand All @@ -54,7 +51,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
}

func BenchmarkServer_GetRateLimit(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetPeer())
client, err := guber.DialV1Server(cluster.GetRandomPeer().Address)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
Expand All @@ -80,7 +77,7 @@ func BenchmarkServer_GetRateLimit(b *testing.B) {
}

func BenchmarkServer_Ping(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetPeer())
client, err := guber.DialV1Server(cluster.GetRandomPeer().Address)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
Expand Down Expand Up @@ -108,7 +105,7 @@ func BenchmarkServer_Ping(b *testing.B) {
}*/

func BenchmarkServer_ThunderingHeard(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetPeer())
client, err := guber.DialV1Server(cluster.GetRandomPeer().Address)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func FromUnixMilliseconds(ts int64) time.Time {
}

// Given a list of peers, return a random peer
func RandomPeer(peers []string) string {
func RandomPeer(peers []PeerInfo) PeerInfo {
return peers[rand.Intn(len(peers))]
}

Expand Down
45 changes: 31 additions & 14 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ type instance struct {
func (i *instance) Peers() []gubernator.PeerInfo {
var result []gubernator.PeerInfo
for _, peer := range peers {
info := gubernator.PeerInfo{Address: peer}
if peer == i.Address {
info.IsOwner = true
if peer.Address == i.Address {
peer.IsOwner = true
}
result = append(result, info)
result = append(result, peer)
}
return result
}
Expand All @@ -52,15 +51,27 @@ func (i *instance) Stop() error {
}

var instances []*instance
var peers []string
var peers []gubernator.PeerInfo

// Returns default testing configuration
func GetDefaultConfig() gubernator.Config {
return gubernator.Config{
Behaviors: gubernator.BehaviorConfig{
GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
GlobalTimeout: time.Second,
MultiRegionSyncWait: time.Millisecond * 50, // Suitable for testing but not production
MultiRegionTimeout: time.Second,
},
}
}

// Returns a random peer from the cluster
func GetPeer() string {
func GetRandomPeer() gubernator.PeerInfo {
return gubernator.RandomPeer(peers)
}

// Returns a specific peer
func PeerAt(idx int) string {
func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

Expand All @@ -79,6 +90,16 @@ func InstanceForHost(host string) *instance {
return nil
}

// Stop an instance without updating peers, used to cause connection errors
func StopInstanceAt(idx int) {
instances[idx].Stop()
}

// Returns the number of instances
func NumOfInstances() int {
return len(instances)
}

// Start a local cluster of gubernator servers
func Start(numInstances int) error {
addresses := make([]string, numInstances, numInstances)
Expand All @@ -87,19 +108,15 @@ func Start(numInstances int) error {

// Start a local cluster with specific addresses
func StartWith(addresses []string) error {
config := GetDefaultConfig()
for _, address := range addresses {
ins, err := StartInstance(address, gubernator.Config{
Behaviors: gubernator.BehaviorConfig{
GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
GlobalTimeout: time.Second,
},
})
ins, err := StartInstance(address, config)
if err != nil {
return errors.Wrapf(err, "while starting instance for addr '%s'", address)
}

// Add the peers and instances to the package level variables
peers = append(peers, ins.Address)
peers = append(peers, gubernator.PeerInfo{Address: ins.Address})
instances = append(instances, ins)
}

Expand Down
29 changes: 13 additions & 16 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ func Test_instance_Peers(t *testing.T) {
tests := []struct {
name string
instance *instance
peers []string
peers []gubernator.PeerInfo
want []gubernator.PeerInfo
}{
{
name: "Happy path",
instance: &instance{Address: "mailgun.com"},
peers: []string{"mailgun.com"},
peers: []gubernator.PeerInfo{{Address: "mailgun.com"}},
want: []gubernator.PeerInfo{
{Address: "mailgun.com", IsOwner: true},
},
},
{
name: "Get multy peers",
instance: &instance{Address: "mailgun.com"},
peers: []string{"localhost:11111", "mailgun.com"},
peers: []gubernator.PeerInfo{{Address: "localhost:11111"}, {Address: "mailgun.com"}},
want: []gubernator.PeerInfo{
{Address: "localhost:11111"},
{Address: "mailgun.com", IsOwner: true},
Expand All @@ -50,7 +50,7 @@ func Test_instance_Peers(t *testing.T) {
{
name: "No Peers",
instance: &instance{Address: "www.mailgun.com:11111"},
peers: []string{},
peers: []gubernator.PeerInfo{},
want: []gubernator.PeerInfo(nil),
},
{
Expand All @@ -62,7 +62,7 @@ func Test_instance_Peers(t *testing.T) {
{
name: "Owner does not exist",
instance: &instance{Address: "mailgun.com"},
peers: []string{"localhost:11111"},
peers: []gubernator.PeerInfo{{Address: "localhost:11111"}},
want: []gubernator.PeerInfo{
{Address: "localhost:11111"},
},
Expand All @@ -82,36 +82,33 @@ func Test_instance_Peers(t *testing.T) {
func TestGetPeer(t *testing.T) {
tests := []struct {
name string
peers []string
peers []gubernator.PeerInfo
oneOf map[string]bool
}{
{
name: "Happy path",
peers: []string{"mailgun.com"},
peers: []gubernator.PeerInfo{{Address: "mailgun.com"}},
},
{
name: "Get one peer from multiple peers",
peers: []string{"mailgun.com", "localhost", "test.com"},
peers: []gubernator.PeerInfo{{Address: "mailgun.com"}, {Address: "localhost"}, {Address: "test.com"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peers = tt.peers
got := GetPeer()
got := GetRandomPeer()

if !stringInSlice(got, peers) {
t.Errorf("expected one of: %v", tt.peers)
t.Errorf("got: %s", got)
}
assert.Contains(t, peers, got)
})
}
}

func TestPeerAt(t *testing.T) {
peers = []string{"mailgun.com"}
peers = []gubernator.PeerInfo{{Address: "mailgun.com"}}

got := PeerAt(0)
want := "mailgun.com"
want := gubernator.PeerInfo{Address: "mailgun.com"}

assert.Equal(t, want, got)
}
Expand Down Expand Up @@ -186,7 +183,7 @@ func TestStartMultipleInstancesWithAddresses(t *testing.T) {
err := StartWith(addresses)
assert.Nil(t, err)

wantPeers := []string{"127.0.0.1:11111", "127.0.0.1:22222"}
wantPeers := []gubernator.PeerInfo{{Address: "127.0.0.1:11111"}, {Address: "127.0.0.1:22222"}}
wantInstances := []*instance{
{Address: "127.0.0.1:11111"},
{Address: "127.0.0.1:22222"},
Expand Down
34 changes: 31 additions & 3 deletions cmd/gubernator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ServerConfig struct {
HTTPListenAddress string
EtcdKeyPrefix string
CacheSize int
DataCenter string

// Etcd configuration used to find peers
EtcdConf etcd.Config
Expand All @@ -56,6 +57,9 @@ type ServerConfig struct {
// K8s configuration used to find peers inside a K8s cluster
K8PoolConf gubernator.K8sPoolConfig

// Memberlist configuration used to find peers
MemberlistPoolConf gubernator.MemberlistPoolConfig

// The PeerPicker as selected by `GUBER_PEER_PICKER`
Picker gubernator.PeerPicker
}
Expand Down Expand Up @@ -95,6 +99,7 @@ func confFromEnv() (ServerConfig, error) {
setter.SetDefault(&conf.GRPCListenAddress, os.Getenv("GUBER_GRPC_ADDRESS"), "0.0.0.0:81")
setter.SetDefault(&conf.HTTPListenAddress, os.Getenv("GUBER_HTTP_ADDRESS"), "0.0.0.0:80")
setter.SetDefault(&conf.CacheSize, getEnvInteger("GUBER_CACHE_SIZE"), 50000)
setter.SetDefault(&conf.DataCenter, os.Getenv("GUBER_DATA_CENTER"), "")

// Behaviors
setter.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration("GUBER_BATCH_TIMEOUT"))
Expand All @@ -105,6 +110,10 @@ func confFromEnv() (ServerConfig, error) {
setter.SetDefault(&conf.Behaviors.GlobalBatchLimit, getEnvInteger("GUBER_GLOBAL_BATCH_LIMIT"))
setter.SetDefault(&conf.Behaviors.GlobalSyncWait, getEnvDuration("GUBER_GLOBAL_SYNC_WAIT"))

setter.SetDefault(&conf.Behaviors.MultiRegionTimeout, getEnvDuration("GUBER_MULTI_REGION_TIMEOUT"))
setter.SetDefault(&conf.Behaviors.MultiRegionBatchLimit, getEnvInteger("GUBER_MULTI_REGION_BATCH_LIMIT"))
setter.SetDefault(&conf.Behaviors.MultiRegionSyncWait, getEnvDuration("GUBER_MULTI_REGION_SYNC_WAIT"))

// ETCD Config
setter.SetDefault(&conf.EtcdAdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), "127.0.0.1:81")
setter.SetDefault(&conf.EtcdKeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers")
Expand All @@ -113,6 +122,11 @@ func confFromEnv() (ServerConfig, error) {
setter.SetDefault(&conf.EtcdConf.Username, os.Getenv("GUBER_ETCD_USER"))
setter.SetDefault(&conf.EtcdConf.Password, os.Getenv("GUBER_ETCD_PASSWORD"))

// Memberlist Config
setter.SetDefault(&conf.MemberlistPoolConf.AdvertiseAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), "")
setter.SetDefault(&conf.MemberlistPoolConf.AdvertisePort, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_PORT"), 7946)
setter.SetDefault(&conf.MemberlistPoolConf.KnownNodes, getEnvSlice("GUBER_MEMBERLIST_KNOWN_NODES"), []string{})

// Kubernetes Config
setter.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default")
conf.K8PoolConf.PodIP = os.Getenv("GUBER_K8S_POD_IP")
Expand Down Expand Up @@ -163,11 +177,25 @@ func confFromEnv() (ServerConfig, error) {
}
}

if anyHasPrefix("GUBER_MEMBERLIST_", os.Environ()) {
logrus.Debug("Memberlist pool config found")
conf.MemberlistPoolConf.Enabled = true
if conf.K8PoolConf.Enabled {
return conf, errors.New("refusing to register gubernator peers with both memberlist and k8s;" +
" remove either `GUBER_MEMBERLIST_*` or `GUBER_K8S_*` variables from the environment")
}

if len(conf.MemberlistPoolConf.KnownNodes) == 0 {
return conf, errors.New("when using memberlist for peer discovery, you MUST provide a " +
"hostname of a known host in the cluster via `GUBER_MEMBERLIST_KNOWN_NODES`")
}
}

if anyHasPrefix("GUBER_ETCD_", os.Environ()) {
logrus.Debug("ETCD peer pool config found")
if conf.K8PoolConf.Enabled {
return conf, errors.New("refusing to register gubernator peers with both etcd and k8s;" +
" remove either `GUBER_ETCD_*` or `GUBER_K8S_*` variables from the environment")
if conf.K8PoolConf.Enabled || conf.MemberlistPoolConf.Enabled {
return conf, errors.New("refusing to register gubernator peers with both etcd, memberlist and k8s;" +
" remove all but one of `GUBER_MEMBERLIST_*`, `GUBER_ETCD_*` or `GUBER_K8S_*` variables from the environment")
}
}

Expand Down
28 changes: 24 additions & 4 deletions cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"strings"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/mailgun/gubernator"
"github.com/mailgun/holster/v3/etcdutil"
"github.com/mailgun/holster/etcdutil"
"github.com/mailgun/holster/v3/syncutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -61,9 +63,10 @@ func main() {

// Registers a new gubernator instance with the GRPC server
guber, err := gubernator.New(gubernator.Config{
Picker: conf.Picker,
GRPCServer: grpcSrv,
Cache: cache,
LocalPicker: conf.Picker,
GRPCServer: grpcSrv,
Cache: cache,
DataCenter: conf.DataCenter,
})
checkErr(err, "while creating new gubernator instance")

Expand All @@ -86,6 +89,23 @@ func main() {
conf.K8PoolConf.OnUpdate = guber.SetPeers
pool, err = gubernator.NewK8sPool(conf.K8PoolConf)
checkErr(err, "while querying kubernetes API")

} else if conf.MemberlistPoolConf.Enabled {
gubernatorPort, err := strconv.Atoi(strings.Split(conf.GRPCListenAddress, ":")[1])
checkErr(err, "while converting gubernator port to int")

// Register peer on memberlist
pool, err = gubernator.NewMemberlistPool(gubernator.MemberlistPoolConfig{
AdvertiseAddress: conf.MemberlistPoolConf.AdvertiseAddress,
AdvertisePort: conf.MemberlistPoolConf.AdvertisePort,
KnownNodes: conf.MemberlistPoolConf.KnownNodes,
LoggerOutput: logrus.WithField("category", "memberlist").Writer(),
DataCenter: conf.DataCenter,
GubernatorPort: gubernatorPort,
OnUpdate: guber.SetPeers,
})
checkErr(err, "while creating memberlist")

} else {
// Register ourselves with other peers via ETCD
etcdClient, err := etcdutil.NewClient(&conf.EtcdConf)
Expand Down
Loading

0 comments on commit 07e238f

Please sign in to comment.