diff --git a/scheduler/src/cook/config.clj b/scheduler/src/cook/config.clj
index eef8e1c0d0..637fa05215 100644
--- a/scheduler/src/cook/config.clj
+++ b/scheduler/src/cook/config.clj
@@ -472,6 +472,9 @@
                       (assoc :default-env [])
                       (not (:quotas pools))
                       (assoc :quotas [])))
+     ;; Merge over the defaults so that a partially specified :rank map still has :number-to-force.
+     :rank (fnk [[:config {rank {}}]]
+             (merge {:number-to-force 1000} rank))
      :api-only? (fnk [[:config {api-only? false}]]
                   api-only?)
      :cache-working-set-size (fnk [[:config {cache-working-set-size 1000000}]]
diff --git a/scheduler/src/cook/kubernetes/api.clj b/scheduler/src/cook/kubernetes/api.clj
index 6fc920ef45..6da525b75f 100644
--- a/scheduler/src/cook/kubernetes/api.clj
+++ b/scheduler/src/cook/kubernetes/api.clj
@@ -766,8 +766,9 @@
 (defn num-pods-on-node
   "Returns the number of pods assigned to the given node"
   [node-name pods]
-  (let [node-name->pods (group-by pod->node-name pods)]
-    (-> node-name->pods (get node-name []) count)))
+  (->> pods
+       (filter #(= (pod->node-name %) node-name))
+       count))
 
 (defn node->resource-map
   "Given a node, returns the resource map used internally by Cook"
diff --git a/scheduler/src/cook/kubernetes/compute_cluster.clj b/scheduler/src/cook/kubernetes/compute_cluster.clj
index f1584b4e3e..a7d0f06eb1 100644
--- a/scheduler/src/cook/kubernetes/compute_cluster.clj
+++ b/scheduler/src/cook/kubernetes/compute_cluster.clj
@@ -291,7 +291,15 @@
   list of those pods with currently starting pods in the compute cluster added in"
   [compute-cluster pods]
   (let [starting-pods (controller/starting-namespaced-pod-name->pod compute-cluster)]
-    (-> pods (merge starting-pods) vals)))
+    (vals (into pods starting-pods))))
+
+(defn add-starting-pods-reverse
+  "Offshoot of add-starting-pods that is run when determining node capacity.
+  (We want to add the smaller set onto the bigger set. In this context, the bigger set is
+  the set of starting pods.) On a key collision, the entry from pods wins."
+  [compute-cluster pods]
+  (let [starting-pods (controller/starting-namespaced-pod-name->pod compute-cluster)]
+    (vals (into starting-pods pods))))
 
 (defn synthetic-pod->job-uuid
   "If the given pod is a synthetic pod for autoscaling, returns the job uuid
@@ -630,7 +638,7 @@
 
   (num-tasks-on-host [this hostname]
     (->> (get @node-name->pod-name->pod hostname {})
-         (add-starting-pods this)
+         (add-starting-pods-reverse this)
          (api/num-pods-on-node hostname)))
 
   (retrieve-sandbox-url-path
diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj
index 69fe54b985..cd51e58c3c 100644
--- a/scheduler/src/cook/scheduler/scheduler.clj
+++ b/scheduler/src/cook/scheduler/scheduler.clj
@@ -1171,7 +1171,10 @@
 
 (defn handle-resource-offers!
   "Gets a list of offers from mesos. Decides what to do with them all--they should all
-   be accepted or rejected at the end of the function."
+   be accepted or rejected at the end of the function.
+
+   Returns true if we matched the head or didn't match anything (so that we will not reduce
+   max-considerable next iteration)"
   [conn fenzo-state pool-name->pending-jobs-atom mesos-run-as-user
    user->usage user->quota num-considerable offers
    rebalancer-reservation-atom pool-name compute-clusters job->acceptable-compute-clusters-fn]
@@ -1186,7 +1189,7 @@
       (timers/time!
         (timers/timer (metric-title "handle-resource-offer!-duration" pool-name))
         (try
-          (let [db (db conn)
+          (let [db (d/db conn)
                 pending-jobs (get @pool-name->pending-jobs-atom pool-name)
                 considerable-jobs (prom/with-duration
                                     prom/scheduler-handle-resource-offers-pending-to-considerable-duration {:pool pool-name}
@@ -1325,23 +1328,26 @@
             (reset! front-of-job-queue-mem-atom (or (:mem first-considerable-job-resources) 0))
             (reset! front-of-job-queue-cpus-atom (or (:cpus first-considerable-job-resources) 0))
 
-            (let [matched-head-or-no-matches?
+            (let [no-matches? (empty? matches)
+                  matched-head-or-no-matches? (or no-matches? matched-considerable-jobs-head?)
                   ;; Possible innocuous reasons for no matches: no offers, or no pending jobs.
                   ;; Even beyond that, if Fenzo fails to match ANYTHING, "penalizing" it in the form of giving
                   ;; it fewer jobs to look at is unlikely to improve the situation.
                   ;; "Penalization" should only be employed when Fenzo does successfully match,
                   ;; but the matches don't align with Cook's priorities.
-                  (if (empty? matches)
-                    true
-                    (do
+                  _ (when-not no-matches?
                       (swap! pool-name->pending-jobs-atom
                              remove-matched-jobs-from-pending-jobs
                              matched-job-uuids pool-name)
+                      ; Eagerly realize the first number-to-force pending jobs here; dorun, as the result is discarded.
+                      (let [number-to-force (get-in config/config [:settings :rank :number-to-force])]
+                        (->> (get @pool-name->pending-jobs-atom pool-name)
+                             (take number-to-force)
+                             dorun))
                       (log-structured/debug (print-str "Updated pool-name->pending-jobs-atom:" @pool-name->pending-jobs-atom)
                                             {:pool pool-name})
                       (launch-matched-tasks! matches conn db (:fenzo fenzo-state) mesos-run-as-user pool-name)
-                      (update-host-reservations! rebalancer-reservation-atom matched-job-uuids)
-                      matched-considerable-jobs-head?))
+                      (update-host-reservations! rebalancer-reservation-atom matched-job-uuids))
                   ; Absolute maximum jobs we will consider autoscaling to.
                   {:keys [max-jobs-for-autoscaling autoscaling-scale-factor]} (config/kubernetes)
                   ; The fraction of jobs we tried to match that didn't actually get matched.
@@ -1967,17 +1973,20 @@
   "Return a map of lists of job entities ordered by dru, keyed by pool.
 
    It ranks the jobs by dru first and then apply several filters if provided."
-  [unfiltered-db offensive-job-filter]
+  [unfiltered-db offensive-job-filter number-to-force]
   (prom/with-duration
     prom/scheduler-rank-cycle-duration {}
     (timers/time!
       rank-jobs-duration
       (try
-        (->> (sort-jobs-by-dru-pool unfiltered-db)
-             ;; Apply the offensive job filter first before taking.
-             (pc/map-vals offensive-job-filter)
-             (pc/map-vals #(map tools/job-ent->map %))
-             (pc/map-vals #(remove nil? %)))
+        (let [jobs (->> (sort-jobs-by-dru-pool unfiltered-db)
+                        ;; Apply the offensive job filter first before taking.
+                        (pc/map-vals offensive-job-filter)
+                        (pc/map-vals #(map tools/job-ent->map %))
+                        (pc/map-vals #(remove nil? %)))]
+          ; Force the first N jobs of each pool; jobs is a map, so take per value, not on the map.
+          (doseq [[_ pool-jobs] jobs] (dorun (take number-to-force pool-jobs)))
+          jobs)
         (catch Throwable t
           (log/error t "Failed to rank jobs")
           (meters/mark! rank-jobs-failures)
@@ -1985,13 +1994,14 @@
 
 (defn- start-jobs-prioritizer!
   [conn pool-name->pending-jobs-atom task-constraints trigger-chan]
-  (let [offensive-jobs-ch (make-offensive-job-stifler conn)
+  (let [number-to-force (get-in config/config [:settings :rank :number-to-force])
+        offensive-jobs-ch (make-offensive-job-stifler conn)
         offensive-job-filter (partial filter-offensive-jobs task-constraints offensive-jobs-ch)]
     (tools/chime-at-ch trigger-chan
                        (fn rank-jobs-event []
                          (log/info "Starting pending job ranking")
                          (reset! pool-name->pending-jobs-atom
-                                 (rank-jobs (d/db conn) offensive-job-filter))
+                                 (rank-jobs (d/db conn) offensive-job-filter number-to-force))
                          (log/info "Done with pending job ranking")))))
 
 (meters/defmeter [cook-mesos scheduler mesos-error])
diff --git a/scheduler/test/cook/test/benchmark.clj b/scheduler/test/cook/test/benchmark.clj
index 70e6432523..8ba9ad3d6f 100644
--- a/scheduler/test/cook/test/benchmark.clj
+++ b/scheduler/test/cook/test/benchmark.clj
@@ -49,12 +49,12 @@
             offensive-jobs-ch (sched/make-offensive-job-stifler conn)
             offensive-job-filter (partial sched/filter-offensive-jobs task-constraints offensive-jobs-ch)]
         (println "============ rank-jobs timing ============")
-        (cc/quick-bench (sched/rank-jobs db offensive-job-filter))))
+        (cc/quick-bench (sched/rank-jobs db offensive-job-filter 0))))
     (testing "rank-jobs minus offensive-job-filter"
       (let [db (d/db conn)
             offensive-job-filter identity]
         (println "============ rank-jobs minus offensive-job-filter timing ============")
-        (cc/quick-bench (sched/rank-jobs db offensive-job-filter))))
+        (cc/quick-bench (sched/rank-jobs db offensive-job-filter 0))))
     (testing "sort-jobs-by-dru-helper"
       (let [db (d/db conn)
             pending-task-ents (->> (queries/get-pending-job-ents db)
diff --git a/scheduler/test/cook/test/scheduler/scheduler.clj b/scheduler/test/cook/test/scheduler/scheduler.clj
index bdc07a31e9..4f9230357a 100644
--- a/scheduler/test/cook/test/scheduler/scheduler.clj
+++ b/scheduler/test/cook/test/scheduler/scheduler.clj
@@ -873,7 +873,7 @@
         offensive-job-filter (partial sched/filter-offensive-jobs constraints offensive-jobs-ch)]
     (testing "enough offers for all normal jobs."
       (is (= {"no-pool" (list (tools/job-ent->map job-entity))}
-             (sched/rank-jobs test-db offensive-job-filter))))))
+             (sched/rank-jobs test-db offensive-job-filter 5))))))
 
 (deftest test-mesos-virtual-machine-lease-adapter
   ;; ensure that the VirtualMachineLeaseAdapter can successfully handle an offer from Mesomatic.