Skip to content

Commit 5ecb7c3

Browse files
committed
wip, task impl.
1 parent bf03523 commit 5ecb7c3

File tree

1 file changed

+56
-32
lines changed

1 file changed

+56
-32
lines changed

src/main/clojure/clojure/core/async.clj

+56-32
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,23 @@ to catch and handle."
113113
[^long msecs]
114114
(timers/timeout msecs))
115115

116+
;; task
117+
118+
(defn- check-not-in-vthread [op]
119+
(when (not (Thread/.isVirtual (Thread/currentThread)))
120+
(assert nil (str op " used not in virtual thread"))))
121+
122+
(defmacro defvthreadcheckingop
123+
[op doc arglist & body]
124+
(let [as (mapv #(list 'quote %) arglist)]
125+
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
126+
(if (Boolean/getBoolean "clojure.core.async.vthread-checking")
127+
(fn ~arglist
128+
(check-not-in-vthread ~op)
129+
~@body)
130+
(fn ~arglist
131+
~@body)))))
132+
116133
(defmacro defblockingop
117134
[op doc arglist & body]
118135
(let [as (mapv #(list 'quote %) arglist)]
@@ -137,11 +154,11 @@ to catch and handle."
137154
@ret
138155
(deref p))))
139156

140-
(defn <!
141-
"takes a val from port. Must be called inside a (go ...) block. Will
157+
(defvthreadcheckingop <!
158+
"takes a val from port. Must be called inside a (task ...) block. Will
142159
return nil if closed. Will park if nothing is available."
143160
[port]
144-
(assert nil "<! used not in (go ...) block"))
161+
(<!! port))
145162

146163
(defn take!
147164
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -176,12 +193,12 @@ to catch and handle."
176193
@ret
177194
(deref p))))
178195

179-
(defn >!
196+
(defvthreadcheckingop >!
180197
"puts a val into port. nil values are not allowed. Must be called
181-
inside a (go ...) block. Will park if no buffer space is available.
198+
inside a (task ...) block. Will park if no buffer space is available.
182199
Returns true unless port is already closed."
183200
[port val]
184-
(assert nil ">! used not in (go ...) block"))
201+
(>!! port val))
185202

186203
(defn- nop [_])
187204
(def ^:private fhnop (fn-handler nop))
@@ -313,7 +330,7 @@ to catch and handle."
313330
@ret
314331
(deref p))))
315332

316-
(defn alts!
333+
(defvthreadcheckingop alts!
317334
"Completes at most one of several channel operations. Must be called
318335
inside a (go ...) block. ports is a vector of channel endpoints,
319336
which can be either a channel to take from or a vector of
@@ -337,7 +354,7 @@ to catch and handle."
337354
depended upon for side effects."
338355

339356
[ports & {:as opts}]
340-
(assert nil "alts! used not in (go ...) block"))
357+
(alts!! ports opts))
341358

342359
(defn do-alt [alts clauses]
343360
(assert (even? (count clauses)) "unbalanced clauses")
@@ -440,34 +457,41 @@ to catch and handle."
440457
(let [ret (impl/take! port (fn-handler nop false))]
441458
(when ret @ret)))
442459

443-
(defmacro go
444-
"Asynchronously executes the body, returning immediately to the
445-
calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
446-
channel operations within the body will block (if necessary) by
447-
'parking' the calling thread rather than tying up an OS thread (or
448-
the only JS thread when in ClojureScript). Upon completion of the
449-
operation, the body will be resumed.
450-
451-
go blocks should not (either directly or indirectly) perform operations
452-
that may block indefinitely. Doing so risks depleting the fixed pool of
453-
go block threads, causing all go block processing to stop. This includes
454-
core.async blocking ops (those ending in !!) and other blocking IO.
460+
;; task
461+
462+
(def ^:private task-factory
463+
(-> (Thread/ofVirtual)
464+
(Thread$Builder/.name "task-" 0)
465+
.factory))
466+
467+
(defmacro task
468+
"Asynchronously executes the body in a virtual thread, returning immediately
469+
to the calling thread.
470+
471+
task blocks should not (either directly or indirectly) perform operations
472+
that may block indefinitely. Doing so risks pinning the virtual thread
473+
to its carrier thread.
455474
456475
Returns a channel which will receive the result of the body when
457476
completed"
458477
[& body]
459-
(let [crossing-env (zipmap (keys &env) (repeatedly gensym))]
460-
`(let [c# (chan 1)
461-
captured-bindings# (Var/getThreadBindingFrame)]
462-
(dispatch/run
463-
(^:once fn* []
464-
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
465-
f# ~(ioc/state-machine `(do ~@body) 1 [crossing-env &env] ioc/async-custom-terminators)
466-
state# (-> (f#)
467-
(ioc/aset-all! ioc/USER-START-IDX c#
468-
ioc/BINDINGS-IDX captured-bindings#))]
469-
(ioc/run-state-machine-wrapped state#))))
470-
c#)))
478+
`(let [c# (chan 1)
479+
captured-bindings# (Var/getThreadBindingFrame)]
480+
(.execute
481+
(Executors/newThreadPerTaskExecutor task-factory)
482+
(^:once fn* []
483+
(Var/resetThreadBindingFrame captured-bindings#)
484+
(try
485+
(let [result# (do ~@body)]
486+
(>!! c# result#))
487+
(finally
488+
(close! c#)))))
489+
c#))
490+
491+
(defmacro go
492+
"Dispatches to task macro."
493+
[& body]
494+
`(task ~body))
471495

472496
(defonce ^:private ^Executor thread-macro-executor
473497
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))

0 commit comments

Comments
 (0)