diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java index d7a3bef5015e..c31e20bc5974 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java @@ -49,6 +49,7 @@ public class ProcessorDTO extends ComponentDTO { private Boolean executionNodeRestricted; private Boolean multipleVersionsAvailable; private String inputRequirement; + private String physicalState; private ProcessorConfigDTO config; @@ -231,6 +232,21 @@ public void setInputRequirement(String inputRequirement) { this.inputRequirement = inputRequirement; } + /** + * @return The physical state of this processor, including transition states such as STARTING and STOPPING + */ + @Schema(description = "The physical state of the processor, including transition states", + allowableValues = {"RUNNING", "STOPPED", "DISABLED", "STARTING", "STOPPING", "RUN_ONCE"} + ) + public String getPhysicalState() { + return physicalState; + } + + public void setPhysicalState(String physicalState) { + this.physicalState = physicalState; + } + + /** * @return whether this processor supports batching */ diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorEntity.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorEntity.java index 3d1e5dcdc0c1..9a2eb27178d7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorEntity.java +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorEntity.java @@ -31,6 +31,7 @@ public class ProcessorEntity extends ComponentEntity implements Permissible entityMap) { @@ -111,11 +114,39 @@ private static void mergeDtos(final ProcessorDTO clientDto, final Map statuses = dtoMap.values().stream() - .map(ProcessorDTO::getValidationStatus) - .collect(Collectors.toSet()); + .map(ProcessorDTO::getValidationStatus) + .collect(Collectors.toSet()); clientDto.setValidationStatus(ErrorMerger.mergeValidationStatus(statuses)); // set the merged the validation errors clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); } + + /** + * Merges physical state from cluster nodes, giving precedence to transition states (STARTING, STOPPING) + * over stable states since they indicate a processor is actively changing state. + */ + private static void mergePhysicalState(final ProcessorEntity clientEntity, final Map entityMap) { + String mergedPhysicalState = clientEntity.getPhysicalState(); + + for (final ProcessorEntity nodeEntity : entityMap.values()) { + final String nodePhysicalState = nodeEntity.getPhysicalState(); + if (nodePhysicalState != null) { + final boolean nodeIsTransition = isTransitionState(nodePhysicalState); + final boolean mergedIsTransition = isTransitionState(mergedPhysicalState); + + // Always use transition states, or use any state if merged has none + if (nodeIsTransition || !mergedIsTransition) { + mergedPhysicalState = nodePhysicalState; + } + } + } + + clientEntity.setPhysicalState(mergedPhysicalState); + } + + private static boolean isTransitionState(final String physicalState) { + return "STARTING".equals(physicalState) || "STOPPING".equals(physicalState); + } + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ProcessorEntityMergerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ProcessorEntityMergerTest.java new file mode 100644 index 000000000000..abb977d991b5 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ProcessorEntityMergerTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.status.RunStatus; +import org.apache.nifi.web.api.dto.PermissionsDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ProcessorEntityMergerTest { + + @Test + void testMergePhysicalState_StoppingTakePrecedence() { + // Test that STOPPING state takes precedence over stable states + final ProcessorEntity clientEntity = createProcessorEntity("client", "STOPPED"); + final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", "STOPPED"); + final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "STOPPING"); + final ProcessorEntity nodeEntity3 = createProcessorEntity("node3", "STOPPED"); + + final Map entityMap = new HashMap<>(); + entityMap.put(getNodeIdentifier("client", 8000), clientEntity); + entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1); + entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2); + entityMap.put(getNodeIdentifier("node3", 8003), nodeEntity3); + + ProcessorEntityMerger merger = new ProcessorEntityMerger(); + merger.merge(clientEntity, entityMap); + + assertEquals("STOPPING", clientEntity.getPhysicalState()); + } + + @Test + void testMergePhysicalState_StartingTakePrecedence() { + // Test that STARTING state takes precedence over stable states + final ProcessorEntity clientEntity = createProcessorEntity("client", "RUNNING"); + final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", "RUNNING"); + final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "STARTING"); + final ProcessorEntity nodeEntity3 = createProcessorEntity("node3", "RUNNING"); + + final Map entityMap = new HashMap<>(); + entityMap.put(getNodeIdentifier("client", 8000), clientEntity); + entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1); + entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2); + entityMap.put(getNodeIdentifier("node3", 8003), nodeEntity3); + + ProcessorEntityMerger merger = new ProcessorEntityMerger(); + merger.merge(clientEntity, entityMap); + + assertEquals("STARTING", clientEntity.getPhysicalState()); + } + + @Test + void testMergePhysicalState_NullPhysicalStates() { + // Test that null physical states are handled gracefully + final ProcessorEntity clientEntity = createProcessorEntity("client", null); + final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", null); + final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "RUNNING"); + + final Map entityMap = new HashMap<>(); + entityMap.put(getNodeIdentifier("client", 8000), clientEntity); + entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1); + entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2); + + ProcessorEntityMerger merger = new ProcessorEntityMerger(); + merger.merge(clientEntity, entityMap); + + assertEquals("RUNNING", clientEntity.getPhysicalState()); + } + + private NodeIdentifier getNodeIdentifier(final String id, final int port) { + return new NodeIdentifier(id, "localhost", port, "localhost", port + 1, "localhost", port + 2, port + 3, true); + } + + private ProcessorEntity createProcessorEntity(final String id, final String physicalState) { + final ProcessorDTO dto = new ProcessorDTO(); + dto.setId(id); + dto.setState(ScheduledState.STOPPED.name()); + dto.setPhysicalState(physicalState); + dto.setValidationStatus(ValidationStatus.VALID.name()); + + // Add ProcessorConfigDTO to avoid NPE during merging + final ProcessorConfigDTO configDto = new ProcessorConfigDTO(); + configDto.setDescriptors(new HashMap<>()); + dto.setConfig(configDto); + + final ProcessorStatusDTO statusDto = new ProcessorStatusDTO(); + statusDto.setRunStatus(RunStatus.Stopped.name()); + + final PermissionsDTO permissions = new PermissionsDTO(); + permissions.setCanRead(true); + permissions.setCanWrite(true); + + final ProcessorEntity entity = new ProcessorEntity(); + entity.setComponent(dto); + entity.setRevision(createNewRevision()); + entity.setPermissions(permissions); + entity.setStatus(statusDto); + entity.setPhysicalState(physicalState); + + return entity; + } + + public RevisionDTO createNewRevision() { + final RevisionDTO revisionDto = new RevisionDTO(); + revisionDto.setClientId(getClass().getName()); + revisionDto.setVersion(0L); + return revisionDto; + } + +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 0f6ebea9afb3..4d0860bd9aa4 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1330,8 +1330,11 @@ public void verifyCanStart() { @Override public void verifyCanStop() { - if (getScheduledState() != ScheduledState.RUNNING) { - throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run"); + final ScheduledState logicalState = getScheduledState(); + final ScheduledState physicalState = getPhysicalScheduledState(); + + if (logicalState != ScheduledState.RUNNING && physicalState != ScheduledState.STARTING) { + throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run and is not starting"); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 645545f84068..737410f06636 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -3379,6 +3379,7 @@ private ProcessorDTO createProcessorDto(final ProcessorNode node, final boolean dto.setBundle(createBundleDto(bundleCoordinate)); dto.setName(node.getName()); dto.setState(node.getScheduledState().toString()); + dto.setPhysicalState(node.getPhysicalScheduledState().toString()); // build the relationship dtos final List relationships = new ArrayList<>(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index 3ff7923e9884..d176ae3d5aa3 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -238,6 +238,7 @@ public ProcessorEntity createProcessorEntity(final ProcessorDTO dto, final Revis entity.setStatus(status); entity.setId(dto.getId()); entity.setInputRequirement(dto.getInputRequirement()); + entity.setPhysicalState(dto.getPhysicalState()); entity.setPosition(dto.getPosition()); if (permissions != null && permissions.getCanRead()) { entity.setComponent(dto); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index 132c864702fa..1a9a45a37a6e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -413,7 +413,15 @@ private void verifyUpdate(ProcessorNode processor, ProcessorDTO processorDTO) { case DISABLED: processor.verifyCanDisable(); break; + case STARTING: + case STOPPING: + // These are internal transition states and should not be set directly via API + throw new IllegalArgumentException("Cannot set processor state to " + purposedScheduledState); } + } else if (purposedScheduledState == ScheduledState.STOPPED && processor.getPhysicalScheduledState() == ScheduledState.STARTING) { + // Handle special case: verify stopping a processor that's physically starting + processor.getProcessGroup().verifyCanScheduleComponentsIndividually(); + processor.verifyCanStop(); } } catch (IllegalArgumentException iae) { throw new IllegalArgumentException(String.format( @@ -542,15 +550,24 @@ public ProcessorNode updateProcessor(ProcessorDTO processorDTO) { case RUN_ONCE: parentGroup.runProcessorOnce(processor, () -> parentGroup.stopProcessor(processor)); break; + case STARTING: + case STOPPING: + // These are internal transition states and should not be set directly via API + throw new IllegalStateException("Cannot set processor state to " + purposedScheduledState); } } catch (IllegalStateException | ComponentLifeCycleException ise) { throw new NiFiCoreException(ise.getMessage(), ise); } catch (RejectedExecutionException ree) { throw new NiFiCoreException("Unable to schedule all tasks for the specified processor.", ree); - } catch (NullPointerException npe) { - throw new NiFiCoreException("Unable to update processor run state.", npe); } catch (Exception e) { - throw new NiFiCoreException("Unable to update processor run state: " + e, e); + throw new NiFiCoreException("Unable to update processor [%s] run state: %s".formatted(processor, e), e); + } + } else if (purposedScheduledState == ScheduledState.STOPPED && processor.getPhysicalScheduledState() == ScheduledState.STARTING) { + // Handle special case: allow stopping a processor that's physically starting + try { + parentGroup.stopProcessor(processor); + } catch (Exception e) { + throw new NiFiCoreException("Unable to stop starting processor [%s]: %s".formatted(processor, e), e); } } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java new file mode 100644 index 000000000000..ce215fbb255a --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.dao.impl; + +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.dao.ComponentStateDAO; +import org.apache.nifi.controller.flow.FlowManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StandardProcessorDAOTest { + + @Mock + private FlowController flowController; + + @Mock + private ComponentStateDAO componentStateDAO; + + @Mock + private ProcessorNode processorNode; + + @Mock + private ProcessGroup processGroup; + + @Mock + private FlowManager flowManager; + + private StandardProcessorDAO dao; + + @BeforeEach + void setUp() { + dao = new StandardProcessorDAO(); + dao.setFlowController(flowController); + dao.setComponentStateDAO(componentStateDAO); + + // Set up lenient mocks for common interactions + lenient().when(flowController.getFlowManager()).thenReturn(flowManager); + lenient().when(flowManager.getRootGroup()).thenReturn(processGroup); + lenient().when(processGroup.findProcessor(anyString())).thenReturn(processorNode); + lenient().when(processorNode.getProcessGroup()).thenReturn(processGroup); + lenient().when(processorNode.getIdentifier()).thenReturn("test-processor-id"); + } + + @Test + void testVerifyUpdate_NormalStateChange() { + // Processor in STOPPED state, trying to change to RUNNING + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("test-processor-id"); + processorDTO.setState("RUNNING"); + + when(processorNode.getScheduledState()).thenReturn(ScheduledState.STOPPED); + + // Should not throw exception + assertDoesNotThrow(() -> dao.verifyUpdate(processorDTO)); + } + + @Test + void testVerifyUpdate_StopStartingProcessor() { + // Processor in STOPPED logical state but STARTING physical state + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("test-processor-id"); + processorDTO.setState("STOPPED"); + + when(processorNode.getScheduledState()).thenReturn(ScheduledState.STOPPED); + when(processorNode.getPhysicalScheduledState()).thenReturn(ScheduledState.STARTING); + + // Should not throw exception and should verify stop permissions + assertDoesNotThrow(() -> dao.verifyUpdate(processorDTO)); + verify(processorNode).verifyCanStop(); + } + + @Test + void testVerifyUpdate_StopStartingProcessor_NoPermission() { + // Processor in STARTING physical state but no permission to stop + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("test-processor-id"); + processorDTO.setState("STOPPED"); + + when(processorNode.getScheduledState()).thenReturn(ScheduledState.STOPPED); + when(processorNode.getPhysicalScheduledState()).thenReturn(ScheduledState.STARTING); + doThrow(new IllegalStateException("Cannot stop")).when(processorNode).verifyCanStop(); + + // Should throw exception + assertThrows(IllegalStateException.class, () -> dao.verifyUpdate(processorDTO)); + } + + @Test + void testVerifyUpdate_NoSpecialCaseForDisabled() { + // Processor in DISABLED physical state (not STARTING) + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("test-processor-id"); + processorDTO.setState("STOPPED"); + + when(processorNode.getScheduledState()).thenReturn(ScheduledState.DISABLED); + + // Should not call special case verification + assertDoesNotThrow(() -> dao.verifyUpdate(processorDTO)); + verify(processorNode, never()).verifyCanStop(); + } + + @Test + void testVerifyUpdate_TransitionStatesNotAllowed() { + // User trying to set state to STARTING (internal transition state) + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("test-processor-id"); + processorDTO.setState("STARTING"); + + // Should throw IllegalArgumentException + assertThrows(IllegalArgumentException.class, () -> dao.verifyUpdate(processorDTO)); + } + + @Test + void testVerifyUpdate_StoppingStateNotAllowed() { + // User trying to set state to STOPPING (internal transition state) + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("test-processor-id"); + processorDTO.setState("STOPPING"); + + // Should throw IllegalArgumentException + assertThrows(IllegalArgumentException.class, () -> dao.verifyUpdate(processorDTO)); + } + + @Test + void testVerifyUpdate_ProcessorNotFound() { + // ProcessorDTO with ID that doesn't exist + ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId("non-existent-id"); + processorDTO.setState("RUNNING"); + + when(processGroup.findProcessor("non-existent-id")).thenReturn(null); + + // Should throw ResourceNotFoundException + assertThrows(ResourceNotFoundException.class, () -> dao.verifyUpdate(processorDTO)); + } +} diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts index ca855cab74d7..958abfe4b4d8 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts @@ -131,4 +131,282 @@ describe('CanvasUtils', () => { expect(service.getFlowVersionControlInformation(selection)).toBe(versionControlInformation); }); }); + + describe('isStoppable', () => { + it('should return false for empty selection', () => { + const emptySelection = d3.select(null); + expect(service.isStoppable(emptySelection)).toBe(false); + }); + + it('should return false for multiple selections', () => { + const div1 = document.createElement('g'); + const div2 = document.createElement('g'); + const multiSelection = d3.selectAll([div1, div2]); + expect(service.isStoppable(multiSelection)).toBe(false); + }); + + it('should return true for process groups', () => { + const pgDatum = { + id: '1', + type: ComponentType.ProcessGroup, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true } + }; + const selection = d3.select(document.createElement('g')).classed('process-group', true).datum(pgDatum); + expect(service.isStoppable(selection)).toBe(true); + }); + + it('should return false when lacking operate permissions', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: false }, + operatePermissions: { canWrite: false }, + status: { + aggregateSnapshot: { runStatus: 'Running' } + }, + physicalState: 'RUNNING' + }; + const selection = d3.select(document.createElement('g')).classed('processor', true).datum(processorDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + describe('for processors', () => { + it('should return true when runStatus is Running', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Running' } + }, + physicalState: 'RUNNING' + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(true); + }); + + it('should return false when runStatus is Stopped', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Stopped' } + }, + physicalState: 'STOPPED' + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + it('should return true when runStatus is Invalid and physicalState is STARTING', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Invalid' } + }, + physicalState: 'STARTING' + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(true); + }); + + it('should return false when runStatus is Invalid and physicalState is STOPPED', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Invalid' } + }, + physicalState: 'STOPPED' + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + it('should return false when runStatus is Invalid and physicalState is DISABLED', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Invalid' } + }, + physicalState: 'DISABLED' + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + it('should return false when runStatus is Invalid and physicalState is null', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Invalid' } + }, + physicalState: null + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + it('should return false when runStatus is Invalid and physicalState is undefined', () => { + const processorDatum = { + id: '1', + type: ComponentType.Processor, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Invalid' } + } + // physicalState is undefined + }; + const selection = d3 + .select(document.createElement('g')) + .classed('processor', true) + .datum(processorDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + }); + + describe('for input ports', () => { + it('should return true when runStatus is Running', () => { + const inputPortDatum = { + id: '1', + type: ComponentType.InputPort, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Running' } + } + }; + const selection = d3 + .select(document.createElement('g')) + .classed('input-port', true) + .datum(inputPortDatum); + expect(service.isStoppable(selection)).toBe(true); + }); + + it('should return false when runStatus is Stopped', () => { + const inputPortDatum = { + id: '1', + type: ComponentType.InputPort, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Stopped' } + } + }; + const selection = d3 + .select(document.createElement('g')) + .classed('input-port', true) + .datum(inputPortDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + }); + + describe('for output ports', () => { + it('should return true when runStatus is Running', () => { + const outputPortDatum = { + id: '1', + type: ComponentType.OutputPort, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Running' } + } + }; + const selection = d3 + .select(document.createElement('g')) + .classed('output-port', true) + .datum(outputPortDatum); + expect(service.isStoppable(selection)).toBe(true); + }); + + it('should return false when runStatus is Stopped', () => { + const outputPortDatum = { + id: '1', + type: ComponentType.OutputPort, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true }, + status: { + aggregateSnapshot: { runStatus: 'Stopped' } + } + }; + const selection = d3 + .select(document.createElement('g')) + .classed('output-port', true) + .datum(outputPortDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + }); + + describe('for other component types', () => { + it('should return false for connections', () => { + const connectionDatum = { + id: '1', + type: ComponentType.Connection, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true } + }; + const selection = d3 + .select(document.createElement('g')) + .classed('connection', true) + .datum(connectionDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + it('should return false for labels', () => { + const labelDatum = { + id: '1', + type: ComponentType.Label, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true } + }; + const selection = d3.select(document.createElement('g')).classed('label', true).datum(labelDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + + it('should return false for funnels', () => { + const funnelDatum = { + id: '1', + type: ComponentType.Funnel, + permissions: { canRead: true, canWrite: true }, + operatePermissions: { canWrite: true } + }; + const selection = d3.select(document.createElement('g')).classed('funnel', true).datum(funnelDatum); + expect(service.isStoppable(selection)).toBe(false); + }); + }); + }); }); diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts index f0feda136ce5..de006e6e1fae 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts @@ -1955,7 +1955,15 @@ export class CanvasUtils { let stoppable = false; const selectionData = selection.datum(); if (this.isProcessor(selection) || this.isInputPort(selection) || this.isOutputPort(selection)) { - stoppable = selectionData.status.aggregateSnapshot.runStatus === 'Running'; + const runStatus = selectionData.status.aggregateSnapshot.runStatus; + + // For processors, also check if physical state is Starting when runStatus is Invalid + if (this.isProcessor(selection) && runStatus === 'Invalid') { + const physicalState = selectionData.physicalState; + stoppable = physicalState === 'STARTING'; + } else { + stoppable = runStatus === 'Running'; + } } return stoppable; }