File tree Expand file tree Collapse file tree 3 files changed +30
-26
lines changed Expand file tree Collapse file tree 3 files changed +30
-26
lines changed Original file line number Diff line number Diff line change 11(ns postgres.async
2- (:require [postgres.async.impl :refer [consumer-fn defasync ] :as pg])
2+ (:require [postgres.async.impl :refer [consumer-fn defasync ] :as pg]
3+ [clojure.core.async :refer [<!]])
34 (:import [com.github.pgasync Db ConnectionPoolBuilder
45 QueryExecutor TransactionExecutor Transaction]
56 [com.github.pgasync.impl.conversion DataConverter]))
106107(defasync <rollback! [tx])
107108
108109(defmacro dosql
109- " Takes values from channels returned by db functions and returns [nil exception]
110- on first error. Returns [ result-of-body nil] on success. "
110+ " Takes values from channels returned by db functions and returns exception
111+ on first error. Returns the result of evaluating the given forms on success"
111112 [bindings & forms]
112- (let [err (gensym " e" )]
113- `(let [~@(pg/async-sql-bindings bindings err)]
114- (if ~err
115- [nil ~err]
116- [(do ~@forms) nil ]))))
113+ (if-let [[l r & bindings] (not-empty bindings)]
114+ `(let [~l (<! ~r)]
115+ (if (instance? Throwable ~l)
116+ ~l
117+ (dosql ~bindings ~@forms)))
118+ `(do ~@forms)))
Original file line number Diff line number Diff line change 11(ns postgres.async.impl
22 (:require [clojure.string :as string]
3- [clojure.core.async :refer [chan put! go <! ]])
3+ [clojure.core.async :refer [chan put! go]])
44 (:import [java.util.function Consumer]
55 [com.github.pgasync ResultSet]
66 [com.github.pgasync.impl PgRow]))
77
88(defmacro defasync [name args]
99 `(defn ~name [~@args]
1010 (let [c# (chan 1 )]
11- (~(symbol (subs (str name) 1 )) ~@args #(put! c# [% 1 %2 ] ))
11+ (~(symbol (subs (str name) 1 )) ~@args #(put! c# ( or %2 % 1 ) ))
1212 c#)))
1313
1414(defmacro consumer-fn [[param] body]
6060 " WHERE " (first where)
6161 (when returning
6262 (str " RETURNING " returning))))
63-
64- (defn async-sql-bindings
65- " Converts bindings x (f) to [x err] (if [err] [nil err] (<! (f)))"
66- [bindings err]
67- (let [vars (map (fn [v]
68- [v err])
69- (take-nth 2 bindings))
70- fs (map (fn [f]
71- `(if ~err [nil ~err] (<! ~f)))
72- (take-nth 2 (rest bindings)))]
73- (list* [err err] [nil nil ] (interleave vars fs))))
Original file line number Diff line number Diff line change 77(def table " clj_pg_test" )
88
99(defn- wait [channel]
10- (let [[r err] (<!! channel)]
11- (if err
12- (throw err )
13- r) ))
10+ (let [r (<!! channel)]
11+ (if ( instance? Throwable r)
12+ (throw r) )
13+ r ))
1414
1515(defn- create-tables [db]
1616 (wait (<execute! db [(str " drop table if exists " table)]))
7171 rs (<query! tx [" select 123 as x" ])
7272 rs (<query! tx [" select $1::text as t" (:x (first rs))])
7373 _ (<commit! tx)]
74- (:t (first rs)))))))))
74+ (:t (first rs))))))))
75+ (testing " dosql short-circuits on errors"
76+ (let [e (Exception. " Oops!" )
77+ executed (atom 0 )]
78+ (is (= (try
79+ (wait (go (dosql
80+ [_ (<query! *db* [" select 123 as t" ])
81+ _ (go e)
82+ _ (swap! executed inc)]
83+ " Just hanging out" )))
84+ (catch Exception caught
85+ {:caught caught}))
86+ {:caught e}))
87+ (is (= @executed 0 )))))
You can’t perform that action at this time.
0 commit comments