Skip to content

Commit

Permalink
[FLINK-9900][tests] Include more information on timeout in Zookeeper …
Browse files Browse the repository at this point in the history
…HA ITCase
  • Loading branch information
zentol authored Aug 20, 2018
1 parent ebceca1 commit a203351
Showing 1 changed file with 50 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;

Expand All @@ -62,6 +63,11 @@

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -163,7 +169,7 @@ public static void tearDown() throws Exception {
* restored successfully
* </ol>
*/
@Test(timeout = 120_000L)
@Test
public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
CheckpointBlockingFunction.successfulRestores.set(0);
Expand Down Expand Up @@ -256,13 +262,54 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
() -> clusterClient.getJobStatus(jobID),
Time.milliseconds(50),
deadline,
(jobStatus) -> jobStatus == JobStatus.FINISHED,
JobStatus::isGloballyTerminalState,
TestingUtils.defaultScheduledExecutor());
assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
try {
assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
} catch (Throwable e) {
// include additional debugging information
StringWriter error = new StringWriter();
try (PrintWriter out = new PrintWriter(error)) {
out.println("The job did not finish in time.");
out.println("allowedInitializeCallsWithoutRestore= " + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.get());
out.println("illegalRestores= " + CheckpointBlockingFunction.illegalRestores.get());
out.println("successfulRestores= " + CheckpointBlockingFunction.successfulRestores.get());
out.println("afterMessWithZooKeeper= " + CheckpointBlockingFunction.afterMessWithZooKeeper.get());
out.println("failedAlready= " + CheckpointBlockingFunction.failedAlready.get());
out.println("currentJobStatus= " + clusterClient.getJobStatus(jobID).get());
out.println("numRestarts= " + RestartReporter.numRestarts.getValue());
out.println("threadDump= " + generateThreadDump());
}
throw new AssertionError(error.toString(), ExceptionUtils.stripCompletionException(e));
}

assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
}

/**
 * Renders a textual dump of the JVM's threads for inclusion in a test failure message.
 *
 * <p>For every thread reported by the {@link ThreadMXBean} the dump contains the quoted thread
 * name, its {@link Thread.State} and up to 100 stack frames, followed by an empty line.
 *
 * <p>Threads that terminate between collecting the thread ids and querying their info are
 * reported as {@code null} by {@link ThreadMXBean#getThreadInfo(long[], int)}; such entries are
 * skipped so that generating debug output can never fail with a {@link NullPointerException}
 * and thereby hide the original error.
 *
 * @return the formatted thread dump
 */
private static String generateThreadDump() {
	final StringBuilder dump = new StringBuilder();
	final String lineSeparator = System.lineSeparator();
	final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
	// limit the stack depth to 100 frames per thread
	final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
	for (ThreadInfo threadInfo : threadInfos) {
		if (threadInfo == null) {
			// the thread died after its id was collected; there is nothing to report
			continue;
		}
		dump.append('"');
		dump.append(threadInfo.getThreadName());
		dump.append('"');
		final Thread.State state = threadInfo.getThreadState();
		dump.append(lineSeparator);
		dump.append(" java.lang.Thread.State: ");
		dump.append(state);
		final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
		for (final StackTraceElement stackTraceElement : stackTraceElements) {
			dump.append(lineSeparator);
			dump.append(" at ");
			dump.append(stackTraceElement);
		}
		// blank line separates consecutive thread entries
		dump.append(lineSeparator);
		dump.append(lineSeparator);
	}
	return dump.toString();
}

private static class UnboundedSource implements SourceFunction<String> {
private volatile boolean running = true;

Expand Down

0 comments on commit a203351

Please sign in to comment.