Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions service/worker/common/chasm_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package common

import (
"fmt"
"strconv"
"unicode"

workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/searchattribute/sadefs"
)

func ArchetypeIDFromExecutionInfo(
executionInfo *workflowpb.WorkflowExecutionInfo,
) (chasm.ArchetypeID, error) {
indexedField := executionInfo.SearchAttributes.GetIndexedFields()
if indexedField == nil {
return chasm.WorkflowArchetypeID, nil
}

nsDivisionPayload, ok := indexedField[sadefs.TemporalNamespaceDivision]
if !ok {
return chasm.WorkflowArchetypeID, nil
}

var nsDivisionStr string
if err := payload.Decode(nsDivisionPayload, &nsDivisionStr); err != nil {
return chasm.UnspecifiedArchetypeID, fmt.Errorf("failed to decode TemporalNamespaceDivision field: %w", err)
}

if len(nsDivisionStr) == 0 || !unicode.IsDigit(rune(nsDivisionStr[0])) {
return chasm.WorkflowArchetypeID, nil
}

archetypeID, err := strconv.ParseUint(nsDivisionStr, 10, 32)
if err != nil {
return chasm.UnspecifiedArchetypeID, fmt.Errorf("failed to parse archetypeID: %w", err)
}

return chasm.ArchetypeID(archetypeID), nil
}
67 changes: 67 additions & 0 deletions service/worker/common/chasm_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package common

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/searchattribute/sadefs"
)

func TestArchetypeIDFromExecutionInfo(t *testing.T) {
t.Run("NoSearchAttributes", func(t *testing.T) {
execInfo := &workflowpb.WorkflowExecutionInfo{}
id, err := ArchetypeIDFromExecutionInfo(execInfo)
require.NoError(t, err)
require.Equal(t, chasm.WorkflowArchetypeID, id)
})

t.Run("NoNamespaceDivision", func(t *testing.T) {
execInfo := &workflowpb.WorkflowExecutionInfo{
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}},
}
id, err := ArchetypeIDFromExecutionInfo(execInfo)
require.NoError(t, err)
require.Equal(t, chasm.WorkflowArchetypeID, id)
})

t.Run("Scheduler", func(t *testing.T) {
p := payload.EncodeString("TemporalScheduler")
execInfo := &workflowpb.WorkflowExecutionInfo{
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{
sadefs.TemporalNamespaceDivision: p,
}},
}
id, err := ArchetypeIDFromExecutionInfo(execInfo)
require.NoError(t, err)
require.Equal(t, chasm.WorkflowArchetypeID, id)
})

t.Run("CHASM", func(t *testing.T) {
p := payload.EncodeString(strconv.FormatUint(42, 10))
execInfo := &workflowpb.WorkflowExecutionInfo{
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{
sadefs.TemporalNamespaceDivision: p,
}},
}
id, err := ArchetypeIDFromExecutionInfo(execInfo)
require.NoError(t, err)
require.Equal(t, chasm.ArchetypeID(42), id)
})

t.Run("ErrorOnInvalidNumber", func(t *testing.T) {
p := payload.EncodeString("1x")

execInfo := &workflowpb.WorkflowExecutionInfo{
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{
sadefs.TemporalNamespaceDivision: p,
}},
}
_, err := ArchetypeIDFromExecutionInfo(execInfo)
require.Error(t, err)
})
}
36 changes: 32 additions & 4 deletions service/worker/deletenamespace/deleteexecutions/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand All @@ -18,6 +20,7 @@ import (
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/searchattribute/sadefs"
workercommon "go.temporal.io/server/service/worker/common"
)

type (
Expand Down Expand Up @@ -179,10 +182,35 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete
logger.Error("Workflow executions delete rate limiter error.", tag.Error(err))
return result, fmt.Errorf("rate limiter error: %w", err)
}
_, err = a.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: params.NamespaceID.String(),
WorkflowExecution: execution.Execution,
})

archetypeID, err := workercommon.ArchetypeIDFromExecutionInfo(execution)
if err != nil {
logger.Error("Failed to extract archetype ID from execution info.", tag.Error(err))
return result, fmt.Errorf("archetypeID extraction error: %w", err)
}

if archetypeID == chasm.WorkflowArchetypeID {
// TODO: consider using ForceDeleteWorkflowExecution for workflow as well.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only used for namespace deletion? if true, then we should just use force delete here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to refactor force deletion api handle a bit though. Today it only does best effort deletion on history events, we will need a non best effort mode first and then switch to it.

_, err = a.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: params.NamespaceID.String(),
WorkflowExecution: execution.Execution,
})
} else {
// NOTE: ForceDeleteWorkflowExecution is NOT design as a API to be consumed programmatically,
// and only performs best effort deletion on execution histories.
// It works for CHASM now as CHASM executions don't have any history events, so as long as this API,
// returns nil error, it means we have successfully deleted the mutable state and visibility records.
_, err = a.historyClient.ForceDeleteWorkflowExecution(ctx, &historyservice.ForceDeleteWorkflowExecutionRequest{
NamespaceId: params.NamespaceID.String(),
ArchetypeId: archetypeID,
Request: &adminservice.DeleteWorkflowExecutionRequest{
// Namespace and Archetype fields are not required since we are calling history
// service directly.
Execution: execution.Execution,
},
})
}

switch err.(type) {
case nil:
result.SuccessCount++
Expand Down
102 changes: 102 additions & 0 deletions service/worker/deletenamespace/deleteexecutions/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
stderrors "errors"
"strconv"
"sync/atomic"
"testing"
"time"
Expand All @@ -17,10 +18,13 @@ import (
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/searchattribute/sadefs"
Expand Down Expand Up @@ -343,6 +347,104 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T)
require.Equal(t, 3, result.SuccessCount)
}

func Test_DeleteExecutionsWorkflow_NoActivityMocks_ChasmExecutions(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
testSuite.SetLogger(log.NewSdkLogger(log.NewTestLogger()))
env := testSuite.NewTestWorkflowEnvironment()

execution1 := &commonpb.WorkflowExecution{
WorkflowId: "workflow-id-1",
RunId: "run-id-1",
}
archetypeID1 := 12345
execution2 := &commonpb.WorkflowExecution{
WorkflowId: "workflow-id-2",
RunId: "run-id-2",
}
archetypeID2 := 54321

ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)
visibilityManager.EXPECT().ListWorkflowExecutions(gomock.Any(), &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: "namespace-id",
Namespace: "namespace",
PageSize: 2,
NextPageToken: nil,
Query: sadefs.QueryWithAnyNamespaceDivision(""),
}).Return(&manager.ListWorkflowExecutionsResponse{
Executions: []*workflowpb.WorkflowExecutionInfo{
{
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
Execution: execution1,
SearchAttributes: &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
sadefs.TemporalNamespaceDivision: payload.EncodeString(strconv.FormatUint(uint64(archetypeID1), 10)),
},
},
},
{
Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
Execution: execution2,
SearchAttributes: &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
sadefs.TemporalNamespaceDivision: payload.EncodeString(strconv.FormatUint(uint64(archetypeID2), 10)),
},
},
},
},
}, nil).Times(2)

historyClient := historyservicemock.NewMockHistoryServiceClient(ctrl)
historyClient.EXPECT().ForceDeleteWorkflowExecution(gomock.Any(), &historyservice.ForceDeleteWorkflowExecutionRequest{
NamespaceId: "namespace-id",
ArchetypeId: uint32(archetypeID1),
Request: &adminservice.DeleteWorkflowExecutionRequest{
Execution: execution1,
},
}).Return(nil, nil).Times(1)
historyClient.EXPECT().ForceDeleteWorkflowExecution(gomock.Any(), &historyservice.ForceDeleteWorkflowExecutionRequest{
NamespaceId: "namespace-id",
ArchetypeId: uint32(archetypeID2),
Request: &adminservice.DeleteWorkflowExecutionRequest{
Execution: execution2,
},
}).Return(nil, nil).Times(1)

a := &Activities{
visibilityManager: visibilityManager,
historyClient: historyClient,
deleteActivityRPS: func(callback func(int)) (v int, cancel func()) {
return 100, func() {}
},
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}
la := &LocalActivities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}

env.RegisterActivity(la.GetNextPageTokenActivity)
env.RegisterActivity(a.DeleteExecutionsActivity)

env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
NamespaceID: "namespace-id",
Namespace: "namespace",
Config: DeleteExecutionsConfig{
PageSize: 2,
},
})

ctrl.Finish()
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result DeleteExecutionsResult
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, 0, result.ErrorCount)
require.Equal(t, 2, result.SuccessCount)
}

func Test_DeleteExecutionsWorkflow_NoActivityMocks_HistoryClientError(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
testSuite.SetLogger(log.NewSdkLogger(log.NewTestLogger()))
Expand Down
29 changes: 10 additions & 19 deletions service/worker/migration/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"fmt"
"math"
"sort"
"strconv"
"time"
"unicode"

"github.com/pkg/errors"
commonpb "go.temporal.io/api/common/v1"
Expand All @@ -29,11 +27,10 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/searchattribute/sadefs"
workercommon "go.temporal.io/server/service/worker/common"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -503,21 +500,15 @@ func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice
ArchetypeId: chasm.UnspecifiedArchetypeID,
}

if indexedField := e.SearchAttributes.GetIndexedFields(); indexedField != nil {
if nsDivisionPayload, ok := indexedField[sadefs.TemporalNamespaceDivision]; ok {
var nsDivisionStr string
if err := payload.Decode(nsDivisionPayload, &nsDivisionStr); err != nil {
return nil, fmt.Errorf("failed to decode TemporalNamespaceDivision field: %w", err)
}

if len(nsDivisionStr) != 0 && unicode.IsDigit(rune(nsDivisionStr[0])) {
archetypeID, err := strconv.ParseUint(nsDivisionStr, 10, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse archetypeID: %w", err)
}
executionInfo.ArchetypeId = chasm.ArchetypeID(archetypeID)
}
}
archetypeID, err := workercommon.ArchetypeIDFromExecutionInfo(e)
if err != nil {
return nil, fmt.Errorf("archetypeID extraction error: %w", err)
}
if archetypeID != chasm.WorkflowArchetypeID {
// For backward compatibility reason we need this field to be 0
// to avoid unmarshaling errors for workflows.
// Check comment above for more details.
executionInfo.ArchetypeId = archetypeID
}

executions = append(executions, executionInfo)
Expand Down
Loading
Loading