-
-
Notifications
You must be signed in to change notification settings - Fork 105
ContinuedTask cannot rely on queue scheduling order
#472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
src/main/java/org/jenkinsci/plugins/workflow/support/steps/UsageTracker.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| /* | ||
| * The MIT License | ||
| * | ||
| * Copyright 2025 CloudBees, Inc. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in | ||
| * all copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
| * THE SOFTWARE. | ||
| */ | ||
|
|
||
| package org.jenkinsci.plugins.workflow.support.steps; | ||
|
|
||
| import hudson.Extension; | ||
| import hudson.model.Node; | ||
| import hudson.model.Queue; | ||
| import hudson.model.Run; | ||
| import hudson.model.queue.CauseOfBlockage; | ||
| import hudson.model.queue.QueueTaskDispatcher; | ||
| import hudson.slaves.NodeProperty; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.logging.Logger; | ||
| import org.jenkinsci.plugins.durabletask.executors.ContinuedTask; | ||
|
|
||
| // TODO move to ContinuedTask once tested | ||
|
|
||
| /** | ||
| * Serves essentially the same function as {@link ContinuedTask}, but more reliably: | ||
| * the block on an agent in use does not rely on a {@link Queue.Item} having been scheduled. | ||
| */ | ||
| final class UsageTracker extends NodeProperty<Node> { | ||
|
|
||
| private static final Logger LOGGER = Logger.getLogger(UsageTracker.class.getName()); | ||
|
|
||
| final List<ContinuedTask> tasks; | ||
|
|
||
| private UsageTracker(List<ContinuedTask> tasks) { | ||
| this.tasks = tasks; | ||
| } | ||
|
|
||
| // TODO cannot use NodeProperty.canTake because NodeProperty.setNode is never called | ||
|
|
||
| @Extension public static final class QTD extends QueueTaskDispatcher { | ||
| @Override public CauseOfBlockage canTake(Node node, Queue.BuildableItem item) { | ||
| if (item.task instanceof ContinuedTask ct && ct.isContinued()) { | ||
| LOGGER.finer(() -> item.task + " is a continued task, so we are not blocking it"); | ||
| return null; | ||
| } | ||
| var prop = node.getNodeProperty(UsageTracker.class); | ||
| if (prop == null) { | ||
| LOGGER.finer(() -> "not blocking " + item.task + " since " + node + " has no registrations"); | ||
| return null; | ||
| } | ||
| var c = node.toComputer(); | ||
| if (c == null) { | ||
| LOGGER.finer(() -> "not blocking " + item + " since " + node + " has no computer"); | ||
| return null; | ||
| } | ||
| var executors = c.getExecutors(); | ||
| TASK: for (var task : prop.tasks) { | ||
| for (var executor : executors) { | ||
| var executable = executor.getCurrentExecutable(); | ||
| if (executable != null && executable.getParent().equals(task)) { | ||
| LOGGER.finer(() -> "not blocking " + item + " due to " + task + " since that is already running on " + c); | ||
| continue TASK; | ||
| } | ||
| } | ||
| if (task.getOwnerExecutable() instanceof Run<?, ?> build && !build.isInProgress()) { | ||
| LOGGER.finer(() -> "not blocking " + item + " due to " + task + " on " + c + " since " + build + " was already completed"); | ||
| // TODO unregister stale entry | ||
| continue; | ||
| } | ||
| LOGGER.fine(() -> "blocking " + item.task + " in favor of " + task + " slated to run on " + c); | ||
| return new HoldOnPlease(task); | ||
| } | ||
| LOGGER.finer(() -> "no reason to block " + item.task); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private static final class HoldOnPlease extends CauseOfBlockage { | ||
| private final Queue.Task task; | ||
| HoldOnPlease(Queue.Task task) { | ||
| this.task = task; | ||
| } | ||
| @Override public String getShortDescription() { | ||
| return task.getFullDisplayName() + " should be allowed to run first"; | ||
| } | ||
| } | ||
|
|
||
| public static void register(Node node, ContinuedTask task) throws IOException { | ||
| LOGGER.fine(() -> "registering " + task + " on " + node); | ||
| synchronized (node) { | ||
| var prop = node.getNodeProperty(UsageTracker.class); | ||
| List<ContinuedTask> tasks = prop != null ? new ArrayList<>(prop.tasks) : new ArrayList<>(); | ||
| tasks.add(task); | ||
| node.getNodeProperties().replace(new UsageTracker(tasks)); | ||
| } | ||
| } | ||
|
|
||
| public static void unregister(Node node, ContinuedTask task) throws IOException { | ||
| LOGGER.fine(() -> "unregistering " + task + " from " + node); | ||
| synchronized (node) { | ||
| var prop = node.getNodeProperty(UsageTracker.class); | ||
| if (prop != null) { | ||
| var tasks = new ArrayList<>(prop.tasks); | ||
| tasks.remove(task); | ||
| if (tasks.isEmpty()) { | ||
| node.getNodeProperties().remove(UsageTracker.class); | ||
| } else { | ||
| node.getNodeProperties().replace(new UsageTracker(tasks)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // TODO override reconfigure | ||
| // TODO use NodeListener.onUpdated to transfer TrackingProperty so that io.jenkins.plugins.casc.core.JenkinsConfigurator will not delete info from permanent agents | ||
|
Check warning on line 133 in src/main/java/org/jenkinsci/plugins/workflow/support/steps/UsageTracker.java
|
||
|
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,20 +38,31 @@ | |
| import hudson.model.Label; | ||
| import hudson.model.Queue; | ||
| import hudson.model.Result; | ||
| import hudson.model.TaskListener; | ||
| import hudson.slaves.DumbSlave; | ||
| import hudson.slaves.RetentionStrategy; | ||
|
|
||
| import java.io.File; | ||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.logging.Level; | ||
|
|
||
| import jenkins.model.InterruptedBuildAction; | ||
|
|
||
| import org.jenkinci.plugins.mock_slave.MockCloud; | ||
| import org.jenkinsci.plugins.durabletask.executors.ContinuedTask; | ||
| import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy; | ||
| import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; | ||
| import org.jenkinsci.plugins.workflow.flow.FlowExecutionList; | ||
| import org.jenkinsci.plugins.workflow.job.WorkflowJob; | ||
| import org.jenkinsci.plugins.workflow.job.WorkflowRun; | ||
| import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback; | ||
| import org.jenkinsci.plugins.workflow.steps.Step; | ||
| import org.jenkinsci.plugins.workflow.steps.StepContext; | ||
| import org.jenkinsci.plugins.workflow.steps.StepDescriptor; | ||
| import org.jenkinsci.plugins.workflow.steps.StepExecution; | ||
| import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; | ||
| import org.junit.ClassRule; | ||
| import org.junit.Rule; | ||
|
|
@@ -60,6 +71,8 @@ | |
| import org.jvnet.hudson.test.Issue; | ||
| import org.jvnet.hudson.test.JenkinsSessionRule; | ||
| import org.jvnet.hudson.test.LoggerRule; | ||
| import org.jvnet.hudson.test.TestExtension; | ||
| import org.kohsuke.stapler.DataBoundConstructor; | ||
|
|
||
| public class ExecutorStepDynamicContextTest { | ||
|
|
||
|
|
@@ -249,4 +262,64 @@ private void commonSetup() { | |
| assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class))); | ||
| }); | ||
| } | ||
|
|
||
| @Test public void tardyResume() throws Throwable { | ||
| commonSetup(); | ||
| logging.record(ContinuedTask.class, Level.FINER).record(UsageTracker.class, Level.FINER); | ||
| sessions.then(j -> { | ||
| j.createSlave("remote", "contended", null); | ||
| var prompt = j.createProject(WorkflowJob.class, "prompt"); | ||
| prompt.setDefinition(new CpsFlowDefinition("node('contended') {semaphore 'prompt'}", true)); | ||
| var tardy = j.createProject(WorkflowJob.class, "tardy"); | ||
| tardy.setDefinition(new CpsFlowDefinition("slowToResume {node('contended') {semaphore 'tardy'}}", true)); | ||
| SemaphoreStep.waitForStart("tardy/1", tardy.scheduleBuild2(0).waitForStart()); | ||
| j.waitForMessage("Still waiting to schedule task", prompt.scheduleBuild2(0).waitForStart()); | ||
| }); | ||
| sessions.then(j -> { | ||
| var promptB = j.jenkins.getItemByFullName("prompt", WorkflowJob.class).getBuildByNumber(1); | ||
| j.waitForMessage("Ready to run", promptB); | ||
| var tardyB = j.jenkins.getItemByFullName("tardy", WorkflowJob.class).getBuildByNumber(1); | ||
| j.waitForMessage("Ready to run", tardyB); | ||
| SemaphoreStep.success("tardy/1", null); | ||
| j.assertBuildStatusSuccess(j.waitForCompletion(tardyB)); | ||
| SemaphoreStep.waitForStart("prompt/1", promptB); | ||
| SemaphoreStep.success("prompt/1", null); | ||
| j.assertBuildStatusSuccess(j.waitForCompletion(promptB)); | ||
| }); | ||
| } | ||
| public static final class SlowToResumeStep extends Step { | ||
| @DataBoundConstructor public SlowToResumeStep() {} | ||
| @Override public StepExecution start(StepContext context) throws Exception { | ||
| return new Execution(context); | ||
| } | ||
| private static final class Execution extends StepExecution { | ||
| Execution(StepContext context) { | ||
| super(context); | ||
| } | ||
| @Override public boolean start() throws Exception { | ||
| getContext().newBodyInvoker().withCallback(BodyExecutionCallback.wrap(getContext())).start(); | ||
| return false; | ||
| } | ||
| @Override public void onResume() { | ||
| try { | ||
| getContext().get(TaskListener.class).getLogger().println("Will resume outer step…"); | ||
| Thread.sleep(3_000); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Passes without this delay. |
||
| getContext().get(TaskListener.class).getLogger().println("…resumed."); | ||
| } catch (Exception x) { | ||
| throw new RuntimeException(x); | ||
| } | ||
| } | ||
| } | ||
| @TestExtension("tardyResume") public static class DescriptorImpl extends StepDescriptor { | ||
| @Override public String getFunctionName() { | ||
| return "slowToResume"; | ||
| } | ||
| @Override public boolean takesImplicitBlockArgument() { | ||
| return true; | ||
| } | ||
| @Override public Set<? extends Class<?>> getRequiredContext() { | ||
| return Set.of(TaskListener.class); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.