Skip to content

Commit f4e9486

Browse files
committed
add processor start/stop operation
1 parent 47fecfa commit f4e9486

File tree

11 files changed

+633
-3
lines changed

11 files changed

+633
-3
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.c2.client.service.operation;
19+
20+
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
21+
22+
public interface ProcessorStateStrategy {
23+
OperationState startProcessor(String processorId);
24+
OperationState stopProcessor(String processorId);
25+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.c2.client.service.operation;
19+
20+
import org.apache.nifi.c2.protocol.api.C2Operation;
21+
import org.apache.nifi.c2.protocol.api.C2OperationAck;
22+
import org.apache.nifi.c2.protocol.api.C2OperationState;
23+
import org.apache.nifi.c2.protocol.api.OperandType;
24+
import org.apache.nifi.c2.protocol.api.OperationType;
25+
26+
import java.util.Collections;
27+
import java.util.Map;
28+
29+
import static java.util.Optional.ofNullable;
30+
import static org.apache.commons.lang3.StringUtils.EMPTY;
31+
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
32+
33+
public class StartProcessorOperationHandler implements C2OperationHandler {
34+
35+
public static final String PROCESSOR_ID_ARG = StopProcessorOperationHandler.PROCESSOR_ID_ARG;
36+
public static final String NOT_APPLIED_DETAILS = "Failed to start processor (not found or invalid state)";
37+
public static final String FULLY_APPLIED_DETAILS = "Processor started";
38+
public static final String PARTIALLY_APPLIED_DETAILS = "Processor start partially applied";
39+
40+
private final ProcessorStateStrategy processorStateStrategy;
41+
42+
public StartProcessorOperationHandler(ProcessorStateStrategy processorStateStrategy) {
43+
this.processorStateStrategy = processorStateStrategy;
44+
}
45+
46+
@Override
47+
public OperationType getOperationType() {
48+
return OperationType.START;
49+
}
50+
51+
@Override
52+
public OperandType getOperandType() {
53+
return OperandType.PROCESSOR;
54+
}
55+
56+
@Override
57+
public Map<String, Object> getProperties() {
58+
return Collections.emptyMap();
59+
}
60+
61+
@Override
62+
public C2OperationAck handle(C2Operation operation) {
63+
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
64+
String processorId = ofNullable(operation.getArgs()).map(a -> a.get(PROCESSOR_ID_ARG)).map(Object::toString).orElse(null);
65+
C2OperationState.OperationState opState;
66+
if (processorId == null) {
67+
opState = NOT_APPLIED;
68+
} else {
69+
opState = processorStateStrategy.startProcessor(processorId);
70+
}
71+
72+
String details = switch (opState) {
73+
case NOT_APPLIED -> NOT_APPLIED_DETAILS;
74+
case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
75+
case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
76+
default -> PARTIALLY_APPLIED_DETAILS;
77+
};
78+
79+
return operationAck(operationId, operationState(opState, details));
80+
}
81+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.c2.client.service.operation;
19+
20+
import org.apache.nifi.c2.protocol.api.C2Operation;
21+
import org.apache.nifi.c2.protocol.api.C2OperationAck;
22+
import org.apache.nifi.c2.protocol.api.C2OperationState;
23+
import org.apache.nifi.c2.protocol.api.OperandType;
24+
import org.apache.nifi.c2.protocol.api.OperationType;
25+
26+
import java.util.Collections;
27+
import java.util.Map;
28+
29+
import static java.util.Optional.ofNullable;
30+
import static org.apache.commons.lang3.StringUtils.EMPTY;
31+
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
32+
33+
public class StopProcessorOperationHandler implements C2OperationHandler {
34+
35+
public static final String PROCESSOR_ID_ARG = "processorId";
36+
public static final String NOT_APPLIED_DETAILS = "Failed to stop processor (not found)";
37+
public static final String FULLY_APPLIED_DETAILS = "Processor stopped";
38+
public static final String PARTIALLY_APPLIED_DETAILS = "Processor stop partially applied";
39+
40+
private final ProcessorStateStrategy processorStateStrategy;
41+
42+
public StopProcessorOperationHandler(ProcessorStateStrategy processorStateStrategy) {
43+
this.processorStateStrategy = processorStateStrategy;
44+
}
45+
46+
@Override
47+
public OperationType getOperationType() {
48+
return OperationType.STOP;
49+
}
50+
51+
@Override
52+
public OperandType getOperandType() {
53+
return OperandType.PROCESSOR;
54+
}
55+
56+
@Override
57+
public Map<String, Object> getProperties() {
58+
return Collections.emptyMap();
59+
}
60+
61+
@Override
62+
public C2OperationAck handle(C2Operation operation) {
63+
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
64+
String processorId = ofNullable(operation.getArgs()).map(a -> a.get(PROCESSOR_ID_ARG)).map(Object::toString).orElse(null);
65+
C2OperationState.OperationState opState;
66+
if (processorId == null) {
67+
opState = NOT_APPLIED;
68+
} else {
69+
opState = processorStateStrategy.stopProcessor(processorId);
70+
}
71+
72+
String details = switch (opState) {
73+
case NOT_APPLIED -> NOT_APPLIED_DETAILS;
74+
case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
75+
case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
76+
default -> PARTIALLY_APPLIED_DETAILS;
77+
};
78+
79+
return operationAck(operationId, operationState(opState, details));
80+
}
81+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.c2.client.service.operation;
19+
20+
import org.apache.nifi.c2.protocol.api.C2Operation;
21+
import org.apache.nifi.c2.protocol.api.C2OperationAck;
22+
import org.apache.nifi.c2.protocol.api.C2OperationState;
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.api.extension.ExtendWith;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.Arguments;
27+
import org.junit.jupiter.params.provider.MethodSource;
28+
import org.mockito.InjectMocks;
29+
import org.mockito.Mock;
30+
import org.mockito.junit.jupiter.MockitoExtension;
31+
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.stream.Stream;
35+
36+
import static org.apache.commons.lang3.StringUtils.EMPTY;
37+
import static org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.FULLY_APPLIED_DETAILS;
38+
import static org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.NOT_APPLIED_DETAILS;
39+
import static org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.PARTIALLY_APPLIED_DETAILS;
40+
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
41+
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
42+
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
43+
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
44+
import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
45+
import static org.apache.nifi.c2.protocol.api.OperationType.START;
46+
import static org.junit.jupiter.api.Assertions.assertEquals;
47+
import static org.mockito.Mockito.when;
48+
49+
@ExtendWith(MockitoExtension.class)
50+
public class StartProcessorOperationHandlerTest {
51+
52+
private static final String OPERATION_ID = "operation id";
53+
private static final String PROCESSOR_ID = "processor id";
54+
55+
@Mock
56+
private ProcessorStateStrategy processorStateStrategy;
57+
58+
@InjectMocks
59+
private StartProcessorOperationHandler victim;
60+
61+
@Test
62+
public void testOperationAndOperandTypes() {
63+
assertEquals(START, victim.getOperationType());
64+
assertEquals(PROCESSOR, victim.getOperandType());
65+
}
66+
67+
@ParameterizedTest(name = "operationId={0} ackOperationId={1} state={2} details={3}")
68+
@MethodSource("handleArguments")
69+
public void testHandle(String operationId, String expectedAckOperationId, C2OperationState.OperationState state, String expectedDetails) {
70+
when(processorStateStrategy.startProcessor(PROCESSOR_ID)).thenReturn(state);
71+
72+
C2Operation operation = anOperation(operationId, PROCESSOR_ID);
73+
C2OperationAck ack = victim.handle(operation);
74+
75+
assertEquals(expectedAckOperationId, ack.getOperationId());
76+
assertEquals(state, ack.getOperationState().getState());
77+
assertEquals(expectedDetails, ack.getOperationState().getDetails());
78+
}
79+
80+
private static Stream<Arguments> handleArguments() {
81+
return Stream.of(
82+
Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
83+
Arguments.of(null, EMPTY, FULLY_APPLIED, FULLY_APPLIED_DETAILS),
84+
Arguments.of(null, EMPTY, PARTIALLY_APPLIED, PARTIALLY_APPLIED_DETAILS),
85+
Arguments.of(null, EMPTY, NO_OPERATION, PARTIALLY_APPLIED_DETAILS),
86+
Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED, NOT_APPLIED_DETAILS),
87+
Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED, FULLY_APPLIED_DETAILS),
88+
Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED, PARTIALLY_APPLIED_DETAILS),
89+
Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION, PARTIALLY_APPLIED_DETAILS)
90+
);
91+
}
92+
93+
@Test
94+
public void testHandleMissingProcessorId() {
95+
C2Operation operation = new C2Operation();
96+
operation.setIdentifier(OPERATION_ID);
97+
C2OperationAck ack = victim.handle(operation);
98+
assertEquals(OPERATION_ID, ack.getOperationId());
99+
assertEquals(NOT_APPLIED, ack.getOperationState().getState());
100+
assertEquals(NOT_APPLIED_DETAILS, ack.getOperationState().getDetails());
101+
}
102+
103+
private C2Operation anOperation(String operationId, String processorId) {
104+
C2Operation operation = new C2Operation();
105+
operation.setIdentifier(operationId);
106+
Map<String, Object> args = new HashMap<>();
107+
args.put(StartProcessorOperationHandler.PROCESSOR_ID_ARG, processorId);
108+
operation.setArgs(args);
109+
return operation;
110+
}
111+
}

0 commit comments

Comments
 (0)