Skip to content

Commit 28bcea3

Browse files
committed
NIFI-14999: Adding support to stop a starting processor.
1 parent 423381d commit 28bcea3

File tree

11 files changed

+677
-3
lines changed

11 files changed

+677
-3
lines changed

nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class ProcessorDTO extends ComponentDTO {
4949
private Boolean executionNodeRestricted;
5050
private Boolean multipleVersionsAvailable;
5151
private String inputRequirement;
52+
private String physicalState;
5253

5354
private ProcessorConfigDTO config;
5455

@@ -231,6 +232,21 @@ public void setInputRequirement(String inputRequirement) {
231232
this.inputRequirement = inputRequirement;
232233
}
233234

235+
/**
236+
* @return The physical state of this processor, including transition states such as STARTING and STOPPING
237+
*/
238+
@Schema(description = "The physical state of the processor, including transition states",
239+
allowableValues = {"RUNNING", "STOPPED", "DISABLED", "STARTING", "STOPPING", "RUN_ONCE"}
240+
)
241+
public String getPhysicalState() {
242+
return physicalState;
243+
}
244+
245+
public void setPhysicalState(String physicalState) {
246+
this.physicalState = physicalState;
247+
}
248+
249+
234250
/**
235251
* @return whether this processor supports batching
236252
*/

nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorEntity.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class ProcessorEntity extends ComponentEntity implements Permissible<Proc
3131

3232
private ProcessorDTO component;
3333
private String inputRequirement;
34+
private String physicalState;
3435
private ProcessorStatusDTO status;
3536
private PermissionsDTO operatePermissions;
3637

@@ -75,6 +76,20 @@ public void setInputRequirement(String inputRequirement) {
7576
this.inputRequirement = inputRequirement;
7677
}
7778

79+
/**
80+
* @return the physical state of this processor
81+
*/
82+
@Schema(description = "The physical state of the processor, including transition states",
83+
allowableValues = {"RUNNING", "STOPPED", "DISABLED", "STARTING", "STOPPING", "RUN_ONCE"})
84+
public String getPhysicalState() {
85+
return physicalState;
86+
}
87+
88+
public void setPhysicalState(String physicalState) {
89+
this.physicalState = physicalState;
90+
}
91+
92+
7893
/**
7994
* @return The permissions for this component operations
8095
*/

nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public void merge(ProcessorEntity clientEntity, Map<NodeIdentifier, ProcessorEnt
3838
mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
3939
}
4040
}
41+
42+
// Merge physical state
43+
mergePhysicalState(clientEntity, entityMap);
4144
}
4245

4346
/**
@@ -118,4 +121,32 @@ private static void mergeDtos(final ProcessorDTO clientDto, final Map<NodeIdenti
118121
// set the merged the validation errors
119122
clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size()));
120123
}
124+
125+
/**
126+
* Merges physical state from cluster nodes, giving precedence to transition states (STARTING, STOPPING)
127+
* over stable states since they indicate a processor is actively changing state.
128+
*/
129+
private static void mergePhysicalState(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) {
130+
String mergedPhysicalState = clientEntity.getPhysicalState();
131+
132+
for (final ProcessorEntity nodeEntity : entityMap.values()) {
133+
final String nodePhysicalState = nodeEntity.getPhysicalState();
134+
if (nodePhysicalState != null) {
135+
final boolean nodeIsTransition = isTransitionState(nodePhysicalState);
136+
final boolean mergedIsTransition = isTransitionState(mergedPhysicalState);
137+
138+
// Always use transition states, or use any state if merged has none
139+
if (nodeIsTransition || !mergedIsTransition) {
140+
mergedPhysicalState = nodePhysicalState;
141+
}
142+
}
143+
}
144+
145+
clientEntity.setPhysicalState(mergedPhysicalState);
146+
}
147+
148+
private static boolean isTransitionState(final String physicalState) {
149+
return "STARTING".equals(physicalState) || "STOPPING".equals(physicalState);
150+
}
151+
121152
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.cluster.manager;
19+
20+
import org.apache.nifi.cluster.protocol.NodeIdentifier;
21+
import org.apache.nifi.components.validation.ValidationStatus;
22+
import org.apache.nifi.controller.ScheduledState;
23+
import org.apache.nifi.controller.status.RunStatus;
24+
import org.apache.nifi.web.api.dto.PermissionsDTO;
25+
import org.apache.nifi.web.api.dto.ProcessorDTO;
26+
import org.apache.nifi.web.api.dto.RevisionDTO;
27+
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
28+
import org.apache.nifi.web.api.entity.ProcessorEntity;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
34+
import static org.junit.jupiter.api.Assertions.assertEquals;
35+
36+
class ProcessorEntityMergerTest {
37+
38+
@Test
39+
void testMergePhysicalState_StoppingTakePrecedence() {
40+
// Test that STOPPING state takes precedence over stable states
41+
final ProcessorEntity clientEntity = createProcessorEntity("client", "STOPPED");
42+
final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", "STOPPED");
43+
final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "STOPPING");
44+
final ProcessorEntity nodeEntity3 = createProcessorEntity("node3", "STOPPED");
45+
46+
final Map<NodeIdentifier, ProcessorEntity> entityMap = new HashMap<>();
47+
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
48+
entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1);
49+
entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2);
50+
entityMap.put(getNodeIdentifier("node3", 8003), nodeEntity3);
51+
52+
ProcessorEntityMerger merger = new ProcessorEntityMerger();
53+
merger.merge(clientEntity, entityMap);
54+
55+
assertEquals("STOPPING", clientEntity.getPhysicalState());
56+
}
57+
58+
@Test
59+
void testMergePhysicalState_StartingTakePrecedence() {
60+
// Test that STARTING state takes precedence over stable states
61+
final ProcessorEntity clientEntity = createProcessorEntity("client", "RUNNING");
62+
final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", "RUNNING");
63+
final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "STARTING");
64+
final ProcessorEntity nodeEntity3 = createProcessorEntity("node3", "RUNNING");
65+
66+
final Map<NodeIdentifier, ProcessorEntity> entityMap = new HashMap<>();
67+
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
68+
entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1);
69+
entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2);
70+
entityMap.put(getNodeIdentifier("node3", 8003), nodeEntity3);
71+
72+
ProcessorEntityMerger merger = new ProcessorEntityMerger();
73+
merger.merge(clientEntity, entityMap);
74+
75+
assertEquals("STARTING", clientEntity.getPhysicalState());
76+
}
77+
78+
@Test
79+
void testMergePhysicalState_NullPhysicalStates() {
80+
// Test that null physical states are handled gracefully
81+
final ProcessorEntity clientEntity = createProcessorEntity("client", null);
82+
final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", null);
83+
final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "RUNNING");
84+
85+
final Map<NodeIdentifier, ProcessorEntity> entityMap = new HashMap<>();
86+
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
87+
entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1);
88+
entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2);
89+
90+
ProcessorEntityMerger merger = new ProcessorEntityMerger();
91+
merger.merge(clientEntity, entityMap);
92+
93+
assertEquals("RUNNING", clientEntity.getPhysicalState());
94+
}
95+
96+
private NodeIdentifier getNodeIdentifier(final String id, final int port) {
97+
return new NodeIdentifier(id, "localhost", port, "localhost", port + 1, "localhost", port + 2, port + 3, true);
98+
}
99+
100+
private ProcessorEntity createProcessorEntity(final String id, final String physicalState) {
101+
final ProcessorDTO dto = new ProcessorDTO();
102+
dto.setId(id);
103+
dto.setState(ScheduledState.STOPPED.name());
104+
dto.setPhysicalState(physicalState);
105+
dto.setValidationStatus(ValidationStatus.VALID.name());
106+
107+
final ProcessorStatusDTO statusDto = new ProcessorStatusDTO();
108+
statusDto.setRunStatus(RunStatus.Stopped.name());
109+
110+
final PermissionsDTO permissions = new PermissionsDTO();
111+
permissions.setCanRead(true);
112+
permissions.setCanWrite(true);
113+
114+
final ProcessorEntity entity = new ProcessorEntity();
115+
entity.setComponent(dto);
116+
entity.setRevision(createNewRevision());
117+
entity.setPermissions(permissions);
118+
entity.setStatus(statusDto);
119+
entity.setPhysicalState(physicalState);
120+
121+
return entity;
122+
}
123+
124+
public RevisionDTO createNewRevision() {
125+
final RevisionDTO revisionDto = new RevisionDTO();
126+
revisionDto.setClientId(getClass().getName());
127+
revisionDto.setVersion(0L);
128+
return revisionDto;
129+
}
130+
131+
}

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,8 +1330,11 @@ public void verifyCanStart() {
13301330

13311331
@Override
13321332
public void verifyCanStop() {
1333-
if (getScheduledState() != ScheduledState.RUNNING) {
1334-
throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run");
1333+
final ScheduledState logicalState = getScheduledState();
1334+
final ScheduledState physicalState = getPhysicalScheduledState();
1335+
1336+
if (logicalState != ScheduledState.RUNNING && physicalState != ScheduledState.STARTING) {
1337+
throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run and is not starting");
13351338
}
13361339
}
13371340

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3379,6 +3379,7 @@ private ProcessorDTO createProcessorDto(final ProcessorNode node, final boolean
33793379
dto.setBundle(createBundleDto(bundleCoordinate));
33803380
dto.setName(node.getName());
33813381
dto.setState(node.getScheduledState().toString());
3382+
dto.setPhysicalState(node.getPhysicalScheduledState().toString());
33823383

33833384
// build the relationship dtos
33843385
final List<RelationshipDTO> relationships = new ArrayList<>();

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ public ProcessorEntity createProcessorEntity(final ProcessorDTO dto, final Revis
238238
entity.setStatus(status);
239239
entity.setId(dto.getId());
240240
entity.setInputRequirement(dto.getInputRequirement());
241+
entity.setPhysicalState(dto.getPhysicalState());
241242
entity.setPosition(dto.getPosition());
242243
if (permissions != null && permissions.getCanRead()) {
243244
entity.setComponent(dto);

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,15 @@ private void verifyUpdate(ProcessorNode processor, ProcessorDTO processorDTO) {
413413
case DISABLED:
414414
processor.verifyCanDisable();
415415
break;
416+
case STARTING:
417+
case STOPPING:
418+
// These are internal transition states and should not be set directly via API
419+
throw new IllegalArgumentException("Cannot set processor state to " + purposedScheduledState);
416420
}
421+
} else if (purposedScheduledState == ScheduledState.STOPPED && processor.getPhysicalScheduledState() == ScheduledState.STARTING) {
422+
// Handle special case: verify stopping a processor that's physically starting
423+
processor.getProcessGroup().verifyCanScheduleComponentsIndividually();
424+
processor.verifyCanStop();
417425
}
418426
} catch (IllegalArgumentException iae) {
419427
throw new IllegalArgumentException(String.format(
@@ -542,6 +550,10 @@ public ProcessorNode updateProcessor(ProcessorDTO processorDTO) {
542550
case RUN_ONCE:
543551
parentGroup.runProcessorOnce(processor, () -> parentGroup.stopProcessor(processor));
544552
break;
553+
case STARTING:
554+
case STOPPING:
555+
// These are internal transition states and should not be set directly via API
556+
throw new IllegalStateException("Cannot set processor state to " + purposedScheduledState);
545557
}
546558
} catch (IllegalStateException | ComponentLifeCycleException ise) {
547559
throw new NiFiCoreException(ise.getMessage(), ise);
@@ -552,6 +564,15 @@ public ProcessorNode updateProcessor(ProcessorDTO processorDTO) {
552564
} catch (Exception e) {
553565
throw new NiFiCoreException("Unable to update processor run state: " + e, e);
554566
}
567+
} else if (purposedScheduledState == ScheduledState.STOPPED && processor.getPhysicalScheduledState() == ScheduledState.STARTING) {
568+
// Handle special case: allow stopping a processor that's physically starting
569+
try {
570+
parentGroup.stopProcessor(processor);
571+
} catch (NullPointerException npe) {
572+
throw new NiFiCoreException("Unable to stop starting processor.", npe);
573+
} catch (Exception e) {
574+
throw new NiFiCoreException("Unable to stop starting processor: " + e, e);
575+
}
555576
}
556577
}
557578

0 commit comments

Comments
 (0)