diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 45c724cee3..4acf8d4968 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -19,11 +19,17 @@ import ( "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/tqid" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) +// WorkflowTypeTag Required workflow tag for standalone activities to ensure consistent metric labeling between workflows and activities +const WorkflowTypeTag = "__temporal_standalone_activity__" + type ActivityStore interface { // PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error @@ -60,6 +66,46 @@ type WithToken[R any] struct { Request R } +// MetricsHandlerBuilderParams contains parameters for building/enriching a metrics handler for activity operations +type MetricsHandlerBuilderParams struct { + Handler metrics.Handler + NamespaceName string + BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool] +} + +// RespondCompletedEvent wraps the RespondActivityTaskCompletedRequest with context-specific data. +type RespondCompletedEvent struct { + Request *historyservice.RespondActivityTaskCompletedRequest + Token *tokenspb.Task + MetricsHandlerBuilderParams MetricsHandlerBuilderParams +} + +// RespondFailedEvent wraps the RespondActivityTaskFailedRequest with context-specific data. 
+type RespondFailedEvent struct { + Request *historyservice.RespondActivityTaskFailedRequest + Token *tokenspb.Task + MetricsHandlerBuilderParams MetricsHandlerBuilderParams +} + +// RespondCancelledEvent wraps the RespondActivityTaskCanceledRequest with context-specific data. +type RespondCancelledEvent struct { + Request *historyservice.RespondActivityTaskCanceledRequest + Token *tokenspb.Task + MetricsHandlerBuilderParams MetricsHandlerBuilderParams +} + +// requestCancelEvent wraps the RequestCancelActivityExecutionRequest with context-specific data. +type requestCancelEvent struct { + request *activitypb.RequestCancelActivityExecutionRequest + MetricsHandlerBuilderParams MetricsHandlerBuilderParams +} + +// terminateEvent wraps the TerminateActivityExecutionRequest with context-specific data. +type terminateEvent struct { + request *activitypb.TerminateActivityExecutionRequest + MetricsHandlerBuilderParams MetricsHandlerBuilderParams +} + func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState { switch a.Status { case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: @@ -156,7 +202,7 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.Ex } requestData := a.RequestData.Get(ctx) attempt := a.LastAttempt.Get(ctx) - response.StartedTime = attempt.StartedTime + response.StartedTime = attempt.GetStartedTime() response.Attempt = attempt.GetCount() response.Priority = a.GetPriority() response.RetryPolicy = a.GetRetryPolicy() @@ -187,14 +233,24 @@ func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx ch // HandleCompleted updates the activity on activity completion. 
func (a *Activity) HandleCompleted( ctx chasm.MutableContext, - input WithToken[*historyservice.RespondActivityTaskCompletedRequest], + event RespondCompletedEvent, ) (*historyservice.RespondActivityTaskCompletedResponse, error) { // TODO(dan): add test coverage for this validation - if err := a.validateActivityTaskToken(ctx, input.Token); err != nil { + if err := a.validateActivityTaskToken(ctx, event.Token); err != nil { return nil, err } - if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil { + metricsHandler := enrichMetricsHandler( + a, + event.MetricsHandlerBuilderParams.Handler, + event.MetricsHandlerBuilderParams.NamespaceName, + metrics.HistoryRespondActivityTaskCompletedScope, + event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue) + + if err := TransitionCompleted.Apply(a, ctx, completeEvent{ + req: event.Request, + metricsHandler: metricsHandler, + }); err != nil { return nil, err } @@ -205,14 +261,21 @@ func (a *Activity) HandleCompleted( // for retry instead. 
func (a *Activity) HandleFailed( ctx chasm.MutableContext, - input WithToken[*historyservice.RespondActivityTaskFailedRequest], + event RespondFailedEvent, ) (*historyservice.RespondActivityTaskFailedResponse, error) { // TODO(dan): add test coverage for this validation - if err := a.validateActivityTaskToken(ctx, input.Token); err != nil { + if err := a.validateActivityTaskToken(ctx, event.Token); err != nil { return nil, err } - failure := input.Request.GetFailedRequest().GetFailure() + metricsHandler := enrichMetricsHandler( + a, + event.MetricsHandlerBuilderParams.Handler, + event.MetricsHandlerBuilderParams.NamespaceName, + metrics.HistoryRespondActivityTaskFailedScope, + event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue) + + failure := event.Request.GetFailedRequest().GetFailure() appFailure := failure.GetApplicationFailureInfo() isRetryable := appFailure != nil && @@ -225,11 +288,16 @@ func (a *Activity) HandleFailed( return nil, err } if rescheduled { + a.emitOnAttemptFailedMetrics(ctx, metricsHandler) + return &historyservice.RespondActivityTaskFailedResponse{}, nil } } - if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil { + if err := TransitionFailed.Apply(a, ctx, failedEvent{ + req: event.Request, + metricsHandler: metricsHandler, + }); err != nil { return nil, err } @@ -239,21 +307,32 @@ func (a *Activity) HandleFailed( // HandleCanceled updates the activity on activity canceled. 
func (a *Activity) HandleCanceled( ctx chasm.MutableContext, - input WithToken[*historyservice.RespondActivityTaskCanceledRequest], + event RespondCancelledEvent, ) (*historyservice.RespondActivityTaskCanceledResponse, error) { // TODO(dan): add test coverage for this validation - if err := a.validateActivityTaskToken(ctx, input.Token); err != nil { + if err := a.validateActivityTaskToken(ctx, event.Token); err != nil { return nil, err } - if err := TransitionCanceled.Apply(a, ctx, input.Request.GetCancelRequest().GetDetails()); err != nil { + metricsHandler := enrichMetricsHandler( + a, + event.MetricsHandlerBuilderParams.Handler, + event.MetricsHandlerBuilderParams.NamespaceName, + metrics.HistoryRespondActivityTaskCanceledScope, + event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue) + + if err := TransitionCanceled.Apply(a, ctx, cancelEvent{ + details: event.Request.GetCancelRequest().GetDetails(), + handler: metricsHandler, + fromStatus: a.GetStatus(), + }); err != nil { return nil, err } return &historyservice.RespondActivityTaskCanceledResponse{}, nil } -func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) ( +func (a *Activity) handleTerminated(ctx chasm.MutableContext, req terminateEvent) ( *activitypb.TerminateActivityExecutionResponse, error, ) { if err := TransitionTerminated.Apply(a, ctx, req); err != nil { @@ -274,10 +353,11 @@ func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activityp return heartbeat } -func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.RequestCancelActivityExecutionRequest) ( +func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, event requestCancelEvent) ( *activitypb.RequestCancelActivityExecutionResponse, error, ) { - newReqID := req.GetFrontendRequest().GetRequestId() + req := event.request.GetFrontendRequest() + newReqID := req.GetRequestId() existingReqID := 
a.GetCancelState().GetRequestId() // If already in cancel requested state, fail if request ID is different, else no-op @@ -293,18 +373,29 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *ac // If in scheduled state, cancel immediately right after marking cancel requested isCancelImmediately := a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED - if err := TransitionCancelRequested.Apply(a, ctx, req.GetFrontendRequest()); err != nil { + if err := TransitionCancelRequested.Apply(a, ctx, req); err != nil { return nil, err } if isCancelImmediately { details := &commonpb.Payloads{ Payloads: []*commonpb.Payload{ - payload.EncodeString(req.GetFrontendRequest().GetReason()), + payload.EncodeString(req.GetReason()), }, } - err := TransitionCanceled.Apply(a, ctx, details) + metricsHandler := enrichMetricsHandler( + a, + event.MetricsHandlerBuilderParams.Handler, + event.MetricsHandlerBuilderParams.NamespaceName, + metrics.HistoryRespondActivityTaskCanceledScope, + event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue) + + err := TransitionCanceled.Apply(a, ctx, cancelEvent{ + details: details, + handler: metricsHandler, + fromStatus: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, // if we're here the original status was scheduled + }) if err != nil { return nil, err } @@ -628,3 +719,110 @@ func (a *Activity) validateActivityTaskToken( } return nil } + +func enrichMetricsHandler( + a *Activity, + handler metrics.Handler, + namespaceName string, + operationTag string, + breakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool], +) metrics.Handler { + taskQueueFamily := a.GetTaskQueue().GetName() + return metrics.GetPerTaskQueueFamilyScope( + handler, + namespaceName, + tqid.UnsafeTaskQueueFamily(namespaceName, taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(operationTag), + 
metrics.ActivityTypeTag(a.GetActivityType().GetName()), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + metrics.WorkflowTypeTag(WorkflowTypeTag), + ) +} + +func (a *Activity) emitOnAttemptTimedOutMetrics(ctx chasm.Context, handler metrics.Handler, timeoutType enumspb.TimeoutType) { + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + latency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(latency) + + timeoutTag := metrics.StringTag("timeout_type", timeoutType.String()) + metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag) +} + +func (a *Activity) emitOnAttemptFailedMetrics(ctx chasm.Context, handler metrics.Handler) { + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + latency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(latency) + + metrics.ActivityTaskFail.With(handler).Record(1) +} + +func (a *Activity) emitOnCompletedMetrics(ctx chasm.Context, handler metrics.Handler) { + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + metrics.ActivitySuccess.With(handler).Record(1) +} + +func (a *Activity) emitOnFailedMetrics(ctx chasm.Context, handler metrics.Handler) { + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + 
metrics.ActivityTaskFail.With(handler).Record(1) + metrics.ActivityFail.With(handler).Record(1) +} + +func (a *Activity) emitOnCanceledMetrics( + ctx chasm.Context, + handler metrics.Handler, + fromStatus activitypb.ActivityExecutionStatus, +) { + // Only record start-to-close latency if a current attempt was running. If it is in scheduled status, it means the current attempt never started. + if fromStatus != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED { + startedTime := a.LastAttempt.Get(ctx).GetStartedTime().AsTime() + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + } + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + metrics.ActivityCancel.With(handler).Record(1) +} + +func (a *Activity) emitOnTimedOutMetrics( + ctx chasm.Context, + handler metrics.Handler, + timeoutType enumspb.TimeoutType, + fromStatus activitypb.ActivityExecutionStatus, +) { + // Only record start-to-close latency if a current attempt was running. If it is in scheduled status, it means the current attempt never started. 
+ if fromStatus != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED { + startedTime := a.LastAttempt.Get(ctx).GetStartedTime().AsTime() + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + } + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + timeoutTag := metrics.StringTag("timeout_type", timeoutType.String()) + metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag) + metrics.ActivityTimeout.With(handler).Record(1, timeoutTag) +} diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index be2cd75f0f..19a1fe63ac 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -6,6 +6,8 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/util" "go.uber.org/fx" @@ -59,10 +61,22 @@ func (e *activityDispatchTaskExecutor) Execute( return err } -type scheduleToStartTimeoutTaskExecutor struct{} +type timeoutTaskExecutorOptions struct { + fx.In + + Config *Config + MetricsHandler metrics.Handler + NamespaceRegistry namespace.Registry +} -func newScheduleToStartTimeoutTaskExecutor() *scheduleToStartTimeoutTaskExecutor { - return &scheduleToStartTimeoutTaskExecutor{} +type scheduleToStartTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} + +func newScheduleToStartTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *scheduleToStartTimeoutTaskExecutor { + return &scheduleToStartTimeoutTaskExecutor{ + opts, + } } func (e *scheduleToStartTimeoutTaskExecutor) Validate( @@ -81,13 +95,36 @@ func (e *scheduleToStartTimeoutTaskExecutor) Execute( _ chasm.TaskAttributes, _ 
*activitypb.ScheduleToStartTimeoutTask, ) error { - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START) + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + metricsHandler := enrichMetricsHandler( + activity, + e.opts.MetricsHandler, + namespaceName.String(), + metrics.TimerActiveTaskActivityTimeoutScope, + e.opts.Config.BreakdownMetricsByTaskQueue) + + event := timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + metricsHandler: metricsHandler, + fromStatus: activity.GetStatus(), + } + + return TransitionTimedOut.Apply(activity, ctx, event) } -type scheduleToCloseTimeoutTaskExecutor struct{} +type scheduleToCloseTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} -func newScheduleToCloseTimeoutTaskExecutor() *scheduleToCloseTimeoutTaskExecutor { - return &scheduleToCloseTimeoutTaskExecutor{} +func newScheduleToCloseTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *scheduleToCloseTimeoutTaskExecutor { + return &scheduleToCloseTimeoutTaskExecutor{ + opts, + } } func (e *scheduleToCloseTimeoutTaskExecutor) Validate( @@ -105,13 +142,36 @@ func (e *scheduleToCloseTimeoutTaskExecutor) Execute( _ chasm.TaskAttributes, _ *activitypb.ScheduleToCloseTimeoutTask, ) error { - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + metricsHandler := enrichMetricsHandler( + activity, + e.opts.MetricsHandler, + namespaceName.String(), + metrics.TimerActiveTaskActivityTimeoutScope, + e.opts.Config.BreakdownMetricsByTaskQueue) + + event := timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + metricsHandler: metricsHandler, + fromStatus: activity.GetStatus(), + } + + return 
TransitionTimedOut.Apply(activity, ctx, event) } -type startToCloseTimeoutTaskExecutor struct{} +type startToCloseTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} -func newStartToCloseTimeoutTaskExecutor() *startToCloseTimeoutTaskExecutor { - return &startToCloseTimeoutTaskExecutor{} +func newStartToCloseTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *startToCloseTimeoutTaskExecutor { + return &startToCloseTimeoutTaskExecutor{ + opts, + } } func (e *startToCloseTimeoutTaskExecutor) Validate( @@ -137,17 +197,42 @@ func (e *startToCloseTimeoutTaskExecutor) Execute( if err != nil { return err } + + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + metricsHandler := enrichMetricsHandler( + activity, + e.opts.MetricsHandler, + namespaceName.String(), + metrics.TimerActiveTaskActivityTimeoutScope, + e.opts.Config.BreakdownMetricsByTaskQueue) + if rescheduled { + activity.emitOnAttemptTimedOutMetrics(ctx, metricsHandler, enumspb.TIMEOUT_TYPE_START_TO_CLOSE) + return nil } - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_START_TO_CLOSE) + + return TransitionTimedOut.Apply(activity, ctx, timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + metricsHandler: metricsHandler, + fromStatus: activity.GetStatus(), + }) } // HeartbeatTimeoutTask is a pure task that enforces heartbeat timeouts. -type heartbeatTimeoutTaskExecutor struct{} +type heartbeatTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} -func newHeartbeatTimeoutTaskExecutor() *heartbeatTimeoutTaskExecutor { - return &heartbeatTimeoutTaskExecutor{} +func newHeartbeatTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *heartbeatTimeoutTaskExecutor { + return &heartbeatTimeoutTaskExecutor{ + opts, + } } // Validate validates a HeartbeatTimeoutTask. 
@@ -203,8 +288,28 @@ func (e *heartbeatTimeoutTaskExecutor) Execute( if err != nil { return err } + + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + metricsHandler := enrichMetricsHandler( + activity, + e.opts.MetricsHandler, + namespaceName.String(), + metrics.TimerActiveTaskActivityTimeoutScope, + e.opts.Config.BreakdownMetricsByTaskQueue) + if rescheduled { + activity.emitOnAttemptTimedOutMetrics(ctx, metricsHandler, enumspb.TIMEOUT_TYPE_HEARTBEAT) return nil } - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_HEARTBEAT) + + return TransitionTimedOut.Apply(activity, ctx, timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + metricsHandler: metricsHandler, + fromStatus: activity.GetStatus(), + }) } diff --git a/chasm/lib/activity/config.go b/chasm/lib/activity/config.go index cc31eec498..d7784bd449 100644 --- a/chasm/lib/activity/config.go +++ b/chasm/lib/activity/config.go @@ -23,13 +23,15 @@ var ( ) type Config struct { - LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter - LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter + BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool] + LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter } func ConfigProvider(dc *dynamicconfig.Collection) *Config { return &Config{ - LongPollTimeout: LongPollTimeout.Get(dc), - LongPollBuffer: LongPollBuffer.Get(dc), + LongPollTimeout: LongPollTimeout.Get(dc), + LongPollBuffer: LongPollBuffer.Get(dc), + BreakdownMetricsByTaskQueue: dynamicconfig.MetricsBreakdownByTaskQueue.Get(dc), } } diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 73c0c999e1..2ac592d348 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -11,6 +11,8 @@ import ( 
"go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common/contextutil" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" ) var ( @@ -28,12 +30,16 @@ var ( type handler struct { activitypb.UnimplementedActivityServiceServer - config *Config + config *Config + metricsHandler metrics.Handler + namespaceRegistry namespace.Registry } -func newHandler(config *Config) *handler { +func newHandler(config *Config, metricsHandler metrics.Handler, namespaceRegistry namespace.Registry) *handler { return &handler{ - config: config, + config: config, + metricsHandler: metricsHandler, + namespaceRegistry: namespaceRegistry, } } @@ -76,7 +82,6 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St chasm.WithRequestID(req.GetFrontendRequest().GetRequestId()), chasm.WithBusinessIDPolicy(reusePolicy, conflictPolicy), ) - if err != nil { return nil, err } @@ -115,11 +120,11 @@ func (h *handler) DescribeActivityExecution( // deadline that causes us to send that response before the caller's own deadline (see // chasm.activity.longPollBuffer). We also cap the caller's deadline at // chasm.activity.longPollTimeout. - namespace := req.GetFrontendRequest().GetNamespace() + ns := req.GetFrontendRequest().GetNamespace() ctx, cancel := contextutil.WithDeadlineBuffer( ctx, - h.config.LongPollTimeout(namespace), - h.config.LongPollBuffer(namespace), + h.config.LongPollTimeout(ns), + h.config.LongPollBuffer(ns), ) defer cancel() @@ -184,11 +189,11 @@ func (h *handler) GetActivityExecutionOutcome( // deadline that causes us to send that response before the caller's own deadline (see // chasm.activity.longPollBuffer). We also cap the caller's deadline at // chasm.activity.longPollTimeout. 
- namespace := req.GetFrontendRequest().GetNamespace() + ns := req.GetFrontendRequest().GetNamespace() ctx, cancel := contextutil.WithDeadlineBuffer( ctx, - h.config.LongPollTimeout(namespace), - h.config.LongPollBuffer(namespace), + h.config.LongPollTimeout(ns), + h.config.LongPollBuffer(ns), ) defer cancel() @@ -226,11 +231,23 @@ func (h *handler) TerminateActivityExecution( RunID: frontendReq.GetRunId(), }) + namespaceName, err := h.namespaceRegistry.GetNamespaceName(namespace.ID(req.GetNamespaceId())) + if err != nil { + return nil, err + } + response, _, err = chasm.UpdateComponent( ctx, ref, (*Activity).handleTerminated, - req, + terminateEvent{ + request: req, + MetricsHandlerBuilderParams: MetricsHandlerBuilderParams{ + Handler: h.metricsHandler, + NamespaceName: namespaceName.String(), + BreakdownMetricsByTaskQueue: h.config.BreakdownMetricsByTaskQueue, + }, + }, ) if err != nil { @@ -253,11 +270,23 @@ func (h *handler) RequestCancelActivityExecution( RunID: frontendReq.GetRunId(), }) + namespaceName, err := h.namespaceRegistry.GetNamespaceName(namespace.ID(req.GetNamespaceId())) + if err != nil { + return nil, err + } + response, _, err = chasm.UpdateComponent( ctx, ref, (*Activity).handleCancellationRequested, - req, + requestCancelEvent{ + request: req, + MetricsHandlerBuilderParams: MetricsHandlerBuilderParams{ + Handler: h.metricsHandler, + NamespaceName: namespaceName.String(), + BreakdownMetricsByTaskQueue: h.config.BreakdownMetricsByTaskQueue, + }, + }, ) if err != nil { return nil, err diff --git a/chasm/lib/activity/library.go b/chasm/lib/activity/library.go index 280622c55f..de71b6dd73 100644 --- a/chasm/lib/activity/library.go +++ b/chasm/lib/activity/library.go @@ -59,7 +59,7 @@ func (l *library) RegisterServices(server *grpc.Server) { func (l *library) Tasks() []*chasm.RegistrableTask { return []*chasm.RegistrableTask{ - chasm.NewRegistrableSideEffectTask[*Activity, *activitypb.ActivityDispatchTask]( + chasm.NewRegistrableSideEffectTask( 
"dispatch", l.activityDispatchTaskExecutor, l.activityDispatchTaskExecutor, diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 685ef3d3ef..5019502727 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -12,6 +12,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/metrics" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -76,6 +77,7 @@ var TransitionScheduled = chasm.NewTransition( type rescheduleEvent struct { retryInterval time.Duration failure *failurepb.Failure + timeoutType enumspb.TimeoutType } // TransitionRescheduled affects a transition to Scheduled from Started, which happens on retries. The event to pass in @@ -160,6 +162,11 @@ var TransitionStarted = chasm.NewTransition( }, ) +type completeEvent struct { + req *historyservice.RespondActivityTaskCompletedRequest + metricsHandler metrics.Handler +} + // TransitionCompleted affects a transition to Completed status var TransitionCompleted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ @@ -167,22 +174,32 @@ var TransitionCompleted = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, - func(a *Activity, ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) error { + func(a *Activity, ctx chasm.MutableContext, event completeEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + req := event.req.GetCompleteRequest() + attempt := a.LastAttempt.Get(ctx) attempt.CompleteTime = timestamppb.New(ctx.Now(a)) - attempt.LastWorkerIdentity = request.GetCompleteRequest().GetIdentity() + attempt.LastWorkerIdentity = req.GetIdentity() outcome := a.Outcome.Get(ctx) outcome.Variant = &activitypb.ActivityOutcome_Successful_{ Successful: 
&activitypb.ActivityOutcome_Successful{ - Output: request.GetCompleteRequest().GetResult(), + Output: req.GetResult(), }, } + + a.emitOnCompletedMetrics(ctx, event.metricsHandler) + return nil }) }, ) +type failedEvent struct { + req *historyservice.RespondActivityTaskFailedRequest + metricsHandler metrics.Handler +} + // TransitionFailed affects a transition to Failed status var TransitionFailed = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ @@ -190,16 +207,25 @@ var TransitionFailed = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, - func(a *Activity, ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) error { + func(a *Activity, ctx chasm.MutableContext, event failedEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - if details := req.GetFailedRequest().GetLastHeartbeatDetails(); details != nil { + req := event.req.GetFailedRequest() + + if details := req.GetLastHeartbeatDetails(); details != nil { heartbeat := a.getOrCreateLastHeartbeat(ctx) heartbeat.Details = details heartbeat.RecordedTime = timestamppb.New(ctx.Now(a)) } attempt := a.LastAttempt.Get(ctx) - attempt.LastWorkerIdentity = req.GetFailedRequest().GetIdentity() - return a.recordFailedAttempt(ctx, 0, req.GetFailedRequest().GetFailure(), true) + attempt.LastWorkerIdentity = req.GetIdentity() + + if err := a.recordFailedAttempt(ctx, 0, req.GetFailure(), true); err != nil { + return err + } + + a.emitOnFailedMetrics(ctx, event.metricsHandler) + + return nil }) }, ) @@ -212,12 +238,12 @@ var TransitionTerminated = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, - func(a *Activity, ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) error { + func(a *Activity, ctx chasm.MutableContext, event terminateEvent) error { return 
a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ // TODO if the reason isn't provided, perhaps set a default reason. Also see if we should prefix with "Activity terminated: " - Message: req.GetFrontendRequest().GetReason(), + Message: event.request.GetFrontendRequest().GetReason(), FailureInfo: &failurepb.Failure_TerminatedFailureInfo{}, } outcome.Variant = &activitypb.ActivityOutcome_Failed_{ @@ -225,6 +251,16 @@ var TransitionTerminated = chasm.NewTransition( Failure: failure, }, } + + metricsHandler := enrichMetricsHandler( + a, + event.MetricsHandlerBuilderParams.Handler, + event.MetricsHandlerBuilderParams.NamespaceName, + metrics.ActivityTerminatedScope, + event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue) + + metrics.ActivityTerminate.With(metricsHandler).Record(1) + return nil }) }, @@ -250,20 +286,26 @@ var TransitionCancelRequested = chasm.NewTransition( }, ) +type cancelEvent struct { + details *commonpb.Payloads + handler metrics.Handler + fromStatus activitypb.ActivityExecutionStatus +} + // TransitionCanceled affects a transition to Canceled status var TransitionCanceled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, - func(a *Activity, ctx chasm.MutableContext, details *commonpb.Payloads) error { + func(a *Activity, ctx chasm.MutableContext, event cancelEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: "Activity canceled", FailureInfo: &failurepb.Failure_CanceledFailureInfo{ CanceledFailureInfo: &failurepb.CanceledFailureInfo{ - Details: details, + Details: event.details, }, }, } @@ -272,12 +314,21 @@ var TransitionCanceled = chasm.NewTransition( Failure: failure, }, } + + a.emitOnCanceledMetrics(ctx, 
event.handler, event.fromStatus) + return nil }) }, ) -// TransitionTimedOut affects a transition to TimedOut status +type timeoutEvent struct { + metricsHandler metrics.Handler + timeoutType enumspb.TimeoutType + fromStatus activitypb.ActivityExecutionStatus +} + +// TransitionTimedOut transitions to TimedOut status var TransitionTimedOut = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, @@ -285,21 +336,31 @@ var TransitionTimedOut = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, - func(a *Activity, ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error { + func(a *Activity, ctx chasm.MutableContext, event timeoutEvent) error { + timeoutType := event.timeoutType + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + var err error switch timeoutType { case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE: - return a.recordScheduleToStartOrCloseTimeoutFailure(ctx, timeoutType) + err = a.recordScheduleToStartOrCloseTimeoutFailure(ctx, timeoutType) case enumspb.TIMEOUT_TYPE_START_TO_CLOSE: failure := createStartToCloseTimeoutFailure() - return a.recordFailedAttempt(ctx, 0, failure, true) + err = a.recordFailedAttempt(ctx, 0, failure, true) case enumspb.TIMEOUT_TYPE_HEARTBEAT: failure := createHeartbeatTimeoutFailure() - return a.recordFailedAttempt(ctx, 0, failure, true) + err = a.recordFailedAttempt(ctx, 0, failure, true) default: - return fmt.Errorf("unhandled activity timeout: %v", timeoutType) + err = fmt.Errorf("unhandled activity timeout: %v", timeoutType) + } + if err != nil { + return err } + + a.emitOnTimedOutMetrics(ctx, event.metricsHandler, timeoutType, event.fromStatus) + + return nil }) }, ) diff --git a/chasm/lib/activity/statemachine_test.go b/chasm/lib/activity/statemachine_test.go index 52dc9002cb..1918812a47 100644 --- 
a/chasm/lib/activity/statemachine_test.go +++ b/chasm/lib/activity/statemachine_test.go @@ -8,13 +8,17 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/testing/protorequire" + "go.uber.org/mock/gomock" "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -80,17 +84,23 @@ func TestTransitionScheduled(t *testing.T) { } attemptState := &activitypb.ActivityAttemptState{Count: tc.startingAttemptCount} outcome := &activitypb.ActivityOutcome{} + input := payloads.EncodeString("test-input") activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(tc.scheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), + RequestData: chasm.NewDataField(ctx, &activitypb.ActivityRequestData{ + Input: input, + }), } err := TransitionScheduled.Apply(activity, ctx, nil) @@ -130,6 +140,9 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval time.Duration retryPolicy *commonpb.RetryPolicy scheduleToStartTimeout time.Duration + operationTag string + counterMetric string + timeoutType enumspb.TimeoutType }{ { name: "second attempt - timeout 
recorded", @@ -141,6 +154,9 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval: 2 * time.Second, retryPolicy: defaultRetryPolicy, scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, }, { name: "third attempt - timeout recorded", @@ -152,6 +168,9 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval: 4 * time.Second, retryPolicy: defaultRetryPolicy, scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, }, { name: "no schedule to start timeout", @@ -162,6 +181,37 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval: 2 * time.Second, retryPolicy: defaultRetryPolicy, scheduleToStartTimeout: 0, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + }, + { + name: "heartbeat timeout - timeout recorded", + startingAttemptCount: 1, + expectedTasks: []chasm.MockTask{ + {Payload: &activitypb.ScheduleToStartTimeoutTask{}}, + {Payload: &activitypb.ActivityDispatchTask{}}, + }, + expectedRetryInterval: 2 * time.Second, + retryPolicy: defaultRetryPolicy, + scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + }, + + { + name: "reschedule from failure", + startingAttemptCount: 1, + expectedTasks: []chasm.MockTask{ + {Payload: &activitypb.ScheduleToStartTimeoutTask{}}, + {Payload: &activitypb.ActivityDispatchTask{}}, + }, + expectedRetryInterval: 2 * time.Second, + retryPolicy: defaultRetryPolicy, + scheduleToStartTimeout: 
defaultScheduleToStartTimeout, + operationTag: metrics.HistoryRespondActivityTaskFailedScope, + counterMetric: metrics.ActivityTaskFail.Name(), }, } @@ -174,20 +224,25 @@ func TestTransitionRescheduled(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ + event := rescheduleEvent{ retryInterval: tc.expectedRetryInterval, failure: createStartToCloseTimeoutFailure(), - }) + timeoutType: tc.timeoutType, + } + + err := TransitionRescheduled.Apply(activity, ctx, event) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activity.Status) require.Equal(t, tc.startingAttemptCount+1, attemptState.Count) @@ -226,7 +281,10 @@ func TestTransitionRescheduled(t *testing.T) { func TestTransitionStarted(t *testing.T) { ctx := &chasm.MockMutableContext{} ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } - attemptState := &activitypb.ActivityAttemptState{Count: 1} + attemptState := &activitypb.ActivityAttemptState{ + Count: 1, + StartedTime: timestamppb.New(defaultTime), + } outcome := &activitypb.ActivityOutcome{} activity := &Activity{ @@ -290,6 +348,12 @@ func TestTransitionTimedout(t *testing.T) { timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, attemptCount: 5, }, + { + name: "heartbeat timeout", + startStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + 
attemptCount: 2, + }, } for _, tc := range testCases { @@ -300,17 +364,45 @@ func TestTransitionTimedout(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: tc.startStatus, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionTimedOut.Apply(activity, ctx, tc.timeoutType) + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + timeoutTag := metrics.StringTag("timeout_type", tc.timeoutType.String()) + + counterTimeout := metrics.NewMockCounterIface(controller) + counterTimeout.EXPECT().Record(int64(1), timeoutTag).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTimeout.Name()).Return(counterTimeout) + + counterTaskTimeout := metrics.NewMockCounterIface(controller) + counterTaskTimeout.EXPECT().Record(int64(1), timeoutTag).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTaskTimeout.Name()).Return(counterTaskTimeout) + + event := timeoutEvent{ + timeoutType: tc.timeoutType, + metricsHandler: metricsHandler, + } + + err := 
TransitionTimedOut.Apply(activity, ctx, event) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, activity.Status) require.Equal(t, tc.attemptCount, attemptState.Count) @@ -323,7 +415,8 @@ func TestTransitionTimedout(t *testing.T) { require.Nil(t, attemptState.GetCompleteTime()) require.NotNil(t, outcome.GetFailed().GetFailure()) // do something - case enumspb.TIMEOUT_TYPE_START_TO_CLOSE: + case enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + enumspb.TIMEOUT_TYPE_HEARTBEAT: // Timeout failure is recorded in attempt state only. TransitionTimedOut should only be called when there // are no more retries. Retries go through TransitionRescheduled. require.NotNil(t, attemptState.GetLastFailureDetails().GetFailure()) @@ -349,11 +442,13 @@ func TestTransitionCompleted(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), @@ -361,11 +456,31 @@ func TestTransitionCompleted(t *testing.T) { payload := payloads.EncodeString("Done") - err := TransitionCompleted.Apply(activity, ctx, &historyservice.RespondActivityTaskCompletedRequest{ + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := 
metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterSuccess := metrics.NewMockCounterIface(controller) + counterSuccess.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivitySuccess.Name()).Return(counterSuccess) + + req := &historyservice.RespondActivityTaskCompletedRequest{ CompleteRequest: &workflowservice.RespondActivityTaskCompletedRequest{ Result: payload, Identity: "worker", }, + } + + err := TransitionCompleted.Apply(activity, ctx, completeEvent{ + req: req, + metricsHandler: metricsHandler, }) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, activity.Status) @@ -384,11 +499,13 @@ func TestTransitionFailed(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), LastHeartbeat: chasm.NewDataField(ctx, heartbeatState), @@ -404,13 +521,38 @@ func TestTransitionFailed(t *testing.T) { }}, } - err := TransitionFailed.Apply(activity, ctx, &historyservice.RespondActivityTaskFailedRequest{ + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + 
+ timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterFail := metrics.NewMockCounterIface(controller) + counterFail.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityFail.Name()).Return(counterFail) + + counterTaskFail := metrics.NewMockCounterIface(controller) + counterTaskFail.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTaskFail.Name()).Return(counterTaskFail) + + req := &historyservice.RespondActivityTaskFailedRequest{ FailedRequest: &workflowservice.RespondActivityTaskFailedRequest{ Failure: failure, LastHeartbeatDetails: heartbeatDetails, Identity: "worker", }, + } + + err := TransitionFailed.Apply(activity, ctx, failedEvent{ + req: req, + metricsHandler: metricsHandler, }) + require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, activity.Status) require.EqualValues(t, 1, attemptState.Count) @@ -434,21 +576,52 @@ func TestTransitionTerminated(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionTerminated.Apply(activity, ctx, &activitypb.TerminateActivityExecutionRequest{ + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + 
enrichedMetricsHandler := metrics.NewMockHandler(controller) + + tags := []metrics.Tag{ + metrics.OperationTag(metrics.ActivityTerminatedScope), + metrics.ActivityTypeTag("test-activity-type"), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + metrics.WorkflowTypeTag(WorkflowTypeTag), + metrics.NamespaceTag("test-namespace"), + metrics.UnsafeTaskQueueTag("test-task-queue"), + } + metricsHandler.EXPECT().WithTags(tags).Return(enrichedMetricsHandler) + + counterTerminate := metrics.NewMockCounterIface(controller) + counterTerminate.EXPECT().Record(int64(1)).Times(1) + enrichedMetricsHandler.EXPECT().Counter(metrics.ActivityTerminate.Name()).Return(counterTerminate) + + req := &activitypb.TerminateActivityExecutionRequest{ FrontendRequest: &workflowservice.TerminateActivityExecutionRequest{ Reason: "Test Termination", Identity: "terminator", }, + } + + err := TransitionTerminated.Apply(activity, ctx, terminateEvent{ + request: req, + MetricsHandlerBuilderParams: MetricsHandlerBuilderParams{ + Handler: metricsHandler, + NamespaceName: "test-namespace", + BreakdownMetricsByTaskQueue: func(namespace string, taskQueue string, taskQueueType enumspb.TaskQueueType) bool { + return namespace == "test-namespace" && taskQueue == "test-task-queue" && taskQueueType == enumspb.TASK_QUEUE_TYPE_ACTIVITY + }, + }, }) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, activity.Status) @@ -502,17 +675,39 @@ func TestTransitionCanceled(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: 
"test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionCanceled.Apply(activity, ctx, payloads.EncodeString("Details")) + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterCancel := metrics.NewMockCounterIface(controller) + counterCancel.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityCancel.Name()).Return(counterCancel) + + event := cancelEvent{ + details: payloads.EncodeString("Details"), + handler: metricsHandler, + } + + err := TransitionCanceled.Apply(activity, ctx, event) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, activity.Status) diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index 885199aa89..6e4b4634ac 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -7,6 +7,9 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" + tokenspb "go.temporal.io/server/api/token/v1" + "go.temporal.io/server/chasm" + activitystatepb "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" @@ -306,3 +309,19 @@ func ValidateGetActivityExecutionOutcomeRequest( } return nil } + +// ValidateActivityTaskToken validates a task 
token against the current activity state. +func ValidateActivityTaskToken( + ctx chasm.Context, + a *Activity, + token *tokenspb.Task, +) error { + if a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_STARTED && + a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + return serviceerror.NewNotFound("activity task not found") + } + if token.GetAttempt() != a.LastAttempt.Get(ctx).GetCount() { + return serviceerror.NewNotFound("activity task not found") + } + return nil +} diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 354c79d653..cd2776b6b7 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -339,6 +339,8 @@ const ( HistoryRespondActivityTaskFailedScope = "RespondActivityTaskFailed" // HistoryRespondActivityTaskCanceledScope tracks RespondActivityTaskCanceled API calls received by service HistoryRespondActivityTaskCanceledScope = "RespondActivityTaskCanceled" + // ActivityTerminatedScope tracks TerminateActivityExecution API calls received by service + ActivityTerminatedScope = "ActivityTerminated" // HistoryGetWorkflowExecutionHistoryScope is the metric scope for non-long-poll frontend.GetWorkflowExecutionHistory HistoryGetWorkflowExecutionHistoryScope = "GetWorkflowExecutionHistory" // HistoryPollWorkflowExecutionHistoryScope is the metric scope for long poll case of frontend.GetWorkflowExecutionHistory @@ -861,7 +863,8 @@ var ( ActivitySuccess = NewCounterDef("activity_success", WithDescription("Number of activities that succeeded (doesn't include retries).")) ActivityFail = NewCounterDef("activity_fail", WithDescription("Number of activities that failed and won't be retried anymore.")) ActivityTaskFail = NewCounterDef("activity_task_fail", WithDescription("Number of activity task failures (includes retries).")) - ActivityCancel = NewCounterDef("activity_cancel") + ActivityCancel = NewCounterDef("activity_cancel", WithDescription("Number of activities that are cancelled.")) +
ActivityTerminate = NewCounterDef("activity_terminate", WithDescription("Number of activities that are terminated.")) ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries).")) ActivityTimeout = NewCounterDef("activity_timeout", WithDescription("Number of terminal activity timeouts.")) ActivityPayloadSize = NewCounterDef("activity_payload_size", WithDescription("Size of activity payloads in bytes.")) diff --git a/service/history/api/respondactivitytaskcanceled/api.go b/service/history/api/respondactivitytaskcanceled/api.go index a66dfc9a56..e4bb582490 100644 --- a/service/history/api/respondactivitytaskcanceled/api.go +++ b/service/history/api/respondactivitytaskcanceled/api.go @@ -44,9 +44,14 @@ func Invoke( ctx, componentRef, (*activity.Activity).HandleCanceled, - activity.WithToken[*historyservice.RespondActivityTaskCanceledRequest]{ - Token: token, + activity.RespondCancelledEvent{ Request: req, + Token: token, + MetricsHandlerBuilderParams: activity.MetricsHandlerBuilderParams{ + Handler: shard.GetMetricsHandler(), + NamespaceName: namespace.String(), + BreakdownMetricsByTaskQueue: shard.GetConfig().BreakdownMetricsByTaskQueue, + }, }, ) diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index 689a0077b8..6237ef5893 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -44,9 +44,14 @@ func Invoke( ctx, componentRef, (*activity.Activity).HandleCompleted, - activity.WithToken[*historyservice.RespondActivityTaskCompletedRequest]{ - Token: token, + activity.RespondCompletedEvent{ Request: req, + Token: token, + MetricsHandlerBuilderParams: activity.MetricsHandlerBuilderParams{ + Handler: shard.GetMetricsHandler(), + NamespaceName: namespace.String(), + BreakdownMetricsByTaskQueue: shard.GetConfig().BreakdownMetricsByTaskQueue, + }, }, ) 
diff --git a/service/history/api/respondactivitytaskfailed/api.go b/service/history/api/respondactivitytaskfailed/api.go index d19c8759bd..0a16b352fe 100644 --- a/service/history/api/respondactivitytaskfailed/api.go +++ b/service/history/api/respondactivitytaskfailed/api.go @@ -45,9 +45,14 @@ func Invoke( ctx, componentRef, (*activity.Activity).HandleFailed, - activity.WithToken[*historyservice.RespondActivityTaskFailedRequest]{ - Token: token, + activity.RespondFailedEvent{ Request: req, + Token: token, + MetricsHandlerBuilderParams: activity.MetricsHandlerBuilderParams{ + Handler: shard.GetMetricsHandler(), + NamespaceName: namespace.String(), + BreakdownMetricsByTaskQueue: shard.GetConfig().BreakdownMetricsByTaskQueue, + }, }, )