Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dd-java-agent/agent-llmobs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ dependencies {
implementation project(':communication')
implementation project(':components:json')
implementation project(':internal-api')
implementation project(':utils:queue-utils')


testImplementation project(':dd-java-agent:testing')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import datadog.common.queue.BlockingConsumerNonBlockingQueue;
import datadog.common.queue.Queues;
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.http.HttpRetryPolicy;
Expand All @@ -20,7 +22,6 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,7 +35,7 @@ public class EvalProcessingWorker implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);

private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
private final BlockingConsumerNonBlockingQueue<LLMObsEval> queue;
private final Thread serializerThread;

public EvalProcessingWorker(
Expand All @@ -43,7 +44,7 @@ public EvalProcessingWorker(
final TimeUnit timeUnit,
final SharedCommunicationObjects sco,
Config config) {
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
this.queue = Queues.mpscBlockingConsumerArrayQueue(capacity);

boolean isAgentless = config.isLlmObsAgentlessEnabled();
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
Expand Down Expand Up @@ -98,7 +99,7 @@ public static class EvalSerializingHandler implements Runnable {
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
private static final int FLUSH_THRESHOLD = 50;

private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
private final BlockingConsumerNonBlockingQueue<LLMObsEval> queue;
private final long ticksRequiredToFlush;
private long lastTicks;

Expand All @@ -111,7 +112,7 @@ public static class EvalSerializingHandler implements Runnable {
private final List<LLMObsEval> buffer = new ArrayList<>();

public EvalSerializingHandler(
final MpscBlockingConsumerArrayQueue<LLMObsEval> queue,
final BlockingConsumerNonBlockingQueue<LLMObsEval> queue,
final long flushInterval,
final TimeUnit timeUnit,
final HttpUrl submissionUrl,
Expand Down
3 changes: 3 additions & 0 deletions dd-java-agent/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ dependencies {
sharedShadowInclude project(':utils:socket-utils'), {
transitive = false
}
sharedShadowInclude project(':utils:queue-utils'), {
transitive = false
}
sharedShadowInclude project(':utils:version-utils'), {
transitive = false
}
Expand Down
1 change: 1 addition & 0 deletions dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies {
implementation project(':components:json')
implementation project(':utils:container-utils')
implementation project(':utils:socket-utils')
implementation project(':utils:queue-utils')
// for span exception debugging
compileOnly project(':dd-java-agent:agent-debugger:debugger-bootstrap')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import datadog.common.queue.NonBlockingQueue;
import datadog.trace.common.metrics.SignalItem.StopSignal;
import datadog.trace.core.util.LRUCache;
import java.util.Iterator;
Expand All @@ -10,8 +11,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscCompoundQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,7 +22,7 @@ final class Aggregator implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Aggregator.class);

private final Queue<Batch> batchPool;
private final MpscCompoundQueue<InboxItem> inbox;
private final NonBlockingQueue<InboxItem> inbox;
private final LRUCache<MetricKey, AggregateMetric> aggregates;
private final ConcurrentMap<MetricKey, Batch> pending;
private final Set<MetricKey> commonKeys;
Expand All @@ -39,7 +39,7 @@ final class Aggregator implements Runnable {
Aggregator(
MetricWriter writer,
Queue<Batch> batchPool,
MpscCompoundQueue<InboxItem> inbox,
NonBlockingQueue<InboxItem> inbox,
ConcurrentMap<MetricKey, Batch> pending,
final Set<MetricKey> commonKeys,
int maxAggregates,
Expand All @@ -60,7 +60,7 @@ final class Aggregator implements Runnable {
Aggregator(
MetricWriter writer,
Queue<Batch> batchPool,
MpscCompoundQueue<InboxItem> inbox,
NonBlockingQueue<InboxItem> inbox,
ConcurrentMap<MetricKey, Batch> pending,
final Set<MetricKey> commonKeys,
int maxAggregates,
Expand Down Expand Up @@ -103,7 +103,7 @@ public void run() {
log.debug("metrics aggregator exited");
}

private final class Drainer implements MessagePassingQueue.Consumer<InboxItem> {
private final class Drainer implements Consumer<InboxItem> {

boolean stopped = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.util.Collections.unmodifiableSet;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
Expand Down Expand Up @@ -46,8 +48,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.jctools.queues.MpscCompoundQueue;
import org.jctools.queues.SpmcArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,7 +93,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final ConcurrentHashMap<MetricKey, Batch> pending;
private final ConcurrentHashMap<MetricKey, MetricKey> keys;
private final Thread thread;
private final MpscCompoundQueue<InboxItem> inbox;
private final NonBlockingQueue<InboxItem> inbox;
private final Sink sink;
private final Aggregator aggregator;
private final long reportingInterval;
Expand Down Expand Up @@ -176,8 +176,8 @@ public ConflatingMetricsAggregator(
long reportingInterval,
TimeUnit timeUnit) {
this.ignoredResources = ignoredResources;
this.inbox = new MpscCompoundQueue<>(queueSize);
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
this.inbox = Queues.mpscArrayQueue(queueSize);
this.batchPool = Queues.spmcArrayQueue(maxAggregates);
this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3);
this.keys = new ConcurrentHashMap<>();
this.features = features;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import static datadog.trace.common.metrics.EventListener.EventType.OK;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;
import datadog.trace.util.AgentTaskScheduler;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -23,7 +25,6 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,7 +37,7 @@ public final class OkHttpSink implements Sink, EventListener {
private final OkHttpClient client;
private final HttpUrl metricsUrl;
private final List<EventListener> listeners;
private final SpscArrayQueue<Request> enqueuedRequests = new SpscArrayQueue<>(10);
private final NonBlockingQueue<Request> enqueuedRequests = Queues.spscArrayQueue(16);
private final AtomicLong lastRequestTime = new AtomicLong();
private final AtomicLong asyncRequestCounter = new AtomicLong();
private final boolean bufferingEnabled;
Expand Down Expand Up @@ -157,9 +158,9 @@ private void handleFailure(okhttp3.Response response) throws IOException {

private static final class Sender implements AgentTaskScheduler.Task<OkHttpSink> {

private final SpscArrayQueue<Request> inbox;
private final NonBlockingQueue<Request> inbox;

private Sender(SpscArrayQueue<Request> inbox) {
private Sender(NonBlockingQueue<Request> inbox) {
this.inbox = inbox;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import datadog.common.queue.BlockingConsumerNonBlockingQueue;
import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;
import datadog.communication.ddagent.DroppingPolicy;
import datadog.trace.common.sampling.SingleSpanSampler;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,7 +46,7 @@ class DefaultSpanSamplingWorker implements SpanSamplingWorker {

private final Thread spanSamplingThread;
private final SamplingHandler samplingHandler;
private final MpscBlockingConsumerArrayQueue<Object> spanSamplingQueue;
private final BlockingConsumerNonBlockingQueue<Object> spanSamplingQueue;
private final Queue<Object> primaryQueue;
private final Queue<Object> secondaryQueue;
private final SingleSpanSampler singleSpanSampler;
Expand All @@ -62,7 +63,7 @@ protected DefaultSpanSamplingWorker(
DroppingPolicy droppingPolicy) {
this.samplingHandler = new SamplingHandler();
this.spanSamplingThread = newAgentThread(SPAN_SAMPLING_PROCESSOR, samplingHandler);
this.spanSamplingQueue = new MpscBlockingConsumerArrayQueue<>(capacity);
this.spanSamplingQueue = Queues.mpscBlockingConsumerArrayQueue(capacity);
this.primaryQueue = primaryQueue;
this.secondaryQueue = secondaryQueue;
this.singleSpanSampler = singleSpanSampler;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void onEvent(Object event) {
}
}

private void consumeBatch(MessagePassingQueue<Object> queue) {
private void consumeBatch(NonBlockingQueue<Object> queue) {
queue.drain(this::onEvent, queue.size());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import datadog.common.queue.BlockingConsumerNonBlockingQueue;
import datadog.common.queue.NonBlockingQueue;
import datadog.common.queue.Queues;
import datadog.communication.ddagent.DroppingPolicy;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
Expand All @@ -19,8 +22,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,8 +37,8 @@ public class TraceProcessingWorker implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);

private final PrioritizationStrategy prioritizationStrategy;
private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
private final BlockingConsumerNonBlockingQueue<Object> primaryQueue;
private final BlockingConsumerNonBlockingQueue<Object> secondaryQueue;
private final TraceSerializingHandler serializingHandler;
private final Thread serializerThread;
private final int capacity;
Expand Down Expand Up @@ -121,23 +122,23 @@ public long getRemainingCapacity() {
return primaryQueue.remainingCapacity();
}

private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity) {
return new MpscBlockingConsumerArrayQueue<>(capacity);
private static BlockingConsumerNonBlockingQueue<Object> createQueue(int capacity) {
return Queues.mpscBlockingConsumerArrayQueue(capacity);
}

public static class TraceSerializingHandler implements Runnable {

private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
private final BlockingConsumerNonBlockingQueue<Object> primaryQueue;
private final BlockingConsumerNonBlockingQueue<Object> secondaryQueue;
private final HealthMetrics healthMetrics;
private final long ticksRequiredToFlush;
private final boolean doTimeFlush;
private final PayloadDispatcher payloadDispatcher;
private long lastTicks;

public TraceSerializingHandler(
final MpscBlockingConsumerArrayQueue<Object> primaryQueue,
final MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
final BlockingConsumerNonBlockingQueue<Object> primaryQueue,
final BlockingConsumerNonBlockingQueue<Object> secondaryQueue,
final HealthMetrics healthMetrics,
final PayloadDispatcher payloadDispatcher,
final long flushInterval,
Expand Down Expand Up @@ -238,7 +239,7 @@ private boolean shouldFlush() {
return false;
}

private void consumeBatch(MessagePassingQueue<Object> queue) {
private void consumeBatch(NonBlockingQueue<Object> queue) {
queue.drain(this::onEvent, queue.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Comparator.comparingLong;

import datadog.common.queue.BlockingConsumerNonBlockingQueue;
import datadog.common.queue.Queues;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.flare.TracerFlare;
Expand All @@ -18,10 +20,10 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.zip.ZipOutputStream;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -61,7 +63,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final CommandElement DUMP_ELEMENT = new CommandElement();
private static final CommandElement STAND_IN_ELEMENT = new CommandElement();

private final MpscBlockingConsumerArrayQueue<Element> queue;
private final BlockingConsumerNonBlockingQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;

Expand Down Expand Up @@ -136,7 +138,7 @@ public void flush() {
}
}

private static final class WriteDrain implements MessagePassingQueue.Consumer<Element> {
private static final class WriteDrain implements Consumer<Element> {
private static final WriteDrain WRITE_DRAIN = new WriteDrain();

@Override
Expand All @@ -145,8 +147,7 @@ public void accept(Element pendingTrace) {
}
}

private static final class DumpDrain
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
private static final class DumpDrain implements Consumer<Element>, Supplier<Element> {
private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class);
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
private static final int MAX_DUMPED_TRACES = 50;
Expand Down Expand Up @@ -292,7 +293,7 @@ public DelayingPendingTraceBuffer(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize);
this.queue = Queues.mpscBlockingConsumerArrayQueue(bufferSize);
this.worker = newAgentThread(TRACE_MONITOR, new Worker());
this.timeSource = timeSource;
boolean runningSpansEnabled = config.isLongRunningTraceEnabled();
Expand Down
Loading
Loading