Skip to content

Commit 6c77b88

Browse files
committed
ASYNC-258 Datafy buffers and report capacity
1 parent 400a929 commit 6c77b88

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

src/main/clojure/clojure/core/async/impl/buffers.clj

+35-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
(set! *warn-on-reflection* true)
1616

17+
1718
(deftype FixedBuffer [^LinkedList buf ^long n]
1819
impl/Buffer
1920
(full? [_this]
@@ -26,12 +27,14 @@
2627
(close-buf! [_this])
2728
Counted
2829
(count [_this]
29-
(.size buf)))
30+
(.size buf))
31+
impl/Capacity
32+
(capacity [_this]
33+
n))
3034

3135
(defn fixed-buffer [^long n]
3236
(FixedBuffer. (LinkedList.) n))
3337

34-
3538
(deftype DroppingBuffer [^LinkedList buf ^long n]
3639
impl/UnblockingBuffer
3740
impl/Buffer
@@ -46,7 +49,10 @@
4649
(close-buf! [_this])
4750
Counted
4851
(count [_this]
49-
(.size buf)))
52+
(.size buf))
53+
impl/Capacity
54+
(capacity [_this]
55+
n))
5056

5157
(defn dropping-buffer [n]
5258
(DroppingBuffer. (LinkedList.) n))
@@ -66,7 +72,10 @@
6672
(close-buf! [_this])
6773
Counted
6874
(count [_this]
69-
(.size buf)))
75+
(.size buf))
76+
impl/Capacity
77+
(capacity [_this]
78+
n))
7079

7180
(defn sliding-buffer [n]
7281
(SlidingBuffer. (LinkedList.) n))
@@ -91,7 +100,27 @@
91100
(set! val nil)))
92101
Counted
93102
(count [_]
94-
(if (undelivered? val) 0 1)))
103+
(if (undelivered? val) 0 1))
104+
impl/Capacity
105+
(capacity [_this]
106+
1))
95107

96108
(defn promise-buffer []
97-
(PromiseBuffer. NO-VAL))
109+
(PromiseBuffer. NO-VAL))
110+
111+
(defn datafy-buffer [buffer]
112+
{:type (-> buffer class .getSimpleName symbol)
113+
:count (count buffer)
114+
:capacity (impl/capacity buffer)})
115+
116+
(extend-protocol clojure.core.protocols/Datafiable
117+
FixedBuffer
118+
(datafy [b] (datafy-buffer b))
119+
DroppingBuffer
120+
(datafy [b] (datafy-buffer b))
121+
SlidingBuffer
122+
(datafy [b] (datafy-buffer b))
123+
PromiseBuffer
124+
(datafy [b] (datafy-buffer b))
125+
Object
126+
(datafy [b] nil))

src/main/clojure/clojure/core/async/impl/channels.clj

+7-8
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88

99
(ns ^{:skip-wiki true}
1010
clojure.core.async.impl.channels
11-
(:require [clojure.core.async.impl.protocols :as impl]
11+
(:require [clojure.datafy :as datafy]
12+
[clojure.core.async.impl.protocols :as impl]
1213
[clojure.core.async.impl.dispatch :as dispatch]
1314
[clojure.core.async.impl.mutex :as mutex])
1415
(:import [java.util LinkedList Queue]
@@ -308,10 +309,8 @@
308309
ManyToManyChannel
309310
(datafy [c]
310311
(let [b (.buf c)]
311-
{:buffer-type (if b
312-
(-> b class .getSimpleName symbol)
313-
:none)
314-
:buffer-count (count b)
315-
:put-count (count (.puts c))
316-
:take-count (count (.takes c))
317-
:closed? (impl/closed? c)})))
312+
(cond->
313+
{:put-count (count (.puts c))
314+
:take-count (count (.takes c))
315+
:closed? (impl/closed? c)}
316+
b (assoc :buffer (clojure.datafy/datafy b))))))

src/main/clojure/clojure/core/async/impl/protocols.clj

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
(add!* [b itm] "if room, add item to the buffer, returns b, called under chan mutex")
3535
(close-buf! [b] "called on chan closed under chan mutex, return ignored"))
3636

37+
(defprotocol Capacity
38+
(capacity [b] "The max capacity of the buffer, nil if unknown"))
39+
3740
(defn add!
3841
([b] b)
3942
([b itm]

0 commit comments

Comments
 (0)