Skip to content

Commit 3af6b2f

Browse files
authored
[Feature][Zeta] Add tag to node used to filter worker when submit job (apache#7045)
1 parent a93c9d6 commit 3af6b2f

File tree

49 files changed

+601
-81
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+601
-81
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ SeaTunnel addresses common data integration challenges:
6161

6262
## SeaTunnel Workflow
6363

64-
![SeaTunnel Workflow](docs/en/images/architecture_diagram.png)
64+
![SeaTunnel Workflow](docs/images/architecture_diagram.png)
6565

6666
Configure jobs, select execution engines, and parallelize data using Source Connectors. Easily develop and extend connectors to meet your needs.
6767

docs/en/about.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ SeaTunnel focuses on data integration and data synchronization, and is mainly de
3434

3535
## SeaTunnel work flowchart
3636

37-
![SeaTunnel work flowchart](images/architecture_diagram.png)
37+
![SeaTunnel work flowchart](../images/architecture_diagram.png)
3838

3939
The runtime process of SeaTunnel is shown in the figure above.
4040

docs/en/faq.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ Refer to: [lightbend/config#456](https://github.com/lightbend/config/issues/456)
6565

6666
Of course! See the screenshot below:
6767

68-
![workflow.png](images/workflow.png)
68+
![workflow.png](../images/workflow.png)
6969

70-
![azkaban.png](images/azkaban.png)
70+
![azkaban.png](../images/azkaban.png)
7171

7272
## Does SeaTunnel have a case for configuring multiple sources, such as configuring elasticsearch and hdfs in source at the same time?
7373

@@ -184,7 +184,7 @@ The following conclusions can be drawn:
184184

185185
3. In general, both M and N are determined, and the conclusion can be drawn from 2: The size of `spark.streaming.kafka.maxRatePerPartition` is positively correlated with the size of `spark.executor.cores` * `spark.executor.instances`, and it can be increased while increasing the resource `maxRatePerPartition` to speed up consumption.
186186

187-
![kafka](images/kafka.png)
187+
![kafka](../images/kafka.png)
188188

189189
## How can I solve the Error `Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE`?
190190

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
---
2+
3+
sidebar_position: 9
4+
-------------------
5+
6+
After version 2.3.6. SeaTunnel can add `tag` to each worker node, when you submit job you can use `tag_filter` to filter the node you want run this job.
7+
8+
# How to archive this:
9+
10+
1. update the config in `hazelcast.yaml`,
11+
12+
```yaml
13+
hazelcast:
14+
cluster-name: seatunnel
15+
network:
16+
rest-api:
17+
enabled: true
18+
endpoint-groups:
19+
CLUSTER_WRITE:
20+
enabled: true
21+
DATA:
22+
enabled: true
23+
join:
24+
tcp-ip:
25+
enabled: true
26+
member-list:
27+
- localhost
28+
port:
29+
auto-increment: false
30+
port: 5801
31+
properties:
32+
hazelcast.invocation.max.retry.count: 20
33+
hazelcast.tcp.join.port.try.count: 30
34+
hazelcast.logging.type: log4j2
35+
hazelcast.operation.generic.thread.count: 50
36+
member-attributes:
37+
group:
38+
type: string
39+
value: platform
40+
team:
41+
type: string
42+
value: team1
43+
```
44+
45+
In this config, we specify the tag by `member-attributes`, the node has `group=platform, team=team1` tags.
46+
47+
2. add `tag_filter` to your job config
48+
49+
```hacon
50+
env {
51+
parallelism = 1
52+
job.mode = "BATCH"
53+
tag_filter {
54+
group = "platform"
55+
team = "team1"
56+
}
57+
}
58+
source {
59+
FakeSource {
60+
result_table_name = "fake"
61+
parallelism = 1
62+
schema = {
63+
fields {
64+
name = "string"
65+
}
66+
}
67+
}
68+
}
69+
transform {
70+
}
71+
sink {
72+
console {
73+
source_table_name="fake"
74+
}
75+
}
76+
```
77+
78+
**Notice:**
79+
- If not set `tag_filter` in job config, it will random choose the node in all active nodes.
80+
- When you add multiple tag in `tag_filter`, it need all key exist and value match. if all node not match, you will get `NoEnoughResourceException` exception.
81+
82+
![img.png](../../images/resource-isolation.png)
83+
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

docs/images/resource-isolation.png

68.3 KB
Loading
File renamed without changes.
File renamed without changes.
File renamed without changes.

docs/sidebars.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ const sidebars = {
178178
"seatunnel-engine/checkpoint-storage",
179179
"seatunnel-engine/rest-api",
180180
"seatunnel-engine/tcp",
181-
"seatunnel-engine/engine-jar-storage-mode"
181+
"seatunnel-engine/engine-jar-storage-mode",
182+
"seatunnel-engine/resource-isolation",
182183
]
183184
},
184185
{

docs/zh/about.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ SeaTunnel专注于数据集成和数据同步,主要旨在解决数据集成
3232

3333
## SeaTunnel work flowchart
3434

35-
![SeaTunnel work flowchart](images/architecture_diagram.png)
35+
![SeaTunnel work flowchart](../images/architecture_diagram.png)
3636

3737
SeaTunnel的运行流程如上图所示。
3838

docs/zh/faq.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ your string 1
6565

6666
当然! 请参阅下面的屏幕截图:
6767

68-
![工作流程.png](images/workflow.png)
68+
![工作流程.png](../images/workflow.png)
6969

70-
![azkaban.png](images/azkaban.png)
70+
![azkaban.png](../images/azkaban.png)
7171

7272
## SeaTunnel是否有配置多个源的情况,例如同时在源中配置elasticsearch和hdfs?
7373

@@ -185,7 +185,7 @@ sink {
185185

186186
3、一般来说,M和N都确定了,从2可以得出结论:`spark.streaming.kafka.maxRatePerPartition`的大小与`spark.executor.cores` * `spark的大小正相关 .executor.instances`,可以在增加资源`maxRatePerPartition`的同时增加,以加快消耗。
187187

188-
![kafka](images/kafka.png)
188+
![kafka](../images/kafka.png)
189189

190190
## 如何解决错误 `Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE`
191191

-76.1 KB
Binary file not shown.

docs/zh/images/azkaban.png

-715 KB
Binary file not shown.

docs/zh/images/checkstyle.png

-468 KB
Binary file not shown.

docs/zh/images/kafka.png

-31.4 KB
Binary file not shown.

docs/zh/images/seatunnel-workflow.svg

-4
This file was deleted.
-760 KB
Binary file not shown.

docs/zh/images/seatunnel_starter.png

-414 KB
Binary file not shown.

docs/zh/images/workflow.png

-253 KB
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
---
2+
3+
sidebar_position: 9
4+
-------------------
5+
6+
在2.3.6版本之后, SeaTunnel支持对每个实例添加`tag`, 然后在提交任务时可以在配置文件中使用`tag_filter`来选择任务将要运行的节点.
7+
8+
# 如何实现改功能
9+
10+
1. 更新`hazelcast.yaml`文件
11+
12+
```yaml
13+
hazelcast:
14+
cluster-name: seatunnel
15+
network:
16+
rest-api:
17+
enabled: true
18+
endpoint-groups:
19+
CLUSTER_WRITE:
20+
enabled: true
21+
DATA:
22+
enabled: true
23+
join:
24+
tcp-ip:
25+
enabled: true
26+
member-list:
27+
- localhost
28+
port:
29+
auto-increment: false
30+
port: 5801
31+
properties:
32+
hazelcast.invocation.max.retry.count: 20
33+
hazelcast.tcp.join.port.try.count: 30
34+
hazelcast.logging.type: log4j2
35+
hazelcast.operation.generic.thread.count: 50
36+
member-attributes:
37+
group:
38+
type: string
39+
value: platform
40+
team:
41+
type: string
42+
value: team1
43+
```
44+
45+
在这个配置中, 我们通过`member-attributes`设置了`group=platform, team=team1`这样两个`tag`
46+
47+
2. 在任务的配置中添加`tag_filter`来选择你需要运行该任务的节点
48+
49+
```hacon
50+
env {
51+
parallelism = 1
52+
job.mode = "BATCH"
53+
tag_filter {
54+
group = "platform"
55+
team = "team1"
56+
}
57+
}
58+
source {
59+
FakeSource {
60+
result_table_name = "fake"
61+
parallelism = 1
62+
schema = {
63+
fields {
64+
name = "string"
65+
}
66+
}
67+
}
68+
}
69+
transform {
70+
}
71+
sink {
72+
console {
73+
source_table_name="fake"
74+
}
75+
}
76+
```
77+
78+
**注意:**
79+
- 当在任务的配置中, 没有添加`tag_filter`时, 会从所有节点中随机选择节点来运行任务.
80+
- 当`tag_filter`中存在多个过滤条件时, 会根据key存在以及value相等的全部匹配的节点, 当没有找到匹配的节点时, 会抛出 `NoEnoughResourceException`异常.
81+
82+
![img.png](../../images/resource-isolation.png)
83+

seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public <T> T get(Option<T> option) {
6969
/**
7070
* Transform to Config todo: This method should be removed after we remove Config
7171
*
72-
* @deprecated Please use ReadonlyConfig directly
7372
* @return Config
73+
* @deprecated Please use ReadonlyConfig directly
7474
*/
7575
@Deprecated
7676
public Config toConfig() {
@@ -96,6 +96,10 @@ public void toMap(Map<String, String> result) {
9696
}
9797
}
9898

99+
public Map<String, Object> getSourceMap() {
100+
return confData;
101+
}
102+
99103
public <T> Optional<T> getOptional(Option<T> option) {
100104
if (option == null) {
101105
throw new NullPointerException("Option not be null.");

seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java

+6
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,10 @@ public interface EnvCommonOptions {
9494
.mapType()
9595
.noDefaultValue()
9696
.withDescription("custom parameters for run engine");
97+
98+
Option<Map<String, String>> NODE_TAG_FILTER =
99+
Options.key("tag_filter")
100+
.mapType()
101+
.noDefaultValue()
102+
.withDescription("Define the worker where the job runs by tag");
97103
}

seatunnel-core/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Introduction
22

33
This module is the seatunnel job entrypoint. SeaTunnel jobs are started by the below process.
4-
![seatunnel-workflow.svg](../docs/en/images/seatunnel_starter.png)
4+
![seatunnel-workflow.svg](../docs/images/seatunnel_starter.png)
55

66
- seatunnel-core-flink: The flink job starter.
77
- seatunnel-core-flink-sql: The flink sql job starter.

seatunnel-core/seatunnel-core-starter/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This module is the base start module for SeaTunnel new connector API.
44

5-
![seatunnel_architecture.png](../../docs/en/images/seatunnel_architecture.png)
5+
![seatunnel_architecture.png](../../docs/images/seatunnel_architecture.png)
66

77
# SeaTunnel Job Execute Process
88

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.engine.e2e.resourceIsolation;
20+
21+
import org.apache.seatunnel.e2e.common.TestSuiteBase;
22+
import org.apache.seatunnel.e2e.common.container.EngineType;
23+
import org.apache.seatunnel.e2e.common.container.TestContainer;
24+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
25+
26+
import org.apache.commons.lang3.StringUtils;
27+
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.TestTemplate;
30+
import org.testcontainers.containers.Container;
31+
32+
import java.io.IOException;
33+
34+
public class ResourceIsolationIT extends TestSuiteBase {
35+
36+
@TestTemplate
37+
@DisabledOnContainer(
38+
value = {},
39+
type = {EngineType.SPARK, EngineType.FLINK},
40+
disabledReason = "only work on Zeta")
41+
public void testTagMatch(TestContainer container) throws IOException, InterruptedException {
42+
Container.ExecResult execResult =
43+
container.executeJob("/resource-isolation/fakesource_to_console.conf");
44+
Assertions.assertEquals(0, execResult.getExitCode());
45+
}
46+
47+
@TestTemplate
48+
@DisabledOnContainer(
49+
value = {},
50+
type = {EngineType.SPARK, EngineType.FLINK},
51+
disabledReason = "only work on Zeta")
52+
public void testTagNotMatch(TestContainer container) throws IOException, InterruptedException {
53+
Container.ExecResult execResult =
54+
container.executeJob(
55+
"/resource-isolation/fakesource_to_console_tag_not_match.conf");
56+
Assertions.assertNotEquals(0, execResult.getExitCode());
57+
Assertions.assertTrue(
58+
StringUtils.isNotBlank(execResult.getStderr())
59+
&& execResult
60+
.getStderr()
61+
.contains(
62+
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException"));
63+
}
64+
}

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,10 @@ hazelcast:
4141
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
4242
hazelcast.logging.type: log4j2
4343
hazelcast.operation.generic.thread.count: 200
44+
member-attributes:
45+
group:
46+
type: string
47+
value: platform
48+
team:
49+
type: string
50+
value: team1

0 commit comments

Comments
 (0)