Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,12 @@ func (sq *Queue) GetMaxResource() *resources.Resource {
return sq.internalGetMax(limit)
}

func (sq *Queue) CloneMaxResource() *resources.Resource {
sq.RLock()
defer sq.RUnlock()
return sq.maxResource.Clone()
}

// GetFairMaxResource computes the fair max resources for a given queue.
// Starting with the root, descend down to the target queue allowing children to override Resource values .
// If the root includes an explicit 0 value for a Resource, do not include it in the accumulator and treat it as missing.
Expand Down
29 changes: 29 additions & 0 deletions pkg/scheduler/objects/quota_change_preemptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package objects

import (
"math"

"github.com/apache/yunikorn-core/pkg/common/resources"
)

type QuotaChangePreemptionContext struct {
queue *Queue
}
Expand Down Expand Up @@ -48,3 +54,26 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
// quota change preemption has really evicted victims, so mark the flag
qcp.queue.MarkTriggerredQuotaChangePreemption()
}

// GetPreemptableResources Get the preemptable resources for the queue
// Subtracting the usage from the max resource gives the preemptable resources.
// It could contain both positive and negative values. Only negative values are preemptable.
func (qcp *QuotaChangePreemptionContext) GetPreemptableResources() *resources.Resource {
maxRes := qcp.queue.CloneMaxResource()
used := resources.SubOnlyExisting(qcp.queue.GetAllocatedResource(), qcp.queue.GetPreemptingResource())
if maxRes.IsEmpty() || used.IsEmpty() {
return nil
}
actual := resources.SubOnlyExisting(maxRes, used)
preemptableResource := resources.NewResource()
// Keep only the resource type which needs to be preempted
for k, v := range actual.Resources {
if v < 0 {
preemptableResource.Resources[k] = resources.Quantity(math.Abs(float64(v)))
}
}
if preemptableResource.IsEmpty() {
return nil
}
return preemptableResource
}
32 changes: 32 additions & 0 deletions pkg/scheduler/objects/quota_change_preemptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,35 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
})
}
}

func TestQuotaChangeGetPreemptableResource(t *testing.T) {
leaf, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf",
}, nil, false, nil)
assert.NilError(t, err)

testCases := []struct {
name string
queue *Queue
maxResource *resources.Resource
usedResource *resources.Resource
preemptable *resources.Resource
}{
{"nil max and nil used", leaf, nil, nil, nil},
{"nil max", leaf, nil, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), nil},
{"nil used", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), nil, nil},
{"used below max", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500}), nil},
{"used above max", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
{"used above max in specific res type", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "cpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
{"used above max and below max in specific res type", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "cpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500, "cpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
{"used res type but max undefined", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 150}), nil},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tc.queue.maxResource = tc.maxResource
tc.queue.allocatedResource = tc.usedResource
preemptor := NewQuotaChangePreemptor(tc.queue)
assert.Equal(t, resources.Equals(preemptor.GetPreemptableResources(), tc.preemptable), true)
})
}
}