“Durable” in this context means that Jenkins makes an attempt to keep the external process running + * even if either the Jenkins master or an agent JVM is restarted. + * Process standard output is directed to a file near the workspace, rather than holding a file handle open. + * Whenever a Remoting connection between the two can be reëstablished, + * Jenkins again looks for any output sent since the last time it checked. + * When the process exits, the status code is also written to a file and ultimately results in the step passing or failing. + *
Tasks can also be run on the master node, which differs only in that there is no possibility of a network failure. */ public abstract class DurableTaskStep extends Step { @@ -132,11 +152,53 @@ public FormValidation doCheckReturnStatus(@QueryParameter boolean returnStdout, } + /** + * Something we will {@link Channel#export} to {@link HandlerImpl}. + */ + interface ExecutionRemotable { + /** @see Handler#exited */ + void exited(int code, byte[] output) throws Exception; + /** A potentially recoverable problem was encountered in the watch task. */ + void problem(Exception x); + } + + // TODO this and the other constants could be made customizable via system property + @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", justification = "public & mutable only for tests") + @Restricted(NoExternalUse.class) + public static long WATCHING_RECURRENCE_PERIOD = /* 5m */300_000; + + /** If set to false, disables {@link Execution#watching} mode. */ + @SuppressWarnings("FieldMayBeFinal") + private static boolean USE_WATCHING = !"false".equals(System.getProperty(DurableTaskStep.class.getName() + ".USE_WATCHING")); + /** * Represents one task that is believed to still be running. + *
This step has two modes, based on pulling or pushing log content from an agent. + * In the default (push) mode, {@link Controller#watch} is used to ask the agent to begin streaming log content. + * As new output is detected at regular intervals, it is streamed to the {@link TaskListener}, + * which in the default {@link StreamTaskListener} implementation sends chunks of text over Remoting. + * When the process exits, {@link #exited} is called and the step execution also ends. + * If Jenkins is restarted in the middle, {@link #onResume} starts a new watch task. + * Every {@link #WATCHING_RECURRENCE_PERIOD}, the master also checks to make sure the process still seems to be running using {@link Controller#exitStatus}. + * If the agent connection is closed, {@link #ws} will be stale + * ({@link FilePath#channel} will be {@link Channel#isClosingOrClosed}) + * and so {@link #getWorkspace} called from {@link #check} will call {@link #getWorkspaceProblem} + * and we will attempt to get a fresh {@link #ws} as soon as possible, with a new watch. + * (The persisted {@link #node} and {@link #remote} identify the workspace.) + * If the process does not seem to be running after two consecutive checks, + * yet no explicit process completion signal was sent, + * {@link #awaitingAsynchExit} will make the step assume that the watch task is broken and the step should fail. + * If sending output fails for any reason other than {@link ChannelClosedException}, + * {@link #problem} will attempt to record the issue but permit the step to proceed. + *
In the older pull mode, available on request by {@link #USE_WATCHING} or when encountering a noncompliant {@link Controller} implementation,
+ * the master looks for process output ({@link Controller#writeLog}) and/or exit status in {@link #check} at variable intervals,
+ * initially {@link #MIN_RECURRENCE_PERIOD} but slowing down by {@link #RECURRENCE_PERIOD_BACKOFF} up to {@link #MAX_RECURRENCE_PERIOD}.
+ * Any new output will be noted in a change to the state of {@link #controller}, which gets saved to the step state in turn.
+ * If there is any connection problem to the workspace (including master restarts and Remoting disconnects),
+ * {@link #ws} is nulled out and Jenkins waits until a fresh handle is available.
*/
@SuppressFBWarnings(value="SE_TRANSIENT_FIELD_NOT_RESTORED", justification="recurrencePeriod is set in onResume, not deserialization")
- static final class Execution extends AbstractStepExecutionImpl implements Runnable {
+ static final class Execution extends AbstractStepExecutionImpl implements Runnable, ExecutionRemotable {
private static final long MIN_RECURRENCE_PERIOD = 250; // ¼s
private static final long MAX_RECURRENCE_PERIOD = 15000; // 15s
@@ -148,16 +210,35 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
THREAD_POOL.allowCoreThreadTimeOut(true);
}
+ /** Used only during {@link #start}. */
private transient final DurableTaskStep step;
+ /** Current “live” connection to the workspace, or null if we might be offline at the moment. */
private transient FilePath ws;
+ /**
+ * How many ms we plan to sleep before running {@link #check} again.
+ * Zero is used as a signal to break out of the loop.
+ */
private transient long recurrencePeriod;
- private transient volatile ScheduledFuture> task, stopTask;
+ /** A handle for the fact that we plan to run {@link #check}. */
+ private transient volatile ScheduledFuture> task;
+ /** Defined only after {@link #stop} has been called. */
+ private transient volatile ScheduledFuture> stopTask;
+ /** Set if we have already notified the build log of a connectivity problem, which is done at most once per session. */
private transient boolean printedCannotContactMessage;
+ /** Serialized state of the controller. */
private Controller controller;
+ /** {@link Node#getNodeName} of {@link #ws}. */
private String node;
+ /** {@link FilePath#getRemote} of {@link #ws}. */
private String remote;
+ /** Whether the entire stdout of the process is to become the return value of the step. */
private boolean returnStdout; // serialized default is false
+ /** Whether the exit code of the process is to become the return value of the step. */
private boolean returnStatus; // serialized default is false
+ /** Whether we are using the newer push mode. */
+ private boolean watching; // serialized default is false
+ /** Only used when {@link #watching}, if after {@link #WATCHING_RECURRENCE_PERIOD} comes around twice {@link #exited} has yet to be called. */
+ private transient boolean awaitingAsynchExit;
Execution(StepContext context, DurableTaskStep step) {
super(context);
@@ -174,14 +255,24 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
if (returnStdout) {
durableTask.captureOutput();
}
+ TaskListener listener = context.get(TaskListener.class);
if (step.encoding != null) {
durableTask.charset(Charset.forName(step.encoding));
} else {
durableTask.defaultCharset();
}
- controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), context.get(TaskListener.class));
+ controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), listener);
this.remote = ws.getRemote();
- setupTimer();
+ if (USE_WATCHING) {
+ try {
+ controller.watch(ws, new HandlerImpl(this, ws, listener), listener);
+ watching = true;
+ } catch (UnsupportedOperationException x) {
+ LOGGER.log(Level.WARNING, /* default exception message suffices */null, x);
+ // and we fall back to polling mode
+ }
+ }
+ setupTimer(watching ? WATCHING_RECURRENCE_PERIOD : MIN_RECURRENCE_PERIOD);
return false;
}
@@ -192,26 +283,44 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
LOGGER.log(Level.FINE, "Jenkins is not running, no such node {0}, or it is offline", node);
return null;
}
+ if (watching) {
+ try {
+ controller.watch(ws, new HandlerImpl(this, ws, listener()), listener());
+ recurrencePeriod = WATCHING_RECURRENCE_PERIOD;
+ } catch (UnsupportedOperationException x) {
+ // Should not happen, since it worked in start() and a given Controller should not have *dropped* support.
+ getContext().onFailure(x);
+ } catch (Exception x) {
+ getWorkspaceProblem(x);
+ return null;
+ }
+ }
}
boolean directory;
try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) {
directory = ws.isDirectory();
} catch (Exception x) {
- // RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long.
- LOGGER.log(Level.FINE, node + " is evidently offline now", x);
- ws = null;
- if (!printedCannotContactMessage) {
- listener().getLogger().println("Cannot contact " + node + ": " + x);
- printedCannotContactMessage = true;
- }
+ getWorkspaceProblem(x);
return null;
}
if (!directory) {
throw new AbortException("missing workspace " + remote + " on " + node);
}
+ LOGGER.log(Level.FINER, "{0} seems to be online so using {1}", new Object[] {node, remote});
return ws;
}
+ private void getWorkspaceProblem(Exception x) {
+ // RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long.
+ LOGGER.log(Level.FINE, node + " is evidently offline now", x);
+ ws = null;
+ recurrencePeriod = MIN_RECURRENCE_PERIOD;
+ if (!printedCannotContactMessage) {
+ listener().getLogger().println("Cannot contact " + node + ": " + x);
+ printedCannotContactMessage = true;
+ }
+ }
+
private @Nonnull TaskListener listener() {
TaskListener l;
StepContext context = getContext();
@@ -252,6 +361,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
recurrencePeriod = 0;
listener().getLogger().println("After 10s process did not stop");
getContext().onFailure(cause);
+ try {
+ FilePath workspace = getWorkspace();
+ if (workspace != null) {
+ controller.cleanup(workspace);
+ }
+ } catch (IOException | InterruptedException x) {
+ Functions.printStackTrace(x, listener().getLogger());
+ }
}
}
}, 10, TimeUnit.SECONDS);
@@ -315,10 +432,23 @@ private void check() {
return;
}
if (workspace == null) {
+ recurrencePeriod = Math.min((long) (recurrencePeriod * RECURRENCE_PERIOD_BACKOFF), MAX_RECURRENCE_PERIOD);
return; // slave not yet ready, wait for another day
}
TaskListener listener = listener();
try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) {
+ if (watching) {
+ Integer exitCode = controller.exitStatus(workspace, launcher(), listener);
+ if (exitCode == null) {
+ LOGGER.log(Level.FINE, "still running in {0} on {1}", new Object[] {remote, node});
+ } else if (awaitingAsynchExit) {
+ recurrencePeriod = 0;
+ getContext().onFailure(new AbortException("script apparently exited with code " + exitCode + " but asynchronous notification was lost"));
+ } else {
+ LOGGER.log(Level.FINE, "exited with {0} in {1} on {2}; expect asynchronous exit soon", new Object[] {exitCode, remote, node});
+ awaitingAsynchExit = true;
+ }
+ } else { // legacy mode
if (controller.writeLog(workspace, listener.getLogger())) {
getContext().saveState();
recurrencePeriod = MIN_RECURRENCE_PERIOD; // got output, maybe we will get more soon
@@ -343,6 +473,7 @@ private void check() {
recurrencePeriod = 0;
controller.cleanup(workspace);
}
+ }
} catch (Exception x) {
LOGGER.log(Level.FINE, "could not check " + workspace, x);
ws = null;
@@ -353,12 +484,50 @@ private void check() {
}
}
+ // called remotely from HandlerImpl
+ @Override public void exited(int exitCode, byte[] output) throws Exception {
+ try {
+ getContext().get(TaskListener.class);
+ } catch (IOException | InterruptedException x) {
+ // E.g., CpsStepContext.doGet complaining that getThreadSynchronously() == null.
+ // If we cannot even print messages, there is no point proceeding.
+ LOGGER.log(Level.FINE, "asynchronous exit notification with code " + exitCode + " in " + remote + " on " + node + " ignored since step already seems dead", x);
+ return;
+ }
+ LOGGER.log(Level.FINE, "asynchronous exit notification with code {0} in {1} on {2}", new Object[] {exitCode, remote, node});
+ if (returnStdout && output == null) {
+ getContext().onFailure(new IllegalStateException("expected output but got none"));
+ return;
+ } else if (!returnStdout && output != null) {
+ getContext().onFailure(new IllegalStateException("did not expect output but got some"));
+ return;
+ }
+ recurrencePeriod = 0;
+ if (returnStatus || exitCode == 0) {
+ getContext().onSuccess(returnStatus ? exitCode : returnStdout ? new String(output, StandardCharsets.UTF_8) : null);
+ } else {
+ if (returnStdout) {
+ listener().getLogger().write(output); // diagnostic
+ }
+ getContext().onFailure(new AbortException("script returned exit code " + exitCode));
+ }
+ }
+
+ // ditto
+ @Override public void problem(Exception x) {
+ Functions.printStackTrace(x, listener().getLogger());
+ // note that if there is _also_ a problem in the master-side logger, PrintStream will mask it
+ }
+
@Override public void onResume() {
- setupTimer();
+ ws = null; // find it from scratch please
+ setupTimer(MIN_RECURRENCE_PERIOD);
+ // In watch mode, we will quickly enter the check.
+ // Then in getWorkspace when ws == null we will start a watch and go back to sleep.
}
- private void setupTimer() {
- recurrencePeriod = MIN_RECURRENCE_PERIOD;
+ private void setupTimer(long initialRecurrencePeriod) {
+ recurrencePeriod = initialRecurrencePeriod;
task = THREAD_POOL.schedule(this, recurrencePeriod, TimeUnit.MILLISECONDS);
}
@@ -366,4 +535,64 @@ private void setupTimer() {
}
+ private static class HandlerImpl extends Handler {
+
+ private static final Field printStreamDelegate;
+ static {
+ try {
+ printStreamDelegate = FilterOutputStream.class.getDeclaredField("out");
+ } catch (NoSuchFieldException x) {
+ // Defined in Java Platform and protected, so should not happen.
+ throw new ExceptionInInitializerError(x);
+ }
+ printStreamDelegate.setAccessible(true);
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final ExecutionRemotable execution;
+ private final TaskListener listener;
+
+ HandlerImpl(Execution execution, FilePath workspace, TaskListener listener) {
+ this.execution = workspace.getChannel().export(ExecutionRemotable.class, execution);
+ this.listener = listener;
+ }
+
+ @Override public void output(InputStream stream) throws Exception {
+ PrintStream ps = listener.getLogger();
+ try {
+ if (ps.getClass() == PrintStream.class) {
+ // Try to extract the underlying stream, since swallowing exceptions is undesirable and PrintStream.checkError is useless.
+ OutputStream os = (OutputStream) printStreamDelegate.get(ps);
+ if (os == null) { // like PrintStream.ensureOpen
+ throw new IOException("Stream closed");
+ }
+ synchronized (ps) { // like PrintStream.write overloads do
+ IOUtils.copy(stream, os);
+ }
+ } else {
+ // A subclass. Who knows why, but trust any write(…) overrides it may have.
+ IOUtils.copy(stream, ps);
+ }
+ } catch (ChannelClosedException x) {
+ // We are giving up on this watch. Wait for some call to getWorkspace to rewatch.
+ throw x;
+ } catch (Exception x) {
+ // Try to report it to the master.
+ try {
+ execution.problem(x);
+ // OK, printed to log on master side, we may have lost some text but could continue.
+ } catch (Exception x2) { // e.g., RemotingSystemException
+ // No, channel seems to be broken, give up on this watch.
+ throw x;
+ }
+ }
+ }
+
+ @Override public void exited(int code, byte[] output) throws Exception {
+ execution.exited(code, output);
+ }
+
+ }
+
}
diff --git a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java
index 4f341c3a..2a788213 100644
--- a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java
+++ b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepTest.java
@@ -69,15 +69,20 @@
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import jenkins.model.Jenkins;
import jenkins.security.QueueItemAuthenticator;
import jenkins.security.QueueItemAuthenticatorConfiguration;
import org.acegisecurity.Authentication;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.tools.ant.util.JavaEnvUtils;
import static org.hamcrest.Matchers.*;
import org.jboss.marshalling.ObjectResolver;
+import org.jenkinsci.plugins.durabletask.FileMonitoringTask;
+import org.jenkinsci.plugins.workflow.actions.LogAction;
import org.jenkinsci.plugins.workflow.actions.QueueItemAction;
import org.jenkinsci.plugins.workflow.actions.WorkspaceAction;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
@@ -85,6 +90,7 @@
import org.jenkinsci.plugins.workflow.graph.FlowGraphWalker;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graphanalysis.DepthFirstScanner;
+import org.jenkinsci.plugins.workflow.graphanalysis.NodeStepTypePredicate;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.durable_task.DurableTaskStep;
@@ -103,19 +109,18 @@
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.JenkinsRule;
+import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.MockAuthorizationStrategy;
import org.jvnet.hudson.test.RestartableJenkinsRule;
import org.jvnet.hudson.test.recipes.LocalData;
-import javax.annotation.Nullable;
-import org.jvnet.hudson.test.LoggerRule;
-
/** Tests pertaining to {@code node} and {@code sh} steps. */
public class ExecutorStepTest {
@ClassRule public static BuildWatcher buildWatcher = new BuildWatcher();
@Rule public RestartableJenkinsRule story = new RestartableJenkinsRule();
@Rule public TemporaryFolder tmp = new TemporaryFolder();
+ // Currently too noisy due to unrelated warnings; might clear up if test dependencies updated: .record(ExecutorStepExecution.class, Level.FINE)
@Rule public LoggerRule logging = new LoggerRule();
/**
@@ -216,7 +221,7 @@ private void startJnlpProc() throws Exception {
// TODO @After does not seem to work at all in RestartableJenkinsRule
@AfterClass public static void killJnlpProc() {
if (jnlpProc != null) {
- jnlpProc.destroy();
+ jnlpProc.destroyForcibly();
jnlpProc = null;
}
}
@@ -226,11 +231,7 @@ private void startJnlpProc() throws Exception {
story.addStep(new Statement() {
@SuppressWarnings("SleepWhileInLoop")
@Override public void evaluate() throws Throwable {
- Logger LOGGER = Logger.getLogger(DurableTaskStep.class.getName());
- LOGGER.setLevel(Level.FINE);
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.ALL);
- LOGGER.addHandler(handler);
+ logging.record(DurableTaskStep.class, Level.FINE).record(FileMonitoringTask.class, Level.FINE);
// Cannot use regular JenkinsRule.createSlave due to JENKINS-26398.
// Nor can we can use JenkinsRule.createComputerLauncher, since spawned commands are killed by CommandLauncher somehow (it is not clear how; apparently before its onClosed kills them off).
DumbSlave s = new DumbSlave("dumbo", "dummy", tmp.getRoot().getAbsolutePath(), "1", Node.Mode.NORMAL, "", new JNLPLauncher(), RetentionStrategy.NOOP, Collections.