Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ProcessorDTO extends ComponentDTO {
private Boolean executionNodeRestricted;
private Boolean multipleVersionsAvailable;
private String inputRequirement;
private String physicalState;

private ProcessorConfigDTO config;

Expand Down Expand Up @@ -231,6 +232,21 @@ public void setInputRequirement(String inputRequirement) {
this.inputRequirement = inputRequirement;
}

/**
* @return The physical state of this processor, including transition states such as STARTING and STOPPING
*/
@Schema(description = "The physical state of the processor, including transition states",
allowableValues = {"RUNNING", "STOPPED", "DISABLED", "STARTING", "STOPPING", "RUN_ONCE"}
)
public String getPhysicalState() {
return physicalState;
}

public void setPhysicalState(String physicalState) {
this.physicalState = physicalState;
}


/**
* @return whether this processor supports batching
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ProcessorEntity extends ComponentEntity implements Permissible<Proc

private ProcessorDTO component;
private String inputRequirement;
private String physicalState;
private ProcessorStatusDTO status;
private PermissionsDTO operatePermissions;

Expand Down Expand Up @@ -75,6 +76,20 @@ public void setInputRequirement(String inputRequirement) {
this.inputRequirement = inputRequirement;
}

/**
* @return the physical state of this processor
*/
@Schema(description = "The physical state of the processor, including transition states",
allowableValues = {"RUNNING", "STOPPED", "DISABLED", "STARTING", "STOPPING", "RUN_ONCE"})
public String getPhysicalState() {
return physicalState;
}

public void setPhysicalState(String physicalState) {
this.physicalState = physicalState;
}


/**
* @return The permissions for this component operations
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ public void merge(ProcessorEntity clientEntity, Map<NodeIdentifier, ProcessorEnt
mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
}
}

// Merge physical state
mergePhysicalState(clientEntity, entityMap);
}

/**
* Merges the ProcessorEntity responses.
*
* @param clientEntity the entity being returned to the client
* @param entityMap all node responses
* @param entityMap all node responses
*/
@Override
public void mergeComponents(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) {
Expand Down Expand Up @@ -111,11 +114,39 @@ private static void mergeDtos(final ProcessorDTO clientDto, final Map<NodeIdenti
}

final Set<String> statuses = dtoMap.values().stream()
.map(ProcessorDTO::getValidationStatus)
.collect(Collectors.toSet());
.map(ProcessorDTO::getValidationStatus)
.collect(Collectors.toSet());
clientDto.setValidationStatus(ErrorMerger.mergeValidationStatus(statuses));

// set the merged the validation errors
clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size()));
}

/**
* Merges physical state from cluster nodes, giving precedence to transition states (STARTING, STOPPING)
* over stable states since they indicate a processor is actively changing state.
*/
private static void mergePhysicalState(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) {
String mergedPhysicalState = clientEntity.getPhysicalState();

for (final ProcessorEntity nodeEntity : entityMap.values()) {
final String nodePhysicalState = nodeEntity.getPhysicalState();
if (nodePhysicalState != null) {
final boolean nodeIsTransition = isTransitionState(nodePhysicalState);
final boolean mergedIsTransition = isTransitionState(mergedPhysicalState);

// Always use transition states, or use any state if merged has none
if (nodeIsTransition || !mergedIsTransition) {
mergedPhysicalState = nodePhysicalState;
}
}
}

clientEntity.setPhysicalState(mergedPhysicalState);
}

private static boolean isTransitionState(final String physicalState) {
return "STARTING".equals(physicalState) || "STOPPING".equals(physicalState);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.manager;

import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ProcessorEntityMergerTest {

@Test
void testMergePhysicalState_StoppingTakePrecedence() {
// Test that STOPPING state takes precedence over stable states
final ProcessorEntity clientEntity = createProcessorEntity("client", "STOPPED");
final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", "STOPPED");
final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "STOPPING");
final ProcessorEntity nodeEntity3 = createProcessorEntity("node3", "STOPPED");

final Map<NodeIdentifier, ProcessorEntity> entityMap = new HashMap<>();
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1);
entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2);
entityMap.put(getNodeIdentifier("node3", 8003), nodeEntity3);

ProcessorEntityMerger merger = new ProcessorEntityMerger();
merger.merge(clientEntity, entityMap);

assertEquals("STOPPING", clientEntity.getPhysicalState());
}

@Test
void testMergePhysicalState_StartingTakePrecedence() {
// Test that STARTING state takes precedence over stable states
final ProcessorEntity clientEntity = createProcessorEntity("client", "RUNNING");
final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", "RUNNING");
final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "STARTING");
final ProcessorEntity nodeEntity3 = createProcessorEntity("node3", "RUNNING");

final Map<NodeIdentifier, ProcessorEntity> entityMap = new HashMap<>();
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1);
entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2);
entityMap.put(getNodeIdentifier("node3", 8003), nodeEntity3);

ProcessorEntityMerger merger = new ProcessorEntityMerger();
merger.merge(clientEntity, entityMap);

assertEquals("STARTING", clientEntity.getPhysicalState());
}

@Test
void testMergePhysicalState_NullPhysicalStates() {
// Test that null physical states are handled gracefully
final ProcessorEntity clientEntity = createProcessorEntity("client", null);
final ProcessorEntity nodeEntity1 = createProcessorEntity("node1", null);
final ProcessorEntity nodeEntity2 = createProcessorEntity("node2", "RUNNING");

final Map<NodeIdentifier, ProcessorEntity> entityMap = new HashMap<>();
entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
entityMap.put(getNodeIdentifier("node1", 8001), nodeEntity1);
entityMap.put(getNodeIdentifier("node2", 8002), nodeEntity2);

ProcessorEntityMerger merger = new ProcessorEntityMerger();
merger.merge(clientEntity, entityMap);

assertEquals("RUNNING", clientEntity.getPhysicalState());
}

private NodeIdentifier getNodeIdentifier(final String id, final int port) {
return new NodeIdentifier(id, "localhost", port, "localhost", port + 1, "localhost", port + 2, port + 3, true);
}

private ProcessorEntity createProcessorEntity(final String id, final String physicalState) {
final ProcessorDTO dto = new ProcessorDTO();
dto.setId(id);
dto.setState(ScheduledState.STOPPED.name());
dto.setPhysicalState(physicalState);
dto.setValidationStatus(ValidationStatus.VALID.name());

// Add ProcessorConfigDTO to avoid NPE during merging
final ProcessorConfigDTO configDto = new ProcessorConfigDTO();
configDto.setDescriptors(new HashMap<>());
dto.setConfig(configDto);

final ProcessorStatusDTO statusDto = new ProcessorStatusDTO();
statusDto.setRunStatus(RunStatus.Stopped.name());

final PermissionsDTO permissions = new PermissionsDTO();
permissions.setCanRead(true);
permissions.setCanWrite(true);

final ProcessorEntity entity = new ProcessorEntity();
entity.setComponent(dto);
entity.setRevision(createNewRevision());
entity.setPermissions(permissions);
entity.setStatus(statusDto);
entity.setPhysicalState(physicalState);

return entity;
}

public RevisionDTO createNewRevision() {
final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(getClass().getName());
revisionDto.setVersion(0L);
return revisionDto;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1330,8 +1330,11 @@ public void verifyCanStart() {

@Override
public void verifyCanStop() {
if (getScheduledState() != ScheduledState.RUNNING) {
throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run");
final ScheduledState logicalState = getScheduledState();
final ScheduledState physicalState = getPhysicalScheduledState();

if (logicalState != ScheduledState.RUNNING && physicalState != ScheduledState.STARTING) {
throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run and is not starting");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3379,6 +3379,7 @@ private ProcessorDTO createProcessorDto(final ProcessorNode node, final boolean
dto.setBundle(createBundleDto(bundleCoordinate));
dto.setName(node.getName());
dto.setState(node.getScheduledState().toString());
dto.setPhysicalState(node.getPhysicalScheduledState().toString());

// build the relationship dtos
final List<RelationshipDTO> relationships = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public ProcessorEntity createProcessorEntity(final ProcessorDTO dto, final Revis
entity.setStatus(status);
entity.setId(dto.getId());
entity.setInputRequirement(dto.getInputRequirement());
entity.setPhysicalState(dto.getPhysicalState());
entity.setPosition(dto.getPosition());
if (permissions != null && permissions.getCanRead()) {
entity.setComponent(dto);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,15 @@ private void verifyUpdate(ProcessorNode processor, ProcessorDTO processorDTO) {
case DISABLED:
processor.verifyCanDisable();
break;
case STARTING:
case STOPPING:
// These are internal transition states and should not be set directly via API
throw new IllegalArgumentException("Cannot set processor state to " + purposedScheduledState);
}
} else if (purposedScheduledState == ScheduledState.STOPPED && processor.getPhysicalScheduledState() == ScheduledState.STARTING) {
// Handle special case: verify stopping a processor that's physically starting
processor.getProcessGroup().verifyCanScheduleComponentsIndividually();
processor.verifyCanStop();
}
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format(
Expand Down Expand Up @@ -542,15 +550,24 @@ public ProcessorNode updateProcessor(ProcessorDTO processorDTO) {
case RUN_ONCE:
parentGroup.runProcessorOnce(processor, () -> parentGroup.stopProcessor(processor));
break;
case STARTING:
case STOPPING:
// These are internal transition states and should not be set directly via API
throw new IllegalStateException("Cannot set processor state to " + purposedScheduledState);
}
} catch (IllegalStateException | ComponentLifeCycleException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
} catch (RejectedExecutionException ree) {
throw new NiFiCoreException("Unable to schedule all tasks for the specified processor.", ree);
} catch (NullPointerException npe) {
throw new NiFiCoreException("Unable to update processor run state.", npe);
} catch (Exception e) {
throw new NiFiCoreException("Unable to update processor run state: " + e, e);
throw new NiFiCoreException("Unable to update processor [%s] run state: %s".formatted(processor, e), e);
}
} else if (purposedScheduledState == ScheduledState.STOPPED && processor.getPhysicalScheduledState() == ScheduledState.STARTING) {
// Handle special case: allow stopping a processor that's physically starting
try {
parentGroup.stopProcessor(processor);
} catch (Exception e) {
throw new NiFiCoreException("Unable to stop starting processor [%s]: %s".formatted(processor, e), e);
}
}
}
Expand Down
Loading
Loading