diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index ad003cb5ec1c..05850a602f16 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -26,6 +26,20 @@ nifi-utils + + + org.apache.nifi + nifi-oauth2-provider-api + + + org.apache.nifi + nifi-web-client-api + + + org.apache.nifi + nifi-web-client-provider-api + + org.apache.nifi nifi-service-utils @@ -52,10 +66,6 @@ org.apache.nifi nifi-record - - org.apache.nifi - nifi-oauth2-provider-api - org.apache.nifi nifi-proxy-configuration-api diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index d9e07843a76f..22571d02404e 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -80,6 +80,7 @@ import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; @@ -150,6 +151,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY; static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER; + static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER; static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Listen claims.") @@ -267,6 +269,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .required(true) .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.OAUTH2) .build(); + static final PropertyDescriptor BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Storage Identity Federation Token Provider") + .description("Controller Service exchanging workload identity tokens for Azure AD access tokens when using Identity Federation with Azure Blob Storage.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) + .build(); static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder() .name("Storage Account Key") .description("The Azure Storage account key to store event hub consumer group state.") @@ -326,6 +335,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem POLICY_PRIMARY_KEY, AUTHENTICATION_STRATEGY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, + EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, CONSUMER_GROUP, RECORD_READER, RECORD_WRITER, @@ -340,6 +350,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER, + BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER, PROXY_CONFIGURATION_SERVICE ); @@ -434,7 +445,6 @@ protected Collection customValidate(ValidationContext validati final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue()); - final boolean blobOauthProviderSet = validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet(); if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) { results.add(new ValidationResult.Builder() @@ -447,10 +457,10 @@ protected Collection customValidate(ValidationContext validati if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) { final BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy = - validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY) - .asAllowableValue(BlobStorageAuthenticationStrategy.class); + validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class); if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) { + // needed because of expression language support if (StringUtils.isBlank(storageAccountKey)) { results.add(new ValidationResult.Builder() .subject(STORAGE_ACCOUNT_KEY.getDisplayName()) @@ -461,18 +471,8 @@ protected Collection customValidate(ValidationContext validati .valid(false) .build()); } - - if (StringUtils.isNotBlank(storageSasToken)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_SAS_TOKEN.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_SAS_TOKEN.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName())) - .valid(false) - .build()); - } } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE) { + // needed because of expression language support if (StringUtils.isBlank(storageSasToken)) { results.add(new ValidationResult.Builder() .subject(STORAGE_SAS_TOKEN.getDisplayName()) @@ -483,53 +483,16 @@ protected Collection customValidate(ValidationContext validati .valid(false) .build()); } - - if (StringUtils.isNotBlank(storageAccountKey)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_ACCOUNT_KEY.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_ACCOUNT_KEY.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName())) - .valid(false) - .build()); - } } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.OAUTH2) { - if (!blobOauthProviderSet) { - results.add(new ValidationResult.Builder() - .subject(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName()) - .explanation("%s must be set when %s is %s." - .formatted(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName())) - .valid(false) - .build()); - } - - if (StringUtils.isNotBlank(storageAccountKey)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_ACCOUNT_KEY.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_ACCOUNT_KEY.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName())) - .valid(false) - .build()); - } - - if (StringUtils.isNotBlank(storageSasToken)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_SAS_TOKEN.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_SAS_TOKEN.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName())) - .valid(false) - .build()); - } + // Rely on required property + dependsOn validation to ensure provider is configured + } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) { + // Rely on required property + dependsOn validation to ensure provider is configured } } - results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext)); + + results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, + EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, validationContext)); + return results; } @@ -627,6 +590,14 @@ protected EventProcessorClient createClient(final ProcessContext context) { blobContainerClientBuilder.endpoint(endpoint); blobContainerClientBuilder.credential(tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER).asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + final String endpoint = createBlobEndpoint(storageAccountName, domainName); + blobContainerClientBuilder.endpoint(endpoint); + blobContainerClientBuilder.credential(tokenCredential); + } } blobContainerClientBuilder.containerName(containerName); @@ -682,6 +653,13 @@ protected EventProcessorClient createClient(final ProcessContext context) { final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); + } } final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger(); @@ -921,7 +899,7 @@ private String createStorageConnectionString(final ProcessContext context, String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey, domainName); case SHARED_ACCESS_SIGNATURE -> String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, storageSasToken); - case OAUTH2 -> throw new IllegalArgumentException(String.format( + case OAUTH2, IDENTITY_FEDERATION -> throw new IllegalArgumentException(String.format( "Blob Storage Authentication Strategy %s does not support connection string authentication", blobStorageAuthenticationStrategy)); }; } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index d8dc8b4611f9..d19d15f82e46 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -60,6 +60,7 @@ import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.util.StopWatch; import java.time.Duration; @@ -116,6 +117,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY; static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER; + static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER; static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Listen claims.") @@ -172,6 +174,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub POLICY_PRIMARY_KEY, AUTHENTICATION_STRATEGY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, + EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, CONSUMER_GROUP, ENQUEUE_TIME, RECEIVER_FETCH_SIZE, @@ -209,7 +212,8 @@ public final List getSupportedPropertyDescriptors() { @Override protected Collection customValidate(ValidationContext context) { - return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context); + return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, + EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context); } @OnPrimaryNodeStateChange @@ -435,6 +439,13 @@ private EventHubClientBuilder createEventHubClientBuilder(final ProcessContext c final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); + } } // Set Azure Event Hub Client Identifier using Processor Identifier instead of default random UUID diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index d34b3a872d24..e7435f2e0857 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -53,6 +53,7 @@ import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; @@ -89,6 +90,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY; static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER; + static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER; static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Send claims.") @@ -133,6 +135,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub POLICY_PRIMARY_KEY, AUTHENTICATION_STRATEGY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, + EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, PARTITIONING_KEY_ATTRIBUTE_NAME, MAX_BATCH_SIZE, PROXY_CONFIGURATION_SERVICE @@ -171,7 +174,8 @@ public void closeClient() { @Override protected Collection customValidate(ValidationContext context) { - return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context); + return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, + EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context); } @Override @@ -259,6 +263,13 @@ protected EventHubProducerClient createEventHubProducerClient(final ProcessConte final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); + } } AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions); return eventHubClientBuilder.buildProducerClient(); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java index 566d08b0569f..80535b046487 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java @@ -28,6 +28,8 @@ import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; @@ -35,16 +37,12 @@ import java.net.InetSocketAddress; import java.net.Proxy; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; public final class AzureEventHubUtils { @@ -76,44 +74,21 @@ public final class AzureEventHubUtils { .required(true) .build(); - private static final long DEFAULT_TOKEN_LIFETIME_SECONDS = TimeUnit.MINUTES.toSeconds(5); - public static List customValidate(PropertyDescriptor accessPolicyDescriptor, PropertyDescriptor policyKeyDescriptor, - PropertyDescriptor tokenProviderDescriptor, + PropertyDescriptor oauth2TokenProviderDescriptor, + PropertyDescriptor identityFederationTokenProviderDescriptor, ValidationContext context) { List validationResults = new ArrayList<>(); - boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet(); boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet(); final AzureEventHubAuthenticationStrategy authenticationStrategy = Optional.ofNullable( - context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY) - .asAllowableValue(AzureEventHubAuthenticationStrategy.class)) + context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class)) .orElse(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY); - final boolean tokenProviderIsSet = tokenProviderDescriptor != null && context.getProperty(tokenProviderDescriptor).isSet(); switch (authenticationStrategy) { - case MANAGED_IDENTITY -> { - if (accessPolicyIsSet || policyKeyIsSet) { - final String msg = String.format( - "When '%s' is set to '%s', '%s' and '%s' must not be set.", - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(), - accessPolicyDescriptor.getDisplayName(), - policyKeyDescriptor.getDisplayName() - ); - validationResults.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); - } - if (tokenProviderIsSet) { - validationResults.add(new ValidationResult.Builder() - .subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName()) - .valid(false) - .explanation(String.format("'%s' must not be set when '%s' is '%s'.", - tokenProviderDescriptor.getDisplayName(), - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName())) - .build()); - } + case MANAGED_IDENTITY, OAUTH2, IDENTITY_FEDERATION -> { + // Rely on required property + dependsOn validation to ensure proper configuration } case SHARED_ACCESS_SIGNATURE -> { if (!accessPolicyIsSet || !policyKeyIsSet) { @@ -126,40 +101,9 @@ public static List customValidate(PropertyDescriptor accessPol ); validationResults.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); } - if (tokenProviderIsSet) { - validationResults.add(new ValidationResult.Builder() - .subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName()) - .valid(false) - .explanation(String.format("'%s' must not be set when '%s' is '%s'.", - tokenProviderDescriptor.getDisplayName(), - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName())) - .build()); - } - } - case OAUTH2 -> { - if (accessPolicyIsSet || policyKeyIsSet) { - final String msg = String.format( - "When '%s' is set to '%s', '%s' and '%s' must not be set.", - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName(), - accessPolicyDescriptor.getDisplayName(), - policyKeyDescriptor.getDisplayName() - ); - validationResults.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); - } - if (!tokenProviderIsSet) { - validationResults.add(new ValidationResult.Builder() - .subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName()) - .valid(false) - .explanation(String.format("'%s' must be set when '%s' is '%s'.", - tokenProviderDescriptor.getDisplayName(), - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName())) - .build()); - } } } + ProxyConfiguration.validateProxySpec(context, validationResults, AzureEventHubComponent.PROXY_SPECS); return validationResults; } @@ -214,23 +158,12 @@ public static Optional getProxyOptions(final PropertyContext prope public static TokenCredential createTokenCredential(final OAuth2AccessTokenProvider tokenProvider) { Objects.requireNonNull(tokenProvider, "OAuth2 Access Token Provider is required"); - return tokenRequestContext -> Mono.fromSupplier(() -> { - final org.apache.nifi.oauth2.AccessToken accessDetails = tokenProvider.getAccessDetails(); - final String accessToken = accessDetails.getAccessToken(); - - if (accessToken == null || accessToken.isBlank()) { - throw new IllegalStateException("OAuth2 Access Token Provider returned an empty access token"); - } - - final Instant fetchTime = accessDetails.getFetchTime(); - final long expiresInSeconds = accessDetails.getExpiresIn(); - final Instant expirationInstant = expiresInSeconds > 0 - ? fetchTime.plusSeconds(expiresInSeconds) - : fetchTime.plusSeconds(DEFAULT_TOKEN_LIFETIME_SECONDS); - final OffsetDateTime expiresAt = OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC); + return tokenRequestContext -> Mono.fromSupplier(() -> OAuth2AccessTokenAdapter.toAzureAccessToken(tokenProvider.getAccessDetails())); + } - return new com.azure.core.credential.AccessToken(accessToken, expiresAt); - }); + public static TokenCredential createTokenCredential(final AzureIdentityFederationTokenProvider tokenProvider) { + Objects.requireNonNull(tokenProvider, "Identity Federation Token Provider is required"); + return createTokenCredential((OAuth2AccessTokenProvider) tokenProvider); } private static Proxy getProxy(ProxyConfiguration proxyConfiguration) { diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java index ea4078dc42dc..8cdb1abc6c03 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java @@ -58,10 +58,12 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import reactor.core.publisher.Mono; import java.text.DecimalFormat; @@ -364,7 +366,11 @@ private static AzureStorageCredentialsService_v12 getCopyFromCredentialsService( private static HttpAuthorization getHttpAuthorization(final AzureStorageCredentialsDetails_v12 credentialsDetails) { switch (credentialsDetails.getCredentialsType()) { case ACCESS_TOKEN -> { - TokenCredential credential = tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken()); + final AzureIdentityFederationTokenProvider identityTokenProvider = credentialsDetails.getIdentityTokenProvider(); + TokenCredential credential = identityTokenProvider != null + ? tokenRequestContext -> Mono.fromSupplier(() -> + OAuth2AccessTokenAdapter.toAzureAccessToken(identityTokenProvider.getAccessDetails())) + : tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken()); return getHttpAuthorizationFromTokenCredential(credential); } case MANAGED_IDENTITY -> { diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java index 37c7429d747f..5afa5ff58232 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java @@ -39,8 +39,10 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AzureServiceEndpoints; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import reactor.core.publisher.Mono; import java.util.ArrayList; @@ -169,7 +171,11 @@ private void processCredentials(final QueueClientBuilder clientBuilder, final Az .build()); break; case ACCESS_TOKEN: - TokenCredential credential = tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken()); + final AzureIdentityFederationTokenProvider identityTokenProvider = storageCredentialsDetails.getIdentityTokenProvider(); + TokenCredential credential = identityTokenProvider != null + ? tokenRequestContext -> Mono.fromSupplier(() -> + OAuth2AccessTokenAdapter.toAzureAccessToken(identityTokenProvider.getAccessDetails())) + : tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken()); clientBuilder.credential(credential); break; } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java index 26030d307537..b8942591a688 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -25,6 +25,7 @@ import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; @@ -85,7 +86,8 @@ public final class AzureStorageUtils { AzureStorageCredentialsType.ACCOUNT_KEY, AzureStorageCredentialsType.SAS_TOKEN, AzureStorageCredentialsType.MANAGED_IDENTITY, - AzureStorageCredentialsType.SERVICE_PRINCIPAL)) + AzureStorageCredentialsType.SERVICE_PRINCIPAL, + AzureStorageCredentialsType.ACCESS_TOKEN)) .defaultValue(AzureStorageCredentialsType.SAS_TOKEN) .build(); @@ -252,6 +254,14 @@ public final class AzureStorageUtils { .dependsOn(CREDENTIALS_TYPE, AzureStorageCredentialsType.SERVICE_PRINCIPAL) .build(); + public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Azure Identity Federation Token Provider") + .description("Controller Service that exchanges workload identity tokens for Azure AD access tokens.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .dependsOn(CREDENTIALS_TYPE, AzureStorageCredentialsType.ACCESS_TOKEN) + .build(); + private AzureStorageUtils() { // do not instantiate } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java index fb811f6de29d..dfd3b9f35602 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java @@ -28,7 +28,9 @@ import com.azure.storage.blob.BlobServiceClientBuilder; import com.azure.storage.common.StorageSharedKeyCredential; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import reactor.core.publisher.Mono; public class BlobServiceClientFactory extends AbstractStorageClientFactory { @@ -73,11 +75,15 @@ private void configureCredential(final BlobServiceClientBuilder clientBuilder, f .clientSecret(credentialsDetails.getServicePrincipalClientSecret()) .httpClient(new NettyAsyncHttpClientBuilder() .proxy(credentialsDetails.getProxyOptions()) - .build()) + .build()) .build()); break; case ACCESS_TOKEN: - TokenCredential credential = tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken()); + final AzureIdentityFederationTokenProvider identityTokenProvider = credentialsDetails.getIdentityTokenProvider(); + final TokenCredential credential = identityTokenProvider != null + ? tokenRequestContext -> Mono.fromSupplier(() -> + OAuth2AccessTokenAdapter.toAzureAccessToken(identityTokenProvider.getAccessDetails())) + : tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken()); clientBuilder.credential(credential); break; } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java index bebf1cebd954..295e86f29c57 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java @@ -32,6 +32,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import reactor.core.publisher.Mono; public class DataLakeServiceClientFactory extends AbstractStorageClientFactory { @@ -46,6 +48,7 @@ protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails crede final String accountKey = credentialsDetails.getAccountKey(); final String sasToken = credentialsDetails.getSasToken(); final AccessToken accessToken = credentialsDetails.getAccessToken(); + final AzureIdentityFederationTokenProvider identityTokenProvider = credentialsDetails.getIdentityTokenProvider(); final String endpointSuffix = credentialsDetails.getEndpointSuffix(); final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity(); final String managedIdentityClientId = credentialsDetails.getManagedIdentityClientId(); @@ -64,6 +67,10 @@ protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails crede dataLakeServiceClientBuilder.credential(credential); } else if (StringUtils.isNotBlank(sasToken)) { dataLakeServiceClientBuilder.sasToken(sasToken); + } else if (identityTokenProvider != null) { + final TokenCredential credential = tokenRequestContext -> Mono.fromSupplier(() -> + OAuth2AccessTokenAdapter.toAzureAccessToken(identityTokenProvider.getAccessDetails())); + dataLakeServiceClientBuilder.credential(credential); } else if (accessToken != null) { final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken); dataLakeServiceClientBuilder.credential(credential); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java index 7a872ebf9739..4a8577a85f9d 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java @@ -30,6 +30,8 @@ import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; +import reactor.core.publisher.Mono; import java.util.List; @@ -47,12 +49,15 @@ public class StandardAzureCredentialsControllerService extends AbstractControlle public static AllowableValue MANAGED_IDENTITY = new AllowableValue("managed-identity", "Managed Identity", "Azure Virtual Machine Managed Identity (it can only be used when NiFi is running on Azure)"); + public static AllowableValue OAUTH2 = new AllowableValue("oauth2-access-token", + "OAuth2 Access Token", + "Uses an OAuth2 Access Token Provider controller service to obtain access tokens for Azure clients."); public static final PropertyDescriptor CREDENTIAL_CONFIGURATION_STRATEGY = new PropertyDescriptor.Builder() .name("Credential Configuration Strategy") .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .sensitive(false) - .allowableValues(DEFAULT_CREDENTIAL, MANAGED_IDENTITY) + .allowableValues(DEFAULT_CREDENTIAL, MANAGED_IDENTITY, OAUTH2) .defaultValue(DEFAULT_CREDENTIAL) .build(); @@ -67,9 +72,18 @@ public class StandardAzureCredentialsControllerService extends AbstractControlle .dependsOn(CREDENTIAL_CONFIGURATION_STRATEGY, MANAGED_IDENTITY) .build(); + public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Azure Identity Federation Token Provider") + .description("Controller Service used to obtain Azure access tokens via workload identity federation.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .dependsOn(CREDENTIAL_CONFIGURATION_STRATEGY, OAUTH2) + .build(); + private static final List PROPERTY_DESCRIPTORS = List.of( CREDENTIAL_CONFIGURATION_STRATEGY, - MANAGED_IDENTITY_CLIENT_ID + MANAGED_IDENTITY_CLIENT_ID, + OAUTH2_ACCESS_TOKEN_PROVIDER ); private TokenCredential credentials; @@ -92,6 +106,8 @@ public void onConfigured(final ConfigurationContext context) { credentials = getDefaultAzureCredential(); } else if (MANAGED_IDENTITY.getValue().equals(configurationStrategy)) { credentials = getManagedIdentityCredential(context); + } else if (OAUTH2.getValue().equals(configurationStrategy)) { + credentials = getOAuth2Credential(context); } else { final String errorMsg = String.format("Configuration Strategy [%s] not recognized", configurationStrategy); getLogger().error(errorMsg); @@ -117,6 +133,13 @@ private TokenCredential getManagedIdentityCredential(final ConfigurationContext .build(); } + private TokenCredential getOAuth2Credential(final ConfigurationContext context) { + final AzureIdentityFederationTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + return tokenRequestContext -> Mono.fromSupplier(() -> + OAuth2AccessTokenAdapter.toAzureAccessToken(oauth2AccessTokenProvider.getAccessDetails())); + } + @Override public String toString() { return "StandardAzureCredentialsControllerService[id=" + getIdentifier() + "]"; diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java new file mode 100644 index 000000000000..d35fc521f330 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.VerifiableControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.oauth2.AccessToken; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.WebClientService; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Controller Service that exchanges an external identity token for an Azure AD access token + * using the Microsoft Entra workload identity federation flow. + */ +@Tags({ "azure", "oauth2", "identity", "federation", "credentials" }) +@CapabilityDescription("Exchanges workload identity tokens for Azure AD access tokens suitable for accessing Azure services.") +public class StandardAzureIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider, VerifiableControllerService { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String DEFAULT_SCOPE = "https://storage.azure.com/.default"; + private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded"; + private static final String ACCEPT_HEADER = "application/json"; + private static final String HEADER_CONTENT_TYPE = "Content-Type"; + private static final String HEADER_ACCEPT = "Accept"; + private static final String PARAM_GRANT_TYPE = "grant_type"; + private static final String PARAM_CLIENT_ID = "client_id"; + private static final String PARAM_SCOPE = "scope"; + private static final String PARAM_CLIENT_ASSERTION_TYPE = "client_assertion_type"; + private static final String PARAM_CLIENT_ASSERTION = "client_assertion"; + private static final String GRANT_TYPE_CLIENT_CREDENTIALS = "client_credentials"; + private static final String CLIENT_ASSERTION_TYPE_BEARER = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"; + private static final String ERROR_EXCHANGE_FAILED = "Failed to exchange workload identity token: %s"; + private static final String ERROR_READ_RESPONSE = "Failed reading response from Azure token endpoint"; + private static final String ERROR_NO_ACCESS_TOKEN = "Azure token endpoint response did not contain an access_token"; + private static final String STEP_EXCHANGE_TOKEN = "Exchange workload identity token"; + private static final String FIELD_SCOPE = "scope"; + private static final String FIELD_TOKEN_TYPE = "token_type"; + private static final String FIELD_EXPIRES_IN = "expires_in"; + + public static final PropertyDescriptor TENANT_ID = new PropertyDescriptor.Builder() + .name("Tenant ID") + .description("Microsoft Entra tenant ID. Used to build the token endpoint when an explicit endpoint is not configured.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() + .name("Client ID") + .description("Application (client) ID of the Microsoft Entra application registration configured for workload identity federation.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor SCOPE = new PropertyDescriptor.Builder() + .name("Scope") + .description("OAuth2 scope requested from Azure AD. Defaults to https://storage.azure.com/.default for Azure Storage access.") + .required(true) + .defaultValue(DEFAULT_SCOPE) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor TOKEN_ENDPOINT = new PropertyDescriptor.Builder() + .name("Token Endpoint") + .description("Azure AD OAuth2 token endpoint. When not set, defaults to https://login.microsoftonline.com//oauth2/v2.0/token.") + .required(false) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor CLIENT_ASSERTION_PROVIDER = new PropertyDescriptor.Builder() + .name("Client Assertion Provider") + .description("Controller Service that retrieves the external workload identity token (client assertion) exchanged with Azure AD.") + .identifiesControllerService(OAuth2AccessTokenProvider.class) + .required(true) + .build(); + + public static final PropertyDescriptor WEB_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Web Client Service") + .description("Controller Service that provides the HTTP client used for exchanging tokens with Azure AD.") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + private static final List DESCRIPTORS = List.of( + TENANT_ID, + CLIENT_ID, + SCOPE, + TOKEN_ENDPOINT, + CLIENT_ASSERTION_PROVIDER, + WEB_CLIENT_SERVICE + ); + + private volatile WebClientServiceProvider webClientServiceProvider; + private volatile OAuth2AccessTokenProvider clientAssertionProvider; + private volatile String clientId; + private volatile String scope; + private volatile String tokenEndpoint; + + @Override + protected List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE).asControllerService(WebClientServiceProvider.class); + this.clientAssertionProvider = context.getProperty(CLIENT_ASSERTION_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); + this.clientId = context.getProperty(CLIENT_ID).getValue(); + this.scope = context.getProperty(SCOPE).getValue(); + this.tokenEndpoint = resolveTokenEndpoint(context); + } + + @Override + public AccessToken getAccessDetails() { + return exchangeAccessToken(webClientServiceProvider, clientAssertionProvider, clientId, scope, tokenEndpoint); + } + + @Override + public void refreshAccessDetails() { + clientAssertionProvider.refreshAccessDetails(); + } + + @Override + public List verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map variables) { + final ConfigVerificationResult.Builder resultBuilder = new ConfigVerificationResult.Builder() + .verificationStepName(STEP_EXCHANGE_TOKEN); + + try { + final WebClientServiceProvider verificationWebClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE) + .asControllerService(WebClientServiceProvider.class); + final OAuth2AccessTokenProvider verificationClientAssertionProvider = context.getProperty(CLIENT_ASSERTION_PROVIDER) + .asControllerService(OAuth2AccessTokenProvider.class); + final String verificationClientId = context.getProperty(CLIENT_ID).getValue(); + final String verificationScope = context.getProperty(SCOPE).getValue(); + final String verificationEndpoint = resolveTokenEndpoint(context); + + exchangeAccessToken(verificationWebClientServiceProvider, verificationClientAssertionProvider, + verificationClientId, verificationScope, verificationEndpoint); + + return Collections.singletonList(resultBuilder + .outcome(Outcome.SUCCESSFUL) + .explanation("Successfully exchanged workload identity token for an Azure AD access token") + .build()); + } catch (final Exception e) { + final String explanation = String.format(ERROR_EXCHANGE_FAILED, e.getMessage()); + verificationLogger.error(explanation, e); + return Collections.singletonList(resultBuilder + .outcome(Outcome.FAILED) + .explanation(explanation) + .build()); + } + } + + private AccessToken exchangeAccessToken(final WebClientServiceProvider webClientServiceProvider, + final OAuth2AccessTokenProvider assertionProvider, + final String clientId, + final String scope, + final String tokenEndpoint) { + + final AccessToken clientAssertionAccessToken = assertionProvider.getAccessDetails(); + + if (clientAssertionAccessToken == null || StringUtils.isBlank(clientAssertionAccessToken.getAccessToken())) { + throw new ProcessException("Client assertion provider returned no access token"); + } + + final WebClientService webClientService = webClientServiceProvider.getWebClientService(); + final URI tokenUri = URI.create(tokenEndpoint); + final String requestBody = buildRequestBody(clientId, scope, clientAssertionAccessToken.getAccessToken()); + + final HttpResponseEntity responseEntity = webClientService.post() + .uri(tokenUri) + .header(HEADER_CONTENT_TYPE, FORM_CONTENT_TYPE) + .header(HEADER_ACCEPT, ACCEPT_HEADER) + .body(requestBody) + .retrieve(); + + try (responseEntity) { + final int statusCode = responseEntity.statusCode(); + final String responseBody = IOUtils.toString(responseEntity.body(), StandardCharsets.UTF_8); + if (statusCode < 200 || statusCode >= 300) { + throw new ProcessException(String.format( + "Azure token endpoint %s returned status %d: %s", + tokenUri, statusCode, responseBody)); + } + + return parseAccessToken(responseBody); + } catch (final IOException e) { + throw new ProcessException(ERROR_READ_RESPONSE, e); + } + } + + private AccessToken parseAccessToken(final String responseBody) throws IOException { + final JsonNode responseNode = OBJECT_MAPPER.readTree(responseBody); + final String accessTokenValue = responseNode.path("access_token").asText(null); + if (StringUtils.isBlank(accessTokenValue)) { + throw new ProcessException(ERROR_NO_ACCESS_TOKEN); + } + + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken(accessTokenValue); + + final JsonNode expiresInNode = responseNode.path(FIELD_EXPIRES_IN); + final long expiresIn = parseExpiresIn(expiresInNode); + if (expiresIn > 0) { + accessToken.setExpiresIn(expiresIn); + } + + final String tokenType = responseNode.path(FIELD_TOKEN_TYPE).asText(null); + if (StringUtils.isNotBlank(tokenType)) { + accessToken.setTokenType(tokenType); + } + + final String scopeValue = responseNode.path(FIELD_SCOPE).asText(null); + if (StringUtils.isNotBlank(scopeValue)) { + accessToken.setScope(scopeValue); + } + + return accessToken; + } + + private long parseExpiresIn(final JsonNode expiresInNode) { + if (expiresInNode == null || expiresInNode.isMissingNode()) { + return 0; + } + + if (expiresInNode.isNumber()) { + return expiresInNode.asLong(); + } + + if (expiresInNode.isTextual()) { + final String expiresInText = expiresInNode.asText(); + if (StringUtils.isNotBlank(expiresInText)) { + try { + return Long.parseLong(expiresInText); + } catch (final NumberFormatException e) { + throw new ProcessException(String.format("Azure token endpoint returned invalid expires_in value [%s]", expiresInText), e); + } + } + } + + return 0; + } + + private static String buildRequestBody(final String clientId, final String scope, final String clientAssertion) { + final StringBuilder builder = new StringBuilder(); + appendFormParameter(builder, PARAM_GRANT_TYPE, GRANT_TYPE_CLIENT_CREDENTIALS); + appendFormParameter(builder, PARAM_CLIENT_ID, clientId); + appendFormParameter(builder, PARAM_SCOPE, scope); + appendFormParameter(builder, PARAM_CLIENT_ASSERTION_TYPE, CLIENT_ASSERTION_TYPE_BEARER); + appendFormParameter(builder, PARAM_CLIENT_ASSERTION, clientAssertion); + return builder.toString(); + } + + private static void appendFormParameter(final StringBuilder builder, final String name, final String value) { + Objects.requireNonNull(name, "Form parameter name required"); + if (builder.length() > 0) { + builder.append('&'); + } + builder.append(encode(name)).append('=').append(encode(value)); + } + + private static String encode(final String value) { + return URLEncoder.encode(value == null ? "" : value, StandardCharsets.UTF_8); + } + + private String resolveTokenEndpoint(final ConfigurationContext context) { + final String configuredEndpoint = context.getProperty(TOKEN_ENDPOINT).getValue(); + if (StringUtils.isNotBlank(configuredEndpoint)) { + return configuredEndpoint; + } + + final String tenantId = context.getProperty(TENANT_ID).getValue(); + return String.format("https://login.microsoftonline.com/%s/oauth2/v2.0/token", tenantId); + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java index 83737ee58177..fc507e45d18b 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java @@ -27,6 +27,8 @@ import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processors.azure.AzureServiceEndpoints; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import java.util.List; import java.util.Map; @@ -87,6 +89,7 @@ public class ADLSCredentialsControllerService extends AbstractControllerService SERVICE_PRINCIPAL_TENANT_ID, SERVICE_PRINCIPAL_CLIENT_ID, SERVICE_PRINCIPAL_CLIENT_SECRET, + AzureStorageUtils.OAUTH2_ACCESS_TOKEN_PROVIDER, PROXY_CONFIGURATION_SERVICE ); @@ -150,6 +153,13 @@ public ADLSCredentialsDetails getCredentialsDetails(Map attribut setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_ID, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalClientId, attributes); setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_SECRET, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalClientSecret, attributes); + if (context.getProperty(CREDENTIALS_TYPE).asAllowableValue(AzureStorageCredentialsType.class) == AzureStorageCredentialsType.ACCESS_TOKEN) { + final AzureIdentityFederationTokenProvider oauth2AccessTokenProvider = context.getProperty(AzureStorageUtils.OAUTH2_ACCESS_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + credentialsBuilder.setIdentityTokenProvider(oauth2AccessTokenProvider); + credentialsBuilder.setAccessToken(OAuth2AccessTokenAdapter.toAzureAccessToken(oauth2AccessTokenProvider.getAccessDetails())); + } + credentialsBuilder.setProxyOptions(AzureStorageUtils.getProxyOptions(context)); return credentialsBuilder.build(); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java index fd5d6db8e439..d4934585138b 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java @@ -26,6 +26,8 @@ import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processors.azure.AzureServiceEndpoints; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; +import org.apache.nifi.services.azure.util.OAuth2AccessTokenAdapter; import java.util.List; import java.util.Map; @@ -38,6 +40,7 @@ import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_ID; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_TENANT_ID; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.OAUTH2_ACCESS_TOKEN_PROVIDER; /** * Provides credentials details for Azure Storage processors @@ -68,6 +71,7 @@ public class AzureStorageCredentialsControllerService_v12 extends AbstractContro SERVICE_PRINCIPAL_TENANT_ID, SERVICE_PRINCIPAL_CLIENT_ID, SERVICE_PRINCIPAL_CLIENT_SECRET, + OAUTH2_ACCESS_TOKEN_PROVIDER, PROXY_CONFIGURATION_SERVICE ); @@ -106,9 +110,17 @@ public AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map 0 + ? fetchTime.plusSeconds(expiresIn) + : fetchTime.plusSeconds(DEFAULT_EXPIRATION_OFFSET_SECONDS); + + final OffsetDateTime expirationTime = OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC); + return new AccessToken(tokenValue, expirationTime); + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java index 88398e82ec65..efcd1cf1efa8 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java @@ -24,7 +24,8 @@ public enum AzureEventHubAuthenticationStrategy implements DescribedValue { MANAGED_IDENTITY("Managed Identity", "Authenticate using the Managed Identity of the hosting Azure resource."), SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the Shared Access Policy name and key."), - OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider backed by an Entra registered application."); + OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider backed by an Entra registered application."), + IDENTITY_FEDERATION("Identity Federation", "Authenticate using a workload identity token exchanged for an Azure AD access token."); private final String displayName; private final String description; diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java index caf503f02411..28f738725da5 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java @@ -22,6 +22,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; /** * Azure Event Hub Component interface with shared properties @@ -52,6 +53,14 @@ public interface AzureEventHubComponent { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .dependsOn(AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.OAUTH2) .build(); + PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Event Hubs Identity Federation Token Provider") + .description("Controller Service exchanging workload identity tokens for Azure AD access tokens when authenticating to Azure Event Hubs.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION) + .build(); ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH}; PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new PropertyDescriptor.Builder() .fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS)) diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java index 3ab03cc3f614..121f66339a4f 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java @@ -21,7 +21,8 @@ public enum BlobStorageAuthenticationStrategy implements DescribedValue { STORAGE_ACCOUNT_KEY("Storage Account Key", "Authenticate to Azure Blob Storage using the account key."), SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure Blob Storage using a SAS token."), - OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2 Access Token Provider backed by an Entra registered application."); + OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2 Access Token Provider backed by an Entra registered application."), + IDENTITY_FEDERATION("Identity Federation", "Authenticate to Azure Blob Storage using a workload identity token exchanged for an Azure AD access token."); private final String displayName; private final String description; diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 2823c4f3fcdc..e76b973a7817 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -21,5 +21,6 @@ org.apache.nifi.services.azure.data.explorer.StandardKustoIngestService org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12 org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12 org.apache.nifi.services.azure.StandardAzureCredentialsControllerService +org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider org.apache.nifi.services.azure.storage.AzureBlobStorageFileResourceService org.apache.nifi.services.azure.storage.AzureDataLakeStorageFileResourceService diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md new file mode 100644 index 000000000000..b4544ba6ad65 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md @@ -0,0 +1,52 @@ + + +# StandardAzureIdentityFederationTokenProvider + +The *StandardAzureIdentityFederationTokenProvider* exchanges workload identity tokens from external identity providers for Azure AD access tokens. Components such as the ADLS and Azure Storage credentials controller services reference it when the **Credentials Type** is set to **Access Token**. + +> **Note**: Microsoft Entra requires a single resource (`*.default`) per client credentials request. Configure one controller service per Azure resource you need to access. + + +## Configuration workflow + +1. **Client Assertion Provider** – Select a controller service that retrieves the external workload identity token. The token is passed to Azure AD as the `client_assertion` parameter. +2. **Tenant ID** and **Client ID** – Provide the Microsoft Entra tenant and application (client) ID for the federated app registration. +3. **Scope** – Defaults to `https://storage.azure.com/.default`. Adjust to match the resource you are targeting; Azure AD only allows a single resource (`*.default`) per token request. +4. **Web Client Service** – Choose a `WebClientServiceProvider` (such as `StandardWebClientServiceProvider`) that handles HTTP requests. + +At runtime the service submits a `client_credentials` request to `https://login.microsoftonline.com//oauth2/v2.0/token` unless a custom token endpoint is supplied. The returned Azure AD access token is propagated to the calling component. + +Ensure the federated app registration has the necessary Azure RBAC roles (for example *Storage Blob Data Contributor* and *Azure Event Hubs Data Receiver/Sender* as appropriate) and that the client assertion provider refreshes assertions before they expire so new Azure access tokens can be obtained. Create separate controller service instances if you need tokens for different Azure resources. + +## Event Hub components + +- `GetAzureEventHub`, `PutAzureEventHub`, and `ConsumeAzureEventHub` support the **Identity Federation** authentication strategy for Event Hubs connections. Configure this controller service with a scope such as `https://eventhubs.azure.net/.default`. +- `ConsumeAzureEventHub` also supports Identity Federation for the Blob Storage checkpoint store. Configure a separate controller service instance using the Storage scope (for example `https://storage.azure.com/.default`). + + +## Entra ID setup summary + +1. **Create or reuse an app registration** for NiFi in Microsoft Entra ID. +2. **Add a federated credential** (Certificates & secrets → Federated credentials) matching your issuer/subject. Set the audience to `api://AzureADTokenExchange`. +3. **Assign RBAC roles** to that app registration, such as `Storage Blob Data Reader`/`Storage Blob Data Contributor` on the storage account. +4. Record the **Tenant ID** and **Client ID** for configuring the controller service in NiFi. + + +## Scope examples + +- `https://storage.azure.com/.default` – Azure Storage operations only. +- `https://eventhubs.azure.net/.default` – Event Hubs operations. +- `https://management.azure.com/.default` – Azure Resource Manager APIs. diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md index d67c55ed98c2..6900c8a7754b 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md @@ -15,6 +15,12 @@ # ADLSCredentialsControllerService +### Azure Identity Federation Token Provider + +When the **Credentials Type** property is set to `Access Token`, configure the **Azure Identity Federation Token Provider** with a controller service capable of exchanging workload identity tokens for Azure AD access tokens. The provider must return an `access_token` issued by Microsoft Entra ID (for example using the `StandardAzureIdentityFederationTokenProvider`). The access token is converted to the Azure SDK representation and cached in memory until it expires. + +The Azure client instances created by this service do not perform additional token refresh on their own. Ensure the configured Azure Identity Federation Token Provider automatically refreshes tokens before they expire, and that the configured scopes or audiences grant access to the target storage resources. + ### Security considerations of using Expression Language for sensitive properties Allowing Expression Language for a property has the advantage of configuring the property dynamically via FlowFile @@ -30,4 +36,4 @@ Best practices for using Expression Language for sensitive properties: * control access to the flow and to provenance repository * encrypt disks storing FlowFiles and provenance data * if the sensitive data is a temporary token (like the SAS token), use a shorter lifetime and refresh the token - periodically \ No newline at end of file + periodically diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index c6f2c4cf8099..81293e74f5fb 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -32,6 +32,7 @@ import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockPropertyConfiguration; import org.apache.nifi.util.PropertyMigrationResult; @@ -65,6 +66,7 @@ public class GetAzureEventHubTest { private static final String POLICY_KEY = "POLICY-KEY"; private static final String CONSUMER_GROUP = "$Default"; private static final String EVENT_HUB_OAUTH_SERVICE_ID = "get-event-hub-oauth"; + private static final String EVENT_HUB_IDENTITY_SERVICE_ID = "get-event-hub-identity"; private static final Instant ENQUEUED_TIME = Instant.now(); private static final long SEQUENCE_NUMBER = 32; private static final String OFFSET = "64"; @@ -168,6 +170,13 @@ private void configureEventHubOAuthTokenProvider() throws InitializationExceptio testRunner.setProperty(GetAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_OAUTH_SERVICE_ID); } + private void configureEventHubIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(GetAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_SERVICE_ID); + } + @Test public void testPropertiesManagedIdentity() { testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -191,6 +200,19 @@ public void testEventHubOAuthRequiresTokenProvider() throws InitializationExcept testRunner.assertValid(); } + @Test + public void testEventHubIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(GetAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureEventHubIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testRunNoEventsReceived() { setProperties(); @@ -319,4 +341,14 @@ public AccessToken getAccessDetails() { return accessToken; } } + + private static class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + @Override + public AccessToken getAccessDetails() { + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken("access-token"); + accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5)); + return accessToken; + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index 777743ab04df..d9435f925bd6 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -28,6 +28,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.util.MockPropertyConfiguration; import org.apache.nifi.util.PropertyMigrationResult; import org.apache.nifi.util.TestRunner; @@ -65,6 +66,7 @@ public class PutAzureEventHubTest { private static final String PARTITION_KEY = "partition"; private static final String CONTENT = String.class.getSimpleName(); private static final String EVENT_HUB_OAUTH_SERVICE_ID = "put-event-hub-oauth"; + private static final String EVENT_HUB_IDENTITY_SERVICE_ID = "put-event-hub-identity"; @Mock EventHubProducerClient eventHubProducerClient; @@ -159,6 +161,13 @@ private void configureEventHubOAuthTokenProvider() throws InitializationExceptio testRunner.setProperty(PutAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_OAUTH_SERVICE_ID); } + private void configureEventHubIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_SERVICE_ID); + } + @Test public void testPropertiesManagedIdentityEnabled() { testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -182,6 +191,19 @@ public void testEventHubOAuthRequiresTokenProvider() throws InitializationExcept testRunner.assertValid(); } + @Test + public void testEventHubIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(PutAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureEventHubIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testRunNoFlowFiles() { setProperties(); @@ -277,4 +299,14 @@ public AccessToken getAccessDetails() { return accessToken; } } + + private static class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + @Override + public AccessToken getAccessDetails() { + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken("access-token"); + accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5)); + return accessToken; + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 32aed258e2d1..f4c73464e9e7 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -50,6 +50,7 @@ import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockPropertyConfiguration; import org.apache.nifi.util.PropertyMigrationResult; @@ -105,7 +106,9 @@ public class TestConsumeAzureEventHub { private static final String APPLICATION_PROPERTY = "application"; private static final String APPLICATION_ATTRIBUTE_NAME = String.format("eventhub.property.%s", APPLICATION_PROPERTY); private static final String EVENT_HUB_OAUTH_SERVICE_ID = "eventHubOauth"; + private static final String EVENT_HUB_IDENTITY_SERVICE_ID = "eventHubIdentity"; private static final String BLOB_OAUTH_SERVICE_ID = "blobOauth"; + private static final String BLOB_IDENTITY_SERVICE_ID = "blobIdentity"; private static final String EXPECTED_TRANSIT_URI = String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s", EVENT_HUB_NAMESPACE, @@ -201,7 +204,7 @@ public void testProcessorConfigValidityWithBothStorageKeyAndTokenSet() { testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, STORAGE_ACCOUNT_KEY); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, STORAGE_TOKEN); - testRunner.assertNotValid(); + testRunner.assertValid(); } @Test @@ -217,6 +220,20 @@ public void testProcessorConfigValidityWithEventHubOAuthRequiresTokenProvider() testRunner.assertValid(); } + @Test + public void testProcessorConfigValidityWithEventHubIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY, CheckpointStrategy.COMPONENT_STATE.getValue()); + testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureEventHubIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -230,6 +247,20 @@ public void testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() thro testRunner.assertValid(); } + @Test + public void testProcessorConfigValidityWithBlobIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME); + testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureBlobIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testProcessorConfigValidityWithTokenSet() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -666,6 +697,13 @@ private void configureEventHubOAuthTokenProvider() throws InitializationExceptio testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_OAUTH_SERVICE_ID); } + private void configureEventHubIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_SERVICE_ID); + } + private void configureBlobOAuthTokenProvider() throws InitializationException { final MockOAuth2AccessTokenProvider provider = new MockOAuth2AccessTokenProvider(); testRunner.addControllerService(BLOB_OAUTH_SERVICE_ID, provider); @@ -673,6 +711,13 @@ private void configureBlobOAuthTokenProvider() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER, BLOB_OAUTH_SERVICE_ID); } + private void configureBlobIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(BLOB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER, BLOB_IDENTITY_SERVICE_ID); + } + private class MockConsumeAzureEventHub extends ConsumeAzureEventHub { @Override protected EventProcessorClient createClient(final ProcessContext context) { @@ -694,4 +739,14 @@ public AccessToken getAccessDetails() { return accessToken; } } + + private static class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + @Override + public AccessToken getAccessDetails() { + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken("access-token"); + accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5)); + return accessToken; + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java index a89968446585..f4af54830304 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.services.azure; +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; @@ -23,17 +27,27 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + public class TestStandardAzureCredentialsControllerService { private static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; private static final String SAMPLE_MANAGED_CLIENT_ID = "sample-managed-client-id"; + private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider"; + private TestRunner runner; private StandardAzureCredentialsControllerService credentialsService; + private MockOAuth2AccessTokenProvider tokenProvider; @BeforeEach public void setUp() throws InitializationException { runner = TestRunners.newTestRunner(NoOpProcessor.class); credentialsService = new StandardAzureCredentialsControllerService(); runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + + tokenProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider); + runner.enableControllerService(tokenProvider); } @Test @@ -43,7 +57,6 @@ public void testValidControllerServiceConfiguration() { StandardAzureCredentialsControllerService.DEFAULT_CREDENTIAL); runner.assertValid(credentialsService); - // should still be valid be ignored until CREDENTIAL_CONFIGURATION_STRATEGY is set to MANAGED_IDENTITY runner.setProperty(credentialsService, StandardAzureCredentialsControllerService.MANAGED_IDENTITY_CLIENT_ID, SAMPLE_MANAGED_CLIENT_ID); @@ -76,5 +89,42 @@ public void testNotValidControllerServiceBlankManagedIdentityClientId() { runner.assertValid(credentialsService); } + @Test + public void testOAuth2StrategyRequiresProvider() { + runner.setProperty(credentialsService, + StandardAzureCredentialsControllerService.CREDENTIAL_CONFIGURATION_STRATEGY, + StandardAzureCredentialsControllerService.OAUTH2); + + runner.assertNotValid(credentialsService); + } + + @Test + public void testOAuth2StrategyProvidesTokenCredential() throws Exception { + runner.setProperty(credentialsService, + StandardAzureCredentialsControllerService.CREDENTIAL_CONFIGURATION_STRATEGY, + StandardAzureCredentialsControllerService.OAUTH2); + runner.setProperty(credentialsService, + StandardAzureCredentialsControllerService.OAUTH2_ACCESS_TOKEN_PROVIDER, + TOKEN_PROVIDER_IDENTIFIER); + + runner.assertValid(credentialsService); + runner.enableControllerService(credentialsService); + + final TokenCredential tokenCredential = credentialsService.getCredentials(); + final AccessToken accessToken = tokenCredential.getToken(new TokenRequestContext().addScopes("https://storage.azure.com/.default")).block(); + assertNotNull(accessToken); + assertEquals(MockOAuth2AccessTokenProvider.ACCESS_TOKEN_VALUE, accessToken.getToken()); + } + + private static final class MockOAuth2AccessTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + private static final String ACCESS_TOKEN_VALUE = "access-token"; + @Override + public org.apache.nifi.oauth2.AccessToken getAccessDetails() { + final org.apache.nifi.oauth2.AccessToken accessToken = new org.apache.nifi.oauth2.AccessToken(); + accessToken.setAccessToken(ACCESS_TOKEN_VALUE); + accessToken.setExpiresIn(3600); + return accessToken; + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java new file mode 100644 index 000000000000..e4a84dfc1b74 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.oauth2.AccessToken; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.api.HttpEntityHeaders; +import org.apache.nifi.web.client.api.HttpRequestBodySpec; +import org.apache.nifi.web.client.api.HttpRequestHeadersSpec; +import org.apache.nifi.web.client.api.HttpRequestMethod; +import org.apache.nifi.web.client.api.HttpRequestUriSpec; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.api.WebClientService; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestStandardAzureIdentityFederationTokenProvider { + private static final String TOKEN_RESPONSE = """ + { + "access_token": "access-token", + "token_type": "Bearer", + "expires_in": "7200", + "scope": "https://storage.azure.com/.default" + } + """; + private static final String TOKEN_ENDPOINT = "https://example.com/token"; + + private TestRunner runner; + private StandardAzureIdentityFederationTokenProvider tokenProvider; + + @BeforeEach + public void setUp() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + + tokenProvider = new StandardAzureIdentityFederationTokenProvider(); + runner.addControllerService("identity-provider", tokenProvider); + + final MockOAuth2AccessTokenProvider assertionProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService("assertion-provider", assertionProvider); + runner.enableControllerService(assertionProvider); + + final MockWebClientServiceProvider webClientServiceProvider = new MockWebClientServiceProvider(TOKEN_RESPONSE, TOKEN_ENDPOINT); + runner.addControllerService("web-client", webClientServiceProvider); + runner.enableControllerService(webClientServiceProvider); + + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.SCOPE, "https://storage.azure.com/.default"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.TOKEN_ENDPOINT, TOKEN_ENDPOINT); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER, "assertion-provider"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.WEB_CLIENT_SERVICE, "web-client"); + + runner.enableControllerService(tokenProvider); + } + + @Test + public void testParsesStringExpiresIn() { + runner.assertValid(tokenProvider); + + final AccessToken accessToken = tokenProvider.getAccessDetails(); + + assertEquals("access-token", accessToken.getAccessToken()); + assertEquals(7200, accessToken.getExpiresIn()); + } + + private static class MockOAuth2AccessTokenProvider extends AbstractControllerService implements OAuth2AccessTokenProvider { + @Override + public AccessToken getAccessDetails() { + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken("client-assertion"); + accessToken.setExpiresIn(600); + return accessToken; + } + } + + private static class MockWebClientServiceProvider extends AbstractControllerService implements WebClientServiceProvider { + private final MockWebClientService webClientService; + private final String tokenEndpoint; + + private MockWebClientServiceProvider(final String responseBody, final String tokenEndpoint) { + this.tokenEndpoint = tokenEndpoint; + this.webClientService = new MockWebClientService(responseBody, tokenEndpoint); + } + + @Override + public HttpUriBuilder getHttpUriBuilder() { + return new MockHttpUriBuilder(tokenEndpoint); + } + + @Override + public WebClientService getWebClientService() { + return webClientService; + } + } + + private static class MockHttpUriBuilder implements HttpUriBuilder { + private final URI uri; + + private MockHttpUriBuilder(final String tokenEndpoint) { + this.uri = URI.create(tokenEndpoint); + } + + @Override + public URI build() { + return uri; + } + + @Override + public HttpUriBuilder scheme(final String scheme) { + return this; + } + + @Override + public HttpUriBuilder host(final String host) { + return this; + } + + @Override + public HttpUriBuilder port(final int port) { + return this; + } + + @Override + public HttpUriBuilder encodedPath(final String encodedPath) { + return this; + } + + @Override + public HttpUriBuilder addPathSegment(final String pathSegment) { + return this; + } + + @Override + public HttpUriBuilder addQueryParameter(final String name, final String value) { + return this; + } + } + + private static class MockWebClientService implements WebClientService { + private final String responseBody; + private final String tokenEndpoint; + + private MockWebClientService(final String responseBody, final String tokenEndpoint) { + this.responseBody = responseBody; + this.tokenEndpoint = tokenEndpoint; + } + + @Override + public HttpRequestUriSpec method(final HttpRequestMethod requestMethod) { + throw new UnsupportedOperationException(); + } + + @Override + public HttpRequestUriSpec delete() { + throw new UnsupportedOperationException(); + } + + @Override + public HttpRequestUriSpec get() { + throw new UnsupportedOperationException(); + } + + @Override + public HttpRequestUriSpec head() { + throw new UnsupportedOperationException(); + } + + @Override + public HttpRequestUriSpec patch() { + throw new UnsupportedOperationException(); + } + + @Override + public HttpRequestUriSpec post() { + return new MockHttpRequestUriSpec(responseBody, tokenEndpoint); + } + + @Override + public HttpRequestUriSpec put() { + throw new UnsupportedOperationException(); + } + } + + private static class MockHttpRequestUriSpec implements HttpRequestUriSpec { + private final String responseBody; + private final String tokenEndpoint; + + private MockHttpRequestUriSpec(final String responseBody, final String tokenEndpoint) { + this.responseBody = responseBody; + this.tokenEndpoint = tokenEndpoint; + } + + @Override + public HttpRequestBodySpec uri(final URI uri) { + return new MockHttpRequestBodySpec(responseBody, uri != null ? uri : URI.create(tokenEndpoint)); + } + } + + private static class MockHttpRequestBodySpec implements HttpRequestBodySpec { + private final HttpResponseEntity responseEntity; + + private MockHttpRequestBodySpec(final String responseBody, final URI uri) { + this.responseEntity = new MockHttpResponseEntity(responseBody, uri); + } + + @Override + public HttpRequestHeadersSpec body(final InputStream inputStream, final OptionalLong contentLength) { + return this; + } + + @Override + public HttpRequestHeadersSpec body(final String body) { + return this; + } + + @Override + public HttpRequestBodySpec header(final String headerName, final String headerValue) { + return this; + } + + @Override + public HttpResponseEntity retrieve() { + return responseEntity; + } + } + + private static class MockHttpResponseEntity implements HttpResponseEntity { + private final String responseBody; + private final URI uri; + + private MockHttpResponseEntity(final String responseBody, final URI uri) { + this.responseBody = responseBody; + this.uri = uri; + } + + @Override + public int statusCode() { + return 200; + } + + @Override + public HttpEntityHeaders headers() { + return new MockHttpEntityHeaders(); + } + + @Override + public InputStream body() { + return new ByteArrayInputStream(responseBody.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public URI uri() { + return uri; + } + + @Override + public void close() { + // no-op + } + } + + private static class MockHttpEntityHeaders implements HttpEntityHeaders { + @Override + public Optional getFirstHeader(final String headerName) { + return Optional.empty(); + } + + @Override + public List getHeader(final String headerName) { + return Collections.emptyList(); + } + + @Override + public Collection getHeaderNames() { + return Collections.emptyList(); + } + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java index ad0c1ba83dfe..1fc8089a4a60 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java @@ -16,8 +16,11 @@ */ package org.apache.nifi.services.azure.storage; +import com.azure.core.credential.AccessToken; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; @@ -36,6 +39,7 @@ public class TestADLSCredentialsControllerService { public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; + private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider"; private static final String ACCOUNT_NAME_VALUE = "AccountName"; private static final String ACCOUNT_KEY_VALUE = "AccountKey"; @@ -48,12 +52,16 @@ public class TestADLSCredentialsControllerService { private TestRunner runner; private ADLSCredentialsControllerService credentialsService; + private MockOAuth2AccessTokenProvider tokenProvider; @BeforeEach public void setUp() throws InitializationException { runner = TestRunners.newTestRunner(NoOpProcessor.class); credentialsService = new ADLSCredentialsControllerService(); runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + tokenProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider); + runner.enableControllerService(tokenProvider); } @Test @@ -153,6 +161,23 @@ public void testNotValidBecauseNoClientSecretSpecifiedForServicePrincipal() { runner.assertNotValid(credentialsService); } + @Test + public void testValidWithAccountNameAndAccessToken() { + configureCredentialsType(AzureStorageCredentialsType.ACCESS_TOKEN); + configureAccountName(); + configureOAuth2Provider(); + + runner.assertValid(credentialsService); + } + + @Test + public void testNotValidWithAccessTokenMissingProvider() { + configureCredentialsType(AzureStorageCredentialsType.ACCESS_TOKEN); + configureAccountName(); + + runner.assertNotValid(credentialsService); + } + @Test public void testGetCredentialsDetailsWithAccountKey() throws Exception { // GIVEN @@ -245,6 +270,22 @@ public void testGetCredentialsDetailsWithSasTokenUsingEL() throws Exception { assertNull(actual.getServicePrincipalClientSecret()); } + @Test + public void testGetCredentialsDetailsWithAccessToken() throws Exception { + configureCredentialsType(AzureStorageCredentialsType.ACCESS_TOKEN); + configureAccountName(); + configureOAuth2Provider(); + + runner.enableControllerService(credentialsService); + + final ADLSCredentialsDetails actual = credentialsService.getCredentialsDetails(new HashMap<>()); + + assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName()); + final AccessToken accessToken = actual.getAccessToken(); + assertNotNull(accessToken); + assertEquals(MockOAuth2AccessTokenProvider.ACCESS_TOKEN_VALUE, accessToken.getToken()); + } + @Test public void testGetCredentialsDetailsWithSystemAssignedManagedIdentity() throws Exception { // GIVEN @@ -405,8 +446,24 @@ private void configureServicePrincipalClientSecret() { runner.setProperty(credentialsService, AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET, SERVICE_PRINCIPAL_CLIENT_SECRET_VALUE); } + private void configureOAuth2Provider() { + runner.setProperty(credentialsService, AzureStorageUtils.OAUTH2_ACCESS_TOKEN_PROVIDER, TOKEN_PROVIDER_IDENTIFIER); + } + private void configurePropertyUsingEL(PropertyDescriptor propertyDescriptor, String variableName, String variableValue) { runner.setProperty(credentialsService, propertyDescriptor, String.format("${%s}", variableName)); runner.setEnvironmentVariableValue(variableName, variableValue); } + + private static final class MockOAuth2AccessTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + private static final String ACCESS_TOKEN_VALUE = "access-token"; + + @Override + public org.apache.nifi.oauth2.AccessToken getAccessDetails() { + final org.apache.nifi.oauth2.AccessToken token = new org.apache.nifi.oauth2.AccessToken(); + token.setAccessToken(ACCESS_TOKEN_VALUE); + token.setExpiresIn(3600); + return token; + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java index 94b61bb2e22e..041bb1cdf6da 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.services.azure.storage; +import com.azure.core.credential.AccessToken; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; @@ -35,11 +39,13 @@ import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_TENANT_ID; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; public class TestAzureStorageCredentialsControllerService_v12 { public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; + private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider"; private static final String ACCOUNT_NAME_VALUE = "AccountName"; private static final String ACCOUNT_KEY_VALUE = "AccountKey"; @@ -51,12 +57,17 @@ public class TestAzureStorageCredentialsControllerService_v12 { private TestRunner runner; private AzureStorageCredentialsControllerService_v12 credentialsService; + private MockOAuth2AccessTokenProvider tokenProvider; @BeforeEach public void setUp() throws InitializationException { runner = TestRunners.newTestRunner(NoOpProcessor.class); credentialsService = new AzureStorageCredentialsControllerService_v12(); runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + + tokenProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider); + runner.enableControllerService(tokenProvider); } @Test @@ -150,6 +161,40 @@ public void testServicePrincipalCredentialsTypeNotValidBecauseClientSecretMissin runner.assertNotValid(credentialsService); } + @Test + public void testAccessTokenCredentialsTypeValid() { + configureAccountName(); + configureCredentialsType(AzureStorageCredentialsType.ACCESS_TOKEN); + configureOAuth2Provider(); + + runner.assertValid(credentialsService); + } + + @Test + public void testAccessTokenCredentialsTypeNotValidWhenProviderMissing() { + configureAccountName(); + configureCredentialsType(AzureStorageCredentialsType.ACCESS_TOKEN); + + runner.assertNotValid(credentialsService); + } + + @Test + public void testGetCredentialsDetailsWithAccessToken() throws Exception { + configureAccountName(); + configureCredentialsType(AzureStorageCredentialsType.ACCESS_TOKEN); + configureOAuth2Provider(); + + runner.enableControllerService(credentialsService); + + final AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap()); + + assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName()); + assertEquals(AzureStorageCredentialsType.ACCESS_TOKEN, actual.getCredentialsType()); + final AccessToken accessToken = actual.getAccessToken(); + assertNotNull(accessToken); + assertEquals(MockOAuth2AccessTokenProvider.ACCESS_TOKEN_VALUE, accessToken.getToken()); + } + @Test public void testGetCredentialsDetailsWithAccountKey() { configureAccountName(); @@ -276,4 +321,20 @@ private void configureServicePrincipalClientId() { private void configureServicePrincipalClientSecret() { runner.setProperty(credentialsService, SERVICE_PRINCIPAL_CLIENT_SECRET, SERVICE_PRINCIPAL_CLIENT_SECRET_VALUE); } + + private void configureOAuth2Provider() { + runner.setProperty(credentialsService, AzureStorageUtils.OAUTH2_ACCESS_TOKEN_PROVIDER, TOKEN_PROVIDER_IDENTIFIER); + } + + private static final class MockOAuth2AccessTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + private static final String ACCESS_TOKEN_VALUE = "access-token"; + + @Override + public org.apache.nifi.oauth2.AccessToken getAccessDetails() { + final org.apache.nifi.oauth2.AccessToken accessToken = new org.apache.nifi.oauth2.AccessToken(); + accessToken.setAccessToken(ACCESS_TOKEN_VALUE); + accessToken.setExpiresIn(3600); + return accessToken; + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml index 7f6067a61081..0f299329e89d 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml @@ -24,6 +24,10 @@ nifi-azure-services-api + + org.apache.nifi + nifi-oauth2-provider-api + com.azure azure-core diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java new file mode 100644 index 000000000000..c902b2758095 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure; + +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; + +/** + * Azure-specific extension of {@link OAuth2AccessTokenProvider} used for workload identity federation. + * Implementations exchange an external identity token for an Azure AD access token suitable for + * Azure service clients (for example, Storage or Data Lake). + */ +public interface AzureIdentityFederationTokenProvider extends OAuth2AccessTokenProvider { +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java index b1780f9e164f..3c8fc25468bb 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java @@ -21,6 +21,7 @@ import java.util.Objects; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import static org.apache.nifi.services.azure.util.ProxyOptionsUtils.equalsProxyOptions; import static org.apache.nifi.services.azure.util.ProxyOptionsUtils.hashCodeProxyOptions; @@ -40,6 +41,7 @@ public class ADLSCredentialsDetails { private final String servicePrincipalClientId; private final String servicePrincipalClientSecret; + private final AzureIdentityFederationTokenProvider identityTokenProvider; private final ProxyOptions proxyOptions; public ADLSCredentialsDetails( @@ -53,6 +55,7 @@ public ADLSCredentialsDetails( String servicePrincipalTenantId, String servicePrincipalClientId, String servicePrincipalClientSecret, + AzureIdentityFederationTokenProvider identityTokenProvider, ProxyOptions proxyOptions ) { this.accountName = accountName; @@ -65,6 +68,7 @@ public ADLSCredentialsDetails( this.servicePrincipalTenantId = servicePrincipalTenantId; this.servicePrincipalClientId = servicePrincipalClientId; this.servicePrincipalClientSecret = servicePrincipalClientSecret; + this.identityTokenProvider = identityTokenProvider; this.proxyOptions = proxyOptions; } @@ -108,6 +112,10 @@ public String getServicePrincipalClientSecret() { return servicePrincipalClientSecret; } + public AzureIdentityFederationTokenProvider getIdentityTokenProvider() { + return identityTokenProvider; + } + public ProxyOptions getProxyOptions() { return proxyOptions; } @@ -133,6 +141,7 @@ public boolean equals(Object o) { && Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId) && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId) && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret) + && Objects.equals(identityTokenProvider, that.identityTokenProvider) && equalsProxyOptions(proxyOptions, that.proxyOptions); } @@ -149,6 +158,7 @@ public int hashCode() { servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, + identityTokenProvider, hashCodeProxyOptions(proxyOptions) ); } @@ -164,6 +174,7 @@ public static class Builder { private String servicePrincipalTenantId; private String servicePrincipalClientId; private String servicePrincipalClientSecret; + private AzureIdentityFederationTokenProvider identityTokenProvider; private ProxyOptions proxyOptions; private Builder() { } @@ -222,6 +233,11 @@ public Builder setServicePrincipalClientSecret(String servicePrincipalClientSecr return this; } + public Builder setIdentityTokenProvider(final AzureIdentityFederationTokenProvider identityTokenProvider) { + this.identityTokenProvider = identityTokenProvider; + return this; + } + public Builder setProxyOptions(ProxyOptions proxyOptions) { this.proxyOptions = proxyOptions; return this; @@ -229,7 +245,7 @@ public Builder setProxyOptions(ProxyOptions proxyOptions) { public ADLSCredentialsDetails build() { return new ADLSCredentialsDetails(accountName, accountKey, sasToken, endpointSuffix, accessToken, useManagedIdentity, managedIdentityClientId, - servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, proxyOptions); + servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, identityTokenProvider, proxyOptions); } } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java index ee28670f9cee..f85e6622df44 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java @@ -21,6 +21,8 @@ import java.util.Objects; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; + import static org.apache.nifi.services.azure.util.ProxyOptionsUtils.equalsProxyOptions; import static org.apache.nifi.services.azure.util.ProxyOptionsUtils.hashCodeProxyOptions; @@ -36,11 +38,13 @@ public class AzureStorageCredentialsDetails_v12 { private final String servicePrincipalClientId; private final String servicePrincipalClientSecret; private final AccessToken accessToken; + private final AzureIdentityFederationTokenProvider identityTokenProvider; private final ProxyOptions proxyOptions; private AzureStorageCredentialsDetails_v12( String accountName, String endpointSuffix, AzureStorageCredentialsType credentialsType, String accountKey, String sasToken, String managedIdentityClientId, - String servicePrincipalTenantId, String servicePrincipalClientId, String servicePrincipalClientSecret, AccessToken accessToken, ProxyOptions proxyOptions) { + String servicePrincipalTenantId, String servicePrincipalClientId, String servicePrincipalClientSecret, AccessToken accessToken, + AzureIdentityFederationTokenProvider identityTokenProvider, ProxyOptions proxyOptions) { this.accountName = accountName; this.endpointSuffix = endpointSuffix; this.credentialsType = credentialsType; @@ -51,6 +55,7 @@ private AzureStorageCredentialsDetails_v12( this.servicePrincipalClientId = servicePrincipalClientId; this.servicePrincipalClientSecret = servicePrincipalClientSecret; this.accessToken = accessToken; + this.identityTokenProvider = identityTokenProvider; this.proxyOptions = proxyOptions; } @@ -94,6 +99,10 @@ public AccessToken getAccessToken() { return accessToken; } + public AzureIdentityFederationTokenProvider getIdentityTokenProvider() { + return identityTokenProvider; + } + public ProxyOptions getProxyOptions() { return proxyOptions; } @@ -119,6 +128,7 @@ public boolean equals(Object o) { && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId) && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret) && Objects.equals(accessToken, that.accessToken) + && Objects.equals(identityTokenProvider, that.identityTokenProvider) && equalsProxyOptions(proxyOptions, that.proxyOptions); } @@ -135,6 +145,7 @@ public int hashCode() { servicePrincipalClientId, servicePrincipalClientSecret, accessToken, + identityTokenProvider, hashCodeProxyOptions(proxyOptions) ); } @@ -143,14 +154,16 @@ public static AzureStorageCredentialsDetails_v12 createWithAccountKey( String accountName, String endpointSuffix, String accountKey) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCOUNT_KEY, accountKey, null, null, null, null, null, null, null); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCOUNT_KEY, + accountKey, null, null, null, null, null, null, null, null); } public static AzureStorageCredentialsDetails_v12 createWithSasToken( String accountName, String endpointSuffix, String sasToken) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SAS_TOKEN, null, sasToken, null, null, null, null, null, null); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SAS_TOKEN, + null, sasToken, null, null, null, null, null, null, null); } public static AzureStorageCredentialsDetails_v12 createWithManagedIdentity( @@ -158,8 +171,8 @@ public static AzureStorageCredentialsDetails_v12 createWithManagedIdentity( String endpointSuffix, String managedIdentityClientId, ProxyOptions proxyOptions) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.MANAGED_IDENTITY, null, null, managedIdentityClientId, - null, null, null, null, proxyOptions); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.MANAGED_IDENTITY, + null, null, managedIdentityClientId, null, null, null, null, null, proxyOptions); } public static AzureStorageCredentialsDetails_v12 createWithServicePrincipal( @@ -169,14 +182,24 @@ public static AzureStorageCredentialsDetails_v12 createWithServicePrincipal( String servicePrincipalClientId, String servicePrincipalClientSecret, ProxyOptions proxyOptions) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SERVICE_PRINCIPAL, null, null, null, - servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, null, proxyOptions); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SERVICE_PRINCIPAL, + null, null, null, servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, null, null, proxyOptions); } public static AzureStorageCredentialsDetails_v12 createWithAccessToken( String accountName, String endpointSuffix, AccessToken accessToken) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCESS_TOKEN, null, null, null, null, null, null, accessToken, null); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCESS_TOKEN, + null, null, null, null, null, null, accessToken, null, null); + } + + public static AzureStorageCredentialsDetails_v12 createWithAccessToken( + String accountName, + String endpointSuffix, + AccessToken accessToken, + AzureIdentityFederationTokenProvider identityTokenProvider) { + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCESS_TOKEN, null, null, null, + null, null, null, accessToken, identityTokenProvider, null); } }