diff --git a/docs/docs/en/guide/task/datavines.md b/docs/docs/en/guide/task/datavines.md new file mode 100644 index 000000000000..5dd2669bd258 --- /dev/null +++ b/docs/docs/en/guide/task/datavines.md @@ -0,0 +1,30 @@ +# Datavines + +## Overview + +Use `Datavines Task` to create a datavines-type task and support data quality job in Datavines. When the worker executes `Datavines Task`, +it will call `Datavines API` to trigger datavines job. Click [here](https://datavane.github.io/datavines-website/) for details about `Datavines`. + +## Create Task + +- Click Project Management-Project Name-Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page. +- Drag from the toolbar to the canvas. + +## Task Parameter + +- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. + +| **Parameter** | **Description** | +|-------------------|-------------------------------------------------------------------------------------------------------| +| Datavines Address | The URL for the Datavines service, e.g., http://localhost:5600. | +| Datavines Job ID | The unique job id for a datavines job. | +| Datavines token | The Datawines service access token can be obtained through token management on the Datavines service. | +| Block on Failure | When turned on, if the data quality check result is failed, the task result will be set as failed. | + +## Task Example + +This example illustrates how to create a datavines task node. + +![demo-datavines](../../../../img/tasks/demo/datavines_task.png) + +![demo-get-datavines-job-id](../../../../img/tasks/demo/datavines_job_id.png) diff --git a/docs/docs/zh/guide/task/datavines.md b/docs/docs/zh/guide/task/datavines.md new file mode 100644 index 000000000000..af4a3ca47805 --- /dev/null +++ b/docs/docs/zh/guide/task/datavines.md @@ -0,0 +1,31 @@ +# Datavines + +## 综述 + +`Datavines`任务类型,用于创建并执行 `Datavines` 类型任务来执行 Datavines 中的数据质量检查作业。Worker 执行该任务的时候,会通过 `Datavines API` 触发 `Datavines 的作业`。 +点击 [这里](https://datavane.github.io/datavines-website/) 获取更多关于 `Datavines` 的信息。 + +## 创建任务 + +- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。 +- 工具栏中拖动 到画板中,即可完成创建。 + +## 任务参数 + +- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。 + +| **任务参数** | **描述** | +|-----------------|------------------------------------------------------| +| Datavines 地址 | Datavines 服务的 url,例如:`http://localhost:5600`。 | +| Datavines 作业 ID | Datavines 作业对应的唯一ID。 | +| Datavines token | Datavines 服务访问 token, 可在 Datavines 服务上的 token 管理中取得。 | +| 检查失败时阻塞 | 开启时,数据质量检查结果为失败时会将任务结果置为失败。 | + +## 例子 + +这个示例展示了如何创建 Datavines 任务节点: + +![demo-datavines](../../../../img/tasks/demo/datavines_task.png) + +![demo-get-datavines-job-id](../../../../img/tasks/demo/datavines_job_id.png) + diff --git a/docs/img/tasks/demo/datavines_job_id.png b/docs/img/tasks/demo/datavines_job_id.png new file mode 100644 index 000000000000..c850a63c631a Binary files /dev/null and b/docs/img/tasks/demo/datavines_job_id.png differ diff --git a/docs/img/tasks/demo/datavines_task.png b/docs/img/tasks/demo/datavines_task.png new file mode 100644 index 000000000000..0184c743883d Binary files /dev/null and b/docs/img/tasks/demo/datavines_task.png differ diff --git a/docs/img/tasks/icons/datavines.png b/docs/img/tasks/icons/datavines.png new file mode 100644 index 000000000000..375d9da823a8 Binary files /dev/null and b/docs/img/tasks/icons/datavines.png differ diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml index 56053b74bdf8..20ba6fae58cb 100644 --- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml +++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml @@ -46,6 +46,8 @@ task: - 'SEATUNNEL' - 'DATAX' - 'SQOOP' + dataQuality: + - 'DATAVINES' machineLearning: - 'JUPYTER' - 'MLFLOW' diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml index 83d3b8ded191..7c7d4902dfb4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml @@ -148,6 +148,12 @@ ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-task-datavines + ${project.version} + + org.apache.dolphinscheduler dolphinscheduler-task-java diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml new file mode 100644 index 000000000000..ed7e42961f83 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml @@ -0,0 +1,72 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-task-plugin + dev-SNAPSHOT + + + dolphinscheduler-task-datavines + jar + + + task.datavines + + + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-common + ${project.version} + provided + + + + org.apache.httpcomponents + httpcore + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + shade + + package + + + + + + + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesParameters.java new file mode 100644 index 000000000000..7d5e06d31e50 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesParameters.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datavines; + +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class DatavinesParameters extends AbstractParameters { + + private String address; + + private String jobId; + + private String token; + + private boolean failureBlock; + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(this.address) && StringUtils.isNotEmpty(this.jobId); + } + + @Override + public List getResourceFilesList() { + return Collections.emptyList(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTask.java new file mode 100644 index 000000000000..e6fb59694597 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTask.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datavines; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.*; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + +import java.net.URI; +import java.util.Collections; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.MissingNode; +import com.fasterxml.jackson.databind.node.NullNode; + +@Slf4j +public class DatavinesTask extends AbstractRemoteTask { + + private final TaskExecutionContext taskExecutionContext; + + private DatavinesParameters datavinesParameters; + private String jobExecutionId; + private boolean executionStatus; + + protected DatavinesTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + this.taskExecutionContext = taskExecutionContext; + } + + @Override + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); + } + + @Override + public void init() { + final String taskParams = taskExecutionContext.getTaskParams(); + this.datavinesParameters = JSONUtils.parseObject(taskParams, DatavinesParameters.class); + log.info("initialize datavines task params : {}", JSONUtils.toPrettyJsonString(datavinesParameters)); + if (this.datavinesParameters == null || !this.datavinesParameters.checkParameters()) { + throw new DatavinesTaskException("datavines task params is not valid"); + } + } + + @Override + public void handle(TaskCallBack taskCallBack) throws TaskException { + super.handle(taskCallBack); + } + + @Override + public void submitApplication() throws TaskException { + executeJob(); + } + + @Override + public void trackApplicationStatus() throws TaskException { + trackApplicationStatusInner(); + } + + private void executeJob() { + try { + String address = this.datavinesParameters.getAddress(); + String jobId = this.datavinesParameters.getJobId(); + String token = this.datavinesParameters.getToken(); + JsonNode result; + String apiResultDataKey = DatavinesTaskConstants.API_RESULT_DATA; + result = executeJob(address, jobId, token); + if (checkResult(result)) { + jobExecutionId = result.get(apiResultDataKey).asText(); + executionStatus = true; + } + } catch (Exception ex) { + Thread.currentThread().interrupt(); + log.error(DatavinesTaskConstants.SUBMIT_FAILED_MSG, ex); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException(DatavinesTaskConstants.SUBMIT_FAILED_MSG, ex); + } + } + + public void trackApplicationStatusInner() throws TaskException { + try { + String address = this.datavinesParameters.getAddress(); + if (executionStatus && jobExecutionId == null) { + // Use address-taskId as app id + setAppIds(String.format(DatavinesTaskConstants.APPIDS_FORMAT, address, this.jobExecutionId)); + setExitStatusCode(mapStatusToExitCode(false)); + log.info("datavines task failed."); + return; + } + String apiResultDataKey = DatavinesTaskConstants.API_RESULT_DATA; + boolean finishFlag = false; + while (!finishFlag) { + JsonNode jobExecutionStatus = + getJobExecutionStatus(address, jobExecutionId, this.datavinesParameters.getToken()); + if (!checkResult(jobExecutionStatus)) { + break; + } + String jobExecutionStatusStr = jobExecutionStatus.get(apiResultDataKey).asText(); + switch (jobExecutionStatusStr) { + case DatavinesTaskConstants.STATUS_SUCCESS: + setAppIds(String.format(DatavinesTaskConstants.APPIDS_FORMAT, address, this.jobExecutionId)); + JsonNode jobExecutionResult = + getJobExecutionResult(address, jobExecutionId, this.datavinesParameters.getToken()); + if (!checkResult(jobExecutionResult)) { + break; + } + + String jobExecutionResultStr = jobExecutionResult.get(apiResultDataKey).asText(); + boolean checkResult = true; + if (this.datavinesParameters.isFailureBlock()) { + checkResult = DatavinesTaskConstants.STATUS_SUCCESS.equals(jobExecutionResultStr); + } + + setExitStatusCode(mapStatusToExitCode(checkResult)); + log.info("datavines task finished, execution status is {} and check result is {}", + jobExecutionStatusStr, jobExecutionResultStr); + finishFlag = true; + break; + case DatavinesTaskConstants.STATUS_FAILURE: + case DatavinesTaskConstants.STATUS_KILL: + errorHandle("task execution status: " + jobExecutionStatusStr); + finishFlag = true; + break; + default: + Thread.sleep(DatavinesTaskConstants.SLEEP_MILLIS); + } + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + log.error(DatavinesTaskConstants.TRACK_FAILED_MSG, ex); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException(DatavinesTaskConstants.TRACK_FAILED_MSG, ex); + } + } + + /** + * map datavines task status to exitStatusCode + * + * @param status datavines job status + * @return exitStatusCode + */ + private int mapStatusToExitCode(boolean status) { + if (status) { + return TaskConstants.EXIT_CODE_SUCCESS; + } else { + return TaskConstants.EXIT_CODE_FAILURE; + } + } + + private boolean checkResult(JsonNode result) { + boolean isCorrect = true; + if (result instanceof MissingNode || result instanceof NullNode) { + errorHandle(DatavinesTaskConstants.API_ERROR_MSG); + isCorrect = false; + } else if (result.get(DatavinesTaskConstants.API_RESULT_CODE) + .asInt() != DatavinesTaskConstants.API_RESULT_CODE_SUCCESS) { + errorHandle(result.get(DatavinesTaskConstants.API_RESULT_MSG)); + isCorrect = false; + } + return isCorrect; + } + + private void errorHandle(Object msg) { + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + log.error("datavines task execute failed with error: {}", msg); + } + + @Override + public AbstractParameters getParameters() { + return datavinesParameters; + } + + @Override + public void cancelApplication() throws TaskException { + String address = this.datavinesParameters.getAddress(); + log.info("trying terminate datavines task, taskId: {}, address: {}, taskId: {}", + this.taskExecutionContext.getTaskInstanceId(), + address, + jobExecutionId); + killJobExecution(address, jobExecutionId, this.datavinesParameters.getToken()); + log.warn("datavines task terminated, taskId: {}, address: {}, jobExecutionId: {}", + this.taskExecutionContext.getTaskInstanceId(), + address, + jobExecutionId); + } + + private JsonNode executeJob(String address, String jobId, String token) { + return parse(doPost(address + DatavinesTaskConstants.EXECUTE_JOB + jobId, token)); + } + + private JsonNode getJobExecutionStatus(String address, String jobExecutionId, String token) { + return parse(doGet(address + DatavinesTaskConstants.GET_JOB_EXECUTION_STATUS + jobExecutionId, token)); + } + + private JsonNode getJobExecutionResult(String address, String jobExecutionId, String token) { + return parse(doGet(address + DatavinesTaskConstants.GET_JOB_EXECUTION_RESULT + jobExecutionId, token)); + } + + private JsonNode killJobExecution(String address, String jobExecutionId, String token) { + return parse(doPost(address + DatavinesTaskConstants.JOB_EXECUTION_KILL + jobExecutionId, token)); + } + + private JsonNode parse(String res) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode result = null; + try { + result = mapper.readTree(res); + } catch (JsonProcessingException e) { + log.error("datavines task submit failed with error", e); + } + return result; + } + + private String doGet(String url, String token) { + String result = ""; + HttpClient httpClient = HttpClientBuilder.create().build(); + HttpGet httpGet = null; + try { + URIBuilder uriBuilder = new URIBuilder(url); + URI uri = uriBuilder.build(); + httpGet = new HttpGet(uri); + httpGet.setHeader("Authorization", "Bearer " + token); + log.info("access url: {}", uri); + HttpResponse response = httpClient.execute(httpGet); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + result = EntityUtils.toString(response.getEntity()); + log.info("datavines task succeed with results: {}", result); + } else { + log.error("datavines task terminated,response: {}", response); + } + } catch (IllegalArgumentException ie) { + log.error("datavines task terminated: {}", ie.getMessage()); + } catch (Exception e) { + log.error("datavines task terminated: ", e); + } finally { + if (null != httpGet) { + httpGet.releaseConnection(); + } + } + return result; + } + + private String doPost(String url, String token) { + String result = ""; + HttpClient httpClient = HttpClientBuilder.create().build(); + HttpPost httpPost = new HttpPost(url); + try { + httpPost.setHeader("Authorization", "Bearer " + token); + HttpResponse response = httpClient.execute(httpPost); + log.info("access url: {}", url); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + result = EntityUtils.toString(response.getEntity()); + log.info("datavines task succeed with results: {}", result); + } else { + log.error("datavines task terminated, response: {}", response); + } + } catch (IllegalArgumentException ie) { + log.error("datavines task terminated: {}", ie.getMessage()); + } catch (Exception he) { + log.error("datavines task terminated: ", he); + } + return result; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannel.java new file mode 100644 index 000000000000..be9f807f6f30 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannel.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datavines; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +public class DatavinesTaskChannel implements TaskChannel { + + @Override + public AbstractTask createTask(TaskExecutionContext taskRequest) { + return new DatavinesTask(taskRequest); + } + + @Override + public AbstractParameters parseParameters(String taskParams) { + return JSONUtils.parseObject(taskParams, DatavinesParameters.class); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactory.java new file mode 100644 index 000000000000..0ce2e6e66bd6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datavines; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; + +import com.google.auto.service.AutoService; + +@AutoService(TaskChannelFactory.class) +public class DatavinesTaskChannelFactory implements TaskChannelFactory { + + @Override + public String getName() { + return "DATAVINES"; + } + + @Override + public TaskChannel create() { + return new DatavinesTaskChannel(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskConstants.java new file mode 100644 index 000000000000..3baafa7964d6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskConstants.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datavines; + +/** + * Custom DinkyTaskConstants + */ +public class DatavinesTaskConstants { + + private DatavinesTaskConstants() { + throw new IllegalStateException("Utility class"); + } + + private static final String API_ROUTE = "/api/v1/openapi"; + public static final String EXECUTE_JOB = API_ROUTE + "/job/execute/"; + public static final String GET_JOB_EXECUTION_STATUS = API_ROUTE + "/job/execution/status/"; + public static final String GET_JOB_EXECUTION_RESULT = API_ROUTE + "/job/execution/result/"; + public static final String JOB_EXECUTION_KILL = API_ROUTE + "/job/execution/kill/"; + public static final String API_RESULT_CODE = "code"; + public static final int API_RESULT_CODE_SUCCESS = 200; + public static final String API_RESULT_MSG = "msg"; + public static final String API_RESULT_DATA = "data"; + public static final String API_ERROR_MSG = "please check url or params"; + + public static final String STATUS_SUCCESS = "SUCCESS"; + public static final String STATUS_KILL = "KILL"; + public static final String STATUS_FAILURE = "FAILURE"; + + public static final String SUBMIT_FAILED_MSG = "Submit datavinesTask failed:"; + public static final String TRACK_FAILED_MSG = "Track datavinesTask failed:"; + public static final String APPIDS_FORMAT = "%s-%s"; + + public static final long SLEEP_MILLIS = 3000; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskException.java new file mode 100644 index 000000000000..da17d49ea456 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datavines; + +/** + * Custom DatavinesTaskException + */ +public class DatavinesTaskException extends RuntimeException { + + public DatavinesTaskException() { + super(); + } + + public DatavinesTaskException(String message) { + super(message); + } + + public DatavinesTaskException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index 543f42f6644a..d59f9b7ff397 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -62,6 +62,7 @@ dolphinscheduler-task-datafactory dolphinscheduler-task-remoteshell dolphinscheduler-task-aliyunserverlessspark + dolphinscheduler-task-datavines diff --git a/dolphinscheduler-ui/public/images/task-icons/datavines.png b/dolphinscheduler-ui/public/images/task-icons/datavines.png new file mode 100644 index 000000000000..7716d54990f2 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/datavines.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/datavines_hover.png b/dolphinscheduler-ui/public/images/task-icons/datavines_hover.png new file mode 100644 index 000000000000..375d9da823a8 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/datavines_hover.png differ diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 910b364c01d5..56639f9d81f2 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -866,6 +866,13 @@ export default { dinky_task_id: 'Dinky task id', dinky_task_id_tips: 'Please enter the task id of your dinky', dinky_online: 'Online task', + datavines_address: 'Datavines address', + datavines_address_tips: 'Please enter the url of your datavines, eg: http://localhost:5600', + datavines_job_id: 'Datavines job id', + datavines_job_id_tips: 'Please enter the job id of your datavines', + datavines_token: 'Datavines token', + datavines_token_tips: 'Please enter the token of your datavines', + datavines_failure_block: 'Block on failure', pytorch_script: 'Python Script', pytorch_script_params: 'Script Input Parameters', pytorch_other_params: 'Show More Configurations', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 58d48b796bd3..7cb9c4937e02 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -838,6 +838,13 @@ export default { dinky_task_id: 'dinky 作业ID', dinky_task_id_tips: '请输入作业 ID', dinky_online: '是否上线作业', + datavines_address: 'Datavines 地址', + datavines_address_tips: '请输入 Datavines 地址, 比如 http://localhost:5600', + datavines_job_id: 'Datavines 作业ID', + datavines_job_id_tips: '请输入作业 ID', + datavines_token: 'Datavines token', + datavines_token_tips: '请输入 token', + datavines_failure_block: '检查失败时阻塞', pytorch_script: 'python脚本', pytorch_script_params: '脚本启动参数', pytorch_other_params: '展开更多配置', diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts b/dolphinscheduler-ui/src/store/project/task-type.ts index b655a0814934..fcfce325935f 100644 --- a/dolphinscheduler-ui/src/store/project/task-type.ts +++ b/dolphinscheduler-ui/src/store/project/task-type.ts @@ -70,6 +70,10 @@ export const TASK_TYPES_MAP = { CONDITIONS: { alias: 'CONDITIONS' }, + DATAVINES: { + alias: 'DATAVINES', + helperLinkDisable: true + }, SWITCH: { alias: 'SWITCH' }, diff --git a/dolphinscheduler-ui/src/store/project/types.ts b/dolphinscheduler-ui/src/store/project/types.ts index bf2d4df3cee8..2e5058205e9a 100644 --- a/dolphinscheduler-ui/src/store/project/types.ts +++ b/dolphinscheduler-ui/src/store/project/types.ts @@ -58,6 +58,7 @@ type TaskType = | 'DATA_FACTORY' | 'REMOTESHELL' | 'ALIYUN_SERVERLESS_SPARK' + | 'DATAVINES' type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index cbbe8a70327d..8b5d9b91b1dd 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -76,6 +76,7 @@ export { useMlflowModels } from './use-mlflow-models' export { useOpenmldb } from './use-openmldb' export { useDvc } from './use-dvc' export { useDinky } from './use-dinky' +export { useDatavines } from './use-datavines' export { useSagemaker } from './use-sagemaker' export { useJava } from './use-java' export { useChunjun } from './use-chunjun' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts new file mode 100644 index 000000000000..fe2c40bf6d43 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useI18n } from 'vue-i18n' +import { useCustomParams } from '.' +import type { IJsonItem } from '../types' + +export function useDatavines(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + + return [ + { + type: 'input', + field: 'address', + name: t('project.node.datavines_address'), + props: { + placeholder: t('project.node.datavines_address_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(_validate: any, value: string) { + if (!value) { + return new Error(t('project.node.datavines_address_tips')) + } + } + } + }, + { + type: 'input', + field: 'jobId', + name: t('project.node.datavines_job_id'), + props: { + placeholder: t('project.node.datavines_job_id_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(_validate: any, value: string) { + if (!value) { + return new Error(t('project.node.datavines_job_id_tips')) + } + } + } + }, + { + type: 'input', + field: 'token', + name: t('project.node.datavines_token'), + props: { + placeholder: t('project.node.datavines_token_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(_validate: any, value: string) { + if (!value) { + return new Error(t('project.node.datavines_token_tips')) + } + } + } + }, + { + type: 'switch', + field: 'failureBlock', + name: t('project.node.datavines_failure_block') + }, + ...useCustomParams({ model, field: 'localParams', isSimple: false }) + ] +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 57c658dd9a6b..dda99ecde588 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -391,6 +391,13 @@ export function formatParams(data: INodeData): { taskParams.online = data.online } + if (data.taskType === 'DATAVINES') { + taskParams.address = data.address + taskParams.jobId = data.jobId + taskParams.token = data.token + taskParams.failureBlock = data.failureBlock + } + if (data.taskType === 'OPENMLDB') { taskParams.zk = data.zk taskParams.zkPath = data.zkPath diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts index e98d709a440f..1b497072ea93 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts @@ -52,6 +52,7 @@ import { useDataFactory } from './use-data-factory' import { useRemoteShell } from './use-remote-shell' import { useDynamic } from './use-dynamic' import { useAliyunServerlessSpark } from './use-aliyun-serverless-spark' +import { useDatavines } from "./use-datavines"; export default { SHELL: useShell, @@ -78,6 +79,7 @@ export default { OPENMLDB: useOpenmldb, DVC: useDvc, DINKY: useDinky, + DATAVINES: useDatavines, SAGEMAKER: userSagemaker, CHUNJUN: useChunjun, FLINK_STREAM: useFlinkStream, diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts new file mode 100644 index 000000000000..4e565844c3bd --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { reactive } from 'vue' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData, ITaskData } from '../types' + +export function useDatavines({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const model = reactive({ + name: '', + taskType: 'DATAVINES', + flag: 'YES', + description: '', + timeoutFlag: false, + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + timeoutNotifyStrategy: ['WARN'] + } as INodeData) + + return { + json: [ + Fields.useName(from), + ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }), + Fields.useRunFlag(), + Fields.useCache(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(projectCode), + Fields.useEnvironmentName(model, !data?.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useFailed(), + Fields.useDelayTime(model), + ...Fields.useTimeoutAlarm(model), + ...Fields.useDatavines(model), + Fields.usePreTasks() + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index ff4dcae33a86..db1e09baa50e 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -464,6 +464,9 @@ interface ITaskParams { yarnQueue?: string awsRegion?: string kubeConfig?: string + jobId?: string + token?: string + failureBlock?:string } interface INodeData diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts index aeea696dd862..5a83428004d1 100644 --- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts @@ -52,6 +52,7 @@ export type TaskType = | 'DATA_FACTORY' | 'REMOTESHELL' | 'ALIYUN_SERVERLESS_SPARK' + | 'DATAVINES' export type TaskExecuteType = 'STREAM' | 'BATCH' @@ -103,6 +104,10 @@ export const TASK_TYPES_MAP = { CONDITIONS: { alias: 'CONDITIONS' }, + DATAVINES: { + alias: 'DATAVINES', + helperLinkDisable: true + }, SWITCH: { alias: 'SWITCH' }, diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss index 217059b0ed41..841b4723a14e 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss @@ -110,6 +110,9 @@ $bgLight: #ffffff; &.icon-dynamic { background-image: url('/images/task-icons/dynamic.png'); } + &.icon-datavines { + background-image: url('/images/task-icons/datavines.png'); + } &.icon-procedure { background-image: url('/images/task-icons/procedure.png'); } @@ -223,6 +226,9 @@ $bgLight: #ffffff; &.icon-dynamic { background-image: url('/images/task-icons/dynamic_hover.png'); } + &.icon-datavines { + background-image: url('/images/task-icons/datavines_hover.png'); + } &.icon-procedure { background-image: url('/images/task-icons/procedure_hover.png'); }