Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] LocalFileSource support multiple table #5781

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 76 additions & 16 deletions docs/en/connector-v2/source/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,23 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you

## Options

| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
| file_format_type | string | yes | - |
| read_columns | list | no | - |
| delimiter/field_delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
| common-options | | no | - |
| name | type | required | default value |
|---------------------------|---------|----------|--------------------------------------|
| path | string | yes | - |
| file_format_type | string | yes | - |
| read_columns | list | no | - |
| delimiter/field_delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
| common-options | | no | - |
| tables_configs           | list    | no       | -                                    |

### path [string]

Expand Down Expand Up @@ -244,8 +245,14 @@ The compress codec of files and the details that supported as the following show

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details

### tables_configs [list]

Used to define a multiple-table task. When you have multiple tables to read, use this option to define the configuration (path, schema, format, etc.) of each table.

## Example

### One Table

```hocon

LocalFile {
Expand All @@ -270,6 +277,59 @@ LocalFile {

```

### Multiple Table

```hocon

LocalFile {
tables_configs = [
{
schema {
table = "student"
}
path = "/apps/hive/demo/student"
file_format_type = "parquet"
},
{
schema {
table = "teacher"
}
path = "/apps/hive/demo/teacher"
file_format_type = "parquet"
}
]
}

```

```hocon

LocalFile {
tables_configs = [
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
file_format_type = "json"
},
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/teacher"
file_format_type = "json"
    }
  ]
}

```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
FILE_LIST_EMPTY("FILE-04", "File list is empty");
FILE_LIST_EMPTY("FILE-04", "File list is empty"),
AGGREGATE_COMMIT_ERROR("FILE-05", "Aggregate committer error"),
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
FILE_READ_FAILED("FILE-08", "File read failed"),
;

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
FileSourceSplit split = sourceSplits.poll();
if (null != split) {
try {
readStrategy.read(split.splitId(), output);
// todo: If there is only one table , the tableId is not needed, but it's better
// to set this
readStrategy.read(split.splitId(), "", output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy {

@SneakyThrows
@Override
public void read(String path, Collector<SeaTunnelRow> output) {
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Map<String, String> partitionsMap = parsePartitionsByPath(path);
Expand Down Expand Up @@ -124,6 +124,7 @@ public void read(String path, Collector<SeaTunnelRow> output) {
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
}

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Expand Down Expand Up @@ -105,6 +105,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
Expand Down Expand Up @@ -120,6 +120,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
num++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
private int[] indexes;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
Expand Down Expand Up @@ -119,6 +119,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@ public interface ReadStrategy extends Serializable {

Configuration getConfiguration(HadoopConf conf);

void read(String path, Collector<SeaTunnelRow> output)
void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException;

SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path)
throws FileConnectorException;

// todo: use CatalogTable
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);

List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;

// todo: use ReadonlyConfig
void setPluginConfig(Config pluginConfig);

// todo: use CatalogTable
SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +31,14 @@ public class ReadStrategyFactory {

private ReadStrategyFactory() {}

public static ReadStrategy of(ReadonlyConfig readonlyConfig, HadoopConf hadoopConf) {
ReadStrategy readStrategy =
of(readonlyConfig.get(BaseSourceConfig.FILE_FORMAT_TYPE).name());
readStrategy.setPluginConfig(readonlyConfig.toConfig());
readStrategy.init(hadoopConf);
return readStrategy;
}

public static ReadStrategy of(String fileType) {
try {
FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
private int[] indexes;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Expand Down Expand Up @@ -118,6 +118,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testOrcRead() throws Exception {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
orcReadStrategy.read(orcFilePath, testCollector);
orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
Expand All @@ -78,7 +78,7 @@ public void testOrcReadProjection() throws Exception {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
orcReadStrategy.read(orcFilePath, testCollector);
orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testParquetRead1() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

@Test
Expand All @@ -69,7 +69,7 @@ public void testParquetRead2() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

@Test
Expand All @@ -88,13 +88,13 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
TimeZone.setDefault(tz1);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
LocalDateTime time1 = (LocalDateTime) testCollector.getRows().get(0).getField(index);

TimeZone tz2 = TimeZone.getTimeZone("UTC");
TimeZone.setDefault(tz2);
TestCollector testCollector2 = new TestCollector();
parquetReadStrategy.read(path, testCollector2);
parquetReadStrategy.read(path, "", testCollector2);
LocalDateTime time2 = (LocalDateTime) testCollector2.getRows().get(0).getField(index);

Assertions.assertTrue(time1.isAfter(time2));
Expand All @@ -121,7 +121,7 @@ public void testParquetReadProjection1() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
List<SeaTunnelRow> rows = testCollector.getRows();
for (SeaTunnelRow row : rows) {
Assertions.assertEquals(row.getField(0).getClass(), Long.class);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testParquetReadProjection2() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

public static class TestCollector implements Collector<SeaTunnelRow> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

public class LocalConf extends HadoopConf {
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class LocalFileHadoopConf extends HadoopConf {
private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
private static final String SCHEMA = "file";

public LocalConf(String hdfsNameKey) {
super(hdfsNameKey);
public LocalFileHadoopConf() {
super(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import org.apache.hadoop.fs.CommonConfigurationKeys;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSink.class)
Expand All @@ -40,6 +38,6 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
hadoopConf = new LocalFileHadoopConf();
}
}
Loading
Loading