Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/common/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type QueueConfig struct {
ChildTemplate ChildTemplate `yaml:",omitempty" json:",omitempty"`
Queues []QueueConfig `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
Preemption Preemption `yaml:",omitempty" json:",omitempty"`
}

type Preemption struct {
Delay uint64 `yaml:",omitempty" json:",omitempty"`
}

type ChildTemplate struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/configs/configvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,9 @@ func checkQueues(queue *QueueConfig, level int) error {
return fmt.Errorf("duplicate child name found with name '%s', level %d", child.Name, level)
}
queueMap[strings.ToLower(child.Name)] = true
if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 {
return fmt.Errorf("invalid preemption delay %d, must be greater than 60 seconds", queue.Preemption.Delay)
}
}

// recurse into the depth if this level passed
Expand Down
28 changes: 28 additions & 0 deletions pkg/common/configs/configvalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,34 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
assert.Equal(t, 2, len(q.Queues), "Expected two queues")
},
},
{
name: "Invalid Preemption delay for leaf queue",
queue: &QueueConfig{
Name: "root",
Queues: []QueueConfig{
{Name: "leaf",
Preemption: Preemption{
Delay: 10,
},
},
},
Preemption: Preemption{
Delay: 10,
},
},
level: 1,
expectedErrorMsg: "invalid preemption delay 10, must be greater than 60 seconds",
},
{
name: "Setting Preemption delay on root queue would be ignored",
queue: &QueueConfig{
Name: "root",
Preemption: Preemption{
Delay: 10,
},
},
level: 0,
},
}

for _, tc := range testCases {
Expand Down
100 changes: 69 additions & 31 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,43 +74,45 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a merge of the
// parent properties with the config for this queue only manipulated during creation
// of the queue or via a queue configuration update.
properties map[string]string
adminACL security.ACL // admin ACL
submitACL security.ACL // submit ACL
maxResource *resources.Resource // When not set, max = nil
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
isLeaf bool // this is a leaf queue or not (i.e. parent)
isManaged bool // queue is part of the config, not auto created
stateMachine *fsm.FSM // the state of the queue for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
maxRunningApps uint64
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents
appQueueMapping *AppQueueMapping // appID mapping to queues
properties map[string]string
adminACL security.ACL // admin ACL
submitACL security.ACL // submit ACL
maxResource *resources.Resource // When not set, max = nil
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
isLeaf bool // this is a leaf queue or not (i.e. parent)
isManaged bool // queue is part of the config, not auto created
stateMachine *fsm.FSM // the state of the queue for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
maxRunningApps uint64
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents
appQueueMapping *AppQueueMapping // appID mapping to queues
quotaChangePreemptionDelay uint64

locking.RWMutex
}

// newBlankQueue creates a new empty queue objects with all values initialised.
func newBlankQueue() *Queue {
return &Queue{
children: make(map[string]*Queue),
childPriorities: make(map[string]int32),
applications: make(map[string]*Application),
appPriorities: make(map[string]int32),
reservedApps: make(map[string]int),
allocatingAcceptedApps: make(map[string]bool),
properties: make(map[string]string),
stateMachine: NewObjectState(),
allocatedResource: resources.NewResource(),
preemptingResource: resources.NewResource(),
pending: resources.NewResource(),
currentPriority: configs.MinPriority,
prioritySortEnabled: true,
preemptionDelay: configs.DefaultPreemptionDelay,
preemptionPolicy: policies.DefaultPreemptionPolicy,
children: make(map[string]*Queue),
childPriorities: make(map[string]int32),
applications: make(map[string]*Application),
appPriorities: make(map[string]int32),
reservedApps: make(map[string]int),
allocatingAcceptedApps: make(map[string]bool),
properties: make(map[string]string),
stateMachine: NewObjectState(),
allocatedResource: resources.NewResource(),
preemptingResource: resources.NewResource(),
pending: resources.NewResource(),
currentPriority: configs.MinPriority,
prioritySortEnabled: true,
preemptionDelay: configs.DefaultPreemptionDelay,
preemptionPolicy: policies.DefaultPreemptionPolicy,
quotaChangePreemptionDelay: 0,
}
}

Expand Down Expand Up @@ -155,7 +157,6 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool, a
zap.String("queueName", sq.QueuePath))
sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)
}

return sq, nil
}

Expand Down Expand Up @@ -372,17 +373,54 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error {

// Load the max & guaranteed resources and maxApps for all but the root queue
if sq.Name != configs.RootQueue {
oldMaxResource := sq.maxResource
if err = sq.setResourcesFromConf(conf.Resources); err != nil {
return err
}
sq.maxRunningApps = conf.MaxApplications
sq.updateMaxRunningAppsMetrics()
sq.setPreemptionSettings(oldMaxResource, conf)
}

sq.properties = conf.Properties
return nil
}

// setPreemptionSettings Set Quota change preemption settings
func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, conf configs.QueueConfig) {
newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
if err != nil {
log.Log(log.SchedQueue).Error("parsing failed on max resources this should not happen",
zap.String("queue", sq.QueuePath),
zap.Error(err))
return
}

switch {
// Set max res earlier but not now
case resources.IsZero(newMaxResource) && !resources.IsZero(oldMaxResource):
sq.quotaChangePreemptionDelay = 0
// Set max res now but not earlier
case !resources.IsZero(newMaxResource) && resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
// Set max res earlier and now as well
default:
switch {
// Quota decrease
case resources.StrictlyGreaterThan(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
// Quota increase
case resources.StrictlyGreaterThan(newMaxResource, oldMaxResource) && conf.Preemption.Delay != 0:
sq.quotaChangePreemptionDelay = 0
// Quota remains as is but delay has changed
case resources.Equals(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != conf.Preemption.Delay:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
default:
// noop
}
}
}

// setResourcesFromConf sets the maxResource and guaranteedResource of the queue from the config.
func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
maxResource, err := resources.NewResourceFromConf(resource.Max)
Expand Down
98 changes: 97 additions & 1 deletion pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,88 @@ func TestApplyConf(t *testing.T) {
assert.Equal(t, root.maxRunningApps, uint64(0))
}

func TestQuotaChangePreemptionSettings(t *testing.T) {
root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)

parent, err := createManagedQueueWithProps(root, "parent", false, nil, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
testCases := []struct {
name string
conf configs.QueueConfig
expectedDelay uint64
}{{"first time queue setup without delay", configs.QueueConfig{
Resources: configs.Resources{
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
}, 0},
{"clearing max resources", configs.QueueConfig{
Resources: configs.Resources{
Max: nil,
Guaranteed: nil,
},
}, 0},
{"first time queue setup with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
Preemption: configs.Preemption{
Delay: 100,
},
}, 100},
{"increase max with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100000000"},
},
Preemption: configs.Preemption{
Delay: 500,
},
}, 0},
{"decrease max with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
},
Preemption: configs.Preemption{
Delay: 500,
},
}, 500},
{"max remains as is but delay changed", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
},
Preemption: configs.Preemption{
Delay: 200,
},
}, 200},
{"unrelated config change, should not impact earlier set preemption settings", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
Guaranteed: map[string]string{"memory": "50"},
},
Preemption: configs.Preemption{
Delay: 200,
},
}, 200},
{"increase max again with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "101"},
},
Preemption: configs.Preemption{
Delay: 200,
},
}, 0}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err = parent.ApplyConf(tc.conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
assert.Equal(t, parent.quotaChangePreemptionDelay, tc.expectedDelay)
})
}
}

func TestNewConfiguredQueue(t *testing.T) {
// check variable assignment
properties := getProperties()
Expand Down Expand Up @@ -2275,6 +2357,7 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.DeepEqual(t, properties, parent.template.GetProperties())
assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetMaxResource()))
assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetGuaranteedResource()))
assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0))

// case 0: managed leaf queue can't use template
leafConfig := configs.QueueConfig{
Expand All @@ -2285,6 +2368,9 @@ func TestNewConfiguredQueue(t *testing.T) {
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
Preemption: configs.Preemption{
Delay: 500,
},
}
childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand All @@ -2297,11 +2383,15 @@ func TestNewConfiguredQueue(t *testing.T) {
childLeafGuaranteed, err := resources.NewResourceFromConf(leafConfig.Resources.Guaranteed)
assert.NilError(t, err, "Resource creation failed")
assert.Assert(t, resources.Equals(childLeaf.guaranteedResource, childLeafGuaranteed))
assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500))

// case 1: non-leaf can't use template but it can inherit template from parent
NonLeafConfig := configs.QueueConfig{
Name: "nonleaf_queue",
Parent: true,
Preemption: configs.Preemption{
Delay: 500,
},
}
childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand All @@ -2310,19 +2400,25 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Equal(t, len(childNonLeaf.properties), 0)
assert.Assert(t, childNonLeaf.guaranteedResource == nil)
assert.Assert(t, childNonLeaf.maxResource == nil)
assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0))

// case 2: do not send queue event when silence flag is set to true
events.Init()
eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
eventSystem.StartServiceWithPublisher(false)
rootConfig := configs.QueueConfig{
Name: "root",
Preemption: configs.Preemption{
Delay: 500,
},
}
_, err = NewConfiguredQueue(rootConfig, nil, true, nil)

rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
time.Sleep(time.Second)
noEvents := eventSystem.Store.CountStoredEvents()
assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d", noEvents)
assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0))
}

func TestResetRunningState(t *testing.T) {
Expand Down
Loading