diff --git a/.github/workflows/krew-release.yml b/.github/workflows/krew-release.yml deleted file mode 100644 index c5bc611b71..0000000000 --- a/.github/workflows/krew-release.yml +++ /dev/null @@ -1,11 +0,0 @@ -on: - release: - types: [released] -jobs: - krew-release: - runs-on: ubuntu-24.04 - steps: - - name: Checkout - uses: actions/checkout@v4 - - name: Update new version in krew-index - uses: rajatjindal/krew-release-bot@v0.0.47 \ No newline at end of file diff --git a/.github/workflows/lepton-build-image.yaml b/.github/workflows/lepton-build-image.yaml new file mode 100644 index 0000000000..ac6cd59e12 --- /dev/null +++ b/.github/workflows/lepton-build-image.yaml @@ -0,0 +1,55 @@ +name: Build Docker Image + +on: + workflow_call: + inputs: + repo: + required: true + type: string + tag: + required: false + type: string + default: test$(git rev-parse --short HEAD) + commit: + required: false + type: string + default: $(git rev-parse --short HEAD) + runs-on: + required: false + type: string + default: ubuntu-latest + +jobs: + build: + runs-on: ${{ inputs.runs-on }} + steps: + - uses: actions/checkout@v3 + with: + submodules: recursive + - name: Install awscli + run: | + python -m pip install --upgrade pip + pip install awscli + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Build and push the Docker image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + run: | + set -x + + docker build . --file Dockerfile \ + --tag ${{ inputs.repo }}:${{ inputs.tag }} \ + --build-arg GIT_COMMIT=${{ inputs.commit }} \ + --build-arg REPO=$ECR_REGISTRY/ecr-public + + echo "tagging container image with ${{ inputs.repo }}:${{ inputs.tag }}}" + docker tag ${{ inputs.repo }}:${{ inputs.tag }} $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }} + docker push $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }} \ No newline at end of file diff --git a/.github/workflows/lepton-pr.yaml b/.github/workflows/lepton-pr.yaml new file mode 100644 index 0000000000..4179636794 --- /dev/null +++ b/.github/workflows/lepton-pr.yaml @@ -0,0 +1,12 @@ +name: lepton-pr + +on: + pull_request: + branches: ["**"] + +jobs: + build-release: + uses: ./.github/workflows/lepton-build-image.yaml + secrets: inherit + with: + repo: lepton-kueue \ No newline at end of file diff --git a/.github/workflows/lepton-release.yaml b/.github/workflows/lepton-release.yaml new file mode 100644 index 0000000000..c9eab875e8 --- /dev/null +++ b/.github/workflows/lepton-release.yaml @@ -0,0 +1,15 @@ +name: lepton-release + +on: + push: + tags: + - "*" + +jobs: + build-release: + uses: ./.github/workflows/lepton-build-image.yaml + secrets: inherit + with: + repo: lepton-kueue + tag: ${GITHUB_REF##*/} + commit: ${GITHUB_REF##*/} \ No newline at end of file diff --git a/pkg/lepton/apis/lepton_apis.go b/pkg/lepton/apis/lepton_apis.go new file mode 100644 index 0000000000..f9ec7cdc92 --- /dev/null +++ b/pkg/lepton/apis/lepton_apis.go @@ -0,0 +1,18 @@ +package apis + +import ( + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +const ( + labelCanPreempt = "kueue.lepton.ai/can-preempt" + labelCanBePreempted = "kueue.lepton.ai/can-be-preempted" +) + +func CanPreempt(wl *kueue.Workload) bool { + return wl.Labels[labelCanPreempt] == "true" +} + +func CanBePreempted(wl *kueue.Workload) bool { + return wl.Labels[labelCanBePreempted] == "true" +} diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index f44b74cb6b..5dbf3102e8 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -38,6 +38,7 @@ import ( config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" + leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/resources" "sigs.k8s.io/kueue/pkg/scheduler/flavorassigner" @@ -116,9 +117,23 @@ type Target struct { func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target { frsNeedPreemption := flavorResourcesNeedPreemption(assignment) requests := assignment.TotalRequestsFor(&wl) + if leptonapis.CanPreempt(wl.Obj) { + return p.getTargetsByLepton(log, wl, requests, frsNeedPreemption, snapshot) + } return p.getTargets(log, wl, requests, frsNeedPreemption, snapshot) } +func (p *Preemptor) getTargetsByLepton(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities, + frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target { + cq := snapshot.ClusterQueues[wl.ClusterQueue] + candidates := p.findCandidatesByLepton(wl.Obj, cq, frsNeedPreemption) + if len(candidates) == 0 { + return nil + } + sort.Slice(candidates, candidatesOrdering(candidates, cq.Name, p.clock.Now())) + return minimalPreemptions(log, requests, cq, snapshot, frsNeedPreemption, candidates, true, nil) +} + func (p *Preemptor) getTargets(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities, frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target { cq := snapshot.ClusterQueues[wl.ClusterQueue] @@ -483,6 +498,45 @@ func flavorResourcesNeedPreemption(assignment flavorassigner.Assignment) sets.Se return resPerFlavor } +func (p *Preemptor) findCandidatesByLepton(wl *kueue.Workload, cq *cache.ClusterQueueSnapshot, frsNeedPreemption sets.Set[resources.FlavorResource]) []*workload.Info { + var candidates []*workload.Info + wlPriority := priority.Priority(wl) + + for _, candidateWl := range cq.Workloads { + if canBeCandidateByLepton(wlPriority, candidateWl, frsNeedPreemption) { + candidates = append(candidates, candidateWl) + } + } + + if cq.HasParent() { + for _, cohortCQ := range cq.Parent().Root().SubtreeClusterQueues() { + if cq == cohortCQ { + // Can't reclaim quota from itself or ClusterQueues that are not borrowing. + continue + } + for _, candidateWl := range cohortCQ.Workloads { + if canBeCandidateByLepton(wlPriority, candidateWl, frsNeedPreemption) { + candidates = append(candidates, candidateWl) + } + } + } + } + return candidates +} + +func canBeCandidateByLepton(selfPriority int32, candidateWl *workload.Info, frsNeedPreemption sets.Set[resources.FlavorResource]) bool { + if !leptonapis.CanBePreempted(candidateWl.Obj) { + return false + } + if priority.Priority(candidateWl.Obj) >= selfPriority { + return false + } + if !workloadUsesResources(candidateWl, frsNeedPreemption) { + return false + } + return true +} + // findCandidates obtains candidates for preemption within the ClusterQueue and // cohort that respect the preemption policy and are using a resource that the // preempting workload needs.