Skip to content

Commit 50923e7

Browse files
authored
[Improvement-16887][Dependent Task] Dependent task improvement (#16910)
1 parent 9237aa8 commit 50923e7

File tree

35 files changed

+886
-126
lines changed

35 files changed

+886
-126
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java

+53-11
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.dolphinscheduler.api.utils.Result;
4646
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
4747
import org.apache.dolphinscheduler.common.constants.Constants;
48+
import org.apache.dolphinscheduler.common.enums.ContextType;
4849
import org.apache.dolphinscheduler.common.enums.Flag;
4950
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
5051
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -53,11 +54,14 @@
5354
import org.apache.dolphinscheduler.common.utils.JSONUtils;
5455
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
5556
import org.apache.dolphinscheduler.dao.AlertDao;
57+
import org.apache.dolphinscheduler.dao.entity.AbstractTaskInstanceContext;
5658
import org.apache.dolphinscheduler.dao.entity.Project;
5759
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
5860
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
5961
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
6062
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
63+
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
64+
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentDetails;
6165
import org.apache.dolphinscheduler.dao.entity.User;
6266
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
6367
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
@@ -70,6 +74,8 @@
7074
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
7175
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
7276
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
77+
import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext;
78+
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
7379
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
7480
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
7581
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
@@ -96,6 +102,7 @@
96102

97103
import lombok.extern.slf4j.Slf4j;
98104

105+
import org.springframework.beans.BeanUtils;
99106
import org.springframework.beans.factory.annotation.Autowired;
100107
import org.springframework.context.annotation.Lazy;
101108
import org.springframework.stereotype.Service;
@@ -174,6 +181,9 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work
174181
@Autowired
175182
private CuringParamsService curingGlobalParamsService;
176183

184+
@Autowired
185+
private TaskInstanceContextDao taskInstanceContextDao;
186+
177187
/**
178188
* return top n SUCCESS workflow instance order by running time which started between startTime and endTime
179189
*/
@@ -184,7 +194,7 @@ public Map<String, Object> queryTopNLongestRunningWorkflowInstance(User loginUse
184194
// check user access for project
185195
Map<String, Object> result =
186196
projectService.checkProjectAndAuth(loginUser, project, projectCode,
187-
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
197+
WORKFLOW_INSTANCE);
188198
if (result.get(Constants.STATUS) != Status.SUCCESS) {
189199
return result;
190200
}
@@ -233,7 +243,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
233243
// check user access for project
234244
Map<String, Object> result =
235245
projectService.checkProjectAndAuth(loginUser, project, projectCode,
236-
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
246+
WORKFLOW_INSTANCE);
237247
if (result.get(Constants.STATUS) != Status.SUCCESS) {
238248
return result;
239249
}
@@ -245,7 +255,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
245255
workflowInstance.getWorkflowDefinitionVersion());
246256

247257
if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) {
248-
log.error("workflow definition does not exist, projectCode:{}.", projectCode);
258+
log.error("workflow definition does not exist, projectCode: {}.", projectCode);
249259
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowInstanceId);
250260
} else {
251261
workflowInstance.setLocations(workflowDefinition.getLocations());
@@ -443,7 +453,7 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
443453
// check user access for project
444454
Map<String, Object> result =
445455
projectService.checkProjectAndAuth(loginUser, project, projectCode,
446-
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
456+
WORKFLOW_INSTANCE);
447457
if (result.get(Constants.STATUS) != Status.SUCCESS) {
448458
return result;
449459
}
@@ -460,15 +470,47 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
460470
List<TaskInstance> taskInstanceList =
461471
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstanceId,
462472
workflowInstance.getTestFlag());
473+
List<TaskInstanceDependentDetails<ITaskInstanceContext>> taskInstanceDependentDetailsList =
474+
setTaskInstanceDependentResult(taskInstanceList);
475+
463476
Map<String, Object> resultMap = new HashMap<>();
464477
resultMap.put(WORKFLOW_INSTANCE_STATE, workflowInstance.getState().toString());
465-
resultMap.put(TASK_LIST, taskInstanceList);
478+
resultMap.put(TASK_LIST, taskInstanceDependentDetailsList);
466479
result.put(DATA_LIST, resultMap);
467480

468481
putMsg(result, Status.SUCCESS);
469482
return result;
470483
}
471484

485+
private List<TaskInstanceDependentDetails<ITaskInstanceContext>> setTaskInstanceDependentResult(List<TaskInstance> taskInstanceList) {
486+
List<TaskInstanceDependentDetails<ITaskInstanceContext>> taskInstanceDependentDetailsList =
487+
taskInstanceList.stream()
488+
.map(taskInstance -> {
489+
TaskInstanceDependentDetails<ITaskInstanceContext> taskInstanceDependentDetails =
490+
new TaskInstanceDependentDetails<>();
491+
BeanUtils.copyProperties(taskInstance, taskInstanceDependentDetails);
492+
return taskInstanceDependentDetails;
493+
}).collect(Collectors.toList());
494+
List<Integer> taskInstanceIdList = taskInstanceList.stream()
495+
.map(TaskInstance::getId).collect(Collectors.toList());
496+
List<TaskInstanceContext> taskInstanceContextList =
497+
taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
498+
ContextType.DEPENDENT_RESULT_CONTEXT);
499+
for (TaskInstanceContext taskInstanceContext : taskInstanceContextList) {
500+
for (AbstractTaskInstanceContext dependentResultTaskInstanceContext : taskInstanceContext
501+
.getTaskInstanceContext()) {
502+
for (TaskInstanceDependentDetails<ITaskInstanceContext> taskInstanceDependentDetails : taskInstanceDependentDetailsList) {
503+
if (taskInstanceDependentDetails.getId().equals(taskInstanceContext.getTaskInstanceId())) {
504+
taskInstanceDependentDetails
505+
.setTaskInstanceDependentResult(
506+
dependentResultTaskInstanceContext);
507+
}
508+
}
509+
}
510+
}
511+
return taskInstanceDependentDetailsList;
512+
}
513+
472514
@Override
473515
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
474516
TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
@@ -488,7 +530,7 @@ public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUs
488530
.queryAllSubWorkflowInstance((long) taskInstance.getWorkflowInstanceId(),
489531
taskInstance.getTaskCode());
490532
List<Long> allSubWorkflowInstanceId = relationSubWorkflows.stream()
491-
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
533+
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList());
492534
List<WorkflowInstance> allSubWorkflows = workflowInstanceDao.queryByIds(allSubWorkflowInstanceId);
493535

494536
if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
@@ -539,7 +581,7 @@ public Map<String, Object> querySubWorkflowInstanceByTaskId(User loginUser, long
539581
// check user access for project
540582
Map<String, Object> result =
541583
projectService.checkProjectAndAuth(loginUser, project, projectCode,
542-
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
584+
WORKFLOW_INSTANCE);
543585
if (result.get(Constants.STATUS) != Status.SUCCESS) {
544586
return result;
545587
}
@@ -693,7 +735,7 @@ public Map<String, Object> updateWorkflowInstance(User loginUser, long projectCo
693735
"Update task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
694736
projectCode, workflowDefinition.getCode(), insertVersion);
695737
putMsg(result, Status.SUCCESS);
696-
result.put(Constants.DATA_LIST, workflowDefinition);
738+
result.put(DATA_LIST, workflowDefinition);
697739
} else {
698740
log.info(
699741
"Update task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
@@ -750,7 +792,7 @@ public Map<String, Object> queryParentInstanceBySubId(User loginUser, long proje
750792
// check user access for project
751793
Map<String, Object> result =
752794
projectService.checkProjectAndAuth(loginUser, project, projectCode,
753-
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
795+
WORKFLOW_INSTANCE);
754796
if (result.get(Constants.STATUS) != Status.SUCCESS) {
755797
return result;
756798
}
@@ -824,7 +866,7 @@ public Map<String, Object> viewVariables(long projectCode, Integer workflowInsta
824866
if (workflowInstance == null) {
825867
log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode,
826868
workflowInstanceId);
827-
putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
869+
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
828870
return result;
829871
}
830872

@@ -918,7 +960,7 @@ public Map<String, Object> viewGantt(long projectCode, Integer workflowInstanceI
918960
if (workflowInstance == null) {
919961
log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode,
920962
workflowInstanceId);
921-
putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
963+
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
922964
return result;
923965
}
924966

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.dolphinscheduler.api.utils.Result;
3535
import org.apache.dolphinscheduler.common.constants.Constants;
3636
import org.apache.dolphinscheduler.common.enums.CommandType;
37+
import org.apache.dolphinscheduler.common.enums.ContextType;
3738
import org.apache.dolphinscheduler.common.enums.Flag;
3839
import org.apache.dolphinscheduler.common.enums.UserType;
3940
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@@ -42,10 +43,12 @@
4243
import org.apache.dolphinscheduler.common.utils.DateUtils;
4344
import org.apache.dolphinscheduler.common.utils.JSONUtils;
4445
import org.apache.dolphinscheduler.dao.AlertDao;
46+
import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext;
4547
import org.apache.dolphinscheduler.dao.entity.Project;
4648
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
4749
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
4850
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
51+
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
4952
import org.apache.dolphinscheduler.dao.entity.Tenant;
5053
import org.apache.dolphinscheduler.dao.entity.User;
5154
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@@ -60,10 +63,12 @@
6063
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
6164
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
6265
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
66+
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
6367
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
6468
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
6569
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
6670
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
71+
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
6772
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
6873
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
6974
import org.apache.dolphinscheduler.service.model.TaskNode;
@@ -90,6 +95,7 @@
9095
import org.mockito.quality.Strictness;
9196

9297
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
98+
import com.google.common.collect.Lists;
9399

94100
@ExtendWith(MockitoExtension.class)
95101
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -154,6 +160,9 @@ public class WorkflowInstanceServiceTest {
154160
@Mock
155161
private WorkflowInstanceMapDao workflowInstanceMapDao;
156162

163+
@Mock
164+
private TaskInstanceContextDao taskInstanceContextDao;
165+
157166
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
158167
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
159168
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
@@ -465,6 +474,18 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
465474
taskInstance.setTaskType("SHELL");
466475
List<TaskInstance> taskInstanceList = new ArrayList<>();
467476
taskInstanceList.add(taskInstance);
477+
List<DependentResultTaskInstanceContext> dependentResultTaskInstanceContextList = new ArrayList<>();
478+
TaskInstanceContext taskInstanceContext = new TaskInstanceContext();
479+
taskInstanceContext.setTaskInstanceId(0);
480+
taskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT_CONTEXT);
481+
DependentResultTaskInstanceContext dependentResultTaskInstanceContext =
482+
new DependentResultTaskInstanceContext();
483+
dependentResultTaskInstanceContext.setProjectCode(projectCode);
484+
dependentResultTaskInstanceContext.setDependentResult(DependResult.SUCCESS);
485+
taskInstanceContext.setTaskInstanceContext(
486+
Lists.asList(dependentResultTaskInstanceContext, new DependentResultTaskInstanceContext[0]));
487+
List<Integer> taskInstanceIdList = new ArrayList<>();
488+
taskInstanceIdList.add(0);
468489
Result res = new Result();
469490
res.setCode(Status.SUCCESS.ordinal());
470491
res.setData("xxx");
@@ -476,6 +497,9 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
476497
workflowInstance.getTestFlag()))
477498
.thenReturn(taskInstanceList);
478499
when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res);
500+
when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
501+
ContextType.DEPENDENT_RESULT_CONTEXT))
502+
.thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0]));
479503
Map<String, Object> successRes =
480504
workflowInstanceService.queryTaskListByWorkflowInstanceId(loginUser, projectCode, 1);
481505
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,13 @@ public final class Constants {
284284
public static final String SUBWORKFLOW_INSTANCE_ID = "subWorkflowInstanceId";
285285
public static final String WORKFLOW_INSTANCE_STATE = "workflowInstanceState";
286286
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
287-
public static final String DEPENDENCE = "dependence";
288287
public static final String TASK_LIST = "taskList";
289288
public static final String QUEUE = "queue";
290289
public static final String QUEUE_NAME = "queueName";
291-
public static final String DEPENDENT_SPLIT = ":||";
290+
291+
/**
292+
* dependent task
293+
*/
292294
public static final long DEPENDENT_ALL_TASK_CODE = -1;
293295
public static final long DEPENDENT_WORKFLOW_CODE = 0;
294296

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.common.enums;
19+
20+
import lombok.Getter;
21+
22+
@Getter
23+
public enum ContextType {
24+
25+
DEPENDENT_RESULT_CONTEXT;
26+
27+
public static ContextType of(String name) {
28+
for (ContextType contextType : values()) {
29+
if (contextType.name().equalsIgnoreCase(name)) {
30+
return contextType;
31+
}
32+
}
33+
return null;
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.entity;
19+
20+
import org.apache.dolphinscheduler.common.enums.ContextType;
21+
import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext;
22+
23+
import lombok.Data;
24+
import lombok.NoArgsConstructor;
25+
26+
@Data
27+
@NoArgsConstructor
28+
public abstract class AbstractTaskInstanceContext implements ITaskInstanceContext {
29+
30+
protected ContextType contextType;
31+
}

0 commit comments

Comments
 (0)