Skip to content

Commit 0a72587

Browse files
committed
Final updates
1 parent dc0acd8 commit 0a72587

11 files changed

+324
-270
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package datadog.common.queue;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.TimeUnit;
5+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
6+
import org.openjdk.jmh.annotations.Benchmark;
7+
import org.openjdk.jmh.annotations.BenchmarkMode;
8+
import org.openjdk.jmh.annotations.Fork;
9+
import org.openjdk.jmh.annotations.Group;
10+
import org.openjdk.jmh.annotations.GroupThreads;
11+
import org.openjdk.jmh.annotations.Level;
12+
import org.openjdk.jmh.annotations.Measurement;
13+
import org.openjdk.jmh.annotations.Mode;
14+
import org.openjdk.jmh.annotations.OutputTimeUnit;
15+
import org.openjdk.jmh.annotations.Param;
16+
import org.openjdk.jmh.annotations.Scope;
17+
import org.openjdk.jmh.annotations.Setup;
18+
import org.openjdk.jmh.annotations.State;
19+
import org.openjdk.jmh.annotations.Warmup;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
/*
23+
Benchmark (capacity) Mode Cnt Score Error Units
24+
MPSCBlockingConsumerQueueBenchmark.queueTest 1024 thrpt 121.534 ops/us
25+
MPSCBlockingConsumerQueueBenchmark.queueTest:async 1024 thrpt NaN ---
26+
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 1024 thrpt 110.962 ops/us
27+
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 1024 thrpt 10.572 ops/us
28+
MPSCBlockingConsumerQueueBenchmark.queueTest 65536 thrpt 126.856 ops/us
29+
MPSCBlockingConsumerQueueBenchmark.queueTest:async 65536 thrpt NaN ---
30+
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 65536 thrpt 113.213 ops/us
31+
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 65536 thrpt 13.644 ops/us
32+
*/
33+
@BenchmarkMode(Mode.Throughput)
34+
@Warmup(iterations = 1, time = 30)
35+
@Measurement(iterations = 1, time = 30)
36+
@Fork(1)
37+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
38+
@State(Scope.Benchmark)
39+
public class JctoolsMPSCBlockingConsumerQueueBenchmark {
40+
@State(Scope.Group)
41+
public static class QueueState {
42+
MpscBlockingConsumerArrayQueue<Integer> queue;
43+
CountDownLatch consumerReady;
44+
45+
@Param({"1024", "65536"})
46+
int capacity;
47+
48+
@Setup(Level.Iteration)
49+
public void setup() {
50+
queue = new MpscBlockingConsumerArrayQueue<>(capacity);
51+
}
52+
}
53+
54+
@Benchmark
55+
@Group("queueTest")
56+
@GroupThreads(4)
57+
public void produce(QueueState state, Blackhole bh) {
58+
bh.consume(state.queue.offer(1));
59+
}
60+
61+
@Benchmark
62+
@Group("queueTest")
63+
@GroupThreads(1)
64+
public void consume(QueueState state, Blackhole bh) {
65+
Integer v = state.queue.poll();
66+
if (v != null) {
67+
bh.consume(v);
68+
}
69+
}
70+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package datadog.common.queue;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import org.jctools.queues.MpscArrayQueue;
5+
import org.openjdk.jmh.annotations.Benchmark;
6+
import org.openjdk.jmh.annotations.BenchmarkMode;
7+
import org.openjdk.jmh.annotations.Fork;
8+
import org.openjdk.jmh.annotations.Group;
9+
import org.openjdk.jmh.annotations.GroupThreads;
10+
import org.openjdk.jmh.annotations.Level;
11+
import org.openjdk.jmh.annotations.Measurement;
12+
import org.openjdk.jmh.annotations.Mode;
13+
import org.openjdk.jmh.annotations.OutputTimeUnit;
14+
import org.openjdk.jmh.annotations.Param;
15+
import org.openjdk.jmh.annotations.Scope;
16+
import org.openjdk.jmh.annotations.Setup;
17+
import org.openjdk.jmh.annotations.State;
18+
import org.openjdk.jmh.annotations.Warmup;
19+
import org.openjdk.jmh.infra.Blackhole;
20+
21+
/*
22+
Benchmark (capacity) Mode Cnt Score Error Units
23+
JctoolsMPSCQueueBenchmark.queueTest 1024 thrpt 29.444 ops/us
24+
JctoolsMPSCQueueBenchmark.queueTest:consume 1024 thrpt 21.230 ops/us
25+
JctoolsMPSCQueueBenchmark.queueTest:produce 1024 thrpt 8.214 ops/us
26+
JctoolsMPSCQueueBenchmark.queueTest 65536 thrpt 30.218 ops/us
27+
JctoolsMPSCQueueBenchmark.queueTest:consume 65536 thrpt 22.846 ops/us
28+
JctoolsMPSCQueueBenchmark.queueTest:produce 65536 thrpt 7.372 ops/us
29+
*/
30+
@BenchmarkMode(Mode.Throughput)
31+
@Warmup(iterations = 1, time = 30)
32+
@Measurement(iterations = 1, time = 30)
33+
@Fork(1)
34+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
35+
@State(Scope.Benchmark)
36+
public class JctoolsMPSCQueueBenchmark {
37+
@State(Scope.Group)
38+
public static class QueueState {
39+
MpscArrayQueue<Integer> queue;
40+
41+
@Param({"1024", "65536"})
42+
int capacity;
43+
44+
@Setup(Level.Iteration)
45+
public void setup() {
46+
queue = new MpscArrayQueue<>(capacity);
47+
}
48+
}
49+
50+
@Benchmark
51+
@Group("queueTest")
52+
@GroupThreads(4)
53+
public void produce(QueueState state, Blackhole blackhole) {
54+
blackhole.consume(state.queue.offer(0));
55+
}
56+
57+
@Benchmark
58+
@Group("queueTest")
59+
@GroupThreads(1)
60+
public void consume(QueueState state, Blackhole bh) {
61+
Integer v = state.queue.poll();
62+
if (v != null) {
63+
bh.consume(v);
64+
}
65+
}
66+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package datadog.common.queue;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import org.jctools.queues.SpscArrayQueue;
5+
import org.openjdk.jmh.annotations.Benchmark;
6+
import org.openjdk.jmh.annotations.BenchmarkMode;
7+
import org.openjdk.jmh.annotations.Fork;
8+
import org.openjdk.jmh.annotations.Group;
9+
import org.openjdk.jmh.annotations.GroupThreads;
10+
import org.openjdk.jmh.annotations.Level;
11+
import org.openjdk.jmh.annotations.Measurement;
12+
import org.openjdk.jmh.annotations.Mode;
13+
import org.openjdk.jmh.annotations.OutputTimeUnit;
14+
import org.openjdk.jmh.annotations.Param;
15+
import org.openjdk.jmh.annotations.Scope;
16+
import org.openjdk.jmh.annotations.Setup;
17+
import org.openjdk.jmh.annotations.State;
18+
import org.openjdk.jmh.annotations.Warmup;
19+
import org.openjdk.jmh.infra.Blackhole;
20+
21+
/*
22+
Benchmark (capacity) Mode Cnt Score Error Units
23+
JctoolsSPSCQueueBenchmark.queueTest 1024 thrpt 268.927 ops/us
24+
JctoolsSPSCQueueBenchmark.queueTest:consume 1024 thrpt 135.287 ops/us
25+
JctoolsSPSCQueueBenchmark.queueTest:produce 1024 thrpt 133.640 ops/us
26+
JctoolsSPSCQueueBenchmark.queueTest 65536 thrpt 531.895 ops/us
27+
JctoolsSPSCQueueBenchmark.queueTest:consume 65536 thrpt 266.084 ops/us
28+
JctoolsSPSCQueueBenchmark.queueTest:produce 65536 thrpt 265.811 ops/us
29+
*/
30+
@BenchmarkMode(Mode.Throughput)
31+
@Warmup(iterations = 3, time = 10)
32+
@Measurement(iterations = 1, time = 30)
33+
@Fork(1)
34+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
35+
@State(Scope.Benchmark)
36+
public class JctoolsSPSCQueueBenchmark {
37+
@State(Scope.Group)
38+
public static class QueueState {
39+
SpscArrayQueue<Integer> queue;
40+
41+
@Param({"1024", "65536"})
42+
int capacity;
43+
44+
@Setup(Level.Iteration)
45+
public void setup() {
46+
queue = new SpscArrayQueue<>(capacity);
47+
}
48+
}
49+
50+
@Benchmark
51+
@Group("queueTest")
52+
@GroupThreads(1)
53+
public void produce(QueueState state, Blackhole bh) {
54+
bh.consume(state.queue.offer(0));
55+
}
56+
57+
@Benchmark
58+
@Group("queueTest")
59+
@GroupThreads(1)
60+
public void consume(QueueState state, Blackhole bh) {
61+
Integer v = state.queue.poll();
62+
if (v != null) {
63+
bh.consume(v);
64+
}
65+
}
66+
}

utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCBlockingConsumerQueueBenchmark.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@
2020

2121
/*
2222
Benchmark (capacity) Mode Cnt Score Error Units
23-
MPSCBlockingConsumerQueueBenchmark.queueTest 1024 thrpt 121.534 ops/us
24-
MPSCBlockingConsumerQueueBenchmark.queueTest:async 1024 thrpt NaN ---
25-
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 1024 thrpt 110.962 ops/us
26-
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 1024 thrpt 10.572 ops/us
27-
MPSCBlockingConsumerQueueBenchmark.queueTest 65536 thrpt 126.856 ops/us
28-
MPSCBlockingConsumerQueueBenchmark.queueTest:async 65536 thrpt NaN ---
29-
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 65536 thrpt 113.213 ops/us
30-
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 65536 thrpt 13.644 ops/us
23+
MPSCBlockingConsumerQueueBenchmark.queueTest 1024 thrpt 237.384 ops/us
24+
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 1024 thrpt 225.826 ops/us
25+
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 1024 thrpt 11.558 ops/us
26+
MPSCBlockingConsumerQueueBenchmark.queueTest 65536 thrpt 120.258 ops/us
27+
MPSCBlockingConsumerQueueBenchmark.queueTest:consume 65536 thrpt 112.679 ops/us
28+
MPSCBlockingConsumerQueueBenchmark.queueTest:produce 65536 thrpt 7.579 ops/us
3129
*/
3230
@BenchmarkMode(Mode.Throughput)
3331
@Warmup(iterations = 1, time = 30)

utils/queue-utils/src/jmh/java/datadog/common/queue/MPSCQueueBenchmark.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919

2020
/*
2121
Benchmark (capacity) Mode Cnt Score Error Units
22-
MPSCQueueBenchmark.queueTest 1024 thrpt 272.751 ops/us
23-
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 258.737 ops/us
24-
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 14.013 ops/us
25-
MPSCQueueBenchmark.queueTest:·async 1024 thrpt NaN ---
26-
MPSCQueueBenchmark.queueTest 65536 thrpt 120.776 ops/us
27-
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 108.595 ops/us
28-
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 12.182 ops/us
22+
MPSCQueueBenchmark.queueTest 1024 thrpt 1229.501 ops/us
23+
MPSCQueueBenchmark.queueTest:consume 1024 thrpt 503.469 ops/us
24+
MPSCQueueBenchmark.queueTest:produce 1024 thrpt 726.033 ops/us
25+
MPSCQueueBenchmark.queueTest 65536 thrpt 136.218 ops/us
26+
MPSCQueueBenchmark.queueTest:consume 65536 thrpt 122.937 ops/us
27+
MPSCQueueBenchmark.queueTest:produce 65536 thrpt 13.281 ops/us
2928
*/
3029
@BenchmarkMode(Mode.Throughput)
3130
@Warmup(iterations = 1, time = 30)

utils/queue-utils/src/jmh/java/datadog/common/queue/SPSCQueueBenchmark.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919

2020
/*
2121
Benchmark (capacity) Mode Cnt Score Error Units
22-
SPSCQueueBenchmark.queueTest 1024 thrpt 91.112 ops/us
23-
SPSCQueueBenchmark.queueTest:consume 1024 thrpt 52.640 ops/us
24-
SPSCQueueBenchmark.queueTest:produce 1024 thrpt 38.472 ops/us
25-
SPSCQueueBenchmark.queueTest 65536 thrpt 140.663 ops/us
26-
SPSCQueueBenchmark.queueTest:consume 65536 thrpt 70.363 ops/us
27-
SPSCQueueBenchmark.queueTest:produce 65536 thrpt 70.300 ops/us
22+
SPSCQueueBenchmark.queueTest 1024 thrpt 115.861 ops/us
23+
SPSCQueueBenchmark.queueTest:consume 1024 thrpt 83.922 ops/us
24+
SPSCQueueBenchmark.queueTest:produce 1024 thrpt 31.939 ops/us
25+
SPSCQueueBenchmark.queueTest 65536 thrpt 543.237 ops/us
26+
SPSCQueueBenchmark.queueTest:consume 65536 thrpt 280.208 ops/us
27+
SPSCQueueBenchmark.queueTest:produce 65536 thrpt 263.029 ops/us
2828
*/
2929
@BenchmarkMode(Mode.Throughput)
30-
@Warmup(iterations = 1, time = 30)
30+
@Warmup(iterations = 3, time = 10)
3131
@Measurement(iterations = 1, time = 30)
3232
@Fork(1)
3333
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -49,13 +49,8 @@ public void setup() {
4949
@Benchmark
5050
@Group("queueTest")
5151
@GroupThreads(1)
52-
public void produce(QueueState state) {
53-
54-
// bounded attempt: try once, then yield if full
55-
boolean offered = state.queue.offer(0);
56-
if (!offered) {
57-
Thread.yield();
58-
}
52+
public void produce(QueueState state, Blackhole bh) {
53+
bh.consume(state.queue.offer(0));
5954
}
6055

6156
@Benchmark

utils/queue-utils/src/main/java/datadog/common/queue/BaseQueue.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static datadog.trace.util.BitUtils.nextPowerOfTwo;
44

5+
import java.lang.invoke.MethodHandles;
6+
import java.lang.invoke.VarHandle;
57
import java.util.AbstractQueue;
68
import java.util.Iterator;
79
import java.util.function.Consumer;
@@ -14,15 +16,52 @@
1416
* @param <E> the type of elements held by this queue
1517
*/
1618
abstract class BaseQueue<E> extends AbstractQueue<E> implements NonBlockingQueue<E> {
19+
protected static final VarHandle HEAD_HANDLE;
20+
protected static final VarHandle TAIL_HANDLE;
21+
protected static final VarHandle ARRAY_HANDLE;
22+
23+
static {
  // Resolve the field handles into locals first so that the checked reflective failures are
  // handled in one place, then publish all three handles to the static finals.
  final VarHandle headHandle;
  final VarHandle tailHandle;
  try {
    final MethodHandles.Lookup self = MethodHandles.lookup();
    headHandle = self.findVarHandle(BaseQueue.class, "head", long.class);
    tailHandle = self.findVarHandle(BaseQueue.class, "tail", long.class);
  } catch (ReflectiveOperationException lookupFailure) {
    // A missing or inaccessible field makes the class unusable; fail class initialization.
    throw new ExceptionInInitializerError(lookupFailure);
  }
  HEAD_HANDLE = headHandle;
  TAIL_HANDLE = tailHandle;
  ARRAY_HANDLE = MethodHandles.arrayElementVarHandle(Object[].class);
}
33+
1734
/** The capacity of the queue (must be a power of two) */
1835
protected final int capacity;
1936

2037
/** Mask for fast modulo operation (index = pos & mask) */
2138
protected final int mask;
2239

40+
/** The backing array (plain Java array for VarHandle access) */
41+
protected final Object[] buffer;
42+
43+
// Padding to avoid false sharing
44+
@SuppressWarnings("unused")
45+
private long p0, p1, p2, p3, p4, p5, p6;
46+
47+
/** Next free slot for producer (single-threaded) */
48+
protected volatile long tail = 0L;
49+
50+
// Padding around tail
51+
@SuppressWarnings("unused")
52+
private long q0, q1, q2, q3, q4, q5, q6;
53+
54+
/** Next slot to consume (multi-threaded) */
55+
protected volatile long head = 0L;
56+
57+
// Padding around head
58+
@SuppressWarnings("unused")
59+
private long r0, r1, r2, r3, r4, r5, r6;
60+
2361
/**
 * Creates a queue whose effective capacity is the requested capacity rounded up by {@code
 * nextPowerOfTwo}.
 *
 * @param capacity the requested capacity; the value actually used is exposed through {@link
 *     #capacity}
 */
public BaseQueue(int capacity) {
  this.capacity = nextPowerOfTwo(capacity);
  this.mask = this.capacity - 1;
  // Size the backing array from the rounded capacity, not the raw argument. Slots are addressed
  // as (pos & mask), i.e. 0..this.capacity - 1, so an array of only `capacity` elements would be
  // too short whenever the request is not already a power of two.
  this.buffer = new Object[this.capacity];
}
2766

2867
@Override
@@ -69,17 +108,24 @@ public int fill(@Nonnull Supplier<? extends E> supplier, int limit) {
69108
* @throws UnsupportedOperationException always
70109
*/
71110
@Override
72-
public Iterator<E> iterator() {
111+
public final Iterator<E> iterator() {
73112
// Iteration is not supported: this method unconditionally throws.
throw new UnsupportedOperationException();
74113
}
75114

76115
@Override
77-
public int remainingCapacity() {
116+
public final int remainingCapacity() {
78117
// Free slots = fixed capacity minus the current size() reading.
// NOTE(review): presumably only a point-in-time estimate while other threads offer/poll — confirm.
return capacity - size();
79118
}
80119

81120
@Override
82-
public int capacity() {
121+
public final int capacity() {
83122
// Returns the capacity field set by the constructor (the requested value rounded up to a power of two).
return capacity;
84123
}
124+
125+
@Override
126+
public final int size() {
127+
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
128+
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
129+
return (int) (currentTail - currentHead);
130+
}
85131
}

0 commit comments

Comments
 (0)