Skip to content

Commit dc0acd8

Browse files
committed
Refine documentation
1 parent 152f53f commit dc0acd8

10 files changed

+162
-141
lines changed

utils/queue-utils/src/main/java/datadog/common/queue/BaseQueue.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88
import java.util.function.Supplier;
99
import javax.annotation.Nonnull;
1010

11+
/**
12+
* Base class for non-blocking queuing operations.
13+
*
14+
* @param <E> the type of elements held by this queue
15+
*/
1116
abstract class BaseQueue<E> extends AbstractQueue<E> implements NonBlockingQueue<E> {
1217
/** The capacity of the queue (must be a power of two) */
1318
protected final int capacity;
@@ -20,32 +25,13 @@ public BaseQueue(int capacity) {
2025
this.mask = this.capacity - 1;
2126
}
2227

23-
/**
24-
* Drains all available elements from the queue to a consumer.
25-
*
26-
* <p>This is efficient since it avoids repeated size() checks and returns immediately when empty.
27-
*
28-
* @param consumer a consumer to accept elements
29-
* @return number of elements drained
30-
*/
3128
@Override
3229
public int drain(Consumer<E> consumer) {
3330
return drain(consumer, Integer.MAX_VALUE);
3431
}
3532

36-
/**
37-
* Drains up to {@code limit} elements from the queue to a consumer.
38-
*
39-
* <p>This method is useful for batch processing.
40-
*
41-
* <p>Each element is removed atomically using poll() and passed to the consumer.
42-
*
43-
* @param consumer a consumer to accept elements
44-
* @param limit maximum number of elements to drain
45-
* @return number of elements drained
46-
*/
4733
@Override
48-
public int drain(Consumer<E> consumer, int limit) {
34+
public int drain(@Nonnull Consumer<E> consumer, int limit) {
4935
int count = 0;
5036
E e;
5137
while (count < limit && (e = poll()) != null) {
@@ -55,14 +41,6 @@ public int drain(Consumer<E> consumer, int limit) {
5541
return count;
5642
}
5743

58-
/**
59-
* Fills the queue with elements provided by the supplier until either: - the queue is full, or -
60-
* the supplier runs out of elements (returns null)
61-
*
62-
* @param supplier a supplier of elements
63-
* @param limit maximum number of elements to attempt to insert
64-
* @return number of elements successfully enqueued
65-
*/
6644
@Override
6745
public int fill(@Nonnull Supplier<? extends E> supplier, int limit) {
6846
if (limit <= 0) {
@@ -95,21 +73,11 @@ public Iterator<E> iterator() {
9573
throw new UnsupportedOperationException();
9674
}
9775

98-
/**
99-
* Returns the remaining capacity.
100-
*
101-
* @return number of additional elements this queue can accept
102-
*/
10376
@Override
10477
public int remainingCapacity() {
10578
return capacity - size();
10679
}
10780

108-
/**
109-
* Returns the maximum queue capacity.
110-
*
111-
* @return number of total elements this queue can accept
112-
*/
11381
@Override
11482
public int capacity() {
11583
return capacity;

utils/queue-utils/src/main/java/datadog/common/queue/BlockingConsumerNonBlockingQueue.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,45 @@
33
import java.util.concurrent.TimeUnit;
44
import javax.annotation.Nonnull;
55

6+
/**
7+
* A hybrid queue interface combining non-blocking producer semantics with blocking consumer
8+
* operations.
9+
*
10+
* <p>This interface extends {@link NonBlockingQueue} and adds methods that allow consumers to block
11+
* while waiting for elements to become available. It is intended for use in scenarios with:
12+
*
13+
* <ul>
14+
* <li>Multiple or single <b>producers</b> enqueue elements using non-blocking operations (e.g.,
15+
* {@link #offer(Object)}).
16+
* <li>A single <b>consumer</b> that may block until elements are ready (i.e., using {@link
17+
* #take()} or {@link #poll(long, TimeUnit)}).
18+
* </ul>
19+
*
20+
* @param <E> the type of elements held in this queue
21+
*/
622
public interface BlockingConsumerNonBlockingQueue<E> extends NonBlockingQueue<E> {
23+
24+
/**
25+
* Retrieves and removes the head of this queue, waiting up to the specified wait time if
26+
* necessary for an element to become available.
27+
*
28+
* @param timeout how long to wait before giving up, in units of {@code unit}
29+
* @param unit the time unit of the {@code timeout} argument; must not be {@code null}
30+
* @return the head of this queue, or {@code null} if the specified waiting time elapses before an
31+
* element becomes available
32+
* @throws InterruptedException if interrupted while waiting
33+
*/
734
E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException;
835

36+
/**
37+
* Retrieves and removes the head of this queue, waiting if necessary until an element becomes
38+
* available.
39+
*
40+
* <p>This operation blocks the consumer thread if the queue is empty, while producers continue to
41+
* operate in a non-blocking manner.
42+
*
43+
* @return the head of this queue
44+
* @throws InterruptedException if interrupted while waiting
45+
*/
946
E take() throws InterruptedException;
1047
}

utils/queue-utils/src/main/java/datadog/common/queue/JctoolsMpscBlockingConsumerWrappedQueue.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@
55
import javax.annotation.Nonnull;
66
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
77

8+
/**
9+
* A {@link BlockingConsumerNonBlockingQueue} implementation that wraps a JCTools {@link
10+
* MpscBlockingConsumerArrayQueue}.
11+
*
12+
* <p>All operations delegate directly to the underlying JCTools queue to preserve performance and
13+
* memory semantics.
14+
*
15+
* @param <E> the type of elements held in this queue
16+
*/
817
class JctoolsMpscBlockingConsumerWrappedQueue<E> extends JctoolsWrappedQueue<E>
918
implements BlockingConsumerNonBlockingQueue<E> {
1019

utils/queue-utils/src/main/java/datadog/common/queue/JctoolsWrappedQueue.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@
77
import javax.annotation.Nonnull;
88
import org.jctools.queues.MessagePassingQueue;
99

10+
/**
11+
* A {@link NonBlockingQueue} implementation that wraps a {@link MessagePassingQueue} from the
12+
* JCTools library to provide a consistent, framework-independent interface.
13+
*
14+
* <p>This adapter bridges JCTools’ queue APIs with the {@link NonBlockingQueue} abstraction used by
15+
* this library. All operations are directly delegated to the underlying {@code MessagePassingQueue}
16+
*
17+
* @param <E> the type of elements held in this queue
18+
*/
1019
class JctoolsWrappedQueue<E> extends AbstractQueue<E> implements NonBlockingQueue<E> {
1120
private final MessagePassingQueue<E> delegate;
1221

utils/queue-utils/src/main/java/datadog/common/queue/MpscArrayQueueVarHandle.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,6 @@ public MpscArrayQueueVarHandle(int requestedCapacity) {
7272
this.producerLimit = capacity;
7373
}
7474

75-
/**
76-
* Attempts to add an element to the queue.
77-
*
78-
* @param e the element to add (must be non-null)
79-
* @return true if element was enqueued, false if queue is full
80-
*/
8175
@Override
8276
public boolean offer(E e) {
8377
Objects.requireNonNull(e);
@@ -131,11 +125,6 @@ public boolean offer(E e) {
131125
}
132126
}
133127

134-
/**
135-
* Removes and returns the next element, or null if empty.
136-
*
137-
* @return dequeued element, or null if queue empty
138-
*/
139128
@Override
140129
@SuppressWarnings("unchecked")
141130
public E poll() {
@@ -159,27 +148,13 @@ public E poll() {
159148
return (E) value;
160149
}
161150

162-
/**
163-
* Returns next element without removing it.
164-
*
165-
* <p>The memory visibility is only correct if the consumer calls it.
166-
*
167-
* @return next element or null if empty
168-
*/
169151
@Override
170152
@SuppressWarnings("unchecked")
171153
public E peek() {
172154
final int index = (int) ((long) HEAD_HANDLE.getOpaque(this) & mask);
173155
return (E) ARRAY_HANDLE.getVolatile(buffer, index);
174156
}
175157

176-
/**
177-
* Returns number of elements in queue.
178-
*
179-
* <p>Volatile reads of tail and head ensure accurate result in multi-threaded context.
180-
*
181-
* @return current size
182-
*/
183158
@Override
184159
public int size() {
185160
long currentHead = (long) HEAD_HANDLE.getVolatile(this);

utils/queue-utils/src/main/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandle.java

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,6 @@ public MpscBlockingConsumerArrayQueueVarHandle(int requestedCapacity) {
8585
this.producerLimit = capacity;
8686
}
8787

88-
/**
89-
* Attempts to add an element to the queue.
90-
*
91-
* @param e the element to add (must be non-null)
92-
* @return true if element was enqueued, false if queue is full
93-
*/
9488
@Override
9589
public boolean offer(E e) {
9690
Objects.requireNonNull(e);
@@ -151,11 +145,6 @@ public boolean offer(E e) {
151145
}
152146
}
153147

154-
/**
155-
* Removes and returns the next element, or null if empty.
156-
*
157-
* @return dequeued element, or null if queue empty
158-
*/
159148
@Override
160149
@SuppressWarnings("unchecked")
161150
public E poll() {
@@ -179,14 +168,6 @@ public E poll() {
179168
return (E) value;
180169
}
181170

182-
/**
183-
* Polls with a timeout.
184-
*
185-
* @param timeout max wait time
186-
* @param unit time unit
187-
* @return the head element, or null if timed out
188-
* @throws InterruptedException if interrupted
189-
*/
190171
@Override
191172
public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
192173
E e = poll();
@@ -204,12 +185,6 @@ public E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedException
204185
return poll();
205186
}
206187

207-
/**
208-
* Retrieves and removes the head element, waiting if necessary until one becomes available.
209-
*
210-
* @return the next element (never null)
211-
* @throws InterruptedException if interrupted while waiting
212-
*/
213188
@Override
214189
public E take() throws InterruptedException {
215190
consumerThread = Thread.currentThread();
@@ -220,27 +195,13 @@ public E take() throws InterruptedException {
220195
return e;
221196
}
222197

223-
/**
224-
* Returns next element without removing it.
225-
*
226-
* <p>The memory visibility is only correct if the consumer calls it.
227-
*
228-
* @return next element or null if empty
229-
*/
230198
@Override
231199
@SuppressWarnings("unchecked")
232200
public E peek() {
233201
final int index = (int) ((long) HEAD_HANDLE.getOpaque(this) & mask);
234202
return (E) ARRAY_HANDLE.getVolatile(buffer, index);
235203
}
236204

237-
/**
238-
* Returns number of elements in queue.
239-
*
240-
* <p>Volatile reads of tail and head ensure accurate result in multi-threaded context.
241-
*
242-
* @return current size
243-
*/
244205
@Override
245206
public int size() {
246207
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
@@ -256,7 +217,7 @@ public int size() {
256217
*
257218
* @param nanos max wait time in nanoseconds. If negative, it will park indefinably until waken or
258219
* interrupted
259-
* @throws InterruptedException if interrupted
220+
* @throws InterruptedException if interrupted while waiting
260221
*/
261222
private void parkUntilNext(long nanos) throws InterruptedException {
262223
Thread current = Thread.currentThread();

utils/queue-utils/src/main/java/datadog/common/queue/NonBlockingQueue.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,68 @@
55
import java.util.function.Supplier;
66
import javax.annotation.Nonnull;
77

8+
/**
9+
* A non-blocking, concurrent queue supporting high-performance operations for producer-consumer
10+
* scenarios. This interface extends {@link Queue} and adds specialized methods for bulk draining
11+
* and filling, as well as querying the queue’s fixed capacity.
12+
*
13+
* <p>Unlike typical {@link java.util.concurrent.BlockingQueue} implementations, this interface does
14+
* not provide blocking operations. Instead, producers and consumers are expected to retry or yield
15+
* when the queue is full or empty, respectively.
16+
*
17+
* <p>Implementations are typically array-backed and rely on non-blocking atomic operations (such as
18+
* VarHandles or Unsafe-based CAS) to achieve concurrent performance without locks.
19+
*
20+
* @param <E> the type of elements held in this queue
21+
* @see java.util.Queue
22+
* @see java.util.concurrent.ConcurrentLinkedQueue
23+
*/
824
public interface NonBlockingQueue<E> extends Queue<E> {
25+
26+
/**
27+
* Drains all available elements from this queue, passing each to the given {@link Consumer}.
28+
*
29+
* <p>This method will consume as many elements as are currently available, up to the queue’s size
30+
* at the time of the call.
31+
*
32+
* @param consumer the consumer that will process each element; must not be {@code null}
33+
* @return the number of elements drained
34+
* @throws NullPointerException if {@code consumer} is {@code null}
35+
*/
936
int drain(Consumer<E> consumer);
1037

38+
/**
39+
* Drains up to the specified number of elements from this queue, passing each to the given {@link
40+
* Consumer}.
41+
*
42+
* @param consumer the consumer that will process each element; must not be {@code null}
43+
* @param limit the maximum number of elements to drain
44+
* @return the actual number of elements drained (maybe less than {@code limit})
45+
*/
1146
int drain(Consumer<E> consumer, int limit);
1247

48+
/**
49+
* Fills the queue with elements supplied by the given {@link Supplier}, up to the specified limit
50+
* or until the queue becomes full.
51+
*
52+
* @param supplier the supplier that provides elements to insert; must not be {@code null}
53+
* @param limit the maximum number of elements to insert
54+
* @return the number of elements successfully added (maybe less than {@code limit})
55+
*/
1356
int fill(@Nonnull Supplier<? extends E> supplier, int limit);
1457

58+
/**
59+
* Returns the number of additional elements that can be inserted into this queue without
60+
* exceeding its capacity.
61+
*
62+
* @return the number of remaining slots available for insertion
63+
*/
1564
int remainingCapacity();
1665

66+
/**
67+
* Returns the total fixed capacity of this queue.
68+
*
69+
* @return the maximum number of elements this queue can hold
70+
*/
1771
int capacity();
1872
}

0 commit comments

Comments
 (0)