Skip to content

Commit 8f2049b

Browse files
authored
[Feature][S3 File] Make S3 File Connector support multiple table write (apache#6698)
1 parent f108a5e commit 8f2049b

File tree

56 files changed

+3566
-137
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

56 files changed

+3566
-137
lines changed

docs/en/connector-v2/sink/S3File.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ transform {
474474
sink {
475475
S3File {
476476
bucket = "s3a://seatunnel-test"
477-
tmp_path = "/tmp/seatunnel"
477+
tmp_path = "/tmp/seatunnel/${table_name}"
478478
path="/test/${table_name}"
479479
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
480480
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"

seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<name>SeaTunnel : Connectors V2 : File : S3</name>
3131

3232
<properties>
33-
<hadoop-aws.version>2.6.5</hadoop-aws.version>
33+
<hadoop-aws.version>3.1.4</hadoop-aws.version>
3434
<guava.version>27.0-jre</guava.version>
3535
</properties>
3636

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@
2424
import org.apache.seatunnel.api.table.factory.Factory;
2525
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2626
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
27-
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
27+
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
2828

2929
import com.google.auto.service.AutoService;
3030

3131
@AutoService(Factory.class)
3232
public class S3FileCatalogFactory implements CatalogFactory {
3333
@Override
3434
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
35-
HadoopConf hadoopConf = S3Conf.buildWithReadOnlyConfig(options);
35+
HadoopConf hadoopConf = S3HadoopConf.buildWithReadOnlyConfig(options);
3636
HadoopFileSystemProxy fileSystemUtils = new HadoopFileSystemProxy(hadoopConf);
3737
return new S3FileCatalog(fileSystemUtils, options);
3838
}
+15-24
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.s3.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23-
import org.apache.seatunnel.common.config.CheckConfigUtil;
2421
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2522

2623
import java.util.HashMap;
2724
import java.util.Map;
2825

29-
public class S3Conf extends HadoopConf {
26+
public class S3HadoopConf extends HadoopConf {
3027
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
3128
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
3229
protected static final String S3A_SCHEMA = "s3a";
@@ -47,39 +44,33 @@ public void setSchema(String schema) {
4744
this.schema = schema;
4845
}
4946

50-
protected S3Conf(String hdfsNameKey) {
47+
public S3HadoopConf(String hdfsNameKey) {
5148
super(hdfsNameKey);
5249
}
5350

54-
public static HadoopConf buildWithConfig(Config config) {
51+
public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig config) {
5552

56-
String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
57-
S3Conf hadoopConf = new S3Conf(bucketName);
53+
String bucketName = config.get(S3ConfigOptions.S3_BUCKET);
54+
S3HadoopConf hadoopConf = new S3HadoopConf(bucketName);
5855
if (bucketName.startsWith(S3A_SCHEMA)) {
5956
hadoopConf.setSchema(S3A_SCHEMA);
6057
}
6158
HashMap<String, String> s3Options = new HashMap<>();
6259
hadoopConf.putS3SK(s3Options, config);
63-
if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
64-
config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
65-
.forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
60+
if (config.getOptional(S3ConfigOptions.S3_PROPERTIES).isPresent()) {
61+
config.get(S3ConfigOptions.S3_PROPERTIES)
62+
.forEach((key, value) -> s3Options.put(key, String.valueOf(value)));
6663
}
6764

6865
s3Options.put(
6966
S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
70-
config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));
67+
config.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
7168
s3Options.put(
72-
S3ConfigOptions.FS_S3A_ENDPOINT.key(),
73-
config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));
69+
S3ConfigOptions.FS_S3A_ENDPOINT.key(), config.get(S3ConfigOptions.FS_S3A_ENDPOINT));
7470
hadoopConf.setExtraOptions(s3Options);
7571
return hadoopConf;
7672
}
7773

78-
public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
79-
Config config = readonlyConfig.toConfig();
80-
return buildWithConfig(config);
81-
}
82-
8374
protected String switchHdfsImpl() {
8475
switch (this.schema) {
8576
case S3A_SCHEMA:
@@ -89,13 +80,13 @@ protected String switchHdfsImpl() {
8980
}
9081
}
9182

92-
private void putS3SK(Map<String, String> s3Options, Config config) {
93-
if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
94-
&& !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
83+
private void putS3SK(Map<String, String> s3Options, ReadonlyConfig config) {
84+
if (!config.getOptional(S3ConfigOptions.S3_ACCESS_KEY).isPresent()
85+
&& config.getOptional(S3ConfigOptions.S3_SECRET_KEY).isPresent()) {
9586
return;
9687
}
97-
String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
98-
String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
88+
String accessKey = config.get(S3ConfigOptions.S3_ACCESS_KEY);
89+
String secretKey = config.get(S3ConfigOptions.S3_SECRET_KEY);
9990
if (S3A_SCHEMA.equals(this.schema)) {
10091
s3Options.put("fs.s3a.access.key", accessKey);
10192
s3Options.put("fs.s3a.secret.key", secretKey);

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
import org.apache.seatunnel.common.constants.PluginType;
3535
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
3636
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
37-
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
3837
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
38+
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
3939
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
4040

4141
import java.util.Optional;
@@ -55,7 +55,7 @@ public String getPluginName() {
5555
}
5656

5757
public S3FileSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
58-
super(S3Conf.buildWithConfig(readonlyConfig.toConfig()), readonlyConfig, catalogTable);
58+
super(S3HadoopConf.buildWithReadOnlyConfig(readonlyConfig), readonlyConfig, catalogTable);
5959
this.catalogTable = catalogTable;
6060
this.readonlyConfig = readonlyConfig;
6161
Config pluginConfig = readonlyConfig.toConfig();

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java

+7-99
Original file line numberDiff line numberDiff line change
@@ -17,111 +17,19 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.s3.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24-
import org.apache.seatunnel.api.source.SeaTunnelSource;
25-
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
26-
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
27-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
28-
import org.apache.seatunnel.common.config.CheckConfigUtil;
29-
import org.apache.seatunnel.common.config.CheckResult;
30-
import org.apache.seatunnel.common.constants.PluginType;
31-
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
32-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
3321
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
34-
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
35-
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
36-
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
37-
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
38-
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
39-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
22+
import org.apache.seatunnel.connectors.seatunnel.file.s3.source.config.MultipleTableS3FileSourceConfig;
23+
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
4024

41-
import com.google.auto.service.AutoService;
25+
public class S3FileSource extends BaseMultipleTableFileSource {
4226

43-
import java.io.IOException;
27+
public S3FileSource(ReadonlyConfig readonlyConfig) {
28+
super(new MultipleTableS3FileSourceConfig(readonlyConfig));
29+
}
4430

45-
@AutoService(SeaTunnelSource.class)
46-
public class S3FileSource extends BaseFileSource {
4731
@Override
4832
public String getPluginName() {
4933
return FileSystemType.S3.getFileSystemPluginName();
5034
}
51-
52-
@Override
53-
public void prepare(Config pluginConfig) throws PrepareFailException {
54-
CheckResult result =
55-
CheckConfigUtil.checkAllExists(
56-
pluginConfig,
57-
S3ConfigOptions.FILE_PATH.key(),
58-
S3ConfigOptions.FILE_FORMAT_TYPE.key(),
59-
S3ConfigOptions.S3_BUCKET.key());
60-
if (!result.isSuccess()) {
61-
throw new FileConnectorException(
62-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
63-
String.format(
64-
"PluginName: %s, PluginType: %s, Message: %s",
65-
getPluginName(), PluginType.SOURCE, result.getMsg()));
66-
}
67-
String path = pluginConfig.getString(S3ConfigOptions.FILE_PATH.key());
68-
hadoopConf = S3Conf.buildWithConfig(pluginConfig);
69-
readStrategy =
70-
ReadStrategyFactory.of(
71-
pluginConfig.getString(S3ConfigOptions.FILE_FORMAT_TYPE.key()));
72-
readStrategy.setPluginConfig(pluginConfig);
73-
readStrategy.init(hadoopConf);
74-
try {
75-
filePaths = readStrategy.getFileNamesByPath(path);
76-
} catch (IOException e) {
77-
String errorMsg = String.format("Get file list from this path [%s] failed", path);
78-
throw new FileConnectorException(
79-
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
80-
}
81-
// support user-defined schema
82-
FileFormat fileFormat =
83-
FileFormat.valueOf(
84-
pluginConfig
85-
.getString(S3ConfigOptions.FILE_FORMAT_TYPE.key())
86-
.toUpperCase());
87-
// only json text csv type support user-defined schema now
88-
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
89-
switch (fileFormat) {
90-
case CSV:
91-
case TEXT:
92-
case JSON:
93-
case EXCEL:
94-
case XML:
95-
SeaTunnelRowType userDefinedSchema =
96-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
97-
readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
98-
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
99-
break;
100-
case ORC:
101-
case PARQUET:
102-
throw new FileConnectorException(
103-
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
104-
"SeaTunnel does not support user-defined schema for [parquet, orc] files");
105-
default:
106-
// never got in there
107-
throw new FileConnectorException(
108-
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
109-
"SeaTunnel does not supported this file format");
110-
}
111-
} else {
112-
if (filePaths.isEmpty()) {
113-
// When the directory is empty, distribute default behavior schema
114-
rowType = CatalogTableUtil.buildSimpleTextSchema();
115-
return;
116-
}
117-
try {
118-
rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
119-
} catch (FileConnectorException e) {
120-
String errorMsg =
121-
String.format("Get table schema from file [%s] failed", filePaths.get(0));
122-
throw new FileConnectorException(
123-
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
124-
}
125-
}
126-
}
12735
}

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java

+10
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.source.SeaTunnelSource;
22+
import org.apache.seatunnel.api.source.SourceSplit;
2223
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
24+
import org.apache.seatunnel.api.table.connector.TableSource;
2325
import org.apache.seatunnel.api.table.factory.Factory;
2426
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
27+
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
2528
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2629
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
2730
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
2831
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
2932

3033
import com.google.auto.service.AutoService;
3134

35+
import java.io.Serializable;
3236
import java.util.Arrays;
3337

3438
@AutoService(Factory.class)
@@ -38,6 +42,12 @@ public String factoryIdentifier() {
3842
return FileSystemType.S3.getFileSystemPluginName();
3943
}
4044

45+
@Override
46+
public <T, SplitT extends SourceSplit, StateT extends Serializable>
47+
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
48+
return () -> (SeaTunnelSource<T, SplitT, StateT>) new S3FileSource(context.getOptions());
49+
}
50+
4151
@Override
4252
public OptionRule optionRule() {
4353
return OptionRule.builder()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.s3.source.config;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
23+
24+
public class MultipleTableS3FileSourceConfig extends BaseMultipleTableFileSourceConfig {
25+
26+
public MultipleTableS3FileSourceConfig(ReadonlyConfig s3FileSourceRootConfig) {
27+
super(s3FileSourceRootConfig);
28+
}
29+
30+
@Override
31+
public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
32+
return new S3FileSourceConfig(readonlyConfig);
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.s3.source.config;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
23+
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
24+
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
25+
26+
import lombok.Getter;
27+
28+
@Getter
29+
public class S3FileSourceConfig extends BaseFileSourceConfig {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
@Override
34+
public HadoopConf getHadoopConfig() {
35+
return S3HadoopConf.buildWithReadOnlyConfig(getBaseFileSourceConfig());
36+
}
37+
38+
@Override
39+
public String getPluginName() {
40+
return FileSystemType.S3.getFileSystemPluginName();
41+
}
42+
43+
public S3FileSourceConfig(ReadonlyConfig readonlyConfig) {
44+
super(readonlyConfig);
45+
}
46+
}

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
22-
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
2322
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
23+
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
2424

25-
public class HiveOnS3Conf extends S3Conf {
25+
public class HiveOnS3Conf extends S3HadoopConf {
2626
protected static final String S3_SCHEMA = "s3";
2727
// The emr of amazon on s3 use this EmrFileSystem as the file system
2828
protected static final String HDFS_S3_IMPL = "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
@@ -43,7 +43,7 @@ protected String switchHdfsImpl() {
4343
}
4444

4545
public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
46-
S3Conf s3Conf = (S3Conf) S3Conf.buildWithReadOnlyConfig(readonlyConfig);
46+
S3HadoopConf s3Conf = (S3HadoopConf) S3HadoopConf.buildWithReadOnlyConfig(readonlyConfig);
4747
String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
4848
if (bucketName.startsWith(DEFAULT_SCHEMA)) {
4949
s3Conf.setSchema(DEFAULT_SCHEMA);

0 commit comments

Comments (0)