From 616c1545ad16256639af017ac4e908b1f3b182d2 Mon Sep 17 00:00:00 2001 From: "zhouyuqing.kernel" Date: Tue, 12 Mar 2024 11:29:53 +0800 Subject: [PATCH] enable watch events update handler --- main.go | 3 ++- pkg/exporter/config.go | 2 ++ pkg/kube/watcher.go | 13 ++++++++++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index b8b2ccb5..7e942988 100644 --- a/main.go +++ b/main.go @@ -91,7 +91,7 @@ func main() { } } - w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize) + w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.WatchUpdate, cfg.CacheSize) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() @@ -151,3 +151,4 @@ func main() { w.Stop() engine.Stop() } + diff --git a/pkg/exporter/config.go b/pkg/exporter/config.go index 1d33df4a..e95f5e4e 100644 --- a/pkg/exporter/config.go +++ b/pkg/exporter/config.go @@ -34,6 +34,7 @@ type Config struct { KubeBurst int `yaml:"kubeBurst,omitempty"` MetricsNamePrefix string `yaml:"metricsNamePrefix,omitempty"` OmitLookup bool `yaml:"omitLookup,omitempty"` + WatchUpdate bool `yaml:"watchUpdate,omitempty"` CacheSize int `yaml:"cacheSize,omitempty"` } @@ -112,3 +113,4 @@ func (c *Config) validateMetricsNamePrefix() error { } return nil } + diff --git a/pkg/kube/watcher.go b/pkg/kube/watcher.go index c72c4838..8675f56b 100644 --- a/pkg/kube/watcher.go +++ b/pkg/kube/watcher.go @@ -25,6 +25,7 @@ type EventWatcher struct { stopper chan struct{} objectMetadataCache ObjectMetadataProvider omitLookup bool + watchUpdate bool fn EventHandler maxEventAgeSeconds time.Duration metricsStore *metrics.Store @@ -32,7 +33,7 @@ type EventWatcher struct { clientset *kubernetes.Clientset } -func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, metricsStore *metrics.Store, fn EventHandler, omitLookup bool, cacheSize int) *EventWatcher { +func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, metricsStore *metrics.Store, fn EventHandler, omitLookup, watchUpdate bool, cacheSize int) *EventWatcher { clientset := kubernetes.NewForConfigOrDie(config) factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace)) informer := factory.Core().V1().Events().Informer() @@ -42,6 +43,7 @@ func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds i stopper: make(chan struct{}), objectMetadataCache: NewObjectMetadataProvider(cacheSize), omitLookup: omitLookup, + watchUpdate: watchUpdate, fn: fn, maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds), metricsStore: metricsStore, @@ -62,8 +64,12 @@ func (e *EventWatcher) OnAdd(obj interface{}) { e.onEvent(event) } -func (e *EventWatcher) OnUpdate(oldObj, newObj interface{}) { - // Ignore updates +func (e *EventWatcher) OnUpdate(_, newObj interface{}) { + // new event emit to watcher when e.enableUpdate is true + if e.watchUpdate { + event := newObj.(*corev1.Event) + e.onEvent(event) + } } // Ignore events older than the maxEventAgeSeconds @@ -152,3 +158,4 @@ func (e *EventWatcher) Stop() { func (e *EventWatcher) setStartUpTime(time time.Time) { startUpTime = time } +