Commit
NIFI-12282 Updated TestRuntimeManifest using GetFile and ListFile
exceptionfactory committed Oct 27, 2023
1 parent 863e97a commit 8dfc29b
Showing 1 changed file with 86 additions and 96 deletions.
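
For orientation: the assertions in this diff run against a RuntimeManifest model built from the manifest generated during the NiFi build. A minimal sketch of that setup is shown below, assuming the manifest is available as a JSON classpath resource and deserialized with Jackson; the resource name and loading approach are illustrative assumptions, not taken from this commit.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;

import java.io.IOException;
import java.io.InputStream;

class RuntimeManifestLoadingSketch {
    RuntimeManifest loadManifest() throws IOException {
        // Hypothetical resource name: the real test may obtain the generated manifest differently.
        try (final InputStream manifestStream = getClass().getResourceAsStream("/nifi-runtime-manifest.json")) {
            // Deserialize the manifest JSON into the component API model that the test assertions inspect.
            return new ObjectMapper().readValue(manifestStream, RuntimeManifest.class);
        }
    }
}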
@@ -24,15 +24,12 @@
import org.apache.nifi.c2.protocol.component.api.ProcessorDefinition;
import org.apache.nifi.c2.protocol.component.api.PropertyDependency;
import org.apache.nifi.c2.protocol.component.api.PropertyDescriptor;
import org.apache.nifi.c2.protocol.component.api.PropertyResourceDefinition;
import org.apache.nifi.c2.protocol.component.api.Relationship;
import org.apache.nifi.c2.protocol.component.api.ReportingTaskDefinition;
import org.apache.nifi.c2.protocol.component.api.Restriction;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.apache.nifi.c2.protocol.component.api.SchedulingDefaults;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.junit.jupiter.api.Test;
@@ -52,8 +49,6 @@

class TestRuntimeManifest {

public static final String LIST_HDFS_DEFAULT_SCHEDULE_TIME = "1 min";

private static final String REPORTING_TASK_DEFAULT_SCHEDULE_TIME = "60 sec";

@Test
@@ -93,96 +88,10 @@ void testRuntimeManifest() throws IOException {

final List<Bundle> bundles = runtimeManifest.getBundles();
assertNotNull(bundles);
assertTrue(bundles.size() > 0);

// Verify ListHDFS definition
final ProcessorDefinition listHdfsDefinition = getProcessorDefinition(bundles, "nifi-hadoop-nar", "org.apache.nifi.processors.hadoop.ListHDFS");
assertNotNull(listHdfsDefinition);
assertTrue(listHdfsDefinition.getPrimaryNodeOnly());
assertTrue(listHdfsDefinition.getTriggerSerially());
assertTrue(listHdfsDefinition.getTriggerWhenEmpty());
assertFalse(listHdfsDefinition.getSupportsBatching());
assertFalse(listHdfsDefinition.getSideEffectFree());
assertFalse(listHdfsDefinition.getTriggerWhenAnyDestinationAvailable());
assertFalse(listHdfsDefinition.getSupportsDynamicProperties());
assertFalse(listHdfsDefinition.getSupportsSensitiveDynamicProperties());
assertNull(listHdfsDefinition.getDynamicProperties());
assertFalse(listHdfsDefinition.getSupportsDynamicRelationships());
assertNull(listHdfsDefinition.getDynamicRelationship());
assertEquals(InputRequirement.Requirement.INPUT_FORBIDDEN, listHdfsDefinition.getInputRequirement());
assertTrue(listHdfsDefinition.isAdditionalDetails());
assertNull(listHdfsDefinition.getReadsAttributes());
assertNotNull(listHdfsDefinition.getWritesAttributes());
assertFalse(listHdfsDefinition.getWritesAttributes().isEmpty());
assertNotNull(listHdfsDefinition.getWritesAttributes().get(0).getName());
assertNotNull(listHdfsDefinition.getWritesAttributes().get(0).getDescription());
assertNotNull(listHdfsDefinition.getSeeAlso());
assertFalse(listHdfsDefinition.getSeeAlso().isEmpty());
assertNull(listHdfsDefinition.getSystemResourceConsiderations());
assertNull(listHdfsDefinition.getDeprecated());
assertNull(listHdfsDefinition.getDeprecationReason());
assertNull(listHdfsDefinition.getDeprecationAlternatives());

assertEquals("30 sec", listHdfsDefinition.getDefaultPenaltyDuration());
assertEquals("1 sec", listHdfsDefinition.getDefaultYieldDuration());
assertEquals("WARN", listHdfsDefinition.getDefaultBulletinLevel());

assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), listHdfsDefinition.getDefaultSchedulingStrategy());

final List<String> listHdfsSchedulingStrategies = listHdfsDefinition.getSupportedSchedulingStrategies();
assertNotNull(listHdfsSchedulingStrategies);
assertEquals(2, listHdfsSchedulingStrategies.size());
assertTrue(listHdfsSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name()));
assertTrue(listHdfsSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name()));

final Map<String, Integer> listHdfsDefaultConcurrentTasks = listHdfsDefinition.getDefaultConcurrentTasksBySchedulingStrategy();
assertNotNull(listHdfsDefaultConcurrentTasks);
assertEquals(2, listHdfsDefaultConcurrentTasks.size());
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks(), listHdfsDefaultConcurrentTasks.get(SchedulingStrategy.TIMER_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks(), listHdfsDefaultConcurrentTasks.get(SchedulingStrategy.CRON_DRIVEN.name()).intValue());

final Map<String, String> listHdfsDefaultSchedulingPeriods = listHdfsDefinition.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(listHdfsDefaultSchedulingPeriods);
assertEquals(2, listHdfsDefaultSchedulingPeriods.size());
assertEquals("1 min", listHdfsDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), listHdfsDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));

final List<Relationship> relationships = listHdfsDefinition.getSupportedRelationships();
assertNotNull(relationships);
assertEquals(1, relationships.size());
assertEquals("success", relationships.get(0).getName());
assertFalse(bundles.isEmpty());

final PropertyDescriptor configResourcesProp = getPropertyDescriptor(listHdfsDefinition, "Hadoop Configuration Resources");

final PropertyResourceDefinition resourceDefinition = configResourcesProp.getResourceDefinition();
assertNotNull(resourceDefinition);
assertEquals(ResourceCardinality.MULTIPLE, resourceDefinition.getCardinality());
assertNotNull(resourceDefinition.getResourceTypes());
assertEquals(1, resourceDefinition.getResourceTypes().size());
assertEquals(ResourceType.FILE, resourceDefinition.getResourceTypes().stream().findFirst().get());

assertNull(listHdfsDefinition.isRestricted());
assertNull(listHdfsDefinition.getRestrictedExplanation());
assertNull(listHdfsDefinition.getExplicitRestrictions());
assertNotNull(listHdfsDefinition.getStateful());
assertNotNull(listHdfsDefinition.getStateful().getDescription());
assertNotNull(listHdfsDefinition.getStateful().getScopes());
assertEquals(Scope.CLUSTER, listHdfsDefinition.getStateful().getScopes().stream().findFirst().get());

// Verify FetchHDFS definition has restrictions
final ProcessorDefinition fetchHdfsDefinition = getProcessorDefinition(bundles, "nifi-hadoop-nar",
"org.apache.nifi.processors.hadoop.FetchHDFS");
assertNotNull(fetchHdfsDefinition.isRestricted());
assertTrue(fetchHdfsDefinition.isRestricted());
assertFalse(fetchHdfsDefinition.isAdditionalDetails());

final Set<Restriction> restrictions = fetchHdfsDefinition.getExplicitRestrictions();
assertNotNull(restrictions);
assertEquals(1, restrictions.size());

final Restriction restriction = restrictions.stream().findFirst().orElse(null);
assertEquals(RequiredPermission.READ_DISTRIBUTED_FILESYSTEM.getPermissionLabel(), restriction.getRequiredPermission());
assertNotNull(restriction.getExplanation());
assertProcessorDefinitionFound(bundles);
assertRestrictionsFound(bundles);

// Verify ConsumeKafka_2_6 definition which has properties with dependencies
final ProcessorDefinition consumeKafkaDefinition = getProcessorDefinition(bundles, "nifi-kafka-2-6-nar",
@@ -235,10 +144,10 @@ void testRuntimeManifest() throws IOException {
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks(), joltTransformDefaultConcurrentTasks.get(SchedulingStrategy.TIMER_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks(), joltTransformDefaultConcurrentTasks.get(SchedulingStrategy.CRON_DRIVEN.name()).intValue());

final Map<String, String> joltTransformDefaultSchedulingPeriods = listHdfsDefinition.getDefaultSchedulingPeriodBySchedulingStrategy();
final Map<String, String> joltTransformDefaultSchedulingPeriods = joltTransformDef.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(joltTransformDefaultSchedulingPeriods);
assertEquals(2, joltTransformDefaultSchedulingPeriods.size());
assertEquals(LIST_HDFS_DEFAULT_SCHEDULE_TIME, joltTransformDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals("0 sec", joltTransformDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), joltTransformDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));

// Verify ExecuteSQL has readsAttributes
@@ -277,6 +186,87 @@ void testRuntimeManifest() throws IOException {
assertNotNull(splitJsonDef.getSystemResourceConsiderations().get(0).getDescription());
}

private void assertProcessorDefinitionFound(final List<Bundle> bundles) {
final ProcessorDefinition definition = getProcessorDefinition(bundles, "nifi-standard-nar", "org.apache.nifi.processors.standard.ListFile");
assertNotNull(definition);
assertFalse(definition.getPrimaryNodeOnly());
assertTrue(definition.getTriggerSerially());
assertFalse(definition.getTriggerWhenEmpty());
assertFalse(definition.getSupportsBatching());
assertFalse(definition.getSideEffectFree());
assertFalse(definition.getTriggerWhenAnyDestinationAvailable());
assertFalse(definition.getSupportsDynamicProperties());
assertFalse(definition.getSupportsSensitiveDynamicProperties());
assertNull(definition.getDynamicProperties());
assertFalse(definition.getSupportsDynamicRelationships());
assertNull(definition.getDynamicRelationship());
assertEquals(InputRequirement.Requirement.INPUT_FORBIDDEN, definition.getInputRequirement());
assertTrue(definition.isAdditionalDetails());
assertNull(definition.getReadsAttributes());
assertNotNull(definition.getWritesAttributes());
assertFalse(definition.getWritesAttributes().isEmpty());
assertNotNull(definition.getWritesAttributes().get(0).getName());
assertNotNull(definition.getWritesAttributes().get(0).getDescription());
assertNotNull(definition.getSeeAlso());
assertFalse(definition.getSeeAlso().isEmpty());
assertNull(definition.getSystemResourceConsiderations());
assertNull(definition.getDeprecated());
assertNull(definition.getDeprecationReason());
assertNull(definition.getDeprecationAlternatives());

assertEquals("30 sec", definition.getDefaultPenaltyDuration());
assertEquals("1 sec", definition.getDefaultYieldDuration());
assertEquals("WARN", definition.getDefaultBulletinLevel());

assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), definition.getDefaultSchedulingStrategy());

final List<String> schedulingStrategies = definition.getSupportedSchedulingStrategies();
assertNotNull(schedulingStrategies);
assertEquals(2, schedulingStrategies.size());
assertTrue(schedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name()));
assertTrue(schedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name()));

final Map<String, Integer> defaultConcurrentTasks = definition.getDefaultConcurrentTasksBySchedulingStrategy();
assertNotNull(defaultConcurrentTasks);
assertEquals(2, defaultConcurrentTasks.size());
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks(), defaultConcurrentTasks.get(SchedulingStrategy.TIMER_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks(), defaultConcurrentTasks.get(SchedulingStrategy.CRON_DRIVEN.name()).intValue());

final Map<String, String> defaultSchedulingPeriods = definition.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(defaultSchedulingPeriods);
assertEquals(2, defaultSchedulingPeriods.size());
assertEquals("1 min", defaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), defaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));

final List<Relationship> relationships = definition.getSupportedRelationships();
assertNotNull(relationships);
assertEquals(1, relationships.size());
assertEquals("success", relationships.get(0).getName());

assertNull(definition.isRestricted());
assertNull(definition.getRestrictedExplanation());
assertNull(definition.getExplicitRestrictions());
assertNotNull(definition.getStateful());
assertNotNull(definition.getStateful().getDescription());
assertNotNull(definition.getStateful().getScopes());
assertEquals(Set.of(Scope.LOCAL, Scope.CLUSTER), definition.getStateful().getScopes());
}

private void assertRestrictionsFound(final List<Bundle> bundles) {
final ProcessorDefinition processorDefinition = getProcessorDefinition(bundles, "nifi-standard-nar", "org.apache.nifi.processors.standard.GetFile");
assertNotNull(processorDefinition.isRestricted());
assertTrue(processorDefinition.isRestricted());
assertFalse(processorDefinition.isAdditionalDetails());

final Set<Restriction> restrictions = processorDefinition.getExplicitRestrictions();
assertNotNull(restrictions);
assertEquals(2, restrictions.size());

final Restriction restriction = restrictions.stream().findFirst().orElse(null);
assertEquals(RequiredPermission.READ_FILESYSTEM.getPermissionLabel(), restriction.getRequiredPermission());
assertNotNull(restriction.getExplanation());
}

private PropertyDescriptor getPropertyDescriptor(final ProcessorDefinition processorDefinition, final String propName) {
final Map<String, PropertyDescriptor> propertyDescriptors = processorDefinition.getPropertyDescriptors();
assertNotNull(propertyDescriptors);
