diff --git a/bitsail-connectors/connector-pulsar/pom.xml b/bitsail-connectors/connector-pulsar/pom.xml
new file mode 100644
index 000000000..dda61396c
--- /dev/null
+++ b/bitsail-connectors/connector-pulsar/pom.xml
@@ -0,0 +1,183 @@
+
+
+
+ bitsail-connectors
+ com.bytedance.bitsail
+ ${revision}
+
+ 4.0.0
+
+ connector-pulsar
+
+
+ 8
+ 8
+ 2.8.0
+
+
+ 0.6.1
+ 3.20.2
+ 3.11
+ 1.33.0
+ 3.17.3
+
+
+
+
+
+ com.bytedance.bitsail
+ connector-base
+ ${revision}
+
+
+ com.bytedance.bitsail
+ bitsail-common
+ ${revision}
+
+
+ org.apache.flink
+ flink-connector-base
+
+
+ io.streamnative.connectors
+ pulsar-flink-connector_${scala.binary.version}
+ ${pulsar-flink-connector.version}
+
+
+ com.bytedance.bitsail
+ bitsail-connector-test
+ ${revision}
+ test
+
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ test
+
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ ${protoc.version}
+
+
+ junit
+ junit
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ provided
+ true
+
+
+
+
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+ test
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ test
+ test-jar
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ test
+ test-jar
+
+
+
+
+
+ org.apache.pulsar
+ testmocks
+ ${pulsar.version}
+ test
+
+
+ org.testng
+ testng
+
+
+ org.powermock
+ powermock-module-testng
+
+
+
+
+ org.apache.pulsar
+ pulsar-broker
+ ${pulsar.version}
+ test
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+ test
+
+
+
+
+
+
+ org.apache.pulsar
+ pulsar-client-all
+ ${pulsar.version}
+
+
+ org.apache.pulsar
+ pulsar-package-core
+
+
+
+
+ org.apache.flink
+ flink-annotations
+ ${flink.version}
+
+
+ com.bytedance.bitsail
+ bitsail-component-format-json
+ ${revision}
+
+
+ com.bytedance.bitsail
+ bitsail-connector-print
+ ${revision}
+ test
+
+
+
+
+
+
+
+ io.grpc
+ grpc-bom
+ ${grpc.version}
+ pom
+ import
+
+
+
+
+
\ No newline at end of file
diff --git a/bitsail-connectors/connector-pulsar/src/main/java/com/bytedance/bitsail/connector/pulsar/common/config/PulsarConfigUtils.java b/bitsail-connectors/connector-pulsar/src/main/java/com/bytedance/bitsail/connector/pulsar/common/config/PulsarConfigUtils.java
new file mode 100644
index 000000000..628780907
--- /dev/null
+++ b/bitsail-connectors/connector-pulsar/src/main/java/com/bytedance/bitsail/connector/pulsar/common/config/PulsarConfigUtils.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.pulsar.common.config;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProxyProtocol;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY;
+import static com.bytedance.bitsail.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.pulsar.client.api.SizeUnit.BYTES;
+
+/** The util for creating pulsar configuration class from flink's {@link Configuration}. */
+@Internal
+public final class PulsarConfigUtils {
+
+ private PulsarConfigUtils() {
+ // No need to create instance.
+ }
+
+ /** Create a PulsarClient by using the flink Configuration and the config customizer. */
+ public static PulsarClient createClient(Configuration configuration) {
+ ClientBuilder builder = PulsarClient.builder();
+
+ setOptionValue(configuration, PULSAR_SERVICE_URL, builder::serviceUrl);
+ setOptionValue(configuration, PULSAR_LISTENER_NAME, builder::listenerName);
+ builder.authentication(createAuthentication(configuration));
+ setOptionValue(
+ configuration,
+ PULSAR_OPERATION_TIMEOUT_MS,
+ timeout -> builder.operationTimeout(timeout, MILLISECONDS));
+ setOptionValue(configuration, PULSAR_NUM_IO_THREADS, builder::ioThreads);
+ setOptionValue(configuration, PULSAR_NUM_LISTENER_THREADS, builder::listenerThreads);
+ setOptionValue(configuration, PULSAR_CONNECTIONS_PER_BROKER, builder::connectionsPerBroker);
+ setOptionValue(configuration, PULSAR_USE_TCP_NO_DELAY, builder::enableTcpNoDelay);
+ setOptionValue(
+ configuration, PULSAR_TLS_TRUST_CERTS_FILE_PATH, builder::tlsTrustCertsFilePath);
+ setOptionValue(
+ configuration,
+ PULSAR_TLS_ALLOW_INSECURE_CONNECTION,
+ builder::allowTlsInsecureConnection);
+ setOptionValue(
+ configuration,
+ PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE,
+ builder::enableTlsHostnameVerification);
+ setOptionValue(configuration, PULSAR_USE_KEY_STORE_TLS, builder::useKeyStoreTls);
+ setOptionValue(configuration, PULSAR_SSL_PROVIDER, builder::sslProvider);
+ setOptionValue(configuration, PULSAR_TLS_TRUST_STORE_TYPE, builder::tlsTrustStoreType);
+ setOptionValue(configuration, PULSAR_TLS_TRUST_STORE_PATH, builder::tlsTrustStorePath);
+ setOptionValue(
+ configuration, PULSAR_TLS_TRUST_STORE_PASSWORD, builder::tlsTrustStorePassword);
+ setOptionValue(configuration, PULSAR_TLS_CIPHERS, TreeSet::new, builder::tlsCiphers);
+ setOptionValue(configuration, PULSAR_TLS_PROTOCOLS, TreeSet::new, builder::tlsProtocols);
+ setOptionValue(
+ configuration,
+ PULSAR_MEMORY_LIMIT_BYTES,
+ bytes -> builder.memoryLimit(bytes, BYTES));
+ setOptionValue(
+ configuration,
+ PULSAR_STATS_INTERVAL_SECONDS,
+ v -> builder.statsInterval(v, SECONDS));
+ setOptionValue(
+ configuration,
+ PULSAR_CONCURRENT_LOOKUP_REQUEST,
+ builder::maxConcurrentLookupRequests);
+ setOptionValue(configuration, PULSAR_MAX_LOOKUP_REQUEST, builder::maxLookupRequests);
+ setOptionValue(configuration, PULSAR_MAX_LOOKUP_REDIRECTS, builder::maxLookupRedirects);
+ setOptionValue(
+ configuration,
+ PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION,
+ builder::maxNumberOfRejectedRequestPerConnection);
+ setOptionValue(
+ configuration,
+ PULSAR_KEEP_ALIVE_INTERVAL_SECONDS,
+ v -> builder.keepAliveInterval(v, SECONDS));
+ setOptionValue(
+ configuration,
+ PULSAR_CONNECTION_TIMEOUT_MS,
+ v -> builder.connectionTimeout(v, MILLISECONDS));
+ setOptionValue(
+ configuration,
+ PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS,
+ v -> builder.startingBackoffInterval(v, NANOSECONDS));
+ setOptionValue(
+ configuration,
+ PULSAR_MAX_BACKOFF_INTERVAL_NANOS,
+ v -> builder.maxBackoffInterval(v, NANOSECONDS));
+ setOptionValue(configuration, PULSAR_ENABLE_BUSY_WAIT, builder::enableBusyWait);
+ if (configuration.contains(PULSAR_PROXY_SERVICE_URL)) {
+ String proxyServiceUrl = configuration.get(PULSAR_PROXY_SERVICE_URL);
+ ProxyProtocol proxyProtocol = configuration.get(PULSAR_PROXY_PROTOCOL);
+ builder.proxyServiceUrl(proxyServiceUrl, proxyProtocol);
+ }
+ setOptionValue(configuration, PULSAR_ENABLE_TRANSACTION, builder::enableTransaction);
+
+ return sneakyClient(builder::build);
+ }
+
+ /**
+ * PulsarAdmin shares almost the same configuration with PulsarClient, but we separate this
+ * create method for directly create it.
+ */
+ public static PulsarAdmin createAdmin(Configuration configuration) {
+ PulsarAdminBuilder builder = PulsarAdmin.builder();
+
+ setOptionValue(configuration, PULSAR_ADMIN_URL, builder::serviceHttpUrl);
+ builder.authentication(createAuthentication(configuration));
+ setOptionValue(
+ configuration, PULSAR_TLS_TRUST_CERTS_FILE_PATH, builder::tlsTrustCertsFilePath);
+ setOptionValue(
+ configuration,
+ PULSAR_TLS_ALLOW_INSECURE_CONNECTION,
+ builder::allowTlsInsecureConnection);
+ setOptionValue(
+ configuration,
+ PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE,
+ builder::enableTlsHostnameVerification);
+ setOptionValue(configuration, PULSAR_USE_KEY_STORE_TLS, builder::useKeyStoreTls);
+ setOptionValue(configuration, PULSAR_SSL_PROVIDER, builder::sslProvider);
+ setOptionValue(configuration, PULSAR_TLS_TRUST_STORE_TYPE, builder::tlsTrustStoreType);
+ setOptionValue(configuration, PULSAR_TLS_TRUST_STORE_PATH, builder::tlsTrustStorePath);
+ setOptionValue(
+ configuration, PULSAR_TLS_TRUST_STORE_PASSWORD, builder::tlsTrustStorePassword);
+ setOptionValue(configuration, PULSAR_TLS_CIPHERS, TreeSet::new, builder::tlsCiphers);
+ setOptionValue(configuration, PULSAR_TLS_PROTOCOLS, TreeSet::new, builder::tlsProtocols);
+ setOptionValue(
+ configuration,
+ PULSAR_CONNECT_TIMEOUT,
+ v -> builder.connectionTimeout(v, MILLISECONDS));
+ setOptionValue(
+ configuration, PULSAR_READ_TIMEOUT, v -> builder.readTimeout(v, MILLISECONDS));
+ setOptionValue(
+ configuration,
+ PULSAR_REQUEST_TIMEOUT,
+ v -> builder.requestTimeout(v, MILLISECONDS));
+ setOptionValue(
+ configuration,
+ PULSAR_AUTO_CERT_REFRESH_TIME,
+ v -> builder.autoCertRefreshTime(v, MILLISECONDS));
+
+ return sneakyClient(builder::build);
+ }
+
+ /**
+ * Create the {@link Authentication} instance for both {@code PulsarClient} and {@code
+ * PulsarAdmin}. If the user didn't provide configuration, a {@link AuthenticationDisabled}
+ * instance would be returned.
+ *
+ *
This method behavior is the same as the pulsar command line tools.
+ */
+ private static Authentication createAuthentication(Configuration configuration) {
+ if (configuration.contains(PULSAR_AUTH_PLUGIN_CLASS_NAME)) {
+ String authPluginClassName = configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME);
+
+ if (configuration.contains(PULSAR_AUTH_PARAMS)) {
+ String authParamsString = configuration.get(PULSAR_AUTH_PARAMS);
+ return sneakyClient(
+ () -> AuthenticationFactory.create(authPluginClassName, authParamsString));
+ } else if (configuration.contains(PULSAR_AUTH_PARAM_MAP)) {
+ Map paramsMap = configuration.get(PULSAR_AUTH_PARAM_MAP);
+ return sneakyClient(
+ () -> AuthenticationFactory.create(authPluginClassName, paramsMap));
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "No %s or %s provided",
+ PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key()));
+ }
+ }
+
+ return AuthenticationDisabled.INSTANCE;
+ }
+
+ /** Get the option value str from given config, convert it into the real value instance. */
+ public static T getOptionValue(
+ Configuration configuration, ConfigOption option, Function convertor) {
+ F value = configuration.get(option);
+ if (value != null) {
+ return convertor.apply(value);
+ } else {
+ return null;
+ }
+ }
+
+ /** Set the config option's value to a given builder. */
+ public static void setOptionValue(
+ Configuration configuration, ConfigOption option, Consumer setter) {
+ setOptionValue(configuration, option, identity(), setter);
+ }
+
+ /**
+ * Query the config option's value, convert it into a required type, set it to a given builder.
+ */
+ public static void setOptionValue(
+ Configuration configuration,
+ ConfigOption option,
+ Function convertor,
+ Consumer setter) {
+ if (configuration.contains(option)) {
+ V value = getOptionValue(configuration, option, convertor);
+ setter.accept(value);
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-pulsar/src/main/java/com/bytedance/bitsail/connector/pulsar/common/config/PulsarOptions.java b/bitsail-connectors/connector-pulsar/src/main/java/com/bytedance/bitsail/connector/pulsar/common/config/PulsarOptions.java
new file mode 100644
index 000000000..2ac17ea0e
--- /dev/null
+++ b/bitsail-connectors/connector-pulsar/src/main/java/com/bytedance/bitsail/connector/pulsar/common/config/PulsarOptions.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.pulsar.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.pulsar.client.api.ProxyProtocol;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.ADMIN_CONFIG_PREFIX;
+import static com.bytedance.bitsail.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX;
+
+/**
+ * Configuration for Pulsar Client, these config options would be used for both source, sink and
+ * table.
+ */
+@PublicEvolving
+@ConfigGroups(
+ groups = {
+ @ConfigGroup(name = "PulsarClient", keyPrefix = CLIENT_CONFIG_PREFIX),
+ @ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX)
+ })
+@SuppressWarnings("java:S1192")
+public final class PulsarOptions {
+
+ // Pulsar client API config prefix.
+ public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
+ // Pulsar admin API config prefix.
+ public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";
+
+ private PulsarOptions() {
+ // This is a constant class
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////
+ //
+ // The configuration for ClientConfigurationData part.
+ // All the configuration listed below should have the pulsar.client prefix.
+ //
+ ///////////////////////////////////////////////////////////////////////////////
+
+ public static final ConfigOption PULSAR_SERVICE_URL =
+ ConfigOptions.key(CLIENT_CONFIG_PREFIX + "serviceUrl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text("Service URL provider for Pulsar service.")
+ .linebreak()
+ .text(
+ "To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.")
+ .linebreak()
+ .text(
+ "You can assign Pulsar protocol URLs to specific clusters and use the %s scheme.",
+ code("pulsar"))
+ .linebreak()
+ .list(
+ text(
+ "This is an example of %s: %s.",
+ code("localhost"),
+ code("pulsar://localhost:6650")),
+ text(
+ "If you have multiple brokers, the URL is as: %s",
+ code(
+ "pulsar://localhost:6550,localhost:6651,localhost:6652")),
+ text(
+ "A URL for a production Pulsar cluster is as: %s",
+ code(
+ "pulsar://pulsar.us-west.example.com:6650")),
+ text(
+ "If you use TLS authentication, the URL is as %s",
+ code(
+ "pulsar+ssl://pulsar.us-west.example.com:6651")))
+ .build());
+
+ public static final ConfigOption PULSAR_AUTH_PLUGIN_CLASS_NAME =
+ ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authPluginClassName")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Name of the authentication plugin.");
+
+ public static final ConfigOption PULSAR_AUTH_PARAMS =
+ ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParams")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text("Parameters for the authentication plugin.")
+ .linebreak()
+ .linebreak()
+ .text("Example:")
+ .linebreak()
+ .add(code("key1:val1,key2:val2"))
+ .build());
+
+ public static final ConfigOption