Skip to content

Commit a09a5a2

Browse files
authored
[Fix] Fix doris stream load failed not reported error (apache#6315)
1 parent 3f931ee commit a09a5a2

File tree

7 files changed

+209
-5
lines changed

7 files changed

+209
-5
lines changed

docs/en/connector-v2/sink/Doris.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ The internal implementation of Doris sink connector is cached and imported by st
3636
| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name |
3737
| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. |
3838
| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. |
39-
| sink.enable-2pc | bool | No | - | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). |
39+
| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). |
4040
| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) |
4141
| sink.check-interval | int | No | 10000 | check exception with the interval while loading |
4242
| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed |

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public interface DorisOptions {
152152
Option<Boolean> SINK_ENABLE_2PC =
153153
Options.key("sink.enable-2pc")
154154
.booleanType()
155-
.defaultValue(true)
155+
.defaultValue(false)
156156
.withDescription("enable 2PC while loading");
157157

158158
Option<Integer> SINK_CHECK_INTERVAL =

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private void initializeLoad() {
116116
}
117117
// get main work thread.
118118
executorThread = Thread.currentThread();
119-
dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
119+
startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
120120
// when uploading data in streaming mode, we need to regularly detect whether there are
121121
// exceptions.
122122
scheduledExecutorService.scheduleWithFixedDelay(

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void startUp() {
8585
.untilAsserted(this::initializeJdbcConnection);
8686
}
8787

88-
private void initializeJdbcConnection() throws SQLException {
88+
protected void initializeJdbcConnection() throws SQLException {
8989
Properties props = new Properties();
9090
props.put("user", USERNAME);
9191
props.put("password", PASSWORD);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.e2e.connector.doris;
19+
20+
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
21+
import org.apache.seatunnel.e2e.common.container.EngineType;
22+
import org.apache.seatunnel.e2e.common.container.TestContainer;
23+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
24+
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
25+
26+
import org.junit.jupiter.api.Assertions;
27+
import org.junit.jupiter.api.TestTemplate;
28+
import org.testcontainers.containers.Container;
29+
30+
import lombok.extern.slf4j.Slf4j;
31+
32+
import java.io.IOException;
33+
import java.sql.SQLException;
34+
import java.sql.Statement;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ExecutionException;
37+
import java.util.concurrent.TimeUnit;
38+
39+
import static org.awaitility.Awaitility.given;
40+
41+
@Slf4j
42+
public class DorisErrorIT extends AbstractDorisIT {
43+
private static final String TABLE = "doris_e2e_table";
44+
private static final String DRIVER_JAR =
45+
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
46+
47+
private static final String sinkDB = "e2e_sink";
48+
49+
@TestContainerExtension
50+
protected final ContainerExtendedFactory extendedFactory =
51+
container -> {
52+
Container.ExecResult extraCommands =
53+
container.execInContainer(
54+
"bash",
55+
"-c",
56+
"mkdir -p /tmp/seatunnel/plugins/jdbc/lib && cd /tmp/seatunnel/plugins/jdbc/lib && wget "
57+
+ DRIVER_JAR);
58+
Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
59+
};
60+
61+
@TestTemplate
62+
@DisabledOnContainer(
63+
value = {},
64+
type = {EngineType.SPARK, EngineType.FLINK},
65+
disabledReason = "flink/spark failed reason not same")
66+
public void testDoris(TestContainer container) throws InterruptedException, ExecutionException {
67+
initializeJdbcTable();
68+
CompletableFuture<Container.ExecResult> future =
69+
CompletableFuture.supplyAsync(
70+
() -> {
71+
try {
72+
return container.executeJob(
73+
"/fake_source_and_doris_sink_timeout_error.conf");
74+
} catch (IOException | InterruptedException e) {
75+
throw new RuntimeException(e);
76+
}
77+
});
78+
// wait for the job to start
79+
Thread.sleep(10 * 1000);
80+
super.container.stop();
81+
Assertions.assertNotEquals(0, future.get().getExitCode());
82+
super.container.start();
83+
// wait for the container to restart
84+
given().ignoreExceptions()
85+
.await()
86+
.atMost(10000, TimeUnit.SECONDS)
87+
.untilAsserted(this::initializeJdbcConnection);
88+
}
89+
90+
private void initializeJdbcTable() {
91+
try {
92+
try (Statement statement = jdbcConnection.createStatement()) {
93+
// create test databases
94+
statement.execute(createDatabase(sinkDB));
95+
log.info("create sink database succeed");
96+
// create sink table
97+
statement.execute(createTableForTest(sinkDB));
98+
} catch (SQLException e) {
99+
throw new RuntimeException("Initializing table failed!", e);
100+
}
101+
} catch (Exception e) {
102+
throw new RuntimeException("Initializing jdbc failed!", e);
103+
}
104+
}
105+
106+
private String createDatabase(String db) {
107+
return String.format("CREATE DATABASE IF NOT EXISTS %s ;", db);
108+
}
109+
110+
private String createTableForTest(String db) {
111+
String createTableSql =
112+
"create table if not exists `%s`.`%s`(\n"
113+
+ "F_ID bigint null,\n"
114+
+ "F_INT int null,\n"
115+
+ "F_BIGINT bigint null,\n"
116+
+ "F_TINYINT tinyint null,\n"
117+
+ "F_SMALLINT smallint null,\n"
118+
+ "F_DECIMAL decimal(18,6) null,\n"
119+
+ "F_LARGEINT largeint null,\n"
120+
+ "F_BOOLEAN boolean null,\n"
121+
+ "F_DOUBLE double null,\n"
122+
+ "F_FLOAT float null,\n"
123+
+ "F_CHAR char null,\n"
124+
+ "F_VARCHAR_11 varchar(11) null,\n"
125+
+ "F_STRING string null,\n"
126+
+ "F_DATETIME_P datetime(6),\n"
127+
+ "F_DATETIME datetime,\n"
128+
+ "F_DATE date\n"
129+
+ ")\n"
130+
+ "duplicate KEY(`F_ID`)\n"
131+
+ "DISTRIBUTED BY HASH(`F_ID`) BUCKETS 1\n"
132+
+ "properties(\n"
133+
+ "\"replication_allocation\" = \"tag.location.default: 1\""
134+
+ ");";
135+
return String.format(createTableSql, db, TABLE);
136+
}
137+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env{
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source{
24+
FakeSource {
25+
row.num = 100
26+
split.num = 10
27+
split.read-interval = 10000
28+
string.length = 1
29+
schema = {
30+
fields {
31+
F_ID = "string"
32+
F_INT = "int"
33+
F_BIGINT = "time"
34+
F_TINYINT = "tinyint"
35+
F_SMALLINT = "smallint"
36+
F_DECIMAL = "decimal(10,2)"
37+
F_LARGEINT = "bigint"
38+
F_BOOLEAN = "boolean"
39+
F_DOUBLE = "double"
40+
F_FLOAT = "float"
41+
F_CHAR = "string"
42+
F_VARCHAR_11 = "string"
43+
F_STRING = "string"
44+
F_DATETIME_P = "timestamp"
45+
F_DATETIME = "timestamp"
46+
F_DATE = "date"
47+
}
48+
}
49+
}
50+
}
51+
52+
transform {}
53+
54+
sink{
55+
Doris {
56+
fenodes = "doris_e2e:8030"
57+
username = root
58+
password = ""
59+
table.identifier = "e2e_sink.doris_e2e_table"
60+
sink.enable-2pc = "true"
61+
sink.label-prefix = "test_json"
62+
doris.config = {
63+
format="json"
64+
read_json_by_line="true"
65+
}
66+
}
67+
}

seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public Container.ExecResult executeJob(String confFile)
155155

156156
@Override
157157
public String getServerLogs() {
158-
return jobManager.getLogs();
158+
return jobManager.getLogs() + "\n" + taskManager.getLogs();
159159
}
160160

161161
public String executeJobManagerInnerCommand(String command)

0 commit comments

Comments
 (0)