Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@
<artifactId>nifi-utils</artifactId>
</dependency>

<!-- OAuth2 Access Token Provider API for Web Identity (OIDC) support -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-service-utils</artifactId>
Expand All @@ -52,10 +66,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
Expand Down Expand Up @@ -150,6 +151,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Listen claims.")
Expand Down Expand Up @@ -267,6 +269,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
.required(true)
.dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.OAUTH2)
.build();
static final PropertyDescriptor BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
.name("Storage Identity Federation Token Provider")
.description("Controller Service exchanging workload identity tokens for Azure AD access tokens when using Identity Federation with Azure Blob Storage.")
.identifiesControllerService(AzureIdentityFederationTokenProvider.class)
.required(true)
.dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION)
.build();
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("Storage Account Key")
.description("The Azure Storage account key to store event hub consumer group state.")
Expand Down Expand Up @@ -326,6 +335,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
CONSUMER_GROUP,
RECORD_READER,
RECORD_WRITER,
Expand All @@ -340,6 +350,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER,
PROXY_CONFIGURATION_SERVICE
);

Expand Down Expand Up @@ -434,7 +445,6 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
final boolean blobOauthProviderSet = validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet();

if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
results.add(new ValidationResult.Builder()
Expand All @@ -447,10 +457,10 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati

if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
final BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy =
validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY)
.asAllowableValue(BlobStorageAuthenticationStrategy.class);
validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);

if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
// needed because of expression language support
if (StringUtils.isBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
Expand All @@ -461,18 +471,8 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
.valid(false)
.build());
}

if (StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
.valid(false)
.build());
}
} else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE) {
// needed because of expression language support
if (StringUtils.isBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
Expand All @@ -483,53 +483,16 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
.valid(false)
.build());
}

if (StringUtils.isNotBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
.valid(false)
.build());
}
} else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.OAUTH2) {
if (!blobOauthProviderSet) {
results.add(new ValidationResult.Builder()
.subject(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName())
.explanation("%s must be set when %s is %s."
.formatted(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
.valid(false)
.build());
}

if (StringUtils.isNotBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
.valid(false)
.build());
}

if (StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
.explanation("%s must not be set when %s is %s."
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
.valid(false)
.build());
}
// Rely on required property + dependsOn validation to ensure provider is configured
} else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) {
// Rely on required property + dependsOn validation to ensure provider is configured
}
}
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext));

results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, validationContext));

return results;
}

Expand Down Expand Up @@ -627,6 +590,14 @@ protected EventProcessorClient createClient(final ProcessContext context) {
blobContainerClientBuilder.endpoint(endpoint);
blobContainerClientBuilder.credential(tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER).asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
final String endpoint = createBlobEndpoint(storageAccountName, domainName);
blobContainerClientBuilder.endpoint(endpoint);
blobContainerClientBuilder.credential(tokenCredential);
}
}
blobContainerClientBuilder.containerName(containerName);

Expand Down Expand Up @@ -682,6 +653,13 @@ protected EventProcessorClient createClient(final ProcessContext context) {
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
.asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
}

final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
Expand Down Expand Up @@ -921,7 +899,7 @@ private String createStorageConnectionString(final ProcessContext context,
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey, domainName);
case SHARED_ACCESS_SIGNATURE ->
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, storageSasToken);
case OAUTH2 -> throw new IllegalArgumentException(String.format(
case OAUTH2, IDENTITY_FEDERATION -> throw new IllegalArgumentException(String.format(
"Blob Storage Authentication Strategy %s does not support connection string authentication", blobStorageAuthenticationStrategy));
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.util.StopWatch;

import java.time.Duration;
Expand Down Expand Up @@ -116,6 +117,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Listen claims.")
Expand Down Expand Up @@ -172,6 +174,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
CONSUMER_GROUP,
ENQUEUE_TIME,
RECEIVER_FETCH_SIZE,
Expand Down Expand Up @@ -209,7 +212,8 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context);
}

@OnPrimaryNodeStateChange
Expand Down Expand Up @@ -435,6 +439,13 @@ private EventHubClientBuilder createEventHubClientBuilder(final ProcessContext c
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
.asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
}

// Set Azure Event Hub Client Identifier using Processor Identifier instead of default random UUID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

Expand Down Expand Up @@ -89,6 +90,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Send claims.")
Expand Down Expand Up @@ -133,6 +135,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
PARTITIONING_KEY_ATTRIBUTE_NAME,
MAX_BATCH_SIZE,
PROXY_CONFIGURATION_SERVICE
Expand Down Expand Up @@ -171,7 +174,8 @@ public void closeClient() {

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context);
}

@Override
Expand Down Expand Up @@ -259,6 +263,13 @@ protected EventHubProducerClient createEventHubProducerClient(final ProcessConte
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
case IDENTITY_FEDERATION -> {
final AzureIdentityFederationTokenProvider tokenProvider =
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
.asControllerService(AzureIdentityFederationTokenProvider.class);
final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
}
}
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
return eventHubClientBuilder.buildProducerClient();
Expand Down
Loading
Loading