Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
436 changes: 224 additions & 212 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5
go.temporal.io/api v1.58.1-0.20251209235529-e917f5407b0e
go.temporal.io/sdk v1.35.0
go.uber.org/fx v1.24.0
go.uber.org/mock v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5 h1:7lFIrLVM+NPVcqFMrEwv5d8D9meA7n/Xl9GtCl8Gyhc=
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.58.1-0.20251209235529-e917f5407b0e h1:g6xxnENbIdyubpzzEupQoStJWwnR4qP32or9nfylNoM=
go.temporal.io/api v1.58.1-0.20251209235529-e917f5407b0e/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ message RecordWorkflowTaskStartedRequest {
// Revision number that was sent by matching when the task was dispatched. Used to resolve eventual consistency issues
// that may arise due to stale routing configs in task queue partitions.
int64 task_dispatch_revision_number = 12;
// Target worker deployment version according to matching when starting the task.
// Computed after matching with a poller, right before calling RecordWorkflowTaskStarted.
temporal.api.deployment.v1.WorkerDeploymentVersion target_deployment_version = 13;
}

message RecordWorkflowTaskStartedResponse {
Expand Down
1 change: 1 addition & 0 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewWorkflowWithSignal(
nil,
nil,
false,
nil,
)
if err != nil {
// Unable to add WorkflowTaskStarted event to history
Expand Down
1 change: 1 addition & 0 deletions service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func Invoke(
req.GetBuildIdRedirectInfo(),
workflowLease.GetContext().UpdateRegistry(ctx),
false,
req.TargetDeploymentVersion,
)
if err != nil {
// Unable to add WorkflowTaskStarted event to history
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
nil,
workflowLease.GetContext().UpdateRegistry(ctx),
false,
nil,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -701,6 +702,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
nil,
workflowLease.GetContext().UpdateRegistry(ctx),
false,
nil,
)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createSentUpdate(tv *testvars.TestVa
nil,
nil,
false,
nil,
)
taskToken := &tokenspb.Task{
Attempt: 1,
Expand Down
1 change: 1 addition & 0 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6475,6 +6475,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms historyi.MutableState, schedule
nil,
nil,
false,
nil,
)

return event
Expand Down
16 changes: 9 additions & 7 deletions service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,19 @@ func (b *EventFactory) CreateWorkflowTaskStartedEvent(
historySizeBytes int64,
versioningStamp *commonpb.WorkerVersionStamp,
buildIdRedirectCounter int64,
suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason,
) *historypb.HistoryEvent {
event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime)
event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
ScheduledEventId: scheduledEventID,
Identity: identity,
RequestId: requestID,
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes: historySizeBytes,
WorkerVersion: versioningStamp,
BuildIdRedirectCounter: buildIdRedirectCounter,
ScheduledEventId: scheduledEventID,
Identity: identity,
RequestId: requestID,
SuggestContinueAsNew: suggestContinueAsNew,
SuggestContinueAsNewReasons: suggestContinueAsNewReasons,
HistorySizeBytes: historySizeBytes,
WorkerVersion: versioningStamp,
BuildIdRedirectCounter: buildIdRedirectCounter,
},
}
return event
Expand Down
2 changes: 2 additions & 0 deletions service/history/historybuilder/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
historySizeBytes int64,
versioningStamp *commonpb.WorkerVersionStamp,
buildIdRedirectCounter int64,
suggestContinueAsNewReasons []enumspb.SuggestContinueAsNewReason,
) *historypb.HistoryEvent {
event := b.EventFactory.CreateWorkflowTaskStartedEvent(
scheduledEventID,
Expand All @@ -207,6 +208,7 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
historySizeBytes,
versioningStamp,
buildIdRedirectCounter,
suggestContinueAsNewReasons,
)
event, _ = b.EventStore.add(event)
return event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ func (s *sutTestingAdapter) AddWorkflowExecutionStartedEvent(_ ...eventConfig) *
}

func (s *sutTestingAdapter) AddWorkflowTaskStartedEvent(_ ...eventConfig) *historypb.HistoryEvent {
return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0)
return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0, nil)
}

func (s *sutTestingAdapter) AddWorkflowTaskCompletedEvent(_ ...eventConfig) *historypb.HistoryEvent {
Expand Down
1 change: 1 addition & 0 deletions service/history/historybuilder/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() {
123678,
nil,
int64(0),
nil,
)
s.Equal(event, s.flush())
s.Equal(&historypb.HistoryEvent{
Expand Down
2 changes: 1 addition & 1 deletion service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type (
AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *timestamppb.Timestamp, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, update.Registry, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, update.Registry, bool, *deploymentpb.WorkerDeploymentVersion) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionSignaled(int64, namespace.Name, namespace.ID, string, string, string) (*historypb.HistoryEvent, error)
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ func (r *workflowResetterImpl) failWorkflowTask(
nil,
// skipping versioning checks because this task is not actually dispatched but will fail immediately.
true,
nil,
)
if err != nil {
return err
Expand Down
20 changes: 15 additions & 5 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2454,10 +2454,13 @@ func (ms *MutableStateImpl) addWorkflowExecutionStartedEventForContinueAsNew(
// the first matching task-queue-in-version check.
newTQInPinnedVersion := false

// New run initiated by workflow ContinueAsNew of pinned run, will inherit the previous run's version if the
// new run's Task Queue belongs to that version.
// By default, the new run initiated by workflow ContinueAsNew of a Pinned run, will inherit the previous run's
// version if the new run's Task Queue belongs to that version.
// If the continue-as-new command says to use InitialVersioningBehavior AutoUpgrade, the new run will start as
// AutoUpgrade in the first task and then assume the SDK-sent behavior on first workflow task completion.
var inheritedPinnedVersion *deploymentpb.WorkerDeploymentVersion
if previousExecutionState.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED {
if previousExecutionState.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED &&
command.GetInitialVersioningBehavior() != enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE {
inheritedPinnedVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(previousExecutionState.GetEffectiveDeployment())
newTQ := command.GetTaskQueue().GetName()
if newTQ != previousExecutionInfo.GetTaskQueue() {
Expand All @@ -2483,9 +2486,15 @@ func (ms *MutableStateImpl) addWorkflowExecutionStartedEventForContinueAsNew(

// New run initiated by ContinueAsNew of an AUTO_UPGRADE workflow execution will inherit the previous run's
// deployment version and revision number iff the new run's Task Queue belongs to source deployment version.
//
// If the initiating workflow is PINNED and the continue-as-new command says to use InitialVersioningBehavior
// AutoUpgrade, the new run will start as AutoUpgrade in the first task and then assume the SDK-sent behavior
// after first workflow task completion.
var sourceDeploymentVersion *deploymentpb.WorkerDeploymentVersion
var sourceDeploymentRevisionNumber int64
if previousExecutionState.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE {
if previousExecutionState.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE ||
(previousExecutionState.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED &&
command.GetInitialVersioningBehavior() == enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE) {
sourceDeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(previousExecutionState.GetEffectiveDeployment())
sourceDeploymentRevisionNumber = previousExecutionState.GetVersioningRevisionNumber()

Expand Down Expand Up @@ -3192,12 +3201,13 @@ func (ms *MutableStateImpl) AddWorkflowTaskStartedEvent(
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
updateReg update.Registry,
skipVersioningCheck bool,
targetDeploymentVersion *deploymentpb.WorkerDeploymentVersion,
) (*historypb.HistoryEvent, *historyi.WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
if err := ms.checkMutability(opTag); err != nil {
return nil, nil, err
}
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck, updateReg)
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck, updateReg, targetDeploymentVersion)
}

func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent(
Expand Down
38 changes: 31 additions & 7 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down Expand Up @@ -451,6 +452,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
skipVersioningCheck bool,
updateReg update.Registry,
targetDeploymentVersion *deploymentpb.WorkerDeploymentVersion,
) (*historypb.HistoryEvent, *historyi.WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
workflowTask := m.GetWorkflowTaskByID(scheduledEventID)
Expand All @@ -472,9 +474,20 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
// events. That's okay, it doesn't have to be 100% accurate. It just has to be kept
// consistent between the started event in history and the event that was sent to the SDK
// that resulted in the successful completion.
suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
if updateReg != nil {
suggestContinueAsNew = cmp.Or(suggestContinueAsNew, updateReg.SuggestContinueAsNew())
historySizeBytes, suggestContinueAsNewReasons := m.getHistorySizeInfo()
suggestContinueAsNew := len(suggestContinueAsNewReasons) > 0
if updateReg != nil && updateReg.SuggestContinueAsNew() {
suggestContinueAsNew = cmp.Or(suggestContinueAsNew, true)
suggestContinueAsNewReasons = append(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES)
}

if m.ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && targetDeploymentVersion != nil {
if currentDeploymentVersion := m.ms.GetEffectiveDeployment(); currentDeploymentVersion != nil &&
(currentDeploymentVersion.BuildId != targetDeploymentVersion.BuildId ||
currentDeploymentVersion.SeriesName != targetDeploymentVersion.DeploymentName) {
suggestContinueAsNew = cmp.Or(suggestContinueAsNew, true)
suggestContinueAsNewReasons = append(suggestContinueAsNewReasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED)
}
}

workflowTask, scheduledEventCreatedForRedirect, redirectCounter, err := m.processBuildIdRedirectInfo(versioningStamp, workflowTask, taskQueue, redirectInfo, skipVersioningCheck)
Expand Down Expand Up @@ -517,6 +530,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
historySizeBytes,
versioningStamp,
redirectCounter,
suggestContinueAsNewReasons,
)
m.ms.hBuilder.FlushAndCreateNewBatch()
startedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -708,6 +722,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
workflowTask.HistorySizeBytes,
request.WorkerVersionStamp,
workflowTask.BuildIdRedirectCounter,
nil,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Continue-as-new reasons lost for transient/speculative workflow tasks

When transient or speculative workflow tasks are converted to normal tasks, nil is passed for suggestContinueAsNewReasons because the WorkflowTaskInfo struct only stores the SuggestContinueAsNew boolean, not the reasons array. The new TARGET_WORKER_DEPLOYMENT_VERSION_CHANGED reason is calculated in AddWorkflowTaskStartedEvent but never stored, so it's lost when the event is created during conversion. This results in events with SuggestContinueAsNew=true but an empty SuggestContinueAsNewReasons slice, preventing SDKs from knowing why continue-as-new was suggested.

Additional Locations (2)

Fix in Cursor Fix in Web

m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -796,6 +811,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent(
workflowTask.HistorySizeBytes,
versioningStamp,
workflowTask.BuildIdRedirectCounter,
nil,
)
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -867,6 +883,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent(
workflowTask.HistorySizeBytes,
nil,
workflowTask.BuildIdRedirectCounter,
nil,
)
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -1318,10 +1335,11 @@ func (m *workflowTaskStateMachine) getStartToCloseTimeout(
return durationpb.New(startToCloseTimeout)
}

func (m *workflowTaskStateMachine) getHistorySizeInfo() (bool, int64) {
func (m *workflowTaskStateMachine) getHistorySizeInfo() (int64, []enumspb.SuggestContinueAsNewReason) {
var reasons []enumspb.SuggestContinueAsNewReason
stats := m.ms.GetExecutionInfo().ExecutionStats
if stats == nil {
return false, 0
return 0, reasons
}
// This only includes events that have actually been written to persistence, so it won't
// include the workflow task started event that we're currently writing. That's okay, it
Expand All @@ -1334,8 +1352,13 @@ func (m *workflowTaskStateMachine) getHistorySizeInfo() (bool, int64) {
namespaceName := m.ms.GetNamespaceEntry().Name().String()
sizeLimit := int64(config.HistorySizeSuggestContinueAsNew(namespaceName))
countLimit := int64(config.HistoryCountSuggestContinueAsNew(namespaceName))
suggestContinueAsNew := historySize >= sizeLimit || historyCount >= countLimit
return suggestContinueAsNew, historySize
if historySize >= sizeLimit {
reasons = append(reasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE)
}
if historyCount >= countLimit {
reasons = append(reasons, enumspb.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS)
}
return historySize, reasons
}

func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() error {
Expand Down Expand Up @@ -1390,6 +1413,7 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro
wt.HistorySizeBytes,
nil,
wt.BuildIdRedirectCounter,
nil,
)
m.ms.hBuilder.FlushAndCreateNewBatch()

Expand Down
2 changes: 1 addition & 1 deletion service/matching/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (t *ForwarderTestSuite) TestForwardWorkflowTask_WithBuildId() {
).Return(&matchingservice.AddWorkflowTaskResponse{}, nil)

taskInfo := randomTaskInfo()
task := newInternalTaskForSyncMatch(taskInfo.Data, nil, 0)
task := newInternalTaskForSyncMatch(taskInfo.Data, nil, 0, nil)
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
t.NotNil(request)
t.Equal(mustParent(t.partition, 20).RpcName(), request.TaskQueue.GetName())
Expand Down
4 changes: 2 additions & 2 deletions service/matching/matcher_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *MatcherDataSuite) newSyncTask(fwdInfo *taskqueuespb.TaskForwardInfo) *i
t := &persistencespb.TaskInfo{
CreateTime: timestamppb.New(s.now()),
}
return newInternalTaskForSyncMatch(t, fwdInfo, 0)
return newInternalTaskForSyncMatch(t, fwdInfo, 0, nil)
}

func (s *MatcherDataSuite) newQueryTask(id string) *internalTask {
Expand Down Expand Up @@ -747,7 +747,7 @@ func FuzzMatcherData(f *testing.F) {
t := &persistencespb.TaskInfo{
CreateTime: timestamppb.New(ts.Now()),
}
md.FinishMatchAfterPollForward(res.poller, newInternalTaskForSyncMatch(t, nil, 0))
md.FinishMatchAfterPollForward(res.poller, newInternalTaskForSyncMatch(t, nil, 0, nil))
}()

case 6: // add task forwarder
Expand Down
Loading
Loading