
Commit 8e10363

refactor
1 parent a4523ec commit 8e10363

2 files changed (+112, -95 lines)

scheduler/reconciler/reconcile_node.go

Lines changed: 4 additions & 87 deletions
@@ -62,17 +62,11 @@ func (nr *NodeReconciler) Compute(
 	compatHadExistingDeployment := nr.DeploymentCurrent != nil
 
 	result := new(NodeReconcileResult)
-	var deploymentComplete bool
 	for nodeID, allocs := range nodeAllocs {
-		diff, deploymentCompleteForNode := nr.computeForNode(job, nodeID, eligibleNodes,
+		diff := nr.computeForNode(job, nodeID, eligibleNodes,
 			notReadyNodes, taintedNodes, required, allocs, terminal,
 			serverSupportsDisconnectedClients)
 		result.Append(diff)
-
-		deploymentComplete = deploymentCompleteForNode
-		if deploymentComplete {
-			break
-		}
 	}
 
 	// COMPAT(1.14.0) prevent a new deployment from being created in the case
@@ -83,8 +77,6 @@ func (nr *NodeReconciler) Compute(
 		nr.DeploymentCurrent = nil
 	}
 
-	nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)
-
 	return result
 }
 
@@ -102,8 +94,7 @@ func (nr *NodeReconciler) Compute(
 // 8. those that may still be running on a node that has resumed reconnected.
 //
 // This method mutates the NodeReconciler fields, and returns a new
-// NodeReconcilerResult object and a boolean to indicate wither the deployment
-// is complete or not.
+// NodeReconcilerResult object.
 func (nr *NodeReconciler) computeForNode(
 	job *structs.Job, // job whose allocs are going to be diff-ed
 	nodeID string,
@@ -114,7 +105,7 @@ func (nr *NodeReconciler) computeForNode(
 	liveAllocs []*structs.Allocation, // non-terminal allocations that exist
 	terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
 	serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
-) (*NodeReconcileResult, bool) {
+) *NodeReconcileResult {
 	result := new(NodeReconcileResult)
 
 	// cancel deployments that aren't needed anymore
@@ -322,10 +313,6 @@ func (nr *NodeReconciler) computeForNode(
 		})
 	}
 
-	// as we iterate over require groups, we'll keep track of whether the
-	// deployment is complete or not
-	deploymentComplete := false
-
 	// Scan the required groups
 	for name, tg := range required {
 
@@ -343,7 +330,6 @@ func (nr *NodeReconciler) computeForNode(
 			dstate.AutoPromote = tg.Update.AutoPromote
 			dstate.ProgressDeadline = tg.Update.ProgressDeadline
 		}
-		dstate.DesiredTotal = len(eligibleNodes)
 	}
 
 	// Check for an existing allocation
@@ -405,7 +391,6 @@ func (nr *NodeReconciler) computeForNode(
 
 		// check if deployment is place ready or complete
 		deploymentPlaceReady := !deploymentPaused && !deploymentFailed
-		deploymentComplete = nr.isDeploymentComplete(tg.Name, result, isCanarying[tg.Name])
 
 		// check if perhaps there's nothing else to do for this TG
 		if existingDeployment ||
@@ -426,7 +411,7 @@ func (nr *NodeReconciler) computeForNode(
 		}
 	}
 
-	return result, deploymentComplete
+	return result
 }
 
 func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup,
@@ -485,74 +470,6 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro
 	nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate
 }
 
-func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool {
-	complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0
-
-	if !complete || nr.DeploymentCurrent == nil || isCanarying {
-		return false
-	}
-
-	// ensure everything is healthy
-	if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok {
-		if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs
-			complete = false
-		}
-	}
-
-	return complete
-}
-
-func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
-	statusUpdates := []*structs.DeploymentStatusUpdate{}
-
-	if d := nr.DeploymentCurrent; d != nil {
-
-		// Deployments that require promotion should have appropriate status set
-		// immediately, no matter their completness.
-		if d.RequiresPromotion() {
-			if d.HasAutoPromote() {
-				d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
-			} else {
-				d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
-			}
-			return statusUpdates
-		}
-
-		// Mark the deployment as complete if possible
-		if deploymentComplete {
-			if job.IsMultiregion() {
-				// the unblocking/successful states come after blocked, so we
-				// need to make sure we don't revert those states
-				if d.Status != structs.DeploymentStatusUnblocking &&
-					d.Status != structs.DeploymentStatusSuccessful {
-					statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
-						DeploymentID:      nr.DeploymentCurrent.ID,
-						Status:            structs.DeploymentStatusBlocked,
-						StatusDescription: structs.DeploymentStatusDescriptionBlocked,
-					})
-				}
-			} else {
-				statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
-					DeploymentID:      nr.DeploymentCurrent.ID,
-					Status:            structs.DeploymentStatusSuccessful,
-					StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
-				})
-			}
-		}
-
-		// Mark the deployment as pending since its state is now computed.
-		if d.Status == structs.DeploymentStatusInitializing {
-			statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
-				DeploymentID:      nr.DeploymentCurrent.ID,
-				Status:            structs.DeploymentStatusPending,
-				StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
-			})
-		}
-	}
-
-	return statusUpdates
-}
-
 // materializeSystemTaskGroups is used to materialize all the task groups
 // a system or sysbatch job requires.
 func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
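
The `isDeploymentComplete` and `setDeploymentStatusAndUpdates` helpers removed above reappear, nearly verbatim, on the `SystemScheduler` in the second file. For orientation only, here is a minimal, self-contained sketch of the completeness rule that moves; the type and function names below are simplified stand-ins, not the actual Nomad types.

```go
package main

import "fmt"

// Simplified stand-ins for the types in the diff above (hypothetical; the real
// fields live on structs.DeploymentState and NodeReconcileResult).
type groupState struct {
	HealthyAllocs int
	DesiredTotal  int
}

type pendingWork struct {
	Place, Migrate, Update int // counts of work still to be done
}

// groupComplete mirrors the moved isDeploymentComplete check: nothing left to
// place, migrate, or update; not mid-canary; and enough healthy allocations.
func groupComplete(w pendingWork, st *groupState, isCanarying bool) bool {
	complete := w.Place+w.Migrate+w.Update == 0
	if !complete || st == nil || isCanarying {
		return false
	}
	return st.HealthyAllocs >= st.DesiredTotal
}

func main() {
	st := &groupState{HealthyAllocs: 2, DesiredTotal: 3}
	fmt.Println(groupComplete(pendingWork{}, st, false)) // false: one alloc not yet healthy
	st.HealthyAllocs = 3
	fmt.Println(groupComplete(pendingWork{}, st, false)) // true
}
```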

scheduler/scheduler_system.go

Lines changed: 108 additions & 8 deletions
@@ -285,6 +285,38 @@ func (s *SystemScheduler) computeJobAllocs() error {
 		s.logger.Debug("reconciled current state with desired state", r.Fields()...)
 	}
 
+	// track if any of the task groups is doing a canary update now
+	isCanarying := map[string]bool{}
+	for _, tg := range s.job.TaskGroups {
+		if s.deployment == nil {
+			break
+		}
+
+		dstate, ok := s.deployment.TaskGroups[tg.Name]
+		if !ok {
+			continue
+		}
+
+		isCanarying[tg.Name] = !tg.Update.IsEmpty() && tg.Update.Canary > 0 && dstate != nil && !dstate.Promoted
+	}
+
+	// Initially, if the job requires canaries, we place all of them on all
+	// eligible nodes. At this point we know which nodes are feasible, so we
+	// evict unneeded canaries.
+	if err := s.evictUnneededCanaries(s.job, s.nodes, r); err != nil {
+		return fmt.Errorf("failed to evict canaries for job '%s': %v", s.eval.JobID, err)
+	}
+
+	// check if the deployment is complete
+	deploymentComplete := true
+	for _, tg := range s.job.TaskGroups {
+		groupComplete := s.isDeploymentComplete(tg.Name, r, isCanarying[tg.Name])
+		deploymentComplete = deploymentComplete && groupComplete
+	}
+
+	// adjust the deployment updates and set the right deployment status
+	nr.DeploymentUpdates = append(nr.DeploymentUpdates, s.setDeploymentStatusAndUpdates(deploymentComplete, s.job)...)
+
 	// Add the deployment changes to the plan
 	s.plan.Deployment = nr.DeploymentCurrent
 	s.plan.DeploymentUpdates = nr.DeploymentUpdates
@@ -336,13 +368,6 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	// be limited by max_parallel
 	s.limitReached = evictAndPlace(s.ctx, s.job, r, sstructs.StatusAllocUpdating)
 
-	// Initially, if the job requires canaries, we place all of them on all
-	// eligible nodes. At this point we know which nodes are feasible, so we
-	// evict unnedded canaries.
-	if err := s.evictCanaries(s.job, s.nodes, r); err != nil {
-		return fmt.Errorf("failed to evict canaries for job '%s': %v", s.eval.JobID, err)
-	}
-
 	// Nothing remaining to do if placement is not required
 	if len(r.Place) == 0 {
 		if !s.job.Stopped() {
@@ -571,6 +596,13 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist
 		}
 
 		s.plan.AppendAlloc(alloc, nil)
+
+		// we only know the total amount of placements once we filter out
+		// infeasible nodes, so for system jobs we do it backwards a bit: the
+		// "desired" total is the total we were able to place.
+		if s.deployment != nil {
+			s.deployment.TaskGroups[tgName].DesiredTotal += 1
+		}
 	}
 
 	return nil
@@ -657,7 +689,7 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.Node
 
 // evictAndPlaceCanaries checks how many canaries are needed against the amount
 // of feasible nodes, and evicts unnecessary placements.
-func (s *SystemScheduler) evictCanaries(job *structs.Job, readyNodes []*structs.Node,
+func (s *SystemScheduler) evictUnneededCanaries(job *structs.Job, readyNodes []*structs.Node,
 	reconcileResult *reconciler.NodeReconcileResult) error {
 
 	if job.Stopped() {
@@ -733,3 +765,71 @@ func (s *SystemScheduler) evictCanaries(job *structs.Job, readyNodes []*structs.
 
 	return nil
 }
+
+func (s *SystemScheduler) isDeploymentComplete(groupName string, buckets *reconciler.NodeReconcileResult, isCanarying bool) bool {
+	complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0
+
+	if !complete || s.deployment == nil || isCanarying {
+		return false
+	}
+
+	// ensure everything is healthy
+	if dstate, ok := s.deployment.TaskGroups[groupName]; ok {
+		if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs
+			complete = false
+		}
+	}
+
+	return complete
+}
+
+func (s *SystemScheduler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
+	statusUpdates := []*structs.DeploymentStatusUpdate{}
+
+	if d := s.deployment; d != nil {
+
+		// Deployments that require promotion should have appropriate status set
+		// immediately, no matter their completeness.
+		if d.RequiresPromotion() {
+			if d.HasAutoPromote() {
+				d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
+			} else {
+				d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
+			}
+			return statusUpdates
+		}
+
+		// Mark the deployment as complete if possible
+		if deploymentComplete {
+			if job.IsMultiregion() {
+				// the unblocking/successful states come after blocked, so we
+				// need to make sure we don't revert those states
+				if d.Status != structs.DeploymentStatusUnblocking &&
+					d.Status != structs.DeploymentStatusSuccessful {
+					statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+						DeploymentID:      s.deployment.ID,
+						Status:            structs.DeploymentStatusBlocked,
+						StatusDescription: structs.DeploymentStatusDescriptionBlocked,
+					})
+				}
+			} else {
+				statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+					DeploymentID:      s.deployment.ID,
+					Status:            structs.DeploymentStatusSuccessful,
+					StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
+				})
+			}
+		}
+
+		// Mark the deployment as pending since its state is now computed.
+		if d.Status == structs.DeploymentStatusInitializing {
+			statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+				DeploymentID:      s.deployment.ID,
+				Status:            structs.DeploymentStatusPending,
+				StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
+			})
+		}
+	}
+
+	return statusUpdates
+}
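
Taken together, the change leaves deployment-completeness and status handling to the system scheduler: after the per-node reconcile, `computeJobAllocs` detects canarying groups, evicts unneeded canaries, checks each group for completeness, and only treats the whole deployment as complete when every group is. Below is a condensed, hypothetical sketch of that aggregation step, using simplified names rather than the real scheduler types.

```go
package main

import "fmt"

// deploymentComplete mirrors the aggregation loop added to computeJobAllocs:
// the deployment is complete only when every task group reports complete.
// groupDone is a stand-in for s.isDeploymentComplete.
func deploymentComplete(groups []string, groupDone func(name string) bool) bool {
	complete := true // a single incomplete group flips this to false
	for _, name := range groups {
		complete = complete && groupDone(name)
	}
	return complete
}

func main() {
	done := map[string]bool{"web": true, "cache": false}
	check := func(name string) bool { return done[name] }

	fmt.Println(deploymentComplete([]string{"web", "cache"}, check)) // false
	done["cache"] = true
	fmt.Println(deploymentComplete([]string{"web", "cache"}, check)) // true
}
```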
