Skip to content

Commit 77f6140

Browse files
authored
[Improve][Engine] Support custom job id for rest-api named submit-job (apache#7053)
1 parent 165bfae commit 77f6140

File tree

3 files changed

+100
-104
lines changed

3 files changed

+100
-104
lines changed

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java

+93-96
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
6666
private static final Path hadoopJar =
6767
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
6868

69+
private static final long CUSTOM_JOB_ID = 123456789;
70+
6971
@Override
7072
@BeforeEach
7173
public void startUp() throws Exception {
@@ -101,106 +103,24 @@ public void tearDown() throws Exception {
101103
}
102104

103105
@Test
104-
public void testSubmitJob() {
106+
public void testSubmitJobWithCustomJobId() {
105107
AtomicInteger i = new AtomicInteger();
106-
107108
Arrays.asList(server, secondServer)
108109
.forEach(
109-
container -> {
110-
Response response =
111-
i.get() == 0
112-
? submitJob(container, "BATCH", jobName, paramJobName)
113-
: submitJob(container, "BATCH", jobName, null);
114-
if (i.get() == 0) {
115-
response.then()
116-
.statusCode(200)
117-
.body("jobName", equalTo(paramJobName));
118-
} else {
119-
response.then().statusCode(200).body("jobName", equalTo(jobName));
120-
}
121-
String jobId = response.getBody().jsonPath().getString("jobId");
110+
container ->
111+
submitJobAndAssertResponse(
112+
container,
113+
i,
114+
paramJobName + "&jobId=" + CUSTOM_JOB_ID,
115+
true));
116+
}
122117

123-
Awaitility.await()
124-
.atMost(2, TimeUnit.MINUTES)
125-
.untilAsserted(
126-
() -> {
127-
given().get(
128-
http
129-
+ container.getHost()
130-
+ colon
131-
+ container
132-
.getFirstMappedPort()
133-
+ RestConstant
134-
.FINISHED_JOBS_INFO
135-
+ "/FINISHED")
136-
.then()
137-
.statusCode(200)
138-
.body(
139-
"[" + i.get() + "].jobName",
140-
equalTo(
141-
i.get() == 0
142-
? paramJobName
143-
: jobName))
144-
.body(
145-
"[" + i.get() + "].errorMsg",
146-
equalTo(null))
147-
.body(
148-
"[" + i.get() + "].jobDag.jobId",
149-
equalTo(Long.parseLong(jobId)))
150-
.body(
151-
"["
152-
+ i.get()
153-
+ "].metrics.SourceReceivedCount",
154-
equalTo("100"))
155-
.body(
156-
"["
157-
+ i.get()
158-
+ "].metrics.SinkWriteCount",
159-
equalTo("100"))
160-
.body(
161-
"[" + i.get() + "].jobStatus",
162-
equalTo("FINISHED"));
163-
164-
// test for without status parameter.
165-
given().get(
166-
http
167-
+ container.getHost()
168-
+ colon
169-
+ container
170-
.getFirstMappedPort()
171-
+ RestConstant
172-
.FINISHED_JOBS_INFO)
173-
.then()
174-
.statusCode(200)
175-
.body(
176-
"[" + i.get() + "].jobName",
177-
equalTo(
178-
i.get() == 0
179-
? paramJobName
180-
: jobName))
181-
.body(
182-
"[" + i.get() + "].errorMsg",
183-
equalTo(null))
184-
.body(
185-
"[" + i.get() + "].jobDag.jobId",
186-
equalTo(Long.parseLong(jobId)))
187-
.body(
188-
"["
189-
+ i.get()
190-
+ "].metrics.SourceReceivedCount",
191-
equalTo("100"))
192-
.body(
193-
"["
194-
+ i.get()
195-
+ "].metrics.SinkWriteCount",
196-
equalTo("100"))
197-
.body(
198-
"[" + i.get() + "].jobStatus",
199-
equalTo("FINISHED"));
200-
});
201-
202-
i.getAndIncrement();
203-
});
118+
@Test
119+
public void testSubmitJobWithoutCustomJobId() {
120+
AtomicInteger i = new AtomicInteger();
121+
Arrays.asList(server, secondServer)
122+
.forEach(
123+
container -> submitJobAndAssertResponse(container, i, paramJobName, false));
204124
}
205125

206126
@Test
@@ -459,4 +379,81 @@ private GenericContainer<?> createServer(String networkAlias)
459379

460380
return server;
461381
}
382+
383+
private void submitJobAndAssertResponse(
384+
GenericContainer<? extends GenericContainer<?>> container,
385+
AtomicInteger i,
386+
String customParam,
387+
boolean isCustomJobId) {
388+
Response response = submitJobAndResponse(container, i, customParam);
389+
String jobId = response.getBody().jsonPath().getString("jobId");
390+
assertResponse(container, i, jobId, isCustomJobId);
391+
i.getAndIncrement();
392+
}
393+
394+
private Response submitJobAndResponse(
395+
GenericContainer<? extends GenericContainer<?>> container,
396+
AtomicInteger i,
397+
String customParam) {
398+
Response response =
399+
i.get() == 0
400+
? submitJob(container, "BATCH", jobName, customParam)
401+
: submitJob(container, "BATCH", jobName, null);
402+
if (i.get() == 0) {
403+
response.then().statusCode(200).body("jobName", equalTo(paramJobName));
404+
} else {
405+
response.then().statusCode(200).body("jobName", equalTo(jobName));
406+
}
407+
return response;
408+
}
409+
410+
private void assertResponse(
411+
GenericContainer<? extends GenericContainer<?>> container,
412+
AtomicInteger i,
413+
String jobId,
414+
boolean isCustomJobId) {
415+
Awaitility.await()
416+
.atMost(2, TimeUnit.MINUTES)
417+
.untilAsserted(
418+
() -> {
419+
assertWithStatusParameterOrNot(
420+
container, i, jobId, isCustomJobId, true);
421+
422+
// test for without status parameter.
423+
assertWithStatusParameterOrNot(
424+
container, i, jobId, isCustomJobId, false);
425+
});
426+
}
427+
428+
private void assertWithStatusParameterOrNot(
429+
GenericContainer<? extends GenericContainer<?>> container,
430+
AtomicInteger i,
431+
String jobId,
432+
boolean isCustomJobId,
433+
boolean isStatusWithSubmitJob) {
434+
String baseRestUrl = getBaseRestUrl(container);
435+
String restUrl = isStatusWithSubmitJob ? baseRestUrl + "/FINISHED" : baseRestUrl;
436+
given().get(restUrl)
437+
.then()
438+
.statusCode(200)
439+
.body("[" + i.get() + "].jobName", equalTo(i.get() == 0 ? paramJobName : jobName))
440+
.body("[" + i.get() + "].errorMsg", equalTo(null))
441+
.body(
442+
"[" + i.get() + "].jobId",
443+
equalTo(
444+
i.get() == 0 && isCustomJobId
445+
? Long.toString(CUSTOM_JOB_ID)
446+
: jobId))
447+
.body("[" + i.get() + "].metrics.SourceReceivedCount", equalTo("100"))
448+
.body("[" + i.get() + "].metrics.SinkWriteCount", equalTo("100"))
449+
.body("[" + i.get() + "].jobStatus", equalTo("FINISHED"));
450+
}
451+
452+
private String getBaseRestUrl(GenericContainer<? extends GenericContainer<?>> container) {
453+
return http
454+
+ container.getHost()
455+
+ colon
456+
+ container.getFirstMappedPort()
457+
+ RestConstant.FINISHED_JOBS_INFO;
458+
}
462459
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
3939
import org.apache.seatunnel.engine.server.utils.RestUtil;
4040

41-
import org.apache.commons.lang.StringUtils;
41+
import org.apache.commons.lang3.StringUtils;
4242

4343
import com.hazelcast.internal.ascii.TextCommandService;
4444
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
@@ -119,6 +119,8 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
119119

120120
boolean startWithSavePoint =
121121
Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
122+
String jobIdStr = requestParams.get(RestConstant.JOB_ID);
123+
Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? Long.parseLong(jobIdStr) : null;
122124
SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
123125
RestJobExecutionEnvironment restJobExecutionEnvironment =
124126
new RestJobExecutionEnvironment(
@@ -127,22 +129,18 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
127129
config,
128130
textCommandService.getNode(),
129131
startWithSavePoint,
130-
startWithSavePoint
131-
? Long.parseLong(requestParams.get(RestConstant.JOB_ID))
132-
: null);
132+
finalJobId);
133133
JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
134134
Long jobId = jobImmutableInformation.getJobId();
135135
if (!seaTunnelServer.isMasterNode()) {
136136

137137
NodeEngineUtil.sendOperationToMasterNode(
138138
getNode().nodeEngine,
139139
new SubmitJobOperation(
140-
jobImmutableInformation.getJobId(),
141-
getNode().nodeEngine.toData(jobImmutableInformation)))
140+
jobId, getNode().nodeEngine.toData(jobImmutableInformation)))
142141
.join();
143142

144143
} else {
145-
146144
submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
147145
}
148146

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Collections;
4040
import java.util.HashSet;
4141
import java.util.List;
42+
import java.util.Objects;
4243
import java.util.Set;
4344

4445
public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
@@ -63,7 +64,7 @@ public RestJobExecutionEnvironment(
6364
this.nodeEngine = node.getNodeEngine();
6465
this.jobConfig.setJobContext(
6566
new JobContext(
66-
isStartWithSavePoint
67+
Objects.nonNull(jobId)
6768
? jobId
6869
: nodeEngine
6970
.getHazelcastInstance()

0 commit comments

Comments
 (0)