diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go index ce91d9cc9..a6d6bd4fe 100644 --- a/pkg/common/configs/config.go +++ b/pkg/common/configs/config.go @@ -78,6 +78,11 @@ type QueueConfig struct { ChildTemplate ChildTemplate `yaml:",omitempty" json:",omitempty"` Queues []QueueConfig `yaml:",omitempty" json:",omitempty"` Limits []Limit `yaml:",omitempty" json:",omitempty"` + Preemption Preemption `yaml:",omitempty" json:",omitempty"` +} + +type Preemption struct { + Delay uint64 `yaml:",omitempty" json:",omitempty"` } type ChildTemplate struct { diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go index 76b537f7e..c311823b7 100644 --- a/pkg/common/configs/configvalidator.go +++ b/pkg/common/configs/configvalidator.go @@ -662,6 +662,9 @@ func checkQueues(queue *QueueConfig, level int) error { return fmt.Errorf("duplicate child name found with name '%s', level %d", child.Name, level) } queueMap[strings.ToLower(child.Name)] = true + if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 { + return fmt.Errorf("invalid preemption delay %d, must be greater than 60 seconds", queue.Preemption.Delay) + } } // recurse into the depth if this level passed diff --git a/pkg/common/configs/configvalidator_test.go b/pkg/common/configs/configvalidator_test.go index 4d0c64953..e62f4b3f6 100644 --- a/pkg/common/configs/configvalidator_test.go +++ b/pkg/common/configs/configvalidator_test.go @@ -2327,6 +2327,34 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen assert.Equal(t, 2, len(q.Queues), "Expected two queues") }, }, + { + name: "Invalid Preemption delay for leaf queue", + queue: &QueueConfig{ + Name: "root", + Queues: []QueueConfig{ + {Name: "leaf", + Preemption: Preemption{ + Delay: 10, + }, + }, + }, + Preemption: Preemption{ + Delay: 10, + }, + }, + level: 1, + expectedErrorMsg: "invalid preemption delay 10, must be greater than 60 seconds", + }, + { + name: "Setting Preemption delay on root queue would be ignored", + queue: &QueueConfig{ + Name: "root", + Preemption: Preemption{ + Delay: 10, + }, + }, + level: 0, + }, } for _, tc := range testCases { diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index a81b1bfae..db6b972a2 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -74,21 +74,22 @@ type Queue struct { // The queue properties should be treated as immutable the value is a merge of the // parent properties with the config for this queue only manipulated during creation // of the queue or via a queue configuration update. - properties map[string]string - adminACL security.ACL // admin ACL - submitACL security.ACL // submit ACL - maxResource *resources.Resource // When not set, max = nil - guaranteedResource *resources.Resource // When not set, Guaranteed == 0 - isLeaf bool // this is a leaf queue or not (i.e. parent) - isManaged bool // queue is part of the config, not auto created - stateMachine *fsm.FSM // the state of the queue for scheduling - stateTime time.Time // last time the state was updated (needed for cleanup) - maxRunningApps uint64 - runningApps uint64 - allocatingAcceptedApps map[string]bool - template *template.Template - queueEvents *schedEvt.QueueEvents - appQueueMapping *AppQueueMapping // appID mapping to queues + properties map[string]string + adminACL security.ACL // admin ACL + submitACL security.ACL // submit ACL + maxResource *resources.Resource // When not set, max = nil + guaranteedResource *resources.Resource // When not set, Guaranteed == 0 + isLeaf bool // this is a leaf queue or not (i.e. parent) + isManaged bool // queue is part of the config, not auto created + stateMachine *fsm.FSM // the state of the queue for scheduling + stateTime time.Time // last time the state was updated (needed for cleanup) + maxRunningApps uint64 + runningApps uint64 + allocatingAcceptedApps map[string]bool + template *template.Template + queueEvents *schedEvt.QueueEvents + appQueueMapping *AppQueueMapping // appID mapping to queues + quotaChangePreemptionDelay uint64 locking.RWMutex } @@ -96,21 +97,22 @@ type Queue struct { // newBlankQueue creates a new empty queue objects with all values initialised. func newBlankQueue() *Queue { return &Queue{ - children: make(map[string]*Queue), - childPriorities: make(map[string]int32), - applications: make(map[string]*Application), - appPriorities: make(map[string]int32), - reservedApps: make(map[string]int), - allocatingAcceptedApps: make(map[string]bool), - properties: make(map[string]string), - stateMachine: NewObjectState(), - allocatedResource: resources.NewResource(), - preemptingResource: resources.NewResource(), - pending: resources.NewResource(), - currentPriority: configs.MinPriority, - prioritySortEnabled: true, - preemptionDelay: configs.DefaultPreemptionDelay, - preemptionPolicy: policies.DefaultPreemptionPolicy, + children: make(map[string]*Queue), + childPriorities: make(map[string]int32), + applications: make(map[string]*Application), + appPriorities: make(map[string]int32), + reservedApps: make(map[string]int), + allocatingAcceptedApps: make(map[string]bool), + properties: make(map[string]string), + stateMachine: NewObjectState(), + allocatedResource: resources.NewResource(), + preemptingResource: resources.NewResource(), + pending: resources.NewResource(), + currentPriority: configs.MinPriority, + prioritySortEnabled: true, + preemptionDelay: configs.DefaultPreemptionDelay, + preemptionPolicy: policies.DefaultPreemptionPolicy, + quotaChangePreemptionDelay: 0, } } @@ -155,7 +157,6 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool, a zap.String("queueName", sq.QueuePath)) sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged) } - return sq, nil } @@ -372,17 +373,54 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error { // Load the max & guaranteed resources and maxApps for all but the root queue if sq.Name != configs.RootQueue { + oldMaxResource := sq.maxResource if err = sq.setResourcesFromConf(conf.Resources); err != nil { return err } sq.maxRunningApps = conf.MaxApplications sq.updateMaxRunningAppsMetrics() + sq.setPreemptionSettings(oldMaxResource, conf) } sq.properties = conf.Properties return nil } +// setPreemptionSettings Set Quota change preemption settings +func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, conf configs.QueueConfig) { + newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max) + if err != nil { + log.Log(log.SchedQueue).Error("parsing failed on max resources this should not happen", + zap.String("queue", sq.QueuePath), + zap.Error(err)) + return + } + + switch { + // Set max res earlier but not now + case resources.IsZero(newMaxResource) && !resources.IsZero(oldMaxResource): + sq.quotaChangePreemptionDelay = 0 + // Set max res now but not earlier + case !resources.IsZero(newMaxResource) && resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0: + sq.quotaChangePreemptionDelay = conf.Preemption.Delay + // Set max res earlier and now as well + default: + switch { + // Quota decrease + case resources.StrictlyGreaterThan(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0: + sq.quotaChangePreemptionDelay = conf.Preemption.Delay + // Quota increase + case resources.StrictlyGreaterThan(newMaxResource, oldMaxResource) && conf.Preemption.Delay != 0: + sq.quotaChangePreemptionDelay = 0 + // Quota remains as is but delay has changed + case resources.Equals(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != conf.Preemption.Delay: + sq.quotaChangePreemptionDelay = conf.Preemption.Delay + default: + // noop + } + } +} + // setResourcesFromConf sets the maxResource and guaranteedResource of the queue from the config. func (sq *Queue) setResourcesFromConf(resource configs.Resources) error { maxResource, err := resources.NewResourceFromConf(resource.Max) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index c6c2594ed..67d5a4c26 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2246,6 +2246,88 @@ func TestApplyConf(t *testing.T) { assert.Equal(t, root.maxRunningApps, uint64(0)) } +func TestQuotaChangePreemptionSettings(t *testing.T) { + root, err := createManagedQueueWithProps(nil, "root", true, nil, nil) + assert.NilError(t, err, "failed to create basic queue: %v", err) + + parent, err := createManagedQueueWithProps(root, "parent", false, nil, nil) + assert.NilError(t, err, "failed to create basic queue: %v", err) + testCases := []struct { + name string + conf configs.QueueConfig + expectedDelay uint64 + }{{"first time queue setup without delay", configs.QueueConfig{ + Resources: configs.Resources{ + Max: getResourceConf(), + Guaranteed: getResourceConf(), + }, + }, 0}, + {"clearing max resources", configs.QueueConfig{ + Resources: configs.Resources{ + Max: nil, + Guaranteed: nil, + }, + }, 0}, + {"first time queue setup with delay", configs.QueueConfig{ + Resources: configs.Resources{ + Max: getResourceConf(), + Guaranteed: getResourceConf(), + }, + Preemption: configs.Preemption{ + Delay: 100, + }, + }, 100}, + {"increase max with delay", configs.QueueConfig{ + Resources: configs.Resources{ + Max: map[string]string{"memory": "100000000"}, + }, + Preemption: configs.Preemption{ + Delay: 500, + }, + }, 0}, + {"decrease max with delay", configs.QueueConfig{ + Resources: configs.Resources{ + Max: map[string]string{"memory": "100"}, + }, + Preemption: configs.Preemption{ + Delay: 500, + }, + }, 500}, + {"max remains as is but delay changed", configs.QueueConfig{ + Resources: configs.Resources{ + Max: map[string]string{"memory": "100"}, + }, + Preemption: configs.Preemption{ + Delay: 200, + }, + }, 200}, + {"unrelated config change, should not impact earlier set preemption settings", configs.QueueConfig{ + Resources: configs.Resources{ + Max: map[string]string{"memory": "100"}, + Guaranteed: map[string]string{"memory": "50"}, + }, + Preemption: configs.Preemption{ + Delay: 200, + }, + }, 200}, + {"increase max again with delay", configs.QueueConfig{ + Resources: configs.Resources{ + Max: map[string]string{"memory": "101"}, + }, + Preemption: configs.Preemption{ + Delay: 200, + }, + }, 0}} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err = parent.ApplyConf(tc.conf) + assert.NilError(t, err, "failed to apply conf: %v", err) + assert.Equal(t, parent.quotaChangePreemptionDelay, tc.expectedDelay) + }) + } +} + func TestNewConfiguredQueue(t *testing.T) { // check variable assignment properties := getProperties() @@ -2275,6 +2357,7 @@ func TestNewConfiguredQueue(t *testing.T) { assert.DeepEqual(t, properties, parent.template.GetProperties()) assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetMaxResource())) assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetGuaranteedResource())) + assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0)) // case 0: managed leaf queue can't use template leafConfig := configs.QueueConfig{ @@ -2285,6 +2368,9 @@ func TestNewConfiguredQueue(t *testing.T) { Max: getResourceConf(), Guaranteed: getResourceConf(), }, + Preemption: configs.Preemption{ + Delay: 500, + }, } childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil) assert.NilError(t, err, "failed to create queue: %v", err) @@ -2297,11 +2383,15 @@ func TestNewConfiguredQueue(t *testing.T) { childLeafGuaranteed, err := resources.NewResourceFromConf(leafConfig.Resources.Guaranteed) assert.NilError(t, err, "Resource creation failed") assert.Assert(t, resources.Equals(childLeaf.guaranteedResource, childLeafGuaranteed)) + assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500)) // case 1: non-leaf can't use template but it can inherit template from parent NonLeafConfig := configs.QueueConfig{ Name: "nonleaf_queue", Parent: true, + Preemption: configs.Preemption{ + Delay: 500, + }, } childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false, nil) assert.NilError(t, err, "failed to create queue: %v", err) @@ -2310,6 +2400,7 @@ func TestNewConfiguredQueue(t *testing.T) { assert.Equal(t, len(childNonLeaf.properties), 0) assert.Assert(t, childNonLeaf.guaranteedResource == nil) assert.Assert(t, childNonLeaf.maxResource == nil) + assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0)) // case 2: do not send queue event when silence flag is set to true events.Init() @@ -2317,12 +2408,17 @@ func TestNewConfiguredQueue(t *testing.T) { eventSystem.StartServiceWithPublisher(false) rootConfig := configs.QueueConfig{ Name: "root", + Preemption: configs.Preemption{ + Delay: 500, + }, } - _, err = NewConfiguredQueue(rootConfig, nil, true, nil) + + rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil) assert.NilError(t, err, "failed to create queue: %v", err) time.Sleep(time.Second) noEvents := eventSystem.Store.CountStoredEvents() assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d", noEvents) + assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0)) } func TestResetRunningState(t *testing.T) {