-
Notifications
You must be signed in to change notification settings - Fork 200
Fix a whole wad of Resume issues and add a bunch of diagnostic logging for FlowHead problems etc #216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a whole wad of Resume issues and add a bunch of diagnostic logging for FlowHead problems etc #216
Changes from 26 commits
a7a926e
7b2ba86
c028682
046e27a
d33c90c
1ad3eb4
cf829bb
a017be9
5d08569
4c545d6
1f47c90
b34e1f1
ab88c6d
f4aad61
16dcb35
1a279ef
5aa15b5
b91d775
e69a24c
9d8f0b1
95d7ee2
a9c52d6
a4110d2
773855b
91212d2
5ecfd2c
1360644
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,7 +69,7 @@ | |
| <workflow-support-plugin.version>2.17</workflow-support-plugin.version> | ||
| <scm-api-plugin.version>2.0.8</scm-api-plugin.version> | ||
| <groovy-cps.version>1.24</groovy-cps.version> | ||
| <jenkins-test-harness.version>2.33</jenkins-test-harness.version> | ||
| <jenkins-test-harness.version>2.37</jenkins-test-harness.version> | ||
| </properties> | ||
| <dependencies> | ||
| <dependency> | ||
|
|
@@ -141,7 +141,7 @@ | |
| <dependency> | ||
| <groupId>org.jenkins-ci.plugins.workflow</groupId> | ||
| <artifactId>workflow-job</artifactId> | ||
| <version>2.17</version> | ||
| <version>2.18-20180406.172304-8</version> | ||
|
||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -134,6 +134,8 @@ | |
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collector; | ||
| import java.util.stream.Collectors; | ||
| import javax.annotation.CheckForNull; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.concurrent.GuardedBy; | ||
|
|
@@ -282,7 +284,8 @@ public class CpsFlowExecution extends FlowExecution implements BlockableResume { | |
| private transient List<String> startNodesSerial; // used only between unmarshal and onLoad | ||
|
|
||
| @GuardedBy("this") | ||
| private /* almost final*/ NavigableMap<Integer,FlowHead> heads = new TreeMap<Integer,FlowHead>(); | ||
| /* almost final*/ NavigableMap<Integer,FlowHead> heads = new TreeMap<Integer,FlowHead>(); // Non-private for unit tests | ||
|
|
||
| @SuppressFBWarnings({"IS_FIELD_NOT_GUARDED", "IS2_INCONSISTENT_SYNC"}) // irrelevant here | ||
| private transient Map<Integer,String> headsSerial; // used only between unmarshal and onLoad | ||
|
|
||
|
|
@@ -313,7 +316,7 @@ public class CpsFlowExecution extends FlowExecution implements BlockableResume { | |
| * {@link FlowExecution} gets loaded into memory for the build records that have been completed, | ||
| * and for those we don't want to load the program state, so that check should be efficient. | ||
| */ | ||
| private boolean done; | ||
| boolean done; // Only non-private for unit test use. | ||
|
|
||
| /** | ||
| * Groovy compiler with CPS+sandbox transformation correctly setup. | ||
|
|
@@ -598,14 +601,35 @@ int approximateNodeCount() { | |
| return iota.get(); | ||
| } | ||
|
|
||
| /** For diagnostic purposes only, this logs current heads to assist with troubleshooting. */ | ||
| private synchronized String getHeadsAsString() { | ||
| NavigableMap<Integer, FlowHead> myHeads = this.heads; | ||
| if (myHeads == null) { | ||
| return "null-heads"; | ||
| } else if (myHeads.size() == 0) { | ||
| return "empty-heads"; | ||
| } else { | ||
| return myHeads.entrySet().stream().map(h->h.getKey()+"::"+h.getValue()).collect(Collectors.joining(",")); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
| } | ||
|
|
||
| /** Handle failures where we can't load heads. */ | ||
| private void rebuildEmptyGraph() { | ||
| synchronized (this) { | ||
| // something went catastrophically wrong and there's no live head. fake one | ||
| LOGGER.log(Level.WARNING, "Failed to load pipeline heads, so faking some up for execution " + this.toString()); | ||
| if (this.startNodes == null) { | ||
| this.startNodes = new Stack<BlockStartNode>(); | ||
| } | ||
| this.heads.clear(); | ||
|
|
||
| if (this.heads != null && this.heads.size() > 0) { | ||
| if (LOGGER.isLoggable(Level.INFO)) { | ||
| LOGGER.log(Level.INFO, "Resetting heads to rebuild the Pipeline structure, tossing existing heads: "+getHeadsAsString()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. oooo, this is a nice addition.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. here would reduce to just if (heads != null && !heads.isEmpty()) {
LOGGER.log(Level.INFO, "Resetting heads to rebuild the Pipeline structure, tossing existing heads: {0}", heads);
heads.clear();
} |
||
| } | ||
| this.heads.clear(); | ||
| } | ||
|
|
||
| this.startNodes.clear(); | ||
| FlowHead head = new FlowHead(this); | ||
| heads.put(head.getId(), head); | ||
|
|
@@ -637,6 +661,7 @@ protected synchronized void initializeStorage() throws IOException { | |
| h.setForDeserialize(storage.getNode(entry.getValue())); | ||
| heads.put(h.getId(), h); | ||
| } else { | ||
| LOGGER.log(Level.WARNING, "Tried to load head FlowNodes for execution "+this.owner+" but FlowNode was not found in storage for head id:FlowNodeId "+entry.getKey()+":"+entry.getValue()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. or just: LOGGER.log(Level.WARNING, "Tried to load head FlowNodes for execution {0} but FlowNode was not found in storage for head id:FlowNodeId {1}", new Object[] {owner, entry}); |
||
| storageErrors = true; | ||
| break; | ||
| } | ||
|
|
@@ -654,6 +679,7 @@ protected synchronized void initializeStorage() throws IOException { | |
| startNodes.add((BlockStartNode) storage.getNode(id)); | ||
| } else { | ||
| // TODO if possible, consider trying to close out unterminated blocks using heads, to keep existing graph history | ||
| LOGGER.log(Level.WARNING, "Tried to load startNode FlowNodes for execution "+this.owner+" but FlowNode was not found in storage for FlowNode Id "+id); | ||
| storageErrors = true; | ||
| break; | ||
| } | ||
|
|
@@ -687,6 +713,7 @@ public boolean canResume() { | |
| } | ||
|
|
||
| @Override | ||
| @SuppressFBWarnings(value = "RC_REF_COMPARISON_BAD_PRACTICE_BOOLEAN", justification = "We want to explicitly check for boolean not-null and true") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it wrong that I love that with
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A trilean, acc. to Wikipedia. |
||
| public void onLoad(FlowExecutionOwner owner) throws IOException { | ||
| this.owner = owner; | ||
| try { | ||
|
|
@@ -696,13 +723,15 @@ public void onLoad(FlowExecutionOwner owner) throws IOException { | |
| if (canResume()) { | ||
| loadProgramAsync(getProgramDataFile()); | ||
| } else { | ||
| // TODO if possible, consider tyring to close out unterminated blocks to keep existing graph history | ||
| // TODO if possible, consider trying to close out unterminated blocks to keep existing graph history | ||
| // That way we can visualize the graph in some error cases. | ||
| LOGGER.log(Level.WARNING, "Pipeline state not properly persisted, cannot resume "+owner.getUrl()); | ||
| throw new IOException("Cannot resume build -- was not cleanly saved when Jenkins shut down."); | ||
| } | ||
| } else if (done && !super.isComplete()) { | ||
| LOGGER.log(Level.WARNING, "Completed flow without FlowEndNode: "+this+" heads:"+getHeadsAsString()); | ||
| } | ||
| } catch (IOException e) { | ||
| } catch (Exception e) { // Multicatch ensures that failure to load does not nuke the master | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ahem jenkinsci/workflow-api-plugin#54 :-) |
||
| SettableFuture<CpsThreadGroup> p = SettableFuture.create(); | ||
| programPromise = p; | ||
| loadProgramFailed(e, p); | ||
|
|
@@ -829,6 +858,11 @@ void croak(Throwable t) { | |
| setResult(Result.FAILURE); | ||
| onProgramEnd(new Outcome(null, t)); | ||
| cleanUpHeap(); | ||
| try { | ||
| saveOwner(); | ||
| } catch (Exception ex) { | ||
| LOGGER.log(Level.WARNING, "Failed to persist WorkflowRun after noting a serious failure for run: " + owner, ex); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1155,8 +1189,9 @@ public static void maybeAutoPersistNode(@Nonnull FlowNode node) { | |
| } | ||
|
|
||
| @Override | ||
| @SuppressFBWarnings(value = "RC_REF_COMPARISON_BAD_PRACTICE_BOOLEAN", justification = "We want to explicitly check for boolean not-null and true") | ||
| public boolean isComplete() { | ||
| return done || super.isComplete(); | ||
| return done || super.isComplete(); // Compare to Boolean.TRUE so null == false. | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1170,20 +1205,31 @@ synchronized void onProgramEnd(Outcome outcome) { | |
| } | ||
|
|
||
| // shrink everything into a single new head | ||
| done = true; | ||
| if (heads != null) { | ||
| FlowHead first = getFirstHead(); | ||
| first.setNewHead(head); | ||
| heads.clear(); | ||
| heads.put(first.getId(),first); | ||
| try { | ||
| if (heads != null) { | ||
| FlowHead first = getFirstHead(); | ||
| first.setNewHead(head); | ||
| done = Boolean.TRUE; // After setting the final head | ||
| heads.clear(); | ||
| heads.put(first.getId(), first); | ||
|
|
||
| String tempIotaStr = Integer.toString(this.iota.get()); | ||
| FlowHead lastHead = heads.get(first.getId()); | ||
| if (lastHead == null || lastHead.get() == null || !(lastHead.get().getId().equals(tempIotaStr))) { | ||
| // Warning of problems with the final call to FlowHead.setNewHead | ||
| LOGGER.log(Level.WARNING, "Invalid final head for execution "+this.owner+" with head: "+lastHead); | ||
| } | ||
| } | ||
| } catch (Exception ex) { | ||
| done = Boolean.TRUE; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| throw ex; | ||
| } | ||
|
|
||
| try { | ||
| this.getStorage().flush(); | ||
| } catch (IOException ioe) { | ||
| LOGGER.log(Level.WARNING, "Error flushing FlowNodeStorage to disk at end of run", ioe); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| void cleanUpHeap() { | ||
|
|
@@ -1431,6 +1477,7 @@ public void pause(final boolean v) throws IOException { | |
| if (executable instanceof AccessControlled) { | ||
| ((AccessControlled) executable).checkPermission(Item.CANCEL); | ||
| } | ||
| done = Boolean.FALSE; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto, could autobox |
||
| Futures.addCallback(programPromise, new FutureCallback<CpsThreadGroup>() { | ||
| @Override public void onSuccess(CpsThreadGroup g) { | ||
| if (v) { | ||
|
|
@@ -1523,6 +1570,7 @@ public void marshal(Object source, HierarchicalStreamWriter w, MarshallingContex | |
| for (BlockStartNode st : e.startNodes) { | ||
| writeChild(w, context, "start", st.getId(), String.class); | ||
| } | ||
| writeChild(w, context, "done", e.done, Boolean.class); | ||
| } | ||
| writeChild(w, context, "resumeBlocked", e.resumeBlocked, Boolean.class); | ||
|
|
||
|
|
@@ -1587,6 +1635,9 @@ public Object unmarshal(HierarchicalStreamReader reader, final UnmarshallingCont | |
| } else if (nodeName.equals("iota")) { | ||
| Integer iota = readChild(reader, context, Integer.class, result); | ||
| setField(result, "iota", new AtomicInteger(iota)); | ||
| } else if (nodeName.equals("done")) { | ||
| Boolean isDone = readChild(reader, context, Boolean.class, result); | ||
| setField(result, "done", isDone); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So you are doing something with historical builds which omit this field, right? Is there some
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Historical builds have this field. Also I've now dropped the use of Boolean (wasn't needed, can use primitive). |
||
| } else if (nodeName.equals("start")) { | ||
| String id = readChild(reader, context, String.class, result); | ||
| result.startNodesSerial.add(id); | ||
|
|
@@ -1750,7 +1801,7 @@ public void autopersist(@Nonnull FlowNode n) throws IOException { | |
| /** Save the owner that holds this execution. */ | ||
| void saveOwner() { | ||
| try { | ||
| if (this.owner.getExecutable() instanceof Saveable) { | ||
| if (this.owner != null && this.owner.getExecutable() instanceof Saveable) { // Null-check covers some anomalous cases we've seen | ||
| Saveable saveable = (Saveable)(this.owner.getExecutable()); | ||
| saveable.save(); | ||
| } | ||
|
|
@@ -1769,6 +1820,13 @@ private void checkpoint() { | |
| boolean persistOk = true; | ||
| FlowNodeStorage storage = getStorage(); | ||
| if (storage != null) { | ||
| try { // Node storage must be flushed first so program can be restored | ||
| storage.flush(); | ||
| } catch (IOException ioe) { | ||
| persistOk=false; | ||
| LOGGER.log(Level.WARNING, "Error persisting FlowNode storage before shutdown", ioe); | ||
| } | ||
|
|
||
| // Try to ensure we've saved the appropriate things -- the program is the last stumbling block. | ||
| try { | ||
| final SettableFuture<Void> myOutcome = SettableFuture.create(); | ||
|
|
@@ -1800,13 +1858,6 @@ public void onFailure(Throwable t) { | |
| persistOk = false; | ||
| LOGGER.log(Level.FINE, "Error saving program, that should be handled elsewhere.", ex); | ||
| } | ||
|
|
||
| try { | ||
| storage.flush(); | ||
| } catch (IOException ioe) { | ||
| persistOk=false; | ||
| LOGGER.log(Level.WARNING, "Error persisting FlowNode storage before shutdown", ioe); | ||
| } | ||
| persistedClean = persistOk; | ||
| saveOwner(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -534,9 +534,12 @@ synchronized boolean switchToAsyncMode() { | |
| @Override public void onSuccess(CpsThreadGroup result) { | ||
| try { | ||
| // TODO keep track of whether the program was saved anyway after saveState was called but before now, and do not bother resaving it in that case | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. BTW how does durability interact with
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jglick Otherwise (for Of course with a clean shutdown the pipeline will persist via the |
||
| result.saveProgram(); | ||
| if (result.getExecution().getDurabilityHint().isPersistWithEveryStep()) { | ||
| result.getExecution().getStorage().flush(); | ||
| result.saveProgram(); | ||
| } | ||
| f.set(null); | ||
| } catch (IOException x) { | ||
| } catch (Exception x) { | ||
| f.setException(x); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,6 +68,7 @@ | |
| import static org.jenkinsci.plugins.workflow.cps.CpsFlowExecution.*; | ||
| import static org.jenkinsci.plugins.workflow.cps.persistence.PersistenceContext.*; | ||
| import org.jenkinsci.plugins.workflow.pickles.PickleFactory; | ||
| import org.jenkinsci.plugins.workflow.support.storage.FlowNodeStorage; | ||
|
|
||
| /** | ||
| * List of {@link CpsThread}s that form a single {@link CpsFlowExecution}. | ||
|
|
@@ -357,7 +358,6 @@ private boolean run() { | |
| } else { | ||
| stillRunnable |= t.isRunnable(); | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ? |
||
| changed = true; | ||
| } | ||
| } | ||
|
|
@@ -426,6 +426,16 @@ public CpsThreadDump getThreadDump() { | |
| void saveProgramIfPossible(boolean enteringQuietState) { | ||
| if (this.getExecution() != null && (this.getExecution().getDurabilityHint().isPersistWithEveryStep() | ||
| || enteringQuietState)) { | ||
|
|
||
| try { // Program may depend on flownodes being saved, so save nodes | ||
| FlowNodeStorage storage = this.execution.getStorage(); | ||
| if (storage != null) { | ||
| storage.flush(); | ||
| } | ||
| } catch (IOException ioe) { | ||
| LOGGER.log(Level.WARNING, "Error persisting FlowNode storage before saving program", ioe); | ||
| } | ||
|
|
||
| try { | ||
| saveProgram(); | ||
| } catch (IOException x) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,8 @@ | |
| import org.jenkinsci.plugins.workflow.graph.FlowNode; | ||
| import org.jenkinsci.plugins.workflow.graph.FlowStartNode; | ||
|
|
||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** | ||
| * Growing tip of the node graph. | ||
| * | ||
|
|
@@ -114,29 +116,33 @@ void newStartNode(FlowStartNode n) throws IOException { | |
| execution.storage.storeNode(head, false); | ||
| } | ||
|
|
||
| void setNewHead(FlowNode v) { | ||
| void setNewHead(@Nonnull FlowNode v) { | ||
| if (v == null) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So my brain briefly melts at the combination of
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, it's protecting against some things that shouldn't happen but yet were still happening in the wild, even though Findbugs didn't find issues at compile-time (probably it wasn't digging deeply enough).
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I have no problem with that approach - probably worth adding a comment explaining that, though. It'd be awfully easy for a later maintainer (i.e., me, because I'm forgetful) to not know/remember that and just think "Oh, that's a pointless null check, lemme clear the dead code".
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Uh, this is package-private so we can trivially enumerate all the call sites and put in |
||
| // Because Findbugs isn't 100% at catching cases where this can happen and we really need to fail hard-and-fast | ||
| throw new IllegalArgumentException("FlowHead.setNewHead called on FlowHead id="+this.id+" with a null FlowNode, execution="+this.execution); | ||
| } | ||
| try { | ||
| if (this.head != null) { | ||
| CpsFlowExecution.maybeAutoPersistNode(head); | ||
| } | ||
| execution.storage.storeNode(v, true); | ||
| v.addAction(new TimingAction()); | ||
| this.head = v; | ||
| CpsThreadGroup c = CpsThreadGroup.current(); | ||
| if (c !=null) { | ||
| // if the manipulation is from within the program executing thread, then | ||
| // defer the notification till we get to a safe point. | ||
| c.notifyNewHead(v); | ||
| } else { | ||
| // in recovering from error and such situation, we sometimes need to grow the graph | ||
| // without running the program. | ||
| // TODO can CpsThreadGroup.notifyNewHead be used instead to notify both kinds of listeners? | ||
| execution.notifyListeners(Collections.singletonList(v), true); | ||
| execution.notifyListeners(Collections.singletonList(v), false); | ||
| } | ||
| } catch (IOException e) { | ||
| LOGGER.log(Level.FINE, "Failed to record new head: " + v, e); | ||
| } | ||
| this.head = v; | ||
| CpsThreadGroup c = CpsThreadGroup.current(); | ||
| if (c !=null) { | ||
| // if the manipulation is from within the program executing thread, then | ||
| // defer the notification till we get to a safe point. | ||
| c.notifyNewHead(v); | ||
| } else { | ||
| // in recovering from error and such situation, we sometimes need to grow the graph | ||
| // without running the program. | ||
| // TODO can CpsThreadGroup.notifyNewHead be used instead to notify both kinds of listeners? | ||
| execution.notifyListeners(Collections.singletonList(v), true); | ||
| execution.notifyListeners(Collections.singletonList(v), false); | ||
| } | ||
| } | ||
|
|
||
| FlowNode get() { | ||
|
|
@@ -181,4 +187,9 @@ private Object readResolve() { | |
|
|
||
| private static final Logger LOGGER = Logger.getLogger(FlowHead.class.getName()); | ||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return id+":"+head; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file a request to update the parent POM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jglick jenkinsci/plugin-pom#101 - Done