Commits (showing changes from 14 of 19 commits)
b8ff343
scheduler: perform feasibility checks for system canaries before comp…
pkazmierczak Oct 15, 2025
edc1bb9
scheduler: perform feasibility checks for system canaries before comp…
pkazmierczak Oct 28, 2025
c8d7a9a
system scheduler: get the right desiredTotal values
pkazmierczak Oct 28, 2025
9c66203
system scheduler: fixes to computeJobAllocs
pkazmierczak Oct 28, 2025
f96bff6
system deployment tests: fix and annotate counts (#27006)
tgross Oct 29, 2025
19e5a8b
system scheduler: fixes to computeJobAllocs
pkazmierczak Oct 29, 2025
21c7708
system scheduler: evictUnneededCanaries fixes
chrisroberts Oct 30, 2025
a60950b
system scheduler: unit test fixes
pkazmierczak Oct 30, 2025
8a1a402
scheduler: separate the sysbatch scheduler from non-batch system sche…
pkazmierczak Oct 30, 2025
0b4de76
system scheduler: do not leave empty keys in plan.NodeUpdate
pkazmierczak Oct 30, 2025
d318f0f
scheduler: maintain node feasibility information
chrisroberts Oct 31, 2025
e51cce3
scheduler: un-flake TestSystemSched_evictUnneededCanaries
pkazmierczak Oct 31, 2025
fcbe34e
system deployments: failing tests
tgross Oct 3, 2025
91c7acc
system scheduler: handle empty deployment states correctly
pkazmierczak Oct 31, 2025
6fe7a98
comments from @jrasell
pkazmierczak Oct 31, 2025
d55ab6c
system scheduler: reset eligibility when selecting nodes
chrisroberts Oct 31, 2025
c6fbb8a
system scheduler: calculate deployment completion based on deployment…
pkazmierczak Oct 31, 2025
06ebff0
system scheduler: remove obsolete limitReached property
pkazmierczak Nov 3, 2025
55ed562
system scheduler: handle old deployments correctly in the node reconc…
pkazmierczak Nov 3, 2025
220 changes: 19 additions & 201 deletions scheduler/reconciler/reconcile_node.go
@@ -5,8 +5,6 @@ package reconciler

import (
"fmt"
"maps"
"math"
"slices"
"time"

@@ -61,25 +59,14 @@ func (nr *NodeReconciler) Compute(
// Create the required task groups.
required := materializeSystemTaskGroups(job)

// Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary
// percentage of eligible nodes, so we create a mapping of task group name
// to a list of nodes that canaries should be placed on.
canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes)

compatHadExistingDeployment := nr.DeploymentCurrent != nil

result := new(NodeReconcileResult)
var deploymentComplete bool
for nodeID, allocs := range nodeAllocs {
diff, deploymentCompleteForNode := nr.computeForNode(job, nodeID, eligibleNodes,
notReadyNodes, taintedNodes, canaryNodes[nodeID], canariesPerTG, required,
allocs, terminal, serverSupportsDisconnectedClients)
diff := nr.computeForNode(job, nodeID, eligibleNodes,
notReadyNodes, taintedNodes, required, allocs, terminal,
serverSupportsDisconnectedClients)
result.Append(diff)

deploymentComplete = deploymentCompleteForNode
if deploymentComplete {
break
}
}

// COMPAT(1.14.0) prevent a new deployment from being created in the case
@@ -90,93 +77,9 @@
nr.DeploymentCurrent = nil
}

nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)

return result
}

// computeCanaryNodes is a helper function that, given required task groups,
// mappings of nodes to their live allocs and terminal allocs, and a map of
// eligible nodes, outputs a map[nodeID] -> map[TG] -> bool which indicates
// which TGs this node is a canary for, and a map[TG] -> int to indicate how
// many total canaries are to be placed for a TG.
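// For illustration only (node and group names here are hypothetical, not
// taken from this change): canaryNodes["node-1"]["web"] = true would mean
// node-1 is a canary node for task group "web", and canariesPerTG["web"] = 2
// would mean two canary placements are expected for "web" in total.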
func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGroup,
liveAllocs map[string][]*structs.Allocation, terminalAllocs structs.TerminalByNodeByName,
eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) {

canaryNodes := map[string]map[string]bool{}
eligibleNodesList := slices.Collect(maps.Values(eligibleNodes))
canariesPerTG := map[string]int{}

for _, tg := range required {
if tg.Update.IsEmpty() || tg.Update.Canary == 0 {
continue
}

// round up to the nearest integer
numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100))
canariesPerTG[tg.Name] = numberOfCanaryNodes
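// Worked example (assumed values, for illustration only): with
// tg.Update.Canary = 25 and 10 eligible nodes, the expression above gives
// ceil(25 * 10 / 100) = ceil(2.5) = 3 canary nodes for this task group.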

// check if there are any live allocations on any nodes that are/were
// canaries.
for nodeID, allocs := range liveAllocs {
for _, a := range allocs {
eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes(
eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID)
}
}

// check if there are any terminal allocations that were canaries
for nodeID, terminalAlloc := range terminalAllocs {
for _, a := range terminalAlloc {
eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes(
eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID)
}
}

for i, n := range eligibleNodesList {
if i > numberOfCanaryNodes-1 {
break
}

if _, ok := canaryNodes[n.ID]; !ok {
canaryNodes[n.ID] = map[string]bool{}
}

canaryNodes[n.ID][tg.Name] = true
}
}

return canaryNodes, canariesPerTG
}

func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOfCanaryNodes int,
a *structs.Allocation, tg *structs.TaskGroup, canaryNodes map[string]map[string]bool, nodeID string) ([]*structs.Node, int) {

if a.DeploymentStatus == nil || a.DeploymentStatus.Canary == false ||
nr.DeploymentCurrent == nil {
return nodesList, numberOfCanaryNodes
}

nodes := nodesList
numberOfCanaries := numberOfCanaryNodes
if a.TaskGroup == tg.Name {
if _, ok := canaryNodes[nodeID]; !ok {
canaryNodes[nodeID] = map[string]bool{}
}
canaryNodes[nodeID][tg.Name] = true

// this node should no longer be considered when searching
// for canary nodes
numberOfCanaries -= 1
nodes = slices.DeleteFunc(
nodes,
func(n *structs.Node) bool { return n.ID == nodeID },
)
}
return nodes, numberOfCanaries
}

// computeForNode is used to do a set difference between the target
// allocations and the existing allocations for a particular node. This returns
// 8 sets of results:
@@ -191,21 +94,18 @@ func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOf
// 8. those that may still be running on a node that has resumed reconnected.
//
// This method mutates the NodeReconciler fields, and returns a new
// NodeReconcilerResult object and a boolean to indicate wither the deployment
// is complete or not.
// NodeReconcilerResult object.
func (nr *NodeReconciler) computeForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
canaryNode map[string]bool, // indicates whether this node is a canary node for tg
canariesPerTG map[string]int, // indicates how many canary placements we expect per tg
required map[string]*structs.TaskGroup, // set of allocations that must exist
liveAllocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
) (*NodeReconcileResult, bool) {
) *NodeReconcileResult {
result := new(NodeReconcileResult)

// cancel deployments that aren't needed anymore
@@ -225,9 +125,6 @@ func (nr *NodeReconciler) computeForNode(
deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed
}

// Track desired total and desired canaries across all loops
desiredCanaries := map[string]int{}

// Track whether we're in a canary update
isCanarying := map[string]bool{}

@@ -255,7 +152,7 @@
// deployment
var dstate = new(structs.DeploymentState)
if nr.DeploymentCurrent != nil {
dstate, _ = nr.DeploymentCurrent.TaskGroups[tg.Name]
dstate = nr.DeploymentCurrent.TaskGroups[tg.Name]
}

supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
@@ -388,17 +285,14 @@

// If the definition is updated we need to update
if job.JobModifyIndex != alloc.Job.JobModifyIndex {
if canariesPerTG[tg.Name] > 0 && dstate != nil && !dstate.Promoted {
if !tg.Update.IsEmpty() && tg.Update.Canary > 0 && dstate != nil && !dstate.Promoted {
isCanarying[tg.Name] = true
if canaryNode[tg.Name] {
result.Update = append(result.Update, AllocTuple{
Name: name,
TaskGroup: tg,
Alloc: alloc,
Canary: true,
})
desiredCanaries[tg.Name] += 1
}
result.Update = append(result.Update, AllocTuple{
Name: name,
TaskGroup: tg,
Alloc: alloc,
Canary: true,
})
} else {
result.Update = append(result.Update, AllocTuple{
Name: name,
@@ -419,13 +313,8 @@
})
}

// as we iterate over require groups, we'll keep track of whether the
// deployment is complete or not
deploymentComplete := false

// Scan the required groups
for name, tg := range required {

// populate deployment state for this task group
var dstate = new(structs.DeploymentState)
var existingDeployment bool
@@ -440,16 +329,10 @@
dstate.AutoPromote = tg.Update.AutoPromote
dstate.ProgressDeadline = tg.Update.ProgressDeadline
}
dstate.DesiredTotal = len(eligibleNodes)
}

if isCanarying[tg.Name] && !dstate.Promoted {
dstate.DesiredCanaries = canariesPerTG[tg.Name]
}

// Check for an existing allocation
if _, ok := existing[name]; !ok {

// Check for a terminal sysbatch allocation, which should not be placed
// again unless the job has been updated.
if job.Type == structs.JobTypeSysBatch {
@@ -494,6 +377,11 @@
Alloc: termOnNode,
}

// If the terminal allocation was a canary, mark it as such.
if termOnNode != nil && termOnNode.DeploymentStatus != nil && termOnNode.DeploymentStatus.Canary {
allocTuple.Canary = true
}

// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
@@ -506,12 +394,10 @@

// check if deployment is place ready or complete
deploymentPlaceReady := !deploymentPaused && !deploymentFailed
deploymentComplete = nr.isDeploymentComplete(tg.Name, result, isCanarying[tg.Name])

// check if perhaps there's nothing else to do for this TG
if existingDeployment ||
tg.Update.IsEmpty() ||
(dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) ||
!deploymentPlaceReady {
continue
}
@@ -527,7 +413,7 @@
}
}

return result, deploymentComplete
return result
}

func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup,
@@ -586,74 +472,6 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro
nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate
}

func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool {
complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0

if !complete || nr.DeploymentCurrent == nil || isCanarying {
return false
}

// ensure everything is healthy
if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok {
if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs
complete = false
}
}

return complete
}

func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
statusUpdates := []*structs.DeploymentStatusUpdate{}

if d := nr.DeploymentCurrent; d != nil {

// Deployments that require promotion should have appropriate status set
// immediately, no matter their completness.
if d.RequiresPromotion() {
if d.HasAutoPromote() {
d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
} else {
d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
}
return statusUpdates
}

// Mark the deployment as complete if possible
if deploymentComplete {
if job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if d.Status != structs.DeploymentStatusUnblocking &&
d.Status != structs.DeploymentStatusSuccessful {
statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: nr.DeploymentCurrent.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
})
}
} else {
statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: nr.DeploymentCurrent.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
})
}
}

// Mark the deployment as pending since its state is now computed.
if d.Status == structs.DeploymentStatusInitializing {
statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: nr.DeploymentCurrent.ID,
Status: structs.DeploymentStatusPending,
StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
})
}
}

return statusUpdates
}

// materializeSystemTaskGroups is used to materialize all the task groups
// a system or sysbatch job requires.
func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {