Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/common/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type QueueConfig struct {
ChildTemplate ChildTemplate `yaml:",omitempty" json:",omitempty"`
Queues []QueueConfig `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
Preemption Preemption `yaml:",omitempty" json:",omitempty"`
}

type Preemption struct {
Delay uint64 `yaml:",omitempty" json:",omitempty"`
}

type ChildTemplate struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/configs/configvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,9 @@ func checkQueues(queue *QueueConfig, level int) error {
return fmt.Errorf("duplicate child name found with name '%s', level %d", child.Name, level)
}
queueMap[strings.ToLower(child.Name)] = true
if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 {
return fmt.Errorf("invalid preemption delay %d, must be greater than 60 seconds", queue.Preemption.Delay)
}
}

// recurse into the depth if this level passed
Expand Down
28 changes: 28 additions & 0 deletions pkg/common/configs/configvalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,34 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
assert.Equal(t, 2, len(q.Queues), "Expected two queues")
},
},
{
name: "Invalid Preemption delay for leaf queue",
queue: &QueueConfig{
Name: "root",
Queues: []QueueConfig{
{Name: "leaf",
Preemption: Preemption{
Delay: 10,
},
},
},
Preemption: Preemption{
Delay: 10,
},
},
level: 1,
expectedErrorMsg: "invalid preemption delay 10, must be greater than 60 seconds",
},
{
name: "Setting Preemption delay on root queue would be ignored",
queue: &QueueConfig{
Name: "root",
Preemption: Preemption{
Delay: 10,
},
},
level: 0,
},
}

for _, tc := range testCases {
Expand Down
113 changes: 82 additions & 31 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,43 +74,45 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a merge of the
// parent properties with the config for this queue only manipulated during creation
// of the queue or via a queue configuration update.
properties map[string]string
adminACL security.ACL // admin ACL
submitACL security.ACL // submit ACL
maxResource *resources.Resource // When not set, max = nil
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
isLeaf bool // this is a leaf queue or not (i.e. parent)
isManaged bool // queue is part of the config, not auto created
stateMachine *fsm.FSM // the state of the queue for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
maxRunningApps uint64
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents
appQueueMapping *AppQueueMapping // appID mapping to queues
properties map[string]string
adminACL security.ACL // admin ACL
submitACL security.ACL // submit ACL
maxResource *resources.Resource // When not set, max = nil
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
isLeaf bool // this is a leaf queue or not (i.e. parent)
isManaged bool // queue is part of the config, not auto created
stateMachine *fsm.FSM // the state of the queue for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
maxRunningApps uint64
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents
appQueueMapping *AppQueueMapping // appID mapping to queues
quotaChangePreemptionDelay uint64

locking.RWMutex
}

// newBlankQueue creates a new empty queue objects with all values initialised.
func newBlankQueue() *Queue {
return &Queue{
children: make(map[string]*Queue),
childPriorities: make(map[string]int32),
applications: make(map[string]*Application),
appPriorities: make(map[string]int32),
reservedApps: make(map[string]int),
allocatingAcceptedApps: make(map[string]bool),
properties: make(map[string]string),
stateMachine: NewObjectState(),
allocatedResource: resources.NewResource(),
preemptingResource: resources.NewResource(),
pending: resources.NewResource(),
currentPriority: configs.MinPriority,
prioritySortEnabled: true,
preemptionDelay: configs.DefaultPreemptionDelay,
preemptionPolicy: policies.DefaultPreemptionPolicy,
children: make(map[string]*Queue),
childPriorities: make(map[string]int32),
applications: make(map[string]*Application),
appPriorities: make(map[string]int32),
reservedApps: make(map[string]int),
allocatingAcceptedApps: make(map[string]bool),
properties: make(map[string]string),
stateMachine: NewObjectState(),
allocatedResource: resources.NewResource(),
preemptingResource: resources.NewResource(),
pending: resources.NewResource(),
currentPriority: configs.MinPriority,
prioritySortEnabled: true,
preemptionDelay: configs.DefaultPreemptionDelay,
preemptionPolicy: policies.DefaultPreemptionPolicy,
quotaChangePreemptionDelay: 0,
}
}

Expand Down Expand Up @@ -155,7 +157,6 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool, a
zap.String("queueName", sq.QueuePath))
sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)
}

return sq, nil
}

Expand Down Expand Up @@ -372,17 +373,67 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error {

// Load the max & guaranteed resources and maxApps for all but the root queue
if sq.Name != configs.RootQueue {
oldMaxResource := sq.maxResource
if err = sq.setResourcesFromConf(conf.Resources); err != nil {
return err
}
sq.maxRunningApps = conf.MaxApplications
sq.updateMaxRunningAppsMetrics()
sq.setPreemptionSettings(oldMaxResource, conf)
}

sq.properties = conf.Properties
return nil
}

// setPreemptionSettings Set Quota change preemption settings
func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, conf configs.QueueConfig) {
newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
if err != nil {
log.Log(log.SchedQueue).Error("parsing failed on max resources this should not happen",
zap.String("queue", sq.QueuePath),
zap.Error(err))
return
}
reset := false
set := false
switch {
// Set max res earlier but not now
case resources.IsZero(newMaxResource) && !resources.IsZero(oldMaxResource):
reset = true
// Set max res now but not earlier
case !resources.IsZero(newMaxResource) && resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
set = true
// Set max res earlier and now as well
default:
switch {
// Quota decrease
case resources.StrictlyGreaterThan(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0:
set = true
// Quota increase
case resources.StrictlyGreaterThan(newMaxResource, oldMaxResource) && conf.Preemption.Delay != 0:
reset = true
// Quota remains as is but delay has changed
case resources.Equals(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != conf.Preemption.Delay:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
default:
// noop
}
}

// Set preemption settings
if set {
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
}

// Reset preemption settings
if reset {
if sq.quotaChangePreemptionDelay != 0 {
sq.quotaChangePreemptionDelay = 0
}
}
}

// setResourcesFromConf sets the maxResource and guaranteedResource of the queue from the config.
func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
maxResource, err := resources.NewResourceFromConf(resource.Max)
Expand Down
84 changes: 83 additions & 1 deletion pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,74 @@ func TestApplyConf(t *testing.T) {
assert.Equal(t, root.maxRunningApps, uint64(0))
}

func TestQuotaChangePreemptionSettings(t *testing.T) {
root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)

parent, err := createManagedQueueWithProps(root, "parent", false, getResourceConf(), getResourceConf())
assert.NilError(t, err, "failed to create basic queue: %v", err)
testCases := []struct {
name string
conf configs.QueueConfig
expectedDelay uint64
}{{"first queue setup without delay", configs.QueueConfig{
Resources: configs.Resources{
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
}, 0},
{"increase max with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100000000"},
},
Preemption: configs.Preemption{
Delay: 500,
},
}, 0},
{"decrease max with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
},
Preemption: configs.Preemption{
Delay: 500,
},
}, 500},
{"max remains as is but delay changed", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
},
Preemption: configs.Preemption{
Delay: 200,
},
}, 200},
{"unrelated config change, should not impact earlier set preemption settings", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
Guaranteed: map[string]string{"memory": "50"},
},
Preemption: configs.Preemption{
Delay: 200,
},
}, 200},
{"increase max again with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "101"},
},
Preemption: configs.Preemption{
Delay: 200,
},
}, 0}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err = parent.ApplyConf(tc.conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
assert.Assert(t, parent.maxResource != nil)
assert.Equal(t, parent.quotaChangePreemptionDelay, tc.expectedDelay)
})
}
}

func TestNewConfiguredQueue(t *testing.T) {
// check variable assignment
properties := getProperties()
Expand Down Expand Up @@ -2275,6 +2343,7 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.DeepEqual(t, properties, parent.template.GetProperties())
assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetMaxResource()))
assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetGuaranteedResource()))
assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0))

// case 0: managed leaf queue can't use template
leafConfig := configs.QueueConfig{
Expand All @@ -2285,6 +2354,9 @@ func TestNewConfiguredQueue(t *testing.T) {
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
Preemption: configs.Preemption{
Delay: 500,
},
}
childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand All @@ -2297,11 +2369,15 @@ func TestNewConfiguredQueue(t *testing.T) {
childLeafGuaranteed, err := resources.NewResourceFromConf(leafConfig.Resources.Guaranteed)
assert.NilError(t, err, "Resource creation failed")
assert.Assert(t, resources.Equals(childLeaf.guaranteedResource, childLeafGuaranteed))
assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500))

// case 1: non-leaf can't use template but it can inherit template from parent
NonLeafConfig := configs.QueueConfig{
Name: "nonleaf_queue",
Parent: true,
Preemption: configs.Preemption{
Delay: 500,
},
}
childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand All @@ -2310,19 +2386,25 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Equal(t, len(childNonLeaf.properties), 0)
assert.Assert(t, childNonLeaf.guaranteedResource == nil)
assert.Assert(t, childNonLeaf.maxResource == nil)
assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0))

// case 2: do not send queue event when silence flag is set to true
events.Init()
eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
eventSystem.StartServiceWithPublisher(false)
rootConfig := configs.QueueConfig{
Name: "root",
Preemption: configs.Preemption{
Delay: 500,
},
}
_, err = NewConfiguredQueue(rootConfig, nil, true, nil)

rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
time.Sleep(time.Second)
noEvents := eventSystem.Store.CountStoredEvents()
assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d", noEvents)
assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0))
}

func TestResetRunningState(t *testing.T) {
Expand Down