Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 68 additions & 30 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,23 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// WorkflowTypeTag Required workflow tag for standalone activities to ensure consistent metric labeling between workflows and activities
const WorkflowTypeTag = "__temporal_standalone_activity__"
const (
// WorkflowTypeTag is a required workflow tag for standalone activities to ensure consistent
// metric labeling between workflows and activities.
WorkflowTypeTag = "__temporal_standalone_activity__"

TypeSAAlias = "ActivityType"
StatusSAAlias = "ActivityStatus"
TaskQueueSAAlias = "ActivityTaskQueue"
)

var (
TypeSearchAttribute = chasm.NewSearchAttributeKeyword(TypeSAAlias, chasm.SearchAttributeFieldKeyword01)
StatusSearchAttribute = chasm.NewSearchAttributeKeyword(StatusSAAlias, chasm.SearchAttributeFieldLowCardinalityKeyword01)
TaskQueueSearchAttribute = chasm.NewSearchAttributeKeyword(TaskQueueSAAlias, chasm.SearchAttributeFieldKeyword02)
)

var _ chasm.VisibilitySearchAttributesProvider = (*Activity)(nil)

type ActivityStore interface {
// PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted
Expand Down Expand Up @@ -552,41 +567,54 @@ func (a *Activity) RecordHeartbeat(
}, nil
}

func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) {
// TODO(dan): support pause states
var status enumspb.ActivityExecutionStatus
var runState enumspb.PendingActivityState
switch a.GetStatus() {
case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
runState = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
runState = enumspb.PENDING_ACTIVITY_STATE_STARTED
case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
runState = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
// InternalStatusToAPIStatus converts internal activity execution status to API status.
func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumspb.ActivityExecutionStatus {
switch status {
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
return enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
return enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED
case activitypb.ACTIVITY_EXECUTION_STATUS_FAILED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_FAILED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
return enumspb.ACTIVITY_EXECUTION_STATUS_FAILED
case activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
return enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED
case activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED:
status = enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
return enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED
case activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT:
status = enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT
runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
return enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT
case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED:
return enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED
default:
return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus())
panic(fmt.Sprintf("unknown activity execution status: %v", status)) //nolint:forbidigo
}
}

func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb.PendingActivityState {
switch status {
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
return enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED:
return enumspb.PENDING_ACTIVITY_STATE_STARTED
case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED,
activitypb.ACTIVITY_EXECUTION_STATUS_FAILED,
activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED,
activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED,
activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT,
activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED:
return enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED
default:
panic(fmt.Sprintf("unknown activity execution status: %v", status)) //nolint:forbidigo
}
}

func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) {
// TODO(dan): support pause states
status := InternalStatusToAPIStatus(a.GetStatus())
runState := internalStatusToRunState(a.GetStatus())

requestData := a.RequestData.Get(ctx)
attempt := a.LastAttempt.Get(ctx)
Expand Down Expand Up @@ -826,3 +854,13 @@ func (a *Activity) emitOnTimedOutMetrics(
metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag)
metrics.ActivityTimeout.With(handler).Record(1, timeoutTag)
}

// SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface.
// Returns the current search attribute values for this activity execution.
func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue {
return []chasm.SearchAttributeKeyValue{
TypeSearchAttribute.Value(a.GetActivityType().GetName()),
StatusSearchAttribute.Value(InternalStatusToAPIStatus(a.GetStatus()).String()),
TaskQueueSearchAttribute.Value(a.GetTaskQueue().GetName()),
}
}
148 changes: 141 additions & 7 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ import (
"github.com/google/uuid"
apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/searchattribute"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

const StandaloneActivityDisabledError = "Standalone activity is disabled"
Expand Down Expand Up @@ -157,6 +161,101 @@ func (h *frontendHandler) PollActivityExecution(
return resp.GetFrontendResponse(), err
}

// ListActivityExecutions lists activity executions matching the query in the request.
func (h *frontendHandler) ListActivityExecutions(
ctx context.Context,
req *workflowservice.ListActivityExecutionsRequest,
) (*workflowservice.ListActivityExecutionsResponse, error) {
if !h.config.Enabled(req.GetNamespace()) {
return nil, serviceerror.NewUnavailable(StandaloneActivityDisabledError)
}

namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
if err != nil {
return nil, err
}

resp, err := chasm.ListExecutions[*Activity, *emptypb.Empty](ctx, &chasm.ListExecutionsRequest{
NamespaceID: namespaceID.String(),
NamespaceName: req.GetNamespace(),
PageSize: int(req.GetPageSize()),
NextPageToken: req.GetNextPageToken(),
Query: req.GetQuery(),
})
if err != nil {
return nil, err
}

executions := make([]*apiactivitypb.ActivityExecutionListInfo, 0, len(resp.Executions))
for _, exec := range resp.Executions {
activityType, _ := chasm.GetValue(exec.ChasmSearchAttributes, TypeSearchAttribute)
taskQueue, _ := chasm.GetValue(exec.ChasmSearchAttributes, TaskQueueSearchAttribute)
statusStr, _ := chasm.GetValue(exec.ChasmSearchAttributes, StatusSearchAttribute)
status, _ := enumspb.ActivityExecutionStatusFromString(statusStr)

info := &apiactivitypb.ActivityExecutionListInfo{
ActivityId: exec.BusinessID,
RunId: exec.RunID,
ScheduleTime: timestamppb.New(exec.StartTime),
StateTransitionCount: exec.StateTransitionCount,
StateSizeBytes: exec.HistorySizeBytes,
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: exec.CustomSearchAttributes},
ActivityType: &commonpb.ActivityType{Name: activityType},
TaskQueue: taskQueue,
Status: status,
}
if !exec.CloseTime.IsZero() {
info.CloseTime = timestamppb.New(exec.CloseTime)
if !exec.StartTime.IsZero() {
info.ExecutionDuration = durationpb.New(exec.CloseTime.Sub(exec.StartTime))
}
}
executions = append(executions, info)
}

return &workflowservice.ListActivityExecutionsResponse{
Executions: executions,
NextPageToken: resp.NextPageToken,
}, nil
}

// CountActivityExecutions counts activity executions matching the query in the request.
func (h *frontendHandler) CountActivityExecutions(
ctx context.Context,
req *workflowservice.CountActivityExecutionsRequest,
) (*workflowservice.CountActivityExecutionsResponse, error) {
if !h.config.Enabled(req.GetNamespace()) {
return nil, serviceerror.NewUnavailable(StandaloneActivityDisabledError)
}

namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
if err != nil {
return nil, err
}

resp, err := chasm.CountExecutions[*Activity](ctx, &chasm.CountExecutionsRequest{
NamespaceID: namespaceID.String(),
NamespaceName: req.GetNamespace(),
Query: req.GetQuery(),
})
if err != nil {
return nil, err
}

groups := make([]*workflowservice.CountActivityExecutionsResponse_AggregationGroup, 0, len(resp.Groups))
for _, g := range resp.Groups {
groups = append(groups, &workflowservice.CountActivityExecutionsResponse_AggregationGroup{
GroupValues: g.Values,
Count: g.Count,
})
}

return &workflowservice.CountActivityExecutionsResponse{
Count: resp.Count,
Groups: groups,
}, nil
}

// TerminateActivityExecution terminates a standalone activity execution
func (h *frontendHandler) TerminateActivityExecution(
ctx context.Context,
Expand Down Expand Up @@ -265,20 +364,55 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
}
applyActivityOptionsToStartRequest(opts, req)

err = validateAndNormalizeStartActivityExecutionRequest(
req,
h.config.BlobSizeLimitError,
h.config.BlobSizeLimitWarn,
h.logger,
h.config.MaxIDLengthLimit(),
h.saValidator)
err = h.validateAndNormalizeStartActivityExecutionRequest(req)
if err != nil {
return nil, err
}

return req, nil
}

// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone
// activity specific attributes. Note that this method mutates the input params; the caller must
// clone the request if necessary (e.g. if it may be retried).
func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
req *workflowservice.StartActivityExecutionRequest,
) error {
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
}

if len(req.GetRequestId()) > h.config.MaxIDLengthLimit() {
return serviceerror.NewInvalidArgument("RequestID length exceeds limit.")
}

if err := normalizeAndValidateIDPolicy(req); err != nil {
return err
}

if err := validateInputSize(
req.GetActivityId(),
req.GetActivityType().GetName(),
h.config.BlobSizeLimitError,
h.config.BlobSizeLimitWarn,
req.Input.Size(),
h.logger,
req.GetNamespace()); err != nil {
return err
}

if req.GetSearchAttributes() != nil {
if err := validateAndNormalizeSearchAttributes(
req,
h.saMapperProvider,
h.saValidator); err != nil {
return err
}
}

return nil
}

// activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields
// of a StartActivityExecutionRequest for use with shared validation logic.
func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions {
Expand Down
20 changes: 11 additions & 9 deletions chasm/lib/activity/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func newHandler(config *Config, metricsHandler metrics.Handler, namespaceRegistr
}
}

// StartActivityExecution schedules an activity execution. Note that while external callers refer to
// this as "start", the start transition in fact happens later, in response to the activity task in
// matching being delivered to a worker poll request.
func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.StartActivityExecutionRequest) (*activitypb.StartActivityExecutionResponse, error) {
frontendReq := req.GetFrontendRequest()

Expand Down Expand Up @@ -93,13 +96,12 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St
}, nil
}

// DescribeActivityExecution handles DescribeActivityExecutionRequest from frontend. This method
// queries current activity state, optionally as a long-poll that waits for any state change. When
// used to long-poll, it returns an empty non-error response on context deadline expiry, to indicate
// that the state being waited for was not reached. Callers should interpret this as an invitation
// to resubmit their long-poll request. This response is sent before the caller's deadline (see
// chasm.activity.longPollBuffer) so that it is likely that the caller does indeed receive the
// non-error response.
// DescribeActivityExecution queries current activity state, optionally as a long-poll that waits
// for any state change. When used to long-poll, it returns an empty non-error response on context
// deadline expiry, to indicate that the state being waited for was not reached. Callers should
// interpret this as an invitation to resubmit their long-poll request. This response is sent before
// the caller's deadline (see chasm.activity.longPollBuffer) so that it is likely that the caller
// does indeed receive the non-error response.
func (h *handler) DescribeActivityExecution(
ctx context.Context,
req *activitypb.DescribeActivityExecutionRequest,
Expand Down Expand Up @@ -217,7 +219,7 @@ func (h *handler) PollActivityExecution(
return response, err
}

// TerminateActivityExecution terminates a standalone activity execution
// TerminateActivityExecution terminates an activity execution.
func (h *handler) TerminateActivityExecution(
ctx context.Context,
req *activitypb.TerminateActivityExecutionRequest,
Expand Down Expand Up @@ -256,7 +258,7 @@ func (h *handler) TerminateActivityExecution(
return response, nil
}

// RequestCancelActivityExecution requests cancellation on a standalone activity execution
// RequestCancelActivityExecution requests cancellation of an activity execution.
func (h *handler) RequestCancelActivityExecution(
ctx context.Context,
req *activitypb.RequestCancelActivityExecutionRequest,
Expand Down
8 changes: 7 additions & 1 deletion chasm/lib/activity/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ func (l *componentOnlyLibrary) Name() string {

func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent {
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Activity]("activity"),
chasm.NewRegistrableComponent[*Activity]("activity",
chasm.WithSearchAttributes(
TypeSearchAttribute,
StatusSearchAttribute,
TaskQueueSearchAttribute,
),
),
}
}

Expand Down
Loading
Loading