Skip to content

Commit

Permalink
Added Authentication Controller Service
Browse files Browse the repository at this point in the history
  • Loading branch information
David Kjerrumgaard committed Mar 8, 2019
1 parent a84b4bd commit 25f736f
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar.auth;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
import org.apache.pulsar.client.api.Authentication;

@Tags({"Pulsar", "client", "security", "authentication"})
@CapabilityDescription("Provides Pulsar clients with the ability to authenticate against a "
+ "secured Apache Pulsar broker endpoint.")
public interface PulsarClientAuthenticationService extends ControllerService {

public String getTlsTrustCertsFilePath();

public Authentication getAuthentication();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
Expand All @@ -32,13 +31,12 @@
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.pulsar.auth.PulsarClientAuthenticationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;

public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService {

Expand Down Expand Up @@ -158,12 +156,12 @@ public class StandardPulsarClientService extends AbstractControllerService imple
.defaultValue("false")
.build();

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Pulsar.")
public static final PropertyDescriptor AUTHENTICATION_SERVICE = new PropertyDescriptor.Builder()
.name("AUTHENTICATION_SERVICE")
.displayName("Pulsar Client Authentication Service")
.description("Specifies the Service to use for authenticating with Pulsar.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.identifiesControllerService(PulsarClientAuthenticationService.class)
.build();

private static List<PropertyDescriptor> properties;
Expand All @@ -174,6 +172,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(PULSAR_SERVICE_URL);
props.add(AUTHENTICATION_SERVICE);
props.add(CONCURRENT_LOOKUP_REQUESTS);
props.add(CONNECTIONS_PER_BROKER);
props.add(IO_THREADS);
Expand All @@ -184,7 +183,6 @@ public class StandardPulsarClientService extends AbstractControllerService imple
props.add(OPERATION_TIMEOUT);
props.add(STATS_INTERVAL);
props.add(USE_TCP_NO_DELAY);
props.add(SSL_CONTEXT_SERVICE);
properties = Collections.unmodifiableList(props);
}

Expand Down Expand Up @@ -256,16 +254,17 @@ private ClientBuilder getClientBuilder(ConfigurationContext context) throws Unsu
.enableTcpNoDelay(context.getProperty(USE_TCP_NO_DELAY).asBoolean());

// Configure TLS
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

if (sslContextService != null && sslContextService.isTrustStoreConfigured() && sslContextService.isKeyStoreConfigured()) {
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", sslContextService.getTrustStoreFile());
authParams.put("tlsKeyFile", sslContextService.getKeyStoreFile());

builder = builder.authentication(AuthenticationTls.class.getName(), authParams)
.tlsTrustCertsFilePath(sslContextService.getTrustStoreFile());
secure = true;
final PulsarClientAuthenticationService authenticationService =
context.getProperty(AUTHENTICATION_SERVICE)
.asControllerService(PulsarClientAuthenticationService.class);

if (authenticationService != null) {
builder = builder.authentication(authenticationService.getAuthentication());

if (StringUtils.isNotBlank(authenticationService.getTlsTrustCertsFilePath())) {
builder = builder.tlsTrustCertsFilePath(authenticationService.getTlsTrustCertsFilePath());
secure = true;
}
}

setPulsarBrokerRootURL(buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue(), secure));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar.auth;

import java.io.File;

import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.pulsar.client.api.Authentication;

public abstract class AbstractPulsarClientAuntenticationService extends AbstractControllerService
implements PulsarClientAuthenticationService {

public static final PropertyDescriptor TRUST_CERTIFICATE = new PropertyDescriptor.Builder()
.name("Trusted Certificate Filename")
.description("The fully-qualified filename of the Trusted certificate.")
.defaultValue(null)
.addValidator(createFileExistsAndReadableValidator())
.sensitive(false)
.build();

protected ConfigurationContext configContext;

@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
configContext = context;
}

@Override
public String getTlsTrustCertsFilePath() {
return configContext.getProperty(TRUST_CERTIFICATE).getValue();
}

public abstract Authentication getAuthentication();

protected static Validator createFileExistsAndReadableValidator() {
return new Validator() {
// Not using the FILE_EXISTS_VALIDATOR because the default is to
// allow expression language
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
final File file = new File(input);
final boolean valid = file.exists() && file.canRead();
final String explanation = valid ? null : "File " + file + " does not exist or cannot be read";
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(valid)
.explanation(explanation)
.build();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar.auth;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;

// import org.apache.pulsar.client.impl.auth.AuthenticationAthenz;

/**
* https://pulsar.apache.org/docs/en/security-athenz/
*
*/
public class PulsarClientAthenzAuthenticationService extends AbstractPulsarClientAuntenticationService {

public static final PropertyDescriptor TENANT_DOMAIN = new PropertyDescriptor.Builder()
.name("The tenant domain name")
.description("The domain name for this tenant")
.defaultValue(null)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(false)
.build();

public static final PropertyDescriptor TENANT_SERVICE = new PropertyDescriptor.Builder()
.name("The tenant service name")
.description("The service name for this tenant")
.defaultValue(null)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(false)
.build();

public static final PropertyDescriptor PROVIDER_DOMAIN = new PropertyDescriptor.Builder()
.name("The provider domain")
.description("The provider domain name")
.defaultValue(null)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(false)
.build();

public static final PropertyDescriptor TENANT_PRIVATE_KEY_FILE = new PropertyDescriptor.Builder()
.name("Tenants Private Key Filename")
.description("The fully-qualified filename of the tenant's private key.")
.defaultValue(null)
.addValidator(createFileExistsAndReadableValidator())
.sensitive(false)
.build();

public static final PropertyDescriptor TENANT_PRIVATE_KEY_ID = new PropertyDescriptor.Builder()
.name("Tenants Private Key Id")
.description("The id of tenant's private key.")
.defaultValue("0")
.required(false)
.sensitive(false)
.build();

private static final List<PropertyDescriptor> properties;

static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(TRUST_CERTIFICATE);
props.add(TENANT_DOMAIN);
props.add(TENANT_SERVICE);
props.add(PROVIDER_DOMAIN);
props.add(TENANT_PRIVATE_KEY_FILE);
props.add(TENANT_PRIVATE_KEY_ID);
properties = Collections.unmodifiableList(props);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}

@Override
public Authentication getAuthentication() {
Map<String, String> authParams = new HashMap<>();
// TODO Define constants for these keys
authParams.put("tenantDomain", configContext.getProperty(TENANT_DOMAIN).getValue());
authParams.put("tenantService", configContext.getProperty(TENANT_SERVICE).getValue());
authParams.put("providerDomain", configContext.getProperty(PROVIDER_DOMAIN).getValue());
authParams.put("privateKey", configContext.getProperty(TENANT_PRIVATE_KEY_FILE).getValue());

if (configContext.getProperty(TENANT_PRIVATE_KEY_ID).isSet()) {
authParams.put("keyId", configContext.getProperty(TENANT_PRIVATE_KEY_ID).getValue());
}

// return AuthenticationFactory.create(AuthenticationAthenz.class.getName(), authParams);

return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar.auth;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;

/**
* http://pulsar.apache.org/docs/en/security-token-client/
*
*/
public class PulsarClientJwtAuthenticationService extends AbstractPulsarClientAuntenticationService {

public static final PropertyDescriptor JWT_TOKEN = new PropertyDescriptor.Builder()
.name("The JSON Web Token")
.description("The raw signed JWT string")
.defaultValue(null)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true)
.sensitive(true)
.build();

private static final List<PropertyDescriptor> properties;

static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(TRUST_CERTIFICATE);
props.add(JWT_TOKEN);
properties = Collections.unmodifiableList(props);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}

@Override
public Authentication getAuthentication() {
return AuthenticationFactory.token(configContext.getProperty(JWT_TOKEN).getValue());
}

}
Loading

0 comments on commit 25f736f

Please sign in to comment.