diff --git a/org.knime.core.tests/src/org/knime/core/node/agentic/tool/WorkflowToolCellTest.java b/org.knime.core.tests/src/org/knime/core/node/agentic/tool/WorkflowToolCellTest.java index b106112f62..a44249b346 100644 --- a/org.knime.core.tests/src/org/knime/core/node/agentic/tool/WorkflowToolCellTest.java +++ b/org.knime.core.tests/src/org/knime/core/node/agentic/tool/WorkflowToolCellTest.java @@ -104,6 +104,7 @@ import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor; import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.ExecutionMode; import org.knime.core.node.workflow.contextv2.WorkflowContextV2; +import org.knime.core.node.workflow.virtual.DefaultVirtualPortObjectOutNodeModel; import org.knime.core.node.workflow.virtual.VirtualNodeContext; import org.knime.core.node.workflow.virtual.VirtualNodeContext.Restriction; import org.knime.core.util.FileUtil; @@ -513,6 +514,61 @@ void testExecuteToolsWithCombinedWorkflow() assertThat(combinedExecutor.getSourcePortIds()).isNull(); } + @Test + void testExecuteToolsWithCombinedWorkflowAndFailure() throws ToolIncompatibleWorkflowException, IOException { + addAndConnectNodes(m_toolWfm, inData -> { + throw new RuntimeException("Purposely fail on execute"); + }, true); + var failingTool = WorkflowToolCell.createFromAndModifyWorkflow(m_toolWfm, null, + FileStoreFactory.createNotInWorkflowFileStoreFactory()); + + m_toolWfm.getNodeContainers().forEach(nc -> m_toolWfm.removeNode(nc.getID())); + addAndConnectNodes(m_toolWfm, inData -> { + // + }, true); + var tool = WorkflowToolCell.createFromAndModifyWorkflow(m_toolWfm, null, + FileStoreFactory.createNotInWorkflowFileStoreFactory()); + + // setup execution + var exec = executionContextExtension.getExecutionContext(); + var inputs = new PortObject[]{TestNodeModel.createTable(exec)}; + var hostNode = (NativeNodeContainer)NodeContext.getContext().getNodeContainer(); + var combinedExecutorBuilder = + WorkflowSegmentExecutor.builder(hostNode, ExecutionMode.DETACHED, "test_combined_tools_workflow", s -> { + }, exec, false).combined(inputs); + var combinedExecutor = combinedExecutorBuilder.build(); + + // execute a failing tool as first tool + var res = failingTool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of()); + assertThat(res.message()) + .startsWith("Tool execution failed with: Workflow contains one node with execution failure:" + + System.lineSeparator() + "TestNodeFactory #3: Purposely fail on execute"); + var combinedWorkflow = combinedExecutor.getWorkflow(); + assertThat(WorkflowManager.ROOT.containsNodeContainer(combinedWorkflow.getID())).isTrue(); + assertThat(combinedWorkflow.getNodeContainerState().isExecuted()).isFalse(); + assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(2); // virtual in + tool component + assertThat(res.outputs()).isNull(); + assertThat(res.outputIds()).isNull(); + + // execute tool successfully after failure + res = tool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of()); + assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(4); // virtual in + 2 tool components + virtual out + var outputNodeId = + combinedWorkflow.findNodes(DefaultVirtualPortObjectOutNodeModel.class, false).keySet().iterator().next(); + assertThat(combinedWorkflow.getNodeContainer(outputNodeId).getNodeContainerState().isExecuted()).isTrue(); + + // execute failing tool again, but remove it from combined workflow + res = failingTool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of()); + assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(5); // virtual in + 3 tool components + virtual out + + // execute failing tool again using a new combined executor, configured to remove failing tools from the combined workflow + combinedExecutor = combinedExecutorBuilder.removeFailedSegments(true).build(); + res = failingTool.execute(combinedExecutor, "", combinedExecutor.getSourcePortIds(), exec, Map.of()); + combinedWorkflow = combinedExecutor.getWorkflow(); + assertThat(combinedWorkflow.getNodeContainers().size()).isEqualTo(1); // just the virtual in + + } + private static NativeNodeContainer addAndConnectNodes(final WorkflowManager wfm, final boolean addMessageOutput) { return addAndConnectNodes(wfm, null, addMessageOutput); } diff --git a/org.knime.core/src/eclipse/org/knime/core/node/agentic/tool/WorkflowToolCell.java b/org.knime.core/src/eclipse/org/knime/core/node/agentic/tool/WorkflowToolCell.java index 44fb485acd..8ac3329473 100644 --- a/org.knime.core/src/eclipse/org/knime/core/node/agentic/tool/WorkflowToolCell.java +++ b/org.knime.core/src/eclipse/org/knime/core/node/agentic/tool/WorkflowToolCell.java @@ -108,6 +108,7 @@ import org.knime.core.node.workflow.WorkflowManager; import org.knime.core.node.workflow.capture.CombinedExecutor; import org.knime.core.node.workflow.capture.CombinedExecutor.PortId; +import org.knime.core.node.workflow.capture.CombinedExecutor.WorkflowSegmentExecutionResult; import org.knime.core.node.workflow.capture.IsolatedExecutor; import org.knime.core.node.workflow.capture.WorkflowSegment; import org.knime.core.node.workflow.capture.WorkflowSegment.Input; @@ -545,20 +546,10 @@ public WorkflowToolResult execute(final CombinedExecutor workflowExecutor, final var result = workflowExecutor.execute(ws, inputs, parseParameters(parameters), dataAreaPath, Restriction.WORKFLOW_RELATIVE_RESOURCE_ACCESS, Restriction.WORKFLOW_DATA_AREA_ACCESS); - String[] viewNodeIds = null; - var component = result.component(); - if (Boolean.parseBoolean(executionHints.get("with-view-nodes"))) { - viewNodeIds = component.getWorkflowManager().getNodeContainers().stream() - .filter(nc -> nc instanceof NativeNodeContainer nnc - && nnc.getNode().getFactory() instanceof WizardPageContribution wpc && wpc.hasNodeView()) // - .map(nc -> NodeIDSuffix.create(workflowExecutor.getWorkflow().getID(), nc.getID()).toString()) - .toArray(String[]::new); - } - - setComponentMetadata(workflowExecutor, inputs, result.outputIds(), component); - - var outputIds = Stream.of(result.outputIds()).map(id -> id.nodeIDSuffix().toString() + "#" + id.portIndex()) - .toArray(String[]::new); + var viewNodeIds = getViewNodeIds(workflowExecutor, executionHints, result); + setComponentMetadata(workflowExecutor, inputs, result.outputIds(), result.component()); + var outputIds = result.outputIds() == null ? null : Stream.of(result.outputIds()) + .map(id -> id.nodeIDSuffix().toString() + "#" + id.portIndex()).toArray(String[]::new); return new WorkflowToolResult( extractMessage(result.outputs(), () -> WorkflowSegmentNodeMessage.compileSingleErrorMessage(result.nodeMessages())), @@ -574,8 +565,26 @@ public WorkflowToolResult execute(final CombinedExecutor workflowExecutor, final } } + private static String[] getViewNodeIds(final CombinedExecutor workflowExecutor, + final Map executionHints, final WorkflowSegmentExecutionResult result) { + if (result.outputs() == null || result.component() == null + || !Boolean.parseBoolean(executionHints.get("with-view-nodes"))) { + return null; + } + var component = result.component(); + return component.getWorkflowManager().getNodeContainers().stream() + .filter(nc -> nc instanceof NativeNodeContainer nnc + && nnc.getNode().getFactory() instanceof WizardPageContribution wpc && wpc.hasNodeView()) // + .map(nc -> NodeIDSuffix.create(workflowExecutor.getWorkflow().getID(), nc.getID()).toString()) + .toArray(String[]::new); + + } + private void setComponentMetadata(final CombinedExecutor workflowExecutor, final List inputs, final PortId[] outputs, final SubNodeContainer component) { + if (component == null) { + return; + } var metadataBuilder = ComponentMetadata.fluentBuilder(); var wfm = workflowExecutor.getWorkflow(); var inPortMetadata = new Port[component.getNrInPorts() - 1]; @@ -589,7 +598,7 @@ private void setComponentMetadata(final CombinedExecutor workflowExecutor, final for (var pm : inPortMetadata) { metadataBuilder.withInPort(pm.name(), pm.description()); } - var outPortMetadata = new Port[component.getNrOutPorts() - 1]; + var outPortMetadata = new Port[outputs == null ? 0 : component.getNrOutPorts() - 1]; for (int i = 0; i < outPortMetadata.length; i++) { var portId = outputs[i]; if (i == m_messageOutputPortIndex) { @@ -618,6 +627,9 @@ private Optional copyDataAreaToTempDir() throws IOException { } private static Map parseParameters(final String parameters) { + if (StringUtils.isBlank(parameters)) { + return null; + } try (var reader = JsonUtil.getProvider().createReader(new StringReader(parameters))) { var jsonObject = reader.readObject(); return jsonObject.entrySet().stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue)); diff --git a/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/CombinedExecutor.java b/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/CombinedExecutor.java index 722eefe76c..8f5d358cd6 100644 --- a/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/CombinedExecutor.java +++ b/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/CombinedExecutor.java @@ -75,6 +75,8 @@ import org.knime.core.node.workflow.WorkflowAnnotationID; import org.knime.core.node.workflow.WorkflowCopyContent; import org.knime.core.node.workflow.WorkflowManager; +import org.knime.core.node.workflow.WorkflowPersistor; +import org.knime.core.node.workflow.capture.WorkflowSegment.Output; import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.BuilderParams; import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.ExecutionMode; import org.knime.core.node.workflow.capture.WorkflowSegmentExecutor.WorkflowSegmentNodeMessage; @@ -111,6 +113,8 @@ public static final class Builder { final WorkflowManager m_combinedWorkflow; + boolean m_removeFailingSegments = false; + Builder(final BuilderParams params, final PortObject[] initialInputs) { m_params = params; m_initialInputs = initialInputs; @@ -123,6 +127,18 @@ public static final class Builder { m_initialInputs = null; } + /** + * @param remove whether to remove failing segments from the combined workflow + * + * @return the builder + * + * @since 5.10 + */ + public Builder removeFailedSegments(final boolean remove) { + m_removeFailingSegments = remove; + return this; + } + /** * @return a new instance */ @@ -139,6 +155,8 @@ public CombinedExecutor build() { private final ExecutionContext m_exec; + private final boolean m_removeFailingSegments; + private List m_sourcePortIds; private CombinedExecutor(final Builder builder) { @@ -187,6 +205,7 @@ private CombinedExecutor(final Builder builder) { } else { m_sourcePortIds = null; } + m_removeFailingSegments = builder.m_removeFailingSegments; } @@ -270,12 +289,15 @@ public synchronized WorkflowSegmentExecutionResult execute(final WorkflowSegment // connect outputs List outTypes = new ArrayList<>(); List> outPorts = new ArrayList<>(); + WorkflowPersistor formerOutputNodePersistor = null; for (var outputNodeId : m_wfm.findNodes(DefaultVirtualPortObjectOutNodeModel.class, false).keySet()) { // collect outports that are already connected to the output node m_wfm.getIncomingConnectionsFor(outputNodeId).forEach(cc -> { outTypes.add(m_wfm.getNodeContainer(cc.getSource()).getOutPort(cc.getSourcePort()).getPortType()); outPorts.add(Pair.create(cc.getSource(), cc.getSourcePort())); }); + formerOutputNodePersistor = m_wfm.copy(false, + WorkflowCopyContent.builder().setNodeIDs(outputNodeId).setIncludeInOutConnections(true).build()); m_wfm.removeNode(outputNodeId); } var wsOutputs = ws.getConnectedOutputs(); @@ -312,28 +334,12 @@ public synchronized WorkflowSegmentExecutionResult execute(final WorkflowSegment throw exception.get(); } - var outputs = new PortObject[wsOutputs.size()]; - var outputIds = new PortId[wsOutputs.size()]; - var outIdx = 0; - for (int i = 0; i < outPorts.size(); i++) { - var cc = m_wfm.getIncomingConnectionFor(outputNodeId, i + 1); - if (cc.getSource().equals(componentId)) { - outputs[outIdx] = m_wfm.getNodeContainer(cc.getSource()).getOutPort(cc.getSourcePort()).getPortObject(); - outputIds[outIdx] = new PortId(NodeIDSuffix.create(m_wfm.getID(), cc.getSource()), cc.getSourcePort()); - outIdx++; - } - } - var flowVars = - WorkflowSegmentExecutor.getFlowVariablesFromNC((SingleNodeContainer)m_wfm.getNodeContainer(outputNodeId)); - boolean executionSuccessful = m_wfm.getNodeContainerState().isExecuted(); - if (!executionSuccessful) { - outputs = null; - } - List nodeMessages = List.of(); - if (m_collectMessages) { - nodeMessages = WorkflowSegmentExecutor.recursivelyExtractNodeMessages(component.getWorkflowManager()); + if (component.getNodeContainerState().isExecuted()) { + return createSuccessResult(outPorts, wsOutputs, outputNodeId, componentId, component); + } else { + return createFailureResult(formerOutputNodePersistor, outputNodeId, componentId, component); } - return new WorkflowSegmentExecutionResult(outputs, flowVars, nodeMessages, outputIds, component); + } @@ -369,6 +375,50 @@ private static void layout(final WorkflowManager wfm) { } } + private WorkflowSegmentExecutionResult createSuccessResult(final List> outPorts, + final List wsOutputs, final NodeID outputNodeId, final NodeID componentId, + final SubNodeContainer component) { + + var outputs = new PortObject[wsOutputs.size()]; + var outputIds = new PortId[wsOutputs.size()]; + var outIdx = 0; + for (int i = 0; i < outPorts.size(); i++) { + var cc = m_wfm.getIncomingConnectionFor(outputNodeId, i + 1); + if (cc.getSource().equals(componentId)) { + outputs[outIdx] = m_wfm.getNodeContainer(cc.getSource()).getOutPort(cc.getSourcePort()).getPortObject(); + outputIds[outIdx] = new PortId(NodeIDSuffix.create(m_wfm.getID(), cc.getSource()), cc.getSourcePort()); + outIdx++; + } + } + + var flowVars = + WorkflowSegmentExecutor.getFlowVariablesFromNC((SingleNodeContainer)m_wfm.getNodeContainer(outputNodeId)); + final List nodeMessages; + if (m_collectMessages) { + nodeMessages = WorkflowSegmentExecutor.recursivelyExtractNodeMessages(component.getWorkflowManager()); + } else { + nodeMessages = List.of(); + } + + return new WorkflowSegmentExecutionResult(outputs, flowVars, nodeMessages, outputIds, component); + } + + private WorkflowSegmentExecutionResult createFailureResult(final WorkflowPersistor formerOutputNodePersistor, + final NodeID outputNodeId, final NodeID componentId, SubNodeContainer component) { + var nodeMessages = WorkflowSegmentExecutor.recursivelyExtractNodeMessages(component.getWorkflowManager()); + m_wfm.removeNode(outputNodeId); + if (formerOutputNodePersistor != null) { + m_wfm.paste(formerOutputNodePersistor); + } + if (m_removeFailingSegments) { + m_wfm.removeNode(componentId); + component = null; + } + layout(m_wfm); + + return new WorkflowSegmentExecutionResult(null, List.of(), nodeMessages, null, component); + } + private NodeID toNodeID(final PortId portId) { return portId.nodeIDSuffix().prependParent(m_wfm.getID()); } diff --git a/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/WorkflowSegmentExecutor.java b/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/WorkflowSegmentExecutor.java index 01ae2e49ff..20d0ed0dff 100644 --- a/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/WorkflowSegmentExecutor.java +++ b/org.knime.core/src/eclipse/org/knime/core/node/workflow/capture/WorkflowSegmentExecutor.java @@ -118,8 +118,8 @@ private WorkflowSegmentExecutor() { * @param loadWarningConsumer callback for warning if there have while loading the workflow from the workflow * segment * @param exec for cancellation - * @param collectMessages whether to collect node messages after execution or not (will be part of the execution - * result) + * @param collectMessages whether to collect node messages after successful execution or not (will be part of the + * execution result) - if the execution fails, messages are always collected * @return the builder to further configure the workflow segment executor * @since 5.9 */