-
-
Notifications
You must be signed in to change notification settings - Fork 105
[JENKINS-52165] Calling Controller.watch API #63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
5c20d5e
b4ff3a5
cb05308
13885df
af31da4
bd27dc6
5eccfbb
756a87d
6c424e0
0dcfd8e
12667dd
e2536cc
458d5ee
8440d04
335d00f
9a094f0
38e5a73
86d8074
705b254
6722cdb
62aeb45
ed4551a
c41f3ec
c859378
3673025
43b6d27
24ee12b
3f8ced4
d54b065
efb481c
77b81c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,14 +29,21 @@ | |
| import hudson.AbortException; | ||
| import hudson.EnvVars; | ||
| import hudson.FilePath; | ||
| import hudson.Functions; | ||
| import hudson.Launcher; | ||
| import hudson.Util; | ||
| import hudson.model.TaskListener; | ||
| import hudson.remoting.ChannelClosedException; | ||
| import hudson.util.DaemonThreadFactory; | ||
| import hudson.util.FormValidation; | ||
| import hudson.util.LogTaskListener; | ||
| import hudson.util.NamingThreadFactory; | ||
| import java.io.FilterOutputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.OutputStream; | ||
| import java.io.PrintStream; | ||
| import java.lang.reflect.Field; | ||
| import java.nio.charset.Charset; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Set; | ||
|
|
@@ -48,8 +55,10 @@ | |
| import javax.annotation.CheckForNull; | ||
| import javax.annotation.Nonnull; | ||
| import jenkins.util.Timer; | ||
| import org.apache.commons.io.IOUtils; | ||
| import org.jenkinsci.plugins.durabletask.Controller; | ||
| import org.jenkinsci.plugins.durabletask.DurableTask; | ||
| import org.jenkinsci.plugins.durabletask.Handler; | ||
| import org.jenkinsci.plugins.workflow.FilePathUtils; | ||
| import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl; | ||
| import org.jenkinsci.plugins.workflow.steps.Step; | ||
|
|
@@ -60,6 +69,7 @@ | |
| import org.jenkinsci.plugins.workflow.support.concurrent.WithThreadName; | ||
| import org.kohsuke.accmod.Restricted; | ||
| import org.kohsuke.accmod.restrictions.DoNotUse; | ||
| import org.kohsuke.accmod.restrictions.NoExternalUse; | ||
| import org.kohsuke.stapler.DataBoundSetter; | ||
| import org.kohsuke.stapler.QueryParameter; | ||
|
|
||
|
|
@@ -132,11 +142,25 @@ public FormValidation doCheckReturnStatus(@QueryParameter boolean returnStdout, | |
|
|
||
| } | ||
|
|
||
| interface ExecutionRemotable { | ||
| void exited(int code, byte[] output) throws Exception; | ||
| void problem(Exception x); | ||
| } | ||
|
|
||
| // TODO this and the other constants could be made customizable via system property | ||
| @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", justification = "public & mutable only for tests") | ||
| @Restricted(NoExternalUse.class) | ||
| public static long WATCHING_RECURRENCE_PERIOD = /* 5m */300_000; | ||
|
|
||
| /** If set to false, disables {@link Execution#watching} mode. */ | ||
| @SuppressWarnings("FieldMayBeFinal") | ||
| private static boolean USE_WATCHING = !"false".equals(System.getProperty(DurableTaskStep.class.getName() + ".USE_WATCHING")); | ||
|
|
||
| /** | ||
| * Represents one task that is believed to still be running. | ||
| */ | ||
| @SuppressFBWarnings(value="SE_TRANSIENT_FIELD_NOT_RESTORED", justification="recurrencePeriod is set in onResume, not deserialization") | ||
| static final class Execution extends AbstractStepExecutionImpl implements Runnable { | ||
| static final class Execution extends AbstractStepExecutionImpl implements Runnable, ExecutionRemotable { | ||
|
|
||
| private static final long MIN_RECURRENCE_PERIOD = 250; // ¼s | ||
| private static final long MAX_RECURRENCE_PERIOD = 15000; // 15s | ||
|
|
@@ -150,6 +174,10 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab | |
|
|
||
| private transient final DurableTaskStep step; | ||
| private transient FilePath ws; | ||
| /** | ||
| * How many ms we plan to sleep before running {@link #check} again. | ||
| * Zero is used as a signal to break out of the loop. | ||
| */ | ||
| private transient long recurrencePeriod; | ||
| private transient volatile ScheduledFuture<?> task, stopTask; | ||
| private transient boolean printedCannotContactMessage; | ||
|
|
@@ -158,6 +186,9 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab | |
| private String remote; | ||
| private boolean returnStdout; // serialized default is false | ||
| private boolean returnStatus; // serialized default is false | ||
| private boolean watching; // serialized default is false | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is what makes us safe for running builds on upgrade.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. For a |
||
| /** Only used when {@link #watching}, if after {@link #WATCHING_RECURRENCE_PERIOD} comes around twice {@link #exited} has yet to be called. */ | ||
| private transient boolean awaitingAsynchExit; | ||
|
|
||
| Execution(StepContext context, DurableTaskStep step) { | ||
| super(context); | ||
|
|
@@ -174,14 +205,24 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab | |
| if (returnStdout) { | ||
| durableTask.captureOutput(); | ||
| } | ||
| TaskListener listener = context.get(TaskListener.class); | ||
| if (step.encoding != null) { | ||
| durableTask.charset(Charset.forName(step.encoding)); | ||
| } else { | ||
| durableTask.defaultCharset(); | ||
| } | ||
| controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), context.get(TaskListener.class)); | ||
| controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), listener); | ||
| this.remote = ws.getRemote(); | ||
| setupTimer(); | ||
| if (USE_WATCHING) { | ||
| try { | ||
| controller.watch(ws, new HandlerImpl(this, ws, listener), listener); | ||
| watching = true; | ||
| } catch (UnsupportedOperationException x) { | ||
| LOGGER.log(Level.WARNING, /* default exception message suffices */null, x); | ||
| // and we fall back to polling mode | ||
| } | ||
| } | ||
| setupTimer(watching ? WATCHING_RECURRENCE_PERIOD : MIN_RECURRENCE_PERIOD); | ||
| return false; | ||
| } | ||
|
|
||
|
|
@@ -192,26 +233,44 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab | |
| LOGGER.log(Level.FINE, "Jenkins is not running, no such node {0}, or it is offline", node); | ||
| return null; | ||
| } | ||
| if (watching) { | ||
| try { | ||
| controller.watch(ws, new HandlerImpl(this, ws, listener()), listener()); | ||
| recurrencePeriod = WATCHING_RECURRENCE_PERIOD; | ||
| } catch (UnsupportedOperationException x) { | ||
| // Should not happen, since it worked in start() and a given Controller should not have *dropped* support. | ||
| getContext().onFailure(x); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be useful to set
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it should never happen; can make that clearer in a comment. |
||
| } catch (Exception x) { | ||
| getWorkspaceProblem(x); | ||
| return null; | ||
| } | ||
| } | ||
| } | ||
| boolean directory; | ||
| try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) { | ||
| directory = ws.isDirectory(); | ||
| } catch (Exception x) { | ||
| // RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long. | ||
| LOGGER.log(Level.FINE, node + " is evidently offline now", x); | ||
| ws = null; | ||
| if (!printedCannotContactMessage) { | ||
| listener().getLogger().println("Cannot contact " + node + ": " + x); | ||
| printedCannotContactMessage = true; | ||
| } | ||
| getWorkspaceProblem(x); | ||
| return null; | ||
| } | ||
| if (!directory) { | ||
| throw new AbortException("missing workspace " + remote + " on " + node); | ||
| } | ||
| LOGGER.log(Level.FINER, "{0} seems to be online so using {1}", new Object[] {node, remote}); | ||
| return ws; | ||
| } | ||
|
|
||
| private void getWorkspaceProblem(Exception x) { | ||
| // RequestAbortedException, ChannelClosedException, EOFException, wrappers thereof; InterruptedException if it just takes too long. | ||
| LOGGER.log(Level.FINE, node + " is evidently offline now", x); | ||
| ws = null; | ||
| recurrencePeriod = MIN_RECURRENCE_PERIOD; | ||
| if (!printedCannotContactMessage) { | ||
| listener().getLogger().println("Cannot contact " + node + ": " + x); | ||
| printedCannotContactMessage = true; | ||
| } | ||
| } | ||
|
|
||
| private @Nonnull TaskListener listener() { | ||
| TaskListener l; | ||
| StepContext context = getContext(); | ||
|
|
@@ -252,6 +311,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab | |
| recurrencePeriod = 0; | ||
| listener().getLogger().println("After 10s process did not stop"); | ||
| getContext().onFailure(cause); | ||
| try { | ||
| FilePath workspace = getWorkspace(); | ||
| if (workspace != null) { | ||
| controller.cleanup(workspace); | ||
| } | ||
| } catch (IOException | InterruptedException x) { | ||
| Functions.printStackTrace(x, listener().getLogger()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🐜 Might be worth mentioning that we failed to do workspace cleanup (so it's clear this is not super serious).
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could do that.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cancel that—at this point we have already printed an unusual message above, and any exception here probably came from |
||
| } | ||
| } | ||
| } | ||
| }, 10, TimeUnit.SECONDS); | ||
|
|
@@ -315,10 +382,23 @@ private void check() { | |
| return; | ||
| } | ||
| if (workspace == null) { | ||
| recurrencePeriod = Math.min((long) (recurrencePeriod * RECURRENCE_PERIOD_BACKOFF), MAX_RECURRENCE_PERIOD); | ||
| return; // slave not yet ready, wait for another day | ||
| } | ||
| TaskListener listener = listener(); | ||
| try (Timeout timeout = Timeout.limit(10, TimeUnit.SECONDS)) { | ||
| if (watching) { | ||
| Integer exitCode = controller.exitStatus(workspace, launcher()); | ||
| if (exitCode == null) { | ||
| LOGGER.log(Level.FINE, "still running in {0} on {1}", new Object[] {remote, node}); | ||
| } else if (awaitingAsynchExit) { | ||
| recurrencePeriod = 0; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe safer to have a slight nonzero delay?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, (exact) zero is the signal to avoid rescheduling.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, so it's a special signal value, normally that would be "-1".
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps. At any rate, zero has been used for this purpose from the start—not introduced in this patch. I can make that explicit in Javadoc. |
||
| getContext().onFailure(new AbortException("script apparently exited with code " + exitCode + " but asynchronous notification was lost")); | ||
| } else { | ||
| LOGGER.log(Level.FINE, "exited with {0} in {1} on {2}; expect asynchronous exit soon", new Object[] {exitCode, remote, node}); | ||
| awaitingAsynchExit = true; | ||
| } | ||
| } else { // legacy mode | ||
| if (controller.writeLog(workspace, listener.getLogger())) { | ||
| getContext().saveState(); | ||
| recurrencePeriod = MIN_RECURRENCE_PERIOD; // got output, maybe we will get more soon | ||
|
|
@@ -343,6 +423,7 @@ private void check() { | |
| recurrencePeriod = 0; | ||
| controller.cleanup(workspace); | ||
| } | ||
| } | ||
| } catch (Exception x) { | ||
| LOGGER.log(Level.FINE, "could not check " + workspace, x); | ||
| ws = null; | ||
|
|
@@ -353,17 +434,111 @@ private void check() { | |
| } | ||
| } | ||
|
|
||
| // called remotely from HandlerImpl | ||
| @Override public void exited(int exitCode, byte[] output) throws Exception { | ||
| try { | ||
| getContext().get(TaskListener.class); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we checking for TaskListener in the context in order to identify this condition? I'll take my answer in the form of a code comment, please.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (done) |
||
| } catch (IOException | InterruptedException x) { | ||
| // E.g., CpsStepContext.doGet complaining that getThreadSynchronously() == null. | ||
| // If we cannot even print messages, there is no point proceeding. | ||
| LOGGER.log(Level.FINE, "asynchronous exit notification with code " + exitCode + " in " + remote + " on " + node + " ignored since step already seems dead", x); | ||
| return; | ||
| } | ||
| LOGGER.log(Level.FINE, "asynchronous exit notification with code {0} in {1} on {2}", new Object[] {exitCode, remote, node}); | ||
| if (returnStdout && output == null) { | ||
| getContext().onFailure(new IllegalStateException("expected output but got none")); | ||
| return; | ||
| } else if (!returnStdout && output != null) { | ||
| getContext().onFailure(new IllegalStateException("did not expect output but got some")); | ||
| return; | ||
| } | ||
| recurrencePeriod = 0; | ||
| if (returnStatus || exitCode == 0) { | ||
| getContext().onSuccess(returnStatus ? exitCode : returnStdout ? new String(output, StandardCharsets.UTF_8) : null); | ||
| } else { | ||
| if (returnStdout) { | ||
| listener().getLogger().write(output); // diagnostic | ||
| } | ||
| getContext().onFailure(new AbortException("script returned exit code " + exitCode)); | ||
| } | ||
| } | ||
|
|
||
| // ditto | ||
| @Override public void problem(Exception x) { | ||
| Functions.printStackTrace(x, listener().getLogger()); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems to work. Due to an apparent mistake in jenkinsci/pipeline-cloudwatch-logs-plugin#5 affecting agent-side, but not master-side, logging, I saw in the build log: with the build proceeding nonetheless. |
||
| // note that if there is _also_ a problem in the master-side logger, PrintStream will mask it | ||
| } | ||
|
|
||
| @Override public void onResume() { | ||
| setupTimer(); | ||
| ws = null; // find it from scratch please | ||
| setupTimer(MIN_RECURRENCE_PERIOD); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does not look right when we were jenkinsci/durable-task-plugin#60 (comment) is something different: when we were not
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never mind, I forgot that
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jglick If you're the author and couldn't remember that, it's probably worth a code comment so future maintainers (cough) don't make the same assumption and waste time.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually did have a comment, but apparently need to expand it. |
||
| // In watch mode, we will quickly enter the check. | ||
| // Then in getWorkspace when ws == null we will start a watch and go back to sleep. | ||
| } | ||
|
|
||
| private void setupTimer() { | ||
| recurrencePeriod = MIN_RECURRENCE_PERIOD; | ||
| private void setupTimer(long initialRecurrencePeriod) { | ||
| recurrencePeriod = initialRecurrencePeriod; | ||
| task = THREAD_POOL.schedule(this, recurrencePeriod, TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| } | ||
|
|
||
| private static class HandlerImpl extends Handler { | ||
|
|
||
| private static final Field printStreamDelegate; | ||
| static { | ||
| try { | ||
| printStreamDelegate = FilterOutputStream.class.getDeclaredField("out"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh, reflection hacks: the universal panacea!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately I could not find a better way to do this. |
||
| } catch (NoSuchFieldException x) { | ||
| // Defined in Java Platform and protected, so should not happen. | ||
| throw new ExceptionInInitializerError(x); | ||
| } | ||
| printStreamDelegate.setAccessible(true); | ||
| } | ||
|
|
||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| private final ExecutionRemotable execution; | ||
| private final TaskListener listener; | ||
|
|
||
| HandlerImpl(Execution execution, FilePath workspace, TaskListener listener) { | ||
| this.execution = workspace.getChannel().export(ExecutionRemotable.class, execution); | ||
| this.listener = listener; | ||
| } | ||
|
|
||
| @Override public void output(InputStream stream) throws Exception { | ||
| PrintStream ps = listener.getLogger(); | ||
| OutputStream os; | ||
| if (ps.getClass() == PrintStream.class) { | ||
| // Try to extract the underlying stream, since swallowing exceptions is undesirable and PrintStream.checkError is useless. | ||
| os = (OutputStream) printStreamDelegate.get(ps); | ||
| } else { | ||
| // A subclass. Who knows why, but trust any write(…) overrides it may have. | ||
| os = ps; | ||
| } | ||
| try { | ||
| IOUtils.copy(stream, os); | ||
|
||
| } catch (ChannelClosedException x) { | ||
| // We are giving up on this watch. Wait for some call to getWorkspace to rewatch. | ||
| throw x; | ||
| } catch (Exception x) { | ||
| // Try to report it to the master. | ||
| try { | ||
| execution.problem(x); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I predict we'll be seeing a lot of this particular report when there are network issues. Thankfully it won't be that often due to the between recurrence periods. |
||
| // OK, printed to log on master side, we may have lost some text but could continue. | ||
| } catch (Exception x2) { // e.g., RemotingSystemException | ||
| // No, channel seems to be broken, give up on this watch. | ||
| throw x; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override public void exited(int code, byte[] output) throws Exception { | ||
| execution.exited(code, output); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will flesh this out with an explanation of the expected state changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(done)