From 49590a8198575b16de633ca945362011ef5e8887 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 23 Oct 2020 14:33:53 -0500 Subject: [PATCH] Bug fixes and code improvements --- CHANGELOG | 9 +++- Dockerfile | 7 +-- cmd/gubernator/main.go | 49 +++++++++++++------ config.go | 20 +++++++- docker-compose-etcd.yaml | 101 +++++++++++++++++++++++++++++++++++++++ docker-compose.yaml | 43 ++++++++++------- etcd.go | 62 ++++++++++++------------ example.conf | 29 ++++++----- functional_test.go | 6 +-- gubernator.go | 10 ++-- hash.go | 22 +++++---- hash_test.go | 10 ++-- kubernetes.go | 29 ++++++----- memberlist.go | 14 +++++- net.go | 70 +++++++++++++++++++++++++++ region_picker.go | 20 ++++---- replicated_hash.go | 25 +++++----- replicated_hash_test.go | 8 ++-- version | 2 +- 19 files changed, 397 insertions(+), 139 deletions(-) create mode 100644 docker-compose-etcd.yaml create mode 100644 net.go diff --git a/CHANGELOG b/CHANGELOG index 3b7860d4..3620e8ed 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -11,8 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Renamed functions to ensure clarity of version * Removed deprecated `EtcdAdvertiseAddress` config option * Refactored configuration options -* 'member-list' metadata no longer assumes the member-list address is the same as - the gubernator advertise address. +* 'member-list' metadata no longer assumes the member-list address is the same + as the gubernator advertise address. +* Now MD5 sums the peer address key when using replicated hash. This ensures + better key distribution when using domain names or ip address that are very + similar. (gubernator-1, gubernator-2, etc...) +* Now defaults to `replicated-hash` if `GUBER_PEER_PICKER` is unset +* Added support for DataCenter fields when using etcd discovery ## [0.9.2] - 2020-10-23 ### Change diff --git a/Dockerfile b/Dockerfile index 37b5c240..d430f64b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,11 +10,12 @@ RUN go mod download # Copy the local package files to the container ADD . /go/src -ENV VERSION=dev-build -# Build the bot inside the container +ARG VERSION + +# Build the server inside the container RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo \ - -ldflags "-w -s -X main.Version=${VERSION}" -o /gubernator /go/src/cmd/gubernator/main.go /go/src/cmd/gubernator/config.go + -ldflags "-w -s -X main.Version=$VERSION" -o /gubernator /go/src/cmd/gubernator/main.go # Create our deploy image FROM scratch diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go index fbe729c4..2ffb740d 100644 --- a/cmd/gubernator/main.go +++ b/cmd/gubernator/main.go @@ -26,6 +26,7 @@ import ( "net" "os" "os/signal" + "runtime" "strconv" "strings" "time" @@ -48,6 +49,7 @@ var Version = "dev-build" func main() { var configFile string + logrus.Infof("Gubernator %s (%s/%s)", Version, runtime.GOARCH, runtime.GOOS) flags := flag.NewFlagSet("gubernator", flag.ContinueOnError) flags.StringVar(&configFile, "config", "", "yaml config file") flags.BoolVar(&gubernator.DebugEnabled, "debug", false, "enable debug") @@ -85,10 +87,10 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) { klog.InitFlags(nil) flag.Set("logtostderr", "true") - if gubernator.DebugEnabled || os.Getenv("GUBER_DEBUG") != "" { + setter.SetDefault(&gubernator.DebugEnabled, getEnvBool("GUBER_DEBUG")) + if gubernator.DebugEnabled { logrus.SetLevel(logrus.DebugLevel) logrus.Debug("Debug enabled") - gubernator.DebugEnabled = true } if configFile != "" { @@ -103,8 +105,19 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) { setter.SetDefault(&conf.HTTPListenAddress, os.Getenv("GUBER_HTTP_ADDRESS"), "localhost:80") setter.SetDefault(&conf.CacheSize, getEnvInteger("GUBER_CACHE_SIZE"), 50_000) setter.SetDefault(&conf.DataCenter, os.Getenv("GUBER_DATA_CENTER"), "") + setter.SetDefault(&conf.AdvertiseAddress, os.Getenv("GUBER_ADVERTISE_ADDRESS"), conf.GRPCListenAddress) + advAddr, advPort, err := net.SplitHostPort(conf.AdvertiseAddress) + if err != nil { + return conf, errors.Wrap(err, "GUBER_ADVERTISE_ADDRESS is invalid; expected format is `address:port`") + } + advAddr, err = gubernator.ResolveHostIP(advAddr) + if err != nil { + return conf, errors.Wrap(err, "failed to discover host ip for GUBER_ADVERTISE_ADDRESS") + } + conf.AdvertiseAddress = net.JoinHostPort(advAddr, advPort) + // Behaviors setter.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration("GUBER_BATCH_TIMEOUT")) setter.SetDefault(&conf.Behaviors.BatchLimit, getEnvInteger("GUBER_BATCH_LIMIT")) @@ -119,7 +132,7 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) { setter.SetDefault(&conf.Behaviors.MultiRegionSyncWait, getEnvDuration("GUBER_MULTI_REGION_SYNC_WAIT")) choices := []string{"member-list", "k8s", "etcd"} - setter.SetDefault(&conf.PeerDiscoveryType, getEnvDuration("GUBER_PEER_DISCOVERY_TYPE"), "member-list") + setter.SetDefault(&conf.PeerDiscoveryType, os.Getenv("GUBER_PEER_DISCOVERY_TYPE"), "member-list") if !slice.ContainsString(conf.PeerDiscoveryType, choices, nil) { return conf, fmt.Errorf("GUBER_PEER_DISCOVERY_TYPE is invalid; choices are [%s]`", strings.Join(choices, ",")) } @@ -132,15 +145,10 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) { setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Username, os.Getenv("GUBER_ETCD_USER")) setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Password, os.Getenv("GUBER_ETCD_PASSWORD")) setter.SetDefault(&conf.EtcdPoolConf.AdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) - - // Memberlist Config - addr, _, err := net.SplitHostPort(conf.GRPCListenAddress) - if err != nil { - return conf, errors.Wrap(err, "GUBER_GRPC_ADDRESS is invalid; expected format is `address:port`") - } + setter.SetDefault(&conf.EtcdPoolConf.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter) setter.SetDefault(&conf.MemberListPoolConf.AdvertiseAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) - setter.SetDefault(&conf.MemberListPoolConf.MemberListAddress, os.Getenv("GUBER_MEMBERLIST_ADDRESS"), fmt.Sprintf("%s:7946", addr)) + setter.SetDefault(&conf.MemberListPoolConf.MemberListAddress, os.Getenv("GUBER_MEMBERLIST_ADDRESS"), fmt.Sprintf("%s:7946", advAddr)) setter.SetDefault(&conf.MemberListPoolConf.KnownNodes, getEnvSlice("GUBER_MEMBERLIST_KNOWN_NODES"), []string{}) setter.SetDefault(&conf.MemberListPoolConf.DataCenter, conf.DataCenter) @@ -168,11 +176,11 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) { return conf, errors.Errorf("'GUBER_PEER_PICKER_HASH=%s' is invalid; choices are [%s]", hash, validHashKeys(hashFuncs)) } - conf.Picker = gubernator.NewConsistantHash(fn) + conf.Picker = gubernator.NewConsistentHash(fn) case "replicated-hash": - setter.SetDefault(&replicas, getEnvInteger("GUBER_REPLICATED_HASH_REPLICAS"), 1) - conf.Picker = gubernator.NewReplicatedConsistantHash(nil, replicas) + setter.SetDefault(&replicas, getEnvInteger("GUBER_REPLICATED_HASH_REPLICAS"), gubernator.DefaultReplicas) + conf.Picker = gubernator.NewReplicatedConsistentHash(nil, replicas) setter.SetDefault(&hash, os.Getenv("GUBER_PEER_PICKER_HASH"), "fnv1a") hashFuncs := map[string]gubernator.HashFunc64{ "fnv1a": fnv1a.HashBytes64, @@ -183,7 +191,7 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) { return conf, errors.Errorf("'GUBER_PEER_PICKER_HASH=%s' is invalid; choices are [%s]", hash, validHash64Keys(hashFuncs)) } - conf.Picker = gubernator.NewReplicatedConsistantHash(fn, replicas) + conf.Picker = gubernator.NewReplicatedConsistentHash(fn, replicas) default: return conf, errors.Errorf("'GUBER_PEER_PICKER=%s' is invalid; choices are ['replicated-hash', 'consistent-hash']", pp) } @@ -284,6 +292,19 @@ func anyHasPrefix(prefix string, items []string) bool { return false } +func getEnvBool(name string) bool { + v := os.Getenv(name) + if v == "" { + return false + } + b, err := strconv.ParseBool(v) + if err != nil { + log.WithError(err).Errorf("while parsing '%s' as an boolean", name) + return false + } + return b +} + func getEnvInteger(name string) int { v := os.Getenv(name) if v == "" { diff --git a/config.go b/config.go index bcdff6ae..1b9a67e8 100644 --- a/config.go +++ b/config.go @@ -86,6 +86,24 @@ type BehaviorConfig struct { MultiRegionBatchLimit int } +type PeerInfo struct { + // (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support. + DataCenter string + // (Optional) The http address:port of the peer + HTTPAddress string + // (Required) The grpc address:port of the peer + GRPCAddress string + // (Optional) Is true if PeerInfo is for this instance of gubernator + IsOwner bool +} + +// Returns the hash key used to identify this peer in the Picker. +func (p PeerInfo) HashKey() string { + return p.GRPCAddress +} + +type UpdateFunc func([]PeerInfo) + func (c *Config) SetDefaults() error { setter.SetDefault(&c.Behaviors.BatchTimeout, time.Millisecond*500) setter.SetDefault(&c.Behaviors.BatchLimit, maxBatchSize) @@ -99,7 +117,7 @@ func (c *Config) SetDefaults() error { setter.SetDefault(&c.Behaviors.MultiRegionBatchLimit, maxBatchSize) setter.SetDefault(&c.Behaviors.MultiRegionSyncWait, time.Second) - setter.SetDefault(&c.LocalPicker, NewConsistantHash(nil)) + setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, DefaultReplicas)) setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil)) setter.SetDefault(&c.Cache, NewLRUCache(0)) diff --git a/docker-compose-etcd.yaml b/docker-compose-etcd.yaml new file mode 100644 index 00000000..815af3fb --- /dev/null +++ b/docker-compose-etcd.yaml @@ -0,0 +1,101 @@ +version: '3' +services: + etcd: + image: quay.io/coreos/etcd:v3.3.10 + command: > + /usr/local/bin/etcd + -name etcd0 + -advertise-client-urls http://localhost:2379 + -listen-client-urls http://0.0.0.0:2379 + -initial-advertise-peer-urls http://0.0.0.0:2380 + -listen-peer-urls http://0.0.0.0:2380 + -initial-cluster-token etcd-cluster-1 + -initial-cluster etcd0=http://0.0.0.0:2380 + -initial-cluster-state new + ports: + - "2379:2379" + + gubernator-1: + image: thrawn01/gubernator:latest + command: "/gubernator" + environment: + # The address GRPC requests will listen on + - GUBER_GRPC_ADDRESS=0.0.0.0:81 + # The address HTTP requests will listen on + - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # Choose the etcd peer discovery type + - GUBER_PEER_DISCOVERY_TYPE=etcd + # A comma separated list of etcd nodes + - GUBER_ETCD_ENDPOINTS=etcd:2379 + # The key prefix used in the etcd store + - GUBER_ETCD_KEY_PREFIX=/gubernator-docker + # The address that is advertised to other peers + - GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-1:81 + #- GUBER_DATA_CENTER=us-east-1 + ports: + - "9081:81" + - "9080:80" + + gubernator-2: + image: thrawn01/gubernator:latest + command: "/gubernator" + environment: + # The address GRPC requests will listen on + - GUBER_GRPC_ADDRESS=0.0.0.0:81 + # The address HTTP requests will listen on + - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # Choose the etcd peer discovery type + - GUBER_PEER_DISCOVERY_TYPE=etcd + # A comma separated list of etcd nodes + - GUBER_ETCD_ENDPOINTS=etcd:2379 + # The key prefix used in the etcd store + - GUBER_ETCD_KEY_PREFIX=/gubernator-docker + # The address that is advertised to other peers + - GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-2:81 + #- GUBER_DATA_CENTER=us-east-1 + ports: + - "9181:81" + - "9180:80" + + gubernator-3: + image: thrawn01/gubernator:latest + command: "/gubernator" + environment: + # The address GRPC requests will listen on + - GUBER_GRPC_ADDRESS=0.0.0.0:81 + # The address HTTP requests will listen on + - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # Choose the etcd peer discovery type + - GUBER_PEER_DISCOVERY_TYPE=etcd + # A comma separated list of etcd nodes + - GUBER_ETCD_ENDPOINTS=etcd:2379 + # The key prefix used in the etcd store + - GUBER_ETCD_KEY_PREFIX=/gubernator-docker + # The address that is advertised to other peers + - GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-3:81 + #- GUBER_DATA_CENTER=us-west-2 + ports: + - "9281:81" + - "9280:80" + + gubernator-4: + image: thrawn01/gubernator:latest + command: "/gubernator" + environment: + - GUBER_DEBUG=true + # The address GRPC requests will listen on + - GUBER_GRPC_ADDRESS=0.0.0.0:81 + # The address HTTP requests will listen on + - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # Choose the etcd peer discovery type + - GUBER_PEER_DISCOVERY_TYPE=etcd + # A comma separated list of etcd nodes + - GUBER_ETCD_ENDPOINTS=etcd:2379 + # The key prefix used in the etcd store + - GUBER_ETCD_KEY_PREFIX=/gubernator-docker + # The address that is advertised to other peers + - GUBER_ADVERTISE_ADDRESS=gubernator-4:81 + #- GUBER_DATA_CENTER=us-west-2 + ports: + - "9381:81" + - "9380:80" diff --git a/docker-compose.yaml b/docker-compose.yaml index 04413e63..cac65026 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,14 +8,16 @@ services: - GUBER_GRPC_ADDRESS=0.0.0.0:81 # The address HTTP requests will listen on - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # The address that is advertised to other peers + - GUBER_ADVERTISE_ADDRESS=gubernator-1:81 # Max size of the cache; The cache size will never grow beyond this size. - GUBER_CACHE_SIZE=50000 # A comma separated list of known gubernator nodes - - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1,gubernator-2,gubernator-3,gubernator-4 - - GUBER_DATA_CENTER=us-east-1 + - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1 + #- GUBER_DATA_CENTER=us-east-1 ports: - - "8081:81" - - "8080:80" + - "9081:81" + - "9080:80" gubernator-2: image: thrawn01/gubernator:latest @@ -25,14 +27,16 @@ services: - GUBER_GRPC_ADDRESS=0.0.0.0:81 # The address HTTP requests will listen on - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # The address that is advertised to other peers + - GUBER_ADVERTISE_ADDRESS=gubernator-2:81 # Max size of the cache; The cache size will never grow beyond this size. - GUBER_CACHE_SIZE=50000 # A comma separated list of known gubernator nodes - - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1,gubernator-2,gubernator-3,gubernator-4 - - GUBER_DATA_CENTER=us-east-1 + - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1 + #- GUBER_DATA_CENTER=us-east-1 ports: - - "8181:81" - - "8180:80" + - "9181:81" + - "9180:80" gubernator-3: image: thrawn01/gubernator:latest @@ -42,28 +46,33 @@ services: - GUBER_GRPC_ADDRESS=0.0.0.0:81 # The address HTTP requests will listen on - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # The address that is advertised to other peers + - GUBER_ADVERTISE_ADDRESS=gubernator-3:81 # Max size of the cache; The cache size will never grow beyond this size. - GUBER_CACHE_SIZE=50000 # A comma separated list of known gubernator nodes - - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1,gubernator-2,gubernator-3,gubernator-4 - - GUBER_DATA_CENTER=us-west-2 + - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1 + #- GUBER_DATA_CENTER=us-west-2 ports: - - "8281:81" - - "8280:80" + - "9281:81" + - "9280:80" gubernator-4: image: thrawn01/gubernator:latest command: "/gubernator" environment: + - GUBER_DEBUG=true # The address GRPC requests will listen on - GUBER_GRPC_ADDRESS=0.0.0.0:81 # The address HTTP requests will listen on - GUBER_HTTP_ADDRESS=0.0.0.0:80 + # The address that is advertised to other peers + - GUBER_ADVERTISE_ADDRESS=gubernator-4:81 # Max size of the cache; The cache size will never grow beyond this size. - GUBER_CACHE_SIZE=50000 - # A Comma separate list of known gubernator nodes - - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1,gubernator-2,gubernator-3,gubernator-4 - - GUBER_DATA_CENTER=us-west-2 + # A Comma separated list of known gubernator nodes + - GUBER_MEMBERLIST_KNOWN_NODES=gubernator-1 + #- GUBER_DATA_CENTER=us-west-2 ports: - - "8381:81" - - "8380:80" + - "9381:81" + - "9380:80" diff --git a/etcd.go b/etcd.go index c6fdda18..e6197316 100644 --- a/etcd.go +++ b/etcd.go @@ -18,6 +18,7 @@ package gubernator import ( "context" + "encoding/json" "time" etcd "github.com/coreos/etcd/clientv3" @@ -27,24 +28,6 @@ import ( "github.com/sirupsen/logrus" ) -type PeerInfo struct { - // (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support. - DataCenter string - // (Optional) The http address:port of the peer - HTTPAddress string - // (Required) The grpc address:port of the peer - GRPCAddress string - // (Optional) Is true if PeerInfo is for this instance of gubernator - IsOwner bool -} - -// Returns the hash key used to identify this peer in the Picker. -func (p PeerInfo) HashKey() string { - return p.GRPCAddress -} - -type UpdateFunc func([]PeerInfo) - const ( etcdTimeout = time.Second * 10 backOffTimeout = time.Second * 5 @@ -57,7 +40,7 @@ type PoolInterface interface { } type EtcdPool struct { - peers map[string]struct{} + peers map[string]PeerInfo wg syncutil.WaitGroup ctx context.Context cancelCtx context.CancelFunc @@ -85,6 +68,9 @@ type EtcdPoolConfig struct { // (Optional) An interface through which logging will occur (Usually *logrus.Entry) Logger logrus.FieldLogger + + // (Optional) The datacenter this instance belongs too + DataCenter string } func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { @@ -102,7 +88,7 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { ctx, cancel := context.WithCancel(context.Background()) pool := &EtcdPool{ log: conf.Logger, - peers: make(map[string]struct{}), + peers: make(map[string]PeerInfo), cancelCtx: cancel, conf: conf, ctx: ctx, @@ -166,13 +152,25 @@ func (e *EtcdPool) collectPeers(revision *int64) error { // Collect all the peers for _, v := range resp.Kvs { - e.peers[string(v.Value)] = struct{}{} + p := e.unMarshallValue(v.Value) + e.peers[p.GRPCAddress] = p } e.callOnUpdate() return nil } +func (e *EtcdPool) unMarshallValue(v []byte) PeerInfo { + var p PeerInfo + + // for backward compatible with older gubernator versions + if err := json.Unmarshal(v, &p); err != nil { + e.log.WithError(err).Errorf("while unmarshalling peer info from key value") + return PeerInfo{GRPCAddress: string(v)} + } + return p +} + func (e *EtcdPool) watch() error { // Initialize watcher if err := e.watchPeers(); err != nil { @@ -196,7 +194,8 @@ func (e *EtcdPool) watch() error { case etcd.EventTypePut: if event.Kv != nil { e.log.Debugf("new peer [%s]", string(event.Kv.Value)) - e.peers[string(event.Kv.Value)] = struct{}{} + p := e.unMarshallValue(event.Kv.Value) + e.peers[p.GRPCAddress] = p } case etcd.EventTypeDelete: if event.PrevKv != nil { @@ -238,6 +237,14 @@ func (e *EtcdPool) register(name string) error { instanceKey := e.conf.KeyPrefix + name e.log.Infof("Registering peer '%s' with etcd", name) + b, err := json.Marshal(PeerInfo{ + GRPCAddress: e.conf.AdvertiseAddress, + DataCenter: e.conf.DataCenter, + }) + if err != nil { + return errors.Wrap(err, "while marshalling PeerInfo") + } + var keepAlive <-chan *etcd.LeaseKeepAliveResponse var lease *etcd.LeaseGrantResponse @@ -251,7 +258,7 @@ func (e *EtcdPool) register(name string) error { return errors.Wrapf(err, "during grant lease") } - _, err = e.conf.Client.Put(ctx, instanceKey, name, etcd.WithLease(lease.ID)) + _, err = e.conf.Client.Put(ctx, instanceKey, string(b), etcd.WithLease(lease.ID)) if err != nil { return errors.Wrap(err, "during put") } @@ -262,7 +269,6 @@ func (e *EtcdPool) register(name string) error { return nil } - var err error var lastKeepAlive time.Time // Attempt to register our instance with etcd @@ -334,12 +340,8 @@ func (e *EtcdPool) Close() { func (e *EtcdPool) callOnUpdate() { var peers []PeerInfo - for k := range e.peers { - if k == e.conf.AdvertiseAddress { - peers = append(peers, PeerInfo{Address: k, IsOwner: true}) - } else { - peers = append(peers, PeerInfo{Address: k}) - } + for _, v := range e.peers { + peers = append(peers, v) } e.conf.OnUpdate(peers) diff --git a/example.conf b/example.conf index 3a0377b6..b470a8d5 100644 --- a/example.conf +++ b/example.conf @@ -3,14 +3,18 @@ ############################ # The address GRPC requests will listen on -GUBER_GRPC_ADDRESS=localhost:81 +GUBER_GRPC_ADDRESS=0.0.0.0:9990 # The address HTTP requests will listen on -GUBER_HTTP_ADDRESS=localhost:80 - -# The address gubernator peers will connect too. Should be the -# same as GUBER_GRPC_ADDRESS unless you are running behind a NAT -# or running in a docker container without host networking +GUBER_HTTP_ADDRESS=0.0.0.0:9980 + +# The address gubernator peers will connect too. +# +# Should be the same as GUBER_GRPC_ADDRESS unless you are running behind a NAT +# or running in a docker container without host networking. +# +# If unset, will default to the hostname or if that fails will attempt +# to guess at a non loopback interface # GUBER_ADVERTISE_ADDRESS=localhost:81 # Max size of the cache; This is the cache that holds @@ -53,9 +57,7 @@ GUBER_CACHE_SIZE=50000 # Member-List Config (GUBER_PEER_DISCOVERY_TYPE=member-list) ############################ -# The address gubernator peers will connect too -# Should be the same as GUBER_GRPC_ADDRESS unless you are running behind -# a NAT or running in a docker container without host networking +# The address peers will connect too. Defaults to GUBER_ADVERTISE_ADDRESS # GUBER_MEMBERLIST_ADVERTISE_ADDRESS=localhost:81 # The address the member list will listen to in order to discover other list members. @@ -92,9 +94,7 @@ GUBER_CACHE_SIZE=50000 # A Comma separate list of etcd nodes # GUBER_ETCD_ENDPOINTS=localhost:2379 -# The address peers will connect too -# Should be the same as GUBER_GRPC_ADDRESS unless you are running behind -# a NAT or running in a docker container without host networking +# The address peers will connect too. Defaults to GUBER_ADVERTISE_ADDRESS # GUBER_ETCD_ADVERTISE_ADDRESS=localhost:81 # The prefix gubernator will use to register peers under in etcd @@ -103,6 +103,9 @@ GUBER_CACHE_SIZE=50000 # How long etcd client will wait for a response when initial dialing a node #GUBER_ETCD_DIAL_TIMEOUT=5s +# The name of the datacenter this gubernator instance is in. +# GUBER_ETCD_DATA_CENTER=datacenter1 + # Authentication #GUBER_ETCD_USER= #GUBER_ETCD_PASSWORD= @@ -139,5 +142,5 @@ GUBER_CACHE_SIZE=50000 # GUBER_PEER_PICKER_HASH=fnv1a # Choose the number of replications -# GUBER_REPLICATED_HASH_REPLICAS=1 +# GUBER_REPLICATED_HASH_REPLICAS=512 diff --git a/functional_test.go b/functional_test.go index 92cf64ec..8260a23b 100644 --- a/functional_test.go +++ b/functional_test.go @@ -316,7 +316,7 @@ func TestGlobalRateLimits(t *testing.T) { // Our second should be processed as if we own it since the async forward hasn't occurred yet sendHit(guber.Status_UNDER_LIMIT, 3, 2) - testutil.UntilPass(t, 10, time.Millisecond*200, func(t testutil.TestingT) { + testutil.UntilPass(t, 20, time.Millisecond*200, func(t testutil.TestingT) { // Inspect our metrics, ensure they collected the counts we expected during this test d := cluster.DaemonAt(0) metricCh := make(chan prometheus.Metric, 5) @@ -327,8 +327,8 @@ func TestGlobalRateLimits(t *testing.T) { assert.Nil(t, m.Write(&buf)) assert.Equal(t, uint64(2), *buf.Histogram.SampleCount) - // V1Instance 3 should be the owner of our global rate limit - d = cluster.DaemonAt(3) + // V1Instance 2 should be the owner of our global rate limit + d = cluster.DaemonAt(2) metricCh = make(chan prometheus.Metric, 5) d.V1Server.Collect(metricCh) diff --git a/gubernator.go b/gubernator.go index bc112c34..5d38f7db 100644 --- a/gubernator.go +++ b/gubernator.go @@ -19,10 +19,10 @@ package gubernator import ( "context" "fmt" - "github.com/mailgun/holster/v3/setter" "strings" "sync" + "github.com/mailgun/holster/v3/setter" "github.com/mailgun/holster/v3/syncutil" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -379,7 +379,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { s.conf.RegionPicker = regionPicker s.peerMutex.Unlock() - //TODO: This should include the regions peers? log.WithField("peers", peers).Info("Peers updated") + s.log.WithField("peers", peerInfo).Debug("peers updated") // Shutdown any old peers we no longer need ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout) @@ -414,7 +414,11 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { wg.Wait() if len(shutdownPeers) > 0 { - s.log.WithField("peers", shutdownPeers).Info("Peers shutdown") + var peers []string + for _, p := range shutdownPeers { + peers = append(peers, p.info.GRPCAddress) + } + s.log.WithField("peers", peers).Debug("Peers shutdown") } } diff --git a/hash.go b/hash.go index 74a9a099..c6a6bdda 100644 --- a/hash.go +++ b/hash.go @@ -28,14 +28,16 @@ import ( type HashFunc func(data []byte) uint32 // Implements PeerPicker -type ConsistantHash struct { +// deprecated +type ConsistentHash struct { hashFunc HashFunc peerKeys []int peerMap map[int]*PeerClient } -func NewConsistantHash(fn HashFunc) *ConsistantHash { - ch := &ConsistantHash{ +// deprecated +func NewConsistentHash(fn HashFunc) *ConsistentHash { + ch := &ConsistentHash{ hashFunc: fn, peerMap: make(map[int]*PeerClient), } @@ -46,14 +48,14 @@ func NewConsistantHash(fn HashFunc) *ConsistantHash { return ch } -func (ch *ConsistantHash) New() PeerPicker { - return &ConsistantHash{ +func (ch *ConsistentHash) New() PeerPicker { + return &ConsistentHash{ hashFunc: ch.hashFunc, peerMap: make(map[int]*PeerClient), } } -func (ch *ConsistantHash) Peers() []*PeerClient { +func (ch *ConsistentHash) Peers() []*PeerClient { var results []*PeerClient for _, v := range ch.peerMap { results = append(results, v) @@ -62,7 +64,7 @@ func (ch *ConsistantHash) Peers() []*PeerClient { } // Adds a peer to the hash -func (ch *ConsistantHash) Add(peer *PeerClient) { +func (ch *ConsistentHash) Add(peer *PeerClient) { hash := int(ch.hashFunc(strToBytesUnsafe(peer.info.HashKey()))) ch.peerKeys = append(ch.peerKeys, hash) ch.peerMap[hash] = peer @@ -70,17 +72,17 @@ func (ch *ConsistantHash) Add(peer *PeerClient) { } // Returns number of peers in the picker -func (ch *ConsistantHash) Size() int { +func (ch *ConsistentHash) Size() int { return len(ch.peerKeys) } // Returns the peer by peer info -func (ch *ConsistantHash) GetByPeerInfo(peer PeerInfo) *PeerClient { +func (ch *ConsistentHash) GetByPeerInfo(peer PeerInfo) *PeerClient { return ch.peerMap[int(ch.hashFunc(strToBytesUnsafe(peer.HashKey())))] } // Given a key, return the peer that key is assigned too -func (ch *ConsistantHash) Get(key string) (*PeerClient, error) { +func (ch *ConsistentHash) Get(key string) (*PeerClient, error) { if ch.Size() == 0 { return nil, errors.New("unable to pick a peer; pool is empty") } diff --git a/hash_test.go b/hash_test.go index 8970521b..c3984703 100644 --- a/hash_test.go +++ b/hash_test.go @@ -21,7 +21,7 @@ func TestConsistantHash(t *testing.T) { "192.168.1.2": hosts[1], "5f46bb53-6c30-49dc-adb4-b7355058adb6": hosts[1], } - hash := NewConsistantHash(nil) + hash := NewConsistentHash(nil) for _, h := range hosts { hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}}) } @@ -37,7 +37,7 @@ func TestConsistantHash(t *testing.T) { }) t.Run("Size", func(t *testing.T) { - hash := NewConsistantHash(nil) + hash := NewConsistentHash(nil) for _, h := range hosts { hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}}) @@ -47,7 +47,7 @@ func TestConsistantHash(t *testing.T) { }) t.Run("Host", func(t *testing.T) { - hash := NewConsistantHash(nil) + hash := NewConsistentHash(nil) hostMap := map[string]*PeerClient{} for _, h := range hosts { @@ -81,7 +81,7 @@ func TestConsistantHash(t *testing.T) { for name, hashFunc := range hashFuncs { t.Run(name, func(t *testing.T) { - hash := NewConsistantHash(hashFunc) + hash := NewConsistentHash(hashFunc) hostMap := map[string]int{} for _, h := range hosts { @@ -117,7 +117,7 @@ func BenchmarkConsistantHash(b *testing.B) { ips[i] = net.IPv4(byte(i>>24), byte(i>>16), byte(i>>8), byte(i)).String() } - hash := NewConsistantHash(hashFunc) + hash := NewConsistentHash(hashFunc) hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"} for _, h := range hosts { hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}}) diff --git a/kubernetes.go b/kubernetes.go index 9f71a0b1..5c3a1d5e 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -21,6 +21,8 @@ import ( "fmt" "reflect" + "github.com/mailgun/holster/v3/setter" + "github.com/mailgun/holster/v3/syncutil" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -39,7 +41,7 @@ type K8sPool struct { cancelCtx context.CancelFunc wg syncutil.WaitGroup ctx context.Context - log *logrus.Entry + log logrus.FieldLogger conf K8sPoolConfig informer cache.SharedIndexInformer done chan struct{} @@ -51,6 +53,7 @@ type K8sPoolConfig struct { Selector string PodIP string PodPort string + Logger logrus.FieldLogger } func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) { @@ -65,12 +68,13 @@ func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) { } pool := &K8sPool{ - log: logrus.WithField("category", "kubernetes-pool"), + log: conf.Logger, peers: make(map[string]struct{}), done: make(chan struct{}), client: client, conf: conf, } + setter.SetDefault(&pool.log, logrus.WithField("category", "gubernator")) return pool, pool.start() } @@ -96,26 +100,26 @@ func (e *K8sPool) start() error { e.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) - logrus.Debugf("Queue (Add) '%s' - %s", key, err) + e.log.Debugf("Queue (Add) '%s' - %s", key, err) if err != nil { - logrus.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) + e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) return } }, UpdateFunc: func(obj, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) - logrus.Debugf("Queue (Update) '%s' - %s", key, err) + e.log.Debugf("Queue (Update) '%s' - %s", key, err) if err != nil { - logrus.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) + e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) return } e.updatePeers() }, DeleteFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) - logrus.Debugf("Queue (Delete) '%s' - %s", key, err) + e.log.Debugf("Queue (Delete) '%s' - %s", key, err) if err != nil { - logrus.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) + e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) return } e.updatePeers() @@ -133,23 +137,26 @@ func (e *K8sPool) start() error { } func (e *K8sPool) updatePeers() { - logrus.Debug("Fetching peer list from endpoints API") + e.log.Debug("Fetching peer list from endpoints API") var peers []PeerInfo for _, obj := range e.informer.GetStore().List() { endpoint, ok := obj.(*api_v1.Endpoints) if !ok { - logrus.Errorf("expected type v1.Endpoints got '%s' instead", reflect.TypeOf(obj).String()) + e.log.Errorf("expected type v1.Endpoints got '%s' instead", reflect.TypeOf(obj).String()) } for _, s := range endpoint.Subsets { for _, addr := range s.Addresses { + // TODO(thrawn01): Might consider using the `namespace` as the `DataCenter`. We should + // do what ever k8s convention is for identifying a k8s cluster within a federated multi-data + // center setup. peer := PeerInfo{GRPCAddress: fmt.Sprintf("%s:%s", addr.IP, e.conf.PodPort)} if addr.IP == e.conf.PodIP { peer.IsOwner = true } peers = append(peers, peer) - logrus.Debugf("Peer: %+v\n", peer) + e.log.Debugf("Peer: %+v\n", peer) } } } diff --git a/memberlist.go b/memberlist.go index a89f7cc0..21002153 100644 --- a/memberlist.go +++ b/memberlist.go @@ -75,9 +75,21 @@ func NewMemberListPool(conf MemberListPoolConfig) (*MemberListPool, error) { return nil, errors.Wrap(err, "MemberListAddress=`%s` is invalid;") } + // Member list requires the address to be an ip address + if ip := net.ParseIP(host); ip == nil { + addrs, err := net.LookupHost(host) + if err != nil { + return nil, errors.Wrapf(err, "while preforming host lookup for '%s'", host) + } + if len(addrs) == 0 { + return nil, errors.Wrapf(err, "net.LookupHost() returned no addresses for '%s'", host) + } + host = addrs[0] + } + _, advPort, err := splitAddress(conf.AdvertiseAddress) if err != nil { - return nil, errors.Wrap(err, "MemberListAddress=`%s` is invalid;") + return nil, errors.Wrap(err, "AdvertiseAddress=`%s` is invalid;") } // Configure member list event handler diff --git a/net.go b/net.go new file mode 100644 index 00000000..1a252920 --- /dev/null +++ b/net.go @@ -0,0 +1,70 @@ +package gubernator + +import ( + "net" + "os" + + "github.com/mailgun/holster/v3/slice" + "github.com/pkg/errors" +) + +// If the passed address is "0.0.0.0" or "::" attempts to discover the actual ip address of the host +func ResolveHostIP(addr string) (string, error) { + if slice.ContainsString(addr, []string{"0.0.0.0", "::", "0:0:0:0:0:0:0:0"}, nil) { + // Use the hostname as the advertise address as it's most likely to be the external interface + domainName, err := os.Hostname() + if err != nil { + addr, err = discoverIP() + if err != nil { + return "", errors.Wrapf(err, "while discovering ip for '%s'", addr) + } + return addr, nil + } + addrs, err := net.LookupHost(domainName) + if err != nil { + return "", errors.Wrapf(err, "while preforming host lookup for '%s'", domainName) + } + if len(addrs) == 0 { + return "", errors.Wrapf(err, "net.LookupHost() returned no addresses for '%s'", domainName) + } + return addrs[0], nil + } + return addr, nil +} + +func discoverIP() (string, error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 { + continue // interface down + } + if iface.Flags&net.FlagLoopback != 0 { + continue // loopback interface + } + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + continue // not an ipv4 address + } + return ip.String(), nil + } + } + return "", errors.New("Unable to detect external ip address; please set `GUBER_ADVERTISE_ADDRESS`?") +} diff --git a/region_picker.go b/region_picker.go index 29dda7bf..dc9f3158 100644 --- a/region_picker.go +++ b/region_picker.go @@ -15,7 +15,7 @@ type RegionPeerPicker interface { // RegionPicker encapsulates pickers for a set of regions type RegionPicker struct { - *ConsistantHash + *ReplicatedConsistentHash // A map of all the pickers by region regions map[string]PeerPicker @@ -25,21 +25,21 @@ type RegionPicker struct { reqQueue chan *RateLimitReq } -func NewRegionPicker(fn HashFunc) *RegionPicker { +func NewRegionPicker(fn HashFunc64) *RegionPicker { rp := &RegionPicker{ - regions: make(map[string]PeerPicker), - reqQueue: make(chan *RateLimitReq, 0), - ConsistantHash: NewConsistantHash(fn), + regions: make(map[string]PeerPicker), + reqQueue: make(chan *RateLimitReq, 0), + ReplicatedConsistentHash: NewReplicatedConsistentHash(fn, DefaultReplicas), } return rp } func (rp *RegionPicker) New() RegionPeerPicker { - hash := rp.ConsistantHash.New().(*ConsistantHash) + hash := rp.ReplicatedConsistentHash.New().(*ReplicatedConsistentHash) return &RegionPicker{ - regions: make(map[string]PeerPicker), - reqQueue: make(chan *RateLimitReq, 0), - ConsistantHash: hash, + regions: make(map[string]PeerPicker), + reqQueue: make(chan *RateLimitReq, 0), + ReplicatedConsistentHash: hash, } } @@ -88,7 +88,7 @@ func (rp *RegionPicker) Peers() []*PeerClient { func (rp *RegionPicker) Add(peer *PeerClient) { picker, ok := rp.regions[peer.info.DataCenter] if !ok { - picker = rp.ConsistantHash.New() + picker = rp.ReplicatedConsistentHash.New() rp.regions[peer.info.DataCenter] = picker } picker.Add(peer) diff --git a/replicated_hash.go b/replicated_hash.go index 17421f0c..ed02042b 100644 --- a/replicated_hash.go +++ b/replicated_hash.go @@ -17,6 +17,8 @@ limitations under the License. package gubernator import ( + "crypto/md5" + "fmt" "sort" "strconv" @@ -31,7 +33,7 @@ type HashFunc64 func(data []byte) uint64 var DefaultHash64 HashFunc64 = fnv1.HashBytes64 // Implements PeerPicker -type ReplicatedConsistantHash struct { +type ReplicatedConsistentHash struct { hashFunc HashFunc64 peerKeys []peerInfo peers map[string]*PeerClient @@ -43,8 +45,8 @@ type peerInfo struct { peer *PeerClient } -func NewReplicatedConsistantHash(fn HashFunc64, replicas int) *ReplicatedConsistantHash { - ch := &ReplicatedConsistantHash{ +func NewReplicatedConsistentHash(fn HashFunc64, replicas int) *ReplicatedConsistentHash { + ch := &ReplicatedConsistentHash{ hashFunc: fn, peers: make(map[string]*PeerClient), replicas: replicas, @@ -56,15 +58,15 @@ func NewReplicatedConsistantHash(fn HashFunc64, replicas int) *ReplicatedConsist return ch } -func (ch *ReplicatedConsistantHash) New() PeerPicker { - return &ReplicatedConsistantHash{ +func (ch *ReplicatedConsistentHash) New() PeerPicker { + return &ReplicatedConsistentHash{ hashFunc: ch.hashFunc, peers: make(map[string]*PeerClient), replicas: ch.replicas, } } -func (ch *ReplicatedConsistantHash) Peers() []*PeerClient { +func (ch *ReplicatedConsistentHash) Peers() []*PeerClient { var results []*PeerClient for _, v := range ch.peers { results = append(results, v) @@ -73,11 +75,12 @@ func (ch *ReplicatedConsistantHash) Peers() []*PeerClient { } // Adds a peer to the hash -func (ch *ReplicatedConsistantHash) Add(peer *PeerClient) { +func (ch *ReplicatedConsistentHash) Add(peer *PeerClient) { ch.peers[peer.info.GRPCAddress] = peer + key := fmt.Sprintf("%x", md5.Sum([]byte(peer.info.GRPCAddress))) for i := 0; i < ch.replicas; i++ { - hash := ch.hashFunc(strToBytesUnsafe(strconv.Itoa(i) + peer.info.GRPCAddress)) + hash := ch.hashFunc(strToBytesUnsafe(strconv.Itoa(i) + key)) ch.peerKeys = append(ch.peerKeys, peerInfo{ hash: hash, peer: peer, @@ -88,17 +91,17 @@ func (ch *ReplicatedConsistantHash) Add(peer *PeerClient) { } // Returns number of peers in the picker -func (ch *ReplicatedConsistantHash) Size() int { +func (ch *ReplicatedConsistentHash) Size() int { return len(ch.peers) } // Returns the peer by hostname -func (ch *ReplicatedConsistantHash) GetByPeerInfo(peer PeerInfo) *PeerClient { +func (ch *ReplicatedConsistentHash) GetByPeerInfo(peer PeerInfo) *PeerClient { return ch.peers[peer.GRPCAddress] } // Given a key, return the peer that key is assigned too -func (ch *ReplicatedConsistantHash) Get(key string) (*PeerClient, error) { +func (ch *ReplicatedConsistentHash) Get(key string) (*PeerClient, error) { if ch.Size() == 0 { return nil, errors.New("unable to pick a peer; pool is empty") } diff --git a/replicated_hash_test.go b/replicated_hash_test.go index 360e0724..703ca572 100644 --- a/replicated_hash_test.go +++ b/replicated_hash_test.go @@ -15,7 +15,7 @@ func TestReplicatedConsistantHash(t *testing.T) { hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"} t.Run("Size", func(t *testing.T) { - hash := NewReplicatedConsistantHash(nil, DefaultReplicas) + hash := NewReplicatedConsistentHash(nil, DefaultReplicas) for _, h := range hosts { hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}}) @@ -25,7 +25,7 @@ func TestReplicatedConsistantHash(t *testing.T) { }) t.Run("Host", func(t *testing.T) { - hash := NewReplicatedConsistantHash(nil, DefaultReplicas) + hash := NewReplicatedConsistentHash(nil, DefaultReplicas) hostMap := map[string]*PeerClient{} for _, h := range hosts { @@ -58,7 +58,7 @@ func TestReplicatedConsistantHash(t *testing.T) { for name, hashFunc := range hashFuncs { t.Run(name, func(t *testing.T) { - hash := NewReplicatedConsistantHash(hashFunc, DefaultReplicas) + hash := NewReplicatedConsistentHash(hashFunc, DefaultReplicas) hostMap := map[string]int{} for _, h := range hosts { @@ -93,7 +93,7 @@ func BenchmarkReplicatedConsistantHash(b *testing.B) { ips[i] = net.IPv4(byte(i>>24), byte(i>>16), byte(i>>8), byte(i)).String() } - hash := NewReplicatedConsistantHash(hashFunc, DefaultReplicas) + hash := NewReplicatedConsistentHash(hashFunc, DefaultReplicas) hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"} for _, h := range hosts { hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}}) diff --git a/version b/version index 8ba2dc93..9c218192 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.0.0.rc.1 +1.0.0-rc.1