Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/common/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,3 +1156,26 @@ func (r *Resource) DominantResourceType(capacity *Resource) string {
}
return dominant
}

// ExtractLatestIfModified returns a new resource that contains, for every resource type
// present in both old and new, the value from new when it differs from the value in old.
// Resource types that exist in only one of the two resources are ignored.
//
// It returns nil when:
//   - either old or new is nil,
//   - no resource type is present in both resources, or
//   - every resource type present in both has the same value in old and new.
func ExtractLatestIfModified(old *Resource, new *Resource) *Resource {
	if old == nil || new == nil {
		return nil
	}
	latest := NewResource()
	for k, v := range old.Resources {
		// only types known to both sides are compared; a changed value is taken from new
		if newValue, exists := new.Resources[k]; exists && v != newValue {
			latest.Resources[k] = newValue
		}
	}
	// nothing was modified: signal that with nil rather than an empty resource
	if len(latest.Resources) == 0 {
		return nil
	}
	return latest
}
27 changes: 27 additions & 0 deletions pkg/common/resources/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2332,3 +2332,30 @@ func TestResource_Prune(t *testing.T) {
})
}
}

// TestResource_ExtractLatestIfModified checks ExtractLatestIfModified against a table of
// old/new resource pairs: nil inputs on either side, modified values, unmodified values
// and resource types that exist on only one side.
func TestResource_ExtractLatestIfModified(t *testing.T) {
	tests := []struct {
		caseName string
		old      *Resource
		new      *Resource
		latest   *Resource
	}{
		{"nil case", nil, nil, nil},
		{"nil with non empty", nil, NewResourceFromMap(map[string]Quantity{"first": 1}), nil},
		// symmetric nil guard: a nil new resource must also yield nil
		{"non empty with nil", NewResourceFromMap(map[string]Quantity{"first": 1}), nil, nil},
		{"non empty case", NewResourceFromMap(map[string]Quantity{"first": 1}), NewResourceFromMap(map[string]Quantity{"first": 2}), NewResourceFromMap(map[string]Quantity{"first": 2})},
		{"non empty case, reversal", NewResourceFromMap(map[string]Quantity{"first": 2}), NewResourceFromMap(map[string]Quantity{"first": 1}), NewResourceFromMap(map[string]Quantity{"first": 1})},
		{"non empty case, only one exist with different value", NewResourceFromMap(map[string]Quantity{"first": 1, "second": 2}), NewResourceFromMap(map[string]Quantity{"second": 3}), NewResourceFromMap(map[string]Quantity{"second": 3})},
		{"non empty case, only one exist with same value", NewResourceFromMap(map[string]Quantity{"first": 1, "second": 2}), NewResourceFromMap(map[string]Quantity{"second": 2}), nil},
		{"non empty case, disjoint sets and type doesn't exist", NewResourceFromMap(map[string]Quantity{"first": 1}), NewResourceFromMap(map[string]Quantity{"second": 2}), nil},
	}
	for _, tt := range tests {
		t.Run(tt.caseName, func(t *testing.T) {
			latest := ExtractLatestIfModified(tt.old, tt.new)
			// a nil expectation is compared directly
			if tt.latest == nil {
				assert.Equal(t, tt.latest, latest)
				return
			}
			assert.Assert(t, DeepEquals(tt.latest, latest), "resource type maps are not equal")
		})
	}
}
76 changes: 61 additions & 15 deletions pkg/scheduler/objects/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,20 @@ func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
return false
}

oldRemaining := currentQueue.GetRemainingGuaranteedResource()
currentQueue.AddAllocation(p.ask.GetAllocatedResource())

// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
for _, snapshot := range queues {
for _, alloc := range snapshot.PotentialVictims {
snapshot.RemoveAllocation(alloc.GetAllocatedResource())
remaining := currentQueue.GetRemainingGuaranteedResource()
if remaining != nil && resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) {

// Net remaining guaranteed is the latest remaining guaranteed value of the ask queue after
// removing the victim. If a resource type is defined in the ask queue's guaranteed resources but not used by the victim,
// the net remaining value leaves it out: it contains only the RELEVANT resource types, not those untouched in the victim's context.
netRemaining := resources.ExtractLatestIfModified(oldRemaining, remaining)
if remaining != nil && resources.StrictlyGreaterThanOrEquals(netRemaining, resources.Zero) {
return true
}
}
Expand Down Expand Up @@ -272,21 +278,32 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, po
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
preemptableResource := queueSnapshot.GetPreemptableResource()

// Net remaining guaranteed is nothing but the latest remaining guaranteed value resulted after
// the removal of victim on victim queue. In case of res type not used in victim but defined in victim queue's guaranteed,
// Net remaining value ensures it has only RELEVANT res types and not res types not being touched at all from victim context.
netRemaining := resources.ExtractLatestIfModified(oldRemaining, queueSnapshot.GetRemainingGuaranteedResource())

// Did removing this allocation still keep the queue over-allocated?
// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
// So, as an additional check, -ve or zero net remaining guaranteed resource means
// some really useful victim is there.
// In case of victims densely populated on any specific node, checking/honouring the guaranteed quota on ask or preemptor queue
// acts as early filtering layer to carry forward only the required victims.
// For other cases like victims spread over multiple nodes, this doesn't add great value.
if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
(netRemaining == nil || resources.StrictlyGreaterThanOrEquals(resources.Zero, netRemaining)) {
// add the current victim into the ask queue
askQueueOldRemaining := askQueue.GetRemainingGuaranteedResource()
askQueue.AddAllocation(victim.GetAllocatedResource())
askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()

// Net remaining guaranteed is nothing but the latest remaining guaranteed value resulted after
// the removal of victim on ask queue. In case of res type not used in victim but defined in ask queue's guaranteed,
// Net remaining value ensures it has only RELEVANT res types and not res types not being touched at all from victim context.
askQueueNetRemaining := resources.ExtractLatestIfModified(askQueueOldRemaining, askQueueNewRemaining)

// Did adding this allocation make the ask queue over - utilized?
if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNetRemaining) {
askQueue.RemoveAllocation(victim.GetAllocatedResource())
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
break
Expand Down Expand Up @@ -344,13 +361,18 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, po
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
preemptableResource := queueSnapshot.GetPreemptableResource()

// Net remaining guaranteed is nothing but the latest remaining guaranteed value resulted after
// the removal of victim on victim's queue. In case of res type not used in victim but defined in victim queue's guaranteed,
// Net remaining value ensures it has only RELEVANT res types and not res types not being touched at all from victim context.
netRemaining := resources.ExtractLatestIfModified(oldRemaining, queueSnapshot.GetRemainingGuaranteedResource())

// Did removing this allocation still keep the queue over-allocated?
// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
// So, as an additional check, -ve or zero net remaining guaranteed resource means
// some really useful victim is there.
// Similar checks could be added even on the ask or preemptor queue to prevent being over utilized.
if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
(netRemaining == nil || resources.StrictlyGreaterThanOrEquals(resources.Zero, netRemaining)) {
// removing task does not violate queue constraints, adjust queue and node
nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
// check if ask now fits and we haven't had this happen before
Expand Down Expand Up @@ -493,6 +515,8 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Al
return compareAllocationLess(potentialVictims[i], potentialVictims[j])
})

askQueueRemaining := askQueue.GetRemainingGuaranteedResource()

// evaluate each potential victim in turn, stopping once sufficient resources have been freed
victims := make([]*Allocation, 0)
for _, victim := range potentialVictims {
Expand All @@ -502,20 +526,31 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Al
oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())

// Net remaining guaranteed is nothing but the latest remaining guaranteed value resulted after
// the removal of victim on victim's queue. In case of res type not used in victim but defined in victim queue's guaranteed,
// Net remaining value ensures it has only RELEVANT res types and not res types not being touched at all from victim context.
netRemaining := resources.ExtractLatestIfModified(oldRemaining, queueSnapshot.GetRemainingGuaranteedResource())

// Did removing this allocation still keep the queue over-allocated?
// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
// So, as an additional check, -ve or zero net remaining guaranteed resource means
// some really useful victim is there.
preemptableResource := queueSnapshot.GetPreemptableResource()
if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
(netRemaining == nil || resources.StrictlyGreaterThanOrEquals(resources.Zero, netRemaining)) {
askQueueRemainingAfterVictimRemoval := askQueue.GetRemainingGuaranteedResource()

// add the current victim into the ask queue
askQueue.AddAllocation(victim.GetAllocatedResource())
askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()

// Net remaining guaranteed is nothing but the latest remaining guaranteed value resulted after
// the removal of victim on ask queue. In case of res type not used in victim but defined in ask queue's guaranteed,
// Net remaining value ensures it has only RELEVANT res types and not res types not being touched at all from victim context.
askQueueNetRemaining := resources.ExtractLatestIfModified(askQueueRemainingAfterVictimRemoval, askQueueNewRemaining)

// Did adding this allocation make the ask queue over - utilized?
if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNetRemaining) {
askQueue.RemoveAllocation(victim.GetAllocatedResource())
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
break
Expand All @@ -536,12 +571,22 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Al
}
}
}
// At last, did the ask queue usage under or equals guaranteed quota?
finalRemainingRes := askQueue.GetRemainingGuaranteedResource()
if finalRemainingRes != nil && resources.StrictlyGreaterThanOrEquals(finalRemainingRes, resources.Zero) {
return victims, true

// Finally, is the ask queue usage at or below its guaranteed quota after finding the additional victims?
if len(victims) > 0 {
finalRemainingRes := askQueue.GetRemainingGuaranteedResource()

// Net remaining guaranteed is nothing but the latest remaining guaranteed value resulted after
// the removal of victim on ask queue. In case of res type not used in victim but defined in ask queue's guaranteed,
// Net remaining value ensures it has only RELEVANT res types and not res types not being touched at all from victim context.
netRemaining := resources.ExtractLatestIfModified(askQueueRemaining, finalRemainingRes)
if finalRemainingRes != nil && resources.StrictlyGreaterThanOrEquals(netRemaining, resources.Zero) {
return victims, true
} else {
return victims, false
}
}
return nil, false
return nil, true
}

// tryNodes attempts to find potential nodes for scheduling. For each node, potential victims are passed to
Expand Down Expand Up @@ -682,7 +727,8 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
log.Log(log.SchedPreemption).Info("Reserving node for ask after preemption",
zap.String("allocationKey", p.ask.GetAllocationKey()),
zap.String("nodeID", nodeID),
zap.Int("victimCount", len(victims)))
zap.Int("collected victim count", len(victims)),
zap.Int("preempted victim count", len(finalVictims)))
return newReservedAllocationResult(nodeID, p.ask), true
}

Expand Down
Loading