From 1a5b5fa2ea1c3637c8ab198377e4b70154b17dd7 Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Tue, 21 Oct 2025 13:37:15 -0700 Subject: [PATCH] Add metric to track inflight visibility tasks --- common/metrics/metric_defs.go | 5 ++++- service/history/visibility_queue_task_executor.go | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 354c79d653..edeab87ed5 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -846,7 +846,10 @@ var ( QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count") QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count") QueueActionCounter = NewCounterDef("queue_actions") - ActivityE2ELatency = NewTimerDef( + + VisibilityTaskExecutorInflightTasks = NewGaugeDef("visibility_task_executor_inflight_tasks") + + ActivityE2ELatency = NewTimerDef( "activity_end_to_end_latency", WithDescription("DEPRECATED: Will be removed in one of the next releases. Duration of an activity attempt. Use activity_start_to_close_latency instead."), ) diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index f33f516858..5eb99e31cc 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -3,6 +3,7 @@ package history import ( "context" "strconv" + "sync/atomic" "time" commonpb "go.temporal.io/api/common/v1" @@ -32,9 +33,11 @@ type ( shardContext historyi.ShardContext cache wcache.Cache logger log.Logger - metricProvider metrics.Handler + metricsHandler metrics.Handler visibilityMgr manager.VisibilityManager + inflightTasks atomic.Int32 + ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -48,7 +51,7 @@ func newVisibilityQueueTaskExecutor( workflowCache wcache.Cache, visibilityMgr manager.VisibilityManager, logger log.Logger, - metricProvider metrics.Handler, + metricsHandler metrics.Handler, ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn, enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter, relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -57,7 +60,7 @@ func newVisibilityQueueTaskExecutor( shardContext: shardContext, cache: workflowCache, logger: logger, - metricProvider: metricProvider, + metricsHandler: metricsHandler, visibilityMgr: visibilityMgr, ensureCloseBeforeDelete: ensureCloseBeforeDelete, @@ -70,6 +73,10 @@ func (t *visibilityQueueTaskExecutor) Execute( ctx context.Context, executable queues.Executable, ) queues.ExecuteResponse { + inflightTasks := t.inflightTasks.Add(1) + defer t.inflightTasks.Add(-1) + metrics.VisibilityTaskExecutorInflightTasks.With(t.metricsHandler).Record(float64(inflightTasks)) + task := executable.GetTask() taskType := queues.GetVisibilityTaskTypeTagValue(task) namespaceTag, replicationState := getNamespaceTagAndReplicationStateByID(