Skip to content

Commit dc3f40c

Browse files
fix: avoid resources lock contention (argoproj#8172) (argoproj#20329)
* fix: avoid resources lock contention Signed-off-by: Mykola Pelekh <[email protected]> * feat: allow enabling batch events processing Signed-off-by: Mykola Pelekh <[email protected]> * fix: update ParseDurationFromEnv to handle duration in ms Signed-off-by: Mykola Pelekh <[email protected]> * feat: make eventProcessingInterval option configurable (default is 0.1s) Signed-off-by: Mykola Pelekh <[email protected]> * use upstream gitops-engine Signed-off-by: Michael Crenshaw <[email protected]> --------- Signed-off-by: Mykola Pelekh <[email protected]> Signed-off-by: Michael Crenshaw <[email protected]> Co-authored-by: Michael Crenshaw <[email protected]>
1 parent c090f84 commit dc3f40c

File tree

8 files changed

+89
-29
lines changed

8 files changed

+89
-29
lines changed

controller/cache/cache.go

+20
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ const (
6969
// EnvClusterCacheRetryUseBackoff is the env variable to control whether to use a backoff strategy with the retry during cluster cache sync
7070
EnvClusterCacheRetryUseBackoff = "ARGOCD_CLUSTER_CACHE_RETRY_USE_BACKOFF"
7171

72+
// EnvClusterCacheBatchEventsProcessing is the env variable to control whether to enable batch events processing
73+
EnvClusterCacheBatchEventsProcessing = "ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING"
74+
75+
// EnvClusterCacheEventProcessingInterval is the env variable to control the interval between processing events when BatchEventsProcessing is enabled
76+
EnvClusterCacheEventProcessingInterval = "ARGOCD_CLUSTER_CACHE_EVENT_PROCESSING_INTERVAL"
77+
7278
// AnnotationIgnoreResourceUpdates when set to true on an untracked resource,
7379
// argo will apply `ignoreResourceUpdates` configuration on it.
7480
AnnotationIgnoreResourceUpdates = "argocd.argoproj.io/ignore-resource-updates"
@@ -103,6 +109,12 @@ var (
103109

104110
// clusterCacheRetryUseBackoff specifies whether to use a backoff strategy on cluster cache sync, if retry is enabled
105111
clusterCacheRetryUseBackoff bool = false
112+
113+
// clusterCacheBatchEventsProcessing specifies whether to enable batch events processing
114+
clusterCacheBatchEventsProcessing bool = false
115+
116+
// clusterCacheEventProcessingInterval specifies the interval between processing events when BatchEventsProcessing is enabled
117+
clusterCacheEventProcessingInterval = 100 * time.Millisecond
106118
)
107119

108120
func init() {
@@ -114,6 +126,8 @@ func init() {
114126
clusterCacheListSemaphoreSize = env.ParseInt64FromEnv(EnvClusterCacheListSemaphore, clusterCacheListSemaphoreSize, 0, math.MaxInt64)
115127
clusterCacheAttemptLimit = int32(env.ParseNumFromEnv(EnvClusterCacheAttemptLimit, int(clusterCacheAttemptLimit), 1, math.MaxInt32))
116128
clusterCacheRetryUseBackoff = env.ParseBoolFromEnv(EnvClusterCacheRetryUseBackoff, false)
129+
clusterCacheBatchEventsProcessing = env.ParseBoolFromEnv(EnvClusterCacheBatchEventsProcessing, false)
130+
clusterCacheEventProcessingInterval = env.ParseDurationFromEnv(EnvClusterCacheEventProcessingInterval, clusterCacheEventProcessingInterval, 0, math.MaxInt64)
117131
}
118132

119133
type LiveStateCache interface {
@@ -554,6 +568,8 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
554568
clustercache.SetLogr(logutils.NewLogrusLogger(log.WithField("server", cluster.Server))),
555569
clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError),
556570
clustercache.SetRespectRBAC(respectRBAC),
571+
clustercache.SetBatchEventsProcessing(clusterCacheBatchEventsProcessing),
572+
clustercache.SetEventProcessingInterval(clusterCacheEventProcessingInterval),
557573
}
558574

559575
clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...)
@@ -608,6 +624,10 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
608624
c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind)
609625
})
610626

627+
_ = clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
628+
c.metricsServer.ObserveResourceEventsProcessingDuration(cluster.Server, duration, processedEventsNumber)
629+
})
630+
611631
c.clusters[server] = clusterCache
612632

613633
return clusterCache, nil

controller/metrics/metrics.go

+50-22
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,20 @@ import (
3030

3131
type MetricsServer struct {
3232
*http.Server
33-
syncCounter *prometheus.CounterVec
34-
kubectlExecCounter *prometheus.CounterVec
35-
kubectlExecPendingGauge *prometheus.GaugeVec
36-
orphanedResourcesGauge *prometheus.GaugeVec
37-
k8sRequestCounter *prometheus.CounterVec
38-
clusterEventsCounter *prometheus.CounterVec
39-
redisRequestCounter *prometheus.CounterVec
40-
reconcileHistogram *prometheus.HistogramVec
41-
redisRequestHistogram *prometheus.HistogramVec
42-
registry *prometheus.Registry
43-
hostname string
44-
cron *cron.Cron
33+
syncCounter *prometheus.CounterVec
34+
kubectlExecCounter *prometheus.CounterVec
35+
kubectlExecPendingGauge *prometheus.GaugeVec
36+
orphanedResourcesGauge *prometheus.GaugeVec
37+
k8sRequestCounter *prometheus.CounterVec
38+
clusterEventsCounter *prometheus.CounterVec
39+
redisRequestCounter *prometheus.CounterVec
40+
reconcileHistogram *prometheus.HistogramVec
41+
redisRequestHistogram *prometheus.HistogramVec
42+
resourceEventsProcessingHistogram *prometheus.HistogramVec
43+
resourceEventsNumberGauge *prometheus.GaugeVec
44+
registry *prometheus.Registry
45+
hostname string
46+
cron *cron.Cron
4547
}
4648

4749
const (
@@ -153,6 +155,20 @@ var (
153155
},
154156
descAppDefaultLabels,
155157
)
158+
159+
resourceEventsProcessingHistogram = prometheus.NewHistogramVec(
160+
prometheus.HistogramOpts{
161+
Name: "argocd_resource_events_processing",
162+
Help: "Time to process resource events in seconds.",
163+
Buckets: []float64{0.25, .5, 1, 2, 4, 8, 16},
164+
},
165+
[]string{"server"},
166+
)
167+
168+
resourceEventsNumberGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
169+
Name: "argocd_resource_events_processed_in_batch",
170+
Help: "Number of resource events processed in batch",
171+
}, []string{"server"})
156172
)
157173

158174
// NewMetricsServer returns a new prometheus server which collects application metrics
@@ -202,23 +218,27 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, appFil
202218
registry.MustRegister(clusterEventsCounter)
203219
registry.MustRegister(redisRequestCounter)
204220
registry.MustRegister(redisRequestHistogram)
221+
registry.MustRegister(resourceEventsProcessingHistogram)
222+
registry.MustRegister(resourceEventsNumberGauge)
205223

206224
return &MetricsServer{
207225
registry: registry,
208226
Server: &http.Server{
209227
Addr: addr,
210228
Handler: mux,
211229
},
212-
syncCounter: syncCounter,
213-
k8sRequestCounter: k8sRequestCounter,
214-
kubectlExecCounter: kubectlExecCounter,
215-
kubectlExecPendingGauge: kubectlExecPendingGauge,
216-
orphanedResourcesGauge: orphanedResourcesGauge,
217-
reconcileHistogram: reconcileHistogram,
218-
clusterEventsCounter: clusterEventsCounter,
219-
redisRequestCounter: redisRequestCounter,
220-
redisRequestHistogram: redisRequestHistogram,
221-
hostname: hostname,
230+
syncCounter: syncCounter,
231+
k8sRequestCounter: k8sRequestCounter,
232+
kubectlExecCounter: kubectlExecCounter,
233+
kubectlExecPendingGauge: kubectlExecPendingGauge,
234+
orphanedResourcesGauge: orphanedResourcesGauge,
235+
reconcileHistogram: reconcileHistogram,
236+
clusterEventsCounter: clusterEventsCounter,
237+
redisRequestCounter: redisRequestCounter,
238+
redisRequestHistogram: redisRequestHistogram,
239+
resourceEventsProcessingHistogram: resourceEventsProcessingHistogram,
240+
resourceEventsNumberGauge: resourceEventsNumberGauge,
241+
hostname: hostname,
222242
// This cron is used to expire the metrics cache.
223243
// Currently clearing the metrics cache is logging and deleting from the map
224244
// so there is no possibility of panic, but we will add a chain to keep robfig/cron v1 behavior.
@@ -284,6 +304,12 @@ func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
284304
m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds())
285305
}
286306

307+
// ObserveResourceEventsProcessingDuration observes the resource events processing duration for the given server and records the number of events processed in that batch
308+
func (m *MetricsServer) ObserveResourceEventsProcessingDuration(server string, duration time.Duration, processedEventsNumber int) {
309+
m.resourceEventsProcessingHistogram.WithLabelValues(server).Observe(duration.Seconds())
310+
m.resourceEventsNumberGauge.WithLabelValues(server).Set(float64(processedEventsNumber))
311+
}
312+
287313
// IncReconcile increments the reconcile counter for an application
288314
func (m *MetricsServer) IncReconcile(app *argoappv1.Application, duration time.Duration) {
289315
m.reconcileHistogram.WithLabelValues(app.Namespace, app.Spec.Destination.Server).Observe(duration.Seconds())
@@ -311,6 +337,8 @@ func (m *MetricsServer) SetExpiration(cacheExpiration time.Duration) error {
311337
m.redisRequestCounter.Reset()
312338
m.reconcileHistogram.Reset()
313339
m.redisRequestHistogram.Reset()
340+
m.resourceEventsProcessingHistogram.Reset()
341+
m.resourceEventsNumberGauge.Reset()
314342
})
315343
if err != nil {
316344
return err

docs/operator-manual/high_availability.md

+9
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ stringData:
130130
count (grouped by k8s api version, the granule of parallelism for list operations). In this case, all resources will
131131
be buffered in memory -- no api server request will be blocked by processing.
132132

133+
* `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING` - environment variable that enables the controller to collect events
134+
for Kubernetes resources and process them in a batch. This is useful when the cluster contains a large number of resources,
135+
and the controller is overwhelmed by the number of events. The default value is `false`, which means that the controller
136+
processes events one by one.
137+
138+
* `ARGOCD_CLUSTER_CACHE_EVENT_PROCESSING_INTERVAL` - environment variable controlling the interval for processing events in a batch.
139+
The valid value is in the format of Go time duration string, e.g. `1ms`, `1s`, `1m`, `1h`. The default value is `100ms`.
140+
The variable is used only when `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING` is set to `true`.
141+
133142
* `ARGOCD_APPLICATION_TREE_SHARD_SIZE` - environment variable controlling the max number of resources stored in one Redis
134143
key. Splitting application tree into multiple keys helps to reduce the amount of traffic between the controller and Redis.
135144
The default value is 0, which means that the application tree is stored in a single Redis key. The reasonable value is 100.

docs/operator-manual/metrics.md

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ Metrics about applications. Scraped at the `argocd-metrics:8082/metrics` endpoin
2424
| `argocd_kubectl_exec_total` | counter | Number of kubectl executions |
2525
| `argocd_redis_request_duration` | histogram | Redis requests duration. |
2626
| `argocd_redis_request_total` | counter | Number of redis requests executed during application reconciliation |
27+
| `argocd_resource_events_processing` | histogram | Time to process resource events in batch in seconds |
28+
| `argocd_resource_events_processed_in_batch` | gauge | Number of resource events processed in batch |
2729

2830
If you use Argo CD with many application and project creation and deletion,
2931
the metrics page will keep in cache your application and project's history.

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/TomOnTime/utfutil v0.0.0-20180511104225-09c41003ee1d
1111
github.com/alicebob/miniredis/v2 v2.33.0
1212
github.com/antonmedv/expr v1.15.1
13-
github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55
13+
github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431
1414
github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd
1515
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1
1616
github.com/aws/aws-sdk-go v1.55.5

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ github.com/antonmedv/expr v1.15.1/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4J
8888
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
8989
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
9090
github.com/appscode/go v0.0.0-20191119085241-0887d8ec2ecc/go.mod h1:OawnOmAL4ZX3YaPdN+8HTNwBveT1jMsqP74moa9XUbE=
91-
github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55 h1:GEyH0LY7gB/BKnahEl6pgNHc6EdjX308b090tn6odKc=
92-
github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55/go.mod h1:WsnykM8idYRUnneeT31cM/Fq/ZsjkefCbjiD8ioCJkU=
91+
github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431 h1:ku0Gzp1dHr7yn83B/xmMrmbB5sJbe32LXaYSDSBd6/c=
92+
github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431/go.mod h1:WsnykM8idYRUnneeT31cM/Fq/ZsjkefCbjiD8ioCJkU=
9393
github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd h1:lOVVoK89j9Nd4+JYJiKAaMNYC1402C0jICROOfUPWn0=
9494
github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd/go.mod h1:N0A4sEws2soZjEpY4hgZpQS8mRIEw6otzwfkgc3g9uQ=
9595
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1 h1:qsHwwOJ21K2Ao0xPju1sNuqphyMnMYkyB3ZLoLtxWpo=

util/env/env.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"strings"
88
"time"
99

10-
timeutil "github.com/argoproj/pkg/time"
11-
1210
log "github.com/sirupsen/logrus"
1311
)
1412

@@ -133,13 +131,12 @@ func ParseDurationFromEnv(env string, defaultValue, min, max time.Duration) time
133131
if str == "" {
134132
return defaultValue
135133
}
136-
durPtr, err := timeutil.ParseDuration(str)
134+
dur, err := time.ParseDuration(str)
137135
if err != nil {
138136
log.Warnf("Could not parse '%s' as a duration string from environment %s", str, env)
139137
return defaultValue
140138
}
141139

142-
dur := *durPtr
143140
if dur < min {
144141
log.Warnf("Value in %s is %s, which is less than minimum %s allowed", env, dur, min)
145142
return defaultValue

util/env/env_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ func TestParseDurationFromEnv(t *testing.T) {
115115
name: "ValidValueSet",
116116
env: "2s",
117117
expected: time.Second * 2,
118+
}, {
119+
name: "ValidValueSetMs",
120+
env: "2500ms",
121+
expected: time.Millisecond * 2500,
118122
}, {
119123
name: "MoreThanMaxSet",
120124
env: "6s",

0 commit comments

Comments
 (0)