Skip to content

Commit 72306bb

Browse files
authored
[GOBBLIN-2156] log error after each failed AzkabanClient retry (apache#4055)
1 parent c3d16b2 commit 72306bb

File tree

1 file changed

+35
-18
lines changed
  • gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration

1 file changed

+35
-18
lines changed

gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java

+35-18
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,6 @@
1717

1818
package org.apache.gobblin.service.modules.orchestration;
1919

20-
import com.github.rholder.retry.AttemptTimeLimiters;
21-
import com.github.rholder.retry.RetryException;
22-
import com.github.rholder.retry.Retryer;
23-
import com.github.rholder.retry.RetryerBuilder;
24-
import com.github.rholder.retry.StopStrategies;
25-
import com.github.rholder.retry.WaitStrategies;
26-
import com.google.common.base.Preconditions;
27-
import com.google.common.base.Throwables;
28-
import com.google.common.io.Closer;
29-
import com.google.gson.Gson;
30-
import com.google.gson.JsonElement;
31-
import com.google.gson.JsonObject;
32-
import com.google.gson.JsonParser;
3320
import java.io.Closeable;
3421
import java.io.File;
3522
import java.io.IOException;
@@ -42,7 +29,7 @@
4229
import java.util.concurrent.ExecutorService;
4330
import java.util.concurrent.Executors;
4431
import java.util.concurrent.TimeUnit;
45-
import lombok.Builder;
32+
4633
import org.apache.commons.io.IOUtils;
4734
import org.apache.commons.lang3.ObjectUtils;
4835
import org.apache.commons.lang3.StringUtils;
@@ -61,6 +48,24 @@
6148
import org.slf4j.Logger;
6249
import org.slf4j.LoggerFactory;
6350

51+
import com.github.rholder.retry.Attempt;
52+
import com.github.rholder.retry.AttemptTimeLimiters;
53+
import com.github.rholder.retry.RetryException;
54+
import com.github.rholder.retry.RetryListener;
55+
import com.github.rholder.retry.Retryer;
56+
import com.github.rholder.retry.RetryerBuilder;
57+
import com.github.rholder.retry.StopStrategies;
58+
import com.github.rholder.retry.WaitStrategies;
59+
import com.google.common.base.Preconditions;
60+
import com.google.common.base.Throwables;
61+
import com.google.common.io.Closer;
62+
import com.google.gson.Gson;
63+
import com.google.gson.JsonElement;
64+
import com.google.gson.JsonObject;
65+
import com.google.gson.JsonParser;
66+
67+
import lombok.Builder;
68+
6469

6570
/**
6671
* A simple http based client that uses Ajax API to communicate with Azkaban server.
@@ -80,9 +85,9 @@ public class AzkabanClient implements Closeable {
8085
protected CloseableHttpClient httpClient;
8186
private ExecutorService executorService;
8287
private Closer closer = Closer.create();
83-
private Retryer<AzkabanClientStatus> retryer;
84-
private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
85-
private Duration requestTimeout;
88+
private Retryer<AzkabanClientStatus<?>> retryer;
89+
private static final Logger log = LoggerFactory.getLogger(AzkabanClient.class);
90+
private final Duration requestTimeout;
8691

8792
/**
8893
* Child class should have a different builderMethodName.
@@ -109,13 +114,25 @@ protected AzkabanClient(String username,
109114
this.initializeClient();
110115
this.initializeSessionManager();
111116
this.intializeExecutorService();
117+
RetryListener retryListener = new RetryListener() {
118+
@Override
119+
public <V> void onRetry(Attempt<V> attempt) {
120+
if (attempt.hasException()) {
121+
String msg = String.format("(Likely retryable) failure running Azkaban API [attempt: %d; %s after start]",
122+
attempt.getAttemptNumber(), Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
123+
log.warn(msg, attempt.getExceptionCause());
124+
}
125+
}
126+
};
127+
112128

113-
this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
129+
this.retryer = RetryerBuilder.<AzkabanClientStatus<?>>newBuilder()
114130
.retryIfExceptionOfType(InvalidSessionException.class)
115131
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.requestTimeout.toMillis(), TimeUnit.MILLISECONDS,
116132
this.executorService))
117133
.withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
118134
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
135+
.withRetryListener(retryListener)
119136
.build();
120137
try {
121138
this.sessionId = this.sessionManager.fetchSession();

0 commit comments

Comments
 (0)