Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
added extended k8s options:
Browse files Browse the repository at this point in the history
-can specify whether we watch for pods or endpoints while keeping the default behavior
-can run gubernator with -tags local to make use of local k8s config
-update peer list in add function again to get initial list of peers
  • Loading branch information
mier85 committed Dec 21, 2020
1 parent bae2e66 commit bd3dee8
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 16 deletions.
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
conf.K8PoolConf.PodIP = os.Getenv("GUBER_K8S_POD_IP")
conf.K8PoolConf.PodPort = os.Getenv("GUBER_K8S_POD_PORT")
conf.K8PoolConf.Selector = os.Getenv("GUBER_K8S_ENDPOINTS_SELECTOR")
var assignErr error
conf.K8PoolConf.Mechanism, assignErr = WatchMechanismFromString(os.Getenv("GUBER_K8S_WATCH_MECHANISM"))
if assignErr != nil {
return conf, errors.New("invalid value for watch mechanism " +
"`GUBER_K8S_WATCH_MECHANISM` needs to be either 'endpoints' or 'watch' or empty(defaulting to 'endpoints'))")
}

// PeerPicker Config
if pp := os.Getenv("GUBER_PEER_PICKER"); pp != "" {
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/memberlist v0.2.0 h1:WeeNspppWi5s1OFefTviPQueC/Bq8dONfvNjPhiEQKE=
github.com/hashicorp/memberlist v0.2.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
112 changes: 96 additions & 16 deletions kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -42,17 +41,39 @@ type K8sPool struct {
done chan struct{}
}

type WatchMechanism string

const (
WatchEndpoints WatchMechanism = "endpoints"
WatchPods WatchMechanism = "pods"
)

func WatchMechanismFromString(mechanism string) (WatchMechanism, error) {
switch WatchMechanism(mechanism) {
case "": // keep default behavior
return WatchEndpoints, nil
case WatchEndpoints:
return WatchEndpoints, nil
case WatchPods:
return WatchPods, nil
default:
return "", fmt.Errorf("unknown watch mechanism specified: %s", mechanism)
}
}

type K8sPoolConfig struct {
Logger logrus.FieldLogger
OnUpdate UpdateFunc
Namespace string
Selector string
PodIP string
PodPort string

Mechanism WatchMechanism
}

func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) {
config, err := rest.InClusterConfig()
config, err := RestConfig()
if err != nil {
return nil, errors.Wrap(err, "during InClusterConfig()")
}
Expand All @@ -74,19 +95,20 @@ func NewK8sPool(conf K8sPoolConfig) (*K8sPool, error) {
}

func (e *K8sPool) start() error {
switch e.conf.Mechanism {
case WatchEndpoints:
return e.startEndpointWatch()
case WatchPods:
return e.startPodWatch()
default:
return fmt.Errorf("unknown value for watch mechanism: %s", e.conf.Mechanism)
}
}

func (e *K8sPool) startGenericWatch(objType runtime.Object, listWatch *cache.ListWatch, updateFunc func()) error {
e.informer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = e.conf.Selector
return e.client.CoreV1().Endpoints(e.conf.Namespace).List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = e.conf.Selector
return e.client.CoreV1().Endpoints(e.conf.Namespace).Watch(options)
},
},
&api_v1.Endpoints{},
listWatch,
objType,
0, //Skip resync
cache.Indexers{},
)
Expand All @@ -99,6 +121,7 @@ func (e *K8sPool) start() error {
e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err)
return
}
updateFunc()
},
UpdateFunc: func(obj, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
Expand All @@ -107,7 +130,7 @@ func (e *K8sPool) start() error {
e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err)
return
}
e.updatePeers()
updateFunc()
},
DeleteFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
Expand All @@ -116,7 +139,7 @@ func (e *K8sPool) start() error {
e.log.Errorf("while calling MetaNamespaceKeyFunc(): %s", err)
return
}
e.updatePeers()
updateFunc()
},
})

Expand All @@ -130,7 +153,64 @@ func (e *K8sPool) start() error {
return nil
}

func (e *K8sPool) updatePeers() {
func (e *K8sPool) startPodWatch() error {
listWatch := &cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = e.conf.Selector
return e.client.CoreV1().Pods(e.conf.Namespace).List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = e.conf.Selector
return e.client.CoreV1().Pods(e.conf.Namespace).Watch(options)
},
}
return e.startGenericWatch(&api_v1.Pod{}, listWatch, e.updatePeersFromPods)
}

func (e *K8sPool) startEndpointWatch() error {
listWatch := &cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = e.conf.Selector
return e.client.CoreV1().Endpoints(e.conf.Namespace).List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = e.conf.Selector
return e.client.CoreV1().Endpoints(e.conf.Namespace).Watch(options)
},
}
return e.startGenericWatch(&api_v1.Endpoints{}, listWatch, e.updatePeersFromEndpoints)
}

func (e *K8sPool) updatePeersFromPods() {
e.log.Debug("Fetching peer list from pods API")
var peers []PeerInfo
main:
for _, obj := range e.informer.GetStore().List() {
pod, ok := obj.(*api_v1.Pod)
if !ok {
e.log.Errorf("expected type v1.Endpoints got '%s' instead", reflect.TypeOf(obj).String())
}

peer := PeerInfo{GRPCAddress: fmt.Sprintf("%s:%s", pod.Status.PodIP, e.conf.PodPort)}

// if containers are not ready or not running then skip this peer
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready || status.State.Running == nil {
e.log.Debugf("Skipping peer because it's not ready or not running: %+v\n", peer)
continue main
}
}

if pod.Status.PodIP == e.conf.PodIP {
peer.IsOwner = true
}
e.log.Debugf("Peer: %+v\n", peer)
peers = append(peers, peer)
}
e.conf.OnUpdate(peers)
}

func (e *K8sPool) updatePeersFromEndpoints() {
e.log.Debug("Fetching peer list from endpoints API")
var peers []PeerInfo
for _, obj := range e.informer.GetStore().List() {
Expand Down
11 changes: 11 additions & 0 deletions kubernetesconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// +build !local

package gubernator

import (
"k8s.io/client-go/rest"
)

func RestConfig() (*rest.Config, error) {
return rest.InClusterConfig()
}
38 changes: 38 additions & 0 deletions kubernetesconfig_local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// +build local

package gubernator

import (
"os"
"path/filepath"

"github.com/pkg/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}

func config() (*rest.Config, error) {
home := homeDir()
if home == "" {
return nil, errors.New("could not find kube config file. cannot load config")

}
cfg := filepath.Join(home, ".kube", "config")
r, err := clientcmd.BuildConfigFromFlags("", cfg)
if nil != err {
return nil, err
}

return r, nil
}

func RestConfig() (*rest.Config, error) {
return config()
}

0 comments on commit bd3dee8

Please sign in to comment.