11package datadog .common .queue ;
22
3+ import java .lang .invoke .MethodHandles ;
4+ import java .lang .invoke .MethodHandles .Lookup ;
5+ import java .lang .invoke .VarHandle ;
6+ import java .util .Objects ;
37import java .util .concurrent .TimeUnit ;
48import java .util .concurrent .locks .LockSupport ;
59import javax .annotation .Nonnull ;
610
711/**
8- * A MPSC Array queue offering blocking methods (take and timed poll) for a single consumer.
12+ * A Multiple-Producer, Single-Consumer (MPSC) bounded lock-free queue using a circular array and
13+ * VarHandles. It adds blocking capabilities for a single consumer (take, timed offer).
914 *
10- * <p>The wait is performed by parking/unparking the consumer thread.
15+ * <p>All operations are wait-free for the consumer and lock-free for producers.
16+ *
17+ * @param <E> the type of elements stored
1118 */
12- class MpscBlockingConsumerArrayQueueVarHandle <E > extends MpscArrayQueueVarHandle <E >
19+ class MpscBlockingConsumerArrayQueueVarHandle <E > extends BaseQueue <E >
1320 implements BlockingConsumerNonBlockingQueue <E > {
14- /** Consumer thread reference for wake-up. */
21+ private static final VarHandle ARRAY_HANDLE ;
22+ private static final VarHandle HEAD_HANDLE ;
23+ private static final VarHandle TAIL_HANDLE ;
24+ private static final VarHandle PRODUCER_LIMIT_HANDLE ;
25+ private static final VarHandle CONSUMER_THREAD_HANDLE ;
26+
27+ static {
28+ try {
29+ final Lookup lookup = MethodHandles .lookup ();
30+ TAIL_HANDLE =
31+ lookup .findVarHandle (MpscBlockingConsumerArrayQueueVarHandle .class , "tail" , long .class );
32+ HEAD_HANDLE =
33+ lookup .findVarHandle (MpscBlockingConsumerArrayQueueVarHandle .class , "head" , long .class );
34+ ARRAY_HANDLE = MethodHandles .arrayElementVarHandle (Object [].class );
35+ PRODUCER_LIMIT_HANDLE =
36+ lookup .findVarHandle (
37+ MpscBlockingConsumerArrayQueueVarHandle .class , "producerLimit" , long .class );
38+ CONSUMER_THREAD_HANDLE =
39+ lookup .findVarHandle (
40+ MpscBlockingConsumerArrayQueueVarHandle .class , "consumerThread" , Thread .class );
41+ } catch (Throwable t ) {
42+ throw new IllegalStateException (t );
43+ }
44+ }
45+
46+ /** The backing array (plain Java array for VarHandle access) */
47+ private final Object [] buffer ;
48+
49+ // Padding to prevent false sharing
50+ @ SuppressWarnings ("unused" )
51+ private long p0 , p1 , p2 , p3 , p4 , p5 , p6 ;
52+
53+ /** Next free slot for producers (multi-threaded) */
54+ private volatile long tail = 0L ;
55+
56+ // Padding around tail
57+ @ SuppressWarnings ("unused" )
58+ private long q0 , q1 , q2 , q3 , q4 , q5 , q6 ;
59+
60+ /** Cached producer limit to reduce volatile head reads */
61+ private volatile long producerLimit = 0L ;
62+
63+ // Padding around producerLimit
64+ @ SuppressWarnings ("unused" )
65+ private long r0 , r1 , r2 , r3 , r4 , r5 , r6 ;
66+
67+ /** Next slot to consume (single-threaded) */
68+ private volatile long head = 0L ;
69+
70+ // Padding around head
71+ @ SuppressWarnings ("unused" )
72+ private long s0 , s1 , s2 , s3 , s4 , s5 , s6 ;
73+
74+ /** Reference to the waiting consumer thread (set atomically). */
1575 private volatile Thread consumerThread ;
1676
17- public MpscBlockingConsumerArrayQueueVarHandle (int capacity ) {
18- super (capacity );
77+ /**
78+ * Creates a new MPSC queue.
79+ *
80+ * @param requestedCapacity the desired capacity, rounded up to next power of two
81+ */
82+ public MpscBlockingConsumerArrayQueueVarHandle (int requestedCapacity ) {
83+ super (requestedCapacity );
84+ this .buffer = new Object [capacity ];
85+ this .producerLimit = capacity ;
1986 }
2087
88+ /**
89+ * Attempts to add an element to the queue.
90+ *
91+ * @param e the element to add (must be non-null)
92+ * @return true if element was enqueued, false if queue is full
93+ */
2194 @ Override
2295 public boolean offer (E e ) {
23- final boolean success = super .offer (e );
24- if (success ) {
25- try {
26- final Thread c = consumerThread ;
27- LockSupport .unpark (c ); // unpark is safe if the arg is null
28- } finally {
29- consumerThread = null ;
96+ Objects .requireNonNull (e );
97+
98+ // jctools does the same local copy to have the jitter optimise the accesses
99+ final Object [] localBuffer = this .buffer ;
100+
101+ long localProducerLimit = (long ) PRODUCER_LIMIT_HANDLE .getVolatile (this );
102+ long cachedHead = 0L ; // Local cache of head to reduce volatile reads
103+
104+ int spinCycles = 0 ;
105+ boolean parkOnSpin = (Thread .currentThread ().getId () & 1 ) == 0 ;
106+
107+ while (true ) {
108+ long currentTail = (long ) TAIL_HANDLE .getVolatile (this );
109+
110+ // Check if producer limit exceeded
111+ if (currentTail >= localProducerLimit ) {
112+ // Refresh head only when necessary
113+ cachedHead = (long ) HEAD_HANDLE .getVolatile (this );
114+ localProducerLimit = cachedHead + capacity ;
115+
116+ if (currentTail >= localProducerLimit ) {
117+ return false ; // queue full
118+ }
119+
120+ // Update producerLimit so other producers also benefit
121+ PRODUCER_LIMIT_HANDLE .setVolatile (this , localProducerLimit );
122+ }
123+
124+ // Attempt to claim a slot
125+ if (TAIL_HANDLE .compareAndSet (this , currentTail , currentTail + 1 )) {
126+ final int index = (int ) (currentTail & mask );
127+
128+ // Release-store ensures producer's write is visible to consumer
129+ ARRAY_HANDLE .setRelease (localBuffer , index , e );
130+
131+ // Atomically clear and unpark the consumer if waiting
132+ Thread c = (Thread ) CONSUMER_THREAD_HANDLE .getAndSet (this , null );
133+ if (c != null ) {
134+ LockSupport .unpark (c );
135+ }
136+
137+ return true ;
30138 }
31- }
32139
33- return success ;
140+ // Backoff to reduce contention
141+ if ((spinCycles & 1 ) == 0 ) {
142+ Thread .onSpinWait ();
143+ } else {
144+ if (parkOnSpin ) {
145+ LockSupport .parkNanos (1 );
146+ } else {
147+ Thread .yield ();
148+ }
149+ }
150+ spinCycles ++;
151+ }
34152 }
35153
36154 /**
37- * Retrieves and removes the head element, waiting if necessary until one becomes available .
155+ * Removes and returns the next element, or null if empty .
38156 *
39- * @return the next element (never null)
40- * @throws InterruptedException if interrupted while waiting
157+ * @return dequeued element, or null if queue empty
41158 */
42159 @ Override
43- public E take () throws InterruptedException {
44- consumerThread = Thread .currentThread ();
45- E e ;
46- while ((e = poll ()) == null ) {
47- parkUntilNext (-1 );
160+ @ SuppressWarnings ("unchecked" )
161+ public E poll () {
162+ final Object [] localBuffer = this .buffer ;
163+
164+ long currentHead = (long ) HEAD_HANDLE .getOpaque (this );
165+ final int index = (int ) (currentHead & mask );
166+
167+ // Acquire-load ensures visibility of producer write
168+ Object value = ARRAY_HANDLE .getAcquire (localBuffer , index );
169+ if (value == null ) {
170+ return null ;
48171 }
49- return e ;
172+
173+ // Clear the slot without additional fence
174+ ARRAY_HANDLE .setOpaque (localBuffer , index , null );
175+
176+ // Advance head using opaque write (consumer-only)
177+ HEAD_HANDLE .setOpaque (this , currentHead + 1 );
178+
179+ return (E ) value ;
50180 }
51181
52182 /**
@@ -74,6 +204,50 @@ public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException
74204 return poll ();
75205 }
76206
207+ /**
208+ * Retrieves and removes the head element, waiting if necessary until one becomes available.
209+ *
210+ * @return the next element (never null)
211+ * @throws InterruptedException if interrupted while waiting
212+ */
213+ @ Override
214+ public E take () throws InterruptedException {
215+ consumerThread = Thread .currentThread ();
216+ E e ;
217+ while ((e = poll ()) == null ) {
218+ parkUntilNext (-1 );
219+ }
220+ return e ;
221+ }
222+
223+ /**
224+ * Returns next element without removing it.
225+ *
226+ * <p>The memory visibility is only correct if the consumer calls it.
227+ *
228+ * @return next element or null if empty
229+ */
230+ @ Override
231+ @ SuppressWarnings ("unchecked" )
232+ public E peek () {
233+ final int index = (int ) ((long ) HEAD_HANDLE .getOpaque (this ) & mask );
234+ return (E ) ARRAY_HANDLE .getVolatile (buffer , index );
235+ }
236+
237+ /**
238+ * Returns number of elements in queue.
239+ *
240+ * <p>Volatile reads of tail and head ensure accurate result in multi-threaded context.
241+ *
242+ * @return current size
243+ */
244+ @ Override
245+ public int size () {
246+ long currentHead = (long ) HEAD_HANDLE .getVolatile (this );
247+ long currentTail = (long ) TAIL_HANDLE .getVolatile (this );
248+ return (int ) (currentTail - currentHead );
249+ }
250+
77251 /**
78252 * Blocks (parks) until an element becomes available or until the specified timeout elapses.
79253 *
@@ -85,20 +259,20 @@ public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException
85259 * @throws InterruptedException if interrupted
86260 */
87261 private void parkUntilNext (long nanos ) throws InterruptedException {
88- try {
89- // register this thread as the waiting consumer
90- consumerThread = Thread .currentThread ();
91- if (nanos <= 0 ) {
92- LockSupport .park (this );
93- } else {
94- LockSupport .parkNanos (this , nanos );
95- }
96- if (Thread .interrupted ()) {
97- throw new InterruptedException ();
98- }
99- } finally {
100- // free the variable not to reference the consumer thread anymore
101- consumerThread = null ;
262+ Thread current = Thread .currentThread ();
263+ // Publish the consumer thread (no ordering required)
264+ CONSUMER_THREAD_HANDLE .setOpaque (this , current );
265+ if (nanos <= 0 ) {
266+ LockSupport .park (this );
267+ } else {
268+ LockSupport .parkNanos (this , nanos );
102269 }
270+
271+ if (Thread .interrupted ()) {
272+ throw new InterruptedException ();
273+ }
274+
275+ // Cleanup (no fence needed, single consumer)
276+ CONSUMER_THREAD_HANDLE .setOpaque (this , null );
103277 }
104278}
0 commit comments