Skip to content
This repository was archived by the owner on Apr 24, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scheduler/src/cook/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@
(assoc :default-env [])
(not (:quotas pools))
(assoc :quotas [])))
:rank (fnk [[:config {rank {:number-to-force 1000}}]]
rank)
:api-only? (fnk [[:config {api-only? false}]]
api-only?)
:cache-working-set-size (fnk [[:config {cache-working-set-size 1000000}]]
Expand Down
5 changes: 3 additions & 2 deletions scheduler/src/cook/kubernetes/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,9 @@
(defn num-pods-on-node
"Returns the number of pods assigned to the given node"
[node-name pods]
(let [node-name->pods (group-by pod->node-name pods)]
(-> node-name->pods (get node-name []) count)))
(->> pods
(filter #(= (pod->node-name %) node-name))
count))

(defn node->resource-map
"Given a node, returns the resource map used internally by Cook"
Expand Down
12 changes: 10 additions & 2 deletions scheduler/src/cook/kubernetes/compute_cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,15 @@
list of those pods with currently starting pods in the compute cluster added in"
[compute-cluster pods]
(let [starting-pods (controller/starting-namespaced-pod-name->pod compute-cluster)]
(-> pods (merge starting-pods) vals)))
(vals (into pods starting-pods))))

(defn add-starting-pods-reverse
  "Variant of add-starting-pods that is run when determining node capacity.
  Takes a compute cluster and a collection of pods, and returns the vals of
  the compute cluster's currently starting pods with the given pods added in.

  The direction of the merge is flipped relative to add-starting-pods: we want
  to add the smaller set onto the bigger set, and in this context the bigger
  set is the set of starting pods. When both collections are maps, an entry
  from pods replaces a starting-pod entry with the same key."
  [compute-cluster pods]
  (-> (controller/starting-namespaced-pod-name->pod compute-cluster)
      (into pods)
      vals))

(defn synthetic-pod->job-uuid
"If the given pod is a synthetic pod for autoscaling, returns the job uuid
Expand Down Expand Up @@ -630,7 +638,7 @@

(num-tasks-on-host [this hostname]
(->> (get @node-name->pod-name->pod hostname {})
(add-starting-pods this)
(add-starting-pods-reverse this)
(api/num-pods-on-node hostname)))

(retrieve-sandbox-url-path
Expand Down
42 changes: 26 additions & 16 deletions scheduler/src/cook/scheduler/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,10 @@

(defn handle-resource-offers!
"Gets a list of offers from mesos. Decides what to do with them all--they should all
be accepted or rejected at the end of the function."
be accepted or rejected at the end of the function.

Returns true if we matched the head or didn't match anything (so that we will not reduce
max-considerable next iteration)"
[conn fenzo-state pool-name->pending-jobs-atom mesos-run-as-user
user->usage user->quota num-considerable offers rebalancer-reservation-atom pool-name compute-clusters
job->acceptable-compute-clusters-fn]
Expand All @@ -1186,7 +1189,7 @@
(timers/time!
(timers/timer (metric-title "handle-resource-offer!-duration" pool-name))
(try
(let [db (db conn)
(let [db (d/db conn)
pending-jobs (get @pool-name->pending-jobs-atom pool-name)
considerable-jobs (prom/with-duration
prom/scheduler-handle-resource-offers-pending-to-considerable-duration {:pool pool-name}
Expand Down Expand Up @@ -1325,23 +1328,26 @@
(reset! front-of-job-queue-mem-atom (or (:mem first-considerable-job-resources) 0))
(reset! front-of-job-queue-cpus-atom (or (:cpus first-considerable-job-resources) 0))

(let [matched-head-or-no-matches?
(let [no-matches? (empty? matches)
matched-head-or-no-matches? (or no-matches? matched-considerable-jobs-head?)
;; Possible innocuous reasons for no matches: no offers, or no pending jobs.
;; Even beyond that, if Fenzo fails to match ANYTHING, "penalizing" it in the form of giving
;; it fewer jobs to look at is unlikely to improve the situation.
;; "Penalization" should only be employed when Fenzo does successfully match,
;; but the matches don't align with Cook's priorities.
(if (empty? matches)
true
(do
_ (when-not no-matches?
(swap! pool-name->pending-jobs-atom
remove-matched-jobs-from-pending-jobs
matched-job-uuids pool-name)
; Force this list to happen async.
(let [number-to-force (get-in config/config [:settings :rank :number-to-force])]
(->> (get @pool-name->pending-jobs-atom pool-name)
(take number-to-force)
doall))
(log-structured/debug (print-str "Updated pool-name->pending-jobs-atom:" @pool-name->pending-jobs-atom)
{:pool pool-name})
(launch-matched-tasks! matches conn db (:fenzo fenzo-state) mesos-run-as-user pool-name)
(update-host-reservations! rebalancer-reservation-atom matched-job-uuids)
matched-considerable-jobs-head?))
(update-host-reservations! rebalancer-reservation-atom matched-job-uuids))
; Absolute maximum jobs we will consider autoscaling to.
{:keys [max-jobs-for-autoscaling autoscaling-scale-factor]} (config/kubernetes)
; The fraction of jobs we tried to match that didn't actually get matched.
Expand Down Expand Up @@ -1967,31 +1973,35 @@
"Return a map of lists of job entities ordered by dru, keyed by pool.

It ranks the jobs by dru first and then apply several filters if provided."
[unfiltered-db offensive-job-filter]
[unfiltered-db offensive-job-filter number-to-force]
(prom/with-duration
prom/scheduler-rank-cycle-duration {}
(timers/time!
rank-jobs-duration
(try
(->> (sort-jobs-by-dru-pool unfiltered-db)
;; Apply the offensive job filter first before taking.
(pc/map-vals offensive-job-filter)
(pc/map-vals #(map tools/job-ent->map %))
(pc/map-vals #(remove nil? %)))
(let [jobs (->> (sort-jobs-by-dru-pool unfiltered-db)
                ;; Apply the offensive job filter first before taking.
                (pc/map-vals offensive-job-filter)
                (pc/map-vals #(map tools/job-ent->map %))
                (pc/map-vals #(remove nil? %)))]
  ;; Force the first N jobs of every pool.
  ;; `jobs` is a map of pool -> lazy seq (the map/remove steps above are lazy).
  ;; Taking from the map itself would only walk its [pool seq] entries and
  ;; leave every job seq unrealized, so take from each pool's seq instead.
  ;; This runs inside the enclosing try, so failures raised while realizing
  ;; these jobs are handled by the catch below.
  (doseq [pool-jobs (vals jobs)]
    (dorun (take number-to-force pool-jobs)))
  jobs)
(catch Throwable t
(log/error t "Failed to rank jobs")
(meters/mark! rank-jobs-failures)
{})))))

(defn- start-jobs-prioritizer!
[conn pool-name->pending-jobs-atom task-constraints trigger-chan]
(let [offensive-jobs-ch (make-offensive-job-stifler conn)
(let [number-to-force (get-in config/config [:settings :rank :number-to-force])
offensive-jobs-ch (make-offensive-job-stifler conn)
offensive-job-filter (partial filter-offensive-jobs task-constraints offensive-jobs-ch)]
(tools/chime-at-ch trigger-chan
(fn rank-jobs-event []
(log/info "Starting pending job ranking")
(reset! pool-name->pending-jobs-atom
(rank-jobs (d/db conn) offensive-job-filter))
(rank-jobs (d/db conn) offensive-job-filter number-to-force))
(log/info "Done with pending job ranking")))))

(meters/defmeter [cook-mesos scheduler mesos-error])
Expand Down
4 changes: 2 additions & 2 deletions scheduler/test/cook/test/benchmark.clj
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
offensive-jobs-ch (sched/make-offensive-job-stifler conn)
offensive-job-filter (partial sched/filter-offensive-jobs task-constraints offensive-jobs-ch)]
(println "============ rank-jobs timing ============")
(cc/quick-bench (sched/rank-jobs db offensive-job-filter))))
(cc/quick-bench (sched/rank-jobs db offensive-job-filter 0))))
(testing "rank-jobs minus offensive-job-filter"
(let [db (d/db conn)
offensive-job-filter identity]
(println "============ rank-jobs minus offensive-job-filter timing ============")
(cc/quick-bench (sched/rank-jobs db offensive-job-filter))))
(cc/quick-bench (sched/rank-jobs db offensive-job-filter 0))))
(testing "sort-jobs-by-dru-helper"
(let [db (d/db conn)
pending-task-ents (->> (queries/get-pending-job-ents db)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/test/cook/test/scheduler/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@
offensive-job-filter (partial sched/filter-offensive-jobs constraints offensive-jobs-ch)]
(testing "enough offers for all normal jobs."
(is (= {"no-pool" (list (tools/job-ent->map job-entity))}
(sched/rank-jobs test-db offensive-job-filter))))))
(sched/rank-jobs test-db offensive-job-filter 5))))))

(deftest test-mesos-virtual-machine-lease-adapter
;; ensure that the VirtualMachineLeaseAdapter can successfully handle an offer from Mesomatic.
Expand Down