Skip to content

Commit 7b2f538

Browse files
authored
[feature][connector-file-local] add save mode function for localfile (apache#7080)
1 parent 194472b commit 7b2f538

File tree

11 files changed

+441
-14
lines changed

11 files changed

+441
-14
lines changed

docs/en/connector-v2/sink/LocalFile.md

+18
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ By default, we use 2PC commit to ensure `exactly-once`
6060
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
6161
| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.<br/> false:don't write header,true:write header. |
6262
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
63+
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How to handle the target directory before the job starts, depending on whether it already exists. |
64+
| data_save_mode | string | no | APPEND_DATA | How to handle data files that already exist in the target directory before the job starts. |
6365

6466
### path [string]
6567

@@ -205,6 +207,20 @@ Only used when file_format_type is text,csv.false:don't write header,true:write
205207
Only used when file_format_type is json,text,csv,xml.
206208
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.
207209

210+
### schema_save_mode [string]
211+
212+
How to handle the target directory (`path`) before the job starts.
213+
- RECREATE_SCHEMA: creates the dir when it does not exist; deletes and recreates it when it already exists
214+
- CREATE_SCHEMA_WHEN_NOT_EXIST: creates the dir when it does not exist; does nothing when it already exists
215+
- ERROR_WHEN_SCHEMA_NOT_EXIST: reports an error when the dir does not exist
216+
217+
### data_save_mode [string]
218+
219+
How to handle data files that already exist in the target directory before the job starts.
220+
- DROP_DATA: keeps the dir and deletes the data files in it
221+
- APPEND_DATA: keeps the dir and keeps the existing data files
222+
- ERROR_WHEN_DATA_EXISTS: reports an error when data files already exist
223+
208224
## Example
209225

210226
For orc file format simple config
@@ -278,6 +294,8 @@ LocalFile {
278294
file_format_type="excel"
279295
filename_time_format="yyyy.MM.dd"
280296
is_enable_transaction=true
297+
schema_save_mode=RECREATE_SCHEMA
298+
data_save_mode=DROP_DATA
281299
}
282300

283301
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.catalog;
19+
20+
import org.apache.seatunnel.api.table.catalog.Catalog;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.api.table.catalog.TablePath;
23+
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
24+
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
25+
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
26+
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
27+
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
28+
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
29+
30+
import org.apache.commons.collections4.CollectionUtils;
31+
import org.apache.hadoop.fs.LocatedFileStatus;
32+
33+
import lombok.SneakyThrows;
34+
35+
import java.util.List;
36+
37+
public abstract class AbstractFileCatalog implements Catalog {
38+
39+
protected final String catalogName;
40+
private final HadoopFileSystemProxy hadoopFileSystemProxy;
41+
private final String filePath;
42+
43+
protected AbstractFileCatalog(
44+
HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
45+
this.catalogName = catalogName;
46+
this.filePath = filePath;
47+
this.hadoopFileSystemProxy = hadoopFileSystemProxy;
48+
}
49+
50+
@Override
51+
public void open() throws CatalogException {}
52+
53+
@Override
54+
public void close() throws CatalogException {}
55+
56+
@Override
57+
public String name() {
58+
return catalogName;
59+
}
60+
61+
@Override
62+
public String getDefaultDatabase() throws CatalogException {
63+
return null;
64+
}
65+
66+
@Override
67+
public boolean databaseExists(String databaseName) throws CatalogException {
68+
return false;
69+
}
70+
71+
@Override
72+
public List<String> listDatabases() throws CatalogException {
73+
return null;
74+
}
75+
76+
@Override
77+
public List<String> listTables(String databaseName)
78+
throws CatalogException, DatabaseNotExistException {
79+
return null;
80+
}
81+
82+
@SneakyThrows
83+
@Override
84+
public boolean tableExists(TablePath tablePath) throws CatalogException {
85+
return hadoopFileSystemProxy.fileExist(filePath);
86+
}
87+
88+
@Override
89+
public CatalogTable getTable(TablePath tablePath)
90+
throws CatalogException, TableNotExistException {
91+
return null;
92+
}
93+
94+
@SneakyThrows
95+
@Override
96+
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
97+
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
98+
hadoopFileSystemProxy.createDir(filePath);
99+
}
100+
101+
@SneakyThrows
102+
@Override
103+
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
104+
throws TableNotExistException, CatalogException {
105+
hadoopFileSystemProxy.deleteFile(filePath);
106+
}
107+
108+
@Override
109+
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
110+
throws DatabaseAlreadyExistException, CatalogException {}
111+
112+
@Override
113+
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
114+
throws DatabaseNotExistException, CatalogException {}
115+
116+
@SneakyThrows
117+
@Override
118+
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
119+
throws TableNotExistException, CatalogException {
120+
hadoopFileSystemProxy.deleteFile(filePath);
121+
hadoopFileSystemProxy.createDir(filePath);
122+
}
123+
124+
@SneakyThrows
125+
@Override
126+
public boolean isExistsData(TablePath tablePath) {
127+
final List<LocatedFileStatus> locatedFileStatuses =
128+
hadoopFileSystemProxy.listFile(filePath);
129+
return CollectionUtils.isNotEmpty(locatedFileStatuses);
130+
}
131+
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java

+22
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.sink.DataSaveMode;
23+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2224
import org.apache.seatunnel.common.utils.DateTimeUtils;
2325
import org.apache.seatunnel.common.utils.DateUtils;
2426
import org.apache.seatunnel.common.utils.TimeUtils;
@@ -28,6 +30,10 @@
2830
import java.util.Collections;
2931
import java.util.List;
3032

33+
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
34+
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
35+
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
36+
3137
public class BaseSinkConfig {
3238
public static final String SEATUNNEL = "seatunnel";
3339
public static final String NON_PARTITION = "NON_PARTITION";
@@ -293,4 +299,20 @@ public class BaseSinkConfig {
293299
.defaultValue(Collections.emptyList())
294300
.withDescription(
295301
"Support writing Parquet INT96 from a 12-byte field, only valid for parquet files.");
302+
303+
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
304+
Options.key("schema_save_mode")
305+
.enumType(SchemaSaveMode.class)
306+
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
307+
.withDescription(
308+
"Before the synchronization task begins, process the existing path");
309+
310+
public static final Option<DataSaveMode> DATA_SAVE_MODE =
311+
Options.key("data_save_mode")
312+
.singleChoice(
313+
DataSaveMode.class,
314+
Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
315+
.defaultValue(APPEND_DATA)
316+
.withDescription(
317+
"Before the synchronization task begins, different processing of data files that already exist in the directory");
296318
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,20 @@
2121
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2222
import org.apache.seatunnel.api.serialization.DefaultSerializer;
2323
import org.apache.seatunnel.api.serialization.Serializer;
24+
import org.apache.seatunnel.api.sink.DataSaveMode;
25+
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
26+
import org.apache.seatunnel.api.sink.SaveModeHandler;
27+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2428
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2529
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
2630
import org.apache.seatunnel.api.sink.SinkWriter;
2731
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
32+
import org.apache.seatunnel.api.sink.SupportSaveMode;
33+
import org.apache.seatunnel.api.table.catalog.Catalog;
2834
import org.apache.seatunnel.api.table.catalog.CatalogTable;
35+
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2936
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
37+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
3038
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
3139
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
3240
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
@@ -39,20 +47,25 @@
3947
import java.util.List;
4048
import java.util.Optional;
4149

50+
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
51+
4252
public abstract class BaseMultipleTableFileSink
4353
implements SeaTunnelSink<
4454
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
45-
SupportMultiTableSink {
55+
SupportMultiTableSink,
56+
SupportSaveMode {
4657

4758
private final HadoopConf hadoopConf;
4859
private final CatalogTable catalogTable;
4960
private final FileSinkConfig fileSinkConfig;
5061
private String jobId;
62+
private final ReadonlyConfig readonlyConfig;
5163

5264
public abstract String getPluginName();
5365

5466
public BaseMultipleTableFileSink(
5567
HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
68+
this.readonlyConfig = readonlyConfig;
5669
this.hadoopConf = hadoopConf;
5770
this.fileSinkConfig =
5871
new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
@@ -103,4 +116,23 @@ protected WriteStrategy createWriteStrategy() {
103116
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
104117
return writeStrategy;
105118
}
119+
120+
@Override
121+
public Optional<SaveModeHandler> getSaveModeHandler() {
122+
123+
CatalogFactory catalogFactory =
124+
discoverFactory(
125+
Thread.currentThread().getContextClassLoader(),
126+
CatalogFactory.class,
127+
getPluginName());
128+
if (catalogFactory == null) {
129+
return Optional.empty();
130+
}
131+
final Catalog catalog = catalogFactory.createCatalog(getPluginName(), readonlyConfig);
132+
SchemaSaveMode schemaSaveMode = readonlyConfig.get(BaseSinkConfig.SCHEMA_SAVE_MODE);
133+
DataSaveMode dataSaveMode = readonlyConfig.get(BaseSinkConfig.DATA_SAVE_MODE);
134+
return Optional.of(
135+
new DefaultSaveModeHandler(
136+
schemaSaveMode, dataSaveMode, catalog, catalogTable, null));
137+
}
106138
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
21+
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
22+
23+
public class LocalFileCatalog extends AbstractFileCatalog {
24+
25+
public LocalFileCatalog(
26+
HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
27+
super(hadoopFileSystemProxy, filePath, catalogName);
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.table.catalog.Catalog;
23+
import org.apache.seatunnel.api.table.factory.CatalogFactory;
24+
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
26+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
27+
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
28+
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
29+
30+
import com.google.auto.service.AutoService;
31+
32+
@AutoService(Factory.class)
33+
public class LocalFileCatalogFactory implements CatalogFactory {
34+
@Override
35+
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
36+
HadoopFileSystemProxy fileSystemUtils =
37+
new HadoopFileSystemProxy(new LocalFileHadoopConf());
38+
return new LocalFileCatalog(
39+
fileSystemUtils,
40+
options.get(BaseSourceConfigOptions.FILE_PATH),
41+
factoryIdentifier());
42+
}
43+
44+
@Override
45+
public String factoryIdentifier() {
46+
return FileSystemType.LOCAL.getFileSystemPluginName();
47+
}
48+
49+
@Override
50+
public OptionRule optionRule() {
51+
return OptionRule.builder().build();
52+
}
53+
}

seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public OptionRule optionRule() {
4646
return OptionRule.builder()
4747
.required(BaseSinkConfig.FILE_PATH)
4848
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
49+
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
50+
.optional(BaseSinkConfig.DATA_SAVE_MODE)
4951
.conditional(
5052
BaseSinkConfig.FILE_FORMAT_TYPE,
5153
FileFormat.TEXT,

0 commit comments

Comments
 (0)