diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 98a08a6ef80..bd8933216e7 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -76,6 +76,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private final int MAX_ACCUMULATOR_SIZE = 50000;
   private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
   private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
+  private static final int OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS = 60;
 
   public volatile SparkListenerInterface openLineageSparkListener = null;
   public volatile SparkConf openLineageSparkConf = null;
@@ -186,6 +187,7 @@ public void setupOpenLineage(DDTraceId traceId) {
               + ProcessTags.getTagsForSerialization()
               + ";_dd.ol_app_id:"
               + appId);
+      setupOpenLineageCircuitBreaker();
       return;
     }
     log.debug(
@@ -1323,6 +1325,36 @@ private static String getServiceForOpenLineage(SparkConf conf, boolean isRunning
     return sparkAppName;
   }
 
+  /**
+   * Configures OpenLineage's timeout circuit breaker on {@code openLineageSparkConf}.
+   *
+   * <p>Does nothing when the feature is disabled via config, when the OpenLineage client on the
+   * classpath does not ship {@code TimeoutCircuitBreaker}, or when a circuit breaker type is
+   * already set in the Spark conf, so a user-provided setting is never overridden.
+   */
+  private void setupOpenLineageCircuitBreaker() {
+    if (!Config.get().isDataJobsOpenLineageTimeoutEnabled()) {
+      log.debug("OpenLineage timeout circuit breaker is not enabled");
+      return;
+    }
+    if (!classIsLoadable("io.openlineage.client.circuitBreaker.TimeoutCircuitBreaker")) {
+      log.debug(
+          "OpenLineage version without timeout circuit breaker. Probably OL version < 1.35.0");
+      return;
+    }
+    if (openLineageSparkConf.contains("spark.openlineage.circuitBreaker.type")) {
+      log.debug(
+          "Other OpenLineage circuit breaker already configured: {}",
+          openLineageSparkConf.get("spark.openlineage.circuitBreaker.type"));
+      return;
+    }
+
+    openLineageSparkConf.set("spark.openlineage.circuitBreaker.type", "timeout");
+    openLineageSparkConf.set(
+        "spark.openlineage.circuitBreaker.timeoutInSeconds",
+        String.valueOf(OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS));
+  }
+
   private static void reportKafkaOffsets(
       final String appName, final AgentSpan span, final SourceProgress progress) {
     if (!traceConfig().isDataStreamsEnabled()
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
index 67dc40477ce..9cc2cfa7bdc 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
@@ -568,6 +568,36 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
       .contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization())
   }
 
+  def "test setupOpenLineage fills circuit breaker config"(
+    Boolean configEnabled,
+    String sparkConfCircuitBreakerType,
+    String expectedCircuitBreakerType
+  ) {
+    setup:
+    injectSysConfig("data.jobs.openlineage.timeout.enabled", configEnabled.toString())
+    def listener = getTestDatadogSparkListener()
+    listener.openLineageSparkListener = Mock(SparkListenerInterface)
+    listener.openLineageSparkConf = new SparkConf()
+    if (sparkConfCircuitBreakerType != null) {
+      listener.openLineageSparkConf.set("spark.openlineage.circuitBreaker.type", sparkConfCircuitBreakerType)
+    }
+    listener.setupOpenLineage(Mock(DDTraceId))
+
+    expect:
+    assert listener
+      .openLineageSparkConf
+      .getOption("spark.openlineage.circuitBreaker.type") == Option.apply(expectedCircuitBreakerType)
+    assert listener
+      .openLineageSparkConf
+      .getOption("spark.openlineage.circuitBreaker.timeoutInSeconds") == ((expectedCircuitBreakerType == "timeout") ? Option.apply("60") : Option.apply(null))
+
+    where:
+    configEnabled | sparkConfCircuitBreakerType | expectedCircuitBreakerType
+    true          | null                        | "timeout"
+    true          | "other"                     | "other"
+    false         | null                        | null
+  }
+
   protected validateRelativeError(double value, double expected, double relativeAccuracy) {
     double relativeError = Math.abs(value - expected) / expected
     assert relativeError < relativeAccuracy
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
index ad94c6bed52..289409fd2c8 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
@@ -228,6 +228,7 @@ public final class ConfigDefaults {
 
   static final boolean DEFAULT_DATA_JOBS_ENABLED = false;
   static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED = false;
+  static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED = true;
 
   static final boolean DEFAULT_DATA_STREAMS_ENABLED = false;
   static final int DEFAULT_DATA_STREAMS_BUCKET_DURATION = 10; // seconds
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java
index 43c72885ef8..0a68ea849f9 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java
@@ -77,6 +77,8 @@ public final class GeneralConfig {
   public static final String DATA_JOBS_ENABLED = "data.jobs.enabled";
   public static final String DATA_JOBS_COMMAND_PATTERN = "data.jobs.command.pattern";
   public static final String DATA_JOBS_OPENLINEAGE_ENABLED = "data.jobs.openlineage.enabled";
+  public static final String DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED =
+      "data.jobs.openlineage.timeout.enabled";
 
   public static final String DATA_STREAMS_ENABLED = "data.streams.enabled";
   public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS =
diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java
index bb4e4f65d12..5262031fb90 100644
--- a/internal-api/src/main/java/datadog/trace/api/Config.java
+++ b/internal-api/src/main/java/datadog/trace/api/Config.java
@@ -47,6 +47,7 @@
 import static datadog.trace.api.ConfigDefaults.DEFAULT_CWS_TLS_REFRESH;
 import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_ENABLED;
 import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED;
+import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED;
 import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_STREAMS_BUCKET_DURATION;
 import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_STREAMS_ENABLED;
 import static datadog.trace.api.ConfigDefaults.DEFAULT_DB_CLIENT_HOST_SPLIT_BY_HOST;
@@ -330,6 +331,7 @@
 import static datadog.trace.api.config.GeneralConfig.AZURE_APP_SERVICES;
 import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_ENABLED;
 import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_OPENLINEAGE_ENABLED;
+import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED;
 import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_BUCKET_DURATION_SECONDS;
 import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_ENABLED;
 import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_ARGS;
@@ -1162,6 +1164,7 @@ public static String getHostName() {
 
   private final boolean dataJobsEnabled;
   private final boolean dataJobsOpenLineageEnabled;
+  private final boolean dataJobsOpenLineageTimeoutEnabled;
 
   private final boolean dataStreamsEnabled;
   private final float dataStreamsBucketDurationSeconds;
@@ -2578,6 +2581,9 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
     dataJobsOpenLineageEnabled =
         configProvider.getBoolean(
             DATA_JOBS_OPENLINEAGE_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED);
+    dataJobsOpenLineageTimeoutEnabled =
+        configProvider.getBoolean(
+            DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED);
 
     dataStreamsEnabled =
         configProvider.getBoolean(DATA_STREAMS_ENABLED, DEFAULT_DATA_STREAMS_ENABLED);
@@ -4470,6 +4476,10 @@ public boolean isDataJobsOpenLineageEnabled() {
     return dataJobsOpenLineageEnabled;
   }
 
+  public boolean isDataJobsOpenLineageTimeoutEnabled() {
+    return dataJobsOpenLineageTimeoutEnabled;
+  }
+
   public boolean isApmTracingEnabled() {
     return apmTracingEnabled;
   }