Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 215 additions & 17 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ import (
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/tqid"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// WorkflowTypeTag Required workflow tag for standalone activities to ensure consistent metric labeling between workflows and activities
const WorkflowTypeTag = "__temporal_standalone_activity__"

type ActivityStore interface {
// PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted
PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error
Expand Down Expand Up @@ -60,6 +66,46 @@ type WithToken[R any] struct {
Request R
}

// MetricsHandlerBuilderParams contains parameters for building/enriching a metrics handler for activity operations
type MetricsHandlerBuilderParams struct {
Handler metrics.Handler
NamespaceName string
BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool]
}

// RespondCompletedEvent wraps the RespondActivityTaskCompletedRequest with context-specific data.
type RespondCompletedEvent struct {
Request *historyservice.RespondActivityTaskCompletedRequest
Token *tokenspb.Task
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

// RespondFailedEvent wraps the RespondActivityTaskFailedRequest with context-specific data.
type RespondFailedEvent struct {
Request *historyservice.RespondActivityTaskFailedRequest
Token *tokenspb.Task
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

// RespondCancelledEvent wraps the RespondActivityTaskCanceledRequest with context-specific data.
type RespondCancelledEvent struct {
Request *historyservice.RespondActivityTaskCanceledRequest
Token *tokenspb.Task
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

// requestCancelEvent wraps the RequestCancelActivityExecutionRequest with context-specific data.
type requestCancelEvent struct {
request *activitypb.RequestCancelActivityExecutionRequest
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

// terminateEvent wraps the TerminateActivityExecutionRequest with context-specific data.
type terminateEvent struct {
request *activitypb.TerminateActivityExecutionRequest
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch a.Status {
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
Expand Down Expand Up @@ -156,7 +202,7 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.Ex
}
requestData := a.RequestData.Get(ctx)
attempt := a.LastAttempt.Get(ctx)
response.StartedTime = attempt.StartedTime
response.StartedTime = attempt.GetStartedTime()
response.Attempt = attempt.GetCount()
response.Priority = a.GetPriority()
response.RetryPolicy = a.GetRetryPolicy()
Expand Down Expand Up @@ -187,14 +233,24 @@ func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx ch
// HandleCompleted updates the activity on activity completion.
func (a *Activity) HandleCompleted(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskCompletedRequest],
event RespondCompletedEvent,
) (*historyservice.RespondActivityTaskCompletedResponse, error) {
// TODO(dan): add test coverage for this validation
if err := a.validateActivityTaskToken(ctx, input.Token); err != nil {
if err := a.validateActivityTaskToken(ctx, event.Token); err != nil {
return nil, err
}

if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil {
metricsHandler := enrichMetricsHandler(
a,
event.MetricsHandlerBuilderParams.Handler,
event.MetricsHandlerBuilderParams.NamespaceName,
metrics.HistoryRespondActivityTaskCompletedScope,
event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue)

if err := TransitionCompleted.Apply(a, ctx, completeEvent{
req: event.Request,
metricsHandler: metricsHandler,
}); err != nil {
return nil, err
}

Expand All @@ -205,14 +261,21 @@ func (a *Activity) HandleCompleted(
// for retry instead.
func (a *Activity) HandleFailed(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskFailedRequest],
event RespondFailedEvent,
) (*historyservice.RespondActivityTaskFailedResponse, error) {
// TODO(dan): add test coverage for this validation
if err := a.validateActivityTaskToken(ctx, input.Token); err != nil {
if err := a.validateActivityTaskToken(ctx, event.Token); err != nil {
return nil, err
}

failure := input.Request.GetFailedRequest().GetFailure()
metricsHandler := enrichMetricsHandler(
a,
event.MetricsHandlerBuilderParams.Handler,
event.MetricsHandlerBuilderParams.NamespaceName,
metrics.HistoryRespondActivityTaskFailedScope,
event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue)

failure := event.Request.GetFailedRequest().GetFailure()

appFailure := failure.GetApplicationFailureInfo()
isRetryable := appFailure != nil &&
Expand All @@ -225,11 +288,16 @@ func (a *Activity) HandleFailed(
return nil, err
}
if rescheduled {
a.emitOnAttemptFailedMetrics(ctx, metricsHandler)

return &historyservice.RespondActivityTaskFailedResponse{}, nil
}
}

if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil {
if err := TransitionFailed.Apply(a, ctx, failedEvent{
req: event.Request,
metricsHandler: metricsHandler,
}); err != nil {
return nil, err
}

Expand All @@ -239,21 +307,32 @@ func (a *Activity) HandleFailed(
// HandleCanceled updates the activity on activity canceled.
func (a *Activity) HandleCanceled(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskCanceledRequest],
event RespondCancelledEvent,
) (*historyservice.RespondActivityTaskCanceledResponse, error) {
// TODO(dan): add test coverage for this validation
if err := a.validateActivityTaskToken(ctx, input.Token); err != nil {
if err := a.validateActivityTaskToken(ctx, event.Token); err != nil {
return nil, err
}

if err := TransitionCanceled.Apply(a, ctx, input.Request.GetCancelRequest().GetDetails()); err != nil {
metricsHandler := enrichMetricsHandler(
a,
event.MetricsHandlerBuilderParams.Handler,
event.MetricsHandlerBuilderParams.NamespaceName,
metrics.HistoryRespondActivityTaskCanceledScope,
event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue)

if err := TransitionCanceled.Apply(a, ctx, cancelEvent{
details: event.Request.GetCancelRequest().GetDetails(),
handler: metricsHandler,
fromStatus: a.GetStatus(),
}); err != nil {
return nil, err
}

return &historyservice.RespondActivityTaskCanceledResponse{}, nil
}

func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) (
func (a *Activity) handleTerminated(ctx chasm.MutableContext, req terminateEvent) (
*activitypb.TerminateActivityExecutionResponse, error,
) {
if err := TransitionTerminated.Apply(a, ctx, req); err != nil {
Expand All @@ -274,10 +353,11 @@ func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activityp
return heartbeat
}

func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.RequestCancelActivityExecutionRequest) (
func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, event requestCancelEvent) (
*activitypb.RequestCancelActivityExecutionResponse, error,
) {
newReqID := req.GetFrontendRequest().GetRequestId()
req := event.request.GetFrontendRequest()
newReqID := req.GetRequestId()
existingReqID := a.GetCancelState().GetRequestId()

// If already in cancel requested state, fail if request ID is different, else no-op
Expand All @@ -293,18 +373,29 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *ac
// If in scheduled state, cancel immediately right after marking cancel requested
isCancelImmediately := a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED

if err := TransitionCancelRequested.Apply(a, ctx, req.GetFrontendRequest()); err != nil {
if err := TransitionCancelRequested.Apply(a, ctx, req); err != nil {
return nil, err
}

if isCancelImmediately {
details := &commonpb.Payloads{
Payloads: []*commonpb.Payload{
payload.EncodeString(req.GetFrontendRequest().GetReason()),
payload.EncodeString(req.GetReason()),
},
}

err := TransitionCanceled.Apply(a, ctx, details)
metricsHandler := enrichMetricsHandler(
a,
event.MetricsHandlerBuilderParams.Handler,
event.MetricsHandlerBuilderParams.NamespaceName,
metrics.HistoryRespondActivityTaskCanceledScope,
event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue)

err := TransitionCanceled.Apply(a, ctx, cancelEvent{
details: details,
handler: metricsHandler,
fromStatus: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, // if we're here the original status was scheduled
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -628,3 +719,110 @@ func (a *Activity) validateActivityTaskToken(
}
return nil
}

func enrichMetricsHandler(
a *Activity,
handler metrics.Handler,
namespaceName string,
operationTag string,
breakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool],
) metrics.Handler {
taskQueueFamily := a.GetTaskQueue().GetName()
return metrics.GetPerTaskQueueFamilyScope(
handler,
namespaceName,
tqid.UnsafeTaskQueueFamily(namespaceName, taskQueueFamily),
breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY),
metrics.OperationTag(operationTag),
metrics.ActivityTypeTag(a.GetActivityType().GetName()),
metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED),
metrics.WorkflowTypeTag(WorkflowTypeTag),
)
}

func (a *Activity) emitOnAttemptTimedOutMetrics(ctx chasm.Context, handler metrics.Handler, timeoutType enumspb.TimeoutType) {
attempt := a.LastAttempt.Get(ctx)
startedTime := attempt.GetStartedTime().AsTime()

latency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(latency)

timeoutTag := metrics.StringTag("timeout_type", timeoutType.String())
metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag)
}

func (a *Activity) emitOnAttemptFailedMetrics(ctx chasm.Context, handler metrics.Handler) {
attempt := a.LastAttempt.Get(ctx)
startedTime := attempt.GetStartedTime().AsTime()

latency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(latency)

metrics.ActivityTaskFail.With(handler).Record(1)
}

func (a *Activity) emitOnCompletedMetrics(ctx chasm.Context, handler metrics.Handler) {
attempt := a.LastAttempt.Get(ctx)
startedTime := attempt.GetStartedTime().AsTime()

startToCloseLatency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency)

scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime())
metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency)

metrics.ActivitySuccess.With(handler).Record(1)
}

func (a *Activity) emitOnFailedMetrics(ctx chasm.Context, handler metrics.Handler) {
attempt := a.LastAttempt.Get(ctx)
startedTime := attempt.GetStartedTime().AsTime()

startToCloseLatency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency)

scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime())
metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency)

metrics.ActivityTaskFail.With(handler).Record(1)
metrics.ActivityFail.With(handler).Record(1)
}

func (a *Activity) emitOnCanceledMetrics(
ctx chasm.Context,
handler metrics.Handler,
fromStatus activitypb.ActivityExecutionStatus,
) {
// Only record start-to-close latency if a current attempt was running. If it in scheduled status, it means the current attempt never started.
if fromStatus != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED {
startedTime := a.LastAttempt.Get(ctx).GetStartedTime().AsTime()
startToCloseLatency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency)
}

scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime())
metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency)

metrics.ActivityCancel.With(handler).Record(1)
}

func (a *Activity) emitOnTimedOutMetrics(
ctx chasm.Context,
handler metrics.Handler,
timeoutType enumspb.TimeoutType,
fromStatus activitypb.ActivityExecutionStatus,
) {
// Only record start-to-close latency if a current attempt was running. If it in scheduled status, it means the current attempt never started.
if fromStatus != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED {
startedTime := a.LastAttempt.Get(ctx).GetStartedTime().AsTime()
startToCloseLatency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency)
}

scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime())
metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency)

timeoutTag := metrics.StringTag("timeout_type", timeoutType.String())
metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag)
metrics.ActivityTimeout.With(handler).Record(1, timeoutTag)
}
Loading
Loading