Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@
<artifactId>nifi-utils</artifactId>
</dependency>

<!-- OAuth2 Access Token Provider API for Web Identity (OIDC) support -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-service-utils</artifactId>
Expand All @@ -52,10 +66,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy;
import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

Expand Down Expand Up @@ -150,6 +151,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Listen claims.")
Expand Down Expand Up @@ -267,6 +269,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
.required(true)
.dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.OAUTH2)
.build();
static final PropertyDescriptor BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
.name("Storage Identity Federation Token Provider")
.description("Controller Service exchanging workload identity tokens for Azure AD access tokens when using Identity Federation with Azure Blob Storage.")
.identifiesControllerService(AzureIdentityFederationTokenProvider.class)
.required(true)
.dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION)
.build();
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("Storage Account Key")
.description("The Azure Storage account key to store event hub consumer group state.")
Expand Down Expand Up @@ -326,6 +335,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
CONSUMER_GROUP,
RECORD_READER,
RECORD_WRITER,
Expand All @@ -340,6 +350,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER,
PROXY_CONFIGURATION_SERVICE
);

Expand Down Expand Up @@ -435,6 +446,7 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
final boolean blobOauthProviderSet = validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet();
final boolean blobIdentityFederationProviderSet = validationContext.getProperty(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER).isSet();

if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
results.add(new ValidationResult.Builder()
Expand Down Expand Up @@ -527,9 +539,42 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
.valid(false)
.build());
}
if (blobIdentityFederationProviderSet) {
results.add(new ValidationResult.Builder()
.subject(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
.valid(false)
.build());
}
} else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) {
if (StringUtils.isNotBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION.getDisplayName()))
.valid(false)
.build());
}
Copy link
Contributor

@turcsanyip turcsanyip Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvillard31 Thanks for addig identity federation support in Azure processors. My plan is to review it in more detail.

Just an initial question: Are these extensive validation steps really needed? I mean the authentication related properties are controlled by the Authentication Strategy property: only the relevant properties are shown and it does not matter what values the other properties contain. If we keep the validation steps as well, then the user will get warnings related to properties that are not visible and have no effect when the processor runs.

For example:

  1. The user chooses Authentication Strategy = Storage Account Key and fills in Storage Account Key property
  2. Then they change their mind and select Authentication Strategy = Identity Federation and provide the configuration
  3. After Apply, they get a validation error saying "Storage Account Key must not be set when Authentication Strategy is Identity Federation."
  4. So they have to go back to the other option, clear the fields, and choose the desired option again

I believe it is not the ideal user experience. However, there may be other advantages to keeping the customValidate() rules that I'm not aware of.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No you're absolutely right that it's better to provide a good UX experience and rely on dependsOn/required. I pushed a commit to simplify all of this.


if (StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION.getDisplayName()))
.valid(false)
.build());
}
}
}
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext));
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, validationContext));
return results;
}

Expand Down Expand Up @@ -627,6 +672,14 @@ protected EventProcessorClient createClient(final ProcessContext context) {
blobContainerClientBuilder.endpoint(endpoint);
blobContainerClientBuilder.credential(tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER).asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
final String endpoint = createBlobEndpoint(storageAccountName, domainName);
blobContainerClientBuilder.endpoint(endpoint);
blobContainerClientBuilder.credential(tokenCredential);
}
}
blobContainerClientBuilder.containerName(containerName);

Expand Down Expand Up @@ -682,6 +735,13 @@ protected EventProcessorClient createClient(final ProcessContext context) {
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
.asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
}

final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
Expand Down Expand Up @@ -921,7 +981,7 @@ private String createStorageConnectionString(final ProcessContext context,
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey, domainName);
case SHARED_ACCESS_SIGNATURE ->
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, storageSasToken);
case OAUTH2 -> throw new IllegalArgumentException(String.format(
case OAUTH2, IDENTITY_FEDERATION -> throw new IllegalArgumentException(String.format(
"Blob Storage Authentication Strategy %s does not support connection string authentication", blobStorageAuthenticationStrategy));
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.util.StopWatch;

import java.time.Duration;
Expand Down Expand Up @@ -116,6 +117,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Listen claims.")
Expand Down Expand Up @@ -172,6 +174,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
CONSUMER_GROUP,
ENQUEUE_TIME,
RECEIVER_FETCH_SIZE,
Expand Down Expand Up @@ -209,7 +212,8 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context);
}

@OnPrimaryNodeStateChange
Expand Down Expand Up @@ -435,6 +439,13 @@ private EventHubClientBuilder createEventHubClientBuilder(final ProcessContext c
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
.asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
}

// Set Azure Event Hub Client Identifier using Processor Identifier instead of default random UUID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

Expand Down Expand Up @@ -89,6 +90,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Send claims.")
Expand Down Expand Up @@ -133,6 +135,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
PARTITIONING_KEY_ATTRIBUTE_NAME,
MAX_BATCH_SIZE,
PROXY_CONFIGURATION_SERVICE
Expand Down Expand Up @@ -171,7 +174,8 @@ public void closeClient() {

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context);
}

@Override
Expand Down Expand Up @@ -259,6 +263,13 @@ protected EventHubProducerClient createEventHubProducerClient(final ProcessConte
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
.asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
}
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
return eventHubClientBuilder.buildProducerClient();
Expand Down
Loading
Loading