Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor;
import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.ExecutionMode;
import org.knime.core.node.workflow.contextv2.WorkflowContextV2;
import org.knime.core.node.workflow.virtual.DefaultVirtualPortObjectOutNodeModel;
import org.knime.core.node.workflow.virtual.VirtualNodeContext;
import org.knime.core.node.workflow.virtual.VirtualNodeContext.Restriction;
import org.knime.core.util.FileUtil;
Expand Down Expand Up @@ -513,6 +514,61 @@ void testExecuteToolsWithCombinedWorkflow()
assertThat(combinedExecutor.getSourcePortIds()).isNull();
}

/**
 * Tests executing several tools against one shared "combined" workflow, covering: a tool whose
 * node fails on execute, a subsequently successful tool on the same combined workflow, re-running
 * the failing tool (failed component is kept by default), and finally re-running it with a new
 * executor configured via {@code removeFailedSegments(true)} so the failed segment is removed.
 */
@Test
void testExecuteToolsWithCombinedWorkflowAndFailure() throws ToolIncompatibleWorkflowException, IOException {
// build a tool whose contained node throws during execution
addAndConnectNodes(m_toolWfm, inData -> {
throw new RuntimeException("Purposely fail on execute");
}, true);
var failingTool = WorkflowToolCell.createFromAndModifyWorkflow(m_toolWfm, null,
FileStoreFactory.createNotInWorkflowFileStoreFactory());

// clear the workflow and re-build it with a node that executes without error
m_toolWfm.getNodeContainers().forEach(nc -> m_toolWfm.removeNode(nc.getID()));
addAndConnectNodes(m_toolWfm, inData -> {
//
}, true);
var tool = WorkflowToolCell.createFromAndModifyWorkflow(m_toolWfm, null,
FileStoreFactory.createNotInWorkflowFileStoreFactory());

// setup execution
var exec = executionContextExtension.getExecutionContext();
var inputs = new PortObject[]{TestNodeModel.createTable(exec)};
var hostNode = (NativeNodeContainer)NodeContext.getContext().getNodeContainer();
var combinedExecutorBuilder =
WorkflowSegmentExecutor.builder(hostNode, ExecutionMode.DETACHED, "test_combined_tools_workflow", s -> {
}, exec, false).combined(inputs);
var combinedExecutor = combinedExecutorBuilder.build();

// execute a failing tool as first tool
var res = failingTool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of());
assertThat(res.message())
.startsWith("Tool execution failed with: Workflow contains one node with execution failure:"
+ System.lineSeparator() + "TestNodeFactory #3: Purposely fail on execute");
var combinedWorkflow = combinedExecutor.getWorkflow();
assertThat(WorkflowManager.ROOT.containsNodeContainer(combinedWorkflow.getID())).isTrue();
assertThat(combinedWorkflow.getNodeContainerState().isExecuted()).isFalse();
assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(2); // virtual in + tool component
// a failed execution yields neither outputs nor output ids
assertThat(res.outputs()).isNull();
assertThat(res.outputIds()).isNull();

// execute tool successfully after failure
res = tool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of());
assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(4); // virtual in + 2 tool components + virtual out
var outputNodeId =
combinedWorkflow.findNodes(DefaultVirtualPortObjectOutNodeModel.class, false).keySet().iterator().next();
assertThat(combinedWorkflow.getNodeContainer(outputNodeId).getNodeContainerState().isExecuted()).isTrue();

// execute failing tool again; without 'removeFailedSegments' the failed component remains in the combined workflow
res = failingTool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of());
assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(5); // virtual in + 3 tool components + virtual out

// execute failing tool again using a new combined executor, configured to remove failing tools from the combined workflow
combinedExecutor = combinedExecutorBuilder.removeFailedSegments(true).build();
res = failingTool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of());
combinedWorkflow = combinedExecutor.getWorkflow();
assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(1); // just the virtual in

}

/**
 * Convenience overload that adds and connects the test nodes without a custom node behavior
 * (delegates with a {@code null} callback).
 *
 * @param wfm the workflow to add the nodes to
 * @param addMessageOutput whether to also add the message output
 * @return the node container returned by the three-argument overload
 */
private static NativeNodeContainer addAndConnectNodes(final WorkflowManager wfm, final boolean addMessageOutput) {
return addAndConnectNodes(wfm, null, addMessageOutput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.knime.core.node.workflow.WorkflowManager;
import org.knime.core.node.workflow.capture.CombinedExecutor;
import org.knime.core.node.workflow.capture.CombinedExecutor.PortId;
import org.knime.core.node.workflow.capture.CombinedExecutor.WorkflowSegmentExecutionResult;
import org.knime.core.node.workflow.capture.IsolatedExecutor;
import org.knime.core.node.workflow.capture.WorkflowSegment;
import org.knime.core.node.workflow.capture.WorkflowSegment.Input;
Expand Down Expand Up @@ -545,20 +546,10 @@ public WorkflowToolResult execute(final CombinedExecutor workflowExecutor, final
var result = workflowExecutor.execute(ws, inputs, parseParameters(parameters), dataAreaPath,
Restriction.WORKFLOW_RELATIVE_RESOURCE_ACCESS, Restriction.WORKFLOW_DATA_AREA_ACCESS);

String[] viewNodeIds = null;
var component = result.component();
if (Boolean.parseBoolean(executionHints.get("with-view-nodes"))) {
viewNodeIds = component.getWorkflowManager().getNodeContainers().stream()
.filter(nc -> nc instanceof NativeNodeContainer nnc
&& nnc.getNode().getFactory() instanceof WizardPageContribution wpc && wpc.hasNodeView()) //
.map(nc -> NodeIDSuffix.create(workflowExecutor.getWorkflow().getID(), nc.getID()).toString())
.toArray(String[]::new);
}

setComponentMetadata(workflowExecutor, inputs, result.outputIds(), component);

var outputIds = Stream.of(result.outputIds()).map(id -> id.nodeIDSuffix().toString() + "#" + id.portIndex())
.toArray(String[]::new);
var viewNodeIds = getViewNodeIds(workflowExecutor, executionHints, result);
setComponentMetadata(workflowExecutor, inputs, result.outputIds(), result.component());
var outputIds = result.outputIds() == null ? null : Stream.of(result.outputIds())
.map(id -> id.nodeIDSuffix().toString() + "#" + id.portIndex()).toArray(String[]::new);
return new WorkflowToolResult(
extractMessage(result.outputs(),
() -> WorkflowSegmentNodeMessage.compileSingleErrorMessage(result.nodeMessages())),
Expand All @@ -574,8 +565,26 @@ public WorkflowToolResult execute(final CombinedExecutor workflowExecutor, final
}
}

/**
 * Determines the ids of all view-contributing native nodes within the executed tool component.
 *
 * @param workflowExecutor the executor hosting the combined workflow (used to make the node ids
 *            relative to the combined workflow)
 * @param executionHints execution hints; only consulted for the {@code "with-view-nodes"} flag
 * @param result the segment execution result
 * @return the node id suffixes of all nodes with a view, or {@code null} if the execution
 *         produced no outputs, there is no component, or view nodes were not requested
 */
private static String[] getViewNodeIds(final CombinedExecutor workflowExecutor,
    final Map<String, String> executionHints, final WorkflowSegmentExecutionResult result) {
    if (result.outputs() == null || result.component() == null) {
        return null;
    }
    if (!Boolean.parseBoolean(executionHints.get("with-view-nodes"))) {
        return null;
    }
    final var hostWorkflowId = workflowExecutor.getWorkflow().getID();
    return result.component().getWorkflowManager().getNodeContainers().stream() //
        .filter(NativeNodeContainer.class::isInstance) //
        .map(NativeNodeContainer.class::cast) //
        .filter(nnc -> nnc.getNode().getFactory() instanceof WizardPageContribution wpc && wpc.hasNodeView()) //
        .map(nnc -> NodeIDSuffix.create(hostWorkflowId, nnc.getID()).toString()) //
        .toArray(String[]::new);
}

private void setComponentMetadata(final CombinedExecutor workflowExecutor, final List<PortId> inputs,
final PortId[] outputs, final SubNodeContainer component) {
if (component == null) {
return;
}
var metadataBuilder = ComponentMetadata.fluentBuilder();
var wfm = workflowExecutor.getWorkflow();
var inPortMetadata = new Port[component.getNrInPorts() - 1];
Expand All @@ -589,7 +598,7 @@ private void setComponentMetadata(final CombinedExecutor workflowExecutor, final
for (var pm : inPortMetadata) {
metadataBuilder.withInPort(pm.name(), pm.description());
}
var outPortMetadata = new Port[component.getNrOutPorts() - 1];
var outPortMetadata = new Port[outputs == null ? 0 : component.getNrOutPorts() - 1];
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ternary expression computes the array size, but `component` could be null (as checked on line 585). Consider adding a guard condition that returns early when `outputs` is null, since `component.getNrOutPorts()` would throw a NullPointerException if `component` were null.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot open a new pull request to apply changes based on this feedback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The component can never be null here (see the null check and early return at the top of the method).

for (int i = 0; i < outPortMetadata.length; i++) {
var portId = outputs[i];
if (i == m_messageOutputPortIndex) {
Expand Down Expand Up @@ -618,6 +627,9 @@ private Optional<Path> copyDataAreaToTempDir() throws IOException {
}

private static Map<String, JsonValue> parseParameters(final String parameters) {
if (StringUtils.isBlank(parameters)) {
return null;
}
try (var reader = JsonUtil.getProvider().createReader(new StringReader(parameters))) {
var jsonObject = reader.readObject();
return jsonObject.entrySet().stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import org.knime.core.node.workflow.WorkflowAnnotationID;
import org.knime.core.node.workflow.WorkflowCopyContent;
import org.knime.core.node.workflow.WorkflowManager;
import org.knime.core.node.workflow.WorkflowPersistor;
import org.knime.core.node.workflow.capture.WorkflowSegment.Output;
import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.BuilderParams;
import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.ExecutionMode;
import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.WorkflowSegmentNodeMessage;
Expand Down Expand Up @@ -111,6 +113,8 @@ public static final class Builder {

final WorkflowManager m_combinedWorkflow;

boolean m_removeFailingSegments = false;

Builder(final BuilderParams params, final PortObject[] initialInputs) {
m_params = params;
m_initialInputs = initialInputs;
Expand All @@ -123,6 +127,18 @@ public static final class Builder {
m_initialInputs = null;
}

/**
 * Configures whether workflow segments whose execution fails are removed from the combined
 * workflow again. Defaults to {@code false}, i.e. failing segments are kept.
 *
 * @param remove whether to remove failing segments from the combined workflow
 *
 * @return this builder, for chaining
 *
 * @since 5.10
 */
public Builder removeFailedSegments(final boolean remove) {
m_removeFailingSegments = remove;
return this;
}

/**
* @return a new instance
*/
Expand All @@ -139,6 +155,8 @@ public CombinedExecutor build() {

private final ExecutionContext m_exec;

private final boolean m_removeFailingSegments;

private List<PortId> m_sourcePortIds;

private CombinedExecutor(final Builder builder) {
Expand Down Expand Up @@ -187,6 +205,7 @@ private CombinedExecutor(final Builder builder) {
} else {
m_sourcePortIds = null;
}
m_removeFailingSegments = builder.m_removeFailingSegments;

}

Expand Down Expand Up @@ -270,12 +289,15 @@ public synchronized WorkflowSegmentExecutionResult execute(final WorkflowSegment
// connect outputs
List<PortType> outTypes = new ArrayList<>();
List<Pair<NodeID, Integer>> outPorts = new ArrayList<>();
WorkflowPersistor formerOutputNodePersistor = null;
for (var outputNodeId : m_wfm.findNodes(DefaultVirtualPortObjectOutNodeModel.class, false).keySet()) {
// collect outports that are already connected to the output node
m_wfm.getIncomingConnectionsFor(outputNodeId).forEach(cc -> {
outTypes.add(m_wfm.getNodeContainer(cc.getSource()).getOutPort(cc.getSourcePort()).getPortType());
outPorts.add(Pair.create(cc.getSource(), cc.getSourcePort()));
});
formerOutputNodePersistor = m_wfm.copy(false,
WorkflowCopyContent.builder().setNodeIDs(outputNodeId).setIncludeInOutConnections(true).build());
m_wfm.removeNode(outputNodeId);
}
var wsOutputs = ws.getConnectedOutputs();
Expand Down Expand Up @@ -312,28 +334,12 @@ public synchronized WorkflowSegmentExecutionResult execute(final WorkflowSegment
throw exception.get();
}

var outputs = new PortObject[wsOutputs.size()];
var outputIds = new PortId[wsOutputs.size()];
var outIdx = 0;
for (int i = 0; i < outPorts.size(); i++) {
var cc = m_wfm.getIncomingConnectionFor(outputNodeId, i + 1);
if (cc.getSource().equals(componentId)) {
outputs[outIdx] = m_wfm.getNodeContainer(cc.getSource()).getOutPort(cc.getSourcePort()).getPortObject();
outputIds[outIdx] = new PortId(NodeIDSuffix.create(m_wfm.getID(), cc.getSource()), cc.getSourcePort());
outIdx++;
}
}
var flowVars =
WorkflowSegmentExecutor.getFlowVariablesFromNC((SingleNodeContainer)m_wfm.getNodeContainer(outputNodeId));
boolean executionSuccessful = m_wfm.getNodeContainerState().isExecuted();
if (!executionSuccessful) {
outputs = null;
}
List<WorkflowSegmentNodeMessage> nodeMessages = List.of();
if (m_collectMessages) {
nodeMessages = WorkflowSegmentExecutor.recursivelyExtractNodeMessages(component.getWorkflowManager());
if (component.getNodeContainerState().isExecuted()) {
return createSuccessResult(outPorts, wsOutputs, outputNodeId, componentId, component);
} else {
return createFailureResult(formerOutputNodePersistor, outputNodeId, componentId, component);
}
return new WorkflowSegmentExecutionResult(outputs, flowVars, nodeMessages, outputIds, component);


}

Expand Down Expand Up @@ -369,6 +375,50 @@ private static void layout(final WorkflowManager wfm) {
}
}

/**
 * Assembles the execution result for a successfully executed segment: collects the port objects
 * and port ids of all outputs produced by the just-executed component (i.e. connections into the
 * virtual output node originating from {@code componentId}), the flow variables available at the
 * virtual output node and, if configured, the node messages of the component's workflow.
 */
private WorkflowSegmentExecutionResult createSuccessResult(final List<Pair<NodeID, Integer>> outPorts,
    final List<Output> wsOutputs, final NodeID outputNodeId, final NodeID componentId,
    final SubNodeContainer component) {
    final var outputs = new PortObject[wsOutputs.size()];
    final var outputIds = new PortId[wsOutputs.size()];
    int nextOutput = 0;
    for (int port = 0; port < outPorts.size(); port++) {
        final var connection = m_wfm.getIncomingConnectionFor(outputNodeId, port + 1);
        if (!connection.getSource().equals(componentId)) {
            // only outputs produced by the just-executed component are collected
            continue;
        }
        final var sourceNode = m_wfm.getNodeContainer(connection.getSource());
        outputs[nextOutput] = sourceNode.getOutPort(connection.getSourcePort()).getPortObject();
        outputIds[nextOutput] =
            new PortId(NodeIDSuffix.create(m_wfm.getID(), connection.getSource()), connection.getSourcePort());
        nextOutput++;
    }
    final var flowVars =
        WorkflowSegmentExecutor.getFlowVariablesFromNC((SingleNodeContainer)m_wfm.getNodeContainer(outputNodeId));
    final var nodeMessages = m_collectMessages
        ? WorkflowSegmentExecutor.recursivelyExtractNodeMessages(component.getWorkflowManager())
        : List.<WorkflowSegmentNodeMessage> of();
    return new WorkflowSegmentExecutionResult(outputs, flowVars, nodeMessages, outputIds, component);
}

/**
 * Assembles the execution result for a failed segment execution: node messages are always
 * collected (so callers can compile an error message), the virtual output node added for this
 * execution is removed and the former one restored from the given persistor, and - if configured -
 * the failing component is removed from the combined workflow again.
 *
 * @param formerOutputNodePersistor copy of the previous virtual output node (incl. its
 *            connections), or {@code null} if there was none to restore
 * @param outputNodeId the id of the virtual output node added for this execution
 * @param componentId the id of the component representing the executed (failed) segment
 * @param component the component representing the executed segment
 * @return a result with {@code null} outputs and output ids, no flow variables and the collected
 *         node messages; the result's component is {@code null} if the failing segment was removed
 */
private WorkflowSegmentExecutionResult createFailureResult(final WorkflowPersistor formerOutputNodePersistor,
    final NodeID outputNodeId, final NodeID componentId, final SubNodeContainer component) {
    // messages are always collected on failure, independent of m_collectMessages
    var nodeMessages = WorkflowSegmentExecutor.recursivelyExtractNodeMessages(component.getWorkflowManager());
    m_wfm.removeNode(outputNodeId);
    if (formerOutputNodePersistor != null) {
        // restore the virtual output node (and its connections) as it existed before this segment
        m_wfm.paste(formerOutputNodePersistor);
    }
    // use a local instead of reassigning the parameter (keeps the parameter final, consistent
    // with the rest of the class)
    var resultComponent = component;
    if (m_removeFailingSegments) {
        m_wfm.removeNode(componentId);
        resultComponent = null;
    }
    layout(m_wfm);

    return new WorkflowSegmentExecutionResult(null, List.of(), nodeMessages, null, resultComponent);
}

/** Resolves the workflow-relative port id to an absolute node id within the combined workflow. */
private NodeID toNodeID(final PortId portId) {
    return portId.nodeIDSuffix().prependParent(m_wfm.getID());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ private WorkflowSegmentExecutor() {
* @param loadWarningConsumer callback for warning if there have while loading the workflow from the workflow
* segment
* @param exec for cancellation
* @param collectMessages whether to collect node messages after execution or not (will be part of the execution
* result)
* @param collectMessages whether to collect node messages after successful execution or not (will be part of the
* execution result) - if the execution fails, messages are always collected
* @return the builder to further configure the workflow segment executor
* @since 5.9
*/
Expand Down