diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index f562fd30ae5b..69876e58f33e 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -41,22 +41,23 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you ## Options -| name | type | required | default value | -|---------------------------|---------|----------|---------------------| -| path | string | yes | - | -| file_format_type | string | yes | - | -| read_columns | list | no | - | -| delimiter/field_delimiter | string | no | \001 | -| parse_partition_from_path | boolean | no | true | -| date_format | string | no | yyyy-MM-dd | -| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | -| time_format | string | no | HH:mm:ss | -| skip_header_row_number | long | no | 0 | -| schema | config | no | - | -| sheet_name | string | no | - | -| file_filter_pattern | string | no | - | -| compress_codec | string | no | none | -| common-options | | no | - | +| name | type | required | default value | +|---------------------------|---------|----------|---------------------| +| path | string | yes | - | +| file_format_type | string | yes | - | +| read_columns | list | no | - | +| delimiter/field_delimiter | string | no | \001 | +| parse_partition_from_path | boolean | no | true | +| date_format | string | no | yyyy-MM-dd | +| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | +| time_format | string | no | HH:mm:ss | +| skip_header_row_number | long | no | 0 | +| schema | config | no | - | +| sheet_name | string | no | - | +| file_filter_pattern | string | no | - | +| compress_codec | string | no | none | +| common-options | | no | - | +| local_file_source_configs | list | no | - | ### path [string] @@ -244,11 +245,71 @@ The compress codec of files and the details that supported as the following show Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details +### local_file_source_configs + +Used to define a multiple-table task. When you need to read more than one table in a single job, use this option to provide a separate configuration for each table.
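+
+Each entry in `local_file_source_configs` accepts the same options as a single-table LocalFile source, such as `path`, `file_format_type` and `schema`. As a sketch (the paths and table names here are illustrative, reusing the examples that follow), one job can read tables of different formats together:
+
+```hocon
+LocalFile {
+  local_file_source_configs = [
+    {
+      schema {
+        table = "student"
+      }
+      path = "/apps/hive/demo/student"
+      file_format_type = "parquet"
+    },
+    {
+      schema {
+        fields {
+          name = string
+          age = int
+        }
+      }
+      path = "/apps/hive/demo/teacher"
+      file_format_type = "json"
+    }
+  ]
+}
+```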
+ ## Example +### Multiple Table + ```hocon + +LocalFile { + local_file_source_configs = [ + { + schema { + table = "student" + } + path = "/apps/hive/demo/student" + file_format_type = "parquet" + }, + { + schema { + table = "teacher" + } + path = "/apps/hive/demo/teacher" + file_format_type = "parquet" + } + ] +} + +``` + +```hocon + +LocalFile { + local_file_source_configs = [ + { + schema { + fields { + name = string + age = int + } + } + path = "/apps/hive/demo/student" + file_format_type = "json" + }, + { + schema { + fields { + name = string + age = int + } + } + path = "/apps/hive/demo/teacher" + file_format_type = "json" + } + ] +} + +``` + +### One Table + ```hocon LocalFile { path = "/apps/hive/demo/student" file_format_type = "parquet" } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java index 65e9590f34ea..27dca4a6bc9a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java @@ -23,7 +23,12 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode { FILE_TYPE_INVALID("FILE-01", "File type is invalid"), DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"), FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"), - FILE_LIST_EMPTY("FILE-04", "File list is empty"); + FILE_LIST_EMPTY("FILE-04", "File list is empty"), + AGGREGATE_COMMIT_ERROR("FILE-05", "Aggregate committer error"), + FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File read strategy not supported"), + FORMAT_NOT_SUPPORT("FILE-07", "Format not supported"), + FILE_READ_FAILED("FILE-08", "File read failed"), + ; private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java index 18332b943623..7082b6171ea9 100- --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java @@ -63,7 +63,9 @@ public void pollNext(Collector output) throws Exception { FileSourceSplit split = sourceSplits.poll(); if (null != split) { try { - readStrategy.read(split.splitId(), output); + // todo: If there is only one table, the tableId is not needed, but it's better + // to set it + readStrategy.read(split.splitId(), "", output); } catch (Exception e) { String errorMsg = String.format("Read data from this file [%s] failed", split.splitId()); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java index 02f6c3077217..ba7865b315b5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java @@ -74,7 +74,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy { @SneakyThrows @Override - public void read(String path, Collector output) { + public void read(String path, String tableId, Collector output) { Configuration conf = getConfiguration(); FileSystem fs = FileSystem.get(conf); Map partitionsMap = parsePartitionsByPath(path); @@ -124,6 +124,7 @@ public void read(String path, Collector output) { seaTunnelRow.setField(index++, value); } } + seaTunnelRow.setTableId(tableId); output.collect(seaTunnelRow); }); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java index 9095f97236c8..d8dccd86de4e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java @@ -69,7 +69,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { } @Override - public void read(String path, Collector output) + public void read(String path, String tableId, Collector output) throws FileConnectorException, IOException { Configuration conf = getConfiguration(); FileSystem fs = FileSystem.get(conf); @@ -105,6 +105,7 @@ public void read(String path, Collector output) seaTunnelRow.setField(index++, value); } } + seaTunnelRow.setTableId(tableId); output.collect(seaTunnelRow); } catch (IOException e) { String errorMsg = diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java index 4bdbac1e9654..56c782e3d0bc 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java @@ -74,7 +74,7 @@ public class OrcReadStrategy extends AbstractReadStrategy { private static final long MIN_SIZE = 16 * 1024; @Override - public void read(String path, Collector output) + public void read(String path, String tableId, Collector output) throws FileConnectorException, IOException { if (Boolean.FALSE.equals(checkFileType(path))) { String errorMsg = @@ -120,6 +120,7 @@ public void read(String path, Collector output) } } SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + seaTunnelRow.setTableId(tableId); 
output.collect(seaTunnelRow); num++; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java index 944d3ee86a21..e7a0c0af4a9c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java @@ -81,7 +81,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy { private int[] indexes; @Override - public void read(String path, Collector output) + public void read(String path, String tableId, Collector output) throws FileConnectorException, IOException { if (Boolean.FALSE.equals(checkFileType(path))) { String errorMsg = @@ -119,6 +119,7 @@ public void read(String path, Collector output) fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i)); } SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + seaTunnelRow.setTableId(tableId); output.collect(seaTunnelRow); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java index b53e97140e77..3f1a869b1836 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java @@ -36,17 +36,20 @@ public interface ReadStrategy extends Serializable { Configuration getConfiguration(HadoopConf conf); - void read(String path, Collector output) + void read(String path, String tableId, Collector output) throws IOException, FileConnectorException; SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FileConnectorException; + // todo: use CatalogTable void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType); List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException; + // todo: use ReadonlyConfig void setPluginConfig(Config pluginConfig); + // todo: use CatalogTable SeaTunnelRowType getActualSeaTunnelRowTypeInfo(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java index 3aa1874edf56..777490c03122 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java @@ -17,8 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.file.source.reader; +import 
org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import lombok.extern.slf4j.Slf4j; @@ -28,6 +31,14 @@ public class ReadStrategyFactory { private ReadStrategyFactory() {} + public static ReadStrategy of(ReadonlyConfig readonlyConfig, HadoopConf hadoopConf) { + ReadStrategy readStrategy = + of(readonlyConfig.get(BaseSourceConfig.FILE_FORMAT_TYPE).name()); + readStrategy.setPluginConfig(readonlyConfig.toConfig()); + readStrategy.init(hadoopConf); + return readStrategy; + } + public static ReadStrategy of(String fileType) { try { FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase()); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java index 816e50b57b8d..586d165e1b01 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java @@ -64,7 +64,7 @@ public class TextReadStrategy extends AbstractReadStrategy { private int[] indexes; @Override - public void read(String path, Collector output) + public void read(String path, String tableId, Collector output) throws FileConnectorException, IOException { Configuration conf = getConfiguration(); FileSystem fs = FileSystem.get(conf); @@ -118,6 +118,7 @@ public void read(String path, Collector output) seaTunnelRow.setField(index++, value); } } + seaTunnelRow.setTableId(tableId); output.collect(seaTunnelRow); } catch (IOException e) { String errorMsg = diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java index 5e8eb9a2c8a5..56fbaae386ad 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java @@ -52,7 +52,7 @@ public void testOrcRead() throws Exception { orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath); Assertions.assertNotNull(seaTunnelRowTypeInfo); System.out.println(seaTunnelRowTypeInfo); - orcReadStrategy.read(orcFilePath, testCollector); + orcReadStrategy.read(orcFilePath, "", testCollector); for (SeaTunnelRow row : testCollector.getRows()) { Assertions.assertEquals(row.getField(0).getClass(), Boolean.class); Assertions.assertEquals(row.getField(1).getClass(), Byte.class); @@ -78,7 +78,7 @@ public void testOrcReadProjection() throws Exception { 
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath); Assertions.assertNotNull(seaTunnelRowTypeInfo); System.out.println(seaTunnelRowTypeInfo); - orcReadStrategy.read(orcFilePath, testCollector); + orcReadStrategy.read(orcFilePath, "", testCollector); for (SeaTunnelRow row : testCollector.getRows()) { Assertions.assertEquals(row.getField(0).getClass(), Byte.class); Assertions.assertEquals(row.getField(1).getClass(), Boolean.class); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java index 0a3bee6282fd..1c36a9145313 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java @@ -53,7 +53,7 @@ public void testParquetRead1() throws Exception { Assertions.assertNotNull(seaTunnelRowTypeInfo); System.out.println(seaTunnelRowTypeInfo); TestCollector testCollector = new TestCollector(); - parquetReadStrategy.read(path, testCollector); + parquetReadStrategy.read(path, "", testCollector); } @Test @@ -69,7 +69,7 @@ public void testParquetRead2() throws Exception { Assertions.assertNotNull(seaTunnelRowTypeInfo); System.out.println(seaTunnelRowTypeInfo); TestCollector testCollector = new TestCollector(); - parquetReadStrategy.read(path, testCollector); + parquetReadStrategy.read(path, "", testCollector); } @Test @@ -88,13 +88,13 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception { TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai"); TimeZone.setDefault(tz1); TestCollector testCollector = new TestCollector(); - parquetReadStrategy.read(path, testCollector); + parquetReadStrategy.read(path, "", testCollector); LocalDateTime time1 = (LocalDateTime) testCollector.getRows().get(0).getField(index); TimeZone tz2 = TimeZone.getTimeZone("UTC"); TimeZone.setDefault(tz2); TestCollector testCollector2 = new TestCollector(); - parquetReadStrategy.read(path, testCollector2); + parquetReadStrategy.read(path, "", testCollector2); LocalDateTime time2 = (LocalDateTime) testCollector2.getRows().get(0).getField(index); Assertions.assertTrue(time1.isAfter(time2)); @@ -121,7 +121,7 @@ public void testParquetReadProjection1() throws Exception { Assertions.assertNotNull(seaTunnelRowTypeInfo); System.out.println(seaTunnelRowTypeInfo); TestCollector testCollector = new TestCollector(); - parquetReadStrategy.read(path, testCollector); + parquetReadStrategy.read(path, "", testCollector); List rows = testCollector.getRows(); for (SeaTunnelRow row : rows) { Assertions.assertEquals(row.getField(0).getClass(), Long.class); @@ -151,7 +151,7 @@ public void testParquetReadProjection2() throws Exception { Assertions.assertNotNull(seaTunnelRowTypeInfo); System.out.println(seaTunnelRowTypeInfo); TestCollector testCollector = new TestCollector(); - parquetReadStrategy.read(path, testCollector); + parquetReadStrategy.read(path, "", testCollector); } public static class TestCollector implements Collector { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java similarity index 85% rename from seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java rename to seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java index b1f3f35828be..284167ce7d6f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java @@ -19,12 +19,14 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; -public class LocalConf extends HadoopConf { +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; + +public class LocalFileHadoopConf extends HadoopConf { private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem"; private static final String SCHEMA = "file"; - public LocalConf(String hdfsNameKey) { - super(hdfsNameKey); + public LocalFileHadoopConf() { + super(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java index a11b8067a788..4d8037ef5f1c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java @@ -22,11 +22,9 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf; +import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; -import org.apache.hadoop.fs.CommonConfigurationKeys; - import com.google.auto.service.AutoService; @AutoService(SeaTunnelSink.class) @@ -40,6 +38,6 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { super.prepare(pluginConfig); - hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT); + hadoopConf = new LocalFileHadoopConf(); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java index 8ff31155b872..76ee4e452ede 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java @@ -17,35 +17,37 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportColumnProjection; +import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; -import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf; -import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig; -import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; -import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.reader.MultipleTableLocalFileSourceReader; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.MultipleTableLocalFileSourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.state.LocalFileSourceState; -import org.apache.hadoop.fs.CommonConfigurationKeys; +import java.util.List; +import java.util.stream.Collectors; -import com.google.auto.service.AutoService; +public class LocalFileSource + implements SeaTunnelSource, + SupportParallelism, + SupportColumnProjection { -import java.io.IOException; + private final MultipleTableLocalFileSourceConfig multipleTableLocalFileSourceConfig; -@AutoService(SeaTunnelSource.class) -public class LocalFileSource extends BaseFileSource { + public LocalFileSource(ReadonlyConfig readonlyConfig) { + this.multipleTableLocalFileSourceConfig = + new MultipleTableLocalFileSourceConfig(readonlyConfig); + } @Override public String getPluginName() { @@ -53,75 +55,36 @@ public String getPluginName() { } @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult 
result = - CheckConfigUtil.checkAllExists( - pluginConfig, - LocalSourceConfig.FILE_PATH.key(), - LocalSourceConfig.FILE_FORMAT_TYPE.key()); - if (!result.isSuccess()) { - throw new FileConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - readStrategy = - ReadStrategyFactory.of( - pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key())); - readStrategy.setPluginConfig(pluginConfig); - String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH.key()); - hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT); - try { - filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); - } catch (IOException e) { - String errorMsg = String.format("Get file list from this path [%s] failed", path); - throw new FileConnectorException( - FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e); - } - // support user-defined schema - FileFormat fileFormat = - FileFormat.valueOf( - pluginConfig - .getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()) - .toUpperCase()); - // only json text csv type support user-defined schema now - if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) { - switch (fileFormat) { - case CSV: - case TEXT: - case JSON: - case EXCEL: - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); - rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); - break; - case ORC: - case PARQUET: - throw new FileConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, - "SeaTunnel does not support user-defined schema for [parquet, orc] files"); - default: - // never got in there - throw new FileConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "SeaTunnel does not supported this file format"); - } - } else { - if (filePaths.isEmpty()) { - // When the directory is empty, distribute default behavior schema - rowType = CatalogTableUtil.buildSimpleTextSchema(); - return; - } - try { - rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); - } catch (FileConnectorException e) { - String errorMsg = - String.format("Get table schema from file [%s] failed", filePaths.get(0)); - throw new FileConnectorException( - CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e); - } - } + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public List getProducedCatalogTables() { + return multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream() + .map(LocalFileSourceConfig::getCatalogTable) + .collect(Collectors.toList()); + } + + @Override + public SourceReader createReader( + SourceReader.Context readerContext) { + return new MultipleTableLocalFileSourceReader( + readerContext, multipleTableLocalFileSourceConfig); + } + + @Override + public SourceSplitEnumerator createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) { + return new MultipleTableLocalFileSourceSplitEnumerator( + enumeratorContext, multipleTableLocalFileSourceConfig); + } + + @Override + public SourceSplitEnumerator restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, + LocalFileSourceState checkpointState) { + return new MultipleTableLocalFileSourceSplitEnumerator( + enumeratorContext, multipleTableLocalFileSourceConfig, checkpointState); } } diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java index cf34c99a5b89..d0dc77a0b39f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java @@ -19,16 +19,20 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceOptions; import com.google.auto.service.AutoService; +import java.io.Serializable; import java.util.Arrays; @AutoService(Factory.class) @@ -38,11 +42,18 @@ public String factoryIdentifier() { return FileSystemType.LOCAL.getFileSystemPluginName(); } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> (SeaTunnelSource) new LocalFileSource(context.getOptions()); + } + @Override public OptionRule optionRule() { return OptionRule.builder() - .required(LocalSourceConfig.FILE_PATH) - .required(BaseSourceConfig.FILE_FORMAT_TYPE) + .optional(LocalFileSourceOptions.LOCAL_FILE_SOURCE_CONFIGS) + .optional(BaseSourceConfig.FILE_PATH) + .optional(BaseSourceConfig.FILE_FORMAT_TYPE) .conditional( BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java new file mode 100644 index 000000000000..f2fd7b880849 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.local.source.config; + +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; + +import org.apache.commons.collections4.CollectionUtils; + +import lombok.Getter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Getter +public class LocalFileSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private final CatalogTable catalogTable; + private final FileFormat fileFormat; + private final ReadStrategy readStrategy; + private final List filePaths; + private final LocalFileHadoopConf localFileHadoopConf; + + public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) { + validateConfig(readonlyConfig); + this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE); + this.localFileHadoopConf = new LocalFileHadoopConf(); + this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf); + this.filePaths = parseFilePaths(readonlyConfig); + this.catalogTable = parseCatalogTable(readonlyConfig); + } + + private void validateConfig(ReadonlyConfig readonlyConfig) { + if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) { + throw new FileConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + FileSystemType.LOCAL.getFileSystemPluginName(), + PluginType.SOURCE, + LocalFileSourceOptions.FILE_PATH + " is required")); + } + if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) { + throw new FileConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + FileSystemType.LOCAL.getFileSystemPluginName(), + 
PluginType.SOURCE, + LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required")); + } + } + + private List parseFilePaths(ReadonlyConfig readonlyConfig) { + String rootPath = null; + try { + rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH); + return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath); + } catch (Exception ex) { + String errorMsg = String.format("Get file list from this path [%s] failed", rootPath); + throw new FileConnectorException( + FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex); + } + } + + private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) { + final CatalogTable catalogTable; + if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { + catalogTable = + CatalogTableUtil.buildWithConfig( + FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig); + } else { + catalogTable = CatalogTableUtil.buildSimpleTextTable(); + } + if (CollectionUtils.isEmpty(filePaths)) { + return catalogTable; + } + switch (fileFormat) { + case CSV: + case TEXT: + case JSON: + case EXCEL: + readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); + return newCatalogTable(catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo()); + case ORC: + case PARQUET: + return newCatalogTable( + catalogTable, + readStrategy.getSeaTunnelRowTypeInfo( + localFileHadoopConf, filePaths.get(0))); + default: + throw new FileConnectorException( + FileConnectorErrorCode.FORMAT_NOT_SUPPORT, + "SeaTunnel does not supported this file format: [" + fileFormat + "]"); + } + } + + private CatalogTable newCatalogTable( + CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) { + TableSchema tableSchema = catalogTable.getTableSchema(); + + Map columnMap = + tableSchema.getColumns().stream() + .collect(Collectors.toMap(Column::getName, Function.identity())); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + + List finalColumns = new ArrayList<>(); + for (int i = 0; i < fieldNames.length; i++) { + Column column = columnMap.get(fieldNames[i]); + if (column != null) { + finalColumns.add(column); + } else { + finalColumns.add( + PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, false, null, null)); + } + } + + TableSchema finalSchema = + TableSchema.builder() + .columns(finalColumns) + .primaryKey(tableSchema.getPrimaryKey()) + .constraintKey(tableSchema.getConstraintKeys()) + .build(); + + return CatalogTable.of( + catalogTable.getTableId(), + finalSchema, + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java new file mode 100644 index 000000000000..45d64167ab49 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.local.source.config; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; + +import java.util.List; +import java.util.Map; + +public final class LocalFileSourceOptions extends BaseSourceConfig { + + public static final Option>> LOCAL_FILE_SOURCE_CONFIGS = + Options.key("local_file_source_configs") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription( + "Local file source configs, used to create multiple local file source."); +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java new file mode 100644 index 000000000000..84ac26ce702a --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.local.source.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import com.google.common.collect.Lists; +import lombok.Getter; + +import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; + +public class MultipleTableLocalFileSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @Getter private List localFileSourceConfigs; + + public MultipleTableLocalFileSourceConfig(ReadonlyConfig localFileSourceRootConfig) { + if (localFileSourceRootConfig + .getOptional(LocalFileSourceOptions.LOCAL_FILE_SOURCE_CONFIGS) + .isPresent()) { + parseFromLocalFileSourceConfigs(localFileSourceRootConfig); + } else { + parseFromLocalFileSourceConfig(localFileSourceRootConfig); + } + } + + private void parseFromLocalFileSourceConfigs(ReadonlyConfig localFileSourceRootConfig) { + this.localFileSourceConfigs = + localFileSourceRootConfig.get(LocalFileSourceOptions.LOCAL_FILE_SOURCE_CONFIGS) + .stream() + .map(ReadonlyConfig::fromMap) + .map(LocalFileSourceConfig::new) + .collect(Collectors.toList()); + } + + private void parseFromLocalFileSourceConfig(ReadonlyConfig localFileSourceRootConfig) { + LocalFileSourceConfig localFileSourceConfig = + new LocalFileSourceConfig(localFileSourceRootConfig); + this.localFileSourceConfigs = Lists.newArrayList(localFileSourceConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java new file mode 100644 index 000000000000..bd990db50fb9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.local.source.reader; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_FAILED; +import static org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_STRATEGY_NOT_SUPPORT; + +@Slf4j +public class MultipleTableLocalFileSourceReader + implements SourceReader { + + private final SourceReader.Context context; + private volatile boolean noMoreSplit; + + private final Deque sourceSplits = new ConcurrentLinkedDeque<>(); + + private final Map readStrategyMap; + + public MultipleTableLocalFileSourceReader( + SourceReader.Context context, + MultipleTableLocalFileSourceConfig multipleTableLocalFileSourceConfig) { + this.context = context; + this.readStrategyMap = + multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream() + .collect( + Collectors.toMap( + localFileSourceConfig -> + localFileSourceConfig + .getCatalogTable() + .getTableId() + .toTablePath() + .toString(), + LocalFileSourceConfig::getReadStrategy)); + } + + @Override + public void pollNext(Collector output) { + synchronized (output.getCheckpointLock()) { + LocalFileSourceSplit split = sourceSplits.poll(); + if (null != split) { + ReadStrategy readStrategy = readStrategyMap.get(split.getTableId()); + if (readStrategy == null) { + throw new FileConnectorException( + FILE_READ_STRATEGY_NOT_SUPPORT, + "Cannot found the read strategy for this table: [" + + split.getTableId() + + "]"); + } + try { + readStrategy.read(split.getFilePath(), split.getTableId(), output); + } catch (Exception e) { + String errorMsg = + String.format("Read data from this file [%s] failed", split.splitId()); + throw new FileConnectorException(FILE_READ_FAILED, errorMsg, e); + } + } else if (noMoreSplit && sourceSplits.isEmpty()) { + // signal to the source that we have reached the end of the data. 
+ log.info( + "There is no more element for the bounded MultipleTableLocalFileSourceReader"); + context.signalNoMoreElement(); + } + } + } + + @Override + public List snapshotState(long checkpointId) { + return new ArrayList<>(sourceSplits); + } + + @Override + public void addSplits(List splits) { + sourceSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // do nothing + } + + @Override + public void open() throws Exception { + // do nothing + log.info("Opened the MultipleTableLocalFileSourceReader"); + } + + @Override + public void close() throws IOException { + // do nothing + log.info("Closed the MultipleTableLocalFileSourceReader"); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java similarity index 62% rename from seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java rename to seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java index 2f43ff8d9f32..89bab1bee474 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java @@ -15,8 +15,26 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.file.local.source.config; +package org.apache.seatunnel.connectors.seatunnel.file.local.source.split; -import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; +import org.apache.seatunnel.api.source.SourceSplit; -public class LocalSourceConfig extends BaseSourceConfig {} +import lombok.Getter; + +public class LocalFileSourceSplit implements SourceSplit { + + private static final long serialVersionUID = 1L; + + @Getter private final String tableId; + @Getter private final String filePath; + + public LocalFileSourceSplit(String tableId, String filePath) { + this.tableId = tableId; + this.filePath = filePath; + } + + @Override + public String splitId() { + return tableId + "_" + filePath; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java new file mode 100644 index 000000000000..f00885f496a4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.local.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.local.source.state.LocalFileSourceState; + +import org.apache.commons.collections4.CollectionUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@Slf4j +public class MultipleTableLocalFileSourceSplitEnumerator + implements SourceSplitEnumerator { + + private final SourceSplitEnumerator.Context context; + private final Set pendingSplit; + private final Set assignedSplit; + private final Map> filePathMap; + + public MultipleTableLocalFileSourceSplitEnumerator( + SourceSplitEnumerator.Context context, + MultipleTableLocalFileSourceConfig multipleTableLocalFileSourceConfig) { + this.context = context; + this.filePathMap = + multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream() + .collect( + Collectors.toMap( + localFileSourceConfig -> + localFileSourceConfig + .getCatalogTable() + .getTableId() + .toTablePath() + .toString(), + LocalFileSourceConfig::getFilePaths)); + this.assignedSplit = new HashSet<>(); + this.pendingSplit = new HashSet<>(); + } + + public MultipleTableLocalFileSourceSplitEnumerator( + SourceSplitEnumerator.Context context, + MultipleTableLocalFileSourceConfig multipleTableLocalFileSourceConfig, + LocalFileSourceState localFileSourceState) { + this(context, multipleTableLocalFileSourceConfig); + this.assignedSplit.addAll(localFileSourceState.getAssignedSplit()); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (CollectionUtils.isEmpty(splits)) { + return; + } + pendingSplit.addAll(splits); + assignSplit(subtaskId); + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplit.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) {} + + @Override + public void registerReader(int subtaskId) { + for (Map.Entry> filePathEntry : filePathMap.entrySet()) { + String tableId = filePathEntry.getKey(); + List filePaths = filePathEntry.getValue(); + for (String filePath : filePaths) { + pendingSplit.add(new LocalFileSourceSplit(tableId, filePath)); + } + } + assignSplit(subtaskId); + } + + @Override + public LocalFileSourceState snapshotState(long checkpointId) { + return new LocalFileSourceState(assignedSplit); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // do nothing. 
+ } + + private void assignSplit(int taskId) { + List currentTaskSplits = new ArrayList<>(); + if (context.currentParallelism() == 1) { + // if parallelism == 1, we should assign all the splits to reader + currentTaskSplits.addAll(pendingSplit); + } else { + // if parallelism > 1, according to hashCode of split's id to determine whether to + // allocate the current task + for (LocalFileSourceSplit fileSourceSplit : pendingSplit) { + int splitOwner = + getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism()); + if (splitOwner == taskId) { + currentTaskSplits.add(fileSourceSplit); + } + } + } + // assign splits + context.assignSplit(taskId, currentTaskSplits); + // save the state of assigned splits + assignedSplit.addAll(currentTaskSplits); + // remove the assigned splits from pending splits + currentTaskSplits.forEach(pendingSplit::remove); + log.info( + "SubTask {} is assigned to [{}]", + taskId, + currentTaskSplits.stream() + .map(LocalFileSourceSplit::splitId) + .collect(Collectors.joining(","))); + context.signalNoMoreSplits(taskId); + } + + private static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + @Override + public void open() { + // do nothing + } + + @Override + public void run() throws Exception { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java new file mode 100644 index 000000000000..2cc09f92ffd6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.state;
+
+import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class LocalFileSourceState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<LocalFileSourceSplit> assignedSplit;
+
+    public LocalFileSourceState(Set<LocalFileSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<LocalFileSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index bb80160f14b0..6822ee154970 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -134,6 +134,50 @@ public void testLocalFileReadAndWrite(TestContainer container)
         helper.execute("/parquet/local_file_to_console.conf");
     }
 
+    @TestTemplate
+    public void testLocalFileReadAndWriteInMultipleTableMode(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        // test write local excel file
+        helper.execute("/excel/fake_to_local_excel.conf");
+        helper.execute("/excel/local_excel_to_assert.conf");
+        helper.execute("/excel/local_excel_projection_to_assert.conf");
+        // test write local text file
+        helper.execute("/text/fake_to_local_file_text.conf");
+        helper.execute("/text/local_file_text_lzo_to_assert.conf");
+        helper.execute("/text/local_file_delimiter_assert.conf");
+        // test read skip header
+        helper.execute("/text/local_file_text_skip_headers.conf");
+        // test read local text file
+        helper.execute("/text/local_file_text_to_assert.conf");
+        helper.execute("/text/local_file_text_to_assert_in_multipletable.conf");
+        // test read local text file with projection
+        helper.execute("/text/local_file_text_projection_to_assert.conf");
+        // test write local json file
+        helper.execute("/json/fake_to_local_file_json.conf");
+        // test read local json file
+        helper.execute("/json/local_file_json_to_assert.conf");
+        helper.execute("/json/local_file_json_lzo_to_console.conf");
+        // test write local orc file
+        helper.execute("/orc/fake_to_local_file_orc.conf");
+        // test read local orc file
+        helper.execute("/orc/local_file_orc_to_assert.conf");
+        // test read local orc file with projection
+        helper.execute("/orc/local_file_orc_projection_to_assert.conf");
+        // test write local parquet file
+        helper.execute("/parquet/fake_to_local_file_parquet.conf");
+        // test read local parquet file
+        helper.execute("/parquet/local_file_parquet_to_assert.conf");
+        // test read local parquet file with projection
+        helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
+        // test read filtered local file
+        helper.execute("/excel/local_filter_excel_to_assert.conf");
+
+        // test read empty directory
+        helper.execute("/json/local_file_to_console.conf");
+        helper.execute("/parquet/local_file_to_console.conf");
+    }
+
     private Path convertToLzoFile(File file) throws IOException {
         LzopCodec lzo = new LzopCodec();
         Path path = Paths.get(file.getAbsolutePath() + ".lzo");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
index 4595f8388877..cb2b963177ed 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
@@ -29,6 +29,8 @@ source {
   LocalFile {
     path = "/tmp/fake_empty"
     file_format_type = "json"
+    schema {
+    }
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
index 818db0ce5df2..395eba278d40 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
@@ -27,6 +27,9 @@ env {
 
 source {
   LocalFile {
+    schema = {
+      table = "fake"
+    }
     path = "/seatunnel/read/parquet"
     file_format_type = "parquet"
     read_columns = [c_string, c_boolean, c_double]
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
index 07776f0b80c1..df2a71869b39 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
@@ -27,6 +27,9 @@ env {
 
 source {
   LocalFile {
+    schema = {
+      table = "fake"
+    }
     path = "/seatunnel/read/parquet"
     file_format_type = "parquet"
     result_table_name = "fake"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_in_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_in_multipletable.conf
new file mode 100644
index 000000000000..02383dd2e407
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_in_multipletable.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    local_file_source_configs = [
+      {
+        path = "/seatunnel/read/text"
+        file_format_type = "text"
+        schema = {
+          table = "fake01"
+          fields {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+            c_row = {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+            }
+          }
+        }
+      },
+      {
+        path = "/seatunnel/read/text"
+        file_format_type = "text"
+        schema = {
+          table = "fake02"
+          fields {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+            c_row = {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+            }
+          }
+        }
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      table-names = ["fake01", "fake02"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
index 6ad7d6fd4267..f32df5fbb7ba 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
@@ -20,51 +20,51 @@ env {
   job.mode = "BATCH"
 }
 
-  source {
-    FakeSource {
-      schema = {
-        fields {
-          id = int
-          val_bool = boolean
-          val_int8 = tinyint
-          val_int16 = smallint
-          val_int32 = int
-          val_int64 = bigint
-          val_float = float
-          val_double = double
-          val_decimal = "decimal(16, 1)"
-          val_string = string
-          val_unixtime_micros = timestamp
-        }
-      }
-      rows = [
-        {
-          kind = INSERT
-          fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-        },
-        {
-          kind = INSERT
-          fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-        },
-        {
-          kind = INSERT
-          fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-        },
-        {
-          kind = UPDATE_BEFORE
-          fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-        },
-        {
-          kind = UPDATE_AFTER
-          fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-        },
-        {
-          kind = DELETE
-          fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-        }
-      ]
+source {
+  FakeSource {
+    schema = {
+      fields {
+        id = int
+        val_bool = boolean
+        val_int8 = tinyint
+        val_int16 = smallint
+        val_int32 = int
+        val_int64 = bigint
+        val_float = float
+        val_double = double
+        val_decimal = "decimal(16, 1)"
= "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp } } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} sink { kudu{