Skip to content

Commit

Permalink
Lepton commit
Browse files Browse the repository at this point in the history
  • Loading branch information
FillZpp committed Jan 2, 2025
1 parent 2a2486c commit e697ee0
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 11 deletions.
11 changes: 0 additions & 11 deletions .github/workflows/krew-release.yml

This file was deleted.

55 changes: 55 additions & 0 deletions .github/workflows/lepton-build-image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: Build Docker Image

on:
workflow_call:
inputs:
repo:
required: true
type: string
tag:
required: false
type: string
default: test$(git rev-parse --short HEAD)
commit:
required: false
type: string
default: $(git rev-parse --short HEAD)
runs-on:
required: false
type: string
default: ubuntu-latest

jobs:
build:
runs-on: ${{ inputs.runs-on }}
steps:
- uses: actions/checkout@v3
with:
submodules: recursive
- name: Install awscli
run: |
python -m pip install --upgrade pip
pip install awscli
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1
- name: Build and push the Docker image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
run: |
set -x
docker build . --file Dockerfile \
--tag ${{ inputs.repo }}:${{ inputs.tag }} \
--build-arg GIT_COMMIT=${{ inputs.commit }} \
--build-arg REPO=$ECR_REGISTRY/ecr-public
echo "tagging container image with ${{ inputs.repo }}:${{ inputs.tag }}}"
docker tag ${{ inputs.repo }}:${{ inputs.tag }} $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }}
docker push $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }}
12 changes: 12 additions & 0 deletions .github/workflows/lepton-pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: lepton-pr

on:
pull_request:
branches: ["**"]

jobs:
build-release:
uses: ./.github/workflows/lepton-build-image.yaml
secrets: inherit
with:
repo: lepton-kueue
15 changes: 15 additions & 0 deletions .github/workflows/lepton-release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: lepton-release

on:
push:
tags:
- "*"

jobs:
build-release:
uses: ./.github/workflows/lepton-build-image.yaml
secrets: inherit
with:
repo: lepton-kueue
tag: ${GITHUB_REF##*/}
commit: ${GITHUB_REF##*/}
18 changes: 18 additions & 0 deletions pkg/lepton/apis/lepton_apis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package apis

import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

const (
labelCanPreempt = "kueue.lepton.ai/can-preempt"
labelCanBePreempted = "kueue.lepton.ai/can-be-preempted"
)

func CanPreempt(wl *kueue.Workload) bool {
return wl.Labels[labelCanPreempt] == "true"
}

func CanBePreempted(wl *kueue.Workload) bool {
return wl.Labels[labelCanBePreempted] == "true"
}
54 changes: 54 additions & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
Expand Down Expand Up @@ -116,9 +117,23 @@ type Target struct {
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target {
frsNeedPreemption := flavorResourcesNeedPreemption(assignment)
requests := assignment.TotalRequestsFor(&wl)
if leptonapis.CanPreempt(wl.Obj) {
return p.getTargetsByLepton(log, wl, requests, frsNeedPreemption, snapshot)
}
return p.getTargets(log, wl, requests, frsNeedPreemption, snapshot)
}

func (p *Preemptor) getTargetsByLepton(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities,
frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target {
cq := snapshot.ClusterQueues[wl.ClusterQueue]
candidates := p.findCandidatesByLepton(wl.Obj, cq, frsNeedPreemption)
if len(candidates) == 0 {
return nil
}
sort.Slice(candidates, candidatesOrdering(candidates, cq.Name, p.clock.Now()))
return minimalPreemptions(log, requests, cq, snapshot, frsNeedPreemption, candidates, true, nil)
}

func (p *Preemptor) getTargets(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities,
frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target {
cq := snapshot.ClusterQueues[wl.ClusterQueue]
Expand Down Expand Up @@ -483,6 +498,45 @@ func flavorResourcesNeedPreemption(assignment flavorassigner.Assignment) sets.Se
return resPerFlavor
}

func (p *Preemptor) findCandidatesByLepton(wl *kueue.Workload, cq *cache.ClusterQueueSnapshot, frsNeedPreemption sets.Set[resources.FlavorResource]) []*workload.Info {
var candidates []*workload.Info
wlPriority := priority.Priority(wl)

for _, candidateWl := range cq.Workloads {
if canBeCandidateByLepton(wlPriority, candidateWl, frsNeedPreemption) {
candidates = append(candidates, candidateWl)
}
}

if cq.HasParent() {
for _, cohortCQ := range cq.Parent().Root().SubtreeClusterQueues() {
if cq == cohortCQ {
// Can't reclaim quota from itself or ClusterQueues that are not borrowing.
continue
}
for _, candidateWl := range cohortCQ.Workloads {
if canBeCandidateByLepton(wlPriority, candidateWl, frsNeedPreemption) {
candidates = append(candidates, candidateWl)
}
}
}
}
return candidates
}

func canBeCandidateByLepton(selfPriority int32, candidateWl *workload.Info, frsNeedPreemption sets.Set[resources.FlavorResource]) bool {
if !leptonapis.CanBePreempted(candidateWl.Obj) {
return false
}
if priority.Priority(candidateWl.Obj) >= selfPriority {
return false
}
if !workloadUsesResources(candidateWl, frsNeedPreemption) {
return false
}
return true
}

// findCandidates obtains candidates for preemption within the ClusterQueue and
// cohort that respect the preemption policy and are using a resource that the
// preempting workload needs.
Expand Down

0 comments on commit e697ee0

Please sign in to comment.