Skip to content
This repository was archived by the owner on Apr 24, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion scheduler/config-k8s.edn
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@
:ssl false
:sslfactory "org.postgresql.ssl.NonValidatingFactory"
:password #config/env "PGPASSWORD"
:user "cook_scheduler"}
:user "cook_scheduler"
:mirror-to-postgres? true}
:scheduler {:offer-incubate-ms 15000
:task-constraints {:command-length-limit 5000
:cpus 10
Expand Down
72 changes: 72 additions & 0 deletions scheduler/datomic/data/read_resources.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@


; run this with lein


(ns data.read-resources
(:require [cook.datomic :as datomic]
[cook.postgres :as pg]
[cook.quota :as quota]
[cook.queries :as queries]
[datomic.api :as d]))

(def uri (second *command-line-args*))
(println "Datomic URI is" uri)

(defn pools
[db]
(->> (d/q '[:find [?p ...]
:in $ [?state ...]
:where
[?p :pool/state ?state]]
db [:pool.state/active :pool.state/inactive])
(map (partial d/entity db))
(map d/touch)))

(defn quotas
[db]
(->> (d/q '[:find [?q ...]
:in $
:where
[?q :quota/user]]
db)
(map (partial d/entity db))
(map d/touch)))

(try
(let [conn (datomic/create-connection {:settings {:mesos-datomic-uri uri}})]
(println "Connected to Datomic:" conn)
;(quota/set-quota! conn "default" "k8s-alpha" "For quota-related testing." :cpus 8 :mem 1024)
;(quota/set-quota! conn "default" "k8s-gamma" "For quota-related testing." :cpus 9 :mem 2048)
(println "Pools & Quotas:")
(run! (fn [{:keys [pool/name] :as p}]
(clojure.pprint/pprint p)
(clojure.pprint/pprint (quota/get-quota (d/db conn) "default" name)))
(pools (d/db conn)))
(let [db (d/db conn)]
(clojure.pprint/pprint (queries/get-all-resource-types db))
(clojure.pprint/pprint (quotas db))
(spit "datomic_quota_export.sql"
(str "truncate resource_limits;\n"
(clojure.string/join
"\n"
(map
(fn [quota]
(clojure.string/join
"\n"
(map
(fn [resource]
(str "INSERT INTO resource_limits "
"(resource_limit_type, pool_name, user_name, resource_name, amount, reason) VALUES "
"('quota','" (->> resource :resource/pool :db/id (d/entity db) :pool/name) "','" (-> quota :quota/user) "','" (name (-> resource :resource/type)) "', " (-> resource :resource/amount) ", '" (-> quota :quota/reason) "');"
))
(-> quota :quota/resource)))
)
(quotas db)))
"\n"))

)
(System/exit 0))
(catch Throwable t
(println "Failed to seed pools:" t)
(System/exit 1)))
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,34 @@
-- If you get a crazy error where 'No schema has been selected to
-- create in' when running the first CREATE TABLE. It can be caused by there being a capital
-- letter in cook_schema. Schema names are lowercased when created, but case-sensitive when
-- in the search path.
-- in the search path.

--- Setup the tables.
--changeset scrosby:2021-12-22-resources
create table resources (
resource_name varchar(30) PRIMARY KEY,
resource_description text
);

--changeset scrosby:2021-12-22-pools
create table pools (
pool_name varchar(60) PRIMARY KEY,
pool_active bool NOT NULL,
pool_description text NOT NULL
);

--changeset scrosby:2021-12-22-resource_limits
create table resource_limits (
resource_limit_type varchar(8) NOT NULL CHECK (resource_limit_type IN ('quota', 'share')),
pool_name varchar(60) NOT NULL, -- references pool(pool_name) NOT NULL,
user_name varchar(60) NOT NULL, -- 'default' is default user.
resource_name varchar(30) NOT NULL,
amount float NOT NULL,
reason text NOT NULL,
-- TODO: deletion_timestamp, -- NOT NULL means deleted.
PRIMARY KEY (resource_limit_type, pool_name, user_name, resource_name)
);

-- Just show the tables at the end.
--ignoreLines:1
\dt :cook_schema.*
2 changes: 2 additions & 0 deletions scheduler/liquibase/liquibase.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

liquibase.hub.mode=off
2 changes: 1 addition & 1 deletion scheduler/src/cook/caches.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[cook.cache :as ccache]
[cook.config :as config]
[mount.core :as mount])
(:import (com.google.common.cache Cache CacheBuilder)
(:import (com.google.common.cache Cache CacheLoader CacheBuilder)
(java.util.concurrent TimeUnit)))

(defn new-cache [config]
Expand Down
34 changes: 34 additions & 0 deletions scheduler/src/cook/postgres.clj
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,37 @@
(when-not @c3p0-connection-pool
(reset! c3p0-connection-pool (make-c3p0-datasource (get-pg-config))))
@c3p0-connection-pool))

(defn mirror-to-postgres?
[]
(-> config/config :settings :pg-config :mirror-to-postgres?))

(defn mirror-write
"Utility method to mirror a write to datomic with a write to postgres. Takes a description of what is being written and
a function to invoke that writes to postgres. The write to postgres only happens if mirror-to-postgres configuration is enabled"
[description postgres-write-fn log?]
(when (mirror-to-postgres?)
(postgres-write-fn)
(when log?
(log/info "Mirrored write of" description "to postgres"))))

(defn- datomic-and-pg-values-equal?
[datomic-value postgres-value]
(if (map? datomic-value)
(clojure.set/subset? (set datomic-value) (set postgres-value))
(= datomic-value postgres-value)))

(defn mirror-read
"Utility method to mirror a read from datomic with a read from postgres. Takes a description of what is being read and
functions to invoke that read from datomic and postgres. The read from postgres only happens if mirror-to-postgres configuration is enabled.
The postgres and datomic values are compared"
[description datomic-read-fn postgres-read-fn log?]
(if (mirror-to-postgres?)
(let [datomic-value (datomic-read-fn)
postgres-value (postgres-read-fn)]
(when log?
(log/info "Mirrored read of" description "from postgres"))
(when (not (datomic-and-pg-values-equal? datomic-value postgres-value))
(log/warn "Datomic and Postgres don't match on read of" description {:datomic-value datomic-value :postgres-value postgres-value}))
datomic-value)
(datomic-read-fn)))
106 changes: 65 additions & 41 deletions scheduler/src/cook/quota.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
[cook.config :as config]
[cook.datomic]
[cook.pool :as pool]
[cook.postgres :as pg]
[cook.quota-pg :as quota-pg]
[cook.rate-limit]
[cook.rate-limit.generic :as rtg]
[cook.queries :as queries]
Expand Down Expand Up @@ -86,29 +88,34 @@
`default-user`. If there is NO `default-user` value for a specific type,
return Double.MAX_VALUE."
[db user pool-name]
(let [mesos-resource-quota (pc/map-from-keys (fn [type] (get-quota-by-type-or-default db type user pool-name))
(queries/get-all-resource-types db))
;; As part of the pool migration, there might be a mix of quotas that have the count as an attribute or a resource.
;; We prefer to read the resource if available and fall back to the attribute.
count-quota (or (get-quota-by-type db :resource.type/count user pool-name) ; prefer resource
(:quota/count (d/entity db [:quota/user user])) ; then the field on the user
(get-quota-by-type db :resource.type/count default-user pool-name) ; then the resource from the default user
(:quota/count (d/entity db [:quota/user default-user])) ; then the field on the default user
Integer/MAX_VALUE)
launch-rate-saved (get-quota-extra db
:resource.type/launch-rate-saved
user
pool-name
default-launch-rate-saved)
launch-rate-per-minute (get-quota-extra
db
:resource.type/launch-rate-per-minute
user
pool-name
default-launch-rate-per-minute)]
(assoc mesos-resource-quota :count (int count-quota)
:launch-rate-saved launch-rate-saved
:launch-rate-per-minute launch-rate-per-minute)))
(pg/mirror-read
(str "get-quota " user " " pool-name)
(fn []
(let [mesos-resource-quota (pc/map-from-keys (fn [type] (get-quota-by-type-or-default db type user pool-name))
(queries/get-all-resource-types db))
;; As part of the pool migration, there might be a mix of quotas that have the count as an attribute or a resource.
;; We prefer to read the resource if available and fall back to the attribute.
count-quota (or (get-quota-by-type db :resource.type/count user pool-name) ; prefer resource
(:quota/count (d/entity db [:quota/user user])) ; then the field on the user
(get-quota-by-type db :resource.type/count default-user pool-name) ; then the resource from the default user
(:quota/count (d/entity db [:quota/user default-user])) ; then the field on the default user
Integer/MAX_VALUE)
launch-rate-saved (get-quota-extra db
:resource.type/launch-rate-saved
user
pool-name
default-launch-rate-saved)
launch-rate-per-minute (get-quota-extra
db
:resource.type/launch-rate-per-minute
user
pool-name
default-launch-rate-per-minute)]
(assoc mesos-resource-quota :count (int count-quota)
:launch-rate-saved launch-rate-saved
:launch-rate-per-minute launch-rate-per-minute)))
(partial quota-pg/get-quota db user pool-name)
true))

(defn pool+user->token-key
"Given a pool name and a user, create a key suitable for the per-user-per-pool ratelimit code"
Expand Down Expand Up @@ -155,6 +162,10 @@

(defn retract-quota!
[conn user pool-name reason]
(pg/mirror-write
(str "retract-quota! " user " " pool-name)
(partial quota-pg/retract-quota! conn user pool-name)
true)
(let [db (d/db conn)
pool-name' (pool/pool-name-or-default pool-name)
requesting-default-pool? (pool/requesting-default-pool? pool-name)
Expand Down Expand Up @@ -186,6 +197,10 @@
(set-quota! conn \"u1\" \"pool-a\" \"updating quota\" :cpus 20.0)
etc."
[conn user pool-name reason & kvs]
(pg/mirror-write
(str "set-quota! " user " " pool-name " " kvs)
(partial apply quota-pg/set-quota! conn user pool-name reason kvs)
true)
(let [pool-name' (pool/pool-name-or-default pool-name)
requesting-default-pool? (pool/requesting-default-pool? pool-name)
db (d/db conn)]
Expand Down Expand Up @@ -275,24 +290,33 @@
and returns the `default-user` value if a user is not returned.
This is usefully if the application will go over ALL users during processing"
[db pool-name]
(let [default-type->quota (get-quota db default-user pool-name)
default-count-quota (get default-type->quota :count)
all-resource-types (queries/get-all-resource-types db)
type->user->quota (pc/map-from-keys #(retrieve-user->resource-quota-amount db pool-name %) all-resource-types)
all-quota-users (d/q '[:find [?user ...] :where [?q :quota/user ?user]] db) ;; returns a sequence without duplicates
user->count-quota (retrieve-user->count-quota db pool-name all-quota-users default-count-quota)
user->quota-cache (-> (pc/map-from-keys
(fn [user]
(-> (pc/map-from-keys
#(get-in type->user->quota [% user] (get default-type->quota %))
all-resource-types)
(assoc :count (user->count-quota user))))
all-quota-users)
(assoc default-user default-type->quota))]
(fn user->quota
[user]
(or (get user->quota-cache user)
(get user->quota-cache default-user)))))
(let [datomic-fn (let [default-type->quota (get-quota db default-user pool-name)
default-count-quota (get default-type->quota :count)
all-resource-types (queries/get-all-resource-types db)
type->user->quota (pc/map-from-keys #(retrieve-user->resource-quota-amount db pool-name %) all-resource-types)
all-quota-users (d/q '[:find [?user ...] :where [?q :quota/user ?user]] db) ;; returns a sequence without duplicates
user->count-quota (retrieve-user->count-quota db pool-name all-quota-users default-count-quota)
user->quota-cache (-> (pc/map-from-keys
(fn [user]
(-> (pc/map-from-keys
#(get-in type->user->quota [% user] (get default-type->quota %))
all-resource-types)
(assoc :count (user->count-quota user))))
all-quota-users)
(assoc default-user default-type->quota))]
(fn user->quota
[user]
(or (get user->quota-cache user)
(get user->quota-cache default-user))))]
(if (pg/mirror-to-postgres?)
(let [pg-fn (quota-pg/create-user->quota-fn db pool-name)]
(fn [user]
(pg/mirror-read
(str "create-user->quota-fn " pool-name " " user)
(partial datomic-fn user)
(partial pg-fn user)
true)))
datomic-fn)))

(defn create-pool->user->quota-fn
"Creates a function that takes a pool name, and returns an equivalent of user->quota-fn for each pool"
Expand Down
Loading