From 55fb56ba123dbe43dfd7b64ca4b5470346e4caf3 Mon Sep 17 00:00:00 2001
From: tiezhu
Date: Mon, 11 Apr 2022 10:57:46 +0800
Subject: [PATCH] Cherry-pick commits from DTStack.

---
 .../table/CassandraDynamicTableFactory.java | 6 +-
 .../hbase14/sink/HBaseOutputFormat.java | 16 +-
 .../hbase14/source/HBaseInputFormat.java | 43 +---
 .../connector/hbase14/util/HBaseHelper.java | 53 +----
 .../hdfs/sink/BaseHdfsOutputFormat.java | 35 +---
 .../hdfs/sink/HdfsParquetOutputFormat.java | 22 +--
 .../hdfs/source/BaseHdfsInputFormat.java | 8 +-
 .../hdfs/source/HdfsParquetInputFormat.java | 3 +-
 .../connector/hive/sink/HiveOutputFormat.java | 11 +-
 .../connector/hive/util/HiveDbUtil.java | 10 +-
 .../flinkx/connector/hive/util/HiveUtil.java | 10 +-
 ...java => InceptorHdfsRawTypeConverter.java} | 2 +-
 ...ter.java => InceptorHdfsRowConverter.java} | 4 +-
 .../InceptorHyberbaseRawTypeConvert.java | 77 ++++++++
 .../InceptorHyberbaseRowConvert.java | 132 +++++++++++++
 .../InceptorSearchRawTypeConverter.java | 57 ++++++
 .../converter/InceptorSearchRowConverter.java | 123 ++++++++++++
 .../inceptor/dialect/InceptorDialect.java | 76 +++-----
 .../inceptor/dialect/InceptorHdfsDialect.java | 97 ++++++++++
 .../dialect/InceptorHyperbaseDialect.java | 69 +++++++
 .../dialect/InceptorSearchDialect.java | 68 +++++++
 .../sink/InceptorDynamicTableSink.java | 4 +-
 ...mat.java => InceptorHdfsOutputFormat.java} | 38 +++-
 ...a => InceptorHdfsOutputFormatBuilder.java} | 15 +-
 .../InceptorHyperbaseOutputFormatBuilder.java | 43 +++++
 .../InceptorSearchOutputFormatBuilder.java | 43 +++++
 .../inceptor/sink/InceptorSinkFactory.java | 50 +++++
 ...va => InceptorHdfsInputFormatBuilder.java} | 6 +-
 .../source/InceptorHyperbaseInputFormat.java | 83 ++++++++
 .../InceptorHyperbaseInputFormatBuilder.java | 91 +++++++++
 .../source/InceptorSearchInputFormat.java | 84 +++++++++
 .../InceptorSearchInputFormatBuilder.java | 91 +++++++++
 .../source/InceptorSourceFactory.java | 52 +++++
 .../table/InceptorDynamicTableFactory.java | 62 ++++--
 .../connector/jdbc/sink/JdbcSinkFactory.java | 13 +-
 .../jdbc/table/JdbcDynamicTableFactory.java | 16 +-
 .../table/OracleDynamicTableFactory.java | 19 --
 .../CloudSolrClientKerberosWrapper.java | 7 +-
 .../connector/solr/sink/SolrOutputFormat.java | 2 +-
 .../solr/source/SolrInputFormat.java | 15 +-
 .../table/SqlserverDynamicTableFactory.java | 2 +-
 .../dtstack/flinkx/security/KerberosUtil.java | 177 +----------------
 .../dtstack/flinkx/sink/DirtyDataManager.java | 2 +-
 .../dtstack/flinkx/util/FileSystemUtil.java | 22 +--
 44 files changed, 1343 insertions(+), 516 deletions(-)
 rename flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/{InceptorRawTypeConverter.java => InceptorHdfsRawTypeConverter.java} (98%)
 rename flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/{InceptorRowConverter.java => InceptorHdfsRowConverter.java} (98%)
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRawTypeConvert.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRowConvert.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRawTypeConverter.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRowConverter.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHdfsDialect.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHyperbaseDialect.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorSearchDialect.java
 rename flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/{InceptorOutputFormat.java => InceptorHdfsOutputFormat.java} (86%)
 rename flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/{InceptorOutputFormatBuilder.java => InceptorHdfsOutputFormatBuilder.java} (78%)
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHyperbaseOutputFormatBuilder.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSearchOutputFormatBuilder.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSinkFactory.java
 rename flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/{InceptorInputFormatBuilder.java => InceptorHdfsInputFormatBuilder.java} (94%)
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormat.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormatBuilder.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormat.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormatBuilder.java
 create mode 100644 flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSourceFactory.java

diff --git a/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/table/CassandraDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/table/CassandraDynamicTableFactory.java
index f89bfb9ad6..9b1659cca7 100644
--- a/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/table/CassandraDynamicTableFactory.java
+++ b/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/table/CassandraDynamicTableFactory.java
@@ -64,9 +64,6 @@
 import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_FETCH_SIZE;
 import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_MAX_RETRIES;
 import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_PARALLELISM;
-import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_MAX_RETRIES;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_FETCH_SIZE;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN_TYPE;
@@ -77,6 +74,9 @@
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNNAME;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNTYPE;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_START_LOCATION;
+import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static com.dtstack.flinkx.table.options.SinkOptions.SINK_MAX_RETRIES;
 
 /**
  * @author tiezhu
diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/sink/HBaseOutputFormat.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/sink/HBaseOutputFormat.java
index a792306206..87e7b312d3 100644
--- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/sink/HBaseOutputFormat.java
+++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/sink/HBaseOutputFormat.java
@@ -22,7 +22,6 @@
 import com.dtstack.flinkx.connector.hbase14.converter.DataSyncSinkConverter;
 import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
 import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;
-import com.dtstack.flinkx.security.KerberosUtil;
 import com.dtstack.flinkx.sink.format.BaseRichOutputFormat;
 import com.dtstack.flinkx.throwable.WriteRecordException;
 
@@ -47,8 +46,6 @@
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.zookeeper.client.ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
-
 /**
  * The Hbase Implementation of OutputFormat
  *
@@ -111,18 +108,7 @@
     public void openInternal(int taskNumber, int numTasks) throws IOException {
         boolean openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
         if (openKerberos) {
-            // TDH环境并且zk开启了kerberos需要设置zk的环境变量
-            if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
-                String keytabFile =
-                        HBaseHelper.getKeyTabFileName(
-                                hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
-                String principal = KerberosUtil.getPrincipal(hbaseConfig, keytabFile);
-                String client = System.getProperty(LOGIN_CONTEXT_NAME_KEY, "Client");
-                KerberosUtil.appendOrUpdateJaasConf(client, keytabFile, principal);
-            }
-            UserGroupInformation ugi =
-                    HBaseHelper.getUgi(
-                            hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
+            UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
             ugi.doAs(
                     (PrivilegedAction)
                             () -> {
diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormat.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormat.java
index ffada29343..d0161d1496 100644
--- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormat.java
+++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormat.java
@@ -20,11 +20,8 @@
 import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
 import
com.dtstack.flinkx.connector.hbase14.util.HBaseHelper; -import com.dtstack.flinkx.security.KerberosUtil; import com.dtstack.flinkx.source.format.BaseRichInputFormat; -import com.dtstack.flinkx.util.PluginUtil; -import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.core.io.InputSplit; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -55,8 +52,6 @@ import java.util.Locale; import java.util.Map; -import static org.apache.zookeeper.client.ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY; - /** * The InputFormat Implementation used for HbaseReader * @@ -96,31 +91,17 @@ public void openInputFormat() throws IOException { LOG.info("HbaseOutputFormat openInputFormat start"); nameMaps = Maps.newConcurrentMap(); - // tdh环境中 hbase关联的zk开启kerberos 需要添加zk的jaas文件配置 - if (HBaseHelper.openKerberosForZk(hbaseConfig)) { - setZkJaasConfiguration(getRuntimeContext().getDistributedCache()); - } - connection = - HBaseHelper.getHbaseConnection( - hbaseConfig, getRuntimeContext().getDistributedCache(), jobId); + + connection = HBaseHelper.getHbaseConnection(hbaseConfig); LOG.info("HbaseOutputFormat openInputFormat end"); } @Override public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOException { - DistributedCache distributedCache = - PluginUtil.createDistributedCacheFromContextClassLoader(); - // tdh环境中 hbase关联的zk开启kerberos 需要添加zk的jaas文件配置 - if (HBaseHelper.openKerberosForZk(hbaseConfig)) { - setZkJaasConfiguration(getRuntimeContext().getDistributedCache()); - } - try (Connection connection = - HBaseHelper.getHbaseConnection(hbaseConfig, distributedCache, jobId)) { + try (Connection connection = HBaseHelper.getHbaseConnection(hbaseConfig)) { if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) { - UserGroupInformation ugi = - HBaseHelper.getUgi( - hbaseConfig, getRuntimeContext().getDistributedCache(), jobId); + UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig); return ugi.doAs( (PrivilegedAction) () -> @@ -261,16 +242,10 @@ public void openInternal(InputSplit inputSplit) throws IOException { byte[] stopRow = Bytes.toBytesBinary(hbaseInputSplit.getEndKey()); if (null == connection || connection.isClosed()) { - connection = - HBaseHelper.getHbaseConnection( - hbaseConfig, getRuntimeContext().getDistributedCache(), jobId); + connection = HBaseHelper.getHbaseConnection(hbaseConfig); } openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig); - // tdh环境中 hbase关联的zk开启kerberos 需要添加zk的jaas文件配置 - if (HBaseHelper.openKerberosForZk(hbaseConfig)) { - setZkJaasConfiguration(getRuntimeContext().getDistributedCache()); - } table = connection.getTable(TableName.valueOf(tableName)); scan = new Scan(); @@ -405,12 +380,4 @@ public Object convertBytesToAssignType(String columnType, byte[] byteArray, Stri } return column; } - - // 设置zk的jaas配置 - private void setZkJaasConfiguration(DistributedCache distributedCache) { - String keytabFile = HBaseHelper.getKeyTabFileName(hbaseConfig, distributedCache, jobId); - String principal = KerberosUtil.getPrincipal(hbaseConfig, keytabFile); - String client = System.getProperty(LOGIN_CONTEXT_NAME_KEY, "Client"); - KerberosUtil.appendOrUpdateJaasConf(client, keytabFile, principal); - } } diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseHelper.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseHelper.java index 31914cfa71..59dbe4e7ff 100644 --- 
a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseHelper.java +++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseHelper.java @@ -19,9 +19,6 @@ package com.dtstack.flinkx.connector.hbase14.util; import com.dtstack.flinkx.security.KerberosUtil; -import com.dtstack.flinkx.util.FileSystemUtil; - -import org.apache.flink.api.common.cache.DistributedCache; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -44,6 +41,7 @@ import java.security.PrivilegedAction; import java.util.Map; +import static com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF; import static com.dtstack.flinkx.security.KerberosUtil.KRB_STR; /** @@ -60,12 +58,11 @@ public class HBaseHelper { private static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; - public static Connection getHbaseConnection( - Map hbaseConfigMap, DistributedCache distributedCache, String jobId) { + public static Connection getHbaseConnection(Map hbaseConfigMap) { Validate.isTrue(MapUtils.isNotEmpty(hbaseConfigMap), "hbaseConfig不能为空Map结构!"); if (HBaseConfigUtils.isEnableKerberos(hbaseConfigMap)) { - return getConnectionWithKerberos(hbaseConfigMap, distributedCache, jobId); + return getConnectionWithKerberos(hbaseConfigMap); } try { @@ -77,11 +74,10 @@ public static Connection getHbaseConnection( } } - private static Connection getConnectionWithKerberos( - Map hbaseConfigMap, DistributedCache distributedCache, String jobId) { + private static Connection getConnectionWithKerberos(Map hbaseConfigMap) { try { setKerberosConf(hbaseConfigMap); - UserGroupInformation ugi = getUgi(hbaseConfigMap, distributedCache, jobId); + UserGroupInformation ugi = getUgi(hbaseConfigMap); return ugi.doAs( (PrivilegedAction) () -> { @@ -98,19 +94,17 @@ private static Connection getConnectionWithKerberos( } } - public static UserGroupInformation getUgi( - Map hbaseConfigMap, DistributedCache distributedCache, String jobId) + public static UserGroupInformation getUgi(Map hbaseConfigMap) throws IOException { String keytabFileName = KerberosUtil.getPrincipalFileName(hbaseConfigMap); - keytabFileName = - KerberosUtil.loadFile(hbaseConfigMap, keytabFileName, distributedCache, jobId); + keytabFileName = KerberosUtil.loadFile(hbaseConfigMap, keytabFileName); String principal = KerberosUtil.getPrincipal(hbaseConfigMap, keytabFileName); - KerberosUtil.loadKrb5Conf(hbaseConfigMap, distributedCache, jobId); + KerberosUtil.loadKrb5Conf(hbaseConfigMap); KerberosUtil.refreshConfig(); - Configuration conf = FileSystemUtil.getConfiguration(hbaseConfigMap, null); - return KerberosUtil.loginAndReturnUgi(conf, principal, keytabFileName); + return KerberosUtil.loginAndReturnUgi( + principal, keytabFileName, System.getProperty(KEY_JAVA_SECURITY_KRB5_CONF)); } public static Configuration getConfig(Map hbaseConfigMap) { @@ -135,33 +129,6 @@ public static void setKerberosConf(Map hbaseConfigMap) { hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTH_ENABLE, true); } - /** - * 获取hbase关联的zk是否也开启了kerberos - * - * @param hbaseConfigMap - * @return - */ - public static boolean openKerberosForZk(Map hbaseConfigMap) { - String openKerberos = - MapUtils.getString(hbaseConfigMap, "zookeeper.security.authentication", "default"); - return "kerberos".equalsIgnoreCase(openKerberos); - } - - /** - * 
获取keyTab文件的本地路径 - * - * @param hbaseConfigMap - * @param distributedCache - * @param jobId - * @return - */ - public static String getKeyTabFileName( - Map hbaseConfigMap, DistributedCache distributedCache, String jobId) { - String keytabFileName = KerberosUtil.getPrincipalFileName(hbaseConfigMap); - return KerberosUtil.getLocalFileName( - hbaseConfigMap, keytabFileName, distributedCache, jobId); - } - public static RegionLocator getRegionLocator(Connection hConnection, String userTable) { TableName hTableName = TableName.valueOf(userTable); Admin admin = null; diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/BaseHdfsOutputFormat.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/BaseHdfsOutputFormat.java index 0f12e6a2cd..2d12cd1560 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/BaseHdfsOutputFormat.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/BaseHdfsOutputFormat.java @@ -145,10 +145,7 @@ protected void openSource() { try { fs = FileSystemUtil.getFileSystem( - hdfsConf.getHadoopConfig(), - hdfsConf.getDefaultFS(), - distributedCache, - jobId); + hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), distributedCache); } catch (Exception e) { throw new FlinkxRuntimeException("can't init fileSystem", e); } @@ -184,12 +181,10 @@ protected List copyTmpDataFileToDir() { try { FileStatus[] dataFiles = fs.listStatus(tmpDir, pathFilter); for (FileStatus dataFile : dataFiles) { - if (!filterFile(dataFile)) { - currentFilePath = dataFile.getPath().getName(); - FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf); - copyList.add(currentFilePath); - LOG.info("copy temp file:{} to dir:{}", currentFilePath, dir); - } + currentFilePath = dataFile.getPath().getName(); + FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf); + copyList.add(currentFilePath); + LOG.info("copy temp file:{} to dir:{}", currentFilePath, dir); } } catch (Exception e) { throw new FlinkxRuntimeException( @@ -229,11 +224,9 @@ protected void moveAllTmpDataFileToDir() { FileStatus[] dataFiles = fs.listStatus(tmpDir); for (FileStatus dataFile : dataFiles) { - if (!filterFile(dataFile)) { - currentFilePath = dataFile.getPath().getName(); - fs.rename(dataFile.getPath(), dir); - LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir); - } + currentFilePath = dataFile.getPath().getName(); + fs.rename(dataFile.getPath(), dir); + LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir); } fs.delete(tmpDir, true); } catch (IOException e) { @@ -294,16 +287,4 @@ public HdfsConf getHdfsConf() { public void setHdfsConf(HdfsConf hdfsConf) { this.hdfsConf = hdfsConf; } - - /** filter file when move file to dataPath* */ - protected boolean filterFile(FileStatus fileStatus) { - if (fileStatus.getLen() == 0) { - LOG.warn( - "file {} has filter,because file len [{}] is 0 ", - fileStatus.getPath(), - fileStatus.getLen()); - return true; - } - return false; - } } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/HdfsParquetOutputFormat.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/HdfsParquetOutputFormat.java index 127aa850dd..5dea95429d 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/HdfsParquetOutputFormat.java +++ 
b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/HdfsParquetOutputFormat.java @@ -33,7 +33,6 @@ import org.apache.flink.table.data.RowData; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.parquet.column.ParquetProperties; @@ -51,7 +50,6 @@ import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; import java.security.PrivilegedAction; import java.util.HashMap; import java.util.List; @@ -71,9 +69,6 @@ public class HdfsParquetOutputFormat extends BaseHdfsOutputFormat { private ParquetWriter writer; private MessageType schema; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); - public static int PARQUET_FOOTER_LENGTH_SIZE = 4; - @Override protected void openSource() { super.openSource(); @@ -135,8 +130,7 @@ protected void nextBlock() { FileSystemUtil.getUGI( hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), - getRuntimeContext().getDistributedCache(), - jobId); + getRuntimeContext().getDistributedCache()); ugi.doAs( (PrivilegedAction) () -> { @@ -297,18 +291,4 @@ private MessageType buildSchema() { return typeBuilder.named("Pair"); } - - @Override - protected boolean filterFile(FileStatus fileStatus) { - if (fileStatus.getLen() - < (long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length)) { - LOG.warn( - "file {} has filter,because file len [{}] less than [{}] ", - fileStatus.getPath(), - fileStatus.getLen(), - (long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length)); - return true; - } - return false; - } } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/BaseHdfsInputFormat.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/BaseHdfsInputFormat.java index f5f0bd1aab..1b3779d585 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/BaseHdfsInputFormat.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/BaseHdfsInputFormat.java @@ -70,10 +70,7 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOExcepti PluginUtil.createDistributedCacheFromContextClassLoader(); UserGroupInformation ugi = FileSystemUtil.getUGI( - hdfsConf.getHadoopConfig(), - hdfsConf.getDefaultFS(), - distributedCache, - jobId); + hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), distributedCache); LOG.info("user:{}, ", ugi.getShortUserName()); return ugi.doAs( (PrivilegedAction) @@ -101,8 +98,7 @@ public void openInputFormat() throws IOException { FileSystemUtil.getUGI( hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), - getRuntimeContext().getDistributedCache(), - jobId); + getRuntimeContext().getDistributedCache()); } } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsParquetInputFormat.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsParquetInputFormat.java index f0af983a5c..c86076ec62 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsParquetInputFormat.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsParquetInputFormat.java @@ -104,8 +104,7 @@ public InputSplit[] createHdfsSplit(int minNumSplits) { 
FileSystemUtil.getFileSystem( hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), - PluginUtil.createDistributedCacheFromContextClassLoader(), - jobId)) { + PluginUtil.createDistributedCacheFromContextClassLoader())) { allFilePaths = getAllPartitionPath(hdfsConf.getPath(), fs, pathFilter); } catch (Exception e) { throw new FlinkxRuntimeException(e); diff --git a/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/sink/HiveOutputFormat.java b/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/sink/HiveOutputFormat.java index 88de7219b3..54b6d11c1c 100644 --- a/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/sink/HiveOutputFormat.java +++ b/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/sink/HiveOutputFormat.java @@ -37,7 +37,6 @@ import com.dtstack.flinkx.restore.FormatState; import com.dtstack.flinkx.sink.format.BaseRichOutputFormat; import com.dtstack.flinkx.throwable.FlinkxRuntimeException; -import com.dtstack.flinkx.throwable.NoRestartException; import com.dtstack.flinkx.util.GsonUtil; import com.dtstack.flinkx.util.JsonUtil; @@ -183,10 +182,6 @@ public synchronized void writeRecord(RowData rowData) { hdfsOutputFormat.writeRecord(forwardRowData); } catch (Exception e) { - // 如果捕获到NoRestartException异常(脏数据管理抛出的异常) 直接抛出 而不是ignore - if (e instanceof NoRestartException) { - throw e; - } // 写入产生的脏数据已经由hdfsOutputFormat处理了,这里不用再处理了,只打印日志 if (numWriteCounter.getLocalValue() % LOG_PRINT_INTERNAL == 0) { LOG.warn("write hdfs exception:", e); @@ -278,8 +273,7 @@ private Pair getHdfsOutputFormat( hiveConf.getSchema(), partitionPath, connectionInfo, - getRuntimeContext().getDistributedCache(), - jobId); + getRuntimeContext().getDistributedCache()); String path = tableInfo.getPath() + File.separatorChar + partitionPath; outputFormat = @@ -390,8 +384,7 @@ private TableInfo checkCreateTable( tableInfo, hiveConf.getSchema(), connectionInfo, - getRuntimeContext().getDistributedCache(), - jobId); + getRuntimeContext().getDistributedCache()); tableCacheMap.put(tablePath, tableInfo); } return tableInfo; diff --git a/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveDbUtil.java b/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveDbUtil.java index c265b717cc..972db695d3 100644 --- a/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveDbUtil.java +++ b/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveDbUtil.java @@ -80,9 +80,9 @@ public class HiveDbUtil { private HiveDbUtil() {} public static Connection getConnection( - ConnectionInfo connectionInfo, DistributedCache distributedCache, String jobId) { + ConnectionInfo connectionInfo, DistributedCache distributedCache) { if (openKerberos(connectionInfo.getJdbcUrl())) { - return getConnectionWithKerberos(connectionInfo, distributedCache, jobId); + return getConnectionWithKerberos(connectionInfo, distributedCache); } else { return getConnectionWithRetry(connectionInfo); } @@ -109,7 +109,7 @@ public Connection call() throws Exception { } private static Connection getConnectionWithKerberos( - ConnectionInfo connectionInfo, DistributedCache distributedCache, String jobId) { + ConnectionInfo connectionInfo, DistributedCache distributedCache) { if (connectionInfo.getHiveConf() == null || connectionInfo.getHiveConf().isEmpty()) { throw 
new IllegalArgumentException("hiveConf can not be null or empty"); } @@ -118,9 +118,9 @@ private static Connection getConnectionWithKerberos( keytabFileName = KerberosUtil.loadFile( - connectionInfo.getHiveConf(), keytabFileName, distributedCache, jobId); + connectionInfo.getHiveConf(), keytabFileName, distributedCache); String principal = KerberosUtil.getPrincipal(connectionInfo.getHiveConf(), keytabFileName); - KerberosUtil.loadKrb5Conf(connectionInfo.getHiveConf(), distributedCache, jobId); + KerberosUtil.loadKrb5Conf(connectionInfo.getHiveConf(), distributedCache); KerberosUtil.refreshConfig(); Configuration conf = FileSystemUtil.getConfiguration(connectionInfo.getHiveConf(), null); diff --git a/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveUtil.java b/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveUtil.java index 20373fc53e..c2a8625e5f 100644 --- a/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveUtil.java +++ b/flinkx-connectors/flinkx-connector-hive/src/main/java/com/dtstack/flinkx/connector/hive/util/HiveUtil.java @@ -77,11 +77,10 @@ public static void createHiveTableWithTableInfo( TableInfo tableInfo, String schema, ConnectionInfo connectionInfo, - DistributedCache distributedCache, - String jobId) { + DistributedCache distributedCache) { Connection connection = null; try { - connection = HiveDbUtil.getConnection(connectionInfo, distributedCache, jobId); + connection = HiveDbUtil.getConnection(connectionInfo, distributedCache); if (StringUtils.isNotBlank(schema)) { HiveDbUtil.executeSqlWithoutResultSet(connectionInfo, connection, "use " + schema); } @@ -101,11 +100,10 @@ public static void createPartition( String schema, String partition, ConnectionInfo connectionInfo, - DistributedCache distributedCache, - String jobId) { + DistributedCache distributedCache) { Connection connection = null; try { - connection = HiveDbUtil.getConnection(connectionInfo, distributedCache, jobId); + connection = HiveDbUtil.getConnection(connectionInfo, distributedCache); if (StringUtils.isNotBlank(schema)) { HiveDbUtil.executeSqlWithoutResultSet(connectionInfo, connection, "use " + schema); } diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorRawTypeConverter.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHdfsRawTypeConverter.java similarity index 98% rename from flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorRawTypeConverter.java rename to flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHdfsRawTypeConverter.java index 35de468c7e..1b23f16bec 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorRawTypeConverter.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHdfsRawTypeConverter.java @@ -25,7 +25,7 @@ import java.util.Locale; -public class InceptorRawTypeConverter { +public class InceptorHdfsRawTypeConverter { public static DataType apply(String type) { type = type.toUpperCase(Locale.ENGLISH); diff --git 
a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorRowConverter.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHdfsRowConverter.java similarity index 98% rename from flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorRowConverter.java rename to flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHdfsRowConverter.java index 6e66e2caaf..61e8cab7c8 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorRowConverter.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHdfsRowConverter.java @@ -40,11 +40,11 @@ import java.time.LocalTime; /** @author dujie */ -public class InceptorRowConverter extends JdbcRowConverter { +public class InceptorHdfsRowConverter extends JdbcRowConverter { private static final long serialVersionUID = 1L; - public InceptorRowConverter(RowType rowType) { + public InceptorHdfsRowConverter(RowType rowType) { super(rowType); } diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRawTypeConvert.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRawTypeConvert.java new file mode 100644 index 0000000000..0ab17fbd87 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRawTypeConvert.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.flinkx.connector.inceptor.converter; + +import com.dtstack.flinkx.constants.ConstantValue; +import com.dtstack.flinkx.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import java.util.Locale; + +/** @author liuliu 2022/2/25 */ +public class InceptorHyberbaseRawTypeConvert { + + public static DataType apply(String type) { + type = type.toUpperCase(Locale.ENGLISH); + int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); + int right = type.indexOf(ConstantValue.RIGHT_PARENTHESIS_SYMBOL); + String leftStr = type; + String rightStr = null; + if (left > 0 && right > 0) { + leftStr = type.substring(0, left); + rightStr = type.substring(left + 1, type.length() - 1); + } + switch (leftStr) { + case "VARCHAR": + case "STRING": + return DataTypes.STRING(); + case "INT": + return DataTypes.INT(); + case "BOOLEAN": + return DataTypes.BOOLEAN(); + case "TINYINT": + return DataTypes.TINYINT(); + case "SMALLINT": + return DataTypes.SMALLINT(); + case "BIGINT": + return DataTypes.BIGINT(); + case "FLOAT": + case "DOUBLE": + return DataTypes.DOUBLE(); + case "DECIMAL": + if (rightStr != null) { + String[] split = rightStr.split(ConstantValue.COMMA_SYMBOL); + if (split.length == 2) { + return DataTypes.DECIMAL( + Integer.parseInt(split[0].trim()), + Integer.parseInt(split[1].trim())); + } + } + return DataTypes.DECIMAL(10, 2); + case "DATE": + return DataTypes.DATE(); + case "TIMESTAMP": + return DataTypes.TIMESTAMP(9); + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRowConvert.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRowConvert.java new file mode 100644 index 0000000000..f0874eb7ee --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorHyberbaseRowConvert.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.flinkx.connector.inceptor.converter; + +import com.dtstack.flinkx.connector.jdbc.converter.JdbcRowConverter; +import com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.flinkx.converter.IDeserializationConverter; +import com.dtstack.flinkx.converter.ISerializationConverter; +import com.dtstack.flinkx.element.ColumnRowData; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.hive.common.type.HiveDate; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Calendar; + +import static com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil.LOCAL_TIMEZONE; + +/** @author liuliu 2022/2/25 */ +public class InceptorHyberbaseRowConvert extends JdbcRowConverter { + + public InceptorHyberbaseRowConvert(RowType rowType) { + super(rowType); + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case VARCHAR: + return val -> StringData.fromString(val.toString()); + case INTEGER: + case BOOLEAN: + case BIGINT: + case DOUBLE: + return val -> val; + case SMALLINT: + return val -> ((Integer) val).shortValue(); + case TINYINT: + return val -> ((Integer) val).byteValue(); + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case DATE: + return val -> { + HiveDate hiveDate = (HiveDate) val; + long time = hiveDate.getTime(); + Calendar c = Calendar.getInstance(LOCAL_TIMEZONE.get()); + c.setTime(hiveDate); + if (c.get(Calendar.HOUR_OF_DAY) == 0 + && c.get(Calendar.MINUTE) == 0 + && c.get(Calendar.SECOND) == 0) { + return hiveDate.toLocalDate().toEpochDay(); + } else { + return TimestampData.fromEpochMillis(time); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> TimestampData.fromEpochMillis(((Timestamp) val).getTime()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + @Override + protected ISerializationConverter createExternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case VARCHAR: + return (val, index, statement) -> + statement.setString( + index, ((ColumnRowData) val).getField(index).asString()); + case INTEGER: + case SMALLINT: + return (val, index, statement) -> + statement.setInt(index, ((ColumnRowData) val).getField(index).asInt()); + case BOOLEAN: + return (val, index, statement) -> + statement.setBoolean( + index, ((ColumnRowData) val).getField(index).asBoolean()); + case TINYINT: + return (val, index, statement) -> statement.setByte(index, val.getByte(index)); + case BIGINT: + return (val, index, statement) -> + statement.setLong(index, ((ColumnRowData) val).getField(index).asLong()); + case DOUBLE: + return (val, index, statement) -> + statement.setDouble( + index, ((ColumnRowData) val).getField(index).asDouble()); + case DATE: + return (val, index, statement) -> + statement.setDate( + index, + new HiveDate( + ((ColumnRowData) val) + .getField(index) + .asTimestamp() + .getTime())); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (val, index, statement) -> + 
statement.setTimestamp( + index, ((ColumnRowData) val).getField(index).asTimestamp()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRawTypeConverter.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRawTypeConverter.java new file mode 100644 index 0000000000..5c61b57160 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRawTypeConverter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.converter; + +import com.dtstack.flinkx.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import java.util.Locale; + +/** @author liuliu 2022/2/22 */ +public class InceptorSearchRawTypeConverter { + + public static DataType apply(String type) { + type = type.toUpperCase(Locale.ENGLISH); + switch (type) { + case "STRING": + return DataTypes.STRING(); + case "INT": + return DataTypes.INT(); + case "BOOLEAN": + return DataTypes.BOOLEAN(); + case "TINYINT": + return DataTypes.TINYINT(); + case "SMALLINT": + return DataTypes.SMALLINT(); + case "BIGINT": + return DataTypes.BIGINT(); + case "FLOAT": + case "DOUBLE": + return DataTypes.DOUBLE(); + case "DATE": + return DataTypes.DATE(); + case "TIMESTAMP": + return DataTypes.TIMESTAMP(9); + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRowConverter.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRowConverter.java new file mode 100644 index 0000000000..c97820db02 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/converter/InceptorSearchRowConverter.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.converter; + +import com.dtstack.flinkx.connector.jdbc.converter.JdbcRowConverter; +import com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.flinkx.converter.IDeserializationConverter; +import com.dtstack.flinkx.converter.ISerializationConverter; +import com.dtstack.flinkx.element.ColumnRowData; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.hive.common.type.HiveDate; + +import java.sql.Timestamp; +import java.util.Calendar; + +import static com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil.LOCAL_TIMEZONE; + +/** @author liuliu 2022/2/22 */ +public class InceptorSearchRowConverter extends JdbcRowConverter { + + public InceptorSearchRowConverter(RowType rowType) { + super(rowType); + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case VARCHAR: + return val -> StringData.fromString(val.toString()); + case INTEGER: + case BOOLEAN: + case BIGINT: + case DOUBLE: + return val -> val; + case SMALLINT: + return val -> ((Integer) val).shortValue(); + case TINYINT: + return val -> ((Integer) val).byteValue(); + case DATE: + return val -> { + HiveDate hiveDate = (HiveDate) val; + long time = hiveDate.getTime(); + Calendar c = Calendar.getInstance(LOCAL_TIMEZONE.get()); + c.setTime(hiveDate); + if (c.get(Calendar.HOUR_OF_DAY) == 0 + && c.get(Calendar.MINUTE) == 0 + && c.get(Calendar.SECOND) == 0) { + return hiveDate.toLocalDate().toEpochDay(); + } else { + return TimestampData.fromEpochMillis(time); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> TimestampData.fromEpochMillis(((Timestamp) val).getTime()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + @Override + protected ISerializationConverter createExternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case VARCHAR: + return (val, index, statement) -> + statement.setString( + index, ((ColumnRowData) val).getField(index).asString()); + case INTEGER: + case SMALLINT: + return (val, index, statement) -> + statement.setInt(index, ((ColumnRowData) val).getField(index).asInt()); + case BOOLEAN: + return (val, index, statement) -> + statement.setBoolean( + index, ((ColumnRowData) val).getField(index).asBoolean()); + case TINYINT: + return (val, index, statement) -> statement.setByte(index, val.getByte(index)); + case BIGINT: + return (val, index, statement) -> + statement.setLong(index, ((ColumnRowData) val).getField(index).asLong()); + case DOUBLE: + return (val, index, statement) -> + statement.setDouble( + index, ((ColumnRowData) val).getField(index).asDouble()); + case DATE: + return (val, index, statement) -> + statement.setDate( + index, + new HiveDate( + ((ColumnRowData) val) + .getField(index) + .asTimestamp() + .getTime())); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (val, 
index, statement) -> + statement.setTimestamp( + index, ((ColumnRowData) val).getField(index).asTimestamp()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorDialect.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorDialect.java index 01b50e5057..b4a170c089 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorDialect.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorDialect.java @@ -17,24 +17,16 @@ */ package com.dtstack.flinkx.connector.inceptor.dialect; -import com.dtstack.flinkx.connector.inceptor.converter.InceptorRawTypeConverter; -import com.dtstack.flinkx.connector.inceptor.converter.InceptorRowConverter; import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect; -import com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatement; -import com.dtstack.flinkx.converter.AbstractRowConverter; -import com.dtstack.flinkx.converter.RawTypeConverter; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; - -import io.vertx.core.json.JsonArray; - -import java.sql.ResultSet; import java.util.Arrays; import java.util.Optional; -import java.util.stream.Collectors; -public class InceptorDialect implements JdbcDialect { +import static com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil.INCEPTOR_TRANSACTION_TYPE; + +public abstract class InceptorDialect implements JdbcDialect { @Override public String dialectName() { return "INCEPTOR"; @@ -45,11 +37,6 @@ public boolean canHandle(String url) { return url.startsWith("jdbc:hive2:"); } - @Override - public RawTypeConverter getRawTypeConverter() { - return InceptorRawTypeConverter::apply; - } - @Override public String quoteIdentifier(String identifier) { return "`" + identifier + "`"; @@ -60,39 +47,26 @@ public Optional defaultDriverName() { return Optional.of("org.apache.hive.jdbc.HiveDriver"); } - @Override - public AbstractRowConverter - getRowConverter(RowType rowType) { - return new InceptorRowConverter(rowType); + public String appendJdbcTransactionType(String jdbcUrl) { + String transactionType = INCEPTOR_TRANSACTION_TYPE.substring(4); + String[] split = jdbcUrl.split("\\?"); + StringBuilder stringBuilder = new StringBuilder(jdbcUrl); + boolean flag = false; + if (split.length == 2) { + flag = + Arrays.stream(split[1].split("&")) + .anyMatch(s -> s.equalsIgnoreCase(transactionType)); + stringBuilder.append('&'); + } else { + stringBuilder.append("?"); + } + if (!flag) { + return stringBuilder.append(transactionType).toString(); + } + return jdbcUrl; } - public String getInsertPartitionIntoStatement( - String schema, - String tableName, - String partitionKey, - String partiitonValue, - String[] fieldNames) { - String columns = - Arrays.stream(fieldNames) - .map(this::quoteIdentifier) - .collect(Collectors.joining(", ")); - String placeholders = - Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", ")); - return "INSERT INTO " - + buildTableInfoWithSchema(schema, tableName) - + " PARTITION " - + " ( " - + 
quoteIdentifier(partitionKey) - + "=" - + "'" - + partiitonValue - + "'" - + " ) " - + "(" - + columns - + ")" - + " SELECT " - + placeholders - + " FROM SYSTEM.DUAL"; - } + public abstract JdbcInputFormatBuilder getInputFormatBuilder(); + + public abstract JdbcOutputFormatBuilder getOutputFormatBuilder(); } diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHdfsDialect.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHdfsDialect.java new file mode 100644 index 0000000000..c801ae3732 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHdfsDialect.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.dialect; + +import com.dtstack.flinkx.connector.inceptor.converter.InceptorHdfsRawTypeConverter; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorHdfsRowConverter; +import com.dtstack.flinkx.connector.inceptor.sink.InceptorHdfsOutputFormatBuilder; +import com.dtstack.flinkx.connector.inceptor.source.InceptorHdfsInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.flinkx.converter.AbstractRowConverter; +import com.dtstack.flinkx.converter.RawTypeConverter; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; + +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.stream.Collectors; + +/** @author liuliu 2022/3/4 */ +public class InceptorHdfsDialect extends InceptorDialect { + @Override + public AbstractRowConverter + getRowConverter(RowType rowType) { + return new InceptorHdfsRowConverter(rowType); + } + + @Override + public RawTypeConverter getRawTypeConverter() { + return InceptorHdfsRawTypeConverter::apply; + } + + public String getInsertPartitionIntoStatement( + String schema, + String tableName, + String partitionKey, + String partiitonValue, + String[] fieldNames) { + String columns = + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String placeholders = + Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", ")); + return "INSERT INTO " + + buildTableInfoWithSchema(schema, tableName) + + " PARTITION " + + " ( " + + quoteIdentifier(partitionKey) + + "=" + + "'" + + partiitonValue + + "'" + + " ) " + + "(" + + columns + + ")" + + " SELECT " + + 
placeholders + + " FROM SYSTEM.DUAL"; + } + + @Override + public String appendJdbcTransactionType(String jdbcUrl) { + return jdbcUrl; + } + + @Override + public JdbcInputFormatBuilder getInputFormatBuilder() { + return new InceptorHdfsInputFormatBuilder(); + } + + @Override + public JdbcOutputFormatBuilder getOutputFormatBuilder() { + return new InceptorHdfsOutputFormatBuilder(); + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHyperbaseDialect.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHyperbaseDialect.java new file mode 100644 index 0000000000..5fd6ed1897 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorHyperbaseDialect.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.dialect; + +import com.dtstack.flinkx.conf.FlinkxCommonConf; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorHyberbaseColumnConvert; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorHyberbaseRawTypeConvert; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorHyberbaseRowConvert; +import com.dtstack.flinkx.connector.inceptor.sink.InceptorHyperbaseOutputFormatBuilder; +import com.dtstack.flinkx.connector.inceptor.source.InceptorHyperbaseInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.flinkx.converter.AbstractRowConverter; +import com.dtstack.flinkx.converter.RawTypeConverter; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; + +import java.sql.ResultSet; + +/** @author liuliu 2022/2/25 */ +public class InceptorHyperbaseDialect extends InceptorDialect { + + @Override + public RawTypeConverter getRawTypeConverter() { + return InceptorHyberbaseRawTypeConvert::apply; + } + + @Override + public AbstractRowConverter + getRowConverter(RowType rowType) { + return new InceptorHyberbaseRowConvert(rowType); + } + + @Override + public AbstractRowConverter + getColumnConverter(RowType rowType, FlinkxCommonConf commonConf) { + return new InceptorHyberbaseColumnConvert(rowType, commonConf); + } + + @Override + public JdbcInputFormatBuilder getInputFormatBuilder() { + return new InceptorHyperbaseInputFormatBuilder(); + } + + @Override + public JdbcOutputFormatBuilder getOutputFormatBuilder() { + return new 
InceptorHyperbaseOutputFormatBuilder(); + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorSearchDialect.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorSearchDialect.java new file mode 100644 index 0000000000..1c7ad5cb5c --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/dialect/InceptorSearchDialect.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.dialect; + +import com.dtstack.flinkx.conf.FlinkxCommonConf; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorSearchColumnConverter; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorSearchRawTypeConverter; +import com.dtstack.flinkx.connector.inceptor.converter.InceptorSearchRowConverter; +import com.dtstack.flinkx.connector.inceptor.sink.InceptorSearchOutputFormatBuilder; +import com.dtstack.flinkx.connector.inceptor.source.InceptorSearchInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.flinkx.converter.AbstractRowConverter; +import com.dtstack.flinkx.converter.RawTypeConverter; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; + +import java.sql.ResultSet; + +/** @author liuliu 2022/2/23 */ +public class InceptorSearchDialect extends InceptorDialect { + @Override + public RawTypeConverter getRawTypeConverter() { + return InceptorSearchRawTypeConverter::apply; + } + + @Override + public AbstractRowConverter + getRowConverter(RowType rowType) { + return new InceptorSearchRowConverter(rowType); + } + + @Override + public AbstractRowConverter + getColumnConverter(RowType rowType, FlinkxCommonConf commonConf) { + return new InceptorSearchColumnConverter(rowType, commonConf); + } + + @Override + public JdbcInputFormatBuilder getInputFormatBuilder() { + return new InceptorSearchInputFormatBuilder(); + } + + @Override + public JdbcOutputFormatBuilder getOutputFormatBuilder() { + return new InceptorSearchOutputFormatBuilder(); + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorDynamicTableSink.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorDynamicTableSink.java index f73badcde2..1301b33fa8 100644 --- 
a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorDynamicTableSink.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorDynamicTableSink.java @@ -69,8 +69,6 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { // 通过该参数得到类型转换器,将数据库中的字段转成对应的类型 final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); - InceptorOutputFormatBuilder builder = (InceptorOutputFormatBuilder) this.builder; - String[] fieldNames = tableSchema.getFieldNames(); List columnList = new ArrayList<>(fieldNames.length); for (int i = 0; i < fieldNames.length; i++) { @@ -83,7 +81,7 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { inceptorConf.setColumn(columnList); builder.setJdbcDialect(jdbcDialect); - builder.setInceptorConf(inceptorConf); + builder.setJdbcConf(inceptorConf); builder.setRowConverter(jdbcDialect.getRowConverter(rowType)); return SinkFunctionProvider.of( diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorOutputFormat.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHdfsOutputFormat.java similarity index 86% rename from flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorOutputFormat.java rename to flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHdfsOutputFormat.java index f3fa01c961..1bd96ecbe4 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorOutputFormat.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHdfsOutputFormat.java @@ -18,8 +18,9 @@ package com.dtstack.flinkx.connector.inceptor.sink; import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf; -import com.dtstack.flinkx.connector.inceptor.dialect.InceptorDialect; +import com.dtstack.flinkx.connector.inceptor.dialect.InceptorHdfsDialect; import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil; +import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormat; import com.dtstack.flinkx.connector.jdbc.util.JdbcUtil; import com.dtstack.flinkx.element.ColumnRowData; @@ -57,9 +58,9 @@ * * @author dujie */ -public class InceptorOutputFormat extends JdbcOutputFormat { +public class InceptorHdfsOutputFormat extends JdbcOutputFormat { - protected static final Logger LOG = LoggerFactory.getLogger(InceptorOutputFormat.class); + protected static final Logger LOG = LoggerFactory.getLogger(InceptorHdfsOutputFormat.class); protected static final long serialVersionUID = 1L; @@ -226,7 +227,7 @@ public Connection getConnection() { @Override protected String prepareTemplates() { String singleSql = - ((InceptorDialect) jdbcDialect) + ((InceptorHdfsDialect) jdbcDialect) .getInsertPartitionIntoStatement( inceptorConf.getSchema(), inceptorConf.getTable(), @@ -276,8 +277,9 @@ private void switchNextPartiiton(Date currentData) throws SQLException { } } - public void setInceptorConf(InceptorConf inceptorConf) { - this.inceptorConf = inceptorConf; + public void setJdbcConf(JdbcConf jdbcConf) { + super.setJdbcConf(jdbcConf); + this.inceptorConf = (InceptorConf) jdbcConf; } // 判断是否是事务表 @@ -303,7 +305,27 @@ private boolean 
isTransactionTable() throws SQLException { data.add(lineData); } return data.stream() - .filter(i -> i.values().stream().anyMatch(i1 -> i1.startsWith("transactional"))) - .anyMatch(i -> i.values().stream().anyMatch(i1 -> i1.startsWith("true"))); + .filter( + i -> + i.values().stream() + .anyMatch( + i1 -> { + if (i1 != null) { + return i1.startsWith("transactional"); + } else { + return false; + } + })) + .anyMatch( + i -> + i.values().stream() + .anyMatch( + i1 -> { + if (i1 != null) { + return i1.startsWith("true"); + } else { + return false; + } + })); } } diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorOutputFormatBuilder.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHdfsOutputFormatBuilder.java similarity index 78% rename from flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorOutputFormatBuilder.java rename to flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHdfsOutputFormatBuilder.java index 9121b69200..e956fa7d06 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorOutputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHdfsOutputFormatBuilder.java @@ -17,7 +17,6 @@ */ package com.dtstack.flinkx.connector.inceptor.sink; -import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf; import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; import com.dtstack.flinkx.converter.AbstractRowConverter; @@ -25,18 +24,10 @@ import org.apache.commons.lang.StringUtils; /** @author dujie */ -public class InceptorOutputFormatBuilder extends JdbcOutputFormatBuilder { +public class InceptorHdfsOutputFormatBuilder extends JdbcOutputFormatBuilder { - protected InceptorOutputFormat format; - - public InceptorOutputFormatBuilder(InceptorOutputFormat format) { - super(format); - this.format = format; - } - - public void setInceptorConf(InceptorConf inceptorConf) { - super.setJdbcConf(inceptorConf); - format.setInceptorConf(inceptorConf); + public InceptorHdfsOutputFormatBuilder() { + super(new InceptorHdfsOutputFormat()); } @Override diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHyperbaseOutputFormatBuilder.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHyperbaseOutputFormatBuilder.java new file mode 100644 index 0000000000..3aa2c4e201 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorHyperbaseOutputFormatBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
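For reference, the partition template assembled by InceptorHdfsDialect#getInsertPartitionIntoStatement above, and handed to FieldNamedPreparedStatement by prepareTemplates(), renders as follows for a hypothetical table "flink"."t_user" partitioned by pt with columns id and name. The double-quote identifier quoting is an assumption (the dialect's quoteIdentifier default), and the doubled space after PARTITION comes from concatenating " PARTITION " with " ( ":

    public class PartitionInsertPreview {
        public static void main(String[] args) {
            // Named :placeholders, one per written column, resolved later by
            // FieldNamedPreparedStatement; all names here are illustrative.
            String sql =
                    "INSERT INTO \"flink\".\"t_user\" PARTITION "
                            + " ( \"pt\"='20220411' ) "
                            + "(\"id\", \"name\")"
                            + " SELECT :id, :name FROM SYSTEM.DUAL";
            System.out.println(sql);
        }
    }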
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.sink; + +import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; + +import org.apache.commons.lang.StringUtils; + +/** @author liuliu 2022/3/7 */ +public class InceptorHyperbaseOutputFormatBuilder extends JdbcOutputFormatBuilder { + public InceptorHyperbaseOutputFormatBuilder() { + super(new InceptorHyperbaseOutputFormat()); + } + + @Override + protected void checkFormat() { + JdbcConf jdbcConf = format.getJdbcConf(); + StringBuilder sb = new StringBuilder(256); + if (StringUtils.isBlank(jdbcConf.getJdbcUrl())) { + sb.append("No jdbc url supplied;\n"); + } + if (sb.length() > 0) { + throw new IllegalArgumentException(sb.toString()); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSearchOutputFormatBuilder.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSearchOutputFormatBuilder.java new file mode 100644 index 0000000000..66a66f3905 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSearchOutputFormatBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.flinkx.connector.inceptor.sink; + +import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; + +import org.apache.commons.lang.StringUtils; + +/** @author liuliu 2022/2/24 */ +public class InceptorSearchOutputFormatBuilder extends JdbcOutputFormatBuilder { + public InceptorSearchOutputFormatBuilder() { + super(new InceptorSearchOutputFormat()); + } + + @Override + protected void checkFormat() { + JdbcConf jdbcConf = format.getJdbcConf(); + StringBuilder sb = new StringBuilder(256); + if (StringUtils.isBlank(jdbcConf.getJdbcUrl())) { + sb.append("No jdbc url supplied;\n"); + } + if (sb.length() > 0) { + throw new IllegalArgumentException(sb.toString()); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSinkFactory.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSinkFactory.java new file mode 100644 index 0000000000..f9815d606f --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/sink/InceptorSinkFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
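The InceptorSinkFactory that follows relies on a template-method hook added to JdbcSinkFactory later in this patch: the base factory deserializes the writer parameters into whatever class getConfClass() reports, so subclasses receive their own conf type without re-parsing. A minimal sketch of that mechanism, using simplified stand-in conf classes rather than the real JdbcConf/InceptorConf:

    import com.google.gson.Gson;

    public class ConfClassHookSketch {
        static class BaseConf {
            String jdbcUrl;
        }

        static class PartitionedConf extends BaseConf {
            String partitionName;
        }

        // Subclasses override this, exactly like InceptorSinkFactory#getConfClass().
        protected Class<? extends BaseConf> getConfClass() {
            return PartitionedConf.class;
        }

        public BaseConf parse(String json) {
            // fromJson(json, getConfClass()) yields the subclass instance.
            return new Gson().fromJson(json, getConfClass());
        }

        public static void main(String[] args) {
            BaseConf conf =
                    new ConfClassHookSketch().parse("{\"jdbcUrl\":\"jdbc:hive2://host:10000/db\"}");
            System.out.println(conf instanceof PartitionedConf); // true
        }
    }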
+ */ + +package com.dtstack.flinkx.connector.inceptor.sink; + +import com.dtstack.flinkx.conf.SyncConf; +import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf; +import com.dtstack.flinkx.connector.inceptor.dialect.InceptorDialect; +import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil; +import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcSinkFactory; + +/** @author liuliu 2022/2/24 */ +public class InceptorSinkFactory extends JdbcSinkFactory { + + InceptorDialect inceptorDialect; + + public InceptorSinkFactory(SyncConf syncConf) { + super(syncConf, null); + inceptorDialect = InceptorDbUtil.getDialectWithDriverType(jdbcConf); + jdbcConf.setJdbcUrl(inceptorDialect.appendJdbcTransactionType(jdbcConf.getJdbcUrl())); + super.jdbcDialect = inceptorDialect; + } + + @Override + protected Class getConfClass() { + return InceptorConf.class; + } + + @Override + protected JdbcOutputFormatBuilder getBuilder() { + return ((InceptorDialect) jdbcDialect).getOutputFormatBuilder(); + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHdfsInputFormatBuilder.java similarity index 94% rename from flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorInputFormatBuilder.java rename to flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHdfsInputFormatBuilder.java index 7024b17ad2..7cf6d4ff69 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHdfsInputFormatBuilder.java @@ -30,10 +30,10 @@ * @author dujie @Description * @createTime 2022-01-20 04:17:00 */ -public class InceptorInputFormatBuilder extends JdbcInputFormatBuilder { +public class InceptorHdfsInputFormatBuilder extends JdbcInputFormatBuilder { - public InceptorInputFormatBuilder(JdbcInputFormat format) { - super(format); + public InceptorHdfsInputFormatBuilder() { + super(new JdbcInputFormat()); } @Override diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormat.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormat.java new file mode 100644 index 0000000000..9ff07d4abe --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormat.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flinkx.connector.inceptor.source;
+
+import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf;
+import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil;
+import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat;
+
+import org.apache.flink.core.io.InputSplit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.type.HiveDate;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+/** @author liuliu 2022/3/7 */
+public class InceptorHyperbaseInputFormat extends JdbcInputFormat {
+    @Override
+    public void openInternal(InputSplit inputSplit) {
+        super.openInternal(inputSplit);
+    }
+
+    @Override
+    protected Connection getConnection() {
+        return InceptorDbUtil.getConnection(
+                (InceptorConf) jdbcConf, getRuntimeContext().getDistributedCache(), jobId);
+    }
+
+    @Override
+    protected void queryForPolling(String startLocation) throws SQLException {
+        // log at INFO roughly once every five minutes: (current time - job start time) % 300s <= one polling interval
+        if ((System.currentTimeMillis() - startTime) % 300000 <= jdbcConf.getPollingInterval()) {
+            LOG.info("polling startLocation = {}", startLocation);
+        } else {
+            LOG.debug("polling startLocation = {}", startLocation);
+        }
+
+        boolean isNumber = StringUtils.isNumeric(startLocation);
+        switch (type) {
+            case TIMESTAMP:
+                Timestamp ts =
+                        isNumber
+                                ? new Timestamp(Long.parseLong(startLocation))
+                                : Timestamp.valueOf(startLocation);
+                ps.setTimestamp(1, ts);
+                break;
+            case DATE:
+                Date date =
+                        isNumber
+                                ? new HiveDate(Long.parseLong(startLocation))
+                                : HiveDate.valueOf(startLocation);
+                ps.setDate(1, date);
+                break;
+            default:
+                if (isNumber) {
+                    ps.setLong(1, Long.parseLong(startLocation));
+                } else {
+                    ps.setString(1, startLocation);
+                }
+        }
+        resultSet = ps.executeQuery();
+        hasNext = resultSet.next();
+    }
+}
diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormatBuilder.java
new file mode 100644
index 0000000000..f6128f15b9
--- /dev/null
+++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorHyperbaseInputFormatBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
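The polling reader above (and its Search twin below) binds the incremental start location according to the restore column's type: a purely numeric string is taken as epoch milliseconds, anything else is parsed with valueOf. A minimal illustration using plain java.sql types; the patch itself substitutes HiveDate for DATE columns:

    import java.sql.Timestamp;

    public class StartLocationBindingDemo {
        public static void main(String[] args) {
            // Numeric start locations are interpreted as epoch milliseconds.
            Timestamp fromMillis = new Timestamp(Long.parseLong("1646640000000"));
            // Non-numeric ones must already be a JDBC timestamp literal.
            Timestamp fromLiteral = Timestamp.valueOf("2022-03-07 16:00:00");
            System.out.println(fromMillis);
            System.out.println(fromLiteral);
        }
    }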
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flinkx.connector.inceptor.source;
+
+import com.dtstack.flinkx.conf.FieldConf;
+import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf;
+import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder;
+import com.dtstack.flinkx.constants.ConstantValue;
+import com.dtstack.flinkx.enums.ColumnType;
+import com.dtstack.flinkx.enums.Semantic;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Arrays;
+
+/** @author liuliu 2022/3/7 */
+public class InceptorHyperbaseInputFormatBuilder extends JdbcInputFormatBuilder {
+    public InceptorHyperbaseInputFormatBuilder() {
+        super(new InceptorHyperbaseInputFormat());
+    }
+
+    @Override
+    protected void checkFormat() {
+        JdbcConf conf = format.getJdbcConf();
+        StringBuilder sb = new StringBuilder(256);
+        if (StringUtils.isBlank(conf.getJdbcUrl())) {
+            sb.append("No jdbc url supplied;\n");
+        }
+        if (conf.isIncrement()) {
+            if (StringUtils.isBlank(conf.getIncreColumn())) {
+                sb.append("increColumn can't be empty when increment is true;\n");
+            }
+            conf.setSplitPk(conf.getIncreColumn());
+            if (conf.getParallelism() > 1) {
+                conf.setSplitStrategy("mod");
+            }
+        }
+
+        if (conf.getParallelism() > 1) {
+            if (StringUtils.isBlank(conf.getSplitPk())) {
+                sb.append("Must specify the split column when the channel is greater than 1;\n");
+            } else {
+                FieldConf field =
+                        FieldConf.getSameNameMetaColumn(conf.getColumn(), conf.getSplitPk());
+                if (field == null) {
+                    sb.append("split column must in columns;\n");
+                } else if (!ColumnType.isNumberType(field.getType())) {
+                    sb.append("split column's type must be number type;\n");
+                }
+            }
+        }
+
+        if (StringUtils.isNotBlank(conf.getStartLocation())) {
+            String[] startLocations = conf.getStartLocation().split(ConstantValue.COMMA_SYMBOL);
+            if (startLocations.length != 1 && startLocations.length != conf.getParallelism()) {
+                sb.append("startLocations is ")
+                        .append(Arrays.toString(startLocations))
+                        .append(", length = [")
+                        .append(startLocations.length)
+                        .append("], but the channel is [")
+                        .append(conf.getParallelism())
+                        .append("];\n");
+            }
+        }
+        try {
+            Semantic.getByName(conf.getSemantic());
+        } catch (Exception e) {
+            sb.append(String.format("unsupported semantic type %s", conf.getSemantic()));
+        }
+
+        if (sb.length() > 0) {
+            throw new IllegalArgumentException(sb.toString());
+        }
+    }
+}
diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormat.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormat.java
new file mode 100644
index 0000000000..bc0b81ff5f
--- /dev/null
+++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
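The checkFormat above (duplicated verbatim in the Search builder below) gates incremental and parallel reads before the job starts. A distilled restatement of the main rules as a stand-alone helper, hypothetical and slightly simplified (it omits the numeric-type lookup against FieldConf):

    import java.util.ArrayList;
    import java.util.List;

    public class IncrementReadRules {
        public static List<String> validate(
                String jdbcUrl, boolean increment, String increColumn, int parallelism, String splitPk) {
            List<String> errors = new ArrayList<>();
            if (jdbcUrl == null || jdbcUrl.trim().isEmpty()) {
                errors.add("No jdbc url supplied");
            }
            if (increment && (increColumn == null || increColumn.trim().isEmpty())) {
                errors.add("increColumn can't be empty when increment is true");
            }
            // In increment mode the increment column doubles as the split key;
            // otherwise a numeric splitPk must be configured for multi-channel reads.
            String effectiveSplitKey = increment ? increColumn : splitPk;
            if (parallelism > 1
                    && (effectiveSplitKey == null || effectiveSplitKey.trim().isEmpty())) {
                errors.add("Must specify the split column when the channel is greater than 1");
            }
            return errors;
        }
    }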
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flinkx.connector.inceptor.source;
+
+import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf;
+import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil;
+import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat;
+
+import org.apache.flink.core.io.InputSplit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.type.HiveDate;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+/** @author liuliu 2022/2/24 */
+public class InceptorSearchInputFormat extends JdbcInputFormat {
+
+    @Override
+    public void openInternal(InputSplit inputSplit) {
+        super.openInternal(inputSplit);
+    }
+
+    @Override
+    protected Connection getConnection() {
+        return InceptorDbUtil.getConnection(
+                (InceptorConf) jdbcConf, getRuntimeContext().getDistributedCache(), jobId);
+    }
+
+    @Override
+    protected void queryForPolling(String startLocation) throws SQLException {
+        // log at INFO roughly once every five minutes: (current time - job start time) % 300s <= one polling interval
+        if ((System.currentTimeMillis() - startTime) % 300000 <= jdbcConf.getPollingInterval()) {
+            LOG.info("polling startLocation = {}", startLocation);
+        } else {
+            LOG.debug("polling startLocation = {}", startLocation);
+        }
+
+        boolean isNumber = StringUtils.isNumeric(startLocation);
+        switch (type) {
+            case TIMESTAMP:
+                Timestamp ts =
+                        isNumber
+                                ? new Timestamp(Long.parseLong(startLocation))
+                                : Timestamp.valueOf(startLocation);
+                ps.setTimestamp(1, ts);
+                break;
+            case DATE:
+                Date date =
+                        isNumber
+                                ? new HiveDate(Long.parseLong(startLocation))
+                                : HiveDate.valueOf(startLocation);
+                ps.setDate(1, date);
+                break;
+            default:
+                if (isNumber) {
+                    ps.setLong(1, Long.parseLong(startLocation));
+                } else {
+                    ps.setString(1, startLocation);
+                }
+        }
+        resultSet = ps.executeQuery();
+        hasNext = resultSet.next();
+    }
+}
diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormatBuilder.java
new file mode 100644
index 0000000000..bb03f129cf
--- /dev/null
+++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSearchInputFormatBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.source; + +import com.dtstack.flinkx.conf.FieldConf; +import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; +import com.dtstack.flinkx.constants.ConstantValue; +import com.dtstack.flinkx.enums.ColumnType; +import com.dtstack.flinkx.enums.Semantic; + +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; + +/** @author liuliu 2022/2/24 */ +public class InceptorSearchInputFormatBuilder extends JdbcInputFormatBuilder { + public InceptorSearchInputFormatBuilder() { + super(new InceptorSearchInputFormat()); + } + + @Override + protected void checkFormat() { + JdbcConf conf = format.getJdbcConf(); + StringBuilder sb = new StringBuilder(256); + if (StringUtils.isBlank(conf.getJdbcUrl())) { + sb.append("No jdbc url supplied;\n"); + } + if (conf.isIncrement()) { + if (StringUtils.isBlank(conf.getIncreColumn())) { + sb.append("increColumn can't be empty when increment is true;\n"); + } + conf.setSplitPk(conf.getIncreColumn()); + if (conf.getParallelism() > 1) { + conf.setSplitStrategy("mod"); + } + } + + if (conf.getParallelism() > 1) { + if (StringUtils.isBlank(conf.getSplitPk())) { + sb.append("Must specify the split column when the channel is greater than 1;\n"); + } else { + FieldConf field = + FieldConf.getSameNameMetaColumn(conf.getColumn(), conf.getSplitPk()); + if (field == null) { + sb.append("split column must in columns;\n"); + } else if (!ColumnType.isNumberType(field.getType())) { + sb.append("split column's type must be number type;\n"); + } + } + } + + if (StringUtils.isNotBlank(conf.getStartLocation())) { + String[] startLocations = conf.getStartLocation().split(ConstantValue.COMMA_SYMBOL); + if (startLocations.length != 1 && startLocations.length != conf.getParallelism()) { + sb.append("startLocations is ") + .append(Arrays.toString(startLocations)) + .append(", length = [") + .append(startLocations.length) + .append("], but the channel is [") + .append(conf.getParallelism()) + .append("];\n"); + } + } + try { + Semantic.getByName(conf.getSemantic()); + } catch (Exception e) { + sb.append(String.format("unsupported semantic type %s", conf.getSemantic())); + } + + if (sb.length() > 0) { + throw new IllegalArgumentException(sb.toString()); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSourceFactory.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSourceFactory.java new file mode 100644 index 0000000000..49684eb5c5 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/source/InceptorSourceFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
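The InceptorSourceFactory that follows (like InceptorSinkFactory earlier) obtains its concrete dialect from InceptorDbUtil.getDialectWithDriverType and then asks the dialect for the matching input/output format builders; InceptorDbUtil itself is not part of this patch. A hypothetical sketch of that strategy-style routing, with made-up URL markers:

    import com.dtstack.flinkx.connector.inceptor.dialect.InceptorDialect;
    import com.dtstack.flinkx.connector.inceptor.dialect.InceptorHdfsDialect;
    import com.dtstack.flinkx.connector.inceptor.dialect.InceptorHyperbaseDialect;
    import com.dtstack.flinkx.connector.inceptor.dialect.InceptorSearchDialect;

    public class DialectRoutingSketch {
        // The real driver-type detection lives in InceptorDbUtil; these URL
        // markers are illustrative assumptions, not the actual routing keys.
        public static InceptorDialect getDialectWithDriverType(String jdbcUrl) {
            if (jdbcUrl.contains("hyperbase")) {
                return new InceptorHyperbaseDialect();
            }
            if (jdbcUrl.contains("search")) {
                return new InceptorSearchDialect();
            }
            return new InceptorHdfsDialect();
        }
    }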
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.inceptor.source; + +import com.dtstack.flinkx.conf.SyncConf; +import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf; +import com.dtstack.flinkx.connector.inceptor.dialect.InceptorDialect; +import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil; +import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; +import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; +import com.dtstack.flinkx.connector.jdbc.source.JdbcSourceFactory; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** @author liuliu 2022/2/22 */ +public class InceptorSourceFactory extends JdbcSourceFactory { + + InceptorDialect inceptorDialect; + + public InceptorSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { + super(syncConf, env, null); + this.inceptorDialect = InceptorDbUtil.getDialectWithDriverType(jdbcConf); + jdbcConf.setJdbcUrl(inceptorDialect.appendJdbcTransactionType(jdbcConf.getJdbcUrl())); + super.jdbcDialect = inceptorDialect; + } + + @Override + protected Class getConfClass() { + return InceptorConf.class; + } + + @Override + protected JdbcInputFormatBuilder getBuilder() { + return inceptorDialect.getInputFormatBuilder(); + } +} diff --git a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/table/InceptorDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/table/InceptorDynamicTableFactory.java index 54edb298a3..745386fc02 100644 --- a/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/table/InceptorDynamicTableFactory.java +++ b/flinkx-connectors/flinkx-connector-inceptor/src/main/java/com/dtstack/flinkx/connector/inceptor/table/InceptorDynamicTableFactory.java @@ -21,15 +21,12 @@ import com.dtstack.flinkx.connector.inceptor.dialect.InceptorDialect; import com.dtstack.flinkx.connector.inceptor.options.InceptorOptions; import com.dtstack.flinkx.connector.inceptor.sink.InceptorDynamicTableSink; -import com.dtstack.flinkx.connector.inceptor.sink.InceptorOutputFormat; -import com.dtstack.flinkx.connector.inceptor.sink.InceptorOutputFormatBuilder; import com.dtstack.flinkx.connector.inceptor.source.InceptorDynamicTableSource; -import com.dtstack.flinkx.connector.inceptor.source.InceptorInputFormatBuilder; +import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil; import com.dtstack.flinkx.connector.jdbc.conf.SinkConnectionConf; import com.dtstack.flinkx.connector.jdbc.conf.SourceConnectionConf; import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect; import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormatBuilder; -import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat; import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder; import com.dtstack.flinkx.connector.jdbc.table.JdbcDynamicTableFactory; import com.dtstack.flinkx.connector.jdbc.util.JdbcUtil; @@ -53,13 +50,33 @@ import java.util.Optional; import java.util.Set; -import static 
com.dtstack.flinkx.connector.jdbc.options.JdbcCommonOptions.*; -import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.*; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcCommonOptions.PASSWORD; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcCommonOptions.SCHEMA; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcCommonOptions.TABLE_NAME; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcCommonOptions.URL; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcCommonOptions.USERNAME; import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.DRUID_PREFIX; -import static com.dtstack.flinkx.connector.jdbc.options.JdbcSinkOptions.*; -import static com.dtstack.flinkx.source.options.SourceOptions.*; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.LOOKUP_CACHE_MAX_ROWS; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.LOOKUP_CACHE_TTL; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.LOOKUP_MAX_RETRIES; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.VERTX_PREFIX; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcLookupOptions.getLibConfMap; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcSinkOptions.SINK_ALL_REPLACE; +import static com.dtstack.flinkx.connector.jdbc.options.JdbcSinkOptions.SINK_SEMANTIC; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_DEFAULT_FETCH_SIZE; import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_FETCH_SIZE; -import static com.dtstack.flinkx.table.options.SinkOptions.*; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN_TYPE; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_PARALLELISM; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_PARTITION_COLUMN; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_PARTITION_STRATEGY; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_POLLING_INTERVAL; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_QUERY_TIMEOUT; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNNAME; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNTYPE; +import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_START_LOCATION; +import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static com.dtstack.flinkx.table.options.SinkOptions.SINK_MAX_RETRIES; import static com.dtstack.flinkx.table.options.SinkOptions.SINK_PARALLELISM; import static org.apache.flink.util.Preconditions.checkState; @@ -79,7 +96,7 @@ public String factoryIdentifier() { @Override protected JdbcDialect getDialect() { - return new InceptorDialect(); + return null; } @Override @@ -104,7 +121,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { // 3.封装参数 TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - JdbcDialect jdbcDialect = getDialect(); final Map druidConf = getLibConfMap(context.getCatalogTable().getOptions(), DRUID_PREFIX); @@ -113,6 +129,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { addKerberosConfig(sourceConnectionConf, 
context.getCatalogTable().getOptions()); + InceptorDialect inceptorDialect = + InceptorDbUtil.getDialectWithDriverType(sourceConnectionConf); + return new InceptorDynamicTableSource( sourceConnectionConf, getJdbcLookupConf( @@ -120,8 +139,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { context.getObjectIdentifier().getObjectName(), druidConf), physicalSchema, - jdbcDialect, - getInputFormatBuilder()); + inceptorDialect, + inceptorDialect.getInputFormatBuilder()); } @Override @@ -133,7 +152,6 @@ public DynamicTableSink createDynamicTableSink(Context context) { // 2.参数校验 helper.validateExcept("properties.", "security.kerberos"); - JdbcDialect jdbcDialect = getDialect(); // 3.封装参数 TableSchema physicalSchema = @@ -143,8 +161,13 @@ public DynamicTableSink createDynamicTableSink(Context context) { addKerberosConfig(inceptorConf, context.getCatalogTable().getOptions()); + InceptorDialect inceptorDialect = InceptorDbUtil.getDialectWithDriverType(inceptorConf); + return new InceptorDynamicTableSink( - jdbcDialect, physicalSchema, getOutputFormatBuilder(), inceptorConf); + inceptorDialect, + physicalSchema, + inceptorDialect.getOutputFormatBuilder(), + inceptorConf); } @Override @@ -177,7 +200,7 @@ protected InceptorConf getSinkConnectionConf( List keyFields = schema.getPrimaryKey().map(UniqueConstraint::getColumns).orElse(null); inceptorConf.setUniqueKey(keyFields); - rebuildJdbcConf(inceptorConf); + resetTableInfo(inceptorConf); JdbcUtil.putExtParam(inceptorConf); return inceptorConf; } @@ -228,24 +251,23 @@ protected InceptorConf getSourceConnectionConf(ReadableConfig readableConfig) { : readableConfig.get(SCAN_FETCH_SIZE)); } - rebuildJdbcConf(jdbcConf); + resetTableInfo(jdbcConf); return jdbcConf; } @Override protected JdbcOutputFormatBuilder getOutputFormatBuilder() { - return new InceptorOutputFormatBuilder(new InceptorOutputFormat()); + return null; } @Override protected JdbcInputFormatBuilder getInputFormatBuilder() { - return new InceptorInputFormatBuilder(new JdbcInputFormat()); + return null; } @Override protected void validateConfigOptions(ReadableConfig config) { String jdbcUrl = config.get(URL); - final Optional dialect = Optional.of(getDialect()); checkState(true, "Cannot handle such jdbc url: " + jdbcUrl); if (config.getOptional(SCAN_POLLING_INTERVAL).isPresent() diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java index 6f7c543398..4a7e63cafb 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java @@ -54,8 +54,6 @@ */ public abstract class JdbcSinkFactory extends SinkFactory { - private static final int DEFAULT_CONNECTION_TIMEOUT = 600; - protected JdbcConf jdbcConf; protected JdbcDialect jdbcDialect; @@ -70,7 +68,7 @@ ConnectionConf.class, new ConnectionAdapter("SinkConnectionConf")) new FieldNameExclusionStrategy("column")) .create(); GsonUtil.setTypeAdapter(gson); - jdbcConf = gson.fromJson(gson.toJson(syncConf.getWriter().getParameter()), JdbcConf.class); + jdbcConf = gson.fromJson(gson.toJson(syncConf.getWriter().getParameter()), getConfClass()); int batchSize = syncConf.getWriter() .getIntVal( @@ -96,11 +94,6 @@ ConnectionConf.class, new 
ConnectionAdapter("SinkConnectionConf")) @Override public DataStreamSink createSink(DataStream dataSet) { JdbcOutputFormatBuilder builder = getBuilder(); - - int connectTimeOut = jdbcConf.getConnectTimeOut(); - jdbcConf.setConnectTimeOut( - connectTimeOut == 0 ? DEFAULT_CONNECTION_TIMEOUT : connectTimeOut); - builder.setJdbcConf(jdbcConf); builder.setJdbcDialect(jdbcDialect); @@ -121,6 +114,10 @@ public RawTypeConverter getRawTypeConverter() { return jdbcDialect.getRawTypeConverter(); } + protected Class getConfClass() { + return JdbcConf.class; + } + /** * 获取JDBC插件的具体outputFormatBuilder * diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/table/JdbcDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/table/JdbcDynamicTableFactory.java index 02080f95fc..92fcf67131 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -76,7 +76,6 @@ import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_FETCH_SIZE; import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_MAX_RETRIES; import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_PARALLELISM; -import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_CONNECTION_QUERY_TIMEOUT; import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_DEFAULT_FETCH_SIZE; import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_FETCH_SIZE; import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN; @@ -91,7 +90,6 @@ import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_START_LOCATION; import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL; import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS; -import static com.dtstack.flinkx.table.options.SinkOptions.SINK_CONNECTION_QUERY_TIMEOUT; import static com.dtstack.flinkx.table.options.SinkOptions.SINK_MAX_RETRIES; import static org.apache.flink.util.Preconditions.checkState; @@ -174,12 +172,10 @@ protected JdbcConf getSinkConnectionConf(ReadableConfig readableConfig, TableSch jdbcConf.setParallelism(readableConfig.get(SINK_PARALLELISM)); jdbcConf.setSemantic(readableConfig.get(SINK_SEMANTIC)); - jdbcConf.setConnectTimeOut(readableConfig.get(SINK_CONNECTION_QUERY_TIMEOUT)); - List keyFields = schema.getPrimaryKey().map(UniqueConstraint::getColumns).orElse(null); jdbcConf.setUniqueKey(keyFields); - rebuildJdbcConf(jdbcConf); + resetTableInfo(jdbcConf); return jdbcConf; } @@ -219,7 +215,6 @@ protected JdbcConf getSourceConnectionConf(ReadableConfig readableConfig) { ? 
getDefaultFetchSize()
                        : readableConfig.get(SCAN_FETCH_SIZE));
         jdbcConf.setQueryTimeOut(readableConfig.get(SCAN_QUERY_TIMEOUT));
-        jdbcConf.setConnectTimeOut(readableConfig.get(SCAN_CONNECTION_QUERY_TIMEOUT));
         jdbcConf.setSplitPk(readableConfig.get(SCAN_PARTITION_COLUMN));
         jdbcConf.setSplitStrategy(readableConfig.get(SCAN_PARTITION_STRATEGY));
@@ -246,7 +241,7 @@ protected JdbcConf getSourceConnectionConf(ReadableConfig readableConfig) {
                             : readableConfig.get(SCAN_FETCH_SIZE));
         }
 
-        rebuildJdbcConf(jdbcConf);
+        resetTableInfo(jdbcConf);
         return jdbcConf;
     }
@@ -273,7 +268,6 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(SCAN_START_LOCATION);
         optionalOptions.add(SCAN_PARALLELISM);
         optionalOptions.add(SCAN_QUERY_TIMEOUT);
-        optionalOptions.add(SCAN_CONNECTION_QUERY_TIMEOUT);
         optionalOptions.add(SCAN_FETCH_SIZE);
         optionalOptions.add(SCAN_RESTORE_COLUMNNAME);
         optionalOptions.add(SCAN_RESTORE_COLUMNTYPE);
@@ -294,7 +288,6 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(SINK_ALL_REPLACE);
         optionalOptions.add(SINK_PARALLELISM);
         optionalOptions.add(SINK_SEMANTIC);
-        optionalOptions.add(SINK_CONNECTION_QUERY_TIMEOUT);
         return optionalOptions;
     }
@@ -397,9 +390,8 @@ protected JdbcOutputFormatBuilder getOutputFormatBuilder() {
         return new JdbcOutputFormatBuilder(new JdbcOutputFormat());
     }
 
-    /** rebuild jdbcConf,add custom configuration */
-    protected void rebuildJdbcConf(JdbcConf jdbcConf) {
-        // table字段有可能是schema.table格式 需要转换为对应的schema 和 table 字段
+    /** The table field may be in schema.table format; split it into the corresponding schema and table fields. */
+    protected void resetTableInfo(JdbcConf jdbcConf) {
         if (StringUtils.isBlank(jdbcConf.getSchema())) {
             JdbcUtil.resetSchemaAndTable(jdbcConf, "\\\"", "\\\"");
         }
diff --git a/flinkx-connectors/flinkx-connector-oracle/src/main/java/com/dtstack/flinkx/connector/oracle/table/OracleDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-oracle/src/main/java/com/dtstack/flinkx/connector/oracle/table/OracleDynamicTableFactory.java
index f2e18b8d0f..cbb95121f3 100644
--- a/flinkx-connectors/flinkx-connector-oracle/src/main/java/com/dtstack/flinkx/connector/oracle/table/OracleDynamicTableFactory.java
+++ b/flinkx-connectors/flinkx-connector-oracle/src/main/java/com/dtstack/flinkx/connector/oracle/table/OracleDynamicTableFactory.java
@@ -18,16 +18,12 @@
 package com.dtstack.flinkx.connector.oracle.table;
 
-import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf;
 import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect;
 import com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormatBuilder;
 import com.dtstack.flinkx.connector.jdbc.table.JdbcDynamicTableFactory;
-import com.dtstack.flinkx.connector.jdbc.util.JdbcUtil;
 import com.dtstack.flinkx.connector.oracle.dialect.OracleDialect;
 import com.dtstack.flinkx.connector.oracle.source.OracleInputFormat;
 
-import java.util.Properties;
-
 /**
  * company www.dtstack.com
  *
@@ -52,19 +48,4 @@ protected JdbcInputFormatBuilder getInputFormatBuilder() {
     protected JdbcDialect getDialect() {
         return new OracleDialect();
     }
-
-    @Override
-    protected void rebuildJdbcConf(JdbcConf jdbcConf) {
-        super.rebuildJdbcConf(jdbcConf);
-
-        Properties properties = new Properties();
-        if (jdbcConf.getConnectTimeOut() != 0) {
-            properties.put(
-                    "oracle.jdbc.ReadTimeout", String.valueOf(jdbcConf.getConnectTimeOut() * 1000));
-            properties.put(
-                    "oracle.net.CONNECT_TIMEOUT",
-                    String.valueOf((jdbcConf.getConnectTimeOut()) * 1000));
-        }
-        JdbcUtil.putExtParam(jdbcConf, properties);
-    }
 }
diff --git 
a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/client/CloudSolrClientKerberosWrapper.java b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/client/CloudSolrClientKerberosWrapper.java index 2d4de7e05c..24c8efc0a2 100644 --- a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/client/CloudSolrClientKerberosWrapper.java +++ b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/client/CloudSolrClientKerberosWrapper.java @@ -73,13 +73,10 @@ public class CloudSolrClientKerberosWrapper extends SolrClient { private CloudSolrClient cloudSolrClient; private Subject subject; private DistributedCache distributedCache; - private String jobId; - public CloudSolrClientKerberosWrapper( - SolrConf solrConf, DistributedCache distributedCache, String jobId) { + public CloudSolrClientKerberosWrapper(SolrConf solrConf, DistributedCache distributedCache) { this.solrConf = solrConf; this.distributedCache = distributedCache; - this.jobId = jobId; } public void init() { @@ -193,7 +190,7 @@ public String loadKrbFile(Map kerberosConfigMap, String filePath KerberosUtil.checkFileExists(filePath); return filePath; } catch (Exception e) { - return KerberosUtil.loadFile(kerberosConfigMap, filePath, distributedCache, jobId); + return KerberosUtil.loadFile(kerberosConfigMap, filePath, distributedCache); } } diff --git a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/sink/SolrOutputFormat.java b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/sink/SolrOutputFormat.java index a0f624ba2a..929b6f8c20 100644 --- a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/sink/SolrOutputFormat.java +++ b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/sink/SolrOutputFormat.java @@ -77,7 +77,7 @@ protected void writeMultipleRecordsInternal() throws WriteRecordException { protected void openInternal(int taskNumber, int numTasks) throws IOException { solrClientWrapper = new CloudSolrClientKerberosWrapper( - solrConf, getRuntimeContext().getDistributedCache(), jobId); + solrConf, getRuntimeContext().getDistributedCache()); solrClientWrapper.init(); } diff --git a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormat.java b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormat.java index 4308732fd2..23d14a0c5a 100644 --- a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormat.java +++ b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormat.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; -import java.util.stream.Collectors; /** * @author Ada Wong @@ -73,7 +72,7 @@ protected InputSplit[] createInputSplitsInternal(int splitNum) throws Exception protected void openInternal(InputSplit inputSplit) { solrClientWrapper = new CloudSolrClientKerberosWrapper( - solrConf, getRuntimeContext().getDistributedCache(), jobId); + solrConf, getRuntimeContext().getDistributedCache()); solrClientWrapper.init(); GenericInputSplit genericInputSplit = (GenericInputSplit) inputSplit; @@ -138,16 +137,8 @@ private boolean getNextBatchSize() throws IOException { if 
(CollectionUtils.isEmpty(solrDocumentList)) { return true; } - List solrDocuments = - solrDocumentList.stream() - .filter(solrDocument -> solrDocument.size() > 0) - .collect(Collectors.toList()); + this.iterator = solrDocumentList.iterator(); startRows += batchSize; - if (solrDocuments.size() == 0) { - return true; - } else { - this.iterator = solrDocuments.iterator(); - return false; - } + return false; } } diff --git a/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/table/SqlserverDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/table/SqlserverDynamicTableFactory.java index c0ccb9b04f..5f81a95409 100644 --- a/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/table/SqlserverDynamicTableFactory.java +++ b/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/table/SqlserverDynamicTableFactory.java @@ -75,7 +75,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { /** table字段有可能是[schema].[table]格式 需要转换为对应的schema 和 table 字段* */ @Override - protected void rebuildJdbcConf(JdbcConf jdbcConf) { + protected void resetTableInfo(JdbcConf jdbcConf) { if (jdbcConf.getTable().startsWith("[") && jdbcConf.getTable().endsWith("]") && StringUtils.isBlank(jdbcConf.getSchema())) { diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/security/KerberosUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/security/KerberosUtil.java index dbfe54803d..b9af89e89b 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/security/KerberosUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/security/KerberosUtil.java @@ -20,14 +20,11 @@ import com.dtstack.flinkx.constants.ConstantValue; import com.dtstack.flinkx.throwable.FlinkxRuntimeException; -import com.dtstack.flinkx.util.ExceptionUtil; import com.dtstack.flinkx.util.FileSystemUtil; import com.dtstack.flinkx.util.JsonUtil; import com.dtstack.flinkx.util.Md5Util; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.runtime.security.DynamicConfiguration; -import org.apache.flink.runtime.security.KerberosUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -41,14 +38,10 @@ import sun.security.krb5.internal.ktab.KeyTab; import sun.security.krb5.internal.ktab.KeyTabEntry; -import javax.security.auth.login.AppConfigurationEntry; - import java.io.File; import java.io.IOException; import java.lang.reflect.Field; -import java.util.HashMap; import java.util.Map; -import java.util.UUID; /** * @author jiangbo @@ -88,7 +81,7 @@ public static UserGroupInformation loginAndReturnUgi(Map hadoopC keytabFileName = KerberosUtil.loadFile(hadoopConfig, keytabFileName); String principal = KerberosUtil.getPrincipal(hadoopConfig, keytabFileName); - KerberosUtil.loadKrb5Conf(hadoopConfig); + KerberosUtil.loadKrb5Conf(hadoopConfig, null); Configuration conf = FileSystemUtil.getConfiguration(hadoopConfig, null); @@ -176,20 +169,20 @@ public static synchronized void reloadKrb5conf(String krb5confPath) { } public static void loadKrb5Conf( - Map kerberosConfig, DistributedCache distributedCache, String jobId) { + Map kerberosConfig, DistributedCache distributedCache) { String krb5FilePath = MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF); if (StringUtils.isEmpty(krb5FilePath)) { LOG.info("krb5 file is empty,will use default file"); return; } - 
krb5FilePath = loadFile(kerberosConfig, krb5FilePath, distributedCache, jobId); + krb5FilePath = loadFile(kerberosConfig, krb5FilePath, distributedCache); kerberosConfig.put(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath); System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath); } public static void loadKrb5Conf(Map kerberosConfig) { - loadKrb5Conf(kerberosConfig, null, null); + loadKrb5Conf(kerberosConfig, null); } /** @@ -200,8 +193,7 @@ public static void loadKrb5Conf(Map kerberosConfig) { public static String loadFile( Map kerberosConfig, String filePath, - DistributedCache distributedCache, - String jobId) { + DistributedCache distributedCache) { boolean useLocalFile = MapUtils.getBooleanValue(kerberosConfig, KEY_USE_LOCAL_FILE); if (useLocalFile) { LOG.info("will use local file:{}", filePath); @@ -231,13 +223,13 @@ public static String loadFile( } } - fileName = loadFromSftp(kerberosConfig, fileName, jobId); + fileName = loadFromSftp(kerberosConfig, fileName); return fileName; } } public static String loadFile(Map kerberosConfig, String filePath) { - return loadFile(kerberosConfig, filePath, null, null); + return loadFile(kerberosConfig, filePath, null); } public static void checkFileExists(String filePath) { @@ -251,25 +243,16 @@ public static void checkFileExists(String filePath) { } } - private static String loadFromSftp(Map config, String fileName, String jobId) { + private static String loadFromSftp(Map config, String fileName) { String remoteDir = MapUtils.getString(config, KEY_REMOTE_DIR); if (StringUtils.isBlank(remoteDir)) { throw new FlinkxRuntimeException( "can't find [remoteDir] in config: \n" + JsonUtil.toPrintJson(config)); } String filePathOnSftp = remoteDir + "/" + fileName; - if (null == jobId) { - // 创建分片在 JobManager, 此时还没有 JobId,随机生成UUID - jobId = UUID.randomUUID().toString(); - LOG.warn("jobId is null, jobId will be replaced with [UUID], jobId(UUID) = {}.", jobId); - } + String localDirName = Md5Util.getMd5(remoteDir); - String localDir = LOCAL_CACHE_DIR + SP + jobId + SP + localDirName; - // 创建 TM 节点的本地目录 /tmp/${user}/flinkx/${jobId}/${md5(remoteDir)}/*.keytab - // 需要考虑的多种情况: - // ① ${user} 解决不同用户的权限问题。/tmp linux 下是有 777 权限。 - // ② ${jobId} 解决多个任务在同一个 TM 上面,keytab 覆盖问题。 - // ③ ${md5(remoteDir)} 解决 reader writer 在同一个 TM 上面,keytab 覆盖问题。 + String localDir = LOCAL_CACHE_DIR + SP + localDirName; localDir = createDir(localDir); String fileLocalPath = localDir + SP + fileName; // 更新sftp文件对应的local文件 @@ -364,144 +347,4 @@ public static void refreshConfig() { "resetting default realm failed, current default realm will still be used.", e); } } - - /** 添加或者更新JaasConfiguration */ - public static synchronized DynamicConfiguration appendOrUpdateJaasConf( - String name, String keytab, String principal) { - if (hasExistsOnConfiguration(name)) { - return KerberosUtil.resetJaasConfForName(name, keytab, principal); - } else { - return KerberosUtil.appendJaasConf(name, keytab, principal); - } - } - - /** configuration里是否添加了configureName的配置 */ - public static synchronized boolean hasExistsOnConfiguration(String configureName) { - javax.security.auth.login.Configuration priorConfig = - javax.security.auth.login.Configuration.getConfiguration(); - AppConfigurationEntry[] appConfigurationEntry = - priorConfig.getAppConfigurationEntry(configureName); - return appConfigurationEntry != null; - } - - /** - * 反射更新 DynamicConfiguration#dynamicEntries的值 - * - * @param name - * @param keytab - * @param principal - * @return - */ - public static DynamicConfiguration resetJaasConfForName( - 
String name, String keytab, String principal) { - LOG.info("resetJaasConfForName, name {} principal {} ,keytab {}", name, principal, keytab); - javax.security.auth.login.Configuration config = - javax.security.auth.login.Configuration.getConfiguration(); - if (config instanceof DynamicConfiguration) { - Class dynamicConfigurationClass = config.getClass(); - try { - Field dynamicEntriesField = - dynamicConfigurationClass.getDeclaredField("dynamicEntries"); - dynamicEntriesField.setAccessible(true); - Map dynamicEntries = - (Map) dynamicEntriesField.get(config); - Map newDynamicEntries = new HashMap<>(); - - // When an entry with the same name already exists, overwrite the previous one with the current entry - dynamicEntries.forEach( - (k, v) -> { - if (k.equals(name)) { - AppConfigurationEntry krb5Entry = - KerberosUtils.keytabEntry(keytab, principal); - AppConfigurationEntry[] appConfigurationEntries = - new AppConfigurationEntry[1]; - appConfigurationEntries[0] = krb5Entry; - newDynamicEntries.put(name, appConfigurationEntries); - } else { - newDynamicEntries.put(k, v); - } - }); - - dynamicEntriesField.set(config, newDynamicEntries); - } catch (Exception e) { - LOG.warn("reflection error", e); - return appendJaasConf(name, keytab, principal); - } - return (DynamicConfiguration) config; - } else { - return appendJaasConf(name, keytab, principal); - } - } - - public static DynamicConfiguration appendJaasConf( - String name, String keytab, String principal) { - LOG.info("appendJaasConf, name {} principal {} ,keytab {}", name, principal, keytab); - javax.security.auth.login.Configuration priorConfig = - javax.security.auth.login.Configuration.getConfiguration(); - // construct a dynamic JAAS configuration - DynamicConfiguration currentConfig = new DynamicConfiguration(priorConfig); - // wire up the configured JAAS login contexts to use the krb5 entries - AppConfigurationEntry krb5Entry = KerberosUtils.keytabEntry(keytab, principal); - currentConfig.addAppConfigurationEntry(name, krb5Entry); - javax.security.auth.login.Configuration.setConfiguration(currentConfig); - return currentConfig; - } - - /** - * Get the local path for filePath - * - * @param kerberosConfig - * @param filePath - * @param distributedCache - * @param jobId - * @return - */ - public static String getLocalFileName( - Map kerberosConfig, - String filePath, - DistributedCache distributedCache, - String jobId) { - boolean useLocalFile = MapUtils.getBooleanValue(kerberosConfig, KEY_USE_LOCAL_FILE); - if (useLocalFile) { - LOG.info("will use local file:{}", filePath); - checkFileExists(filePath); - return filePath; - } else { - String fileName = filePath; - if (filePath.contains(SP)) { - fileName = filePath.substring(filePath.lastIndexOf(SP) + 1); - } - if (StringUtils.startsWith(fileName, "blob_")) { - // already downloaded from blobServer - LOG.info("file [{}] already downloaded from blobServer", filePath); - return filePath; - } - if (distributedCache != null) { - try { - File file = distributedCache.getFile(fileName); - String absolutePath = file.getAbsolutePath(); - LOG.info( - "load file [{}] from Flink BlobServer, download file path = {}", - fileName, - absolutePath); - return absolutePath; - } catch (Exception e) { - LOG.warn( - "failed to get [{}] from Flink BlobServer, try to get from sftp.
e = {}", - fileName, - ExceptionUtil.getErrorMessage(e)); - } - } - - String remoteDir = MapUtils.getString(kerberosConfig, KEY_REMOTE_DIR); - if (null == jobId) { - LOG.warn("jobId is null, jobId will be replaced with [local]."); - jobId = "local"; - } - String localDirName = Md5Util.getMd5(remoteDir); - String localDir = LOCAL_CACHE_DIR + SP + jobId + SP + localDirName + SP + fileName; - - return localDir; - } - } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/DirtyDataManager.java b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/DirtyDataManager.java index 908b6889d5..89e93e86d9 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/DirtyDataManager.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/DirtyDataManager.java @@ -128,7 +128,7 @@ private String retrieveCategory(WriteRecordException ex) { public void open() { try { - FileSystem fs = FileSystemUtil.getFileSystem(config, null, distributedCache, jobId); + FileSystem fs = FileSystemUtil.getFileSystem(config, null, distributedCache); Path path = new Path(location); stream = fs.create(path, true); } catch (Exception e) { diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java index 090fd13d56..0c8fede632 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java @@ -58,11 +58,10 @@ public class FileSystemUtil { public static FileSystem getFileSystem( Map hadoopConfigMap, String defaultFs, - DistributedCache distributedCache, - String jobId) + DistributedCache distributedCache) throws Exception { if (isOpenKerberos(hadoopConfigMap)) { - return getFsWithKerberos(hadoopConfigMap, defaultFs, distributedCache, jobId); + return getFsWithKerberos(hadoopConfigMap, defaultFs, distributedCache); } Configuration conf = getConfiguration(hadoopConfigMap, defaultFs); @@ -100,12 +99,9 @@ public static boolean isOpenKerberos(Map hadoopConfig) { } private static FileSystem getFsWithKerberos( - Map hadoopConfig, - String defaultFs, - DistributedCache distributedCache, - String jobId) + Map hadoopConfig, String defaultFs, DistributedCache distributedCache) throws Exception { - UserGroupInformation ugi = getUGI(hadoopConfig, defaultFs, distributedCache, jobId); + UserGroupInformation ugi = getUGI(hadoopConfig, defaultFs, distributedCache); return ugi.doAs( (PrivilegedAction) @@ -120,16 +116,12 @@ private static FileSystem getFsWithKerberos( } public static UserGroupInformation getUGI( - Map hadoopConfig, - String defaultFs, - DistributedCache distributedCache, - String jobId) + Map hadoopConfig, String defaultFs, DistributedCache distributedCache) throws IOException { String keytabFileName = KerberosUtil.getPrincipalFileName(hadoopConfig); - keytabFileName = - KerberosUtil.loadFile(hadoopConfig, keytabFileName, distributedCache, jobId); + keytabFileName = KerberosUtil.loadFile(hadoopConfig, keytabFileName, distributedCache); String principal = KerberosUtil.getPrincipal(hadoopConfig, keytabFileName); - KerberosUtil.loadKrb5Conf(hadoopConfig, distributedCache, jobId); + KerberosUtil.loadKrb5Conf(hadoopConfig, distributedCache); KerberosUtil.refreshConfig(); return KerberosUtil.loginAndReturnUgi(