Skip to content

Commit

Permalink
Seatunnel LocalFileSource support multiple table
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 11, 2023
1 parent 64f19f2 commit 0c63a0f
Show file tree
Hide file tree
Showing 31 changed files with 1,437 additions and 181 deletions.
92 changes: 76 additions & 16 deletions docs/en/connector-v2/source/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,23 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you

## Options

| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
| file_format_type | string | yes | - |
| read_columns | list | no | - |
| delimiter/field_delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
| common-options | | no | - |
| name | type | required | default value |
|---------------------------|---------|----------|--------------------------------------|
| path | string | yes | - |
| file_format_type | string | yes | - |
| read_columns | list | no | - |
| delimiter/field_delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
| common-options | | no | - |
| local_file_source_configs | list | no | used to define a multiple table task |

### path [string]

Expand Down Expand Up @@ -244,8 +245,14 @@ The compress codec of files and the details that supported as the following show

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details

### local_file_source_configs

Used to define a multiple table task, when you have multiple tables to read, you can use this option to define multiple tables.

## Example

### One Table

```hocon
LocalFile {
Expand All @@ -270,6 +277,59 @@ LocalFile {
```

### Multiple Table

```hocon
LocalFile {
local_file_source_configs = [
{
schema {
table = "student"
}
path = "/apps/hive/demo/student"
file_format_type = "parquet"
},
{
schema {
table = "teacher"
}
path = "/apps/hive/demo/teacher"
file_format_type = "parquet"
}
]
}
```

```hocon
LocalFile {
local_file_source_configs = [
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
file_format_type = "json"
},
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/teacher"
file_format_type = "json"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
FILE_LIST_EMPTY("FILE-04", "File list is empty");
FILE_LIST_EMPTY("FILE-04", "File list is empty"),
AGGREGATE_COMMIT_ERROR("FILE-05", "Aggregate committer error"),
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
FILE_READ_FAILED("FILE-08", "File read failed"),
;

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
FileSourceSplit split = sourceSplits.poll();
if (null != split) {
try {
readStrategy.read(split.splitId(), output);
// todo: If there is only one table , the tableId is not needed, but it's better
// to set this
readStrategy.read(split.splitId(), "", output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy {

@SneakyThrows
@Override
public void read(String path, Collector<SeaTunnelRow> output) {
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Map<String, String> partitionsMap = parsePartitionsByPath(path);
Expand Down Expand Up @@ -124,6 +124,7 @@ public void read(String path, Collector<SeaTunnelRow> output) {
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
}

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Expand Down Expand Up @@ -105,6 +105,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
Expand Down Expand Up @@ -120,6 +120,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
num++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
private int[] indexes;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
Expand Down Expand Up @@ -119,6 +119,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@ public interface ReadStrategy extends Serializable {

Configuration getConfiguration(HadoopConf conf);

void read(String path, Collector<SeaTunnelRow> output)
void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException;

SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path)
throws FileConnectorException;

// todo: use CatalogTable
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);

List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;

// todo: use ReadonlyConfig
void setPluginConfig(Config pluginConfig);

// todo: use CatalogTable
SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +31,14 @@ public class ReadStrategyFactory {

private ReadStrategyFactory() {}

public static ReadStrategy of(ReadonlyConfig readonlyConfig, HadoopConf hadoopConf) {
ReadStrategy readStrategy =
of(readonlyConfig.get(BaseSourceConfig.FILE_FORMAT_TYPE).name());
readStrategy.setPluginConfig(readonlyConfig.toConfig());
readStrategy.init(hadoopConf);
return readStrategy;
}

public static ReadStrategy of(String fileType) {
try {
FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
private int[] indexes;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Expand Down Expand Up @@ -118,6 +118,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testOrcRead() throws Exception {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
orcReadStrategy.read(orcFilePath, testCollector);
orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
Expand All @@ -78,7 +78,7 @@ public void testOrcReadProjection() throws Exception {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
orcReadStrategy.read(orcFilePath, testCollector);
orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testParquetRead1() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

@Test
Expand All @@ -69,7 +69,7 @@ public void testParquetRead2() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

@Test
Expand All @@ -88,13 +88,13 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
TimeZone.setDefault(tz1);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
LocalDateTime time1 = (LocalDateTime) testCollector.getRows().get(0).getField(index);

TimeZone tz2 = TimeZone.getTimeZone("UTC");
TimeZone.setDefault(tz2);
TestCollector testCollector2 = new TestCollector();
parquetReadStrategy.read(path, testCollector2);
parquetReadStrategy.read(path, "", testCollector2);
LocalDateTime time2 = (LocalDateTime) testCollector2.getRows().get(0).getField(index);

Assertions.assertTrue(time1.isAfter(time2));
Expand All @@ -121,7 +121,7 @@ public void testParquetReadProjection1() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
List<SeaTunnelRow> rows = testCollector.getRows();
for (SeaTunnelRow row : rows) {
Assertions.assertEquals(row.getField(0).getClass(), Long.class);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testParquetReadProjection2() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

public static class TestCollector implements Collector<SeaTunnelRow> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

public class LocalConf extends HadoopConf {
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class LocalFileHadoopConf extends HadoopConf {
private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
private static final String SCHEMA = "file";

public LocalConf(String hdfsNameKey) {
super(hdfsNameKey);
public LocalFileHadoopConf() {
super(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import org.apache.hadoop.fs.CommonConfigurationKeys;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSink.class)
Expand All @@ -40,6 +38,6 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
hadoopConf = new LocalFileHadoopConf();
}
}
Loading

0 comments on commit 0c63a0f

Please sign in to comment.