3
3
import static datadog .trace .util .AgentThreadFactory .AgentThread .TRACE_MONITOR ;
4
4
import static datadog .trace .util .AgentThreadFactory .THREAD_JOIN_TIMOUT_MS ;
5
5
import static datadog .trace .util .AgentThreadFactory .newAgentThread ;
6
+ import static java .util .Comparator .comparingLong ;
6
7
7
8
import datadog .communication .ddagent .SharedCommunicationObjects ;
8
9
import datadog .trace .api .Config ;
10
+ import datadog .trace .api .flare .TracerFlare ;
9
11
import datadog .trace .api .time .TimeSource ;
12
+ import datadog .trace .common .writer .TraceDumpJsonExporter ;
10
13
import datadog .trace .core .monitor .HealthMetrics ;
14
+ import java .io .IOException ;
15
+ import java .util .ArrayList ;
16
+ import java .util .Comparator ;
17
+ import java .util .List ;
11
18
import java .util .concurrent .TimeUnit ;
12
19
import java .util .concurrent .atomic .AtomicInteger ;
20
+ import java .util .function .Predicate ;
21
+ import java .util .zip .ZipOutputStream ;
13
22
import org .jctools .queues .MessagePassingQueue ;
14
23
import org .jctools .queues .MpscBlockingConsumerArrayQueue ;
15
24
import org .slf4j .Logger ;
@@ -47,13 +56,16 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
47
56
private static final long FORCE_SEND_DELAY_MS = TimeUnit .SECONDS .toMillis (5 );
48
57
private static final long SEND_DELAY_NS = TimeUnit .MILLISECONDS .toNanos (500 );
49
58
private static final long SLEEP_TIME_MS = 100 ;
59
+ private static final CommandElement FLUSH_ELEMENT = new CommandElement ();
60
+ private static final CommandElement DUMP_ELEMENT = new CommandElement ();
50
61
51
62
private final MpscBlockingConsumerArrayQueue <Element > queue ;
52
63
private final Thread worker ;
53
64
private final TimeSource timeSource ;
54
65
55
66
private volatile boolean closed = false ;
56
67
private final AtomicInteger flushCounter = new AtomicInteger (0 );
68
+ private final AtomicInteger dumpCounter = new AtomicInteger (0 );
57
69
58
70
private final LongRunningTracesTracker runningTracesTracker ;
59
71
@@ -78,6 +90,7 @@ public void enqueue(Element pendingTrace) {
78
90
79
91
@ Override
80
92
public void start () {
93
+ TracerFlare .addReporter (new TracerDump (this ));
81
94
worker .start ();
82
95
}
83
96
@@ -108,10 +121,10 @@ public void flush() {
108
121
if (worker .isAlive ()) {
109
122
int count = flushCounter .get ();
110
123
int loop = 1 ;
111
- boolean signaled = queue .offer (FlushElement . FLUSH_ELEMENT );
124
+ boolean signaled = queue .offer (FLUSH_ELEMENT );
112
125
while (!closed && !signaled ) {
113
126
yieldOrSleep (loop ++);
114
- signaled = queue .offer (FlushElement . FLUSH_ELEMENT );
127
+ signaled = queue .offer (FLUSH_ELEMENT );
115
128
}
116
129
int newCount = flushCounter .get ();
117
130
while (!closed && count >= newCount ) {
@@ -130,9 +143,44 @@ public void accept(Element pendingTrace) {
130
143
}
131
144
}
132
145
133
- private static final class FlushElement implements Element {
134
- static FlushElement FLUSH_ELEMENT = new FlushElement ();
146
+ private static final class DumpDrain
147
+ implements MessagePassingQueue .Consumer <Element >, MessagePassingQueue .Supplier <Element > {
148
+ private static final DumpDrain DUMP_DRAIN = new DumpDrain ();
149
+ private static final int MAX_DUMPED_TRACES = 50 ;
135
150
151
+ private static final Comparator <Element > TRACE_BY_START_TIME =
152
+ comparingLong (trace -> trace .getRootSpan ().getStartTime ());
153
+ private static final Predicate <Element > NOT_PENDING_TRACE =
154
+ element -> !(element instanceof PendingTrace );
155
+
156
+ private volatile List <Element > data = new ArrayList <>();
157
+ private int index = 0 ;
158
+
159
+ @ Override
160
+ public void accept (Element pendingTrace ) {
161
+ data .add (pendingTrace );
162
+ }
163
+
164
+ @ Override
165
+ public Element get () {
166
+ if (index < data .size ()) {
167
+ return data .get (index ++);
168
+ }
169
+ return null ; // Should never reach here or else queue may break according to
170
+ // MessagePassingQueue docs
171
+ }
172
+
173
+ public List <Element > collectTraces () {
174
+ List <Element > traces = data ;
175
+ data = new ArrayList <>();
176
+ traces .removeIf (NOT_PENDING_TRACE );
177
+ // Storing oldest traces first
178
+ traces .sort (TRACE_BY_START_TIME );
179
+ return traces ;
180
+ }
181
+ }
182
+
183
+ private static final class CommandElement implements Element {
136
184
@ Override
137
185
public long oldestFinishedTime () {
138
186
return 0 ;
@@ -180,13 +228,21 @@ public void run() {
180
228
pendingTrace = queue .take (); // block until available;
181
229
}
182
230
183
- if (pendingTrace instanceof FlushElement ) {
231
+ if (pendingTrace == FLUSH_ELEMENT ) {
184
232
// Since this is an MPSC queue, the drain needs to be called on the consumer thread
185
233
queue .drain (WriteDrain .WRITE_DRAIN );
186
234
flushCounter .incrementAndGet ();
187
235
continue ;
188
236
}
189
237
238
+ if (pendingTrace == DUMP_ELEMENT ) {
239
+ queue .fill (
240
+ DumpDrain .DUMP_DRAIN ,
241
+ queue .drain (DumpDrain .DUMP_DRAIN , DumpDrain .MAX_DUMPED_TRACES ));
242
+ dumpCounter .incrementAndGet ();
243
+ continue ;
244
+ }
245
+
190
246
// The element is no longer in the queue
191
247
pendingTrace .setEnqueued (false );
192
248
@@ -208,7 +264,7 @@ public void run() {
208
264
// Trace has been unmodified long enough, go ahead and write whatever is finished.
209
265
pendingTrace .write ();
210
266
} else {
211
- // Trace is too new. Requeue it and sleep to avoid a hot loop.
267
+ // Trace is too new. Requeue it and sleep to avoid a hot loop.
212
268
enqueue (pendingTrace );
213
269
Thread .sleep (SLEEP_TIME_MS );
214
270
}
@@ -277,4 +333,42 @@ public static PendingTraceBuffer discarding() {
277
333
public abstract void flush ();
278
334
279
335
public abstract void enqueue (Element pendingTrace );
336
+
337
+ private static class TracerDump implements TracerFlare .Reporter {
338
+ private final DelayingPendingTraceBuffer buffer ;
339
+
340
+ private TracerDump (DelayingPendingTraceBuffer buffer ) {
341
+ this .buffer = buffer ;
342
+ }
343
+
344
+ @ Override
345
+ public void prepareForFlare () {
346
+ if (buffer .worker .isAlive ()) {
347
+ int count = buffer .dumpCounter .get ();
348
+ int loop = 1 ;
349
+ boolean signaled = buffer .queue .offer (DelayingPendingTraceBuffer .DUMP_ELEMENT );
350
+ while (!buffer .closed && !signaled ) {
351
+ buffer .yieldOrSleep (loop ++);
352
+ signaled = buffer .queue .offer (DelayingPendingTraceBuffer .DUMP_ELEMENT );
353
+ }
354
+ int newCount = buffer .dumpCounter .get ();
355
+ while (!buffer .closed && count >= newCount ) {
356
+ buffer .yieldOrSleep (loop ++);
357
+ newCount = buffer .dumpCounter .get ();
358
+ }
359
+ }
360
+ }
361
+
362
+ @ Override
363
+ public void addReportToFlare (ZipOutputStream zip ) throws IOException {
364
+ TraceDumpJsonExporter writer = new TraceDumpJsonExporter (zip );
365
+ for (Element e : DelayingPendingTraceBuffer .DumpDrain .DUMP_DRAIN .collectTraces ()) {
366
+ if (e instanceof PendingTrace ) {
367
+ PendingTrace trace = (PendingTrace ) e ;
368
+ writer .write (trace .getSpans ());
369
+ }
370
+ }
371
+ writer .flush ();
372
+ }
373
+ }
280
374
}
0 commit comments