Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
add newSqlClientConf method for DynamicTableSink
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Dec 1, 2021
1 parent fbb8d17 commit e9bd613
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.shaded.curator4.com.google.common.collect.Maps;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/** Utility to create Pulsar Admin Client from adminUrl and clientConfigurationData. */
public class PulsarClientUtils {
Expand Down Expand Up @@ -65,4 +70,27 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie
}
return clientConf;
}

public static ClientConfigurationData newSqlClientConf(String serviceUrl,
Properties properties) {
Map<String, Object> clientConfData = getClientParams(Maps.fromProperties(properties));
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf =
ConfigurationDataUtils.loadData(
clientConfData, clientConf, ClientConfigurationData.class);
clientConf.setServiceUrl(serviceUrl);
return clientConf;
}

public static Map<String, Object> getClientParams(Map<String, String> parameters) {
return parameters.keySet().stream()
.filter(k -> k.startsWith(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX))
.collect(
Collectors.toMap(
k ->
k.substring(
PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX
.length()),
k -> parameters.get(k)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private SinkFunction<RowData> createPulsarSink(
Properties properties,
PulsarSerializationSchema<RowData> pulsarSerializer) {
final ClientConfigurationData configurationData =
PulsarClientUtils.newClientConf(serviceUrl, properties);
PulsarClientUtils.newSqlClientConf(serviceUrl, properties);
return new FlinkPulsarSink<RowData>(
adminUrl,
Optional.ofNullable(topic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
createPulsarDeserialization(
keyDeserialization, valueDeserialization, producedTypeInfo);
final ClientConfigurationData clientConfigurationData =
PulsarClientUtils.newClientConf(serviceUrl, properties);
PulsarClientUtils.newSqlClientConf(serviceUrl, properties);
FlinkPulsarSource<RowData> source =
new FlinkPulsarSource<>(
adminUrl, clientConfigurationData, deserializationSchema, properties);
Expand Down

0 comments on commit e9bd613

Please sign in to comment.