Skip to content

✨ Priorityqueue: Optionally return items within a priority fairly #3261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ linters:
# used through an interface and not directly..?
# Likely same issue as https://github.com/dominikh/go-tools/issues/1616
path: pkg/controller/priorityqueue/metrics\.go
- linters:
- unused
# Seems to incorrectly trigger on the two implementations that are only
# used through an interface and not directly..?
# Likely same issue as https://github.com/dominikh/go-tools/issues/1616
path: pkg/controller/priorityqueue/priorityqueue_test\.go
# The following are being worked on to remove their exclusion. This list should be reduced or go away all together over time.
# If it is decided they will not be addressed they should be moved above this comment.
- path: (.+)\.go$
Expand Down
97 changes: 97 additions & 0 deletions pkg/controller/priorityqueue/fairqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package priorityqueue

import (
"slices"

"k8s.io/utils/ptr"
)

type fairq[T comparable] struct {
children map[string]*fairq[T]
items []*item[T]
// order is the order in which to dequeue. Nil is used
// as a magic value to indicate we should dequeue from
// items instead of a child next.
order []*string
}

func (q *fairq[T]) add(i *item[T], dimensions ...string) {
if len(dimensions) == 0 {
if len(q.order) == len(q.children) {
q.order = append([]*string{nil}, q.order...)
}
q.items = append(q.items, i)

// Sort here so items that get unlocked can be added, rather
// than having to completely drain and rebuild the fairQueue.
slices.SortFunc(q.items, func(a, b *item[T]) int {
if less(a, b) {
return -1
}
return 1
})

return
}

dimension := dimensions[0]
dimensions = dimensions[1:]

if q.children == nil {
q.children = make(map[string]*fairq[T], 1)
}

_, exists := q.children[dimension]
if !exists {
q.children[dimension] = &fairq[T]{}
q.order = append([]*string{ptr.To(dimension)}, q.order...)
}

q.children[dimension].add(i, dimensions...)
}

func (q *fairq[T]) dequeue() (*item[T], bool) {
var item *item[T]
var hasItem bool

for idx, dimension := range q.order {
switch {
case dimension != nil: // child element
item, hasItem = q.children[*dimension].dequeue()
if !hasItem {
continue
}
case len(q.items) > 0: // leaf element
item = q.items[0]
q.items = q.items[1:]
default: // no items for current dimension, check next
continue
}

q.order = append(q.order[:idx], q.order[idx+1:]...)
q.order = append(q.order, dimension)

return item, true
}

return item, false
}

func (q *fairq[T]) drain() {
for _, child := range q.children {
child.drain()
}
q.items = nil
}

func (q *fairq[T]) isEmpty() bool {
if len(q.items) > 0 {
return false
}
for _, child := range q.children {
if !child.isEmpty() {
return false
}
}
return true
}
87 changes: 87 additions & 0 deletions pkg/controller/priorityqueue/fairqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package priorityqueue

import (
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("FairQueue", func() {
DescribeTable("Ordering tests",
Entry("simple",
[]string{"foo", "foo", "bar"},
[]string{"bar", "foo", "foo"},
),
Entry("balanced",
[]string{"foo", "foo", "bar", "bar"},
[]string{"bar", "foo", "bar", "foo"},
),
Entry("mixed-dimensional, multi-dimensional comes first",
[]string{"foo-foo", "foo-foo", "foo"},
[]string{"foo", "foo-foo", "foo-foo"},
),
Entry("mixed-dimensional, single dimension comes first",
[]string{"foo", "foo-foo", "foo-foo"},
[]string{"foo-foo", "foo", "foo-foo"},
),
Entry("complex mixed-dimensional",
[]string{"bar", "foo", "foo-foo", "foo-foo", "foo-bar"},
// First the foo with the latest-added second dimension, then the bar,
// then the remaining foos based on the second dimension.
[]string{"foo-bar", "bar", "foo-foo", "foo", "foo-foo"},
),

func(adds []string, expected []string) {
q := &fairq[string]{}
for idx, add := range adds {
q.add(&item[string]{Key: add, AddedCounter: uint64(idx)}, strings.Split(add, "-")...)
}

actual := make([]string, 0, len(expected))
for range len(expected) {
item, didGetItem := q.dequeue()
Expect(didGetItem).To(BeTrue())
actual = append(actual, item.Key)
}

_, didGetItem := q.dequeue()
Expect(didGetItem).To(BeFalse())
Expect(actual).To(Equal(expected))
},
)

It("retains fairness across queue operations", func() {
q := &fairq[string]{}
q.add(&item[string]{Key: "foo"}, "foo")
_, _ = q.dequeue()
q.add(&item[string]{Key: "bar", AddedCounter: 1}, "bar")
q.add(&item[string]{Key: "foo", AddedCounter: 2}, "foo")

item, _ := q.dequeue()
Expect(item.Key).To(Equal("bar"))
Expect(q.isEmpty()).To(BeFalse())
})

It("sorts by added counter", func() {
q := &fairq[string]{}
q.add(&item[string]{Key: "foo", AddedCounter: 2})
q.add(&item[string]{Key: "bar", AddedCounter: 1})

item, _ := q.dequeue()
Expect(item.Key).To(Equal("bar"))
Expect(q.isEmpty()).To(BeFalse())
})

It("drains all items", func() {
q := &fairq[string]{}
q.add(&item[string]{Key: "foo"}, "foo")
q.add(&item[string]{Key: "bar"}, "bar")
q.add(&item[string]{Key: "baz"}, "baz")

q.drain()
_, gotItem := q.dequeue()
Expect(gotItem).To(BeFalse())
Expect(q.isEmpty()).To(BeTrue())
})
})
Loading