diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 01e37f38c45..19d66ebe2ae 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -36,6 +36,7 @@
 import java.util.Properties;
 import java.util.UUID;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.spark.ExceptionFailure;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
@@ -244,10 +245,18 @@ private void initApplicationSpanIfNotInitialized() {
     AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
 
     if (applicationStart != null) {
+      String customTags =
+          Config.get().getGlobalTags().entrySet().stream()
+              .map(e -> e.getKey() + ":" + e.getValue())
+              .collect(Collectors.joining(","));
+      Config.get().getGlobalTags().entrySet().stream()
+          .forEach(e -> builder.withTag("dd.tags." + e.getKey(), e.getValue()));
+
       builder
           .withStartTimestamp(applicationStart.time() * 1000)
           .withTag("application_name", applicationStart.appName())
-          .withTag("spark_user", applicationStart.sparkUser());
+          .withTag("spark_user", applicationStart.sparkUser())
+          .withTag("run_group_by", customTags);
 
       if (applicationStart.appAttemptId().isDefined()) {
         builder.withTag("app_attempt_id", applicationStart.appAttemptId().get());
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
index e5d9aafad53..ad099a0a72d 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
@@ -588,6 +588,28 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
       .contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization())
   }
 
+  def "test all custom tags get propagated into a single attribute of the application span"() {
+    setup:
+    def customTags = "tagKey1:tagKeyValue1,tagKey2:tagKeyValue2"
+    injectSysConfig("dd.tags", customTags)
+    def listener = getTestDatadogSparkListener()
+    listener.onApplicationStart(applicationStartEvent(1000L))
+    listener.onApplicationEnd(new SparkListenerApplicationEnd(5000L))
+
+    expect:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.application"
+          spanType "spark"
+          assert span.tags["dd.tags.tagKey1"] == "tagKeyValue1"
+          assert span.tags["dd.tags.tagKey2"] == "tagKeyValue2"
+          assert span.tags["run_group_by"] == customTags
+        }
+      }
+    }
+  }
+
   def "test setupOpenLineage fills circuit breaker config"(
     Boolean configEnabled,
     String sparkConfCircuitBreakerType,