[FLINK-9815][yarn][tests] Harden tests against slow job shutdowns
This closes apache#6352.
zentol committed Jul 19, 2018
1 parent e022acb commit 230f817
Showing 2 changed files with 48 additions and 27 deletions.
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -133,7 +133,7 @@ public void testMultipleAMKill() throws Exception {
 			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");

-		ClusterClient<ApplicationId> yarnCluster = null;
+		ClusterClient<ApplicationId> yarnClusterClient = null;

 		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);

@@ -147,10 +147,10 @@ public void testMultipleAMKill() throws Exception {
 			.createClusterSpecification();

 		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
+			yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);

 			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				yarnCluster.getFlinkConfiguration(),
+				yarnClusterClient.getFlinkConfiguration(),
 				Executors.directExecutor(),
 				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

@@ -201,8 +201,10 @@ protected void run() {

 			}};
 		} finally {
-			if (yarnCluster != null) {
-				yarnCluster.shutdown();
+			if (yarnClusterClient != null) {
+				log.info("Shutting down the Flink Yarn application.");
+				yarnClusterClient.shutDownCluster();
+				yarnClusterClient.shutdown();
 			}

 			if (highAvailabilityServices != null) {
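
Aside, not part of the commit: the finally block above now stops the deployed YARN application itself (shutDownCluster()) before releasing the client (shutdown()), and logs the step, where it previously only called shutdown() on the client. Below is a minimal, self-contained sketch of that teardown ordering; SessionHandle and its methods are hypothetical stand-ins for the ClusterClient calls shown in the diff, used only to illustrate the order of operations.

// Illustration only: a hypothetical SessionHandle stands in for Flink's ClusterClient.
public class TeardownOrderSketch {

	interface SessionHandle {
		void shutDownCluster(); // stop the remote YARN application first
		void shutdown();        // then release client-side resources
	}

	static void runAndCleanUp(SessionHandle handle) {
		try {
			// ... exercise the session cluster here ...
		} finally {
			if (handle != null) {
				System.out.println("Shutting down the Flink Yarn application.");
				handle.shutDownCluster();
				handle.shutdown();
			}
		}
	}

	public static void main(String[] args) {
		runAndCleanUp(new SessionHandle() {
			@Override public void shutDownCluster() { System.out.println("cluster stopped"); }
			@Override public void shutdown() { System.out.println("client closed"); }
		});
	}
}
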
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -18,6 +18,7 @@

 package org.apache.flink.yarn;

+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
@@ -67,6 +68,7 @@
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -79,6 +81,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;

 /**
  * This base class allows to use the MiniYARNCluster.
@@ -186,39 +189,55 @@ public static void populateYarnSecureConfigurations(Configuration conf, String p
 		conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
 	}

-	/**
-	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
-	 */
-	@After
-	public void sleep() {
-		try {
-			Thread.sleep(500);
-		} catch (InterruptedException e) {
-			Assert.fail("Should not happen");
-		}
-	}
-
 	@Before
-	public void checkClusterEmpty() throws IOException, YarnException {
+	public void checkClusterEmpty() {
 		if (yarnClient == null) {
 			yarnClient = YarnClient.createYarnClient();
 			yarnClient.init(getYarnConfiguration());
 			yarnClient.start();
 		}

-		List<ApplicationReport> apps = yarnClient.getApplications();
-		for (ApplicationReport app : apps) {
-			if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
-				&& app.getYarnApplicationState() != YarnApplicationState.KILLED
-				&& app.getYarnApplicationState() != YarnApplicationState.FAILED) {
-				Assert.fail("There is at least one application on the cluster is not finished." +
-					"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
+		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+
+		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+	}
+
+	/**
+	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
+	 */
+	@After
+	public void sleep() throws IOException, YarnException {
+		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+
+		boolean isAnyJobRunning = yarnClient.getApplications().stream()
+			.anyMatch(YarnTestBase::isApplicationRunning);
+
+		while (deadline.hasTimeLeft() && isAnyJobRunning) {
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+				Assert.fail("Should not happen");
 			}
+			isAnyJobRunning = yarnClient.getApplications().stream()
+				.anyMatch(YarnTestBase::isApplicationRunning);
 		}

-		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+		if (isAnyJobRunning) {
+			final List<String> runningApps = yarnClient.getApplications().stream()
+				.filter(YarnTestBase::isApplicationRunning)
+				.map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.')
+				.collect(Collectors.toList());
+			if (!runningApps.isEmpty()) {
+				Assert.fail("There is at least one application on the cluster that is not finished." + runningApps);
+			}
+		}
+	}

-		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+	private static boolean isApplicationRunning(ApplicationReport app) {
+		final YarnApplicationState yarnApplicationState = app.getYarnApplicationState();
+		return yarnApplicationState != YarnApplicationState.FINISHED
+			&& app.getYarnApplicationState() != YarnApplicationState.KILLED
+			&& app.getYarnApplicationState() != YarnApplicationState.FAILED;
 	}

 	@Nullable
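
Aside, not part of the commit: the reworked @After hook above polls the YARN cluster for up to ten seconds until every application has reached FINISHED, KILLED, or FAILED instead of unconditionally sleeping 500 ms, and only fails if something is still running when the deadline expires. The sketch below shows the same poll-until-deadline pattern in isolation; it uses plain java.time rather than Flink's Deadline, and the BooleanSupplier is a stand-in for yarnClient.getApplications() plus the terminal-state check.

import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

// Illustration only: poll a condition until it clears or a deadline expires.
public class PollUntilDeadlineSketch {

	// Returns true if the condition cleared before the deadline, false otherwise.
	static boolean waitUntilCleared(BooleanSupplier stillRunning, Duration timeout, Duration pollInterval)
			throws InterruptedException {
		final Instant deadline = Instant.now().plus(timeout);
		boolean running = stillRunning.getAsBoolean();
		while (running && Instant.now().isBefore(deadline)) {
			Thread.sleep(pollInterval.toMillis());
			running = stillRunning.getAsBoolean();
		}
		return !running;
	}

	public static void main(String[] args) throws InterruptedException {
		// Toy condition: pretend some applications keep running for two seconds.
		final long finishAt = System.currentTimeMillis() + 2_000;
		boolean cleared = waitUntilCleared(
			() -> System.currentTimeMillis() < finishAt,
			Duration.ofSeconds(10),
			Duration.ofMillis(500));
		System.out.println(cleared ? "all applications finished in time" : "applications still running at the deadline");
	}
}
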
