-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add concurrency test for async Versioning operations #8798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: shahab/async-2
Are you sure you want to change the base?
Conversation
| // First ensure deployment workflow is running | ||
| if !d.VersionState.StartedDeploymentWorkflow { | ||
| activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions) | ||
| err := workflow.ExecuteActivity(activityCtx, d.a.StartWorkerDeploymentWorkflow, &deploymentspb.StartWorkerDeploymentRequest{ | ||
| DeploymentName: d.VersionState.Version.DeploymentName, | ||
| RequestId: d.newUUID(ctx), | ||
| }).Get(ctx, nil) | ||
| if err != nil { | ||
| return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder: Could there ever be a case where a deployment workflow CAN's while the version workflow is processing an update?
In theory, this could happen. Imagine this scenario:
- User sends in a setCurrent update.
- Deployment workflow commences an async operation of the sync propagation to the task queues by calling the version workflow.
- Version workflow, when it comes here, sends a signal.
- The deployment workflow CAN's since according to itself, the state has changed.
I am not fully familiar about the signal "sending" semantics when a workflow is CAN'ing, but this could be important to consider so pasting this here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it's possible, but CaN is atomic which means there is no gap between closing the old execution and creating the new one. both pieces happen at the same time.
so the signal will go to the old or the new execution but it can't see "not found".
Even if that was not the case, this check does not protect against the mentioned scenario because the CaN can happen just after this check and before the signal.
| d.logger.Error("Update canceled before worker deployment workflow started") | ||
| return serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can improve error message by saying something like:
Update canceled since the corresponding worker deployment workflow for this version did not start.
| // workflowVersion is set at workflow start based on the dynamic config of the worker | ||
| // that completes the first task. It remains constant for the lifetime of the run and | ||
| // only updates when the workflow performs continue-as-new. | ||
| // Tracks the version of the deployment workflow when a particular run of a workflow starts base on the dynamic config of the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run of a workflow starts based on the dynamic config of the
worker that completes the first task of the workflow.workflowVersionremains the same until the workflow CaNs, after which it will get another chance to pick the latest manager version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this part was undone unintentionally, will revert to the old comment which was your suggestion.
| if !asyncMode { | ||
| // Erase summary drainage status immediately, so it is not draining/drained. | ||
| d.setDrainageStatus(newRampingVersion, enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED, routingUpdateTime) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slightly confused; I think we did have this functionality of changing the version's drainage status almost instantly if we realize it's being promoted. Did we delete it by mistake?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this because syncVersion takes care of updating summary already.
tests/worker_deployment_test.go
Outdated
| //t.Run("sync", func(t *testing.T) { | ||
| // suite.Run(t, &WorkerDeploymentSuite{workflowVersion: workerdeployment.InitialVersion}) | ||
| //}) | ||
| //t.Run("async", func(t *testing.T) { | ||
| // suite.Run(t, &WorkerDeploymentSuite{workflowVersion: workerdeployment.AsyncSetCurrentAndRamping}) | ||
| //}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reminder: remove the comments here
bd56986 to
4337879
Compare
What changed?
Add concurrency test and refine some rough edges.
Why?
testing is good!
How did you test it?
Potential risks
None