Skip to content

Commit

Permalink
NIFI-13614: populate failure cause in operation state
Browse files Browse the repository at this point in the history
This closes apache#9136.

Signed-off-by: Ferenc Kis <[email protected]>
  • Loading branch information
KalmanJantner authored and briansolo1985 committed Sep 3, 2024
1 parent d1432d6 commit 5c3e22a
Show file tree
Hide file tree
Showing 25 changed files with 319 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public interface C2Client {
*
* @param absoluteUrl absolute url sent by C2 server
* @param relativeUrl relative url sent by C2 server
* @return an optional with content of finalised callback url
* @return finalised callback url
*/
Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl);
String getCallbackUrl(String absoluteUrl, String relativeUrl);
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public Optional<String> uploadBundle(String callbackUrl, byte[] bundle) {
}

@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
public String getCallbackUrl(String absoluteUrl, String relativeUrl) {
return c2UrlProvider.getCallbackUrl(absoluteUrl, relativeUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.nifi.c2.client.http.url;

import java.util.Optional;

public interface C2UrlProvider {

/**
Expand All @@ -42,5 +40,5 @@ public interface C2UrlProvider {
* @param relativeUrl relative url sent by the C2 server
* @return the url of the C2 server to send requests to
*/
Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl);
String getCallbackUrl(String absoluteUrl, String relativeUrl);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ public String getAcknowledgeUrl() {
}

@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
Optional<String> url = Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank);
if (!url.isPresent()) {
LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration");
}
return url;
public String getCallbackUrl(String absoluteUrl, String relativeUrl) {
return Optional.ofNullable(absoluteUrl)
.filter(StringUtils::isNotBlank)
.orElseThrow( () -> {
LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration");
throw new IllegalArgumentException("Provided absolute url was empty or null. Relative C2 urls are not supported with this configuration");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public String getAcknowledgeUrl() {
}

@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
public String getCallbackUrl(String absoluteUrl, String relativeUrl) {
return Optional.ofNullable(relativeUrl)
.map(this::toAbsoluteUrl)
.filter(Optional::isPresent)
.orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank));
.orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank))
.orElseThrow(() -> new IllegalArgumentException("Unable to return non empty c2 url."));
}

private Optional<String> toAbsoluteUrl(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.c2.client.http.url;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Optional;
import java.util.stream.Stream;
Expand All @@ -33,6 +34,9 @@ public class LegacyC2UrlProviderTest {

private static final String C2_HEARTBEAT_URL = "https://host:8080/c2/api/heartbeat";
private static final String C2_ACKNOWLEDGE_URL = "https://host:8080/c2/api/acknowledge";
private static final String ABSOLUTE_URL = "http://c2/api/callback";
private static final String RELATIVE_URL = "any_url";
private static final String EXPECTED_URL = "http://c2/api/callback";

@Test
public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() {
Expand All @@ -44,18 +48,23 @@ public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() {

@MethodSource("testCallbackUrlProvidedArguments")
@ParameterizedTest(name = "{index} => absoluteUrl={0}, relativeUrl={1}, expectedCallbackUrl={2}")
public void testCallbackUrlProvidedFor(String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) {
public void testCallbackUrlProvidedForInvalidInputs(String absoluteUrl, String relativeUrl) {
LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL);
assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
assertThrows(IllegalArgumentException.class, () -> testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}

@Test
public void testCallbackUrlProvidedFor() {
LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL);
assertEquals(EXPECTED_URL, testProvider.getCallbackUrl(ABSOLUTE_URL, RELATIVE_URL));
}

private static Stream<Arguments> testCallbackUrlProvidedArguments() {
return Stream.of(
Arguments.of(null, null, Optional.empty()),
Arguments.of(null, "any_url", Optional.empty()),
Arguments.of("", "", Optional.empty()),
Arguments.of("", "any_url", Optional.empty()),
Arguments.of("http://c2/api/callback", "any_url", Optional.of("http://c2/api/callback"))
Arguments.of("", "any_url", Optional.empty())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.c2.client.http.url;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;

import java.util.Optional;
Expand Down Expand Up @@ -83,16 +84,36 @@ private static Stream<Arguments> testValidProviderConstructorArguments() {

@MethodSource("testCallbackUrlProvidedArguments")
@ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}")
public void testCallbackUrlProvidedFor(String c2RestBase, String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) {
public void testCallbackUrlProvidedForValidInputs(String c2RestBase, String absoluteUrl, String relativeUrl, String expectedCallbackUrl) {
ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path");
assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}

@MethodSource("testCallbackUrlProvidedInvalidArguments")
@ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}")
public void testCallbackUrlProvidedForInvalidInputs(String c2RestBase, String absoluteUrl, String relativeUrl) {
ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path");
assertThrows(IllegalArgumentException.class, () -> testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}

private static Stream<Arguments> testCallbackUrlProvidedArguments() {
String c2RestBaseNoTrailingSlash = "http://c2/api";
String c2RestBaseWithTrailingSlash = "http://c2/api/";
String path = "path/endpoint";
String absoluteUrl = "http://c2-other/api/path/endpoint";
return Stream.of(
Arguments.of(c2RestBaseNoTrailingSlash, null, path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseWithTrailingSlash, null, path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, absoluteUrl),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", absoluteUrl)
);
}

private static Stream<Arguments> testCallbackUrlProvidedInvalidArguments() {
String c2RestBaseNoTrailingSlash = "http://c2/api";
String c2RestBaseWithTrailingSlash = "http://c2/api/";
return Stream.of(
Arguments.of(c2RestBaseNoTrailingSlash, null, null, Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, "", null, Optional.empty()),
Expand All @@ -101,13 +122,7 @@ private static Stream<Arguments> testCallbackUrlProvidedArguments() {
Arguments.of(c2RestBaseWithTrailingSlash, null, null, Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, "", null, Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, null, "", Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, Optional.of(absoluteUrl)),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", Optional.of(absoluteUrl))
Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import static java.util.Optional.ofNullable;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.FailureCause;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.minifi.validator.ValidationException;

/**
* Handler interface for the different operation types
Expand Down Expand Up @@ -79,13 +83,38 @@ default boolean requiresRestart() {
* @param details additional status info to detail the state
* @return the created state
*/
default C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
default C2OperationState operationState(C2OperationState.OperationState operationState, String details, Exception e) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
ofNullable(e).map(this::toFailureCause).ifPresent(state::setFailureCause);
return state;
}

private FailureCause toFailureCause(Exception exception) {
FailureCause failureCause = new FailureCause();
failureCause.setExceptionMessage(exception.getMessage());
List<String> causeList = new LinkedList<>();
populateCausedChain(ofNullable(exception.getCause()), causeList);
failureCause.setCausedByMessages(causeList);
if (exception instanceof ValidationException validationException) {
failureCause.setValidationResults(validationException.getValidationResults());
}
return failureCause;
}

private List<String> populateCausedChain(Optional<Throwable> cause, List<String> causeList) {
cause.ifPresent(c -> {
causeList.add(c.getMessage());
populateCausedChain(cause.map(Throwable::getCause), causeList);
});
return causeList;
}

default C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
return operationState(operationState, details, null);
}

/**
* Commonly used logic for creating an C2OperationAck object
*
Expand Down Expand Up @@ -122,9 +151,10 @@ default Optional<String> getOperationArg(C2Operation operation, String argument)
* @param serializer the serializer used to converting to the target class
* @return the optional retrieved and converted argument value
*/
default <T> Optional<T> getOperationArg(C2Operation operation, String argument, TypeReference<T> type, C2Serializer serializer) {
default <T> T getOperationArg(C2Operation operation, String argument, TypeReference<T> type, C2Serializer serializer) {
return ofNullable(operation.getArgs())
.map(args -> args.get(argument))
.flatMap(arg -> serializer.convert(arg, type));
.flatMap(arg -> serializer.convert(arg, type))
.orElseThrow(() -> new IllegalArgumentException("Failed to parse argument " + argument + " of operation " + operation));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE;

Expand All @@ -33,12 +34,16 @@
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DescribeManifestOperationHandler implements C2OperationHandler {

private static final String ERROR_MESSAGE = "Failed to execute manifest describe operation.";
private final C2HeartbeatFactory heartbeatFactory;
private final Supplier<RuntimeInfoWrapper> runtimeInfoSupplier;
private final OperandPropertiesProvider operandPropertiesProvider;
private static final Logger LOGGER = LoggerFactory.getLogger(DescribeManifestOperationHandler.class);

public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier,
OperandPropertiesProvider operandPropertiesProvider) {
Expand All @@ -60,15 +65,20 @@ public OperandType getOperandType() {
@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
C2OperationAck c2OperationAck;
try {
RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get();
C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper);

RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get();
C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper);

C2OperationAck c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY));
c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper));
c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo());
c2OperationAck.setFlowInfo(heartbeat.getFlowInfo());
c2OperationAck.setResourceInfo(heartbeat.getResourceInfo());
c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY));
c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper));
c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo());
c2OperationAck.setFlowInfo(heartbeat.getFlowInfo());
c2OperationAck.setResourceInfo(heartbeat.getResourceInfo());
} catch (Exception e) {
LOGGER.error(ERROR_MESSAGE, e);
c2OperationAck = operationAck(operationId, operationState(NOT_APPLIED, ERROR_MESSAGE, e));
}

return c2OperationAck;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
Expand Down Expand Up @@ -89,20 +88,24 @@ public Map<String, Object> getProperties() {
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);

Optional<ResourcesGlobalHash> resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() {
}, c2Serializer);
if (resourcesGlobalHash.isEmpty()) {
ResourcesGlobalHash resourcesGlobalHash;
try {
resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() { }, c2Serializer);
} catch (Exception e) {
LOG.error("Resources global hash could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found"));
return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found", e));
}
Optional<List<ResourceItem>> resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() {
}, c2Serializer);
if (resourceItems.isEmpty()) {

List<ResourceItem> resourceItems;
try {
resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() { }, c2Serializer);

} catch (Exception e) {
LOG.error("Resource item list could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found"));
return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found", e));
}

OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash.get(), resourceItems.get(), c2Client::retrieveResourceItem,
OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash, resourceItems, c2Client::retrieveResourceItem,
relativeUrl -> c2Client.getCallbackUrl(null, relativeUrl));
C2OperationState resultState = operationState(
operationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ public interface SyncResourceStrategy {

OperationState synchronizeResourceRepository(ResourcesGlobalHash resourcesGlobalHash, List<ResourceItem> c2ServerItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction);
Function<String, String> urlEnrichFunction);
}
Loading

0 comments on commit 5c3e22a

Please sign in to comment.