Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: clj kondo fixes #21

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .clj-kondo/async.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
(ns async)

(defmacro go-loop-try- [bindings & body]
`(try (loop ~bindings ~@body)
(finally)))

(defmacro go-try [S & body]
`(do (superv.async/check-supervisor ~S)
(try ~@body (finally))))

(defmacro go-try- [& body]
`(try ~@body (finally)))

(defmacro go-loop-try [S bindings & body]
`(do (superv.async/check-supervisor ~S)
(try (loop ~bindings ~@body)
(finally))))

(defmacro go-for [S & body]
`(do (superv.async/check-supervisor ~S)
(for ~@body)))

(defmacro go-loop-super [S bindings & body]
`(do (superv.async/check-supervisor ~S)
(loop ~bindings ~@body)))
8 changes: 8 additions & 0 deletions .clj-kondo/config.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{:hooks {:macroexpand {superv.async/go-try- async/go-try-
superv.async/go-try async/go-try
superv.async/go-for async/go-for
superv.async/go-loop-try- async/go-loop-try-
superv.async/go-loop-try async/go-loop-try
superv.async/go-loop-super async/go-loop-super}}

:linters {:clojure-lsp/unused-public-var {:exclude [build async]}}}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ figwheel_server.log
out
node_modules.clj-kondo/
.lsp/
.clj-kondo/
.cache
node_modules/
.vscode/
6 changes: 5 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@
:ffix {:extra-deps {cljfmt/cljfmt {:mvn/version "0.9.2"}}
:main-opts ["-m" "cljfmt.main" "fix"]}
:format {:extra-deps {cljfmt/cljfmt {:mvn/version "0.9.2"}}
:main-opts ["-m" "cljfmt.main" "check"]}}}
:main-opts ["-m" "cljfmt.main" "check"]}
:clj-kondo {:replace-deps {clj-kondo/clj-kondo {:mvn/version "RELEASE"}}
:main-opts ["-m" "clj-kondo.main"]}
:outdated {:extra-deps {com.github.liquidz/antq {:mvn/version "2.2.983"}}
:main-opts ["-m" "antq.core"]}}}
75 changes: 36 additions & 39 deletions src/superv/async.cljc
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
(ns superv.async
#?(:clj (:gen-class :main true))
(:require [clojure.core.async :as async :refer [<! >! alt! alts! go go-loop promise-chan chan timeout put! close! take!
#?@(:clj [<!! >!! alt!! alts!! thread])]]
#?(:cljs (cljs.core.async.impl.protocols :refer [ReadPort])))
(:require [clojure.core.async :as async :refer [<! alts! go go-loop promise-chan chan timeout put! close! take!
#?@(:clj [<!! alts!! thread])]]
#?(:cljs (cljs.core.async.impl.protocols)))
#?(:cljs (:require-macros [superv.async :refer [wrap-abort! >? <? <?- go-try go-loop-try go-try- go-loop-try-
on-abort go-super go-loop-super go-for alts?]]))
#?(:clj (:import (clojure.core.async.impl.protocols ReadPort))))
#?(:clj (:import [clojure.core.async.impl.protocols ReadPort]
[java.util Date UUID])))

;; The protocols and the binding are needed for the channel ops to be
;; transparent for supervision, most importantly exception tracking
Expand All @@ -18,29 +19,27 @@
(-track-exception [this e])
(-free-exception [this e]))

#?(:clj
(defn ^java.util.Date now []
(java.util.Date.))
:cljs (defn now []
(js/Date.)))
(defn now
#?(:clj (^Date [] (Date.))
:cljs ([] (js/Date.))))

(defrecord TrackingSupervisor [error aborts registered pending-exceptions]
PSupervisor
(-error [this] error)
(-error [_this] error)
;; HACK: avoid too many pending takes on a single abort-ch
;; while this is somewhat hacky, it works without patching core.async
;; and is still bounded. The amount of total ops is also still bounded by a
;; million.
(-abort [this] (rand-nth aborts))
(-register-go [this body]
(let [id #?(:clj (java.util.UUID/randomUUID) :cljs (random-uuid))]
(-abort [_this] (rand-nth aborts))
(-register-go [_this body]
(let [id #?(:clj (UUID/randomUUID) :cljs (random-uuid))]
(swap! registered assoc id body)
id))
(-unregister-go [this id]
(-unregister-go [_this id]
(swap! registered dissoc id))
(-track-exception [this e]
(-track-exception [_this e]
(swap! pending-exceptions assoc e (now)))
(-free-exception [this e]
(-free-exception [_this e]
(swap! pending-exceptions dissoc e)))

(def ^:const NUM_ABORT_CHANS 1000)
Expand All @@ -50,7 +49,7 @@
close its abort channel manually if you want the context to stop. It is
supposed to be used at a boundary to an unsupervised system. If you want
strong supervision, use the restarting-supervisor instead."
[& {:keys [stale-timeout error-fn pending-fn]
[& {:keys [stale-timeout error-fn]
:or {stale-timeout (* 10 1000)
error-fn (fn [e] (println "Supervisor:" e
#?(:cljs (.-stack e))))}}]
Expand All @@ -65,7 +64,7 @@
(error-fn e)
(take! err-ch loop-fn)))
((fn pending [_]
(let [[[e _]] (filter (fn [[k v]]
(let [[[e _]] (filter (fn [[_k v]]
(> (- (.getTime (now)) stale-timeout)
#?(:clj (.getTime ^java.util.Date v)
:cljs (.getTime v))))
Expand Down Expand Up @@ -151,7 +150,6 @@
(satisfies? #?(:clj clojure.core.async.impl.protocols/ReadPort
:cljs cljs.core.async.impl.protocols/ReadPort)
x))

(defn- finally-exp? [exp]
(not (and (seq? exp) (= (first exp) 'finally))))

Expand Down Expand Up @@ -180,12 +178,11 @@
e#)
(finally ~@finally))))))

#?(:clj
(defmacro go-loop-try-
"Loop binding for go-try-."
{:style/indent 2}
[bindings & body]
`(go-try- ~S (loop ~bindings ~@body))))
(defmacro go-loop-try-
"Loop binding for go-try-."
{:style/indent 2}
[bindings & body]
`(go-try- (loop ~bindings ~@body)))

(defmacro go-try
"Asynchronously executes the body in a go block. You can provide catch and
Expand Down Expand Up @@ -329,7 +326,7 @@ deal with abortion."
(defmacro >?
"Same as core.async >! but throws an exception if the context has been aborted."
[S ch m]
`(throw-if-exception ~S (wrap-abort! ~S (>! ~ch ~m)))))
`(throw-if-exception ~S (wrap-abort! ~S (clojure.core.async/>! ~ch ~m)))))

(defn put?
"Same as core.async/put!, but tracks exceptions in supervisor. TODO
Expand Down Expand Up @@ -364,7 +361,7 @@ deal with abortion."
"Same as core.async alt! but throws an exception if the channel returns a
throwable object or the context has been aborted."
[S & clauses]
`(throw-if-exception ~S (wrap-abort! ~S (alt! ~@clauses)))))
`(throw-if-exception ~S (wrap-abort! ~S (clojure.core.async/alt! ~@clauses)))))

#?(:clj
(defmacro <<!
Expand All @@ -379,10 +376,10 @@ The input channel must be closed."
"Takes multiple results from a channel and returns them as a vector.
Throws if any result is an exception or the context has been aborted."
[S ch]
`(alt! (-abort ~S)
([v#] (throw (ex-info "Aborted operations" {:type :aborted})))
(go (<<! ~ch))
([v#] (doall (mapv (fn [e#] (throw-if-exception ~S e#)) v#))))))
`(clojure.core.async/alt! (-abort ~S)
([v#] (throw (ex-info "Aborted operations" {:type :aborted})))
(go (<<! ~ch))
([v#] (doall (mapv (fn [e#] (throw-if-exception ~S e#)) v#))))))

;; TODO lazy-seq vs. full vector in <<! ?
#?(:clj
Expand Down Expand Up @@ -663,7 +660,7 @@ Throws if any result is an exception or the context has been aborted."
~@body
(catch ~e e#
(let [err-ch# (-error ~S)]
(>! err-ch# e#)))
(clojure.core.async/>! err-ch# e#)))
(finally
(-unregister-go ~S id#)
~@finally)))))))
Expand Down Expand Up @@ -739,11 +736,11 @@ Throws if any result is an exception or the context has been aborted."
(conj groups [k v])))
[] (partition 2 seq-exprs)))
err (fn [& msg] (throw (IllegalArgumentException. ^String (apply str msg))))
emit-bind (fn emit-bind [res-ch [[bind expr & mod-pairs]
emit-bind (fn emit-bind [res-ch [[bind _expr & mod-pairs]
& [[_ next-expr] :as next-groups]]]
(let [giter (gensym "iter__")
gxs (gensym "s__")
do-mod (fn do-mod [[[k v :as pair] & etc]]
do-mod (fn do-mod [[[k v] & etc]]
(cond
(= k :let) `(let ~v ~(do-mod etc))
(= k :while) `(when ~v ~(do-mod etc))
Expand Down Expand Up @@ -773,7 +770,7 @@ Throws if any result is an exception or the context has been aborted."
(go (try (<? ~S (iter# ~(second seq-exprs)))
(catch ~e e#
(-track-exception ~S e#)
(>! ~res-ch e#))
(clojure.core.async/>! ~res-ch e#))
(finally (async/close! ~res-ch))))
~res-ch))))

Expand Down Expand Up @@ -817,7 +814,7 @@ Throws if any result is an exception or the context has been aborted."
:pending-exceptions (atom {})
:restarting true})
res-ch (start-fn s)
stale-timeout 1000]
stale-after stale-timeout]

(when supervisor
;; this will trigger a close event when all subroutines are stopped
Expand All @@ -828,9 +825,9 @@ Throws if any result is an exception or the context has been aborted."

(go-loop []
(when-not (some async/poll! ab-chs)
(<! (timeout stale-timeout))
(let [[[e _]] (filter (fn [[k v]]
(> (- (.getTime (now)) stale-timeout)
(<! (timeout stale-after))
(let [[[e _]] (filter (fn [[_k v]]
(> (- (.getTime (now)) stale-after)
(.getTime v)))
@(:pending-exceptions s))]
(if e
Expand Down
1 change: 1 addition & 0 deletions template/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<packaging>jar</packaging>
<name>superv.async</name>
<description>Supervised channel management for core.async.</description>
<version>0.0.0</version>
<licenses>
<license>
<name>Eclipse</name>
Expand Down
19 changes: 10 additions & 9 deletions test/superv/async_test.cljc
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
(ns superv.async-test
(:require [clojure.test :refer [deftest is testing #?@(:cljs [async])]]
[clojure.core.async :as async :refer [<! >! go chan close! alt! timeout chan close! take! to-chan! put! pub sub onto-chan!
#?@(:clj [<!! >!!])]]
[superv.async :refer [<? <?- >? S go-try go-try- <<! <<? <?* concat>> partition-all>> count> pmap>> alt? alts? restarting-supervisor go-super go-for map->TrackingSupervisor on-abort put? chan-super go-loop-try go-loop-super
[clojure.core.async :as async :refer [<! >! go chan close! timeout chan close! to-chan! onto-chan! #_put! #_pub #_sub
#?@(:clj [<!! >!!]
:cljs [take!])]]
[superv.async :refer [<? <?- >? S go-try go-try- <<! <<? <?* concat>> partition-all>> count> pmap>> alt? alts? restarting-supervisor go-super go-for map->TrackingSupervisor on-abort #_put? chan-super go-loop-try go-loop-super
#?@(:clj [<<!! <<?? <?? <!!* <??* thread-try thread-super reduce< <?? chan-super])]]))

(defn test-async
Expand Down Expand Up @@ -47,7 +48,7 @@
#?(:clj (/ 1 0)
:cljs (throw (js/Error. "Oops")))
(catch #?(:clj java.lang.ArithmeticException
:cljs js/Error) e
:cljs js/Error) _
(reset! exception-state 42))
(finally (reset! finally-state 42))))
(= @exception-state @finally-state 42))))))
Expand All @@ -63,7 +64,7 @@
#?(:clj (/ 1 0)
:cljs (throw (js/Error. "Oops")))
(catch #?(:clj java.lang.ArithmeticException
:cljs js/Error) e
:cljs js/Error) _e
(reset! exception-state 42))
(finally (reset! finally-state 42))))
(= @exception-state @finally-state 42))))))
Expand Down Expand Up @@ -238,7 +239,7 @@
(go
(is
(thrown? #?(:clj Exception :cljs js/Error)
(<? S (go-loop-try S [[f & r] [1 0]]
(<? S (go-loop-try S [[#?(:clj f :cljs _) & r] [1 0]]
#?(:clj (/ 1 f)
;; TODO - Better JS error. In cljs this never terminates without an explicit thrown error
:cljs (throw (js/Error. "Oops")))
Expand Down Expand Up @@ -276,7 +277,7 @@
abort (chan)
super (map->TrackingSupervisor {:error err-ch :abort abort
:registered (atom {})})]
(go-loop-super super [[f & r] [1 0]]
(go-loop-super super [[#?(:clj f :cljs _) & r] [1 0]]
#?(:clj (/ 1 f)
;; TODO - Better JS error. In cljs this never terminates without an explicit thrown error
:cljs (throw (js/Error. "Oops")))
Expand Down Expand Up @@ -310,8 +311,8 @@
#?(:clj (/ a b)
:cljs (get-in a b))))))
(is (thrown? #?(:clj Exception :cljs js/Error)
(<<? S (go-for S [a [1 2 3]
:let [b #?(:clj (/ 1 0) :cljs (throw (js/Error. "Oops")))]]
(<<? S (go-for S [_a [1 2 3]
:let [_b #?(:clj (/ 1 0) :cljs (throw (js/Error. "Oops")))]]
42)))))))

;; supervisor
Expand Down
2 changes: 1 addition & 1 deletion test/superv/node_runner.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
(cljs.test/run-tests (merge (cljs.test/empty-env) opts)
'superv.async-test)))

(defn -main [& args]
(defn -main [& _args]
(cljs.nodejs/enable-util-print!)
(defmethod cljs.test/report [:cljs.test/default :end-run-tests] [_] (js/process.exit 0))
(run-tests))
Expand Down