diff --git a/config.go b/config.go
index 29b15f4d..9785aabb 100644
--- a/config.go
+++ b/config.go
@@ -316,6 +316,12 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
 	conf.K8PoolConf.PodIP = os.Getenv("GUBER_K8S_POD_IP")
 	conf.K8PoolConf.PodPort = os.Getenv("GUBER_K8S_POD_PORT")
 	conf.K8PoolConf.Selector = os.Getenv("GUBER_K8S_ENDPOINTS_SELECTOR")
+	var assignErr error
+	conf.K8PoolConf.Mechanism, assignErr = WatchMechanismFromString(os.Getenv("GUBER_K8S_WATCH_MECHANISM"))
+	if assignErr != nil {
+		return conf, errors.New("invalid value for `GUBER_K8S_WATCH_MECHANISM`: " +
+			"must be 'endpoints', 'pods', or empty (defaults to 'endpoints')")
+	}
 
 	// PeerPicker Config
 	if pp := os.Getenv("GUBER_PEER_PICKER"); pp != "" {
diff --git a/go.sum b/go.sum
index 75d1ecf9..a7ba9db2 100644
--- a/go.sum
+++ b/go.sum
@@ -106,6 +106,7 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
 github.com/hashicorp/memberlist v0.2.0 h1:WeeNspppWi5s1OFefTviPQueC/Bq8dONfvNjPhiEQKE=
 github.com/hashicorp/memberlist v0.2.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
 github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
 github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
diff --git a/kubernetes.go b/kubernetes.go
index 153628c9..72394c14 100644
--- a/kubernetes.go
+++ b/kubernetes.go
@@ -29,7 +29,6 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 )
 
@@ -42,6 +41,26 @@ type K8sPool struct {
 	done     chan struct{}
 }
 
+type WatchMechanism string
+
+const (
+	WatchEndpoints WatchMechanism = "endpoints"
+	WatchPods      WatchMechanism = "pods"
+)
+
+func WatchMechanismFromString(mechanism string) (WatchMechanism, error) {
+	switch WatchMechanism(mechanism) {
+	case "": // keep default behavior
+		return WatchEndpoints, nil
+	case WatchEndpoints:
+		return WatchEndpoints, nil
+	case WatchPods:
+		return WatchPods, nil
+	default:
+		return "", fmt.Errorf("unknown watch mechanism specified: %s", mechanism)
+	}
+}
+
 type K8sPoolConfig struct {
 	Logger    logrus.FieldLogger
 	OnUpdate  UpdateFunc
@@ -49,10 +68,12 @@ type K8sPoolConfig struct {
 	Selector  string
 	PodIP     string
 	PodPort   string
+
+	Mechanism WatchMechanism
 }
 
 func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) {
-	config, err := rest.InClusterConfig()
+	config, err := RestConfig()
 	if err != nil {
-		return nil, errors.Wrap(err, "during InClusterConfig()")
+		return nil, errors.Wrap(err, "during RestConfig()")
 	}
@@ -74,19 +95,20 @@ func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) {
 }
 
 func (e *K8sPool) start() error {
+	switch e.conf.Mechanism {
+	case WatchEndpoints:
+		return e.startEndpointWatch()
+	case WatchPods:
+		return e.startPodWatch()
+	default:
+		return fmt.Errorf("unknown value for watch mechanism: %s", e.conf.Mechanism)
+	}
+}
+func (e *K8sPool) startGenericWatch(objType runtime.Object, listWatch *cache.ListWatch, updateFunc func()) error {
 	e.informer = cache.NewSharedIndexInformer(
-		&cache.ListWatch{
-			ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
-				options.LabelSelector = e.conf.Selector
-				return e.client.CoreV1().Endpoints(e.conf.Namespace).List(options)
-			},
-			WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
-				options.LabelSelector = e.conf.Selector
-				return e.client.CoreV1().Endpoints(e.conf.Namespace).Watch(options)
-			},
-		},
-		&api_v1.Endpoints{},
+		listWatch,
+		objType,
 		0, //Skip resync
 		cache.Indexers{},
 	)
 
@@ -99,6 +121,7 @@ func (e *K8sPool) start() error {
 				e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err)
 				return
 			}
+			updateFunc()
 		},
 		UpdateFunc: func(obj, new interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(obj)
@@ -107,7 +130,7 @@ func (e *K8sPool) start() error {
 				e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err)
 				return
 			}
-			e.updatePeers()
+			updateFunc()
 		},
 		DeleteFunc: func(obj interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(obj)
@@ -116,7 +139,7 @@ func (e *K8sPool) start() error {
 				e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err)
 				return
 			}
-			e.updatePeers()
+			updateFunc()
 		},
 	})
 
@@ -130,7 +153,65 @@ func (e *K8sPool) start() error {
 	return nil
 }
 
-func (e *K8sPool) updatePeers() {
+func (e *K8sPool) startPodWatch() error {
+	listWatch := &cache.ListWatch{
+		ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
+			options.LabelSelector = e.conf.Selector
+			return e.client.CoreV1().Pods(e.conf.Namespace).List(options)
+		},
+		WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
+			options.LabelSelector = e.conf.Selector
+			return e.client.CoreV1().Pods(e.conf.Namespace).Watch(options)
+		},
+	}
+	return e.startGenericWatch(&api_v1.Pod{}, listWatch, e.updatePeersFromPods)
+}
+
+func (e *K8sPool) startEndpointWatch() error {
+	listWatch := &cache.ListWatch{
+		ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
+			options.LabelSelector = e.conf.Selector
+			return e.client.CoreV1().Endpoints(e.conf.Namespace).List(options)
+		},
+		WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
+			options.LabelSelector = e.conf.Selector
+			return e.client.CoreV1().Endpoints(e.conf.Namespace).Watch(options)
+		},
+	}
+	return e.startGenericWatch(&api_v1.Endpoints{}, listWatch, e.updatePeersFromEndpoints)
+}
+
+func (e *K8sPool) updatePeersFromPods() {
+	e.log.Debug("Fetching peer list from pods API")
+	var peers []PeerInfo
+main:
+	for _, obj := range e.informer.GetStore().List() {
+		pod, ok := obj.(*api_v1.Pod)
+		if !ok {
+			e.log.Errorf("expected type v1.Pod got '%s' instead", reflect.TypeOf(obj).String())
+			continue main
+		}
+
+		peer := PeerInfo{GRPCAddress: fmt.Sprintf("%s:%s", pod.Status.PodIP, e.conf.PodPort)}
+
+		// if any container is not ready or not running then skip this peer
+		for _, status := range pod.Status.ContainerStatuses {
+			if !status.Ready || status.State.Running == nil {
+				e.log.Debugf("Skipping peer because it's not ready or not running: %+v\n", peer)
+				continue main
+			}
+		}
+
+		if pod.Status.PodIP == e.conf.PodIP {
+			peer.IsOwner = true
+		}
+		e.log.Debugf("Peer: %+v\n", peer)
+		peers = append(peers, peer)
+	}
+	e.conf.OnUpdate(peers)
+}
+
+func (e *K8sPool) updatePeersFromEndpoints() {
 	e.log.Debug("Fetching peer list from endpoints API")
 	var peers []PeerInfo
 	for _, obj := range e.informer.GetStore().List() {
diff --git a/kubernetesconfig.go b/kubernetesconfig.go
new file mode 100644
index 00000000..bcb8007d
--- /dev/null
+++ b/kubernetesconfig.go
@@ -0,0 +1,11 @@
+// +build !local
+
+package gubernator
+
+import (
+	"k8s.io/client-go/rest"
+)
+
+func RestConfig() (*rest.Config, error) {
+	return rest.InClusterConfig()
+}
diff --git a/kubernetesconfig_local.go b/kubernetesconfig_local.go
new file mode 100644
index 00000000..ab808108
--- /dev/null
+++ b/kubernetesconfig_local.go
@@ -0,0 +1,37 @@
+// +build local
+
+package gubernator
+
+import (
+	"os"
+	"path/filepath"
+
+	"github.com/pkg/errors"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+func homeDir() string {
+	if h := os.Getenv("HOME"); h != "" {
+		return h
+	}
+	return os.Getenv("USERPROFILE") // windows
+}
+
+func config() (*rest.Config, error) {
+	home := homeDir()
+	if home == "" {
+		return nil, errors.New("could not find home directory; cannot load kube config")
+	}
+	cfg := filepath.Join(home, ".kube", "config")
+	r, err := clientcmd.BuildConfigFromFlags("", cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	return r, nil
+}
+
+func RestConfig() (*rest.Config, error) {
+	return config()
+}
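
Usage note (not part of the patch): a minimal sketch of how the new setting resolves, assuming the module path github.com/mailgun/gubernator. An unset or empty GUBER_K8S_WATCH_MECHANISM falls back to the endpoints watcher; builds without the `local` tag keep the in-cluster rest config, while `go build -tags local` swaps in the kubeconfig-based RestConfig for development outside a cluster.

package main

import (
	"fmt"
	"os"

	// Assumed module path; adjust to wherever this package lives.
	gubernator "github.com/mailgun/gubernator"
)

func main() {
	// Resolve the watch mechanism the same way SetupDaemonConfig does:
	// "" -> WatchEndpoints, "pods" -> WatchPods, anything else is an error.
	mech, err := gubernator.WatchMechanismFromString(os.Getenv("GUBER_K8S_WATCH_MECHANISM"))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("using watch mechanism:", mech)
}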