Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
K8s support working
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Jul 23, 2019
1 parent 872417c commit b517064
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 31 deletions.
8 changes: 7 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Build image
FROM golang:1.12 as build

WORKDIR /src

# This should create cached layer of our dependencies for subsequent builds to use
COPY go.mod /src
COPY go.sum /src
RUN go mod download

# Copy the local package files to the container
ADD . /src
WORKDIR /src
ENV VERSION=dev-build

# Build the bot inside the container
Expand Down
50 changes: 42 additions & 8 deletions cmd/gubernator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/x509"
"flag"
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/ioutil"
Expand All @@ -21,17 +22,20 @@ import (
var debug = false

type ServerConfig struct {
GRPCListenAddress string
AdvertiseAddress string
HTTPListenAddress string
EtcdKeyPrefix string
CacheSize int
GRPCListenAddress string
EtcdAdvertiseAddress string
HTTPListenAddress string
EtcdKeyPrefix string
CacheSize int

// Etcd configuration used to find peers
EtcdConf etcd.Config

// Configure how behaviours behave
Behaviors gubernator.BehaviorConfig

// K8s configuration used to find peers inside a K8s cluster
K8PoolConf gubernator.K8sPoolConfig
}

func confFromEnv() (ServerConfig, error) {
Expand All @@ -45,8 +49,10 @@ func confFromEnv() (ServerConfig, error) {
return conf, err
}

if debug {
if debug || os.Getenv("GUBER_DEBUG") != "" {
logrus.SetLevel(logrus.DebugLevel)
logrus.Debug("Debug enabled")
debug = true
}

if configFile != "" {
Expand All @@ -59,9 +65,7 @@ func confFromEnv() (ServerConfig, error) {
// Main config
holster.SetDefault(&conf.GRPCListenAddress, os.Getenv("GUBER_GRPC_ADDRESS"), "0.0.0.0:81")
holster.SetDefault(&conf.HTTPListenAddress, os.Getenv("GUBER_HTTP_ADDRESS"), "0.0.0.0:80")
holster.SetDefault(&conf.AdvertiseAddress, os.Getenv("GUBER_ADVERTISE_ADDRESS"), "127.0.0.1:81")
holster.SetDefault(&conf.CacheSize, getEnvInteger("GUBER_CACHE_SIZE"), 50000)
holster.SetDefault(&conf.EtcdKeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers")

// Behaviors
holster.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration("GUBER_BATCH_TIMEOUT"))
Expand All @@ -73,17 +77,47 @@ func confFromEnv() (ServerConfig, error) {
holster.SetDefault(&conf.Behaviors.GlobalSyncWait, getEnvDuration("GUBER_GLOBAL_SYNC_WAIT"))

// ETCD Config
holster.SetDefault(&conf.EtcdAdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), "127.0.0.1:81")
holster.SetDefault(&conf.EtcdKeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers")
holster.SetDefault(&conf.EtcdConf.Endpoints, getEnvSlice("GUBER_ETCD_ENDPOINTS"), []string{"localhost:2379"})
holster.SetDefault(&conf.EtcdConf.DialTimeout, getEnvDuration("GUBER_ETCD_DIAL_TIMEOUT"), time.Second*5)
holster.SetDefault(&conf.EtcdConf.Username, os.Getenv("GUBER_ETCD_USER"))
holster.SetDefault(&conf.EtcdConf.Password, os.Getenv("GUBER_ETCD_PASSWORD"))

// Kubernetes Config
holster.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default")
conf.K8PoolConf.PodIP = os.Getenv("GUBER_K8S_POD_IP")
conf.K8PoolConf.PodPort = os.Getenv("GUBER_K8S_POD_PORT")
conf.K8PoolConf.Selector = os.Getenv("GUBER_K8S_ENDPOINTS_SELECTOR")

if anyHasPrefix("GUBER_K8S_", os.Environ()) {
logrus.Debug("K8s peer pool config found")
conf.K8PoolConf.Enabled = true
if conf.K8PoolConf.Selector == "" {
return conf, errors.New("when using k8s for peer discovery, you MUST provide a " +
"`GUBER_K8S_ENDPOINTS_SELECTOR` to select the gubernator peers from the endpoints listing")
}
}

if anyHasPrefix("GUBER_ETCD_", os.Environ()) {
logrus.Debug("ETCD peer pool config found")
if conf.K8PoolConf.Enabled {
return conf, errors.New("refusing to register gubernator peers with both etcd and k8s;" +
" remove either `GUBER_ETCD_*` or `GUBER_K8S_*` variables from the environment")
}
}

// If env contains any TLS configuration
if anyHasPrefix("GUBER_ETCD_TLS_", os.Environ()) {
if err := setupTLS(&conf.EtcdConf); err != nil {
return conf, err
}
}

if debug {
spew.Dump(conf)
}

return conf, nil
}

Expand Down
33 changes: 21 additions & 12 deletions cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,34 @@ func main() {
checkErr(grpcSrv.Serve(listener), "while starting GRPC server")
})

// Register ourselves with other peers via ETCD
etcdClient, err := etcdutil.NewClient(&conf.EtcdConf)
checkErr(err, "while connecting to etcd")

pool, err := gubernator.NewEtcdPool(gubernator.EtcdPoolConfig{
AdvertiseAddress: conf.AdvertiseAddress,
OnUpdate: guber.SetPeers,
Client: etcdClient,
BaseKey: conf.EtcdKeyPrefix,
})
checkErr(err, "while registering with ETCD pool")
var pool gubernator.PoolInterface

if conf.K8PoolConf.Enabled {
// Source our list of peers from kubernetes endpoint API
conf.K8PoolConf.OnUpdate = guber.SetPeers
pool, err = gubernator.NewK8sPool(conf.K8PoolConf)
checkErr(err, "while querying kubernetes API")
} else {
// Register ourselves with other peers via ETCD
etcdClient, err := etcdutil.NewClient(&conf.EtcdConf)
checkErr(err, "while connecting to etcd")

pool, err = gubernator.NewEtcdPool(gubernator.EtcdPoolConfig{
AdvertiseAddress: conf.EtcdAdvertiseAddress,
OnUpdate: guber.SetPeers,
Client: etcdClient,
BaseKey: conf.EtcdKeyPrefix,
})
checkErr(err, "while registering with ETCD pool")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Setup an JSON Gateway API for our GRPC methods
gateway := runtime.NewServeMux()
err = gubernator.RegisterV1HandlerFromEndpoint(ctx, gateway,
conf.AdvertiseAddress, []grpc.DialOption{grpc.WithInsecure()})
conf.EtcdAdvertiseAddress, []grpc.DialOption{grpc.WithInsecure()})
checkErr(err, "while registering GRPC gateway handler")

// Serve the JSON Gateway and metrics handlers via standard HTTP/1
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
# The address peers will connect too
# Should be the same as grpc-listen-address unless you are running behind
# a NAT or running in a docker container without host networking
- GUBER_ADVERTISE_ADDRESS=gubernator-1:81
- GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-1:81
# Max size of the cache; The cache size will never grow beyond this size.
- GUBER_CACHE_SIZE=50000
# A Comma separate list of etcd nodes
Expand All @@ -42,7 +42,7 @@ services:
# The address peers will connect too
# Should be the same as grpc-listen-address unless you are running behind
# a NAT or running in a docker container without host networking
- GUBER_ADVERTISE_ADDRESS=gubernator-2:81
- GUBER_ETCD_ADVERTISE_ADDRESS=gubernator-2:81
# Max size of the cache; The cache size will never grow beyond this size.
- GUBER_CACHE_SIZE=50000
# A Comma separate list of etcd nodes
Expand Down
6 changes: 5 additions & 1 deletion etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const (
defaultBaseKey = "/gubernator/peers/"
)

type PoolInterface interface {
Close()
}

type EtcdPool struct {
peers map[string]struct{}
wg holster.WaitGroup
Expand All @@ -46,7 +50,7 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) {
holster.SetDefault(&conf.BaseKey, defaultBaseKey)

if conf.AdvertiseAddress == "" {
return nil, errors.New("AdvertiseAddress is required")
return nil, errors.New("GUBER_ETCD_ADVERTISE_ADDRESS is required")
}

if conf.Client == nil {
Expand Down
10 changes: 5 additions & 5 deletions example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ GUBER_GRPC_ADDRESS=0.0.0.0:81
# The address HTTP requests will listen on
GUBER_HTTP_ADDRESS=0.0.0.0:80

# The address peers will connect too
# Should be the same as grpc-listen-address unless you are running behind
# a NAT or running in a docker container without host networking
GUBER_ADVERTISE_ADDRESS=localhost:81

# Max size of the cache; The cache size will never grow beyond this size.
GUBER_CACHE_SIZE=50000

Expand All @@ -23,6 +18,11 @@ GUBER_CACHE_SIZE=50000
# A Comma separate list of etcd nodes
GUBER_ETCD_ENDPOINTS=localhost:2379

# The address peers will connect too
# Should be the same as grpc-listen-address unless you are running behind
# a NAT or running in a docker container without host networking
GUBER_ETCD_ADVERTISE_ADDRESS=localhost:81

# The prefix gubernator will use to register peers under in etcd
#GUBER_ETCD_KEY_PREFIX=/gubernator-peers

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ require (
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/net v0.0.0-20190119204137-ed066c81e75e
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8
google.golang.org/grpc v1.18.0
gopkg.in/ahmetb/go-linq.v3 v3.0.0 // indirect
gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
)
Loading

0 comments on commit b517064

Please sign in to comment.