Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2207,7 +2207,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) {

app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
err = app1.AddAllocationAsk(ask1)
assert.NilError(t, err)
Expand All @@ -2217,7 +2217,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) {

app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
childQ2.AddApplication(app2)
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3.allowPreemptOther = true
err = app2.AddAllocationAsk(ask3)
Expand Down Expand Up @@ -2289,7 +2289,7 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID

app0 := newApplication(appID0, "default", "root.unlimited")
app0.SetQueue(unlimitedQ)
unlimitedQ.applications[appID0] = app0
unlimitedQ.AddApplication(app0)
ask00 := newAllocationAsk("alloc0-0", appID0, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))
err = app0.AddAllocationAsk(ask00)
assert.NilError(t, err)
Expand All @@ -2299,7 +2299,7 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID

app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
err = app1.AddAllocationAsk(ask1)
assert.NilError(t, err)
Expand All @@ -2309,7 +2309,7 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID

app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
childQ2.AddApplication(app2)
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3.allowPreemptOther = true
err = app2.AddAllocationAsk(ask3)
Expand Down Expand Up @@ -2358,7 +2358,7 @@ func createPreemptNodeWithReservationsTestSetup(t *testing.T) (func() NodeIterat

app3 := newApplication(appID3, "default", "root.parent.child2")
app3.SetQueue(childQ2)
childQ2.applications[appID3] = app3
childQ2.AddApplication(app3)
ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask4.allowPreemptOther = true
ask4.priority = math.MaxInt32
Expand Down
48 changes: 24 additions & 24 deletions pkg/scheduler/objects/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func creatApp1(
) (*Allocation, *Allocation, error) {
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)

ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(app1Rec))
ask1.createTime = time.Now().Add(-1 * time.Minute)
Expand Down Expand Up @@ -97,7 +97,7 @@ func creatApp2(
) (*Application, *Allocation, error) {
app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
childQ2.AddApplication(app2)
ask3 := newAllocationAsk(allocID, appID2, resources.NewResourceFromMap(app2Res))
if err := app2.AddAllocationAsk(ask3); err != nil {
return nil, nil, err
Expand All @@ -115,7 +115,7 @@ func TestCheckPreconditions(t *testing.T) {
assert.NilError(t, err)
app := newApplication(appID1, "default", "root.child")
app.SetQueue(childQ)
childQ.applications[appID1] = app
childQ.AddApplication(app)
ask := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask.allowPreemptOther = true
ask.createTime = time.Now().Add(-1 * time.Minute)
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestTryPreemptionOnNodeWithOGParentAndUGPreemptor(t *testing.T) {
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)

for i := 1; i <= 6; i++ {
ask1 := newAllocationAsk("alloc"+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}))
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t *testing.T) {
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 4, "pods": 1}))
ask1.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {

app3 := newApplication(appID3, "default", "root.parent.child3")
app3.SetQueue(childQ3)
childQ3.applications[appID3] = app3
childQ3.AddApplication(app3)

ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask4.createTime = time.Now()
Expand Down Expand Up @@ -801,7 +801,7 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand All @@ -823,7 +823,7 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {

app3 := newApplication(appID3, "default", "root.parent.child3")
app3.SetQueue(childQ3)
childQ3.applications[appID3] = app3
childQ3.AddApplication(app3)

ask4 := newAllocationAskPriority("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 1000)
ask4.createTime = time.Now()
Expand Down Expand Up @@ -892,7 +892,7 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test

app1 := newApplication(appID1, "default", "root.parent.parent2.child2")
app1.SetQueue(childQ2)
childQ2.applications[appID1] = app1
childQ2.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem": 200}))
ask1.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand All @@ -911,7 +911,7 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test
assert.NilError(t, childQ2.TryIncAllocatedResource(ask2.GetAllocatedResource()))
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ1)
childQ1.applications[appID2] = app2
childQ1.AddApplication(app2)
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app2.AddAllocationAsk(ask3))
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 2})
Expand Down Expand Up @@ -965,7 +965,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(

app1 := newApplication(appID1, "default", "root.parent.parent2.child2")
app1.SetQueue(childQ2)
childQ2.applications[appID1] = app1
childQ2.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem": 200}))
ask1.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand All @@ -984,7 +984,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
assert.NilError(t, childQ2.TryIncAllocatedResource(ask2.GetAllocatedResource()))
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ1)
childQ1.applications[appID2] = app2
childQ1.AddApplication(app2)
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app2.AddAllocationAsk(ask3))
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 2})
Expand Down Expand Up @@ -1205,13 +1205,13 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
// createVictimApplications creates three applications (appID1, appID2 and
// appID3) that all live in childQ2 and returns them in that order.
//
// Each application is registered through Queue.AddApplication only. Writing
// to childQ2.applications directly as well would register the application
// twice and bypass whatever bookkeeping AddApplication performs.
func createVictimApplications(childQ2 *Queue) (*Application, *Application, *Application) {
	// newVictim builds one application bound to childQ2 and registers it.
	newVictim := func(appID string) *Application {
		app := newApplication(appID, "default", "root.parent.parent2.child2")
		app.SetQueue(childQ2)
		childQ2.AddApplication(app)
		return app
	}
	// Function calls in a return list are evaluated left to right, so the
	// applications are created in the same order as before.
	return newVictim(appID1), newVictim(appID2), newVictim(appID3)
}

Expand Down Expand Up @@ -1637,7 +1637,7 @@ func TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChil

app1 := newApplication(appID1, "default", "root.parent.parent1.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand All @@ -1649,7 +1649,7 @@ func TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChil

app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ2)
childQ1.applications[appID2] = app2
childQ1.AddApplication(app2)
ask2 := newAllocationAsk("alloc2", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask2.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app2.AddAllocationAsk(ask2))
Expand All @@ -1661,7 +1661,7 @@ func TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChil

app3 := newApplication(appID3, "default", "root.parent.parent1.child2")
app3.SetQueue(childQ2)
childQ2.applications[appID3] = app3
childQ2.AddApplication(app3)
ask3 := newAllocationAsk("alloc3", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app3.AddAllocationAsk(ask3))

Expand Down Expand Up @@ -1714,7 +1714,7 @@ func TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {

app1 := newApplication(appID1, "default", "root.parent.parent1.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand All @@ -1726,7 +1726,7 @@ func TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {

app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ2)
childQ1.applications[appID2] = app2
childQ1.AddApplication(app2)
ask2 := newAllocationAsk("alloc2", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask2.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app2.AddAllocationAsk(ask2))
Expand All @@ -1738,7 +1738,7 @@ func TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {

app3 := newApplication(appID3, "default", "root.parent.parent1.child2")
app3.SetQueue(childQ2)
childQ2.applications[appID3] = app3
childQ2.AddApplication(app3)
ask3 := newAllocationAsk("alloc3", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app3.AddAllocationAsk(ask3))

Expand Down Expand Up @@ -1785,7 +1785,7 @@ func TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_OGVictimChild_As_Si

app1 := newApplication(appID1, "default", "root.parent.parent1.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
Expand All @@ -1797,7 +1797,7 @@ func TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_OGVictimChild_As_Si

app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ2)
childQ1.applications[appID2] = app2
childQ1.AddApplication(app2)
ask2 := newAllocationAsk("alloc2", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask2.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app2.AddAllocationAsk(ask2))
Expand All @@ -1809,7 +1809,7 @@ func TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_OGVictimChild_As_Si

app3 := newApplication(appID3, "default", "root.parent.parent1.child2")
app3.SetQueue(childQ2)
childQ2.applications[appID3] = app3
childQ2.AddApplication(app3)
ask3 := newAllocationAsk("alloc3", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app3.AddAllocationAsk(ask3))

Expand Down
91 changes: 84 additions & 7 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,42 @@ type Queue struct {
template *template.Template
queueEvents *schedEvt.QueueEvents

// appID -> queuePath index
appIndex *appQueueMapping

locking.RWMutex
}

// appQueueMapping is a thread safe mapping from applicationID to queuePath.
// The methods defined on this type take the embedded lock around every read
// or write of the map, so callers do not need any additional locking.
type appQueueMapping struct {
// byAppID maps an applicationID to the path of the queue it was added to.
byAppID map[string]string
// The embedded lock guards byAppID.
locking.RWMutex
}
Copy link
Contributor

@pbacsko pbacsko Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be maintained globally, not on a per-queue basis. I think the ideal place is PartitionContext. However, the data type has to stay in pkg/scheduler/objects to avoid circular references.

Here is my take:

  1. Make it public (AppQueueMapping)
  2. Extract this type to a separate file eg. pkg/scheduler/objects/app_queue_mapping.go, create simple unit tests for it
  3. Create a single instance inside PartitionContext when the context is created
  4. When a Queue is created, inject the instance to the Queue. Extend NewConfiguredQueue() and NewRecoveryQueue() and NewDynamicQueue() with an extra argument.
  5. Have a reference inside Queue to AppQueueMapping
  6. You can add/remove mappings in PartitionContext.AddApplication() and PartitionContext.RemoveApplication()

With this approach, everything is much simpler and we don't have to walk the Queue hierarchy or find the root queue at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially tried managing the global state in PartitionContext,
but FindQueueByAppID is often called together with other PartitionContext operations.
When PartitionContext holds a lock and the queue calls any getXXXX function, it can cause a deadlock.

As you suggested, making AppQueueMapping public, limiting the lock scope to AppQueueMapping itself, and injecting it into the queue should prevent this issue.

I’ll confirm the scope of changes and push an updated version.
Thanks for the review!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The structure would look like this:

type PartitionContext struct {
    appQueueMapping *objects.AppQueueMapping
}

type Queue struct {
    appQueueMapping *AppQueueMapping
}

I'll try to inject PartitionContext.appQueueMapping when creating the queue.


// addAppQueueMapping stores queuePath as the queue path for appID, overwriting
// any value already stored under that appID. The write lock is held for the
// duration of the map update.
func (aqm *appQueueMapping) addAppQueueMapping(appID, queuePath string) {
aqm.Lock()
defer aqm.Unlock()
aqm.byAppID[appID] = queuePath
}

// findQueuePathByAppID looks up the queue path stored for appID while holding
// the read lock. An appID that has no entry yields the empty string.
func (aqm *appQueueMapping) findQueuePathByAppID(appID string) string {
	aqm.RLock()
	defer aqm.RUnlock()

	if queuePath, found := aqm.byAppID[appID]; found {
		return queuePath
	}
	return ""
}

// removeAppQueueMapping deletes the entry stored for appID while holding the
// write lock. Deleting an appID that has no entry leaves the map unchanged.
func (aqm *appQueueMapping) removeAppQueueMapping(appID string) {
aqm.Lock()
defer aqm.Unlock()
delete(aqm.byAppID, appID)
}

// newAppQueueMapping returns an empty appQueueMapping whose backing map is
// already allocated, so entries can be added to it straight away.
func newAppQueueMapping() *appQueueMapping {
	mapping := new(appQueueMapping)
	mapping.byAppID = map[string]string{}
	return mapping
}

// newBlankQueue creates a new empty queue object with all values initialised.
func newBlankQueue() *Queue {
return &Queue{
Expand All @@ -110,6 +143,7 @@ func newBlankQueue() *Queue {
prioritySortEnabled: true,
preemptionDelay: configs.DefaultPreemptionDelay,
preemptionPolicy: policies.DefaultPreemptionPolicy,
appIndex: newAppQueueMapping(),
}
}

Expand Down Expand Up @@ -789,6 +823,7 @@ func (sq *Queue) AddApplication(app *Application) {
appID := app.ApplicationID
sq.applications[appID] = app
sq.queueEvents.SendNewApplicationEvent(sq.QueuePath, appID)
sq.addAppQueueMapping(appID, sq.QueuePath)
}

// RemoveApplication removes the app from the list of tracked applications. Make sure that the app
Expand Down Expand Up @@ -843,6 +878,7 @@ func (sq *Queue) RemoveApplication(app *Application) {
app.appEvents.SendRemoveApplicationEvent(appID)

sq.parent.UpdateQueuePriority(sq.Name, priority)
sq.removeAppQueueMapping(appID)

log.Log(log.SchedQueue).Info("Application completed and removed from queue",
zap.String("queueName", sq.QueuePath),
Expand Down Expand Up @@ -1627,13 +1663,7 @@ func (sq *Queue) GetApplication(appID string) *Application {

// FindQueueByAppID returns the queue that the application with the given appID
// belongs to, or nil when no such queue is found.
//
// The lookup is delegated to findQueueByAppID, which uses the appID -> queue
// path index instead of recursing through the parent queues. A nil receiver
// is still tolerated and yields nil.
func (sq *Queue) FindQueueByAppID(appID string) *Queue {
	if sq == nil {
		return nil
	}
	return sq.findQueueByAppID(appID)
}

func (sq *Queue) findQueueByAppIDInternal(appID string) *Queue {
Expand Down Expand Up @@ -2059,3 +2089,50 @@ func (sq *Queue) recalculatePriority() int32 {
sq.currentPriority = curr
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
}

// findRoot follows the parent links upwards from sq and returns the first
// queue that has no parent. A nil receiver yields nil.
func (sq *Queue) findRoot() *Queue {
	current := sq
	for current != nil && current.parent != nil {
		current = current.parent
	}
	return current
}

// findQueueByAppID resolves appID to a queue in two steps: it first looks up
// the queue path recorded for the application and then resolves that path to
// a queue. It returns nil when no path is recorded for appID.
func (sq *Queue) findQueueByAppID(appID string) *Queue {
	if queuePath := sq.findQueuePathByAppID(appID); queuePath != "" {
		return sq.findQueueByPath(queuePath)
	}
	return nil
}

// addAppQueueMapping records the appID -> queuePath entry in the index held
// by the root queue of the hierarchy that sq is part of.
func (sq *Queue) addAppQueueMapping(appID, queuePath string) {
	sq.findRoot().appIndex.addAppQueueMapping(appID, queuePath)
}

// removeAppQueueMapping drops the entry for appID from the index held by the
// root queue of the hierarchy that sq is part of.
func (sq *Queue) removeAppQueueMapping(appID string) {
	sq.findRoot().appIndex.removeAppQueueMapping(appID)
}

// findQueuePathByAppID returns the queue path recorded for appID in the index
// held by the root queue, or the empty string when nothing is recorded.
//
// The empty string is also returned when there is no root (nil receiver) or
// when the root carries no index. Callers treat the empty string as "not
// found", so the lookup path stays nil-safe instead of panicking.
func (sq *Queue) findQueuePathByAppID(appID string) string {
	root := sq.findRoot()
	if root == nil || root.appIndex == nil {
		return ""
	}
	return root.appIndex.findQueuePathByAppID(appID)
}

// findQueueByPath resolves a fully qualified queue path to a queue by walking
// down from the root of the hierarchy that sq is part of. It returns nil when
// there is no root (nil receiver) or when a path segment has no matching
// child queue.
//
// The path is lower-cased before it is split on configs.DOT. The first
// segment is taken to be the root itself and is not compared against the
// root's name, so a single-segment path resolves to the root.
func (sq *Queue) findQueueByPath(path string) *Queue {
	queue := sq.findRoot()
	if queue == nil {
		return nil
	}
	for i, name := range strings.Split(strings.ToLower(path), configs.DOT) {
		if i == 0 {
			// the leading segment stands for the root we started from
			continue
		}
		if queue = queue.GetChildQueue(name); queue == nil {
			return nil
		}
	}
	return queue
}