diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java index c05c900d..a522fd4e 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContext.java @@ -24,6 +24,7 @@ package org.jenkinsci.plugins.workflow.support.steps; +import edu.umd.cs.findbugs.annotations.CheckForNull; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import hudson.Extension; @@ -143,6 +144,11 @@ void resume(StepContext context) throws Exception { return "ExecutorStepDynamicContext[" + path + "@" + node + "]"; } + @CheckForNull Node getNode() { + Jenkins j = Jenkins.get(); + return node.isEmpty() ? j : j.getNode(node); + } + private static abstract class Translator extends DynamicContext.Typed { @Override protected T get(DelegatedContext context) throws IOException, InterruptedException { @@ -251,8 +257,7 @@ private static abstract class Translator extends DynamicContext.Typed { if (c == null) { return null; } - Jenkins j = Jenkins.get(); - return c.node.isEmpty() ? j : j.getNode(c.node); + return c.getNode(); } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java index abd14d48..25fc674d 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepExecution.java @@ -154,7 +154,7 @@ public boolean start() throws Exception { cob.print(listener); } } - }, 15, TimeUnit.SECONDS); + }, Main.isUnitTest ? 5 : 15, TimeUnit.SECONDS); return false; } @@ -508,8 +508,8 @@ public static final class PlaceholderTask implements ContinuedTask, Serializable private Object readResolve() { RunningTasks.add(context); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.log(FINE, null, new Exception("deserializing previously scheduled " + this)); + if (LOGGER.isLoggable(Level.FINER)) { + LOGGER.log(FINER, null, new Exception("deserializing previously scheduled " + this)); } return this; } @@ -747,7 +747,7 @@ public String getShortDescription() { @Override public @CheckForNull Queue.Executable getOwnerExecutable() { Run r = runForDisplay(); - return r instanceof Queue.Executable ? (Queue.Executable) r : null; + return r instanceof Queue.Executable qe ? qe : null; } @Exported @@ -1042,6 +1042,10 @@ private static final class Callback extends BodyExecutionCallback.TailCall { if (_lease != null) { _lease.release(); } + var node = state.getNode(); + if (node != null) { + UsageTracker.unregister(node, state.task); + } } } finally { finish(execution.getContext(), cookie); @@ -1104,6 +1108,7 @@ public final class PlaceholderExecutable implements ContinuableExecutable, Acces // Switches the label to a self-label, so if the executable is killed and restarted, it will run on the same node: label = computer.getName(); labelIsSelfLabel = true; + UsageTracker.register(node, PlaceholderTask.this); EnvVars env = computer.getEnvironment(); env.overrideExpandingAll(computer.buildEnvironment(listener)); diff --git a/src/main/java/org/jenkinsci/plugins/workflow/support/steps/UsageTracker.java b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/UsageTracker.java new file mode 100644 index 00000000..0cc06a8e --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/workflow/support/steps/UsageTracker.java @@ -0,0 +1,135 @@ +/* + * The MIT License + * + * Copyright 2025 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.workflow.support.steps; + +import hudson.Extension; +import hudson.model.Node; +import hudson.model.Queue; +import hudson.model.Run; +import hudson.model.queue.CauseOfBlockage; +import hudson.model.queue.QueueTaskDispatcher; +import hudson.slaves.NodeProperty; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; +import org.jenkinsci.plugins.durabletask.executors.ContinuedTask; + +// TODO move to ContinuedTask once tested + +/** + * Serves essentially the same function as {@link ContinuedTask}, but more reliably: + * the block on an agent in use does not rely on a {@link Queue.Item} having been scheduled. + */ +final class UsageTracker extends NodeProperty { + + private static final Logger LOGGER = Logger.getLogger(UsageTracker.class.getName()); + + final List tasks; + + private UsageTracker(List tasks) { + this.tasks = tasks; + } + + // TODO cannot use NodeProperty.canTake because NodeProperty.setNode is never called + + @Extension public static final class QTD extends QueueTaskDispatcher { + @Override public CauseOfBlockage canTake(Node node, Queue.BuildableItem item) { + if (item.task instanceof ContinuedTask ct && ct.isContinued()) { + LOGGER.finer(() -> item.task + " is a continued task, so we are not blocking it"); + return null; + } + var prop = node.getNodeProperty(UsageTracker.class); + if (prop == null) { + LOGGER.finer(() -> "not blocking " + item.task + " since " + node + " has no registrations"); + return null; + } + var c = node.toComputer(); + if (c == null) { + LOGGER.finer(() -> "not blocking " + item + " since " + node + " has no computer"); + return null; + } + var executors = c.getExecutors(); + TASK: for (var task : prop.tasks) { + for (var executor : executors) { + var executable = executor.getCurrentExecutable(); + if (executable != null && executable.getParent().equals(task)) { + LOGGER.finer(() -> "not blocking " + item + " due to " + task + " since that is already running on " + c); + continue TASK; + } + } + if (task.getOwnerExecutable() instanceof Run build && !build.isInProgress()) { + LOGGER.finer(() -> "not blocking " + item + " due to " + task + " on " + c + " since " + build + " was already completed"); + // TODO unregister stale entry + continue; + } + LOGGER.fine(() -> "blocking " + item.task + " in favor of " + task + " slated to run on " + c); + return new HoldOnPlease(task); + } + LOGGER.finer(() -> "no reason to block " + item.task); + return null; + } + } + + private static final class HoldOnPlease extends CauseOfBlockage { + private final Queue.Task task; + HoldOnPlease(Queue.Task task) { + this.task = task; + } + @Override public String getShortDescription() { + return task.getFullDisplayName() + " should be allowed to run first"; + } + } + + public static void register(Node node, ContinuedTask task) throws IOException { + LOGGER.fine(() -> "registering " + task + " on " + node); + synchronized (node) { + var prop = node.getNodeProperty(UsageTracker.class); + List tasks = prop != null ? new ArrayList<>(prop.tasks) : new ArrayList<>(); + tasks.add(task); + node.getNodeProperties().replace(new UsageTracker(tasks)); + } + } + + public static void unregister(Node node, ContinuedTask task) throws IOException { + LOGGER.fine(() -> "unregistering " + task + " from " + node); + synchronized (node) { + var prop = node.getNodeProperty(UsageTracker.class); + if (prop != null) { + var tasks = new ArrayList<>(prop.tasks); + tasks.remove(task); + if (tasks.isEmpty()) { + node.getNodeProperties().remove(UsageTracker.class); + } else { + node.getNodeProperties().replace(new UsageTracker(tasks)); + } + } + } + } + + // TODO override reconfigure + // TODO use NodeListener.onUpdated to transfer TrackingProperty so that io.jenkins.plugins.casc.core.JenkinsConfigurator will not delete info from permanent agents + +} diff --git a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContextTest.java b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContextTest.java index 5ecf1d51..45253d4a 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContextTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/support/steps/ExecutorStepDynamicContextTest.java @@ -38,20 +38,31 @@ import hudson.model.Label; import hudson.model.Queue; import hudson.model.Result; +import hudson.model.TaskListener; import hudson.slaves.DumbSlave; import hudson.slaves.RetentionStrategy; + import java.io.File; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.logging.Level; + import jenkins.model.InterruptedBuildAction; + import org.jenkinci.plugins.mock_slave.MockCloud; +import org.jenkinsci.plugins.durabletask.executors.ContinuedTask; import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.flow.FlowExecutionList; import org.jenkinsci.plugins.workflow.job.WorkflowJob; import org.jenkinsci.plugins.workflow.job.WorkflowRun; +import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback; +import org.jenkinsci.plugins.workflow.steps.Step; +import org.jenkinsci.plugins.workflow.steps.StepContext; +import org.jenkinsci.plugins.workflow.steps.StepDescriptor; +import org.jenkinsci.plugins.workflow.steps.StepExecution; import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; import org.junit.ClassRule; import org.junit.Rule; @@ -60,6 +71,8 @@ import org.jvnet.hudson.test.Issue; import org.jvnet.hudson.test.JenkinsSessionRule; import org.jvnet.hudson.test.LoggerRule; +import org.jvnet.hudson.test.TestExtension; +import org.kohsuke.stapler.DataBoundConstructor; public class ExecutorStepDynamicContextTest { @@ -249,4 +262,64 @@ private void commonSetup() { assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class))); }); } + + @Test public void tardyResume() throws Throwable { + commonSetup(); + logging.record(ContinuedTask.class, Level.FINER).record(UsageTracker.class, Level.FINER); + sessions.then(j -> { + j.createSlave("remote", "contended", null); + var prompt = j.createProject(WorkflowJob.class, "prompt"); + prompt.setDefinition(new CpsFlowDefinition("node('contended') {semaphore 'prompt'}", true)); + var tardy = j.createProject(WorkflowJob.class, "tardy"); + tardy.setDefinition(new CpsFlowDefinition("slowToResume {node('contended') {semaphore 'tardy'}}", true)); + SemaphoreStep.waitForStart("tardy/1", tardy.scheduleBuild2(0).waitForStart()); + j.waitForMessage("Still waiting to schedule task", prompt.scheduleBuild2(0).waitForStart()); + }); + sessions.then(j -> { + var promptB = j.jenkins.getItemByFullName("prompt", WorkflowJob.class).getBuildByNumber(1); + j.waitForMessage("Ready to run", promptB); + var tardyB = j.jenkins.getItemByFullName("tardy", WorkflowJob.class).getBuildByNumber(1); + j.waitForMessage("Ready to run", tardyB); + SemaphoreStep.success("tardy/1", null); + j.assertBuildStatusSuccess(j.waitForCompletion(tardyB)); + SemaphoreStep.waitForStart("prompt/1", promptB); + SemaphoreStep.success("prompt/1", null); + j.assertBuildStatusSuccess(j.waitForCompletion(promptB)); + }); + } + public static final class SlowToResumeStep extends Step { + @DataBoundConstructor public SlowToResumeStep() {} + @Override public StepExecution start(StepContext context) throws Exception { + return new Execution(context); + } + private static final class Execution extends StepExecution { + Execution(StepContext context) { + super(context); + } + @Override public boolean start() throws Exception { + getContext().newBodyInvoker().withCallback(BodyExecutionCallback.wrap(getContext())).start(); + return false; + } + @Override public void onResume() { + try { + getContext().get(TaskListener.class).getLogger().println("Will resume outer step…"); + Thread.sleep(3_000); + getContext().get(TaskListener.class).getLogger().println("…resumed."); + } catch (Exception x) { + throw new RuntimeException(x); + } + } + } + @TestExtension("tardyResume") public static class DescriptorImpl extends StepDescriptor { + @Override public String getFunctionName() { + return "slowToResume"; + } + @Override public boolean takesImplicitBlockArgument() { + return true; + } + @Override public Set> getRequiredContext() { + return Set.of(TaskListener.class); + } + } + } }