Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pkg/scheduler/objects/app_queue_mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package objects

import "github.com/apache/yunikorn-core/pkg/locking"

// AppQueueMapping maintains a mapping between application IDs and their corresponding queues.
type AppQueueMapping struct {
byAppID map[string]*Queue
locking.RWMutex
}

func NewAppQueueMapping() *AppQueueMapping {
return &AppQueueMapping{
byAppID: make(map[string]*Queue),
}
}

func (aqm *AppQueueMapping) AddAppQueueMapping(appID string, queue *Queue) {
aqm.Lock()
defer aqm.Unlock()
aqm.byAppID[appID] = queue
}

func (aqm *AppQueueMapping) FindQueueByAppID(appID string) *Queue {
aqm.RLock()
defer aqm.RUnlock()
return aqm.byAppID[appID]
}

func (aqm *AppQueueMapping) RemoveAppQueueMapping(appID string) {
aqm.Lock()
defer aqm.Unlock()
delete(aqm.byAppID, appID)
}
56 changes: 56 additions & 0 deletions pkg/scheduler/objects/app_queue_mapping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package objects

import (
"testing"
)

func TestNewAppQueueMapping(t *testing.T) {
aqm := NewAppQueueMapping()
if aqm == nil {
t.Fatal("Expected non-nil AppQueueMapping")
}
if len(aqm.byAppID) != 0 {
t.Errorf("Expected empty byAppID map, got %d entries", len(aqm.byAppID))
}
}
func TestAppQueueMappingOperations(t *testing.T) {
aqm := NewAppQueueMapping()
queue := &Queue{}
appID := "app-1234"

// Test AddAppQueueMapping
aqm.AddAppQueueMapping(appID, queue)
if len(aqm.byAppID) != 1 {
t.Errorf("Expected 1 entry in byAppID map, got %d", len(aqm.byAppID))
}

// Test FindQueueByAppID
foundQueue := aqm.FindQueueByAppID(appID)
if foundQueue != queue {
t.Errorf("Expected to find the correct queue for appID %s", appID)
}

// Test RemoveAppQueueMapping
aqm.RemoveAppQueueMapping(appID)
if len(aqm.byAppID) != 0 {
t.Errorf("Expected empty byAppID map after removal, got %d entries", len(aqm.byAppID))
}
}
44 changes: 27 additions & 17 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ func TestUpdateAllocationResourcePending(t *testing.T) {
app := newApplication(appID1, "default", "root.a")
root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create root queue")
queue, err := createDynamicQueue(root, "test", false)
queue, err := createDynamicQueue(root, "test", false, nil)
assert.NilError(t, err, "failed to create test queue")
app.SetQueue(queue)

Expand Down Expand Up @@ -1088,7 +1088,7 @@ func TestUpdateAllocationResourceAllocated(t *testing.T) {
app := newApplication(appID1, "default", "root.a")
root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create root queue")
queue, err := createDynamicQueue(root, "test", false)
queue, err := createDynamicQueue(root, "test", false, nil)
assert.NilError(t, err, "failed to create test queue")
app.SetQueue(queue)

Expand Down Expand Up @@ -1138,7 +1138,7 @@ func TestQueueUpdate(t *testing.T) {

root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create root queue")
queue, err := createDynamicQueue(root, "test", false)
queue, err := createDynamicQueue(root, "test", false, nil)
assert.NilError(t, err, "failed to create test queue")
app.SetQueue(queue)
assert.Equal(t, app.GetQueuePath(), "root.test")
Expand Down Expand Up @@ -2195,19 +2195,21 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
getNode := func(nodeID string) *Node {
return nodeMap[nodeID]
}
appQueueMapping := NewAppQueueMapping()

rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}, appQueueMapping)
assert.NilError(t, err)
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"})
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"})
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)

app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
err = app1.AddAllocationAsk(ask1)
assert.NilError(t, err)
Expand All @@ -2217,7 +2219,8 @@ func TestTryAllocatePreemptQueue(t *testing.T) {

app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
childQ2.AddApplication(app2)
appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3.allowPreemptOther = true
err = app2.AddAllocationAsk(ask3)
Expand Down Expand Up @@ -2276,20 +2279,23 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID
return nodeMap[nodeID]
}

appQueueMapping := NewAppQueueMapping()

rootQ, err := createRootQueue(map[string]string{"first": "40"})
assert.NilError(t, err)
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}, appQueueMapping)
assert.NilError(t, err)
unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited", false, nil, nil)
unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited", false, nil, nil, appQueueMapping)
assert.NilError(t, err)
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"})
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"})
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)

app0 := newApplication(appID0, "default", "root.unlimited")
app0.SetQueue(unlimitedQ)
unlimitedQ.applications[appID0] = app0
unlimitedQ.AddApplication(app0)
appQueueMapping.AddAppQueueMapping(appID0, unlimitedQ)
ask00 := newAllocationAsk("alloc0-0", appID0, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))
err = app0.AddAllocationAsk(ask00)
assert.NilError(t, err)
Expand All @@ -2299,7 +2305,8 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID

app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
childQ1.AddApplication(app1)
appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
err = app1.AddAllocationAsk(ask1)
assert.NilError(t, err)
Expand All @@ -2309,7 +2316,8 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID

app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
childQ2.AddApplication(app2)
appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3.allowPreemptOther = true
err = app2.AddAllocationAsk(ask3)
Expand Down Expand Up @@ -2358,7 +2366,7 @@ func createPreemptNodeWithReservationsTestSetup(t *testing.T) (func() NodeIterat

app3 := newApplication(appID3, "default", "root.parent.child2")
app3.SetQueue(childQ2)
childQ2.applications[appID3] = app3
childQ2.AddApplication(app3)
ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask4.allowPreemptOther = true
ask4.priority = math.MaxInt32
Expand Down Expand Up @@ -3257,13 +3265,15 @@ func TestRequiredNodePreemption(t *testing.T) {
getNode := func(nodeID string) *Node {
return node
}
appQueueMapping := NewAppQueueMapping()

// set queue
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
childQ, err := createManagedQueueWithAppQueueMapping(rootQ, "default", false, map[string]string{"first": "20"}, appQueueMapping)
assert.NilError(t, err)
app.SetQueue(childQ)
appQueueMapping.AddAppQueueMapping(app.ApplicationID, childQ)

// add an ask
mockEvents := mock.NewEventSystem()
Expand Down
Loading