Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.jenkinsci.plugins.workflow.steps.durable_task;

import hudson.Extension;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.kohsuke.stapler.DataBoundConstructor;

import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.Set;

/**
* Waits for the task forked in {@code sh(background:true, ...)} to complete
*
* TODO: native timeout support. in the mean time, combine with the timeout step
*
* @author Kohsuke Kawaguchi
* @see BackgroundTask
*/
public class BackgroundDurableTaskJoinStep extends Step {
private final BackgroundTask t;

@DataBoundConstructor
public BackgroundDurableTaskJoinStep(BackgroundTask t) {
this.t = t;
}

@Override
public StepExecution start(StepContext context) throws Exception {
return new Execution(context, t.getExecution());
}

static final class Execution extends AbstractStepExecutionImpl {
private DurableTaskStep.Execution task;
public Execution(StepContext context, DurableTaskStep.Execution t) {
super(context);
this.task = t;
}

@Override
public boolean start() throws Exception {
task.addCompletionHandler(getContext());
return false;
}

@Override
public void stop(@Nonnull Throwable cause) throws Exception {
// interrupting this step shouldn't cause the process to die
// DO NOT: task.stop(cause);
}
}

@Extension
public static class DescriptorImpl extends StepDescriptor {
@Override
public Set<? extends Class<?>> getRequiredContext() {
return Collections.emptySet();
}

@Override
public String getFunctionName() {
return "backgroundDurableTaskJoin";
}

/**
* Marking as advanced for now since this step is
* meant to be used in {@link BackgroundTask#join()}
*
* If we are to open this up to the general audience
* it should get a better name
*/
@Override
public boolean isAdvanced() {
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.jenkinsci.plugins.workflow.steps.durable_task;

import hudson.AbortException;

import java.io.Serializable;

/**
* Represents an object that tracks background task
* forked off by {@code sh(background:true)}
*
* <p>
* This object is serialized along with the pipeline program.
*
* @author Kohsuke Kawaguchi
*/
public class BackgroundTask implements Serializable {
private final DurableTaskStep.Execution execution;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eek!


/*package*/ BackgroundTask(DurableTaskStep.Execution execution) {
this.execution = execution;
}

/*package*/ DurableTaskStep.Execution getExecution() {
return execution;
}

/**
* Suspends until the process is done.
*
* @see BackgroundDurableTaskJoinStep
*/
public int join() {
// currently cannot be implemented as an instance method
// because this module doesn't depend on workflow-cps.
// use BackgroundDurableTaskJoinStep
throw new UnsupportedOperationException();
}

/**
* Immediately kills the task.
*/
public void kill() throws Exception {
execution.stop(new AbortException());
}

/*
def proc = sh(background:true, script:'android something')

proc.join()
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package org.jenkinsci.plugins.workflow.steps.durable_task;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.AbortException;
import hudson.EnvVars;
Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class DurableTaskStep extends Step {
private boolean returnStdout;
private String encoding = DurableTaskStepDescriptor.defaultEncoding;
private boolean returnStatus;
private boolean background;

protected abstract DurableTask task();

Expand Down Expand Up @@ -99,6 +101,14 @@ public boolean isReturnStatus() {
this.returnStatus = returnStatus;
}

public boolean isBackground() {
return background;
}

@DataBoundSetter public void setBackground(boolean background) {
this.background = background;
}

@Override public StepExecution start(StepContext context) throws Exception {
return new Execution(context, this);
}
Expand All @@ -119,9 +129,9 @@ public FormValidation doCheckEncoding(@QueryParameter boolean returnStdout, @Que
return FormValidation.ok();
}

public FormValidation doCheckReturnStatus(@QueryParameter boolean returnStdout, @QueryParameter boolean returnStatus) {
if (returnStdout && returnStatus) {
return FormValidation.error("You may not select both returnStdout and returnStatus.");
public FormValidation doCheckReturnStatus(@QueryParameter boolean returnStdout, @QueryParameter boolean returnStatus, @QueryParameter boolean background) {
if ((returnStdout?1:0)+(returnStatus?1:0)+(background?1:0)>1) {
return FormValidation.error("You can only select one of returnStdout, returnStatus, or background.");
}
return FormValidation.ok();
}
Expand Down Expand Up @@ -152,6 +162,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
private boolean returnStdout; // serialized default is false
private String encoding; // serialized default is irrelevant
private boolean returnStatus; // serialized default is false
private boolean background; // serialized default is false

/**
* In this class, the completion of the async task should be sent here,
* instead of {@code getContext()} like normal step is, in order for
* us to support background task execution. See {@link DurableTaskStep#background}
*/
private FutureCallbackProxy<Object> callback = new FutureCallbackProxy<>();

Execution(StepContext context, DurableTaskStep step) {
super(context);
Expand All @@ -161,8 +179,11 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
@Override public boolean start() throws Exception {
returnStdout = step.returnStdout;
encoding = step.encoding;
returnStatus = step.returnStatus;
returnStatus = step.returnStatus || step.background;
background = step.background;
StepContext context = getContext();
if (!background) // unless run in background, the completion of the task means completion of this step
callback.addCallback(context);
ws = context.get(FilePath.class);
node = FilePathUtils.getNodeName(ws);
DurableTask durableTask = step.task();
Expand All @@ -172,7 +193,12 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
controller = durableTask.launch(context.get(EnvVars.class), ws, context.get(Launcher.class), context.get(TaskListener.class));
this.remote = ws.getRemote();
setupTimer();
return false;
if (background) {
context.onSuccess(new BackgroundTask(this));
return true;
} else {
return false;
}
}

private @CheckForNull FilePath getWorkspace() throws AbortException {
Expand Down Expand Up @@ -217,7 +243,7 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
LOGGER.log(Level.WARNING, "JENKINS-34021: could not get TaskListener in " + context, x);
l = new LogTaskListener(LOGGER, Level.FINE);
recurrencePeriod = 0;
getContext().onFailure(x);
callback.onFailure(x);
}
return l.getLogger();
}
Expand All @@ -243,14 +269,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
if (recurrencePeriod > 0) {
recurrencePeriod = 0;
logger().println("After 10s process did not stop");
getContext().onFailure(cause);
callback.onFailure(cause);
}
}
}, 10, TimeUnit.SECONDS);
} else {
logger().println("Could not connect to " + node + " to send interrupt signal to process");
recurrencePeriod = 0;
getContext().onFailure(cause);
callback.onFailure(cause);
}
}

Expand Down Expand Up @@ -290,7 +316,14 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
}
}

private void check() {
/**
* Registers a callback that gets called when the task is completed.
*/
/*package*/ synchronized void addCompletionHandler(FutureCallback<Object> callback) {
this.callback.addCallback(callback);
}

private synchronized void check() {
if (recurrencePeriod == 0) { // from stop
return;
}
Expand All @@ -299,7 +332,7 @@ private void check() {
workspace = getWorkspace();
} catch (AbortException x) {
recurrencePeriod = 0;
getContext().onFailure(x);
callback.onFailure(x);
return;
}
if (workspace == null) {
Expand All @@ -321,13 +354,13 @@ private void check() {
if (controller.writeLog(workspace, logger())) {
LOGGER.log(Level.FINE, "last-minute output in {0} on {1}", new Object[] {remote, node});
}
if (returnStatus || exitCode == 0) {
getContext().onSuccess(returnStatus ? exitCode : returnStdout ? new String(controller.getOutput(workspace, launcher()), encoding) : null);
if (background || returnStatus || exitCode == 0) {
callback.onSuccess(returnStatus ? exitCode : returnStdout ? new String(controller.getOutput(workspace, launcher()), encoding) : null);
} else {
if (returnStdout) {
logger().write(controller.getOutput(workspace, launcher())); // diagnostic
}
getContext().onFailure(new AbortException("script returned exit code " + exitCode));
callback.onFailure(new AbortException("script returned exit code " + exitCode));
}
recurrencePeriod = 0;
controller.cleanup(workspace);
Expand All @@ -342,6 +375,11 @@ private void check() {
}

@Override public void onResume() {
if (callback==null) {
callback = new FutureCallbackProxy<>();
if (!background)
callback.addCallback(getContext());
}
setupTimer();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.jenkinsci.plugins.workflow.steps.durable_task;

import com.google.common.util.concurrent.FutureCallback;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
* {@link FutureCallback} that buffers the result and forwards
* it to any number of {@link FutureCallback}s.
* @author Kohsuke Kawaguchi
*/
final class FutureCallbackProxy<T> implements FutureCallback<T>, Serializable {
private T result;
private Throwable t;
private boolean completed;

private final List<FutureCallback<? super T>> callbacks = new ArrayList<>();

@Override
public void onSuccess(T result) {
this.result = result;
fire();
}

@Override
public void onFailure(Throwable t) {
this.t = t;
fire();
}

private void fire() {
List<FutureCallback<? super T>> clone;
synchronized (this) {
completed = true;
clone = new ArrayList<>(callbacks);
}
for (FutureCallback<? super T> c : clone) {
fire(c);
}

}

private void fire(FutureCallback<? super T> c) {
if (t!=null)
c.onFailure(t);
else
c.onSuccess(result);
}

public void addCallback(FutureCallback<? super T> callback) {
boolean fireNow = false;
synchronized (this) {
if (!completed)
callbacks.add(callback);
else
fireNow = true;
}

if (fireNow)
fire(callback);
}

private static final long serialVersionUID = 1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,28 @@ public DescriptorImpl() {
j.assertLogContains("truth is 0 but falsity is 1", j.assertBuildStatusSuccess(p.scheduleBuild2(0)));
}

@Test
public void backgroundTask() throws Exception {
WorkflowJob p = j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node { " +
"def x = sh(script:'echo hello world',background:true);" +
"def e = backgroundDurableTaskJoin(x);" +
"echo('output='+e);" +
"}"));
j.assertLogContains("output=0", j.assertBuildStatusSuccess(p.scheduleBuild2(0)));
}

@Test
public void backgroundTaskKill() throws Exception {
WorkflowJob p = j.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node { " +
"def x = sh(script:'sleep 60',background:true);" +
"x.kill()"+
// TODO: how do I prove that it actually killed the process?
"}"));
j.assertBuildStatusSuccess(p.scheduleBuild2(0));
}

/**
* Asserts that the predicate remains true up to the given timeout.
*/
Expand Down