Skip to content

Commit fd72025

Browse files
Retry telemetry requests if CI Visibility is enabled
1 parent a8b33d5 commit fd72025

File tree

6 files changed

+102
-15
lines changed

6 files changed

+102
-15
lines changed

communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java

+2
Original file line number | Diff line number | Diff line change
@@ -160,6 +160,8 @@ public void close() {
160160
}
161161

162162
public static class Factory {
163+
public static final Factory NEVER_RETRY = new Factory(0, 0, 0);
164+
163165
private final int maxRetries;
164166
private final long initialDelay;
165167
private final double delayFactor;

telemetry/src/main/java/datadog/telemetry/TelemetryClient.java

+24-8
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,15 @@
11
package datadog.telemetry;
22

3+
import datadog.communication.http.HttpRetryPolicy;
34
import datadog.communication.http.OkHttpUtils;
45
import datadog.trace.api.Config;
56
import datadog.trace.util.Strings;
67
import java.io.IOException;
8+
import java.io.InterruptedIOException;
79
import java.util.concurrent.TimeUnit;
810
import okhttp3.HttpUrl;
911
import okhttp3.OkHttpClient;
1012
import okhttp3.Request;
11-
import okhttp3.Response;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
1415

@@ -17,16 +18,19 @@ public class TelemetryClient {
1718
public enum Result {
1819
SUCCESS,
1920
FAILURE,
20-
NOT_FOUND;
21+
NOT_FOUND,
22+
INTERRUPTED
2123
}
2224

23-
public static TelemetryClient buildAgentClient(OkHttpClient okHttpClient, HttpUrl agentUrl) {
25+
public static TelemetryClient buildAgentClient(
26+
OkHttpClient okHttpClient, HttpUrl agentUrl, HttpRetryPolicy.Factory httpRetryPolicy) {
2427
HttpUrl agentTelemetryUrl =
2528
agentUrl.newBuilder().addPathSegments(AGENT_TELEMETRY_API_ENDPOINT).build();
26-
return new TelemetryClient(okHttpClient, agentTelemetryUrl, null);
29+
return new TelemetryClient(okHttpClient, httpRetryPolicy, agentTelemetryUrl, null);
2730
}
2831

29-
public static TelemetryClient buildIntakeClient(Config config) {
32+
public static TelemetryClient buildIntakeClient(
33+
Config config, HttpRetryPolicy.Factory httpRetryPolicy) {
3034
String apiKey = config.getApiKey();
3135
if (apiKey == null) {
3236
log.debug("Cannot create Telemetry Intake because DD_API_KEY unspecified.");
@@ -44,7 +48,7 @@ public static TelemetryClient buildIntakeClient(Config config) {
4448

4549
long timeoutMillis = TimeUnit.SECONDS.toMillis(config.getAgentTimeout());
4650
OkHttpClient httpClient = OkHttpUtils.buildHttpClient(url, timeoutMillis);
47-
return new TelemetryClient(httpClient, url, apiKey);
51+
return new TelemetryClient(httpClient, httpRetryPolicy, url, apiKey);
4852
}
4953

5054
private static String buildIntakeTelemetryUrl(Config config) {
@@ -71,11 +75,17 @@ private static String buildIntakeTelemetryUrl(Config config) {
7175
private static final String DD_TELEMETRY_REQUEST_TYPE = "DD-Telemetry-Request-Type";
7276

7377
private final OkHttpClient okHttpClient;
78+
private final HttpRetryPolicy.Factory httpRetryPolicy;
7479
private final HttpUrl url;
7580
private final String apiKey;
7681

77-
public TelemetryClient(OkHttpClient okHttpClient, HttpUrl url, String apiKey) {
82+
public TelemetryClient(
83+
OkHttpClient okHttpClient,
84+
HttpRetryPolicy.Factory httpRetryPolicy,
85+
HttpUrl url,
86+
String apiKey) {
7887
this.okHttpClient = okHttpClient;
88+
this.httpRetryPolicy = httpRetryPolicy;
7989
this.url = url;
8090
this.apiKey = apiKey;
8191
}
@@ -92,7 +102,9 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
92102

93103
Request httpRequest = httpRequestBuilder.build();
94104
String requestType = httpRequest.header(DD_TELEMETRY_REQUEST_TYPE);
95-
try (Response response = okHttpClient.newCall(httpRequest).execute()) {
105+
106+
try (okhttp3.Response response =
107+
OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) {
96108
if (response.code() == 404) {
97109
log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType);
98110
return Result.NOT_FOUND;
@@ -105,6 +117,10 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
105117
response.message());
106118
return Result.FAILURE;
107119
}
120+
} catch (InterruptedIOException e) {
121+
log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString());
122+
return Result.INTERRUPTED;
123+
108124
} catch (IOException e) {
109125
log.debug("Telemetry message {} failed with exception: {}.", requestType, e.toString());
110126
return Result.FAILURE;

telemetry/src/main/java/datadog/telemetry/TelemetryRouter.java

+5-1
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,11 @@ public TelemetryClient.Result sendRequest(TelemetryRequest request) {
3636
Request.Builder httpRequestBuilder = request.httpRequest();
3737
TelemetryClient.Result result = currentClient.sendHttpRequest(httpRequestBuilder);
3838

39-
boolean requestFailed = result != TelemetryClient.Result.SUCCESS;
39+
boolean requestFailed =
40+
result != TelemetryClient.Result.SUCCESS
41+
// interrupted request is most likely due to telemetry system shutdown,
42+
// we do not want to log errors and reattempt in this case
43+
&& result != TelemetryClient.Result.INTERRUPTED;
4044
if (currentClient == agentClient) {
4145
if (requestFailed) {
4246
reportErrorOnce(currentClient.getUrl(), result);

telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java

+9-2
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
44
import datadog.communication.ddagent.SharedCommunicationObjects;
5+
import datadog.communication.http.HttpRetryPolicy;
56
import datadog.telemetry.TelemetryRunnable.TelemetryPeriodicAction;
67
import datadog.telemetry.dependency.DependencyPeriodicAction;
78
import datadog.telemetry.dependency.DependencyService;
@@ -81,8 +82,14 @@ public static void startTelemetry(
8182
boolean debug = config.isTelemetryDebugRequestsEnabled();
8283
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = sco.featuresDiscovery(config);
8384

84-
TelemetryClient agentClient = TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl);
85-
TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config);
85+
HttpRetryPolicy.Factory httpRetryPolicy =
86+
config.isCiVisibilityEnabled()
87+
? new HttpRetryPolicy.Factory(2, 100, 2.0, true)
88+
: HttpRetryPolicy.Factory.NEVER_RETRY;
89+
90+
TelemetryClient agentClient =
91+
TelemetryClient.buildAgentClient(sco.okHttpClient, sco.agentUrl, httpRetryPolicy);
92+
TelemetryClient intakeClient = TelemetryClient.buildIntakeClient(config, httpRetryPolicy);
8693

8794
boolean useIntakeClientByDefault =
8895
config.isCiVisibilityEnabled() && config.isCiVisibilityAgentlessEnabled();

telemetry/src/test/groovy/datadog/telemetry/TelemetryClientTest.groovy

+25-2
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,10 @@
11
package datadog.telemetry
22

3+
import datadog.communication.http.HttpRetryPolicy
4+
import datadog.telemetry.api.RequestType
35
import datadog.trace.api.Config
6+
import okhttp3.HttpUrl
7+
import okhttp3.OkHttpClient
48
import spock.lang.Specification
59

610
class TelemetryClientTest extends Specification {
@@ -13,7 +17,7 @@ class TelemetryClientTest extends Specification {
1317
config.getSite() >> site
1418

1519
when:
16-
def intakeClient = TelemetryClient.buildIntakeClient(config)
20+
def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY)
1721

1822
then:
1923
intakeClient.getUrl().toString() == expectedUrl
@@ -39,7 +43,7 @@ class TelemetryClientTest extends Specification {
3943
config.getCiVisibilityAgentlessUrl() >> ciVisAgentlessUrl
4044

4145
when:
42-
def intakeClient = TelemetryClient.buildIntakeClient(config)
46+
def intakeClient = TelemetryClient.buildIntakeClient(config, HttpRetryPolicy.Factory.NEVER_RETRY)
4347

4448
then:
4549
intakeClient.getUrl().toString() == expectedUrl
@@ -51,4 +55,23 @@ class TelemetryClientTest extends Specification {
5155
true | false | "http://ci.visibility.agentless.url" | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry"
5256
true | true | null | "https://all-http-intake.logs.datad0g.com/api/v2/apmtelemetry"
5357
}
58+
59+
def "Intake client retries telemetry request if configured to do so"() {
60+
setup:
61+
def httpClient = Mock(OkHttpClient)
62+
def httpRetryPolicy = new HttpRetryPolicy.Factory(2, 50, 1.5, true)
63+
def httpUrl = HttpUrl.get("https://intake.example.com")
64+
def intakeClient = new TelemetryClient(httpClient, httpRetryPolicy, httpUrl, "dummy-api-key")
65+
66+
when:
67+
intakeClient.sendHttpRequest(dummyRequest())
68+
69+
then:
70+
// original request + 2 retries
71+
3 * httpClient.newCall(_) >> { throw new ConnectException("exception") }
72+
}
73+
74+
def dummyRequest() {
75+
return new TelemetryRequest(Mock(EventSource), Mock(EventSink), 1000, RequestType.APP_STARTED, false).httpRequest()
76+
}
5477
}

telemetry/src/test/groovy/datadog/telemetry/TelemetryRouterSpecification.groovy

+37-2
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
package datadog.telemetry
22

33
import datadog.communication.ddagent.DDAgentFeaturesDiscovery
4+
import datadog.communication.http.HttpRetryPolicy
45
import datadog.telemetry.api.RequestType
56
import okhttp3.Call
67
import okhttp3.HttpUrl
@@ -41,8 +42,8 @@ class TelemetryRouterSpecification extends Specification {
4142
OkHttpClient okHttpClient = Mock()
4243
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery = Mock()
4344

44-
def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl)
45-
def intakeTelemetryClient = new TelemetryClient(okHttpClient, intakeUrl, apiKey)
45+
def agentTelemetryClient = TelemetryClient.buildAgentClient(okHttpClient, agentUrl, HttpRetryPolicy.Factory.NEVER_RETRY)
46+
def intakeTelemetryClient = new TelemetryClient(okHttpClient, HttpRetryPolicy.Factory.NEVER_RETRY, intakeUrl, apiKey)
4647
def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, false)
4748

4849
def 'map an http status code to the correct send result'() {
@@ -70,6 +71,15 @@ class TelemetryRouterSpecification extends Specification {
7071
1 * okHttpClient.newCall(_) >> { throw new IOException("exception") }
7172
}
7273

74+
def 'catch InterruptedIOException from OkHttpClient and return INTERRUPTED'() {
75+
when:
76+
def result = httpClient.sendRequest(dummyRequest())
77+
78+
then:
79+
result == TelemetryClient.Result.INTERRUPTED
80+
1 * okHttpClient.newCall(_) >> { throw new InterruptedIOException("interrupted") }
81+
}
82+
7383
def 'keep trying to send telemetry to Agent despite of return code when Intake client is null'() {
7484
setup:
7585
def httpClient = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, null, false)
@@ -172,6 +182,31 @@ class TelemetryRouterSpecification extends Specification {
172182
request.header(apiKeyHeader) == apiKey
173183
}
174184

185+
def 'when configured to prefer Intake: do not switch to Agent if request is interrupted'() {
186+
Request request
187+
188+
setup:
189+
def telemetryRouter = new TelemetryRouter(ddAgentFeaturesDiscovery, agentTelemetryClient, intakeTelemetryClient, true)
190+
191+
when:
192+
telemetryRouter.sendRequest(dummyRequest())
193+
194+
then:
195+
1 * ddAgentFeaturesDiscovery.discoverIfOutdated()
196+
1 * ddAgentFeaturesDiscovery.supportsTelemetryProxy() >> true
197+
1 * okHttpClient.newCall(_) >> { args -> request = args[0]; throw new InterruptedIOException("interrupted") }
198+
request.url() == intakeUrl
199+
request.header(apiKeyHeader) == apiKey
200+
201+
when:
202+
telemetryRouter.sendRequest(dummyRequest())
203+
204+
then:
205+
1 * okHttpClient.newCall(_) >> { args -> request = args[0]; mockResponse(200) }
206+
request.url() == intakeUrl
207+
request.header(apiKeyHeader) == apiKey
208+
}
209+
175210
def 'when configured to prefer Intake: switch to Agent if Intake request fails'() {
176211
Request request
177212

0 commit comments

Comments
 (0)