diff --git a/service/worker/common/chasm_util.go b/service/worker/common/chasm_util.go new file mode 100644 index 0000000000..35f9a8d32f --- /dev/null +++ b/service/worker/common/chasm_util.go @@ -0,0 +1,42 @@ +package common + +import ( + "fmt" + "strconv" + "unicode" + + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/searchattribute/sadefs" +) + +func ArchetypeIDFromExecutionInfo( + executionInfo *workflowpb.WorkflowExecutionInfo, +) (chasm.ArchetypeID, error) { + indexedField := executionInfo.SearchAttributes.GetIndexedFields() + if indexedField == nil { + return chasm.WorkflowArchetypeID, nil + } + + nsDivisionPayload, ok := indexedField[sadefs.TemporalNamespaceDivision] + if !ok { + return chasm.WorkflowArchetypeID, nil + } + + var nsDivisionStr string + if err := payload.Decode(nsDivisionPayload, &nsDivisionStr); err != nil { + return chasm.UnspecifiedArchetypeID, fmt.Errorf("failed to decode TemporalNamespaceDivision field: %w", err) + } + + if len(nsDivisionStr) == 0 || !unicode.IsDigit(rune(nsDivisionStr[0])) { + return chasm.WorkflowArchetypeID, nil + } + + archetypeID, err := strconv.ParseUint(nsDivisionStr, 10, 32) + if err != nil { + return chasm.UnspecifiedArchetypeID, fmt.Errorf("failed to parse archetypeID: %w", err) + } + + return chasm.ArchetypeID(archetypeID), nil +} diff --git a/service/worker/common/chasm_util_test.go b/service/worker/common/chasm_util_test.go new file mode 100644 index 0000000000..ddb3d1c94e --- /dev/null +++ b/service/worker/common/chasm_util_test.go @@ -0,0 +1,67 @@ +package common + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/searchattribute/sadefs" +) + +func TestArchetypeIDFromExecutionInfo(t 
*testing.T) { + t.Run("NoSearchAttributes", func(t *testing.T) { + execInfo := &workflowpb.WorkflowExecutionInfo{} + id, err := ArchetypeIDFromExecutionInfo(execInfo) + require.NoError(t, err) + require.Equal(t, chasm.WorkflowArchetypeID, id) + }) + + t.Run("NoNamespaceDivision", func(t *testing.T) { + execInfo := &workflowpb.WorkflowExecutionInfo{ + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}}, + } + id, err := ArchetypeIDFromExecutionInfo(execInfo) + require.NoError(t, err) + require.Equal(t, chasm.WorkflowArchetypeID, id) + }) + + t.Run("Scheduler", func(t *testing.T) { + p := payload.EncodeString("TemporalScheduler") + execInfo := &workflowpb.WorkflowExecutionInfo{ + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{ + sadefs.TemporalNamespaceDivision: p, + }}, + } + id, err := ArchetypeIDFromExecutionInfo(execInfo) + require.NoError(t, err) + require.Equal(t, chasm.WorkflowArchetypeID, id) + }) + + t.Run("CHASM", func(t *testing.T) { + p := payload.EncodeString(strconv.FormatUint(42, 10)) + execInfo := &workflowpb.WorkflowExecutionInfo{ + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{ + sadefs.TemporalNamespaceDivision: p, + }}, + } + id, err := ArchetypeIDFromExecutionInfo(execInfo) + require.NoError(t, err) + require.Equal(t, chasm.ArchetypeID(42), id) + }) + + t.Run("ErrorOnInvalidNumber", func(t *testing.T) { + p := payload.EncodeString("1x") + + execInfo := &workflowpb.WorkflowExecutionInfo{ + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{ + sadefs.TemporalNamespaceDivision: p, + }}, + } + _, err := ArchetypeIDFromExecutionInfo(execInfo) + require.Error(t, err) + }) +} diff --git a/service/worker/deletenamespace/deleteexecutions/activities.go b/service/worker/deletenamespace/deleteexecutions/activities.go index 04ac97bb00..6cfc666b35 100644 --- 
a/service/worker/deletenamespace/deleteexecutions/activities.go +++ b/service/worker/deletenamespace/deleteexecutions/activities.go @@ -8,7 +8,9 @@ import ( "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/sdk/activity" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" @@ -18,6 +20,7 @@ import ( "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/searchattribute/sadefs" + workercommon "go.temporal.io/server/service/worker/common" ) type ( @@ -179,10 +182,35 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete logger.Error("Workflow executions delete rate limiter error.", tag.Error(err)) return result, fmt.Errorf("rate limiter error: %w", err) } - _, err = a.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{ - NamespaceId: params.NamespaceID.String(), - WorkflowExecution: execution.Execution, - }) + + archetypeID, err := workercommon.ArchetypeIDFromExecutionInfo(execution) + if err != nil { + logger.Error("Failed to extract archetype ID from execution info.", tag.Error(err)) + return result, fmt.Errorf("archetypeID extraction error: %w", err) + } + + if archetypeID == chasm.WorkflowArchetypeID { + // TODO: consider using ForceDeleteWorkflowExecution for workflow as well. + _, err = a.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{ + NamespaceId: params.NamespaceID.String(), + WorkflowExecution: execution.Execution, + }) + } else { + // NOTE: ForceDeleteWorkflowExecution is NOT designed as an API to be consumed programmatically, + // and only performs best effort deletion on execution histories. 
+ // It works for CHASM now as CHASM executions don't have any history events, so as long as this API + // returns nil error, it means we have successfully deleted the mutable state and visibility records. + _, err = a.historyClient.ForceDeleteWorkflowExecution(ctx, &historyservice.ForceDeleteWorkflowExecutionRequest{ + NamespaceId: params.NamespaceID.String(), + ArchetypeId: archetypeID, + Request: &adminservice.DeleteWorkflowExecutionRequest{ + // Namespace and Archetype fields are not required since we are calling history + // service directly. + Execution: execution.Execution, + }, + }) + } + switch err.(type) { case nil: result.SuccessCount++ diff --git a/service/worker/deletenamespace/deleteexecutions/workflow_test.go b/service/worker/deletenamespace/deleteexecutions/workflow_test.go index 4b6b1865d3..9c72e3e282 100644 --- a/service/worker/deletenamespace/deleteexecutions/workflow_test.go +++ b/service/worker/deletenamespace/deleteexecutions/workflow_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" stderrors "errors" + "strconv" "sync/atomic" "testing" "time" @@ -17,10 +18,13 @@ import ( "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/api/adminservice/v1" + "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/historyservicemock/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/searchattribute/sadefs" @@ -343,6 +347,104 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T) require.Equal(t, 3, result.SuccessCount) } +func Test_DeleteExecutionsWorkflow_NoActivityMocks_ChasmExecutions(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + 
testSuite.SetLogger(log.NewSdkLogger(log.NewTestLogger())) + env := testSuite.NewTestWorkflowEnvironment() + + execution1 := &commonpb.WorkflowExecution{ + WorkflowId: "workflow-id-1", + RunId: "run-id-1", + } + archetypeID1 := 12345 + execution2 := &commonpb.WorkflowExecution{ + WorkflowId: "workflow-id-2", + RunId: "run-id-2", + } + archetypeID2 := 54321 + + ctrl := gomock.NewController(t) + visibilityManager := manager.NewMockVisibilityManager(ctrl) + visibilityManager.EXPECT().ListWorkflowExecutions(gomock.Any(), &manager.ListWorkflowExecutionsRequestV2{ + NamespaceID: "namespace-id", + Namespace: "namespace", + PageSize: 2, + NextPageToken: nil, + Query: sadefs.QueryWithAnyNamespaceDivision(""), + }).Return(&manager.ListWorkflowExecutionsResponse{ + Executions: []*workflowpb.WorkflowExecutionInfo{ + { + Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + Execution: execution1, + SearchAttributes: &commonpb.SearchAttributes{ + IndexedFields: map[string]*commonpb.Payload{ + sadefs.TemporalNamespaceDivision: payload.EncodeString(strconv.FormatUint(uint64(archetypeID1), 10)), + }, + }, + }, + { + Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + Execution: execution2, + SearchAttributes: &commonpb.SearchAttributes{ + IndexedFields: map[string]*commonpb.Payload{ + sadefs.TemporalNamespaceDivision: payload.EncodeString(strconv.FormatUint(uint64(archetypeID2), 10)), + }, + }, + }, + }, + }, nil).Times(2) + + historyClient := historyservicemock.NewMockHistoryServiceClient(ctrl) + historyClient.EXPECT().ForceDeleteWorkflowExecution(gomock.Any(), &historyservice.ForceDeleteWorkflowExecutionRequest{ + NamespaceId: "namespace-id", + ArchetypeId: uint32(archetypeID1), + Request: &adminservice.DeleteWorkflowExecutionRequest{ + Execution: execution1, + }, + }).Return(nil, nil).Times(1) + historyClient.EXPECT().ForceDeleteWorkflowExecution(gomock.Any(), &historyservice.ForceDeleteWorkflowExecutionRequest{ + NamespaceId: "namespace-id", + ArchetypeId: 
uint32(archetypeID2), + Request: &adminservice.DeleteWorkflowExecutionRequest{ + Execution: execution2, + }, + }).Return(nil, nil).Times(1) + + a := &Activities{ + visibilityManager: visibilityManager, + historyClient: historyClient, + deleteActivityRPS: func(callback func(int)) (v int, cancel func()) { + return 100, func() {} + }, + metricsHandler: metrics.NoopMetricsHandler, + logger: log.NewTestLogger(), + } + la := &LocalActivities{ + visibilityManager: visibilityManager, + metricsHandler: metrics.NoopMetricsHandler, + logger: log.NewTestLogger(), + } + + env.RegisterActivity(la.GetNextPageTokenActivity) + env.RegisterActivity(a.DeleteExecutionsActivity) + + env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ + NamespaceID: "namespace-id", + Namespace: "namespace", + Config: DeleteExecutionsConfig{ + PageSize: 2, + }, + }) + + ctrl.Finish() + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + var result DeleteExecutionsResult + require.NoError(t, env.GetWorkflowResult(&result)) + require.Equal(t, 0, result.ErrorCount) + require.Equal(t, 2, result.SuccessCount) +} + func Test_DeleteExecutionsWorkflow_NoActivityMocks_HistoryClientError(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} testSuite.SetLogger(log.NewSdkLogger(log.NewTestLogger())) diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index 924fb4d9dd..72f6b6c80c 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -5,9 +5,7 @@ import ( "fmt" "math" "sort" - "strconv" "time" - "unicode" "github.com/pkg/errors" commonpb "go.temporal.io/api/common/v1" @@ -29,11 +27,10 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/quotas" 
"go.temporal.io/server/common/rpc/interceptor" - "go.temporal.io/server/common/searchattribute/sadefs" + workercommon "go.temporal.io/server/service/worker/common" "google.golang.org/grpc/metadata" ) @@ -503,21 +500,15 @@ func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice ArchetypeId: chasm.UnspecifiedArchetypeID, } - if indexedField := e.SearchAttributes.GetIndexedFields(); indexedField != nil { - if nsDivisionPayload, ok := indexedField[sadefs.TemporalNamespaceDivision]; ok { - var nsDivisionStr string - if err := payload.Decode(nsDivisionPayload, &nsDivisionStr); err != nil { - return nil, fmt.Errorf("failed to decode TemporalNamespaceDivision field: %w", err) - } - - if len(nsDivisionStr) != 0 && unicode.IsDigit(rune(nsDivisionStr[0])) { - archetypeID, err := strconv.ParseUint(nsDivisionStr, 10, 32) - if err != nil { - return nil, fmt.Errorf("failed to parse archetypeID: %w", err) - } - executionInfo.ArchetypeId = chasm.ArchetypeID(archetypeID) - } - } + archetypeID, err := workercommon.ArchetypeIDFromExecutionInfo(e) + if err != nil { + return nil, fmt.Errorf("archetypeID extraction error: %w", err) + } + if archetypeID != chasm.WorkflowArchetypeID { + // For backward compatibility reason we need this field to be 0 + // to avoid unmarshaling errors for workflows. + // Check comment above for more details. 
+ executionInfo.ArchetypeId = archetypeID } executions = append(executions, executionInfo) diff --git a/tests/chasm_test.go b/tests/chasm_test.go index 56f62fa4d6..888bd8884b 100644 --- a/tests/chasm_test.go +++ b/tests/chasm_test.go @@ -8,9 +8,11 @@ import ( "time" "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/tests" "go.temporal.io/server/chasm/lib/tests/gen/testspb/v1" @@ -317,8 +319,8 @@ func (s *ChasmTestSuite) TestListExecutions() { s.Eventually( func() bool { resp, err := chasm.ListExecutions[*tests.PayloadStore, *testspb.TestPayloadStore](ctx, &chasm.ListExecutionsRequest{ - NamespaceID: string(s.NamespaceID()), - NamespaceName: string(s.Namespace()), + NamespaceID: s.NamespaceID().String(), + NamespaceName: s.Namespace().String(), PageSize: 10, Query: visQuery + " AND ExecutionStatus = 'Completed' AND PayloadTotalCount > 0", }) @@ -497,4 +499,92 @@ func (s *ChasmTestSuite) TestListWorkflowExecutions() { s.False(hasTotalSize, "CHASM search attribute TemporalInt02 should not be exposed") } +func (s *ChasmTestSuite) TestPayloadStoreForceDelete() { + tv := testvars.New(s.T()) + + ctx, cancel := context.WithTimeout(s.chasmContext, chasmTestTimeout) + defer cancel() + + storeID := tv.Any().String() + createResp, err := tests.NewPayloadStoreHandler( + ctx, + tests.NewPayloadStoreRequest{ + NamespaceID: s.NamespaceID(), + StoreID: storeID, + IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate, + IDConflictPolicy: chasm.BusinessIDConflictPolicyFail, + }, + ) + s.NoError(err) + + // Make sure visibility record is created, so that we can test its deletion later. 
+ archetypeID, ok := s.FunctionalTestBase.GetTestCluster().Host().GetCHASMRegistry().ComponentIDFor(&tests.PayloadStore{}) + s.True(ok) + visQuery := fmt.Sprintf("TemporalNamespaceDivision = '%d' AND WorkflowId = '%s'", archetypeID, storeID) + var executionInfo *workflowpb.WorkflowExecutionInfo + s.Eventually( + func() bool { + resp, err := s.FrontendClient().ListWorkflowExecutions(testcore.NewContext(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: s.Namespace().String(), + PageSize: 10, + Query: visQuery, + }) + s.NoError(err) + if len(resp.Executions) > 0 { + executionInfo = resp.Executions[0] + } + return len(resp.Executions) == 1 + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) + archetypePayload, ok := executionInfo.SearchAttributes.GetIndexedFields()[sadefs.TemporalNamespaceDivision] + s.True(ok) + var archetypeIDStr string + s.NoError(payload.Decode(archetypePayload, &archetypeIDStr)) + parsedArchetypeID, err := strconv.ParseUint(archetypeIDStr, 10, 32) + s.NoError(err) + s.Equal(archetypeID, chasm.ArchetypeID(parsedArchetypeID)) + + archetype, ok := s.FunctionalTestBase.GetTestCluster().Host().GetCHASMRegistry().ComponentFqnByID(archetypeID) + s.True(ok) + _, err = s.AdminClient().DeleteWorkflowExecution(testcore.NewContext(), &adminservice.DeleteWorkflowExecutionRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: storeID, + RunId: createResp.RunID, + }, + Archetype: archetype, + }) + s.NoError(err) + + // Validate mutable state is deleted. + _, err = s.AdminClient().DescribeMutableState(testcore.NewContext(), &adminservice.DescribeMutableStateRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: storeID, + RunId: createResp.RunID, + }, + Archetype: archetype, + }) + var notFoundErr *serviceerror.NotFound + s.ErrorAs(err, &notFoundErr) + + // Validate visibility record is deleted. 
+ s.Eventually( + func() bool { + resp, err := chasm.ListExecutions[*tests.PayloadStore, *testspb.TestPayloadStore](ctx, &chasm.ListExecutionsRequest{ + NamespaceID: s.NamespaceID().String(), + NamespaceName: s.Namespace().String(), + PageSize: 10, + Query: visQuery, + }) + s.NoError(err) + return len(resp.Executions) == 0 + }, + testcore.WaitForESToSettle, + 100*time.Millisecond, + ) +} + // TODO: More tests here...