Skip to content

Commit

Permalink
Seatunnel LocalFileSource support multiple table
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 5, 2023
1 parent 38132f5 commit cc52c1a
Show file tree
Hide file tree
Showing 23 changed files with 770 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
FILE_LIST_EMPTY("FILE-04", "File list is empty");
FILE_LIST_EMPTY("FILE-04", "File list is empty"),
AGGREGATE_COMMIT_ERROR("FILE-05", "Aggregate committer error"),
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
;

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
FileSourceSplit split = sourceSplits.poll();
if (null != split) {
try {
readStrategy.read(split.splitId(), output);
// todo: If there is only one table , the tableId is not needed, but it's better
// to set this
readStrategy.read(split.splitId(), "", output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy {

@SneakyThrows
@Override
public void read(String path, Collector<SeaTunnelRow> output) {
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Map<String, String> partitionsMap = parsePartitionsByPath(path);
Expand Down Expand Up @@ -124,6 +124,7 @@ public void read(String path, Collector<SeaTunnelRow> output) {
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
}

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Expand Down Expand Up @@ -105,6 +105,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
Expand Down Expand Up @@ -120,6 +120,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
num++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
private int[] indexes;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
Expand Down Expand Up @@ -119,6 +119,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@ public interface ReadStrategy extends Serializable {

Configuration getConfiguration(HadoopConf conf);

void read(String path, Collector<SeaTunnelRow> output)
void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException;

SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path)
throws FileConnectorException;

// todo: use CatalogTable
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);

List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;

// todo: use ReadonlyConfig
void setPluginConfig(Config pluginConfig);

// todo: use CatalogTable
SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +31,14 @@ public class ReadStrategyFactory {

private ReadStrategyFactory() {}

public static ReadStrategy of(ReadonlyConfig readonlyConfig, HadoopConf hadoopConf) {
ReadStrategy readStrategy =
of(readonlyConfig.get(BaseSourceConfig.FILE_FORMAT_TYPE).name());
readStrategy.setPluginConfig(readonlyConfig.toConfig());
readStrategy.init(hadoopConf);
return readStrategy;
}

public static ReadStrategy of(String fileType) {
try {
FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
private int[] indexes;

@Override
public void read(String path, Collector<SeaTunnelRow> output)
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Expand Down Expand Up @@ -118,6 +118,7 @@ public void read(String path, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testOrcRead() throws Exception {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
orcReadStrategy.read(orcFilePath, testCollector);
orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
Expand All @@ -78,7 +78,7 @@ public void testOrcReadProjection() throws Exception {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
orcReadStrategy.read(orcFilePath, testCollector);
orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testParquetRead1() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

@Test
Expand All @@ -69,7 +69,7 @@ public void testParquetRead2() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

@Test
Expand All @@ -88,13 +88,13 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
TimeZone.setDefault(tz1);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
LocalDateTime time1 = (LocalDateTime) testCollector.getRows().get(0).getField(index);

TimeZone tz2 = TimeZone.getTimeZone("UTC");
TimeZone.setDefault(tz2);
TestCollector testCollector2 = new TestCollector();
parquetReadStrategy.read(path, testCollector2);
parquetReadStrategy.read(path, "", testCollector2);
LocalDateTime time2 = (LocalDateTime) testCollector2.getRows().get(0).getField(index);

Assertions.assertTrue(time1.isAfter(time2));
Expand All @@ -121,7 +121,7 @@ public void testParquetReadProjection1() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
List<SeaTunnelRow> rows = testCollector.getRows();
for (SeaTunnelRow row : rows) {
Assertions.assertEquals(row.getField(0).getClass(), Long.class);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testParquetReadProjection2() throws Exception {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
parquetReadStrategy.read(path, "", testCollector);
}

public static class TestCollector implements Collector<SeaTunnelRow> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

public class LocalConf extends HadoopConf {
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class LocalFileHadoopConf extends HadoopConf {
private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
private static final String SCHEMA = "file";

public LocalConf(String hdfsNameKey) {
super(hdfsNameKey);
public LocalFileHadoopConf() {
super(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import org.apache.hadoop.fs.CommonConfigurationKeys;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSink.class)
Expand All @@ -40,6 +38,6 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
hadoopConf = new LocalFileHadoopConf();
}
}
Loading

0 comments on commit cc52c1a

Please sign in to comment.