diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 33eccec094..8fa1add42e 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -27,8 +27,23 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -// WorkflowTypeTag Required workflow tag for standalone activities to ensure consistent metric labeling between workflows and activities -const WorkflowTypeTag = "__temporal_standalone_activity__" +const ( + // WorkflowTypeTag is a required workflow tag for standalone activities to ensure consistent + // metric labeling between workflows and activities. + WorkflowTypeTag = "__temporal_standalone_activity__" + + TypeSAAlias = "ActivityType" + StatusSAAlias = "ActivityStatus" + TaskQueueSAAlias = "ActivityTaskQueue" +) + +var ( + TypeSearchAttribute = chasm.NewSearchAttributeKeyword(TypeSAAlias, chasm.SearchAttributeFieldKeyword01) + StatusSearchAttribute = chasm.NewSearchAttributeKeyword(StatusSAAlias, chasm.SearchAttributeFieldLowCardinalityKeyword01) + TaskQueueSearchAttribute = chasm.NewSearchAttributeKeyword(TaskQueueSAAlias, chasm.SearchAttributeFieldKeyword02) +) + +var _ chasm.VisibilitySearchAttributesProvider = (*Activity)(nil) type ActivityStore interface { // PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted @@ -552,41 +567,54 @@ func (a *Activity) RecordHeartbeat( }, nil } -func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) { - // TODO(dan): support pause states - var status enumspb.ActivityExecutionStatus - var runState enumspb.PendingActivityState - switch a.GetStatus() { - case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED - runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED - case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING - runState = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED - case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING - runState = enumspb.PENDING_ACTIVITY_STATE_STARTED - case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING - runState = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED +// InternalStatusToAPIStatus converts internal activity execution status to API status. +func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumspb.ActivityExecutionStatus { + switch status { + case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: + return enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED - runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED + return enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED case activitypb.ACTIVITY_EXECUTION_STATUS_FAILED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_FAILED - runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED + return enumspb.ACTIVITY_EXECUTION_STATUS_FAILED case activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED - runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED + return enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED case activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED: - status = enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED - runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED + return enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED case activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT: - status = enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT - runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED + return enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT + case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED: + return enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED default: - return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus()) + panic(fmt.Sprintf("unknown activity execution status: %v", status)) //nolint:forbidigo } +} + +func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb.PendingActivityState { + switch status { + case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: + return enumspb.PENDING_ACTIVITY_STATE_SCHEDULED + case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED: + return enumspb.PENDING_ACTIVITY_STATE_STARTED + case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: + return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED + case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, + activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, + activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, + activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, + activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED: + return enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED + default: + panic(fmt.Sprintf("unknown activity execution status: %v", status)) //nolint:forbidigo + } +} + +func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) { + // TODO(dan): support pause states + status := InternalStatusToAPIStatus(a.GetStatus()) + runState := internalStatusToRunState(a.GetStatus()) requestData := a.RequestData.Get(ctx) attempt := a.LastAttempt.Get(ctx) @@ -826,3 +854,13 @@ func (a *Activity) emitOnTimedOutMetrics( metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag) metrics.ActivityTimeout.With(handler).Record(1, timeoutTag) } + +// SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface. +// Returns the current search attribute values for this activity execution. +func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue { + return []chasm.SearchAttributeKeyValue{ + TypeSearchAttribute.Value(a.GetActivityType().GetName()), + StatusSearchAttribute.Value(InternalStatusToAPIStatus(a.GetStatus()).String()), + TaskQueueSearchAttribute.Value(a.GetTaskQueue().GetName()), + } +} diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index 5eadaf4ce1..4ad6a3c6b5 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -6,8 +6,10 @@ import ( "github.com/google/uuid" apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/log" @@ -15,6 +17,8 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/searchattribute" "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" ) const StandaloneActivityDisabledError = "Standalone activity is disabled" @@ -157,6 +161,101 @@ func (h *frontendHandler) PollActivityExecution( return resp.GetFrontendResponse(), err } +// ListActivityExecutions lists activity executions matching the query in the request. +func (h *frontendHandler) ListActivityExecutions( + ctx context.Context, + req *workflowservice.ListActivityExecutionsRequest, +) (*workflowservice.ListActivityExecutionsResponse, error) { + if !h.config.Enabled(req.GetNamespace()) { + return nil, serviceerror.NewUnavailable(StandaloneActivityDisabledError) + } + + namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace())) + if err != nil { + return nil, err + } + + resp, err := chasm.ListExecutions[*Activity, *emptypb.Empty](ctx, &chasm.ListExecutionsRequest{ + NamespaceID: namespaceID.String(), + NamespaceName: req.GetNamespace(), + PageSize: int(req.GetPageSize()), + NextPageToken: req.GetNextPageToken(), + Query: req.GetQuery(), + }) + if err != nil { + return nil, err + } + + executions := make([]*apiactivitypb.ActivityExecutionListInfo, 0, len(resp.Executions)) + for _, exec := range resp.Executions { + activityType, _ := chasm.GetValue(exec.ChasmSearchAttributes, TypeSearchAttribute) + taskQueue, _ := chasm.GetValue(exec.ChasmSearchAttributes, TaskQueueSearchAttribute) + statusStr, _ := chasm.GetValue(exec.ChasmSearchAttributes, StatusSearchAttribute) + status, _ := enumspb.ActivityExecutionStatusFromString(statusStr) + + info := &apiactivitypb.ActivityExecutionListInfo{ + ActivityId: exec.BusinessID, + RunId: exec.RunID, + ScheduleTime: timestamppb.New(exec.StartTime), + StateTransitionCount: exec.StateTransitionCount, + StateSizeBytes: exec.HistorySizeBytes, + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: exec.CustomSearchAttributes}, + ActivityType: &commonpb.ActivityType{Name: activityType}, + TaskQueue: taskQueue, + Status: status, + } + if !exec.CloseTime.IsZero() { + info.CloseTime = timestamppb.New(exec.CloseTime) + if !exec.StartTime.IsZero() { + info.ExecutionDuration = durationpb.New(exec.CloseTime.Sub(exec.StartTime)) + } + } + executions = append(executions, info) + } + + return &workflowservice.ListActivityExecutionsResponse{ + Executions: executions, + NextPageToken: resp.NextPageToken, + }, nil +} + +// CountActivityExecutions counts activity executions matching the query in the request. +func (h *frontendHandler) CountActivityExecutions( + ctx context.Context, + req *workflowservice.CountActivityExecutionsRequest, +) (*workflowservice.CountActivityExecutionsResponse, error) { + if !h.config.Enabled(req.GetNamespace()) { + return nil, serviceerror.NewUnavailable(StandaloneActivityDisabledError) + } + + namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace())) + if err != nil { + return nil, err + } + + resp, err := chasm.CountExecutions[*Activity](ctx, &chasm.CountExecutionsRequest{ + NamespaceID: namespaceID.String(), + NamespaceName: req.GetNamespace(), + Query: req.GetQuery(), + }) + if err != nil { + return nil, err + } + + groups := make([]*workflowservice.CountActivityExecutionsResponse_AggregationGroup, 0, len(resp.Groups)) + for _, g := range resp.Groups { + groups = append(groups, &workflowservice.CountActivityExecutionsResponse_AggregationGroup{ + GroupValues: g.Values, + Count: g.Count, + }) + } + + return &workflowservice.CountActivityExecutionsResponse{ + Count: resp.Count, + Groups: groups, + }, nil +} + // TerminateActivityExecution terminates a standalone activity execution func (h *frontendHandler) TerminateActivityExecution( ctx context.Context, @@ -265,13 +364,7 @@ func (h *frontendHandler) validateAndPopulateStartRequest( } applyActivityOptionsToStartRequest(opts, req) - err = validateAndNormalizeStartActivityExecutionRequest( - req, - h.config.BlobSizeLimitError, - h.config.BlobSizeLimitWarn, - h.logger, - h.config.MaxIDLengthLimit(), - h.saValidator) + err = h.validateAndNormalizeStartActivityExecutionRequest(req) if err != nil { return nil, err } @@ -279,6 +372,47 @@ func (h *frontendHandler) validateAndPopulateStartRequest( return req, nil } +// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone +// activity specific attributes. Note that this method mutates the input params; the caller must +// clone the request if necessary (e.g. if it may be retried). +func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest( + req *workflowservice.StartActivityExecutionRequest, +) error { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } + + if len(req.GetRequestId()) > h.config.MaxIDLengthLimit() { + return serviceerror.NewInvalidArgument("RequestID length exceeds limit.") + } + + if err := normalizeAndValidateIDPolicy(req); err != nil { + return err + } + + if err := validateInputSize( + req.GetActivityId(), + req.GetActivityType().GetName(), + h.config.BlobSizeLimitError, + h.config.BlobSizeLimitWarn, + req.Input.Size(), + h.logger, + req.GetNamespace()); err != nil { + return err + } + + if req.GetSearchAttributes() != nil { + if err := validateAndNormalizeSearchAttributes( + req, + h.saMapperProvider, + h.saValidator); err != nil { + return err + } + } + + return nil +} + // activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields // of a StartActivityExecutionRequest for use with shared validation logic. func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions { diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 5abb08520e..d51e304bc5 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -43,6 +43,9 @@ func newHandler(config *Config, metricsHandler metrics.Handler, namespaceRegistr } } +// StartActivityExecution schedules an activity execution. Note that while external callers refer to +// this as "start", the start transition in fact happens later, in response to the activity task in +// matching being delivered to a worker poll request. func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.StartActivityExecutionRequest) (*activitypb.StartActivityExecutionResponse, error) { frontendReq := req.GetFrontendRequest() @@ -93,13 +96,12 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St }, nil } -// DescribeActivityExecution handles DescribeActivityExecutionRequest from frontend. This method -// queries current activity state, optionally as a long-poll that waits for any state change. When -// used to long-poll, it returns an empty non-error response on context deadline expiry, to indicate -// that the state being waited for was not reached. Callers should interpret this as an invitation -// to resubmit their long-poll request. This response is sent before the caller's deadline (see -// chasm.activity.longPollBuffer) so that it is likely that the caller does indeed receive the -// non-error response. +// DescribeActivityExecution queries current activity state, optionally as a long-poll that waits +// for any state change. When used to long-poll, it returns an empty non-error response on context +// deadline expiry, to indicate that the state being waited for was not reached. Callers should +// interpret this as an invitation to resubmit their long-poll request. This response is sent before +// the caller's deadline (see chasm.activity.longPollBuffer) so that it is likely that the caller +// does indeed receive the non-error response. func (h *handler) DescribeActivityExecution( ctx context.Context, req *activitypb.DescribeActivityExecutionRequest, @@ -217,7 +219,7 @@ func (h *handler) PollActivityExecution( return response, err } -// TerminateActivityExecution terminates a standalone activity execution +// TerminateActivityExecution terminates an activity execution. func (h *handler) TerminateActivityExecution( ctx context.Context, req *activitypb.TerminateActivityExecutionRequest, @@ -256,7 +258,7 @@ func (h *handler) TerminateActivityExecution( return response, nil } -// RequestCancelActivityExecution requests cancellation on a standalone activity execution +// RequestCancelActivityExecution requests cancellation of an activity execution. func (h *handler) RequestCancelActivityExecution( ctx context.Context, req *activitypb.RequestCancelActivityExecutionRequest, diff --git a/chasm/lib/activity/library.go b/chasm/lib/activity/library.go index de71b6dd73..59ad0ac7d1 100644 --- a/chasm/lib/activity/library.go +++ b/chasm/lib/activity/library.go @@ -20,7 +20,13 @@ func (l *componentOnlyLibrary) Name() string { func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent { return []*chasm.RegistrableComponent{ - chasm.NewRegistrableComponent[*Activity]("activity"), + chasm.NewRegistrableComponent[*Activity]("activity", + chasm.WithSearchAttributes( + TypeSearchAttribute, + StatusSearchAttribute, + TaskQueueSearchAttribute, + ), + ), } } diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 5019502727..ee5fa481dd 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -32,7 +32,8 @@ func (a *Activity) SetStateMachineState(state activitypb.ActivityExecutionStatus a.Status = state } -// TransitionScheduled affects a transition to Scheduled status. This is only called on the initial scheduling of the activity. +// TransitionScheduled transitions to Scheduled status. This is only called on the initial +// scheduling of the activity. var TransitionScheduled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED, @@ -80,8 +81,8 @@ type rescheduleEvent struct { timeoutType enumspb.TimeoutType } -// TransitionRescheduled affects a transition to Scheduled from Started, which happens on retries. The event to pass in -// is the failure to be recorded from the previously failed attempt. +// TransitionRescheduled transitions to Scheduled from Started, which happens on retries. The event +// to pass in is the failure to be recorded from the previously failed attempt. var TransitionRescheduled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, // For retries the activity will be in started status @@ -121,7 +122,7 @@ var TransitionRescheduled = chasm.NewTransition( }, ) -// TransitionStarted affects a transition to Started status +// TransitionStarted transitions to Started status. var TransitionStarted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, @@ -167,7 +168,7 @@ type completeEvent struct { metricsHandler metrics.Handler } -// TransitionCompleted affects a transition to Completed status +// TransitionCompleted transitions to Completed status. var TransitionCompleted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, @@ -200,7 +201,7 @@ type failedEvent struct { metricsHandler metrics.Handler } -// TransitionFailed affects a transition to Failed status +// TransitionFailed transitions to Failed status. var TransitionFailed = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, @@ -230,7 +231,7 @@ var TransitionFailed = chasm.NewTransition( }, ) -// TransitionTerminated affects a transition to Terminated status +// TransitionTerminated transitions to Terminated status. var TransitionTerminated = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, @@ -266,12 +267,12 @@ var TransitionTerminated = chasm.NewTransition( }, ) -// TransitionCancelRequested affects a transition to CancelRequested status +// TransitionCancelRequested transitions to CancelRequested status. var TransitionCancelRequested = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, - activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, // Allow idempotent transition + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, func(a *Activity, ctx chasm.MutableContext, req *workflowservice.RequestCancelActivityExecutionRequest) error { @@ -292,7 +293,7 @@ type cancelEvent struct { fromStatus activitypb.ActivityExecutionStatus } -// TransitionCanceled affects a transition to Canceled status +// TransitionCanceled transitions to Canceled status. var TransitionCanceled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, @@ -328,7 +329,7 @@ type timeoutEvent struct { fromStatus activitypb.ActivityExecutionStatus } -// TransitionTimedOut transitions to TimedOut status +// TransitionTimedOut transitions to TimedOut status. var TransitionTimedOut = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index ecc3d2eb56..6edd7b137e 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -165,51 +165,6 @@ func normalizeAndValidateTimeouts( return nil } -// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone activity specific attributes. -// IMPORTANT: this method mutates the input params; in cases where it's critical to maintain immutability -// (i.e., when incoming request can potentially be retried), clone the params first before passing it in. -func validateAndNormalizeStartActivityExecutionRequest( - req *workflowservice.StartActivityExecutionRequest, - blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, - blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter, - logger log.Logger, - maxIDLengthLimit int, - saValidator *searchattribute.Validator, -) error { - if req.GetRequestId() == "" { - req.RequestId = uuid.NewString() - } - - if len(req.GetRequestId()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgument("RequestID length exceeds limit.") - } - - if err := normalizeAndValidateIDPolicy(req); err != nil { - return err - } - - if err := validateInputSize( - req.GetActivityId(), - req.GetActivityType().GetName(), - blobSizeLimitError, - blobSizeLimitWarn, - req.Input.Size(), - logger, - req.GetNamespace()); err != nil { - return err - } - - if req.GetSearchAttributes() != nil { - if err := validateAndNormalizeSearchAttributes( - req, - saValidator); err != nil { - return err - } - } - - return nil -} - func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error { if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED { req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE @@ -251,15 +206,26 @@ func validateInputSize( func validateAndNormalizeSearchAttributes( req *workflowservice.StartActivityExecutionRequest, + saMapperProvider searchattribute.MapperProvider, saValidator *searchattribute.Validator, ) error { namespaceName := req.GetNamespace() - if err := saValidator.Validate(req.SearchAttributes, namespaceName); err != nil { + // Unalias search attributes for validation. + saToValidate := req.SearchAttributes + if saMapperProvider != nil && saToValidate != nil { + var err error + saToValidate, err = searchattribute.UnaliasFields(saMapperProvider, saToValidate, namespaceName) + if err != nil { + return err + } + } + + if err := saValidator.Validate(saToValidate, namespaceName); err != nil { return err } - return saValidator.ValidateSize(req.SearchAttributes, namespaceName) + return saValidator.ValidateSize(saToValidate, namespaceName) } // ValidateDescribeActivityExecutionRequest validates DescribeActivityExecutionRequest. diff --git a/chasm/lib/activity/validator_test.go b/chasm/lib/activity/validator_test.go index d64403916f..4da016a369 100644 --- a/chasm/lib/activity/validator_test.go +++ b/chasm/lib/activity/validator_test.go @@ -217,6 +217,21 @@ func TestValidateFailures(t *testing.T) { } } +func newTestFrontendHandler( + blobSizeLimitError func(string) int, + blobSizeLimitWarn func(string) int, + maxIDLengthLimit int, +) *frontendHandler { + return &frontendHandler{ + config: &Config{ + BlobSizeLimitError: blobSizeLimitError, + BlobSizeLimitWarn: blobSizeLimitWarn, + MaxIDLengthLimit: func() int { return maxIDLengthLimit }, + }, + logger: log.NewNoopLogger(), + } +} + func TestValidateStandAloneRequestIDTooLong(t *testing.T) { req := &workflowservice.StartActivityExecutionRequest{ ActivityId: defaultActivityID, @@ -231,13 +246,8 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) { Input: payloads.EncodeString("test-input"), } - err := validateAndNormalizeStartActivityExecutionRequest( - req, - defaultBlobSizeLimitError, - defaultBlobSizeLimitWarn, - log.NewNoopLogger(), - defaultMaxIDLengthLimit, - nil) + h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) + err := h.validateAndNormalizeStartActivityExecutionRequest(req) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -256,13 +266,8 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) { Input: payloads.EncodeString(string(make([]byte, 1000))), } - err := validateAndNormalizeStartActivityExecutionRequest( - req, - defaultBlobSizeLimitError, - defaultBlobSizeLimitWarn, - log.NewNoopLogger(), - defaultMaxIDLengthLimit, - nil) + h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) + err := h.validateAndNormalizeStartActivityExecutionRequest(req) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -284,13 +289,12 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) { Input: payload, } - err := validateAndNormalizeStartActivityExecutionRequest( - req, + h := newTestFrontendHandler( func(ns string) int { return payloadSize + 1 }, func(ns string) int { return payloadSize }, - log.NewNoopLogger(), defaultMaxIDLengthLimit, - nil) + ) + err := h.validateAndNormalizeStartActivityExecutionRequest(req) require.NoError(t, err) } @@ -307,13 +311,8 @@ func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) { RequestId: "test-request-id", } - err := validateAndNormalizeStartActivityExecutionRequest( - req, - defaultBlobSizeLimitError, - defaultBlobSizeLimitWarn, - log.NewNoopLogger(), - defaultMaxIDLengthLimit, - nil) + h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) + err := h.validateAndNormalizeStartActivityExecutionRequest(req) require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy) diff --git a/common/persistence/visibility/store/query/resolve.go b/common/persistence/visibility/store/query/resolve.go index b829cd8f86..990899bcf7 100644 --- a/common/persistence/visibility/store/query/resolve.go +++ b/common/persistence/visibility/store/query/resolve.go @@ -38,6 +38,13 @@ func ResolveSearchAttributeAlias( return sadefs.WorkflowID, saType, nil } + // Handle ActivityId → WorkflowID transformation for standalone activities. + // TODO: Remove this hardcoded transformation. + if name == sadefs.ActivityID { + saType, _ := saTypeMap.GetType(sadefs.WorkflowID) + return sadefs.WorkflowID, saType, nil + } + fieldName, fieldType = tryChasmMapper(name, chasmMapper) if fieldName != "" { return fieldName, fieldType, nil diff --git a/common/searchattribute/sadefs/constants.go b/common/searchattribute/sadefs/constants.go index efc760ef04..dee24ca923 100644 --- a/common/searchattribute/sadefs/constants.go +++ b/common/searchattribute/sadefs/constants.go @@ -59,6 +59,9 @@ const ( // any other custom search attribute. ScheduleID = "ScheduleId" + // TODO: Remove this hardcoded constant. + ActivityID = "ActivityId" + // TemporalPauseInfo is a search attribute that stores the information about paused entities in the workflow. // Format of a single paused entity: ":". // * is something that can be used to identify the filtering condition diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 0afb52d8b0..fcc39540e1 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "fmt" "testing" "time" @@ -11,6 +12,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" @@ -1287,6 +1289,305 @@ func (s *standaloneActivityTestSuite) TestPollActivityExecution_InvalidArgument( // TODO(dan): add tests that DescribeActivityExecution can wait for deletion, termination, cancellation etc +func (s *standaloneActivityTestSuite) TestListActivityExecutions() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + activityType := s.tv.ActivityType().GetName() + taskQueue := s.tv.TaskQueue().GetName() + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + verifyListQuery := func(t *testing.T, query string) { + t.Helper() + var resp *workflowservice.ListActivityExecutionsResponse + s.Eventually( + func() bool { + var err error + resp, err = s.FrontendClient().ListActivityExecutions(ctx, &workflowservice.ListActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + PageSize: 10, + Query: query, + }) + return err == nil && len(resp.GetExecutions()) >= 1 + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) + require.Len(t, resp.GetExecutions(), 1, "expected exactly 1 result for query: %s", query) + exec := resp.GetExecutions()[0] + // Verify all ActivityExecutionListInfo fields + s.Equal(activityID, exec.GetActivityId()) + s.Equal(runID, exec.GetRunId()) + s.Equal(activityType, exec.GetActivityType().GetName()) + s.Equal(taskQueue, exec.GetTaskQueue()) + s.Equal(enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, exec.GetStatus()) + s.NotNil(exec.GetScheduleTime()) + s.Nil(exec.GetCloseTime()) // Running activity has no close time + s.Nil(exec.GetExecutionDuration()) // Running activity has no execution duration + s.GreaterOrEqual(exec.GetStateSizeBytes(), int64(0)) + s.GreaterOrEqual(exec.GetStateTransitionCount(), int64(0)) + } + + t.Run("QueryByActivityId", func(t *testing.T) { + verifyListQuery(t, fmt.Sprintf("ActivityId = '%s'", activityID)) + }) + + t.Run("QueryByActivityType", func(t *testing.T) { + verifyListQuery(t, fmt.Sprintf("ActivityType = '%s'", activityType)) + }) + + t.Run("QueryByActivityStatus", func(t *testing.T) { + verifyListQuery(t, fmt.Sprintf("ActivityStatus = 'Running' AND ActivityType = '%s'", activityType)) + }) + + t.Run("QueryByTaskQueue", func(t *testing.T) { + verifyListQuery(t, fmt.Sprintf("ActivityTaskQueue = '%s' AND ActivityType = '%s'", taskQueue, activityType)) + }) + + t.Run("QueryByMultipleFields", func(t *testing.T) { + verifyListQuery(t, fmt.Sprintf("ActivityId = '%s' AND ActivityType = '%s'", activityID, activityType)) + }) + + t.Run("QueryByCustomSearchAttribute", func(t *testing.T) { + customSAName := "CustomKeywordField" + customSAValue := "custom-sa-test-value" + customSAActivityID := "custom-sa-activity-id" + + _, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: customSAActivityID, + ActivityType: &commonpb.ActivityType{Name: "custom-sa-activity-type"}, + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: s.tv.TaskQueue().GetName()}, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + RequestId: s.tv.RequestID(), + SearchAttributes: &commonpb.SearchAttributes{ + IndexedFields: map[string]*commonpb.Payload{ + customSAName: payload.EncodeString(customSAValue), + }, + }, + }) + require.NoError(t, err) + + var resp *workflowservice.ListActivityExecutionsResponse + s.Eventually( + func() bool { + var err error + resp, err = s.FrontendClient().ListActivityExecutions(ctx, &workflowservice.ListActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + PageSize: 10, + Query: fmt.Sprintf("%s = '%s'", customSAName, customSAValue), + }) + return err == nil && len(resp.GetExecutions()) >= 1 + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) + require.Len(t, resp.GetExecutions(), 1) + exec := resp.GetExecutions()[0] + s.Equal(customSAActivityID, exec.GetActivityId()) + s.NotNil(exec.GetSearchAttributes()) + returnedSA := exec.GetSearchAttributes().GetIndexedFields()[customSAName] + s.NotNil(returnedSA) + var returnedValue string + s.NoError(payload.Decode(returnedSA, &returnedValue)) + s.Equal(customSAValue, returnedValue) + }) + + t.Run("InvalidQuery", func(t *testing.T) { + _, err := s.FrontendClient().ListActivityExecutions(ctx, &workflowservice.ListActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + PageSize: 10, + Query: "invalid query syntax !!!", + }) + s.ErrorAs(err, new(*serviceerror.InvalidArgument)) + }) + + t.Run("InvalidSearchAttribute", func(t *testing.T) { + _, err := s.FrontendClient().ListActivityExecutions(ctx, &workflowservice.ListActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + PageSize: 10, + Query: "NonExistentField = 'value'", + }) + s.ErrorAs(err, new(*serviceerror.InvalidArgument)) + }) + + t.Run("NamespaceNotFound", func(t *testing.T) { + _, err := s.FrontendClient().ListActivityExecutions(ctx, &workflowservice.ListActivityExecutionsRequest{ + Namespace: "non-existent-namespace", + PageSize: 10, + Query: "", + }) + s.ErrorAs(err, new(*serviceerror.NamespaceNotFound)) + }) +} + +func (s *standaloneActivityTestSuite) TestCountActivityExecutions() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + activityType := s.tv.ActivityType().GetName() + s.startAndValidateActivity(ctx, t, activityID, s.tv.TaskQueue().GetName()) + + verifyCountQuery := func(t *testing.T, query string, expectedCount int) { + t.Helper() + s.Eventually( + func() bool { + resp, err := s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + Query: query, + }) + return err == nil && resp.GetCount() == int64(expectedCount) + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) + } + + t.Run("CountByActivityId", func(t *testing.T) { + verifyCountQuery(t, fmt.Sprintf("ActivityId = '%s'", activityID), 1) + }) + + t.Run("CountByActivityType", func(t *testing.T) { + verifyCountQuery(t, fmt.Sprintf("ActivityType = '%s'", activityType), 1) + }) + + t.Run("CountByActivityStatus", func(t *testing.T) { + verifyCountQuery(t, fmt.Sprintf("ActivityStatus = 'Running' AND ActivityType = '%s'", activityType), 1) + }) + + t.Run("CountByTaskQueue", func(t *testing.T) { + verifyCountQuery(t, fmt.Sprintf("ActivityTaskQueue = '%s' AND ActivityType = '%s'", s.tv.TaskQueue().GetName(), activityType), 1) + }) + + t.Run("GroupByActivityStatus", func(t *testing.T) { + groupByType := &commonpb.ActivityType{Name: "count-groupby-test-type"} + taskQueue := s.tv.TaskQueue().GetName() + + for i := range 3 { + id := fmt.Sprintf("%s-%d", groupByType.Name, i) + resp, err := s.startActivityWithType(ctx, id, taskQueue, groupByType) + require.NoError(t, err) + require.NotEmpty(t, resp.GetRunId()) + } + + query := fmt.Sprintf("ActivityType = '%s' GROUP BY ActivityStatus", groupByType.Name) + var resp *workflowservice.CountActivityExecutionsResponse + s.Eventually( + func() bool { + var err error + resp, err = s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + Query: query, + }) + return err == nil && resp.GetCount() == 3 + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) + + require.Len(t, resp.GetGroups(), 1) + s.Equal(int64(3), resp.GetGroups()[0].GetCount()) + var groupValue string + require.NoError(t, payload.Decode(resp.GetGroups()[0].GetGroupValues()[0], &groupValue)) + s.Equal("Running", groupValue) + }) + + t.Run("CountByCustomSearchAttribute", func(t *testing.T) { + customSAName := "ActivityCountCustomKeyword" + customSAValue := "count-custom-sa-value" + + _, err := s.OperatorClient().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + Namespace: s.Namespace().String(), + SearchAttributes: map[string]enumspb.IndexedValueType{ + customSAName: enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }, + }) + require.NoError(t, err) + + s.Eventually(func() bool { + descResp, err := s.OperatorClient().ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{ + Namespace: s.Namespace().String(), + }) + if err != nil { + return false + } + _, ok := descResp.CustomAttributes[customSAName] + return ok + }, 10*time.Second, 100*time.Millisecond) + + for i := range 2 { + _, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: fmt.Sprintf("count-custom-sa-%d", i), + ActivityType: &commonpb.ActivityType{Name: "count-custom-sa-type"}, + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: s.tv.TaskQueue().GetName()}, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + RequestId: s.tv.RequestID(), + SearchAttributes: &commonpb.SearchAttributes{ + IndexedFields: map[string]*commonpb.Payload{ + customSAName: payload.EncodeString(customSAValue), + }, + }, + }) + require.NoError(t, err) + } + + s.Eventually( + func() bool { + resp, err := s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + Query: fmt.Sprintf("%s = '%s'", customSAName, customSAValue), + }) + return err == nil && resp.GetCount() == 2 + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) + }) + + t.Run("GroupByUnsupportedField", func(t *testing.T) { + _, err := s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + Query: "GROUP BY ActivityType", + }) + s.ErrorAs(err, new(*serviceerror.InvalidArgument)) + s.Contains(err.Error(), "'GROUP BY' clause is only supported for ExecutionStatus") + }) + + t.Run("InvalidQuery", func(t *testing.T) { + _, err := s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + Query: "invalid query syntax !!!", + }) + s.ErrorAs(err, new(*serviceerror.InvalidArgument)) + }) + + t.Run("InvalidSearchAttribute", func(t *testing.T) { + _, err := s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: s.Namespace().String(), + Query: "NonExistentField = 'value'", + }) + s.ErrorAs(err, new(*serviceerror.InvalidArgument)) + }) + + t.Run("NamespaceNotFound", func(t *testing.T) { + _, err := s.FrontendClient().CountActivityExecutions(ctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: "non-existent-namespace", + Query: "", + }) + s.ErrorAs(err, new(*serviceerror.NamespaceNotFound)) + }) +} + func (s *standaloneActivityTestSuite) TestDescribeActivityExecution_DeadlineExceeded() { t := s.T() ctx := testcore.NewContext() @@ -2194,10 +2495,14 @@ func (s *standaloneActivityTestSuite) validateBaseActivityResponse( } func (s *standaloneActivityTestSuite) startActivity(ctx context.Context, activityID string, taskQueue string) (*workflowservice.StartActivityExecutionResponse, error) { + return s.startActivityWithType(ctx, activityID, taskQueue, s.tv.ActivityType()) +} + +func (s *standaloneActivityTestSuite) startActivityWithType(ctx context.Context, activityID string, taskQueue string, activityType *commonpb.ActivityType) (*workflowservice.StartActivityExecutionResponse, error) { return s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ Namespace: s.Namespace().String(), ActivityId: activityID, - ActivityType: s.tv.ActivityType(), + ActivityType: activityType, Identity: s.tv.WorkerIdentity(), Input: defaultInput, TaskQueue: &taskqueuepb.TaskQueue{