Skip to content

Commit c1d1604

Browse files
put custom tags into a single defined attribute
1 parent 0614d73 commit c1d1604

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Properties;
3737
import java.util.UUID;
3838
import java.util.function.Consumer;
39+
import java.util.stream.Collectors;
3940
import org.apache.spark.ExceptionFailure;
4041
import org.apache.spark.SparkConf;
4142
import org.apache.spark.TaskFailedReason;
@@ -244,10 +245,18 @@ private void initApplicationSpanIfNotInitialized() {
244245
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
245246

246247
if (applicationStart != null) {
248+
String customTags =
249+
Config.get().getGlobalTags().entrySet().stream()
250+
.map(e -> e.getKey() + ":" + e.getValue())
251+
.collect(Collectors.joining(","));
252+
Config.get().getGlobalTags().entrySet().stream()
253+
.forEach(e -> builder.withTag("dd.tags." + e.getKey(), e.getValue()));
254+
247255
builder
248256
.withStartTimestamp(applicationStart.time() * 1000)
249257
.withTag("application_name", applicationStart.appName())
250-
.withTag("spark_user", applicationStart.sparkUser());
258+
.withTag("spark_user", applicationStart.sparkUser())
259+
.withTag("run_group_by", customTags);
251260

252261
if (applicationStart.appAttemptId().isDefined()) {
253262
builder.withTag("app_attempt_id", applicationStart.appAttemptId().get());

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,28 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
588588
.contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization())
589589
}
590590

591+
def "test all custom tags get propagated into a single attributes of application span"() {
592+
setup:
593+
def customTags = "tagKey1:tagKeyValue1,tagKey2:tagKeyValue2"
594+
injectSysConfig("dd.tags", customTags)
595+
def listener = getTestDatadogSparkListener()
596+
listener.onApplicationStart(applicationStartEvent(1000L))
597+
listener.onApplicationEnd(new SparkListenerApplicationEnd(5000L))
598+
599+
expect:
600+
assertTraces(1) {
601+
trace(1) {
602+
span {
603+
operationName "spark.application"
604+
spanType "spark"
605+
assert span.tags["dd.tags.tagKey1"] == "tagKeyValue1"
606+
assert span.tags["dd.tags.tagKey2"] == "tagKeyValue2"
607+
assert span.tags["run_group_by"] == customTags
608+
}
609+
}
610+
}
611+
}
612+
591613
def "test setupOpenLineage fills circuit breaker config"(
592614
Boolean configEnabled,
593615
String sparkConfCircuitBreakerType,

0 commit comments

Comments
 (0)