diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java index ad46e2351c19..425010355af3 100644 --- a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java +++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java @@ -85,7 +85,7 @@ public interface C2Client { * * @param absoluteUrl absolute url sent by C2 server * @param relativeUrl relative url sent by C2 server - * @return an optional with content of finalised callback url + * @return finalised callback url */ - Optional getCallbackUrl(String absoluteUrl, String relativeUrl); + String getCallbackUrl(String absoluteUrl, String relativeUrl); } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java index beff8d171a43..1c9ecc163add 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java @@ -146,7 +146,7 @@ public Optional uploadBundle(String callbackUrl, byte[] bundle) { } @Override - public Optional getCallbackUrl(String absoluteUrl, String relativeUrl) { + public String getCallbackUrl(String absoluteUrl, String relativeUrl) { return c2UrlProvider.getCallbackUrl(absoluteUrl, relativeUrl); } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java index 51ea7a30c822..15447f8b96a6 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java @@ -17,8 +17,6 @@ package org.apache.nifi.c2.client.http.url; -import java.util.Optional; - public interface C2UrlProvider { /** @@ -42,5 +40,5 @@ public interface C2UrlProvider { * @param relativeUrl relative url sent by the C2 server * @return the url of the C2 server to send requests to */ - Optional getCallbackUrl(String absoluteUrl, String relativeUrl); + String getCallbackUrl(String absoluteUrl, String relativeUrl); } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java index fe9fad24ef7e..9040754486c5 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java @@ -45,11 +45,12 @@ public String getAcknowledgeUrl() { } @Override - public Optional getCallbackUrl(String absoluteUrl, String relativeUrl) { - Optional url = Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank); - if (!url.isPresent()) { - LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration"); - } - return url; + public String getCallbackUrl(String absoluteUrl, String relativeUrl) { + return Optional.ofNullable(absoluteUrl) + .filter(StringUtils::isNotBlank) + .orElseThrow( () -> { + LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration"); + throw new IllegalArgumentException("Provided absolute url was empty or null. Relative C2 urls are not supported with this configuration"); + }); } } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java index aeb8708101df..fcb440081e52 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java @@ -59,11 +59,12 @@ public String getAcknowledgeUrl() { } @Override - public Optional getCallbackUrl(String absoluteUrl, String relativeUrl) { + public String getCallbackUrl(String absoluteUrl, String relativeUrl) { return Optional.ofNullable(relativeUrl) .map(this::toAbsoluteUrl) .filter(Optional::isPresent) - .orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank)); + .orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank)) + .orElseThrow(() -> new IllegalArgumentException("Unable to return non empty c2 url.")); } private Optional toAbsoluteUrl(String path) { diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java index 05f88b65e107..bf1913845777 100644 --- a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java @@ -18,6 +18,7 @@ package org.apache.nifi.c2.client.http.url; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Optional; import java.util.stream.Stream; @@ -33,6 +34,9 @@ public class LegacyC2UrlProviderTest { private static final String C2_HEARTBEAT_URL = "https://host:8080/c2/api/heartbeat"; private static final String C2_ACKNOWLEDGE_URL = "https://host:8080/c2/api/acknowledge"; + private static final String ABSOLUTE_URL = "http://c2/api/callback"; + private static final String RELATIVE_URL = "any_url"; + private static final String EXPECTED_URL = "http://c2/api/callback"; @Test public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() { @@ -44,9 +48,15 @@ public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() { @MethodSource("testCallbackUrlProvidedArguments") @ParameterizedTest(name = "{index} => absoluteUrl={0}, relativeUrl={1}, expectedCallbackUrl={2}") - public void testCallbackUrlProvidedFor(String absoluteUrl, String relativeUrl, Optional expectedCallbackUrl) { + public void testCallbackUrlProvidedForInvalidInputs(String absoluteUrl, String relativeUrl) { LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL); - assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl)); + assertThrows(IllegalArgumentException.class, () -> testProvider.getCallbackUrl(absoluteUrl, relativeUrl)); + } + + @Test + public void testCallbackUrlProvidedFor() { + LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL); + assertEquals(EXPECTED_URL, testProvider.getCallbackUrl(ABSOLUTE_URL, RELATIVE_URL)); } private static Stream testCallbackUrlProvidedArguments() { @@ -54,8 +64,7 @@ private static Stream testCallbackUrlProvidedArguments() { Arguments.of(null, null, Optional.empty()), Arguments.of(null, "any_url", Optional.empty()), Arguments.of("", "", Optional.empty()), - Arguments.of("", "any_url", Optional.empty()), - Arguments.of("http://c2/api/callback", "any_url", Optional.of("http://c2/api/callback")) + Arguments.of("", "any_url", Optional.empty()) ); } } diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java index 99c1a8221122..21dffc432854 100644 --- a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java @@ -18,6 +18,7 @@ package org.apache.nifi.c2.client.http.url; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import java.util.Optional; @@ -83,16 +84,36 @@ private static Stream testValidProviderConstructorArguments() { @MethodSource("testCallbackUrlProvidedArguments") @ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}") - public void testCallbackUrlProvidedFor(String c2RestBase, String absoluteUrl, String relativeUrl, Optional expectedCallbackUrl) { + public void testCallbackUrlProvidedForValidInputs(String c2RestBase, String absoluteUrl, String relativeUrl, String expectedCallbackUrl) { ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path"); assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl)); } + @MethodSource("testCallbackUrlProvidedInvalidArguments") + @ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}") + public void testCallbackUrlProvidedForInvalidInputs(String c2RestBase, String absoluteUrl, String relativeUrl) { + ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path"); + assertThrows(IllegalArgumentException.class, () -> testProvider.getCallbackUrl(absoluteUrl, relativeUrl)); + } + private static Stream testCallbackUrlProvidedArguments() { String c2RestBaseNoTrailingSlash = "http://c2/api"; String c2RestBaseWithTrailingSlash = "http://c2/api/"; String path = "path/endpoint"; String absoluteUrl = "http://c2-other/api/path/endpoint"; + return Stream.of( + Arguments.of(c2RestBaseNoTrailingSlash, null, path, c2RestBaseWithTrailingSlash + path), + Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, c2RestBaseWithTrailingSlash + path), + Arguments.of(c2RestBaseWithTrailingSlash, null, path, c2RestBaseWithTrailingSlash + path), + Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, c2RestBaseWithTrailingSlash + path), + Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, absoluteUrl), + Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", absoluteUrl) + ); + } + + private static Stream testCallbackUrlProvidedInvalidArguments() { + String c2RestBaseNoTrailingSlash = "http://c2/api"; + String c2RestBaseWithTrailingSlash = "http://c2/api/"; return Stream.of( Arguments.of(c2RestBaseNoTrailingSlash, null, null, Optional.empty()), Arguments.of(c2RestBaseNoTrailingSlash, "", null, Optional.empty()), @@ -101,13 +122,7 @@ private static Stream testCallbackUrlProvidedArguments() { Arguments.of(c2RestBaseWithTrailingSlash, null, null, Optional.empty()), Arguments.of(c2RestBaseWithTrailingSlash, "", null, Optional.empty()), Arguments.of(c2RestBaseWithTrailingSlash, null, "", Optional.empty()), - Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty()), - Arguments.of(c2RestBaseNoTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)), - Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)), - Arguments.of(c2RestBaseWithTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)), - Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)), - Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, Optional.of(absoluteUrl)), - Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", Optional.of(absoluteUrl)) + Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty()) ); } } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java index 4ecd66273448..4d504d048e2a 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java @@ -20,14 +20,18 @@ import static java.util.Optional.ofNullable; import com.fasterxml.jackson.core.type.TypeReference; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.FailureCause; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; import org.apache.nifi.c2.serializer.C2Serializer; +import org.apache.nifi.minifi.validator.ValidationException; /** * Handler interface for the different operation types @@ -79,13 +83,38 @@ default boolean requiresRestart() { * @param details additional status info to detail the state * @return the created state */ - default C2OperationState operationState(C2OperationState.OperationState operationState, String details) { + default C2OperationState operationState(C2OperationState.OperationState operationState, String details, Exception e) { C2OperationState state = new C2OperationState(); state.setState(operationState); state.setDetails(details); + ofNullable(e).map(this::toFailureCause).ifPresent(state::setFailureCause); return state; } + private FailureCause toFailureCause(Exception exception) { + FailureCause failureCause = new FailureCause(); + failureCause.setExceptionMessage(exception.getMessage()); + List causeList = new LinkedList<>(); + populateCausedChain(ofNullable(exception.getCause()), causeList); + failureCause.setCausedByMessages(causeList); + if (exception instanceof ValidationException validationException) { + failureCause.setValidationResults(validationException.getValidationResults()); + } + return failureCause; + } + + private List populateCausedChain(Optional cause, List causeList) { + cause.ifPresent(c -> { + causeList.add(c.getMessage()); + populateCausedChain(cause.map(Throwable::getCause), causeList); + }); + return causeList; + } + + default C2OperationState operationState(C2OperationState.OperationState operationState, String details) { + return operationState(operationState, details, null); + } + /** * Commonly used logic for creating an C2OperationAck object * @@ -122,9 +151,10 @@ default Optional getOperationArg(C2Operation operation, String argument) * @param serializer the serializer used to converting to the target class * @return the optional retrieved and converted argument value */ - default Optional getOperationArg(C2Operation operation, String argument, TypeReference type, C2Serializer serializer) { + default T getOperationArg(C2Operation operation, String argument, TypeReference type, C2Serializer serializer) { return ofNullable(operation.getArgs()) .map(args -> args.get(argument)) - .flatMap(arg -> serializer.convert(arg, type)); + .flatMap(arg -> serializer.convert(arg, type)) + .orElseThrow(() -> new IllegalArgumentException("Failed to parse argument " + argument + " of operation " + operation)); } } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java index 41fc564613da..a443226acb49 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java @@ -20,6 +20,7 @@ import static java.util.Optional.ofNullable; import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST; import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE; @@ -33,12 +34,16 @@ import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DescribeManifestOperationHandler implements C2OperationHandler { + private static final String ERROR_MESSAGE = "Failed to execute manifest describe operation."; private final C2HeartbeatFactory heartbeatFactory; private final Supplier runtimeInfoSupplier; private final OperandPropertiesProvider operandPropertiesProvider; + private static final Logger LOGGER = LoggerFactory.getLogger(DescribeManifestOperationHandler.class); public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier runtimeInfoSupplier, OperandPropertiesProvider operandPropertiesProvider) { @@ -60,15 +65,20 @@ public OperandType getOperandType() { @Override public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); + C2OperationAck c2OperationAck; + try { + RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get(); + C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper); - RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get(); - C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper); - - C2OperationAck c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY)); - c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper)); - c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo()); - c2OperationAck.setFlowInfo(heartbeat.getFlowInfo()); - c2OperationAck.setResourceInfo(heartbeat.getResourceInfo()); + c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY)); + c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper)); + c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo()); + c2OperationAck.setFlowInfo(heartbeat.getFlowInfo()); + c2OperationAck.setResourceInfo(heartbeat.getResourceInfo()); + } catch (Exception e) { + LOGGER.error(ERROR_MESSAGE, e); + c2OperationAck = operationAck(operationId, operationState(NOT_APPLIED, ERROR_MESSAGE, e)); + } return c2OperationAck; } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java index 839c287a2561..01153be8569a 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java @@ -27,7 +27,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.nifi.c2.client.api.C2Client; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; @@ -89,20 +88,24 @@ public Map getProperties() { public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); - Optional resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() { - }, c2Serializer); - if (resourcesGlobalHash.isEmpty()) { + ResourcesGlobalHash resourcesGlobalHash; + try { + resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() { }, c2Serializer); + } catch (Exception e) { LOG.error("Resources global hash could not be constructed from C2 request"); - return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found")); + return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found", e)); } - Optional> resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() { - }, c2Serializer); - if (resourceItems.isEmpty()) { + + List resourceItems; + try { + resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() { }, c2Serializer); + + } catch (Exception e) { LOG.error("Resource item list could not be constructed from C2 request"); - return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found")); + return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found", e)); } - OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash.get(), resourceItems.get(), c2Client::retrieveResourceItem, + OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash, resourceItems, c2Client::retrieveResourceItem, relativeUrl -> c2Client.getCallbackUrl(null, relativeUrl)); C2OperationState resultState = operationState( operationState, diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java index 213614204ab8..75255dc2f12c 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java @@ -31,5 +31,5 @@ public interface SyncResourceStrategy { OperationState synchronizeResourceRepository(ResourcesGlobalHash resourcesGlobalHash, List c2ServerItems, BiFunction>, Optional> resourceDownloadFunction, - Function> urlEnrichFunction); + Function urlEnrichFunction); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java index 083464d57825..99d7bbb1f53a 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java @@ -113,10 +113,12 @@ public Map getProperties() { public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); - Optional callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, TARGET_ARG).orElse(EMPTY), getOperationArg(operation, RELATIVE_TARGET_ARG).orElse(EMPTY)); - if (callbackUrl.isEmpty()) { + String callbackUrl; + try { + callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, TARGET_ARG).orElse(EMPTY), getOperationArg(operation, RELATIVE_TARGET_ARG).orElse(EMPTY)); + } catch (Exception e) { LOG.error("Callback URL could not be constructed from C2 request and current configuration"); - return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); + return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND, e)); } List preparedFiles = null; @@ -124,7 +126,7 @@ public C2OperationAck handle(C2Operation operation) { try { preparedFiles = prepareFiles(operationId, bundleFilePaths); operationState = createDebugBundle(preparedFiles) - .map(bundle -> c2Client.uploadBundle(callbackUrl.get(), bundle) + .map(bundle -> c2Client.uploadBundle(callbackUrl, bundle) .map(errorMessage -> operationState(NOT_APPLIED, errorMessage)) .orElseGet(() -> operationState(FULLY_APPLIED, SUCCESSFUL_UPLOAD))) .orElseGet(() -> operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE)); diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java index 76847a58c149..fb70d697147f 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java @@ -97,9 +97,11 @@ public Map getProperties() { public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); - Optional callbackUrl = - c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY).orElse(EMPTY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY).orElse(EMPTY)); - if (callbackUrl.isEmpty()) { + String callbackUrl; + + try { + callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY).orElse(EMPTY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY).orElse(EMPTY)); + } catch (Exception e) { LOG.error("Callback URL could not be constructed from C2 request and current configuration"); return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); } @@ -114,7 +116,7 @@ public C2OperationAck handle(C2Operation operation) { LOG.info("Initiating asset update from url {} with name {}, force update is {}", callbackUrl, assetFileName, forceDownload); C2OperationState operationState = assetUpdatePrecondition.test(assetFileName.get(), forceDownload) - ? c2Client.retrieveUpdateAssetContent(callbackUrl.get()) + ? c2Client.retrieveUpdateAssetContent(callbackUrl) .map(content -> assetPersistFunction.apply(assetFileName.get(), content) ? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET) : operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK)) diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java index ecc31920a4c0..57e343fe9ca5 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java @@ -91,13 +91,15 @@ public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); String absoluteFlowUrl = getOperationArg(operation, FLOW_URL_KEY).orElse(getOperationArg(operation, LOCATION).orElse(EMPTY)); - Optional callbackUrl = client.getCallbackUrl(absoluteFlowUrl, getOperationArg(operation, FLOW_RELATIVE_URL_KEY).orElse(EMPTY)); - if (callbackUrl.isEmpty()) { + String callbackUrl; + try { + callbackUrl = client.getCallbackUrl(absoluteFlowUrl, getOperationArg(operation, FLOW_RELATIVE_URL_KEY).orElse(EMPTY)); + } catch (Exception e) { logger.error("Callback URL could not be constructed from C2 request and current configuration"); return operationAck(operationId, operationState(NOT_APPLIED, "Could not get callback url from operation and current configuration")); } - Optional flowId = getFlowId(operation, callbackUrl.get()); + Optional flowId = getFlowId(operation, callbackUrl); if (flowId.isEmpty()) { logger.error("FlowId is missing, no update will be performed"); return operationAck(operationId, operationState(NOT_APPLIED, "Could not get flowId from the operation")); @@ -110,7 +112,7 @@ public C2OperationAck handle(C2Operation operation) { logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}", callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId.get()); - C2OperationState state = updateFlow(operationId, callbackUrl.get()); + C2OperationState state = updateFlow(operationId, callbackUrl); if (state.getState() == FULLY_APPLIED) { flowIdHolder.setFlowId(flowId.get()); } @@ -125,9 +127,11 @@ private C2OperationState updateFlow(String opIdentifier, String callbackUrl) { return operationState(NOT_APPLIED, "Update content retrieval resulted in empty content"); } - if (!updateConfigurationStrategy.update(updateContent.get())) { + try { + updateConfigurationStrategy.update(updateContent.get()); + } catch (Exception e) { logger.error("Update resulted in error for operation #{}.", opIdentifier); - return operationState(NOT_APPLIED, "Update resulted in error"); + return operationState(NOT_APPLIED, "Update resulted in error:", e); } logger.debug("Update configuration applied for operation #{}.", opIdentifier); diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java index e551cb853e4d..32c82dc61fa0 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java @@ -27,7 +27,7 @@ public interface UpdateConfigurationStrategy { * Updates the MiNiFi agent's flow with the flow passed as parameter * * @param flow the MiNiFi flow config JSON represented as a byte array - * @return true if the flow update was true, false otherwise + * @throw exception if update failed. */ - boolean update(byte[] flow); + void update(byte[] flow); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/minifi/validator/ValidationException.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/minifi/validator/ValidationException.java new file mode 100644 index 000000000000..ac542a793cfd --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/minifi/validator/ValidationException.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.validator; + +import java.util.List; +import java.util.Objects; +import org.apache.nifi.components.ValidationResult; + +public class ValidationException extends IllegalStateException { + private List validationResults; + + public ValidationException(String message, List details) { + super(message); + this.validationResults = details; + } + + public List getValidationResults() { + return validationResults; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ValidationException that = (ValidationException) o; + return Objects.equals(validationResults, that.validationResults); + } + + @Override + public int hashCode() { + return Objects.hash(validationResults); + } + + @Override + public String toString() { + return "ValidationException{" + + "validationResults=" + validationResults + + "message=" + this.getMessage() + + '}'; + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java index 78a0fc44264a..07d83329f3ad 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java @@ -51,7 +51,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -139,7 +138,7 @@ public void testFilesAreCollectedAndUploadedAsATarGzBundle() { .collect(toList()); TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, createBundleFiles, DEFAULT_CONTENT_FILTER); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); - when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT)); + when(c2Client.getCallbackUrl(any(), any())).thenReturn(C2_DEBUG_UPLOAD_ENDPOINT); // when C2OperationAck result = testHandler.handle(c2Operation); @@ -200,7 +199,7 @@ public void testContentIsFilteredOut(String fileName, String filterKeyword, Stri Predicate testContentFilter = content -> !content.contains(filterKeyword); TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, singletonList(bundleFile), testContentFilter); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); - when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT)); + when(c2Client.getCallbackUrl(any(), any())).thenReturn(C2_DEBUG_UPLOAD_ENDPOINT); // when C2OperationAck result = testHandler.handle(c2Operation); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java index 4a171dc8f147..f3754a0b424a 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java @@ -94,7 +94,7 @@ private static Stream invalidConstructorArguments() { @BeforeEach public void setup() { - lenient().when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(ASSET_URL)); + lenient().when(c2Client.getCallbackUrl(any(), any())).thenReturn(ASSET_URL); } @ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} bundleFileList={2} contentFilter={3}") @@ -115,7 +115,7 @@ public void testOperationAndOperandTypesAreMatching() { public void testAssetUrlCanNotBeNull() { // given C2Operation operation = operation(null, ASSET_FILE_NAME, FORCE_DOWNLOAD); - when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.empty()); + when(c2Client.getCallbackUrl(any(), any())).thenThrow(new IllegalArgumentException()); // when C2OperationAck result = testHandler.handle(operation); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java index d67d56cdc8d8..a358fbeb895c 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java @@ -22,6 +22,8 @@ import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -72,7 +74,7 @@ void testHandleIncorrectArg() { C2Operation operation = new C2Operation(); operation.setArgs(INCORRECT_LOCATION_MAP); - when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION)); + when(client.getCallbackUrl(any(), any())).thenReturn(INCORRECT_LOCATION); C2OperationAck response = handler.handle(operation); @@ -81,10 +83,10 @@ void testHandleIncorrectArg() { @Test void testHandleFlowIdInArg() { - UpdateConfigurationStrategy successUpdate = flow -> true; + UpdateConfigurationStrategy successUpdate = mock(UpdateConfigurationStrategy.class); when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); when(client.retrieveUpdateConfigurationContent(any())).thenReturn(Optional.of("content".getBytes())); - when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION)); + when(client.getCallbackUrl(any(), any())).thenReturn(INCORRECT_LOCATION); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setIdentifier(OPERATION_ID); @@ -103,7 +105,7 @@ void testHandleFlowIdInArg() { @Test void testHandleReturnsNoOperationWithNoContent() { when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); - when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION)); + when(client.getCallbackUrl(any(), any())).thenReturn(CORRECT_LOCATION); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setArgs(CORRECT_LOCATION_MAP); @@ -116,10 +118,11 @@ void testHandleReturnsNoOperationWithNoContent() { @Test void testHandleReturnsNotAppliedWithContentApplyIssues() { - UpdateConfigurationStrategy failedToUpdate = flow -> false; + UpdateConfigurationStrategy failedToUpdate = mock(UpdateConfigurationStrategy.class); + doThrow(new IllegalStateException()).when(failedToUpdate).update(any()); when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id"); when(client.retrieveUpdateConfigurationContent(any())).thenReturn(Optional.of("content".getBytes())); - when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION)); + when(client.getCallbackUrl(any(), any())).thenReturn(CORRECT_LOCATION); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setIdentifier(OPERATION_ID); @@ -133,9 +136,9 @@ void testHandleReturnsNotAppliedWithContentApplyIssues() { @Test void testHandleReturnsFullyApplied() { - UpdateConfigurationStrategy successUpdate = flow -> true; + UpdateConfigurationStrategy successUpdate = mock(UpdateConfigurationStrategy.class); when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id"); - when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION)); + when(client.getCallbackUrl(any(), any())).thenReturn(CORRECT_LOCATION); when(client.retrieveUpdateConfigurationContent(any())).thenReturn(Optional.of("content".getBytes())); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider); C2Operation operation = new C2Operation(); diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java index 1e9c1e5bf24a..7414b53c11da 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java @@ -42,6 +42,17 @@ public class C2OperationState implements Serializable { @Schema(description = "Additional details about the state") private String details; + @Schema(description = "Additional details about the cause of the failure") + private FailureCause failureCause; + + public FailureCause getFailureCause() { + return failureCause; + } + + public void setFailureCause(FailureCause failureCause) { + this.failureCause = failureCause; + } + public String getDetails() { return details; } @@ -76,12 +87,12 @@ public boolean equals(Object o) { return false; } C2OperationState that = (C2OperationState) o; - return state == that.state && Objects.equals(details, that.details); + return state == that.state && Objects.equals(details, that.details) && Objects.equals(failureCause, that.failureCause); } @Override public int hashCode() { - return Objects.hash(state, details); + return Objects.hash(state, details, failureCause); } @Override @@ -89,6 +100,7 @@ public String toString() { return "C2OperationState{" + "state=" + state + ", details='" + details + '\'' + + ", failureCause='" + failureCause + '\'' + '}'; } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FailureCause.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FailureCause.java new file mode 100644 index 000000000000..56ef7ca1e42c --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FailureCause.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +import java.util.List; +import java.util.Objects; +import org.apache.nifi.components.ValidationResult; + +public class FailureCause { + private List validationResults; + private String exceptionMessage; + private List causedByMessages; + + public List getValidationResults() { + return validationResults; + } + + public void setValidationResults(List validationResults) { + this.validationResults = validationResults; + } + + public String getExceptionMessage() { + return exceptionMessage; + } + + public void setExceptionMessage(String exceptionMessage) { + this.exceptionMessage = exceptionMessage; + } + + public List getCausedByMessages() { + return causedByMessages; + } + + public void setCausedByMessages(List causedByMessages) { + this.causedByMessages = causedByMessages; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FailureCause that = (FailureCause) o; + return Objects.equals(validationResults, that.validationResults) && Objects.equals(exceptionMessage, that.exceptionMessage) && Objects.equals(causedByMessages, that.causedByMessages); + } + + @Override + public int hashCode() { + return Objects.hash(validationResults, exceptionMessage, causedByMessages); + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java index 47504750be08..7c1e7bddc58b 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java @@ -54,6 +54,7 @@ import org.apache.nifi.minifi.commons.service.FlowEnrichService; import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor; import org.apache.nifi.minifi.commons.service.FlowSerDeService; +import org.apache.nifi.minifi.validator.ValidationException; import org.apache.nifi.services.FlowService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +92,7 @@ public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowSer } @Override - public boolean update(byte[] rawFlow) { + public void update(byte[] rawFlow) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8)); } @@ -116,22 +117,24 @@ public boolean update(byte[] rawFlow) { reloadFlow(findAllProposedConnectionIds(enrichedFlowCandidate.getRootGroup())); - return true; } catch (IllegalStateException e) { LOGGER.error("Configuration update failed. Reverting and reloading previous flow", e); revert(backupFlowConfigurationFile, flowConfigurationFile); revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile); try { reloadFlow(originalConnectionIds); - } catch (IOException ex) { - LOGGER.error("Unable to reload the reverted flow", e); + } catch (ValidationException ex) { + LOGGER.error("Unable to reload the reverted flow", ex); + throw ex; + } catch (Exception exception) { + throw new RuntimeException(exception); } - return false; + throw e; } catch (Exception e) { LOGGER.error("Configuration update failed. Reverting to previous flow, no reload is necessary", e); revert(backupFlowConfigurationFile, flowConfigurationFile); revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile); - return false; + throw new RuntimeException(e); } finally { removeIfExists(backupFlowConfigurationFile); removeIfExists(backupRawFlowConfigurationFile); @@ -148,7 +151,7 @@ private void reloadFlow(Set proposedConnectionIds) throws IOException { List validationErrors = validate(flowController.getFlowManager()); if (!validationErrors.isEmpty()) { LOGGER.error("Validation errors found when reloading the flow: {}", validationErrors); - throw new IllegalStateException("Unable to start flow due to validation errors"); + throw new ValidationException("Unable to start flow due to validation errors", validationErrors); } flowController.getFlowManager().getRootGroup().startProcessing(); diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java index 7c1f497192b0..427ab573a6df 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java @@ -73,7 +73,7 @@ public DefaultSyncResourceStrategy(ResourceRepository resourceRepository) { @Override public OperationState synchronizeResourceRepository(ResourcesGlobalHash c2GlobalHash, List c2ServerItems, BiFunction>, Optional> resourceDownloadFunction, - Function> urlEnrichFunction) { + Function urlEnrichFunction) { Set c2Items = Set.copyOf(c2ServerItems); Set agentItems = Set.copyOf(resourceRepository.findAllResourceItems()); @@ -89,7 +89,7 @@ public OperationState synchronizeResourceRepository(ResourcesGlobalHash c2Global private OperationState saveNewItems(Set c2Items, Set agentItems, BiFunction>, Optional> resourceDownloadFunction, - Function> urlEnrichFunction) { + Function urlEnrichFunction) { List newItems = c2Items.stream().filter(not(agentItems::contains)).toList(); if (newItems.isEmpty()) { return NO_OPERATION; @@ -107,10 +107,10 @@ private OperationState saveNewItems(Set c2Items, Set } private Function> downloadIfNotPresentAndAddToRepository( - BiFunction>, Optional> resourceDownloadFunction, Function> urlEnrichFunction) { + BiFunction>, Optional> resourceDownloadFunction, Function urlEnrichFunction) { return resourceItem -> resourceRepository.resourceItemBinaryPresent(resourceItem) ? resourceRepository.addResourceItem(resourceItem) - : urlEnrichFunction.apply(resourceItem.getUrl()) + : Optional.ofNullable(urlEnrichFunction.apply(resourceItem.getUrl())) .flatMap(enrichedUrl -> resourceDownloadFunction.apply(enrichedUrl, this::persistToTemporaryLocation)) .flatMap(tempResourcePath -> resourceRepository.addResourceItem(resourceItem, tempResourcePath)); } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java index 87edfead9caf..6c2226f803f9 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java @@ -127,10 +127,9 @@ public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException { when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup); // when - boolean result = testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT); + testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT); //then - assertTrue(result); assertTrue(exists(flowConfigurationFile)); assertTrue(exists(rawFlowConfigurationFile)); assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile)); @@ -155,19 +154,20 @@ public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp() throws IO doThrow(new IOException()).when(mockFlowService).load(null); // when - boolean result = testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT); - - //then - assertFalse(result); - assertTrue(exists(flowConfigurationFile)); - assertTrue(exists(rawFlowConfigurationFile)); - assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile)); - assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile)); - assertFalse(exists(backupFlowConfigurationFile)); - assertFalse(exists(backupRawFlowConfigurationFile)); - verify(mockFlowService, times(1)).load(null); - verify(mockFlowController, times(0)).onFlowInitialized(true); - verify(mockProcessGroup, times(0)).startProcessing(); + try { + testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT); + } catch (Exception e) { + //then + assertTrue(exists(flowConfigurationFile)); + assertTrue(exists(rawFlowConfigurationFile)); + assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile)); + assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile)); + assertFalse(exists(backupFlowConfigurationFile)); + assertFalse(exists(backupRawFlowConfigurationFile)); + verify(mockFlowService, times(1)).load(null); + verify(mockFlowController, times(0)).onFlowInitialized(true); + verify(mockProcessGroup, times(0)).startProcessing(); + } } private void writeGzipFile(Path path, byte[] content) throws IOException { diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java index 1ba84d747d4c..e59eeed402ee 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java @@ -70,7 +70,7 @@ public class DefaultSyncResourceStrategyTest { (url, persistFunction) -> url.endsWith(FAIL_DOWNLOAD_URL) ? empty() : persistFunction.apply(new ByteArrayInputStream(url.getBytes())); private static String ENRICH_PREFIX = "pre_"; - private static final Function> PREFIXING_ENRICH_FUNCTION = url -> ofNullable(url).map(arg -> ENRICH_PREFIX + arg); + private static final Function PREFIXING_ENRICH_FUNCTION = url -> ofNullable(url).map(arg -> ENRICH_PREFIX + arg).orElse(""); @Mock private ResourceRepository mockResourceRepository;