Skip to content
2 changes: 1 addition & 1 deletion cmd/redisoperator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (m *Main) Run() error {
}

// Create kubernetes service.
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder)
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.UseCache, m.flags.EnableObjectHashing)

// Create the redis clients
redisClient := redis.New(metricsRecorder)
Expand Down
4 changes: 4 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type CMDFlags struct {
K8sQueriesBurstable int
Concurrency int
LogLevel string
UseCache bool
EnableObjectHashing bool
}

// Init initializes and parse the flags
Expand All @@ -35,6 +37,8 @@ func (c *CMDFlags) Init() {
// reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89
flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events")
flag.StringVar(&c.LogLevel, "log-level", "info", "set log level")
flag.BoolVar(&c.UseCache, "use-cache", false, "use cache stores to get k8s objects")
flag.BoolVar(&c.EnableObjectHashing, "enable-hash", false, "Add hashed annotations to k8s objects, apply changes only when theres a diff.")
// Parse flags
flag.Parse()
}
Expand Down
46 changes: 41 additions & 5 deletions service/k8s/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package k8s

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
"k8s.io/client-go/tools/cache"
)

// ConfigMap the ServiceAccount service that knows how to interact with k8s to manage them
Expand All @@ -26,26 +29,51 @@ type ConfigMap interface {
type ConfigMapService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewConfigMapService returns a new ConfigMap KubeService.
func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ConfigMapService {
logger = logger.With("service", "k8s.configMap")
var err error
rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
var cmCacheStore *cache.Store
if ShouldUseCache() {
cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}
return &ConfigMapService{
kubeClient: kubeClient,
logger: logger,
cacheStore: cmCacheStore,
metricsRecorder: metricsRecorder,
}
}

func (p *ConfigMapService) GetConfigMap(namespace string, name string) (*corev1.ConfigMap, error) {
configMap, err := p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder)
if err != nil {
return nil, err
var cm *corev1.ConfigMap
var err error
var exists bool
if p.cacheStore != nil {
c := *p.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
cm = item.(*corev1.ConfigMap)
}
if !exists {
err = fmt.Errorf("configmap %v not found in namespace %v", name, namespace)
}
} else {
cm, err = p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
return configMap, err

recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder)

return cm, err
}

func (p *ConfigMapService) CreateConfigMap(namespace string, configMap *corev1.ConfigMap) error {
Expand Down Expand Up @@ -76,6 +104,14 @@ func (p *ConfigMapService) CreateOrUpdateConfigMap(namespace string, configMap *
return err
}

if hashingEnabled() {
if !shouldUpdate(configMap, storedConfigMap) {
p.logger.Debugf("%v/%v configmap is upto date, no need to apply changes...", configMap.Namespace, configMap.Name)
return nil
}
p.logger.Debugf("%v/%v configmap has a different resource hash, updating the object...", configMap.Namespace, configMap.Name)
addHashAnnotation(configMap)
}
// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down
42 changes: 38 additions & 4 deletions service/k8s/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
Expand All @@ -30,26 +32,49 @@ type Deployment interface {
type DeploymentService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewDeploymentService returns a new Deployment KubeService.
func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *DeploymentService {
logger = logger.With("service", "k8s.deployment")
rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient)
var cacheStore *cache.Store
var err error
if ShouldUseCache() {
cacheStore, err = DeploymentCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}
return &DeploymentService{
kubeClient: kubeClient,
logger: logger,
cacheStore: cacheStore,
metricsRecorder: metricsRecorder,
}
}

// GetDeployment will retrieve the requested deployment based on namespace and name
func (d *DeploymentService) GetDeployment(namespace, name string) (*appsv1.Deployment, error) {
deployment, err := d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder)
if err != nil {
return nil, err
var deployment *appsv1.Deployment
var err error
var exists bool
if d.cacheStore != nil {
c := *d.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
deployment = item.(*appsv1.Deployment)
}
if !exists {
err = fmt.Errorf("deployment %v not found in namespace %v", name, namespace)
}
} else {
deployment, err = d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder)
return deployment, err
}

Expand Down Expand Up @@ -101,6 +126,15 @@ func (d *DeploymentService) CreateOrUpdateDeployment(namespace string, deploymen
return err
}

if hashingEnabled() {
if !shouldUpdate(deployment, storedDeployment) {
d.logger.Debugf("%v/%v deployment is upto date, no need to apply changes...", deployment.Namespace, deployment.Name)
return nil
}
d.logger.Debugf("%v/%v deployment has a different resource hash, updating the object...", deployment.Namespace, deployment.Name)
addHashAnnotation(deployment)
}

// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down
64 changes: 64 additions & 0 deletions service/k8s/hash_annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package k8s

import (
"crypto/sha256"
"encoding/base64"
"hash"

"github.com/davecgh/go-spew/spew"
)

// taken from https://github.com/k8ssandra/cass-operator/blob/master/pkg/utils/hash_annotation.go

type Annotated interface {
GetAnnotations() map[string]string
SetAnnotations(annotations map[string]string)
GetName() string
}

const resourceHashAnnotationKey = "databases.spotahome.com/resource-hash"

// Create hash of a given object

func addHashAnnotation(r Annotated) {
hash := deepHashString(r)
m := r.GetAnnotations()
if m == nil {
m = map[string]string{}
}
m[resourceHashAnnotationKey] = hash
r.SetAnnotations(m)
}

func deepHashString(obj interface{}) string {
hasher := sha256.New()
deepHashObject(hasher, obj)
hashBytes := hasher.Sum([]byte{})
b64Hash := base64.StdEncoding.EncodeToString(hashBytes)
return b64Hash
}

// DeepHashObject writes specified object to hash using the spew library
// which follows pointers and prints actual values of the nested objects
// ensuring the hash does not change when a pointer changes.
func deepHashObject(hasher hash.Hash, objectToWrite interface{}) {
hasher.Reset()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
printer.Fprintf(hasher, "%#v", objectToWrite)
}

func shouldUpdate(desired Annotated, stored Annotated) bool {

storedHash, exists := stored.GetAnnotations()[resourceHashAnnotationKey]
if !exists {
return true
}
desiredHash := deepHashString(desired)

return desiredHash != storedHash
}
21 changes: 20 additions & 1 deletion service/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/spotahome/redis-operator/metrics"
)

var (
useCache bool
)

func ShouldUseCache() bool {
return useCache
}

// Service is the K8s service entrypoint.
type Services interface {
ConfigMap
Expand All @@ -22,6 +30,14 @@ type Services interface {
StatefulSet
}

var (
objectHashingEnabled bool
)

func hashingEnabled() bool {
return objectHashingEnabled
}

type services struct {
ConfigMap
Secret
Expand All @@ -35,7 +51,10 @@ type services struct {
}

// New returns a new Kubernetes service.
func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services {

func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, cacheEnabled bool, enableHashing bool) Services {
useCache = cacheEnabled
objectHashingEnabled = enableHashing
return &services{
ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder),
Secret: NewSecretService(kubecli, logger, metricsRecorder),
Expand Down
43 changes: 36 additions & 7 deletions service/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package k8s
import (
"context"
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

// Pod the ServiceAccount service that knows how to interact with k8s to manage them
Expand All @@ -30,25 +32,52 @@ type Pod interface {
type PodService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewPodService returns a new Pod KubeService.
func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodService {
logger = logger.With("service", "k8s.pod")
rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
fmt.Printf("[POD]-- rest client interface: %v\n", rc)
var podCacheStore *cache.Store
var err error
if ShouldUseCache() {
podCacheStore, err = PodCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}

return &PodService{
kubeClient: kubeClient,
logger: logger,
cacheStore: podCacheStore,
metricsRecorder: metricsRecorder,
}
}

func (p *PodService) GetPod(namespace string, name string) (*corev1.Pod, error) {
pod, err := p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder)
if err != nil {
return nil, err
var pod *corev1.Pod
var err error
var exists bool
if p.cacheStore != nil {

c := *p.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
pod = item.(*corev1.Pod)
}
if !exists {
err = fmt.Errorf("pod %v not found in namespace %v", name, namespace)
}
} else {
pod, err = p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})

}
recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder)
return pod, err
}

Expand Down
Loading