Asynchronous PostgreSQL client for Clojure.
Add the following to your Leiningen project.clj:
A pool of connections to PostgreSQL backend is created with open-db. Each connection pool starts a single I/O thread used in communicating with PostgreSQL backend.
(require '[postgres.async :refer :all])
(def db (open-db {:hostname "db.example.com"
:port 5432 ; default
:database "exampledb"
:username "user"
:password "pass"
:pool-size 25})) ; defaultThe connection pool is closed with close-db!. This closes all open connections and stops the pool I/O thread.
(close-db! db)Queries are executed with functions query! insert! update! execute!. All functions have two arities that use either callbacks or acore.async channels.
Channel-based functions return a channel where either query result or exception is put.
All other query functions delegate to execute!. This takes a db and a seq of sql followed by optional parameters.
;; async channel
(<!! (execute! db ["select name, price from products where id = $1" 1001]))
; {:updated 0, :rows [{:id 1001, :name "hammer", :price 10}]}
(<!! (execute! db ["select * from foobar"]))
; #<SqlException com.github.pgasync.SqlException: ERROR: SQLSTATE=42P01, MESSAGE=relation "foobar" does not exist>
;; callback-based higher arity
(execute! db ["select $1::text" "hello world"] (fn [rs err]
(println rs err))
; nilquery! passes only :rows to callback.
(<!! (query! db ["select name, price from products"]))
; [{:id 1000, :name "screwdriver", :price 15} {:id 1001, :name "hammer", :price 10}]
(<!! (query! db ["select name, price from products where id = $1" 1001]))
; [{:id 1001, :name "hammer", :price 10}]Insert is executed with an sql-spec that supports keys :table and :returning.
(<!! (insert! db {:table "products"} {:name "screwdriver" :price 15}))
; {:updated 1, :rows []}
(<!! (insert! db {:table "products" :returning "id"} {:name "hammer" :price 5}))
; {:updated 1, :rows [{:id 1001}]}Multiple rows can be inserted by passing a sequence to insert!.
(<!! (insert! db {:table "products" :returning "id"}
[{:name "hammer" :price 5}
{:name "nail" :price 1}]))
; {:updated 2, :rows [{:id 1001} {:id 1002}]}Update is executed with an sql-spec that supports keys :table :returning and :where.
(<!! (update! db {:table "users" :where ["id = $1" 1001}} {:price 6}))
; {:updated 1, :rows []}Starting a transaction with begin! borrows a connection from the connection pool until commit!, rollback! or query failure. Transactional operations must be issued to the transaction instead of db.
See composition below for example.
Channel-returning functions can be composed with dosql macro that returns result of last form or first exception.
(<!! (go
(dosql [tx (begin! db)
rs (insert! tx {:table products :returning "id"} {:name "saw"})
_ (insert! tx {:table promotions} {:product_id (get-in rs [:rows 0 :id])})
rs (query! tx ["select * from promotions"])
_ (commit! tx)]
{:now-promoting rs})))
; {:now-promoting [{:id 1, product_id 1002}]}Using JSON types requires [cheshire "5.5.0"] and reading the ns postgres.async.json.
(require '[postgres.async.json])
(<!! (query! db ["select $1::JSONB" {:hello "world"}]))
; [{:jsonb {:hello "world"}}]Support for custom types can be added by extending IPgParameter protocol and from-pg-value multimethod.
(extend-protocol IPgParameter
com.example.MyHStore
(to-pg-value [store]
(.getBytes (str store) "UTF-8")))
(defmethod from-pg-value com.github.pgasync.impl.Oid/HSTORE [oid ^bytes value]
(my-h-store/parse-string (String. value "UTF-8")))from-pg-value can also be used for overriding "core" types. This is especially useful with temporal data types that are by default converted to java.sql.Date, java.sql.Time, java.sql.Timestamp.
(defmethod from-pg-value com.github.pgasync.impl.Oid/DATE [oid ^bytes value]
(java.time.LocalDate/parse (String. value "UTF-8")))- postgres-async-driver
- core.async
- Java 8