Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>durable-task</artifactId>
<version>1.24</version>
<version>1.25-rc353.399e3eb7deb7</version> <!-- TODO https://github.com/jenkinsci/durable-task-plugin/pull/60 -->
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
import hudson.AbortException;
import hudson.EnvVars;
import hudson.FilePath;
import hudson.Functions;
import hudson.Launcher;
import hudson.Main;
import hudson.Util;
import hudson.model.TaskListener;
import hudson.util.DaemonThreadFactory;
import hudson.util.FormValidation;
import hudson.util.LogTaskListener;
import hudson.util.NamingThreadFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;
Expand All @@ -48,8 +51,10 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import jenkins.util.Timer;
import org.apache.commons.io.IOUtils;
import org.jenkinsci.plugins.durabletask.Controller;
import org.jenkinsci.plugins.durabletask.DurableTask;
import org.jenkinsci.plugins.durabletask.Handler;
import org.jenkinsci.plugins.workflow.FilePathUtils;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import org.jenkinsci.plugins.workflow.steps.Step;
Expand Down Expand Up @@ -132,15 +137,20 @@ public FormValidation doCheckReturnStatus(@QueryParameter boolean returnStdout,

}

interface ExecutionRemotable {
void exited(int code, byte[] output) throws Exception;
}

/**
* Represents one task that is believed to still be running.
*/
@SuppressFBWarnings(value="SE_TRANSIENT_FIELD_NOT_RESTORED", justification="recurrencePeriod is set in onResume, not deserialization")
static final class Execution extends AbstractStepExecutionImpl implements Runnable {
static final class Execution extends AbstractStepExecutionImpl implements Runnable, ExecutionRemotable {

private static final long MIN_RECURRENCE_PERIOD = 250; // ¼s
private static final long MAX_RECURRENCE_PERIOD = 15000; // 15s
private static final float RECURRENCE_PERIOD_BACKOFF = 1.2f;
private static final long WATCHING_RECURRENCE_PERIOD = Main.isUnitTest ? /* 5s */5_000: /* 5m */300_000;

private static final ScheduledThreadPoolExecutor THREAD_POOL = new ScheduledThreadPoolExecutor(25, new NamingThreadFactory(new DaemonThreadFactory(), DurableTaskStep.class.getName()));
static {
Expand All @@ -158,6 +168,7 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
private String remote;
private boolean returnStdout; // serialized default is false
private boolean returnStatus; // serialized default is false
private boolean watching;

Execution(StepContext context, DurableTaskStep step) {
super(context);
Expand All @@ -174,14 +185,21 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
if (returnStdout) {
durableTask.captureOutput();
}
TaskListener listener = context.get(TaskListener.class);
if (step.encoding != null) {
durableTask.charset(Charset.forName(step.encoding));
} else {
durableTask.defaultCharset();
}
controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), context.get(TaskListener.class));
controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), listener);
this.remote = ws.getRemote();
setupTimer();
try {
controller.watch(ws, new HandlerImpl(this, ws, listener), listener);
watching = true;
} catch (UnsupportedOperationException x) {
LOGGER.log(Level.WARNING, null, x);
}
setupTimer(watching ? WATCHING_RECURRENCE_PERIOD : MIN_RECURRENCE_PERIOD);
return false;
}

Expand All @@ -192,26 +210,43 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
LOGGER.log(Level.FINE, "Jenkins is not running, no such node {0}, or it is offline", node);
return null;
}
if (watching) {
try {
controller.watch(ws, new HandlerImpl(this, ws, listener()), listener());
recurrencePeriod = WATCHING_RECURRENCE_PERIOD;
} catch (UnsupportedOperationException x) {
getContext().onFailure(x);
} catch (Exception x) {
getWorkspaceProblem(x);
return null;
}
}
}
boolean directory;
try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) {
directory = ws.isDirectory();
} catch (Exception x) {
// RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long.
LOGGER.log(Level.FINE, node + " is evidently offline now", x);
ws = null;
if (!printedCannotContactMessage) {
listener().getLogger().println("Cannot contact " + node + ": " + x);
printedCannotContactMessage = true;
}
getWorkspaceProblem(x);
return null;
}
if (!directory) {
throw new AbortException("missing workspace " + remote + " on " + node);
}
LOGGER.log(Level.FINER, "{0} seems to be online so using {1}", new Object[] {node, remote});
return ws;
}

private void getWorkspaceProblem(Exception x) {
// RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long.
LOGGER.log(Level.FINE, node + " is evidently offline now", x);
ws = null;
recurrencePeriod = MIN_RECURRENCE_PERIOD;
if (!printedCannotContactMessage) {
listener().getLogger().println("Cannot contact " + node + ": " + x);
printedCannotContactMessage = true;
}
}

private @Nonnull TaskListener listener() {
TaskListener l;
StepContext context = getContext();
Expand Down Expand Up @@ -252,6 +287,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
recurrencePeriod = 0;
listener().getLogger().println("After 10s process did not stop");
getContext().onFailure(cause);
try {
FilePath workspace = getWorkspace();
if (workspace != null) {
controller.cleanup(workspace);
}
} catch (IOException | InterruptedException x) {
Functions.printStackTrace(x, listener().getLogger());
}
}
}
}, 10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -315,10 +358,20 @@ private void check() {
return;
}
if (workspace == null) {
recurrencePeriod = Math.min((long) (recurrencePeriod * RECURRENCE_PERIOD_BACKOFF), MAX_RECURRENCE_PERIOD);
return; // slave not yet ready, wait for another day
}
TaskListener listener = listener();
try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) {
if (watching) {
Integer exitCode = controller.exitStatus(workspace, launcher());
if (exitCode == null) {
LOGGER.log(Level.FINE, "still running in {0} on {1}", new Object[] {remote, node});
} else {
LOGGER.log(Level.FINE, "exited with {0} in {1} on {2}; expect asynchronous exit soon", new Object[] {exitCode, remote, node});
// TODO if we get here again and exited has still not been called, assume we lost the notification somehow and end the step
}
} else { // legacy mode
if (controller.writeLog(workspace, listener.getLogger())) {
getContext().saveState();
recurrencePeriod = MIN_RECURRENCE_PERIOD; // got output, maybe we will get more soon
Expand All @@ -343,6 +396,7 @@ private void check() {
recurrencePeriod = 0;
controller.cleanup(workspace);
}
}
} catch (Exception x) {
LOGGER.log(Level.FINE, "could not check " + workspace, x);
ws = null;
Expand All @@ -353,17 +407,67 @@ private void check() {
}
}

// called remotely from HandlerImpl
@Override public void exited(int exitCode, byte[] output) throws Exception {
try {
getContext().get(TaskListener.class);
} catch (IOException | InterruptedException x) {
LOGGER.log(Level.FINE, "asynchronous exit notification with code " + exitCode + " in " + remote + " on " + node + " ignored since step already seems dead", x);
return;
}
LOGGER.log(Level.FINE, "asynchronous exit notification with code {0} in {1} on {2}", new Object[] {exitCode, remote, node});
if (returnStdout && output == null) {
getContext().onFailure(new IllegalStateException("expected output but got none"));
return;
} else if (!returnStdout && output != null) {
getContext().onFailure(new IllegalStateException("did not expect output but got some"));
return;
}
recurrencePeriod = 0;
if (returnStatus || exitCode == 0) {
getContext().onSuccess(returnStatus ? exitCode : returnStdout ? new String(output, StandardCharsets.UTF_8) : null);
} else {
if (returnStdout) {
listener().getLogger().write(output); // diagnostic
}
getContext().onFailure(new AbortException("script returned exit code " + exitCode));
}
}

@Override public void onResume() {
setupTimer();
ws = null; // find it from scratch please, rewatching as needed
setupTimer(MIN_RECURRENCE_PERIOD);
}

private void setupTimer() {
recurrencePeriod = MIN_RECURRENCE_PERIOD;
private void setupTimer(long initialRecurrencePeriod) {
recurrencePeriod = initialRecurrencePeriod;
task = THREAD_POOL.schedule(this, recurrencePeriod, TimeUnit.MILLISECONDS);
}

private static final long serialVersionUID = 1L;

}

private static class HandlerImpl extends Handler {

private static final long serialVersionUID = 1L;

private final ExecutionRemotable execution;
private final TaskListener listener;

HandlerImpl(Execution execution, FilePath workspace, TaskListener listener) {
this.execution = workspace.getChannel().export(ExecutionRemotable.class, execution);
this.listener = listener;
}

@Override public void output(InputStream stream) throws Exception {
IOUtils.copy(stream, listener.getLogger());
}

@Override public void exited(int code, byte[] output) throws Exception {
execution.exited(code, output);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,15 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import jenkins.model.Jenkins;
import jenkins.security.QueueItemAuthenticator;
import jenkins.security.QueueItemAuthenticatorConfiguration;
import org.acegisecurity.Authentication;
import org.apache.commons.io.FileUtils;
import org.apache.tools.ant.util.JavaEnvUtils;
import org.jenkinsci.plugins.durabletask.FileMonitoringTask;
import org.hamcrest.Matchers;
import org.jboss.marshalling.ObjectResolver;
import org.jenkinsci.plugins.workflow.actions.QueueItemAction;
Expand All @@ -99,6 +97,7 @@
import org.junit.runners.model.Statement;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.MockAuthorizationStrategy;
import org.jvnet.hudson.test.RestartableJenkinsRule;
Expand All @@ -112,8 +111,9 @@ public class ExecutorStepTest {
@ClassRule public static BuildWatcher buildWatcher = new BuildWatcher();
@Rule public RestartableJenkinsRule story = new RestartableJenkinsRule();
@Rule public TemporaryFolder tmp = new TemporaryFolder();
@Rule public LoggerRule logging = new LoggerRule();
/* Currently too noisy due to unrelated warnings; might clear up if test dependencies updated:
@Rule public LoggerRule logging = new LoggerRule().record(ExecutorStepExecution.class, Level.FINE);
.record(ExecutorStepExecution.class, Level.FINE);
*/

/**
Expand Down Expand Up @@ -227,11 +227,7 @@ private void startJnlpProc() throws Exception {
story.addStep(new Statement() {
@SuppressWarnings("SleepWhileInLoop")
@Override public void evaluate() throws Throwable {
Logger LOGGER = Logger.getLogger(DurableTaskStep.class.getName());
LOGGER.setLevel(Level.FINE);
Handler handler = new ConsoleHandler();
handler.setLevel(Level.ALL);
LOGGER.addHandler(handler);
logging.record(DurableTaskStep.class, Level.FINE).record(FileMonitoringTask.class, Level.FINE);
// Cannot use regular JenkinsRule.createSlave due to JENKINS-26398.
// Nor can we can use JenkinsRule.createComputerLauncher, since spawned commands are killed by CommandLauncher somehow (it is not clear how; apparently before its onClosed kills them off).
DumbSlave s = new DumbSlave("dumbo", "dummy", tmp.getRoot().getAbsolutePath(), "1", Node.Mode.NORMAL, "", new JNLPLauncher(), RetentionStrategy.NOOP, Collections.<NodeProperty<?>>emptyList());
Expand Down Expand Up @@ -280,11 +276,7 @@ private void startJnlpProc() throws Exception {
story.addStep(new Statement() {
@SuppressWarnings("SleepWhileInLoop")
@Override public void evaluate() throws Throwable {
Logger LOGGER = Logger.getLogger(DurableTaskStep.class.getName());
LOGGER.setLevel(Level.FINE);
Handler handler = new ConsoleHandler();
handler.setLevel(Level.ALL);
LOGGER.addHandler(handler);
logging.record(DurableTaskStep.class, Level.FINE).record(FileMonitoringTask.class, Level.FINE);
DumbSlave s = new DumbSlave("dumbo", "dummy", tmp.getRoot().getAbsolutePath(), "1", Node.Mode.NORMAL, "", new JNLPLauncher(), RetentionStrategy.NOOP, Collections.<NodeProperty<?>>emptyList());
story.j.jenkins.addNode(s);
startJnlpProc();
Expand Down