Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private final int MAX_ACCUMULATOR_SIZE = 50000;
private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
// Value written to "spark.openlineage.circuitBreaker.timeoutInSeconds" when the Datadog listener
// configures the OpenLineage timeout circuit breaker itself.
private static final int OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS = 60;

public volatile SparkListenerInterface openLineageSparkListener = null;
public volatile SparkConf openLineageSparkConf = null;
Expand Down Expand Up @@ -186,6 +187,7 @@ public void setupOpenLineage(DDTraceId traceId) {
+ ProcessTags.getTagsForSerialization()
+ ";_dd.ol_app_id:"
+ appId);
setupOpenLineageCircuitBreaker();
return;
}
log.debug(
Expand Down Expand Up @@ -1323,6 +1325,29 @@ private static String getServiceForOpenLineage(SparkConf conf, boolean isRunning
return sparkAppName;
}

/**
 * Configures the OpenLineage "timeout" circuit breaker on {@code openLineageSparkConf}.
 *
 * <p>The configuration is left untouched when any of the following holds:
 *
 * <ul>
 *   <li>the feature is disabled via {@code Config.isDataJobsOpenLineageTimeoutEnabled()};
 *   <li>the {@code TimeoutCircuitBreaker} class cannot be loaded (the OpenLineage version in use
 *       does not ship it);
 *   <li>a circuit breaker type is already present in the Spark configuration, so an explicit
 *       user choice is never overridden.
 * </ul>
 *
 * <p>Otherwise the circuit breaker type is set to {@code timeout} with a timeout of {@link
 * #OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS} seconds.
 */
private void setupOpenLineageCircuitBreaker() {
  if (!Config.get().isDataJobsOpenLineageTimeoutEnabled()) {
    log.debug("OpenLineage timeout circuit breaker is not enabled");
    return;
  }
  // Bail out only when the class is NOT available: configuring a circuit breaker type that the
  // bundled OpenLineage client does not know about would be useless at best.
  if (!classIsLoadable("io.openlineage.client.circuitBreaker.TimeoutCircuitBreaker")) {
    log.debug(
        "OpenLineage version without timeout circuit breaker. Probably OL version < 1.35.0");
    return;
  }
  // Respect any circuit breaker the user has already configured.
  if (openLineageSparkConf.contains("spark.openlineage.circuitBreaker.type")) {
    log.debug(
        "Other OpenLineage circuit breaker already configured: {}",
        openLineageSparkConf.get("spark.openlineage.circuitBreaker.type"));
    return;
  }

  openLineageSparkConf.set("spark.openlineage.circuitBreaker.type", "timeout");
  openLineageSparkConf.set(
      "spark.openlineage.circuitBreaker.timeoutInSeconds",
      String.valueOf(OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS));
}

private static void reportKafkaOffsets(
final String appName, final AgentSpan span, final SourceProgress progress) {
if (!traceConfig().isDataStreamsEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,36 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
.contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization())
}

// Data-driven check of the circuit breaker settings left in openLineageSparkConf after
// setupOpenLineage runs, for each combination of the feature flag and a pre-existing
// "spark.openlineage.circuitBreaker.type" value listed in the where: table.
def "test setupOpenLineage fills circuit breaker config"(
Boolean configEnabled,
String sparkConfCircuitBreakerType,
String expectedCircuitBreakerType
) {
setup:
injectSysConfig("data.jobs.openlineage.timeout.enabled", configEnabled.toString())
def listener = getTestDatadogSparkListener()
listener.openLineageSparkListener = Mock(SparkListenerInterface)
listener.openLineageSparkConf = new SparkConf()
// A null table value means "no circuit breaker type pre-configured".
if (sparkConfCircuitBreakerType != null) {
listener.openLineageSparkConf.set("spark.openlineage.circuitBreaker.type", sparkConfCircuitBreakerType)
}
listener.setupOpenLineage(Mock(DDTraceId))

expect:
// Option.apply(null) is the empty Option, i.e. the key must be absent.
assert listener
.openLineageSparkConf
.getOption("spark.openlineage.circuitBreaker.type") == Option.apply(expectedCircuitBreakerType)
// The timeout value is only expected when the listener itself selected the "timeout" breaker.
assert listener
.openLineageSparkConf
.getOption("spark.openlineage.circuitBreaker.timeoutInSeconds") == ((expectedCircuitBreakerType == "timeout") ? Option.apply("60") : Option.apply(null))

where:
configEnabled | sparkConfCircuitBreakerType | expectedCircuitBreakerType
true | null | "timeout"
true | "other" | "other"
false | null | null
}

protected validateRelativeError(double value, double expected, double relativeAccuracy) {
double relativeError = Math.abs(value - expected) / expected
assert relativeError < relativeAccuracy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public final class ConfigDefaults {

static final boolean DEFAULT_DATA_JOBS_ENABLED = false;
static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED = false;
// Default for the "data.jobs.openlineage.timeout.enabled" setting: enabled unless overridden.
static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED = true;

static final boolean DEFAULT_DATA_STREAMS_ENABLED = false;
static final int DEFAULT_DATA_STREAMS_BUCKET_DURATION = 10; // seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public final class GeneralConfig {
public static final String DATA_JOBS_ENABLED = "data.jobs.enabled";
public static final String DATA_JOBS_COMMAND_PATTERN = "data.jobs.command.pattern";
public static final String DATA_JOBS_OPENLINEAGE_ENABLED = "data.jobs.openlineage.enabled";
// Configuration key for the boolean flag exposed as Config.isDataJobsOpenLineageTimeoutEnabled().
public static final String DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED =
"data.jobs.openlineage.timeout.enabled";

public static final String DATA_STREAMS_ENABLED = "data.streams.enabled";
public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS =
Expand Down
10 changes: 10 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static datadog.trace.api.ConfigDefaults.DEFAULT_CWS_TLS_REFRESH;
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_ENABLED;
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED;
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED;
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_STREAMS_BUCKET_DURATION;
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_STREAMS_ENABLED;
import static datadog.trace.api.ConfigDefaults.DEFAULT_DB_CLIENT_HOST_SPLIT_BY_HOST;
Expand Down Expand Up @@ -330,6 +331,7 @@
import static datadog.trace.api.config.GeneralConfig.AZURE_APP_SERVICES;
import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_ENABLED;
import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_OPENLINEAGE_ENABLED;
import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED;
import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_BUCKET_DURATION_SECONDS;
import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_ENABLED;
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_ARGS;
Expand Down Expand Up @@ -1162,6 +1164,7 @@ public static String getHostName() {

private final boolean dataJobsEnabled;
private final boolean dataJobsOpenLineageEnabled;
private final boolean dataJobsOpenLineageTimeoutEnabled;

private final boolean dataStreamsEnabled;
private final float dataStreamsBucketDurationSeconds;
Expand Down Expand Up @@ -2578,6 +2581,9 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
dataJobsOpenLineageEnabled =
configProvider.getBoolean(
DATA_JOBS_OPENLINEAGE_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED);
dataJobsOpenLineageTimeoutEnabled =
configProvider.getBoolean(
DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED);

dataStreamsEnabled =
configProvider.getBoolean(DATA_STREAMS_ENABLED, DEFAULT_DATA_STREAMS_ENABLED);
Expand Down Expand Up @@ -4470,6 +4476,10 @@ public boolean isDataJobsOpenLineageEnabled() {
return dataJobsOpenLineageEnabled;
}

/**
 * Returns the boolean read from the {@code data.jobs.openlineage.timeout.enabled} setting when
 * this configuration was built.
 *
 * @return the stored value of the OpenLineage timeout flag
 */
public boolean isDataJobsOpenLineageTimeoutEnabled() {
return dataJobsOpenLineageTimeoutEnabled;
}

/**
 * Returns the stored {@code apmTracingEnabled} flag.
 *
 * @return the current value of {@code apmTracingEnabled}
 */
public boolean isApmTracingEnabled() {
return apmTracingEnabled;
}
Expand Down