diff --git a/pom.xml b/pom.xml
index 9ab572b2..b2118f23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
org.jenkins-ci.plugins
durable-task
- 1.24
+ 1.25-rc353.399e3eb7deb7
org.jenkins-ci.plugins.workflow
diff --git a/src/main/java/org/jenkinsci/plugins/workflow/steps/durable_task/DurableTaskStep.java b/src/main/java/org/jenkinsci/plugins/workflow/steps/durable_task/DurableTaskStep.java
index 8e021ac1..7109901c 100644
--- a/src/main/java/org/jenkinsci/plugins/workflow/steps/durable_task/DurableTaskStep.java
+++ b/src/main/java/org/jenkinsci/plugins/workflow/steps/durable_task/DurableTaskStep.java
@@ -29,7 +29,9 @@
import hudson.AbortException;
import hudson.EnvVars;
import hudson.FilePath;
+import hudson.Functions;
import hudson.Launcher;
+import hudson.Main;
import hudson.Util;
import hudson.model.TaskListener;
import hudson.util.DaemonThreadFactory;
@@ -37,6 +39,7 @@
import hudson.util.LogTaskListener;
import hudson.util.NamingThreadFactory;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;
@@ -48,8 +51,10 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import jenkins.util.Timer;
+import org.apache.commons.io.IOUtils;
import org.jenkinsci.plugins.durabletask.Controller;
import org.jenkinsci.plugins.durabletask.DurableTask;
+import org.jenkinsci.plugins.durabletask.Handler;
import org.jenkinsci.plugins.workflow.FilePathUtils;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import org.jenkinsci.plugins.workflow.steps.Step;
@@ -132,15 +137,20 @@ public FormValidation doCheckReturnStatus(@QueryParameter boolean returnStdout,
}
+ interface ExecutionRemotable { // the part of Execution that HandlerImpl calls through a proxy exported on the workspace channel
+ void exited(int code, byte[] output) throws Exception; // the process finished with this exit code; Execution expects output to be non-null exactly when returnStdout is set
+ }
+
/**
* Represents one task that is believed to still be running.
*/
@SuppressFBWarnings(value="SE_TRANSIENT_FIELD_NOT_RESTORED", justification="recurrencePeriod is set in onResume, not deserialization")
- static final class Execution extends AbstractStepExecutionImpl implements Runnable {
+ static final class Execution extends AbstractStepExecutionImpl implements Runnable, ExecutionRemotable {
private static final long MIN_RECURRENCE_PERIOD = 250; // ¼s
private static final long MAX_RECURRENCE_PERIOD = 15000; // 15s
private static final float RECURRENCE_PERIOD_BACKOFF = 1.2f;
+ private static final long WATCHING_RECURRENCE_PERIOD = Main.isUnitTest ? /* 5s */5_000: /* 5m */300_000;
private static final ScheduledThreadPoolExecutor THREAD_POOL = new ScheduledThreadPoolExecutor(25, new NamingThreadFactory(new DaemonThreadFactory(), DurableTaskStep.class.getName()));
static {
@@ -158,6 +168,7 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
private String remote;
private boolean returnStdout; // serialized default is false
private boolean returnStatus; // serialized default is false
+ private boolean watching;
Execution(StepContext context, DurableTaskStep step) {
super(context);
@@ -174,14 +185,21 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
if (returnStdout) {
durableTask.captureOutput();
}
+ TaskListener listener = context.get(TaskListener.class);
if (step.encoding != null) {
durableTask.charset(Charset.forName(step.encoding));
} else {
durableTask.defaultCharset();
}
- controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), context.get(TaskListener.class));
+ controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), listener);
this.remote = ws.getRemote();
- setupTimer();
+ try {
+ controller.watch(ws, new HandlerImpl(this, ws, listener), listener);
+ watching = true;
+ } catch (UnsupportedOperationException x) {
+ LOGGER.log(Level.WARNING, null, x);
+ }
+ setupTimer(watching ? WATCHING_RECURRENCE_PERIOD : MIN_RECURRENCE_PERIOD);
return false;
}
@@ -192,26 +210,43 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
LOGGER.log(Level.FINE, "Jenkins is not running, no such node {0}, or it is offline", node);
return null;
}
+ if (watching) {
+ try {
+ controller.watch(ws, new HandlerImpl(this, ws, listener()), listener());
+ recurrencePeriod = WATCHING_RECURRENCE_PERIOD;
+ } catch (UnsupportedOperationException x) {
+ getContext().onFailure(x);
+ } catch (Exception x) {
+ getWorkspaceProblem(x);
+ return null;
+ }
+ }
}
boolean directory;
try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) {
directory = ws.isDirectory();
} catch (Exception x) {
- // RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long.
- LOGGER.log(Level.FINE, node + " is evidently offline now", x);
- ws = null;
- if (!printedCannotContactMessage) {
- listener().getLogger().println("Cannot contact " + node + ": " + x);
- printedCannotContactMessage = true;
- }
+ getWorkspaceProblem(x);
return null;
}
if (!directory) {
throw new AbortException("missing workspace " + remote + " on " + node);
}
+ LOGGER.log(Level.FINER, "{0} seems to be online so using {1}", new Object[] {node, remote});
return ws;
}
+ /**
+  * Records that the workspace could not be reached: logs the problem, forgets the cached workspace,
+  * resets the polling interval to its minimum, and notifies the build log the first time this happens.
+  * @param x the failure seen while contacting the node
+  */
+ private void getWorkspaceProblem(Exception x) {
+     // Typically RequestAbortedException, ChannelClosedException, EOFException, or wrappers thereof; InterruptedException if it just takes too long.
+     LOGGER.log(Level.FINE, node + " is evidently offline now", x);
+     ws = null;
+     recurrencePeriod = MIN_RECURRENCE_PERIOD;
+     if (printedCannotContactMessage) {
+         return; // the build log has already been told once
+     }
+     listener().getLogger().println("Cannot contact " + node + ": " + x);
+     printedCannotContactMessage = true;
+ }
+
private @Nonnull TaskListener listener() {
TaskListener l;
StepContext context = getContext();
@@ -252,6 +287,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
recurrencePeriod = 0;
listener().getLogger().println("After 10s process did not stop");
getContext().onFailure(cause);
+ try {
+ FilePath workspace = getWorkspace();
+ if (workspace != null) {
+ controller.cleanup(workspace);
+ }
+ } catch (IOException | InterruptedException x) {
+ Functions.printStackTrace(x, listener().getLogger());
+ }
}
}
}, 10, TimeUnit.SECONDS);
@@ -315,10 +358,20 @@ private void check() {
return;
}
if (workspace == null) {
+ recurrencePeriod = Math.min((long) (recurrencePeriod * RECURRENCE_PERIOD_BACKOFF), MAX_RECURRENCE_PERIOD);
return; // slave not yet ready, wait for another day
}
TaskListener listener = listener();
try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) {
+ if (watching) {
+ Integer exitCode = controller.exitStatus(workspace, launcher());
+ if (exitCode == null) {
+ LOGGER.log(Level.FINE, "still running in {0} on {1}", new Object[] {remote, node});
+ } else {
+ LOGGER.log(Level.FINE, "exited with {0} in {1} on {2}; expect asynchronous exit soon", new Object[] {exitCode, remote, node});
+ // TODO if we get here again and exited has still not been called, assume we lost the notification somehow and end the step
+ }
+ } else { // legacy mode
if (controller.writeLog(workspace, listener.getLogger())) {
getContext().saveState();
recurrencePeriod = MIN_RECURRENCE_PERIOD; // got output, maybe we will get more soon
@@ -343,6 +396,7 @@ private void check() {
recurrencePeriod = 0;
controller.cleanup(workspace);
}
+ }
} catch (Exception x) {
LOGGER.log(Level.FINE, "could not check " + workspace, x);
ws = null;
@@ -353,12 +407,40 @@ private void check() {
}
}
+ /**
+  * Receives the asynchronous exit notification; called remotely from HandlerImpl.
+  * The notification is ignored when the step context can no longer supply a {@link TaskListener}.
+  * Otherwise the step is completed: successfully when the exit code is zero or {@code returnStatus} is set,
+  * and with an {@link AbortException} otherwise.
+  * @param exitCode exit status reported for the process
+  * @param output captured output, decoded as UTF-8 when it is returned; must be non-null exactly when {@code returnStdout} is set
+  * @throws Exception for example if writing the captured output to the build log fails
+  */
+ @Override public void exited(int exitCode, byte[] output) throws Exception {
+     try {
+         getContext().get(TaskListener.class);
+     } catch (IOException | InterruptedException x) {
+         LOGGER.log(Level.FINE, "asynchronous exit notification with code " + exitCode + " in " + remote + " on " + node + " ignored since step already seems dead", x);
+         return;
+     }
+     LOGGER.log(Level.FINE, "asynchronous exit notification with code {0} in {1} on {2}", new Object[] {exitCode, remote, node});
+     // Every path below completes the step, so clear the recurrence period up front,
+     // including on the inconsistent-output failures, as the other completion sites do.
+     recurrencePeriod = 0;
+     if (returnStdout && output == null) {
+         getContext().onFailure(new IllegalStateException("expected output but got none"));
+         return;
+     } else if (!returnStdout && output != null) {
+         getContext().onFailure(new IllegalStateException("did not expect output but got some"));
+         return;
+     }
+     if (returnStatus || exitCode == 0) {
+         getContext().onSuccess(returnStatus ? exitCode : returnStdout ? new String(output, StandardCharsets.UTF_8) : null);
+     } else {
+         if (returnStdout) {
+             listener().getLogger().write(output); // diagnostic
+         }
+         getContext().onFailure(new AbortException("script returned exit code " + exitCode));
+     }
+ }
+
@Override public void onResume() {
- setupTimer();
+ ws = null; // find it from scratch please, rewatching as needed
+ setupTimer(MIN_RECURRENCE_PERIOD); // poll at the shortest interval at first; getWorkspace raises it to WATCHING_RECURRENCE_PERIOD once watch() succeeds again
}
- private void setupTimer() {
- recurrencePeriod = MIN_RECURRENCE_PERIOD;
+ private void setupTimer(long initialRecurrencePeriod) { // initialRecurrencePeriod: delay in milliseconds before the first run scheduled on THREAD_POOL
+ recurrencePeriod = initialRecurrencePeriod;
task = THREAD_POOL.schedule(this, recurrencePeriod, TimeUnit.MILLISECONDS);
}
@@ -366,4 +448,26 @@ private void setupTimer() {
}
+ /**
+  * Handler passed to {@code Controller.watch} on behalf of one {@link Execution}.
+  * Process output is copied to the listener's log, and the exit notification is relayed to the
+  * execution through a proxy exported on the workspace's channel.
+  */
+ private static class HandlerImpl extends Handler {
+
+     private static final long serialVersionUID = 1L;
+
+     /** Proxy for the owning execution, obtained by exporting it on the workspace's channel. */
+     private final ExecutionRemotable execution;
+     /** Receives the output of the watched process. */
+     private final TaskListener listener;
+
+     HandlerImpl(Execution owner, FilePath ws, TaskListener log) {
+         listener = log;
+         execution = ws.getChannel().export(ExecutionRemotable.class, owner);
+     }
+
+     /** Copies newly available process output to the listener's log. */
+     @Override public void output(InputStream in) throws Exception {
+         IOUtils.copy(in, listener.getLogger());
+     }
+
+     /** Forwards the exit notification to the owning execution. */
+     @Override public void exited(int exitCode, byte[] capturedOutput) throws Exception {
+         execution.exited(exitCode, capturedOutput);
+     }
+
+ }
+
}
diff --git a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java
index 6800fe5f..cd714c72 100644
--- a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java
+++ b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java
@@ -63,10 +63,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Handler;
import java.util.logging.Level;
-import java.util.logging.Logger;
import java.util.regex.Pattern;
import jenkins.model.Jenkins;
import jenkins.security.QueueItemAuthenticator;
@@ -74,6 +71,7 @@
import org.acegisecurity.Authentication;
import org.apache.commons.io.FileUtils;
import org.apache.tools.ant.util.JavaEnvUtils;
+import org.jenkinsci.plugins.durabletask.FileMonitoringTask;
import org.hamcrest.Matchers;
import org.jboss.marshalling.ObjectResolver;
import org.jenkinsci.plugins.workflow.actions.QueueItemAction;
@@ -99,6 +97,7 @@
import org.junit.runners.model.Statement;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
+import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.MockAuthorizationStrategy;
import org.jvnet.hudson.test.RestartableJenkinsRule;
@@ -112,8 +111,9 @@ public class ExecutorStepTest {
@ClassRule public static BuildWatcher buildWatcher = new BuildWatcher();
@Rule public RestartableJenkinsRule story = new RestartableJenkinsRule();
@Rule public TemporaryFolder tmp = new TemporaryFolder();
+ @Rule public LoggerRule logging = new LoggerRule();
/* Currently too noisy due to unrelated warnings; might clear up if test dependencies updated:
- @Rule public LoggerRule logging = new LoggerRule().record(ExecutorStepExecution.class, Level.FINE);
+ .record(ExecutorStepExecution.class, Level.FINE);
*/
/**
@@ -227,11 +227,7 @@ private void startJnlpProc() throws Exception {
story.addStep(new Statement() {
@SuppressWarnings("SleepWhileInLoop")
@Override public void evaluate() throws Throwable {
- Logger LOGGER = Logger.getLogger(DurableTaskStep.class.getName());
- LOGGER.setLevel(Level.FINE);
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.ALL);
- LOGGER.addHandler(handler);
+ logging.record(DurableTaskStep.class, Level.FINE).record(FileMonitoringTask.class, Level.FINE);
// Cannot use regular JenkinsRule.createSlave due to JENKINS-26398.
// Nor can we can use JenkinsRule.createComputerLauncher, since spawned commands are killed by CommandLauncher somehow (it is not clear how; apparently before its onClosed kills them off).
DumbSlave s = new DumbSlave("dumbo", "dummy", tmp.getRoot().getAbsolutePath(), "1", Node.Mode.NORMAL, "", new JNLPLauncher(), RetentionStrategy.NOOP, Collections.>emptyList());
@@ -280,11 +276,7 @@ private void startJnlpProc() throws Exception {
story.addStep(new Statement() {
@SuppressWarnings("SleepWhileInLoop")
@Override public void evaluate() throws Throwable {
- Logger LOGGER = Logger.getLogger(DurableTaskStep.class.getName());
- LOGGER.setLevel(Level.FINE);
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.ALL);
- LOGGER.addHandler(handler);
+ logging.record(DurableTaskStep.class, Level.FINE).record(FileMonitoringTask.class, Level.FINE);
DumbSlave s = new DumbSlave("dumbo", "dummy", tmp.getRoot().getAbsolutePath(), "1", Node.Mode.NORMAL, "", new JNLPLauncher(), RetentionStrategy.NOOP, Collections.>emptyList());
story.j.jenkins.addNode(s);
startJnlpProc();