diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java index 28c20b93ceaf..f92000a3a209 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java @@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ContextType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.graph.DAG; @@ -53,11 +54,14 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.AbstractTaskInstanceContext; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentDetails; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; @@ -70,6 +74,8 @@ import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; +import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao; @@ -96,6 +102,7 @@ import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -174,6 +181,9 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work @Autowired private CuringParamsService curingGlobalParamsService; + @Autowired + private TaskInstanceContextDao taskInstanceContextDao; + /** * return top n SUCCESS workflow instance order by running time which started between startTime and endTime */ @@ -184,7 +194,7 @@ public Map queryTopNLongestRunningWorkflowInstance(User loginUse // check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, - ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); + WORKFLOW_INSTANCE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -233,7 +243,7 @@ public Map queryWorkflowInstanceById(User loginUser, long projec // check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, - ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); + WORKFLOW_INSTANCE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -245,7 +255,7 @@ public Map queryWorkflowInstanceById(User loginUser, long projec workflowInstance.getWorkflowDefinitionVersion()); if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) { - log.error("workflow definition does not exist, projectCode:{}.", projectCode); + log.error("workflow definition does not exist, projectCode: {}.", projectCode); putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowInstanceId); } else { workflowInstance.setLocations(workflowDefinition.getLocations()); @@ -443,7 +453,7 @@ public Map queryTaskListByWorkflowInstanceId(User loginUser, lon // check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, - ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); + WORKFLOW_INSTANCE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -460,15 +470,47 @@ public Map queryTaskListByWorkflowInstanceId(User loginUser, lon List taskInstanceList = taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstanceId, workflowInstance.getTestFlag()); + List> taskInstanceDependentDetailsList = + setTaskInstanceDependentResult(taskInstanceList); + Map resultMap = new HashMap<>(); resultMap.put(WORKFLOW_INSTANCE_STATE, workflowInstance.getState().toString()); - resultMap.put(TASK_LIST, taskInstanceList); + resultMap.put(TASK_LIST, taskInstanceDependentDetailsList); result.put(DATA_LIST, resultMap); putMsg(result, Status.SUCCESS); return result; } + private List> setTaskInstanceDependentResult(List taskInstanceList) { + List> taskInstanceDependentDetailsList = + taskInstanceList.stream() + .map(taskInstance -> { + TaskInstanceDependentDetails taskInstanceDependentDetails = + new TaskInstanceDependentDetails<>(); + BeanUtils.copyProperties(taskInstance, taskInstanceDependentDetails); + return taskInstanceDependentDetails; + }).collect(Collectors.toList()); + List taskInstanceIdList = taskInstanceList.stream() + .map(TaskInstance::getId).collect(Collectors.toList()); + List taskInstanceContextList = + taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList, + ContextType.DEPENDENT_RESULT_CONTEXT); + for (TaskInstanceContext taskInstanceContext : taskInstanceContextList) { + for (AbstractTaskInstanceContext dependentResultTaskInstanceContext : taskInstanceContext + .getTaskInstanceContext()) { + for (TaskInstanceDependentDetails taskInstanceDependentDetails : taskInstanceDependentDetailsList) { + if (taskInstanceDependentDetails.getId().equals(taskInstanceContext.getTaskInstanceId())) { + taskInstanceDependentDetails + .setTaskInstanceDependentResult( + dependentResultTaskInstanceContext); + } + } + } + } + return taskInstanceDependentDetailsList; + } + @Override public List queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) { TaskInstance taskInstance = taskInstanceDao.queryById(taskId); @@ -488,7 +530,7 @@ public List queryDynamicSubWorkflowInstances(User loginUs .queryAllSubWorkflowInstance((long) taskInstance.getWorkflowInstanceId(), taskInstance.getTaskCode()); List allSubWorkflowInstanceId = relationSubWorkflows.stream() - .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList()); + .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList()); List allSubWorkflows = workflowInstanceDao.queryByIds(allSubWorkflowInstanceId); if (allSubWorkflows == null || allSubWorkflows.isEmpty()) { @@ -539,7 +581,7 @@ public Map querySubWorkflowInstanceByTaskId(User loginUser, long // check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, - ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); + WORKFLOW_INSTANCE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -693,7 +735,7 @@ public Map updateWorkflowInstance(User loginUser, long projectCo "Update task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.", projectCode, workflowDefinition.getCode(), insertVersion); putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, workflowDefinition); + result.put(DATA_LIST, workflowDefinition); } else { log.info( "Update task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.", @@ -750,7 +792,7 @@ public Map queryParentInstanceBySubId(User loginUser, long proje // check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, - ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); + WORKFLOW_INSTANCE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -824,7 +866,7 @@ public Map viewVariables(long projectCode, Integer workflowInsta if (workflowInstance == null) { log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode, workflowInstanceId); - putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId); + putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId); return result; } @@ -918,7 +960,7 @@ public Map viewGantt(long projectCode, Integer workflowInstanceI if (workflowInstance == null) { log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode, workflowInstanceId); - putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId); + putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId); return result; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java index 5d075e09ef0b..9395e471ee0b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ContextType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; @@ -42,10 +43,12 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; @@ -60,10 +63,12 @@ import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; +import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.model.TaskNode; @@ -90,6 +95,7 @@ import org.mockito.quality.Strictness; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.google.common.collect.Lists; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -154,6 +160,9 @@ public class WorkflowInstanceServiceTest { @Mock private WorkflowInstanceMapDao workflowInstanceMapDao; + @Mock + private TaskInstanceContextDao taskInstanceContextDao; + private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]"; @@ -465,6 +474,18 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException { taskInstance.setTaskType("SHELL"); List taskInstanceList = new ArrayList<>(); taskInstanceList.add(taskInstance); + List dependentResultTaskInstanceContextList = new ArrayList<>(); + TaskInstanceContext taskInstanceContext = new TaskInstanceContext(); + taskInstanceContext.setTaskInstanceId(0); + taskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT_CONTEXT); + DependentResultTaskInstanceContext dependentResultTaskInstanceContext = + new DependentResultTaskInstanceContext(); + dependentResultTaskInstanceContext.setProjectCode(projectCode); + dependentResultTaskInstanceContext.setDependentResult(DependResult.SUCCESS); + taskInstanceContext.setTaskInstanceContext( + Lists.asList(dependentResultTaskInstanceContext, new DependentResultTaskInstanceContext[0])); + List taskInstanceIdList = new ArrayList<>(); + taskInstanceIdList.add(0); Result res = new Result(); res.setCode(Status.SUCCESS.ordinal()); res.setData("xxx"); @@ -476,6 +497,9 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException { workflowInstance.getTestFlag())) .thenReturn(taskInstanceList); when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res); + when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList, + ContextType.DEPENDENT_RESULT_CONTEXT)) + .thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0])); Map successRes = workflowInstanceService.queryTaskListByWorkflowInstanceId(loginUser, projectCode, 1); Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index c5c64d5565ff..7d365bf86f36 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -284,11 +284,13 @@ public final class Constants { public static final String SUBWORKFLOW_INSTANCE_ID = "subWorkflowInstanceId"; public static final String WORKFLOW_INSTANCE_STATE = "workflowInstanceState"; public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance"; - public static final String DEPENDENCE = "dependence"; public static final String TASK_LIST = "taskList"; public static final String QUEUE = "queue"; public static final String QUEUE_NAME = "queueName"; - public static final String DEPENDENT_SPLIT = ":||"; + + /** + * dependent task + */ public static final long DEPENDENT_ALL_TASK_CODE = -1; public static final long DEPENDENT_WORKFLOW_CODE = 0; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ContextType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ContextType.java new file mode 100644 index 000000000000..e5275a801b6c --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ContextType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import lombok.Getter; + +@Getter +public enum ContextType { + + DEPENDENT_RESULT_CONTEXT; + + public static ContextType of(String name) { + for (ContextType contextType : values()) { + if (contextType.name().equalsIgnoreCase(name)) { + return contextType; + } + } + return null; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AbstractTaskInstanceContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AbstractTaskInstanceContext.java new file mode 100644 index 000000000000..af89dcfe20ca --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AbstractTaskInstanceContext.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import org.apache.dolphinscheduler.common.enums.ContextType; +import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public abstract class AbstractTaskInstanceContext implements ITaskInstanceContext { + + protected ContextType contextType; +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentResultTaskInstanceContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentResultTaskInstanceContext.java new file mode 100644 index 000000000000..4589922fd05c --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentResultTaskInstanceContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@EqualsAndHashCode(callSuper = true) +@Data +@NoArgsConstructor +public class DependentResultTaskInstanceContext extends AbstractTaskInstanceContext { + + private Long projectCode; + + private Long workflowDefinitionCode; + + private Long taskDefinitionCode; + + private String dateCycle; + + private DependResult dependentResult; + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 064082e7410e..45d5738ae770 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.dao.entity; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; @@ -259,10 +258,6 @@ public Map getTaskParamMap() { return taskParamMap; } - public String getDependence() { - return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE); - } - public Integer getCpuQuota() { return cpuQuota == null ? -1 : cpuQuota; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index 48e24576f5f7..2eca1abcf35f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -19,11 +19,14 @@ import java.util.Date; +import lombok.Data; + import com.baomidou.mybatisplus.annotation.TableName; /** * task definition log */ +@Data @TableName("t_ds_task_definition_log") public class TaskDefinitionLog extends TaskDefinition { @@ -77,22 +80,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) { this.setTaskExecuteType(taskDefinition.getTaskExecuteType()); } - public int getOperator() { - return operator; - } - - public void setOperator(int operator) { - this.operator = operator; - } - - public Date getOperateTime() { - return operateTime; - } - - public void setOperateTime(Date operateTime) { - this.operateTime = operateTime; - } - @Override public boolean equals(Object o) { return super.equals(o); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceContext.java new file mode 100644 index 000000000000..d303320d4c45 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import org.apache.dolphinscheduler.common.enums.ContextType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.util.Date; +import java.util.List; + +import lombok.Data; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +@Data +@TableName("t_ds_task_instance_context") +public class TaskInstanceContext { + + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + + private Integer taskInstanceId; + + private String context; + + private ContextType contextType; + + private Date createTime; + + private Date updateTime; + + public void setTaskInstanceContext(List taskInstanceContexts) { + this.context = JSONUtils.toJsonString(taskInstanceContexts); + } + + public List getTaskInstanceContext() { + return JSONUtils.toList(context, AbstractTaskInstanceContext.class); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceDependentDetails.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceDependentDetails.java new file mode 100644 index 000000000000..2efa687ce2c1 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceDependentDetails.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class TaskInstanceDependentDetails extends TaskInstance { + + private T taskInstanceDependentResult; + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceContextMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceContextMapper.java new file mode 100644 index 000000000000..70490a43b802 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceContextMapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.common.enums.ContextType; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface TaskInstanceContextMapper extends BaseMapper { + + List queryListByTaskInstanceIdAndContextType(@Param("taskInstanceId") int taskInstanceId, + @Param("contextType") ContextType contextType); + + int deleteByTaskInstanceIdAndContextType(@Param("taskInstanceId") int taskInstanceId, + @Param("contextType") ContextType contextType); + + int updateTaskInstanceContextByTaskInstanceIdAndContextType(@Param("taskInstanceId") int taskInstanceId, + @Param("contextType") ContextType contextType, + @Param("context") String context); + + List batchQueryByTaskInstanceIdsAndContextType(@Param("taskInstanceIds") List taskInstanceIds, + @Param("contextType") ContextType contextType); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/ITaskInstanceContext.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/ITaskInstanceContext.java new file mode 100644 index 000000000000..973b416dc355 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/ITaskInstanceContext.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.model; + +import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "contextType", include = JsonTypeInfo.As.EXISTING_PROPERTY, visible = true) +@JsonSubTypes({ + @JsonSubTypes.Type(value = DependentResultTaskInstanceContext.class, name = "DEPENDENT_RESULT_CONTEXT") +}) +public interface ITaskInstanceContext { + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceContextDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceContextDao.java new file mode 100644 index 000000000000..9fcdec80f3f7 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceContextDao.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.common.enums.ContextType; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext; + +import java.util.List; + +public interface TaskInstanceContextDao extends IDao { + + List queryListByTaskInstanceIdAndContextType(Integer taskInstanceId, + ContextType contextType); + + int deleteByTaskInstanceIdAndContextType(Integer taskInstanceId, + ContextType contextType); + + int upsertTaskInstanceContext(TaskInstanceContext taskDependentResult); + + List batchQueryByTaskInstanceIdsAndContextType(List taskInstanceIds, + ContextType contextType); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceContextDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceContextDaoImpl.java new file mode 100644 index 000000000000..089ab95d5c46 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceContextDaoImpl.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ContextType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.AbstractTaskInstanceContext; +import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceContextMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Repository; + +@Repository +public class TaskInstanceContextDaoImpl extends BaseDao + implements + TaskInstanceContextDao { + + public TaskInstanceContextDaoImpl(TaskInstanceContextMapper taskInstanceContextMapper) { + super(taskInstanceContextMapper); + } + + @Override + public List queryListByTaskInstanceIdAndContextType(Integer taskInstanceId, + ContextType contextType) { + if (taskInstanceId == null) { + return Collections.emptyList(); + } + return mybatisMapper.queryListByTaskInstanceIdAndContextType(taskInstanceId, contextType); + } + + @Override + public int deleteByTaskInstanceIdAndContextType(Integer taskInstanceId, ContextType contextType) { + if (taskInstanceId == null) { + throw new IllegalArgumentException("taskInstanceId cannot be null"); + } + return mybatisMapper.deleteByTaskInstanceIdAndContextType(taskInstanceId, contextType); + } + + @Override + public int upsertTaskInstanceContext(TaskInstanceContext taskInstanceContext) { + if (taskInstanceContext == null) { + return 0; + } + TaskInstanceContext dbTaskInstanceContext = + mybatisMapper.queryListByTaskInstanceIdAndContextType(taskInstanceContext.getTaskInstanceId(), + taskInstanceContext.getContextType()).stream().findFirst().orElse(null); + if (dbTaskInstanceContext == null) { + return mybatisMapper.insert(taskInstanceContext); + } else { + List dbDependentResultTaskInstanceContextList = + dbTaskInstanceContext.getTaskInstanceContext(); + dbDependentResultTaskInstanceContextList.addAll(taskInstanceContext.getTaskInstanceContext()); + List deduplicatedDependentResultTaskInstanceContextList = + dbDependentResultTaskInstanceContextList.stream() + .map(DependentResultTaskInstanceContext.class::cast) + .collect(Collectors.collectingAndThen( + Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing( + o -> o.getProjectCode() + Constants.UNDERLINE + + o.getWorkflowDefinitionCode() + Constants.UNDERLINE + + o.getTaskDefinitionCode() + Constants.UNDERLINE + + o.getDateCycle()))), + ArrayList::new)); + taskInstanceContext.setTaskInstanceContext(deduplicatedDependentResultTaskInstanceContextList); + return mybatisMapper.updateTaskInstanceContextByTaskInstanceIdAndContextType( + taskInstanceContext.getTaskInstanceId(), + taskInstanceContext.getContextType(), JSONUtils.toJsonString(taskInstanceContext.getContext())); + } + } + + @Override + public List batchQueryByTaskInstanceIdsAndContextType(List taskInstanceIds, + ContextType contextType) { + if (CollectionUtils.isEmpty(taskInstanceIds)) { + return Collections.emptyList(); + } + return mybatisMapper.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIds, contextType); + } +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceContextMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceContextMapper.xml new file mode 100644 index 000000000000..04f7e2080c35 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceContextMapper.xml @@ -0,0 +1,69 @@ + + + + + + + + + DELETE FROM t_ds_task_instance_context + WHERE task_instance_id = #{taskInstanceId} + AND context_type = #{contextType} + + + + UPDATE t_ds_task_instance_context + SET + context = #{context} + ,update_time = now() + WHERE 1=1 + AND task_instance_id = #{taskInstanceId} + AND context_type = #{contextType} + + + + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 8c4510659a2e..ce18e03fb1dd 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -925,6 +925,20 @@ CREATE TABLE t_ds_task_instance -- Records of t_ds_task_instance -- ---------------------------- +-- ---------------------------- +-- Table structure for t_ds_task_instance_context +-- ---------------------------- +DROP TABLE IF EXISTS `t_ds_task_instance_context`; +CREATE TABLE `t_ds_task_instance_context` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `task_instance_id` int(11) NOT NULL, + `context` text NOT NULL, + `context_type` varchar(200) NOT NULL, + `create_time` datetime NOT NULL, + `update_time` datetime NOT NULL, + PRIMARY KEY (`id`) +); + -- ---------------------------- -- Table structure for t_ds_tenant -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index d3774fee192b..3091d7f33720 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -925,6 +925,21 @@ CREATE TABLE `t_ds_task_instance` ( -- Records of t_ds_task_instance -- ---------------------------- +-- ---------------------------- +-- Table structure for t_ds_task_instance_context +-- ---------------------------- +DROP TABLE IF EXISTS `t_ds_task_instance_context`; +CREATE TABLE `t_ds_task_instance_context` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `task_instance_id` int(11) NOT NULL, + `context` text NOT NULL, + `context_type` varchar(200) NOT NULL COMMENT 'context type', + `create_time` datetime NOT NULL, + `update_time` datetime NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `task_instance_id` (`task_instance_id`,`context_type`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; + -- ---------------------------- -- Table structure for t_ds_tenant -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index c84db98fa2da..1d65eb93bf1a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -835,6 +835,22 @@ CREATE TABLE t_ds_task_instance ( create index idx_task_instance_code_version on t_ds_task_instance (task_code, task_definition_version); +-- +-- Table structure for t_ds_task_instance_context +-- +DROP TABLE IF EXISTS t_ds_task_instance_context; +CREATE TABLE t_ds_task_instance_context ( + id int NOT NULL, + task_instance_id int NOT NULL, + context text NOT NULL, + context_type varchar(200) NOT NULL, + create_time timestamp NOT NULL, + update_time timestamp NOT NULL, + PRIMARY KEY (id) +); + +create unique index idx_task_instance_id on t_ds_task_instance_context (task_instance_id, context_type); + -- -- Table structure for table t_ds_tenant -- diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql index 79b6fe34d7ff..dd7c9f891904 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql @@ -361,3 +361,15 @@ d// delimiter ; CALL drop_column_t_ds_task_instance_cache_key; DROP PROCEDURE drop_column_t_ds_task_instance_cache_key; + +DROP TABLE IF EXISTS `t_ds_task_instance_context`; +CREATE TABLE `t_ds_task_instance_context` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `task_instance_id` int(11) NOT NULL, + `context` text NOT NULL, + `context_type` varchar(255) NOT NULL COMMENT 'context type', + `create_time` datetime NOT NULL, + `update_time` datetime NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `task_instance_id` (`task_instance_id`,`context_type`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql index f30086111379..22188d2fd728 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -362,3 +362,15 @@ d// select drop_column_t_ds_task_instance_cache_key(); DROP FUNCTION IF EXISTS drop_column_t_ds_task_instance_cache_key(); +DROP TABLE IF EXISTS t_ds_task_instance_context; +CREATE TABLE t_ds_task_instance_context ( + id int NOT NULL, + task_instance_id int NOT NULL, + context text NOT NULL, + context_type varchar(200) NOT NULL, + create_time timestamp NOT NULL, + update_time timestamp NOT NULL, + PRIMARY KEY (id) +); + +create unique index idx_task_instance_id on t_ds_task_instance_context (task_instance_id, context_type); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/condition/ConditionLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/condition/ConditionLogicTask.java index dcfca074e440..7ebb6eb1d90f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/condition/ConditionLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/condition/ConditionLogicTask.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; -import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask; @@ -85,13 +85,14 @@ private DependResult calculateConditionResult() { dependentTaskModel.getRelation(), dependentTaskModel.getDependItemList() .stream() - .map(dependentItem -> getDependResultForItem(dependentItem, taskInstanceMap)) + .map(dependentItem -> getDependResultForItem((ConditionDependentItem) dependentItem, + taskInstanceMap)) .collect(Collectors.toList()))) .collect(Collectors.toList()); return DependentUtils.getDependResultForRelation(dependence.getRelation(), dependResults); } - private DependResult getDependResultForItem(DependentItem item, Map taskInstanceMap) { + private DependResult getDependResultForItem(ConditionDependentItem item, Map taskInstanceMap) { TaskInstance taskInstance = taskInstanceMap.get(item.getDepTaskCode()); if (taskInstance == null) { log.info("The depend item: {} has not completed yet", DependResult.FAILED); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTask.java index 3e4d7314f0c7..d007a8255c97 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTask.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.repository.ProjectDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; @@ -48,7 +49,8 @@ public DependentLogicTask(TaskExecutionContext taskExecutionContext, TaskDefinitionDao taskDefinitionDao, TaskInstanceDao taskInstanceDao, WorkflowInstanceDao workflowInstanceDao, - IWorkflowExecutionRunnable workflowExecutionRunnable) { + IWorkflowExecutionRunnable workflowExecutionRunnable, + TaskInstanceContextDao taskInstanceContextDao) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; this.dependentTaskTracker = new DependentTaskTracker( @@ -58,7 +60,8 @@ public DependentLogicTask(TaskExecutionContext taskExecutionContext, workflowDefinitionDao, taskDefinitionDao, taskInstanceDao, - workflowInstanceDao); + workflowInstanceDao, + taskInstanceContextDao); onTaskRunning(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTaskPluginFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTaskPluginFactory.java index 6c4f35b1fb49..0e8a7dcaae0b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTaskPluginFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentLogicTaskPluginFactory.java @@ -19,6 +19,7 @@ import org.apache.dolphinscheduler.dao.repository.ProjectDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; @@ -41,12 +42,19 @@ public class DependentLogicTaskPluginFactory implements ILogicTaskPluginFactory< @Autowired private ProjectDao projectDao; + @Autowired private WorkflowDefinitionDao workflowDefinitionDao; + @Autowired private TaskDefinitionDao taskDefinitionDao; + @Autowired private TaskInstanceDao taskInstanceDao; + + @Autowired + private TaskInstanceContextDao taskInstanceContextDao; + @Autowired private WorkflowInstanceDao workflowInstanceDao; @@ -68,7 +76,8 @@ public DependentLogicTask createLogicTask(final ITaskExecutor taskExecutor) thro taskDefinitionDao, taskInstanceDao, workflowInstanceDao, - workflowExecutionRunnable); + workflowExecutionRunnable, + taskInstanceContextDao); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentTaskTracker.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentTaskTracker.java index f2cae5828e40..c9572a99ad66 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentTaskTracker.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentTaskTracker.java @@ -17,17 +17,19 @@ package org.apache.dolphinscheduler.server.master.engine.executor.plugin.dependent; -import static org.apache.dolphinscheduler.common.constants.Constants.DEPENDENT_SPLIT; - import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ContextType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.ProjectDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; @@ -54,6 +56,8 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import com.google.common.collect.Lists; + @Slf4j public class DependentTaskTracker { @@ -63,6 +67,7 @@ public class DependentTaskTracker { private final WorkflowDefinitionDao workflowDefinitionDao; private final TaskDefinitionDao taskDefinitionDao; private final TaskInstanceDao taskInstanceDao; + private final TaskInstanceContextDao taskInstanceContextDao; private final WorkflowInstance workflowInstance; private final Date dependentDate; @@ -70,33 +75,41 @@ public class DependentTaskTracker { private final Map dependResultMap; private final Map dependVarPoolPropertyMap; + private Map processDefinitionMap; + private Map taskDefinitionMap; + private Map projectCodeMap; + private TaskInstanceContext taskInstanceContext; + public DependentTaskTracker(TaskExecutionContext taskExecutionContext, DependentParameters dependentParameters, ProjectDao projectDao, WorkflowDefinitionDao workflowDefinitionDao, TaskDefinitionDao taskDefinitionDao, TaskInstanceDao taskInstanceDao, - WorkflowInstanceDao workflowInstanceDao) { + WorkflowInstanceDao workflowInstanceDao, + TaskInstanceContextDao taskInstanceContextDao) { this.taskExecutionContext = taskExecutionContext; this.dependentParameters = dependentParameters; this.projectDao = projectDao; this.workflowDefinitionDao = workflowDefinitionDao; this.taskDefinitionDao = taskDefinitionDao; this.taskInstanceDao = taskInstanceDao; + this.taskInstanceContextDao = taskInstanceContextDao; this.workflowInstance = workflowInstanceDao.queryById(taskExecutionContext.getWorkflowInstanceId()); this.dependentDate = calculateDependentDate(); this.dependentTaskList = initializeDependentTaskList(); - log.info("Initialized dependent task list successfully"); this.dependResultMap = new HashMap<>(); this.dependVarPoolPropertyMap = new HashMap<>(); + this.taskInstanceContext = new TaskInstanceContext(); + initTaskDependentResult(); } public @NonNull TaskExecutionStatus getDependentTaskStatus() { if (isAllDependentTaskFinished()) { log.info("All dependent task finished, will calculate the dependent result"); DependResult dependResult = calculateDependResult(); - log.info("The Dependent result is: {}", dependResult); + log.info("The final Dependent result is: {}", dependResult); if (dependResult == DependResult.SUCCESS) { dependentParameters.setVarPool(JSONUtils.toJsonString(dependVarPoolPropertyMap.values())); log.info("Set dependentParameters varPool: {}", dependentParameters.getVarPool()); @@ -131,12 +144,12 @@ private List initializeDependentTaskList() { } } - final Map projectCodeMap = projectDao.queryByCodes(new ArrayList<>(projectCodes)).stream() + projectCodeMap = projectDao.queryByCodes(new ArrayList<>(projectCodes)).stream() .collect(Collectors.toMap(Project::getCode, Function.identity())); - final Map processDefinitionMap = + processDefinitionMap = workflowDefinitionDao.queryByCodes(processDefinitionCodes).stream() .collect(Collectors.toMap(WorkflowDefinition::getCode, Function.identity())); - final Map taskDefinitionMap = taskDefinitionDao.queryByCodes(taskDefinitionCodes).stream() + taskDefinitionMap = taskDefinitionDao.queryByCodes(taskDefinitionCodes).stream() .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); final TaskInstance taskInstance = taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId()); @@ -158,18 +171,17 @@ private List initializeDependentTaskList() { "The dependent task's workflow is not exist, dependentItem: " + dependentItem); } if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) { - log.info("Add dependent task:"); - log.info("DependentRelation: {}", dependentTaskModel.getRelation()); - log.info("ProjectName: {}", project.getName()); - log.info("WorkflowName: {}", workflowDefinition.getName()); - log.info("TaskName: {}", "ALL"); - log.info("DependentKey: {}", dependentItem.getKey()); + log.info( + "Add dependent all task, ProjectName: {}, WorkflowName: {}, WorkflowCode: {}, DependentCycle: {}, DependentCycleDate: {}, DependentRelation: {}", + project.getName(), workflowDefinition.getName(), workflowDefinition.getCode(), + dependentItem.getCycle(), dependentItem.getDateValue(), + dependentTaskModel.getRelation()); } else if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_WORKFLOW_CODE) { - log.info("Add dependent task:"); - log.info("DependentRelation: {}", dependentTaskModel.getRelation()); - log.info("ProjectName: {}", project.getName()); - log.info("WorkflowName: {}", workflowDefinition.getName()); - log.info("DependentKey: {}", dependentItem.getKey()); + log.info( + "Add dependent workflow task, ProjectName: {}, WorkflowName: {}, WorkflowCode: {}, DependentCycle: {}, DependentCycleDate: {}, DependentRelation: {}", + project.getName(), workflowDefinition.getName(), workflowDefinition.getCode(), + dependentItem.getCycle(), dependentItem.getDateValue(), + dependentTaskModel.getRelation()); } else { TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode()); if (taskDefinition == null) { @@ -179,21 +191,29 @@ private List initializeDependentTaskList() { "The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem); } - log.info("Add dependent task:"); - log.info("DependentRelation: {}", dependentTaskModel.getRelation()); - log.info("ProjectName: {}", project.getName()); - log.info("WorkflowName: {}", workflowDefinition.getName()); - log.info("TaskName: {}", taskDefinition.getName()); - log.info("DependentKey: {}", dependentItem.getKey()); + log.info( + "Add dependent task, ProjectName: {}, WorkflowName: {}, WorkflowCode: {}, TaskName: {}, DependentCycle: {}, DependentCycleDate: {}, DependentRelation: {}", + project.getName(), workflowDefinition.getName(), workflowDefinition.getCode(), + taskDefinition.getName(), dependentItem.getCycle(), dependentItem.getDateValue(), + dependentTaskModel.getRelation()); } } return new DependentExecute(dependentTaskModel.getDependItemList(), dependentTaskModel.getRelation(), workflowInstance, taskInstance); }).collect(Collectors.toList()); - log.info("Initialized dependent task list"); + log.info("Initialized dependent task list successfully"); return dependentExecutes; } + private void initTaskDependentResult() { + taskInstanceContextDao.deleteByTaskInstanceIdAndContextType(taskExecutionContext.getTaskInstanceId(), + ContextType.DEPENDENT_RESULT_CONTEXT); + taskInstanceContext.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT_CONTEXT); + taskInstanceContext.setCreateTime(new Date()); + taskInstanceContext.setUpdateTime(new Date()); + } + private DependResult calculateDependResult() { List dependResultList = new ArrayList<>(); Map dependVarPoolEndTimeMap = new HashMap<>(); @@ -223,12 +243,48 @@ private boolean isAllDependentTaskFinished() { dependentExecute.getDependResultMap().forEach((dependentKey, dependResult) -> { if (!dependResultMap.containsKey(dependentKey)) { dependResultMap.put(dependentKey, dependResult); - // The log is applied in: api-server obtains the result of the item dependent in the dependent task - // node.{@link ProcessInstanceServiceImpl#parseLogForDependentResult} - log.info("Dependent item check finished, {} dependentKey: {}, result: {}, dependentDate: {}", - DEPENDENT_SPLIT, - dependentKey, - dependResult, dependentDate); + DependentItem dependentItem = new DependentItem().fromKey(dependentKey); + WorkflowDefinition workflowDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode()); + Project project = projectCodeMap.get(workflowDefinition.getProjectCode()); + DependentResultTaskInstanceContext dependentResultTaskInstanceContext = + new DependentResultTaskInstanceContext(); + dependentResultTaskInstanceContext.setProjectCode(project.getCode()); + dependentResultTaskInstanceContext.setWorkflowDefinitionCode(workflowDefinition.getCode()); + dependentResultTaskInstanceContext.setDependentResult(dependResult); + dependentResultTaskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT_CONTEXT); + if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) { + dependentResultTaskInstanceContext.setTaskDefinitionCode(Constants.DEPENDENT_ALL_TASK_CODE); + dependentResultTaskInstanceContext.setDateCycle(dependentItem.getDateValue()); + taskInstanceContext + .setTaskInstanceContext(Lists.newArrayList(dependentResultTaskInstanceContext)); + taskInstanceContextDao.upsertTaskInstanceContext(taskInstanceContext); + log.info( + "Dependent type all task check finished, DependentResult: {}, DependentDate: {}, ProjectName: {}, WorkflowName: {}, WorkflowCode: {}, DependentCycle: {}, DependentCycleDate: {}", + dependResult, dependentDate, project.getName(), workflowDefinition.getName(), + workflowDefinition.getCode(), dependentItem.getCycle(), dependentItem.getDateValue()); + } else if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_WORKFLOW_CODE) { + dependentResultTaskInstanceContext.setTaskDefinitionCode(Constants.DEPENDENT_WORKFLOW_CODE); + dependentResultTaskInstanceContext.setDateCycle(dependentItem.getDateValue()); + taskInstanceContext + .setTaskInstanceContext(Lists.newArrayList(dependentResultTaskInstanceContext)); + taskInstanceContextDao.upsertTaskInstanceContext(taskInstanceContext); + log.info( + "Dependent type workflow task check finished, DependentResult: {}, DependentDate: {}, ProjectName: {}, WorkflowName: {}, WorkflowCode: {}, DependentCycle: {}, DependentCycleDate: {}", + dependResult, dependentDate, project.getName(), workflowDefinition.getName(), + workflowDefinition.getCode(), dependentItem.getCycle(), dependentItem.getDateValue()); + } else { + TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode()); + dependentResultTaskInstanceContext.setTaskDefinitionCode(taskDefinition.getCode()); + dependentResultTaskInstanceContext.setDateCycle(dependentItem.getDateValue()); + taskInstanceContext + .setTaskInstanceContext(Lists.newArrayList(dependentResultTaskInstanceContext)); + taskInstanceContextDao.upsertTaskInstanceContext(taskInstanceContext); + log.info( + "Dependent type task check finished, DependentResult: {}, DependentDate: {}, ProjectName: {}, WorkflowName: {}, WorkflowCode: {}, TaskName: {}, TaskCode: {}, DependentCycle: {}, DependentCycleDate: {}", + dependResult, dependentDate, project.getName(), workflowDefinition.getName(), + workflowDefinition.getCode(), taskDefinition.getName(), taskDefinition.getCode(), + dependentItem.getCycle(), dependentItem.getDateValue()); + } } }); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index 2fb46dc04d00..300a989b429c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -55,6 +55,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** @@ -84,6 +85,7 @@ public class DependentExecute { /** * depend result map */ + @Getter private Map dependResultMap = new HashMap<>(); /** @@ -102,8 +104,10 @@ public class DependentExecute { */ private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext.getBean(TaskDefinitionDao.class); + @Getter private Map dependTaskVarPoolPropertyMap = new HashMap<>(); + @Getter private Map dependTaskVarPoolEndTimeMap = new HashMap<>(); private Map dependItemVarPoolPropertyMap = new HashMap<>(); @@ -410,7 +414,7 @@ public DependResult getModelDependResult(Date currentTime, int testFlag) { continue; } DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag); - if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) { + if (dependResult != DependResult.WAITING) { dependResultMap.put(dependentItem.getKey(), dependResult); if (dependentItem.getParameterPassing() && !dependItemVarPoolPropertyMap.isEmpty()) { DependentUtils.addTaskVarPool(dependItemVarPoolPropertyMap, dependItemVarPoolEndTimeMap, @@ -439,18 +443,6 @@ private DependResult getDependResultForItem(DependentItem item, Date currentTime return getDependentResultForItem(item, currentTime, testFlag); } - public Map getDependResultMap() { - return dependResultMap; - } - - public Map getDependTaskVarPoolPropertyMap() { - return dependTaskVarPoolPropertyMap; - } - - public Map getDependTaskVarPoolEndTimeMap() { - return dependTaskVarPoolEndTimeMap; - } - /** * check for self-dependent * @param dependentItem diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index 0ad9c6d0cae9..2d74923c30a6 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -35,6 +35,8 @@ ${log.base}/dolphinscheduler-standalone.%d{yyyy-MM-dd_HH}.%i.log 168 200MB + 50GB + true diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependResult.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependResult.java index 539f2e5712bf..f5fca20a34ab 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependResult.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependResult.java @@ -17,9 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.api.enums; -/** - * depend result - */ public enum DependResult { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentRelation.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentRelation.java index 2a60a5e8f31b..3060f3d093dc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentRelation.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentRelation.java @@ -17,9 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.api.enums; -/** - * dependent relation: and or - */ public enum DependentRelation { AND, OR; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentType.java new file mode 100644 index 000000000000..ba1b5488e43a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/DependentType.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.enums; + +public enum DependentType { + DEPENDENT_ON_WORKFLOW, DEPENDENT_ON_TASK +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ConditionDependentItem.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ConditionDependentItem.java new file mode 100644 index 000000000000..dd4b216e7339 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ConditionDependentItem.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.model; + +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class ConditionDependentItem extends DependentItem { + + private TaskExecutionStatus status; +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ConditionDependentTaskModel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ConditionDependentTaskModel.java new file mode 100644 index 000000000000..71949e3d6b73 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ConditionDependentTaskModel.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.model; + +import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; + +import java.util.List; + +import lombok.Data; + +@Data +public class ConditionDependentTaskModel { + + private List dependItemList; + private DependentRelation relation; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java index 47f42c416c20..785ae2b35484 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java @@ -18,23 +18,20 @@ package org.apache.dolphinscheduler.plugin.task.api.model; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.enums.DependentType; import lombok.Data; -/** - * dependent item - */ @Data public class DependentItem { + private DependentType dependentType; private long projectCode; private long definitionCode; private long depTaskCode; private String cycle; private String dateValue; private DependResult dependResult; - private TaskExecutionStatus status; private Boolean parameterPassing = false; public String getKey() { @@ -45,4 +42,16 @@ public String getKey() { getDateValue()); } + public DependentItem fromKey(String key) { + String[] parts = key.split("-"); + if (parts.length != 4) { + throw new IllegalArgumentException("Invalid key format"); + } + setDefinitionCode(Long.parseLong(parts[0])); + setDepTaskCode(Long.parseLong(parts[1])); + setCycle(parts[2]); + setDateValue(parts[3]); + return this; + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java index 8ad567578c18..b076eb885259 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.api.parameters; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; -import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel; import org.apache.commons.collections4.CollectionUtils; @@ -27,8 +27,10 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +@EqualsAndHashCode(callSuper = true) @Data @Builder @NoArgsConstructor @@ -57,7 +59,7 @@ public boolean checkParameters() { @AllArgsConstructor public static class ConditionDependency { - private List dependTaskList; + private List dependTaskList; private DependentRelation relation; } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts index 8fd407f6fb90..66ac3fbdbfb2 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts @@ -81,14 +81,6 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { ] const CYCLE_LIST = [ - { - value: 'month', - label: t('project.node.month') - }, - { - value: 'week', - label: t('project.node.week') - }, { value: 'day', label: t('project.node.day') @@ -96,6 +88,14 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { { value: 'hour', label: t('project.node.hour') + }, + { + value: 'week', + label: t('project.node.week') + }, + { + value: 'month', + label: t('project.node.month') } ] const DATE_LIST = { @@ -251,16 +251,23 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { const renderState = (item: { definitionCode: number depTaskCode: number - cycle: string + projectCode: number dateValue: string }) => { if (!item || router.currentRoute.value.name !== 'workflow-instance-detail') return null - const key = `${item.definitionCode}-${item.depTaskCode}-${item.cycle}-${item.dateValue}` + const key = `${item.projectCode}-${item.definitionCode}-${item.depTaskCode}-${item.dateValue}` const state: ITaskState = dependentResult[key] - return h(NIcon, { size: 24, color: TasksStateConfig[state]?.color }, () => - h(TasksStateConfig[state]?.icon) - ) + let icon: any + let color: string + if (state) { + icon = TasksStateConfig[state]?.icon + color = TasksStateConfig[state]?.color + } else { + icon = TasksStateConfig.RUNNING_EXECUTION.icon + color = TasksStateConfig.RUNNING_EXECUTION.color + } + return h(NIcon, { size: 24, color: color }, () => h(icon)) } onMounted(() => { @@ -449,6 +456,7 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { } }, options: CYCLE_LIST, + value: CYCLE_LIST.length > 0 ? CYCLE_LIST[0].value : '', path: `dependTaskList.${i}.dependItemList.${j}.cycle`, rule: { required: true, @@ -466,7 +474,9 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { span: 10, name: ' ', options: - selectOptions.value[i]?.dependItemList[j]?.dateOptions || [], + selectOptions.value[i]?.dependItemList[j]?.dateOptions || + DATE_LIST.day, + value: DATE_LIST.day[0].value, path: `dependTaskList.${i}.dependItemList.${j}.dateValue`, rule: { trigger: ['input', 'blur'], @@ -477,19 +487,19 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { } } }), - (j = 0) => ({ - type: 'switch', - field: 'parameterPassing', - span: 20, - name: t('project.node.dependent_task_parameter_passing'), - path: `dependTaskList.${i}.dependItemList.${j}.parameterPassing` - }), (j = 0) => ({ type: 'custom', field: 'state', span: 2, name: ' ', widget: renderState(model.dependTaskList[i]?.dependItemList[j]) + }), + (j = 0) => ({ + type: 'switch', + field: 'parameterPassing', + span: 20, + name: t('project.node.dependent_task_parameter_passing'), + path: `dependTaskList.${i}.dependItemList.${j}.parameterPassing` }) ] }), diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/use-node-status.ts b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/use-node-status.ts index 0b2c68a0c5fe..f1075933e325 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/use-node-status.ts +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/use-node-status.ts @@ -77,17 +77,16 @@ export function useNodeStatus(options: Options) { window.$message.success(t('project.workflow.refresh_status_succeeded')) taskList.value = res.taskList if (taskList.value) { - const allDependentResult = {} + const taskInstanceDependentResult: { [key: string]: any } = {} taskList.value.forEach((taskInstance: any) => { setNodeStatus(taskInstance.taskCode, taskInstance.state, taskInstance) - if (taskInstance.dependentResult) { - Object.assign( - allDependentResult, - JSON.parse(taskInstance.dependentResult) - ) + if (taskInstance.taskInstanceDependentResult) { + const key = `${taskInstance.taskInstanceDependentResult.projectCode}-${taskInstance.taskInstanceDependentResult.workflowDefinitionCode}-${taskInstance.taskInstanceDependentResult.taskDefinitionCode}-${taskInstance.taskInstanceDependentResult.dateCycle}` + taskInstanceDependentResult[key] = + taskInstance.taskInstanceDependentResult.dependentResult } }) - nodeStore.updateDependentResult(allDependentResult) + nodeStore.updateDependentResult(taskInstanceDependentResult) } }) }