diff --git a/Dockerfile b/Dockerfile index f9548d3b..ea001eb5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,15 @@ # Build image FROM golang:1.12 as build +WORKDIR /src + +# This should create cached layer of our dependencies for subsequent builds to use +COPY go.mod /src +COPY go.sum /src +RUN go mod download + # Copy the local package files to the container ADD . /src -WORKDIR /src ENV VERSION=dev-build # Build the bot inside the container diff --git a/cmd/gubernator/config.go b/cmd/gubernator/config.go index c7bc9e78..926956dc 100644 --- a/cmd/gubernator/config.go +++ b/cmd/gubernator/config.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "flag" "fmt" + "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" "github.com/sirupsen/logrus" "io/ioutil" @@ -21,17 +22,20 @@ import ( var debug = false type ServerConfig struct { - GRPCListenAddress string - AdvertiseAddress string - HTTPListenAddress string - EtcdKeyPrefix string - CacheSize int + GRPCListenAddress string + EtcdAdvertiseAddress string + HTTPListenAddress string + EtcdKeyPrefix string + CacheSize int // Etcd configuration used to find peers EtcdConf etcd.Config // Configure how behaviours behave Behaviors gubernator.BehaviorConfig + + // K8s configuration used to find peers inside a K8s cluster + K8PoolConf gubernator.K8sPoolConfig } func confFromEnv() (ServerConfig, error) { @@ -45,8 +49,10 @@ func confFromEnv() (ServerConfig, error) { return conf, err } - if debug { + if debug || os.Getenv("GUBER_DEBUG") != "" { logrus.SetLevel(logrus.DebugLevel) + logrus.Debug("Debug enabled") + debug = true } if configFile != "" { @@ -59,9 +65,7 @@ func confFromEnv() (ServerConfig, error) { // Main config holster.SetDefault(&conf.GRPCListenAddress, os.Getenv("GUBER_GRPC_ADDRESS"), "0.0.0.0:81") holster.SetDefault(&conf.HTTPListenAddress, os.Getenv("GUBER_HTTP_ADDRESS"), "0.0.0.0:80") - holster.SetDefault(&conf.AdvertiseAddress, os.Getenv("GUBER_ADVERTISE_ADDRESS"), "127.0.0.1:81") holster.SetDefault(&conf.CacheSize, getEnvInteger("GUBER_CACHE_SIZE"), 50000) - holster.SetDefault(&conf.EtcdKeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers") // Behaviors holster.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration("GUBER_BATCH_TIMEOUT")) @@ -73,17 +77,47 @@ func confFromEnv() (ServerConfig, error) { holster.SetDefault(&conf.Behaviors.GlobalSyncWait, getEnvDuration("GUBER_GLOBAL_SYNC_WAIT")) // ETCD Config + holster.SetDefault(&conf.EtcdAdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), "127.0.0.1:81") + holster.SetDefault(&conf.EtcdKeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers") holster.SetDefault(&conf.EtcdConf.Endpoints, getEnvSlice("GUBER_ETCD_ENDPOINTS"), []string{"localhost:2379"}) holster.SetDefault(&conf.EtcdConf.DialTimeout, getEnvDuration("GUBER_ETCD_DIAL_TIMEOUT"), time.Second*5) holster.SetDefault(&conf.EtcdConf.Username, os.Getenv("GUBER_ETCD_USER")) holster.SetDefault(&conf.EtcdConf.Password, os.Getenv("GUBER_ETCD_PASSWORD")) + // Kubernetes Config + holster.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default") + conf.K8PoolConf.PodIP = os.Getenv("GUBER_K8S_POD_IP") + conf.K8PoolConf.PodPort = os.Getenv("GUBER_K8S_POD_PORT") + conf.K8PoolConf.Selector = os.Getenv("GUBER_K8S_ENDPOINTS_SELECTOR") + + if anyHasPrefix("GUBER_K8S_", os.Environ()) { + logrus.Debug("K8s peer pool config found") + conf.K8PoolConf.Enabled = true + if conf.K8PoolConf.Selector == "" { + return conf, errors.New("when using k8s for peer discovery, you MUST provide a " + + "`GUBER_K8S_ENDPOINTS_SELECTOR` to select the gubernator peers from the endpoints listing") + } + } + + if anyHasPrefix("GUBER_ETCD_", os.Environ()) { + logrus.Debug("ETCD peer pool config found") + if conf.K8PoolConf.Enabled { + return conf, errors.New("refusing to register gubernator peers with both etcd and k8s;" + + " remove either `GUBER_ETCD_*` or `GUBER_K8S_*` variables from the environment") + } + } + // If env contains any TLS configuration if anyHasPrefix("GUBER_ETCD_TLS_", os.Environ()) { if err := setupTLS(&conf.EtcdConf); err != nil { return conf, err } } + + if debug { + spew.Dump(conf) + } + return conf, nil } diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go index c6645c0b..16cbd80b 100644 --- a/cmd/gubernator/main.go +++ b/cmd/gubernator/main.go @@ -63,17 +63,26 @@ func main() { checkErr(grpcSrv.Serve(listener), "while starting GRPC server") }) - // Register ourselves with other peers via ETCD - etcdClient, err := etcdutil.NewClient(&conf.EtcdConf) - checkErr(err, "while connecting to etcd") - - pool, err := gubernator.NewEtcdPool(gubernator.EtcdPoolConfig{ - AdvertiseAddress: conf.AdvertiseAddress, - OnUpdate: guber.SetPeers, - Client: etcdClient, - BaseKey: conf.EtcdKeyPrefix, - }) - checkErr(err, "while registering with ETCD pool") + var pool gubernator.PoolInterface + + if conf.K8PoolConf.Enabled { + // Source our list of peers from kubernetes endpoint API + conf.K8PoolConf.OnUpdate = guber.SetPeers + pool, err = gubernator.NewK8sPool(conf.K8PoolConf) + checkErr(err, "while querying kubernetes API") + } else { + // Register ourselves with other peers via ETCD + etcdClient, err := etcdutil.NewClient(&conf.EtcdConf) + checkErr(err, "while connecting to etcd") + + pool, err = gubernator.NewEtcdPool(gubernator.EtcdPoolConfig{ + AdvertiseAddress: conf.EtcdAdvertiseAddress, + OnUpdate: guber.SetPeers, + Client: etcdClient, + BaseKey: conf.EtcdKeyPrefix, + }) + checkErr(err, "while registering with ETCD pool") + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -81,7 +90,7 @@ func main() { // Setup an JSON Gateway API for our GRPC methods gateway := runtime.NewServeMux() err = gubernator.RegisterV1HandlerFromEndpoint(ctx, gateway, - conf.AdvertiseAddress, []grpc.DialOption{grpc.WithInsecure()}) + conf.EtcdAdvertiseAddress, []grpc.DialOption{grpc.WithInsecure()}) checkErr(err, "while registering GRPC gateway handler") // Serve the JSON Gateway and metrics handlers via standard HTTP/1 diff --git a/docker-compose.yaml b/docker-compose.yaml index a8f15900..c5bb655e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -23,7 +23,7 @@ services: # The address peers will connect too # Should be the same as grpc-listen-address unless you are running behind # a NAT or running in a docker container without host networking - - GUBER_ADVERTISE_ADDRESS=gubernator-1:81 + - GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-1:81 # Max size of the cache; The cache size will never grow beyond this size. - GUBER_CACHE_SIZE=50000 # A Comma separate list of etcd nodes @@ -42,7 +42,7 @@ services: # The address peers will connect too # Should be the same as grpc-listen-address unless you are running behind # a NAT or running in a docker container without host networking - - GUBER_ADVERTISE_ADDRESS=gubernator-2:81 + - GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-2:81 # Max size of the cache; The cache size will never grow beyond this size. - GUBER_CACHE_SIZE=50000 # A Comma separate list of etcd nodes diff --git a/etcd.go b/etcd.go index 73ac7a4c..755ac0a2 100644 --- a/etcd.go +++ b/etcd.go @@ -24,6 +24,10 @@ const ( defaultBaseKey = "/gubernator/peers/" ) +type PoolInterface interface { + Close() +} + type EtcdPool struct { peers map[string]struct{} wg holster.WaitGroup @@ -46,7 +50,7 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { holster.SetDefault(&conf.BaseKey, defaultBaseKey) if conf.AdvertiseAddress == "" { - return nil, errors.New("AdvertiseAddress is required") + return nil, errors.New("GUBER_ETCD_ADVERTISE_ADDRESS is required") } if conf.Client == nil { diff --git a/example.conf b/example.conf index ef1f8e9d..9263619a 100644 --- a/example.conf +++ b/example.conf @@ -8,11 +8,6 @@ GUBER_GRPC_ADDRESS=0.0.0.0:81 # The address HTTP requests will listen on GUBER_HTTP_ADDRESS=0.0.0.0:80 -# The address peers will connect too -# Should be the same as grpc-listen-address unless you are running behind -# a NAT or running in a docker container without host networking -GUBER_ADVERTISE_ADDRESS=localhost:81 - # Max size of the cache; The cache size will never grow beyond this size. GUBER_CACHE_SIZE=50000 @@ -23,6 +18,11 @@ GUBER_CACHE_SIZE=50000 # A Comma separate list of etcd nodes GUBER_ETCD_ENDPOINTS=localhost:2379 +# The address peers will connect too +# Should be the same as grpc-listen-address unless you are running behind +# a NAT or running in a docker container without host networking +GUBER_ETCD_ADVERTISE_ADDRESS=localhost:81 + # The prefix gubernator will use to register peers under in etcd #GUBER_ETCD_KEY_PREFIX=/gubernator-peers diff --git a/go.mod b/go.mod index a4682040..c50cd38d 100644 --- a/go.mod +++ b/go.mod @@ -38,11 +38,14 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/net v0.0.0-20190119204137-ed066c81e75e + golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 google.golang.org/grpc v1.18.0 gopkg.in/ahmetb/go-linq.v3 v3.0.0 // indirect gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect + k8s.io/api v0.0.0-20190620084959-7cf5895f2711 + k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 + k8s.io/client-go v0.0.0-20190620085101-78d2af792bab ) diff --git a/go.sum b/go.sum index 27b8ed0a..5815d69c 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Unix4ever/statsd v0.0.0-20160120230120-a8219f1fb9d8 h1:Z2mWzwrj1bPcMWAyqWkadeguRCOzkQOcoqJG0yitrjk= github.com/Unix4ever/statsd v0.0.0-20160120230120-a8219f1fb9d8/go.mod h1:flll3wbNf1Qxq2GxIDF5x0IEnv0BvkBSbPSEsnW9I4Q= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -23,20 +25,27 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= +github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= +github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -44,18 +53,34 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= +github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= +github.com/gophercloud/gophercloud v0.0.0-20190126172459-c818fa66e4c8/go.mod h1:3WdhXV3rUYy9p6AUW8d94kr+HS62Y4VL9mBnFxsD8q4= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.7.0 h1:tPFY/SM+d656aSgLWO2Eckc3ExwpwwybwdN5Ph20h1A= github.com/grpc-ecosystem/grpc-gateway v1.7.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -66,9 +91,15 @@ github.com/mailgun/holster v2.3.5+incompatible h1:E3IQjHqHU8jT9hyQniQYk/UDojsgs1 github.com/mailgun/holster v2.3.5+incompatible/go.mod h1:crzolGx27RP/IBT/BnPQiYBB9igmAFHGRrz0zlMP0b0= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea h1:sKwxy1H95npauwu8vtF95vG/syrL0p8fSZo/XlDg5gk= github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -102,6 +133,7 @@ github.com/smira/go-statsd v1.2.1 h1:4ZX5aOjb9zq05DK+s4LfGr5wQ+bSwRErbC69TXqMaiU github.com/smira/go-statsd v1.2.1/go.mod h1:EeOJShgQdCLKihNKZrhYf9918MStmI2ueQ4I1kVSTFA= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -123,13 +155,22 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181025213731-e84da0312774 h1:a4tQYYYuK9QdeO/+kEvNYyuR21S+7ve5EANok6hABhI= +golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190119204137-ed066c81e75e h1:MDa3fSUp6MdYHouVmCCNz/zaH2a6CRcxY3VhT/K3C5Q= golang.org/x/net v0.0.0-20190119204137-ed066c81e75e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80= +golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a h1:tImsplftrFpALCYumobsd0K86vlAs/eXGFms2txfJfA= +golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -137,14 +178,23 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190312061237-fead79001313 h1:pczuHS43Cp2ktBEEmLwScxgjWsBSzdaQiKzUyf3DTTc= +golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db h1:6/JqlYfC1CCaLnGceQTI+sDGhC9UBSPAsBqI0Gun6kU= +golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20161028155119-f51c12702a4d/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.18.0 h1:IZl7mfBGfbhYx2p2rKRtYgDFw6SBz+kclmxYrCksPPA= @@ -156,7 +206,25 @@ gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVC gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= +gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/api v0.0.0-20190620084959-7cf5895f2711 h1:BblVYz/wE5WtBsD/Gvu54KyBUTJMflolzc5I2DTvh50= +k8s.io/api v0.0.0-20190620084959-7cf5895f2711/go.mod h1:TBhBqb1AWbBQbW3XRusr7n7E4v2+5ZY8r8sAMnyFC5A= +k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 h1:uV4S5IB5g4Nvi+TBVNf3e9L4wrirlwYJ6w88jUQxTUw= +k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA= +k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g= +k8s.io/client-go v0.0.0-20190620085101-78d2af792bab/go.mod h1:E95RaSlHr79aHaX0aGSwcPNfygDiPKOVXdmivCIZT0k= +k8s.io/client-go v11.0.0+incompatible h1:LBbX2+lOwY9flffWlJM7f1Ct8V2SRNiMRDFeiwnJo9o= +k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68= +k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= +k8s.io/utils v0.0.0-20190221042446-c2654d5206da h1:ElyM7RPonbKnQqOcw7dG2IK5uvQQn3b/WPHqD5mBvP4= +k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= +sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= +sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/gubernator.go b/gubernator.go index 4744a492..8cd8abee 100644 --- a/gubernator.go +++ b/gubernator.go @@ -272,7 +272,7 @@ func (s *Instance) SetPeers(peers []PeerInfo) { s.health.Message = strings.Join(errs, "|") } s.health.PeerCount = int32(picker.Size()) - log.WithField("peers", peers).Debug("Peers updated") + log.WithField("peers", peers).Info("Peers updated") } // GetPeers returns a peer client for the hash key provided diff --git a/k8s-deployment.yaml b/k8s-deployment.yaml new file mode 100644 index 00000000..71e36c61 --- /dev/null +++ b/k8s-deployment.yaml @@ -0,0 +1,66 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gubernator + labels: + app: gubernator +spec: + replicas: 2 + selector: + matchLabels: + app: gubernator + template: + metadata: + labels: + app: gubernator + spec: + containers: + - image: thrawn01/gubernator:latest + imagePullPolicy: IfNotPresent + ports: + - name: grpc-port + containerPort: 81 + - name: http-port + containerPort: 80 + name: gubernator + env: + - name: GUBER_K8S_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: GUBER_K8S_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + # This should match the port number GRPC is listening on + # as defined by `containerPort` + - name: GUBER_K8S_POD_PORT + value: "81" + # The selector used when listing endpoints. This selector + # should only select gubernator peers. + - name: GUBER_K8S_ENDPOINTS_SELECTOR + value: "app=gubernator" + # Enable debug for diagnosing issues + #- name: GUBER_DEBUG + # value: "true" + restartPolicy: Always +--- +apiVersion: v1 +kind: Service +metadata: + name: gubernator + labels: + app: gubernator +spec: + clusterIP: None + #ports: + #- name: grpc-port + #targetPort: 81 + #protocol: TCP + #port: 81 + #- name: http-port + #targetPort: 80 + #protocol: TCP + #port: 80 + selector: + app: gubernator diff --git a/kubernetes.go b/kubernetes.go new file mode 100644 index 00000000..d89d1b49 --- /dev/null +++ b/kubernetes.go @@ -0,0 +1,145 @@ +package gubernator + +import ( + "context" + "fmt" + "github.com/mailgun/holster" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + api_v1 "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "reflect" +) + +type K8sPool struct { + client *kubernetes.Clientset + peers map[string]struct{} + cancelCtx context.CancelFunc + wg holster.WaitGroup + ctx context.Context + log *logrus.Entry + conf K8sPoolConfig + informer cache.SharedIndexInformer + done chan struct{} +} + +type K8sPoolConfig struct { + OnUpdate UpdateFunc + Namespace string + Selector string + PodIP string + PodPort string + Enabled bool +} + +func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, errors.Wrap(err, "during InClusterConfig()") + } + // creates the client + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "during NewForConfig()") + } + + pool := &K8sPool{ + log: logrus.WithField("category", "kubernetes-pool"), + peers: make(map[string]struct{}), + done: make(chan struct{}), + client: client, + conf: conf, + } + + return pool, pool.start() +} + +func (e *K8sPool) start() error { + + e.informer = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { + options.LabelSelector = e.conf.Selector + return e.client.CoreV1().Endpoints(e.conf.Namespace).List(options) + }, + WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { + options.LabelSelector = e.conf.Selector + return e.client.CoreV1().Endpoints(e.conf.Namespace).Watch(options) + }, + }, + &api_v1.Endpoints{}, + 0, //Skip resync + cache.Indexers{}, + ) + + e.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + /*AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + logrus.Debugf("Queue (Add) '%s' - %s", key, err) + if err != nil { + logrus.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) + return + } + },*/ + UpdateFunc: func(obj, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + logrus.Debugf("Queue (Update) '%s' - %s", key, err) + if err != nil { + logrus.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) + return + } + e.updatePeers() + }, + DeleteFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + logrus.Debugf("Queue (Delete) '%s' - %s", key, err) + if err != nil { + logrus.Errorf("while calling MetaNamespaceKeyFunc(): %s", err) + return + } + e.updatePeers() + }, + }) + + go e.informer.Run(e.done) + + if !cache.WaitForCacheSync(e.done, e.informer.HasSynced) { + close(e.done) + return fmt.Errorf("timed out waiting for caches to sync") + } + + return nil +} + +func (e *K8sPool) updatePeers() { + logrus.Debug("Fetching peer list from endpoints API") + var peers []PeerInfo + for _, obj := range e.informer.GetStore().List() { + endpoint, ok := obj.(*api_v1.Endpoints) + if !ok { + logrus.Errorf("expected type v1.Endpoints got '%s' instead", reflect.TypeOf(obj).String()) + } + + for _, s := range endpoint.Subsets { + for _, addr := range s.Addresses { + peer := PeerInfo{Address: fmt.Sprintf("%s:%s", addr.IP, e.conf.PodPort)} + + if addr.IP == e.conf.PodIP { + peer.IsOwner = true + } + peers = append(peers, peer) + logrus.Debugf("Peer: %+v\n", peer) + } + } + } + e.conf.OnUpdate(peers) +} + +func (e *K8sPool) Close() { + close(e.done) +}