Skip to content

Conversation

Mihhai
Copy link

@Mihhai Mihhai commented Sep 26, 2025

Summary

NIFI-15017

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@Mihhai Mihhai force-pushed the feature/add-processor-start/stop-c2-command branch from 22e7ed4 to 6e15b54 Compare September 29, 2025 16:08
return FULLY_APPLIED;
} catch (Exception e) {
LOGGER.error("Failed to change state for processor {}", processorId, e);
return PARTIALLY_APPLIED;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion the PARTIALLY_APPLIED result is not suitable for a single processor start/stop operation. It would make sense for a processor group what could be partially started or stopped (eg some of the processors in the group are started or stopped by the operation meanwhile others remain in the opposite state for some reason) but the operation result for a single processor should not be partial.

@lordgamez could you please check what could be the operation state from the CPP agent in case of a processor start/stop operation? We should keep this in line with the existing implementation in the CPP agent.

Copy link
Author

@Mihhai Mihhai Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the cpp implementation ( response construction for start/stop and c2payload implementation ) it seems the response to a successful start/stop is a FULLY_APPLIED status. As a result i removed the PARTIALLY_APPLIED response as you suggested but still left the NOT_APPLIED in case of error. From my understanding of the cpp code there doesnt seem to be any handling of the case where the start/stop operation fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkedvessy sorry I could not check the change earlier, @Mihhai is right we only respond with FULLY_APPLIED every time for the start/stop messages, thanks for adjusting the code to minificpp!

return result;
}

private RunStatus mapProcessorRunStatus(org.apache.nifi.controller.status.RunStatus controllerRunStatus) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the switch statement, we can simply return STOPPED as default and we can keep RUNNING mapping as it is now.

In other hand, controllerRunStatus parameter naming is misleading. We should use something like a runStatus or processorRunStatus as parameter name or we can simply set the runStatus without this method like
result.setRunStatus(processorStatus.getRunStatus() == Running ? RUNNING : STOPPED);

Copy link
Author

@Mihhai Mihhai Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i refactored to the one line mapping you suggested and i removed the helper method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx

@Mihhai Mihhai force-pushed the feature/add-processor-start/stop-c2-command branch from 3e2629b to f4e9486 Compare October 2, 2025 09:31
@Mihhai Mihhai requested a review from pkedvessy October 3, 2025 09:35
return changeState(processorId, false);
}

private OperationState changeState(String processorId, boolean start) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick:
Controlling the state transition with a boolean flag works but not well readable without checking the body of the method. I can see the reason behind: this way you did not duplicate the boilerplate code around the flowController calls. I think it would be a bit more readable by passing the state transition as a method reference:

Eg.: changeState(processorId, this::star) and changeState(processorId, this::stop)

private void start(String processorId, String parentGroupId) ...
private void stop(String processorId, String parentGroupId) ...

@pkedvessy
Copy link
Contributor

In general I'm happy with your PR but before moving forward to merge it I'd have someone from the CPP Agent team to make sure the C2 API is 100% compatible with their implementation.

@szaszm
Copy link
Member

szaszm commented Oct 3, 2025

I believe it's currently not compatible with the API implemented by MiNiFi C++, but we're open to an API change if we can agree that the new API is an improvement, and if we can provide a transition period. I find it difficult to understand what protocol API this code maps to. In MiNiFi C++, I believe we have this:

  • Operation: START/STOP
    valid "name": C2 | FlowController | <component UUID> | <component name>
    content / arguments: (empty, not used)

From what I understand after talking to @pkedvessy, this pull request adds a new "name" called "processor, where we would put the processor UUID in the (currently unused) arguments, as a JSON Object with the key "processorId".

@lordgamez
Copy link
Contributor

I believe it's currently not compatible with the API implemented by MiNiFi C++, but we're open to an API change if we can agree that the new API is an improvement, and if we can provide a transition period. I find it difficult to understand what protocol API this code maps to. In MiNiFi C++, I believe we have this:

* Operation: START/STOP
  valid "name": C2 | FlowController | <component UUID> | <component name>
  content / arguments: (empty, not used)

From what I understand after talking to @pkedvessy, this pull request adds a new "name" called "processor, where we would put the processor UUID in the (currently unused) arguments, as a JSON Object with the key "processorId".

Just to have a clear example currently in MiNiFi C++ the format of the start/stop request looks like the following where the operand is a processor id:

{
    "operation": "heartbeat",
    "requested_operations": [
        {
            "operationid": 41,
            "operation": "stop",
            "operand": "2438e3c8-015a-1000-79ca-83af40ec1991"
        }
    ]
}

What is the equivalent json in this case?

@Mihhai Mihhai closed this Oct 3, 2025
@Mihhai Mihhai reopened this Oct 3, 2025
@Mihhai
Copy link
Author

Mihhai commented Oct 3, 2025

I believe it's currently not compatible with the API implemented by MiNiFi C++, but we're open to an API change if we can agree that the new API is an improvement, and if we can provide a transition period. I find it difficult to understand what protocol API this code maps to. In MiNiFi C++, I believe we have this:

* Operation: START/STOP
  valid "name": C2 | FlowController | <component UUID> | <component name>
  content / arguments: (empty, not used)

From what I understand after talking to @pkedvessy, this pull request adds a new "name" called "processor, where we would put the processor UUID in the (currently unused) arguments, as a JSON Object with the key "processorId".

Just to have a clear example currently in MiNiFi C++ the format of the start/stop request looks like the following where the operand is a processor id:

{
    "operation": "heartbeat",
    "requested_operations": [
        {
            "operationid": 41,
            "operation": "stop",
            "operand": "2438e3c8-015a-1000-79ca-83af40ec1991"
        }
    ]
}

What is the equivalent json in this case?

Currently the json for a stop flow opearation which is already integrated in the java solution would be :

{
    "requested_operations": [
        {
            "identifier": <some-id>,
            "operation": "STOP",
            "operand": "FLOW"
        }
    ]
}

I believe this stops everything inside the root process group, and if anything other than FLOW is sent as operand the command fails

The json for the stop processor command i proposed would look like this:

{
    "requested_operations": [
        {
            "identifier": <some-id>,
            "operation": "STOP",
            "operand": "PROCESSOR",
            "args": {
                    "processorId": "processor_id"
            }
        }
    ]
}

This is the implementation i naturally gravitated towards given the selection machanism of handlers that the java implementation has. It selects the proper handler based on a combination of operation/operand.

I believe there are already some differences between the interfaces of the 2 implementations.So in order to be compatible with the cpp variant and have start/stop per processor we would need some sort of detection logic for the value received in the operand field.

@lordgamez
Copy link
Contributor

Currently the json for a stop flow opearation which is already integrated in the java solution would be :

{
    "requested_operations": [
        {
            "identifier": <some-id>,
            "operation": "STOP",
            "operand": "FLOW"
        }
    ]
}

I believe this stops everything inside the root process group, and if anything other than FLOW is sent as operand the command fails

The json for the stop processor command i proposed would look like this:

{
    "requested_operations": [
        {
            "identifier": <some-id>,
            "operation": "STOP",
            "operand": "PROCESSOR",
            "args": {
                    "processorId": "processor_id"
            }
        }
    ]
}

This is the implementation i naturally gravitated towards given the selection machanism of handlers that the java implementation has. It selects the proper handler based on a combination of operation/operand.

I believe there are already some differences between the interfaces of the 2 implementations.So in order to be compatible with the cpp variant and have start/stop per processor we would need some sort of detection logic for the value received in the operand field.

I think this is okay to move forward with this implementation and we'll adjust minifi-cpp and the open source minifi-c2 server implementation to this in the future. It will be easier to accept both the old and the new notation in minifi-cpp for a while until the old notation is taken out in a future release.

@szaszm
Copy link
Member

szaszm commented Oct 3, 2025

I agree with Gábor: we can go forward with this API and change minifi c++ and ask adamdebreceni/c2-server to use this new API. It seems to work better in the Java world to have the operand name be a static string.

@Mihhai just out of curiosity, what C2 server do you use? Do you have a custom one, or did you patch something to add this capability? If it's open source, we may want to add integration tests with it to ensure no breakage in the future.

@Mihhai
Copy link
Author

Mihhai commented Oct 3, 2025

I agree with Gábor: we can go forward with this API and change minifi c++ and ask adamdebreceni/c2-server to use this new API. It seems to work better in the Java world to have the operand name be a static string.

@Mihhai just out of curiosity, what C2 server do you use? Do you have a custom one, or did you patch something to add this capability? If it's open source, we may want to add integration tests with it to ensure no breakage in the future.

I just implemented a small, custom server to exercise and validate the flows during development. Nothing special, i just exposed 2 endpoints (heartbeat and acknowledgment) and some simple mechanism to queue up commands in the heartbeat response

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants