From 5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5 Mon Sep 17 00:00:00 2001
From: Daniel Imberman
Date: Thu, 10 May 2018 19:32:17 +0200
Subject: [PATCH] [AIRFLOW-2424] Add dagrun status endpoint and increased k8s
 test coverage

[AIRFLOW-2424] Add dagrun status endpoint and increase k8s test coverage

[AIRFLOW-2424] Added minikube fixes by @kimoonkim

[AIRFLOW-2424] modify endpoint to remove 'status'

Closes #3320 from dimberman/add-kubernetes-test
---
 .travis.yml                                   |   3 +
 .../common/experimental/get_dag_run_state.py  |  44 +++++
 .../kubernetes/worker_configuration.py        |   1 -
 airflow/www/api/experimental/endpoints.py     |   4 +-
 .../www_rbac/api/experimental/endpoints.py    |  37 ++++
 scripts/ci/kubernetes/docker/airflow-init.sh  |   2 +-
 scripts/ci/kubernetes/kube/deploy.sh          |   4 +-
 scripts/ci/kubernetes/minikube/_k8s.sh        |  69 ++++++++
 .../ci/kubernetes/minikube/start_minikube.sh  |  25 ++-
 .../minikube/test_kubernetes_executor.py      | 167 ++++++++++++++----
 .../api/experimental/test_endpoints.py        |  42 +++++
 11 files changed, 359 insertions(+), 39 deletions(-)
 create mode 100644 airflow/api/common/experimental/get_dag_run_state.py
 create mode 100644 scripts/ci/kubernetes/minikube/_k8s.sh

diff --git a/.travis.yml b/.travis.yml
index 87c70661f7e66..e31625685804f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -101,6 +101,9 @@ before_script:
   - sudo service mysql restart
   - psql -c 'create database airflow;' -U postgres
   - export PATH=${PATH}:/tmp/hive/bin
+  # Required for K8s v1.10.x. See
+  # https://github.com/kubernetes/kubernetes/issues/61058#issuecomment-372764783
+  - sudo mount --make-shared / && sudo service docker restart
 script:
   - ./scripts/ci/travis_script.sh
 after_success:
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
new file mode 100644
index 0000000000000..a7bd1315980cb
--- /dev/null
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.models import DagBag
+
+
+def get_dag_run_state(dag_id, execution_date):
+    """Return the dag run state for the given dag_id and execution_date."""
+
+    dagbag = DagBag()
+
+    # Check DAG exists.
+    if dag_id not in dagbag.dags:
+        error_message = "Dag id {} not found".format(dag_id)
+        raise AirflowException(error_message)
+
+    # Get the DAG object
+    dag = dagbag.get_dag(dag_id)
+
+    # Get DagRun object and check that it exists
+    dagrun = dag.get_dagrun(execution_date=execution_date)
+    if not dagrun:
+        error_message = ('Dag Run for date {} not found in dag {}'
+                         .format(execution_date, dag_id))
+        raise AirflowException(error_message)
+
+    return {'state': dagrun.get_state()}
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index ac4dacf1b691e..89f0902285799 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -177,7 +177,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
         annotations = {
             'iam.cloud.google.com/service-account': gcp_sa_key
         } if gcp_sa_key else {}
-        airflow_command = airflow_command.replace("-sd", "-i -sd")
         airflow_path = airflow_command.split('-sd')[-1]
         airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
         airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 9cd0af88d0a8d..177800010a058 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index bddf0c18b4e75..693feecd7025c 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -22,6 +22,7 @@
 from airflow.api.common.experimental import trigger_dag as trigger
 from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
+from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
 from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils import timezone
@@ -176,6 +177,42 @@ def task_instance_info(dag_id, execution_date, task_id):
     return jsonify(fields)
 
 
+@api_experimental.route(
+    '/dags/<string:dag_id>/dag_runs/<string:execution_date>',
+    methods=['GET'])
+@requires_authentication
+def dag_run_status(dag_id, execution_date):
+    """
+    Returns a JSON with a dag_run's public instance variables.
+    The format for the exec_date is expected to be
+    "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
+    of course need to have been encoded for URL in the request.
+    """
+
+    # Convert string datetime into actual datetime
+    try:
+        execution_date = timezone.parse(execution_date)
+    except ValueError:
+        error_message = (
+            'Given execution date, {}, could not be identified '
+            'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+                execution_date))
+        _log.info(error_message)
+        response = jsonify({'error': error_message})
+        response.status_code = 400
+
+        return response
+
+    try:
+        info = get_dag_run_state(dag_id, execution_date)
+    except AirflowException as err:
+        _log.info(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = 404
+        return response
+
+    return jsonify(info)
+
+
 @api_experimental.route('/latest_runs', methods=['GET'])
 @requires_authentication
 def latest_dag_runs():
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
index dc33625653979..cbd1c98241e46 100755
--- a/scripts/ci/kubernetes/docker/airflow-init.sh
+++ b/scripts/ci/kubernetes/docker/airflow-init.sh
@@ -21,4 +21,4 @@ cd /usr/local/lib/python2.7/dist-packages/airflow && \
 cp -R example_dags/* /root/airflow/dags/ && \
 airflow initdb && \
 alembic upgrade heads && \
-airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow
+(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true)
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index e585d87e73512..a9a42a7a12d12 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -25,10 +25,10 @@ kubectl delete -f $DIRNAME/postgres.yaml
 kubectl delete -f $DIRNAME/airflow.yaml
 kubectl delete -f $DIRNAME/secrets.yaml
 
-kubectl apply -f $DIRNAME/postgres.yaml
-kubectl apply -f $DIRNAME/volumes.yaml
 kubectl apply -f $DIRNAME/secrets.yaml
 kubectl apply -f $DIRNAME/configmaps.yaml
+kubectl apply -f $DIRNAME/postgres.yaml
+kubectl apply -f $DIRNAME/volumes.yaml
 kubectl apply -f $DIRNAME/airflow.yaml
 
 # wait for up to 10 minutes for everything to be deployed
diff --git a/scripts/ci/kubernetes/minikube/_k8s.sh b/scripts/ci/kubernetes/minikube/_k8s.sh
new file mode 100644
index 0000000000000..6ce9d3170ceb6
--- /dev/null
+++ b/scripts/ci/kubernetes/minikube/_k8s.sh
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script was based on one made by @kimoonkim for kubernetes-hdfs
+
+# Helper bash functions.
+
+# Wait for Kubernetes resources to be up and ready.
+function _wait_for_ready () {
+  local count="$1"
+  shift
+  local evidence="$1"
+  shift
+  local attempts=40
+  echo "Waiting till ready (count: $count): $@"
+  while [[ "$count" != $("$@" 2>&1 | tail -n +2 | grep -c $evidence) ]];
+  do
+    if [[ "$attempts" = "1" ]]; then
+      echo "Last run: $@"
+      "$@" || true
+      local command="$@"
+      command="${command/get/describe}"
+      $command || true
+    fi
+    ((attempts--)) || return 1
+    sleep 5
+  done
+  "$@" || true
+}
+
+# Wait for the expected number of nodes to be ready
+function k8s_all_nodes_ready () {
+  local count="$1"
+  shift
+  _wait_for_ready "$count" "-v NotReady" kubectl get nodes
+  _wait_for_ready "$count" Ready kubectl get nodes
+}
+
+function k8s_single_node_ready () {
+  k8s_all_nodes_ready 1
+}
+
+# Wait for the expected number of pods to be ready. This works only for
+# pods with up to 4 containers, as we check "1/1" to "4/4" in
+# `kubectl get pods` output.
+function k8s_all_pods_ready () {
+  local count="$1"
+  shift
+  local evidence="-e 1/1 -e 2/2 -e 3/3 -e 4/4"
+  _wait_for_ready "$count" "$evidence" kubectl get pods "$@"
+}
+
+function k8s_single_pod_ready () {
+  k8s_all_pods_ready 1 "$@"
+}
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 7e50c23bcf699..5171a26313d53 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -19,6 +19,7 @@
 
 #!/usr/bin/env bash
 
+
 _MY_SCRIPT="${BASH_SOURCE[0]}"
 _MY_DIR=$(cd "$(dirname "$_MY_SCRIPT")" && pwd)
 # Avoids 1.7.x because of https://github.com/kubernetes/minikube/issues/2240
@@ -26,7 +27,7 @@ _KUBERNETES_VERSION="${KUBERNETES_VERSION}"
 
 echo "setting up kubernetes ${_KUBERNETES_VERSION}"
 
-_MINIKUBE_VERSION="v0.25.2"
+_MINIKUBE_VERSION="v0.26.0"
 _HELM_VERSION=v2.8.1
 _VM_DRIVER=none
 USE_MINIKUBE_DRIVER_NONE=true
@@ -45,6 +46,8 @@ export CHANGE_MINIKUBE_NONE_USER=true
 
 cd $_MY_DIR
 
+source _k8s.sh
+
 rm -rf tmp
 mkdir -p bin tmp
 
@@ -104,3 +107,23 @@ _MINIKUBE="sudo PATH=$PATH minikube"
 $_MINIKUBE config set bootstrapper localkube
 $_MINIKUBE start --kubernetes-version=${_KUBERNETES_VERSION} --vm-driver=none
 $_MINIKUBE update-context
+
+# Wait for Kubernetes to be up and ready.
+k8s_single_node_ready
+
+echo Minikube addons:
+$_MINIKUBE addons list
+kubectl get storageclass
+echo Showing kube-system pods
+kubectl get -n kube-system pods
+
+(k8s_single_pod_ready -n kube-system -l component=kube-addon-manager) ||
+  (_ADDON=$(kubectl get pod -n kube-system -l component=kube-addon-manager \
+    --no-headers -o name| cut -d/ -f2);
+  echo Addon-manager describe:;
+  kubectl describe pod -n kube-system $_ADDON;
+  echo Addon-manager log:;
+  kubectl logs -n kube-system $_ADDON;
+  exit 1)
+k8s_single_pod_ready -n kube-system -l k8s-app=kube-dns
+k8s_single_pod_ready -n kube-system storage-provisioner
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
index 9827bc8e906a3..5c4617f3a4156 100644
--- a/tests/contrib/minikube/test_kubernetes_executor.py
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -18,10 +18,11 @@
 
 import unittest
 from subprocess import check_call, check_output
-
+import requests.exceptions
 import requests
 import time
 import six
+import re
 
 try:
     check_call(["kubectl", "get", "pods"])
@@ -32,65 +33,167 @@
     )
 
 
+def get_minikube_host():
+    host_ip = check_output(['minikube', 'ip'])
+    if six.PY3:
+        host_ip = host_ip.decode('UTF-8')
+    host = '{}:30809'.format(host_ip.strip())
+    return host
+
+
 class KubernetesExecutorTest(unittest.TestCase):
+    def _delete_airflow_pod(self):
+        air_pod = check_output(['kubectl', 'get', 'pods']).decode()
+        air_pod = air_pod.split('\n')
+        names = [re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' in x]
+        if names:
+            check_call(['kubectl', 'delete', 'pod', names[0]])
+
+    def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_state,
+                     timeout):
+        tries = 0
+        state = ''
+        max_tries = max(int(timeout / 5), 1)
+        # Poll the task state every 5 seconds until the timeout elapses
+        while tries < max_tries:
+            time.sleep(5)
 
-    def test_integration_run_dag(self):
-        host_ip = check_output(['minikube', 'ip'])
-        if six.PY3:
-            host_ip = host_ip.decode('UTF-8')
-        host = '{}:30809'.format(host_ip.strip())
+            # Check the task state via the experimental API
+            try:
+                result = requests.get(
+                    'http://{host}/api/experimental/dags/{dag_id}/'
+                    'dag_runs/{execution_date}/tasks/{task_id}'
+                    .format(host=host,
+                            dag_id=dag_id,
+                            execution_date=execution_date,
+                            task_id=task_id)
+                )
+                self.assertEqual(result.status_code, 200, "Could not get the status")
+                result_json = result.json()
+                state = result_json['state']
+                print("Attempt {}: Current state of operator is {}".format(tries, state))
+
+                if state == expected_final_state:
+                    break
+                tries += 1
+            except requests.exceptions.ConnectionError as e:
+                check_call(["echo", "api call failed. trying again. error {}".format(e)])
+                tries += 1
+
+        self.assertEqual(state, expected_final_state)
+
+        # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+    def ensure_dag_expected_state(self, host, execution_date, dag_id,
+                                  expected_final_state,
+                                  timeout):
+        tries = 0
+        state = ''
+        max_tries = max(int(timeout / 5), 1)
+        # Poll the dag run state every 5 seconds until the timeout elapses
+        while tries < max_tries:
+            time.sleep(5)
+
+            # Check the dag run state via the experimental API
+            result = requests.get(
+                'http://{host}/api/experimental/dags/{dag_id}/'
+                'dag_runs/{execution_date}'
+                .format(host=host,
+                        dag_id=dag_id,
+                        execution_date=execution_date)
+            )
+            print(result)
+            self.assertEqual(result.status_code, 200, "Could not get the status")
+            result_json = result.json()
+            print(result_json)
+            state = result_json['state']
+            check_call(
+                ["echo", "Attempt {}: Current state of dag is {}".format(tries, state)])
+            print("Attempt {}: Current state of dag is {}".format(tries, state))
+
+            if state == expected_final_state:
+                break
+            tries += 1
 
-        # Enable the dag
+        self.assertEqual(state, expected_final_state)
+
+        # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+    def start_dag(self, dag_id, host):
         result = requests.get(
-            'http://{}/api/experimental/'
-            'dags/example_python_operator/paused/false'.format(host)
+            'http://{host}/api/experimental/'
+            'dags/{dag_id}/paused/false'.format(host=host, dag_id=dag_id)
         )
-        self.assertEqual(result.status_code, 200, "Could not enable DAG")
+        self.assertEqual(result.status_code, 200, "Could not enable DAG: {result}"
+                         .format(result=result.json()))
 
         # Trigger a new dagrun
         result = requests.post(
-            'http://{}/api/experimental/'
-            'dags/example_python_operator/dag_runs'.format(host),
+            'http://{host}/api/experimental/'
+            'dags/{dag_id}/dag_runs'.format(host=host, dag_id=dag_id),
             json={}
         )
-        self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run")
+        self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run: {result}"
+                         .format(result=result.json()))
 
         time.sleep(1)
 
         result = requests.get(
             'http://{}/api/experimental/latest_runs'.format(host)
         )
-        self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run")
+        self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run:"
+                         " {result}"
+                         .format(result=result.json()))
         result_json = result.json()
+        return result_json
+
+    def test_integration_run_dag(self):
+        host = get_minikube_host()
+
+        result_json = self.start_dag(dag_id='example_python_operator', host=host)
 
         self.assertGreater(len(result_json['items']), 0)
 
         execution_date = result_json['items'][0]['execution_date']
         print("Found the job with execution date {}".format(execution_date))
 
-        tries = 0
-        state = ''
         # Wait 100 seconds for the operator to complete
-        while tries < 20:
-            time.sleep(5)
+        self.monitor_task(host=host,
+                          execution_date=execution_date,
+                          dag_id='example_python_operator',
+                          task_id='print_the_context',
+                          expected_final_state='success', timeout=100)
 
-            # Trigger a new dagrun
-            result = requests.get(
-                'http://{}/api/experimental/dags/example_python_operator/'
-                'dag_runs/{}/tasks/print_the_context'.format(host, execution_date)
-            )
-            self.assertEqual(result.status_code, 200, "Could not get the status")
-            result_json = result.json()
-            state = result_json['state']
-            print("Attempt {}: Current state of operator is {}".format(tries, state))
+        self.ensure_dag_expected_state(host=host,
+                                       execution_date=execution_date,
+                                       dag_id='example_python_operator',
+                                       expected_final_state='success', timeout=100)
 
-            if state == 'success':
-                break
-            tries += 1
+    def test_integration_run_dag_with_scheduler_failure(self):
+        host = get_minikube_host()
 
-        self.assertEqual(state, 'success')
+        result_json = self.start_dag(dag_id='example_python_operator', host=host)
 
-        # Maybe check if we can retrieve the logs, but then we need to extend the API
+        self.assertGreater(len(result_json['items']), 0)
+
+        execution_date = result_json['items'][0]['execution_date']
+        print("Found the job with execution date {}".format(execution_date))
+
+        self._delete_airflow_pod()
+
+        time.sleep(10)  # give time for pod to restart
+
+        # Wait 100 seconds for the operator to complete
+        self.monitor_task(host=host,
+                          execution_date=execution_date,
+                          dag_id='example_python_operator',
+                          task_id='print_the_context',
+                          expected_final_state='success', timeout=100)
+
+        self.ensure_dag_expected_state(host=host,
+                                       execution_date=execution_date,
+                                       dag_id='example_python_operator',
+                                       expected_final_state='success', timeout=100)
 
 
 if __name__ == '__main__':
diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py
index a19492ee7ec7a..a84d9cfdb44ae 100644
--- a/tests/www_rbac/api/experimental/test_endpoints.py
+++ b/tests/www_rbac/api/experimental/test_endpoints.py
@@ -204,6 +204,48 @@ def test_task_instance_info(self):
         self.assertEqual(400, response.status_code)
         self.assertIn('error', response.data.decode('utf-8'))
 
+    def test_dagrun_status(self):
+        url_template = '/api/experimental/dags/{}/dag_runs/{}'
+        dag_id = 'example_bash_operator'
+        execution_date = utcnow().replace(microsecond=0)
+        datetime_string = quote_plus(execution_date.isoformat())
+        wrong_datetime_string = quote_plus(
+            datetime(1990, 1, 1, 1, 1, 1).isoformat()
+        )
+
+        # Create DagRun
+        trigger_dag(dag_id=dag_id,
+                    run_id='test_task_instance_info_run',
+                    execution_date=execution_date)
+
+        # Test Correct execution
+        response = self.app.get(
+            url_template.format(dag_id, datetime_string)
+        )
+        self.assertEqual(200, response.status_code)
+        self.assertIn('state', response.data.decode('utf-8'))
+        self.assertNotIn('error', response.data.decode('utf-8'))
+
+        # Test error for nonexistent dag
+        response = self.app.get(
+            url_template.format('does_not_exist_dag', datetime_string),
+        )
+        self.assertEqual(404, response.status_code)
+        self.assertIn('error', response.data.decode('utf-8'))
+
+        # Test error for nonexistent dag run (wrong execution_date)
+        response = self.app.get(
+            url_template.format(dag_id, wrong_datetime_string)
+        )
+        self.assertEqual(404, response.status_code)
+        self.assertIn('error', response.data.decode('utf-8'))
+
+        # Test error for bad datetime format
+        response = self.app.get(
+            url_template.format(dag_id, 'not_a_datetime')
+        )
+        self.assertEqual(400, response.status_code)
+        self.assertIn('error', response.data.decode('utf-8'))
+
 
 class TestPoolApiExperimental(unittest.TestCase):
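
For anyone who wants to exercise the new dag_run state endpoint by hand, below is a minimal sketch, not part of the patch. It assumes a local webserver on localhost:8080 with the experimental API reachable without authentication and the example_bash_operator example DAG unpaused; the trigger, latest_runs lookup, and quote_plus URL-encoding all mirror the tests above.

    # Illustrative sketch only -- not part of the patch. Assumptions: a
    # webserver on localhost:8080, experimental API enabled, no auth,
    # and the example_bash_operator DAG loaded and unpaused.
    import requests
    try:
        from urllib.parse import quote_plus  # Python 3
    except ImportError:
        from urllib import quote_plus        # Python 2

    host = 'localhost:8080'
    dag_id = 'example_bash_operator'

    # Trigger a dag run, as the k8s integration test does.
    result = requests.post(
        'http://{}/api/experimental/dags/{}/dag_runs'.format(host, dag_id),
        json={})
    result.raise_for_status()

    # Look up its execution_date from the latest_runs endpoint.
    latest = requests.get('http://{}/api/experimental/latest_runs'.format(host))
    execution_date = latest.json()['items'][0]['execution_date']

    # Ask the new endpoint for the run's state. The date must be URL-encoded,
    # e.g. 2015-11-16T14:34:15+00:00 -> 2015-11-16T14%3A34%3A15%2B00%3A00.
    state = requests.get(
        'http://{}/api/experimental/dags/{}/dag_runs/{}'.format(
            host, dag_id, quote_plus(execution_date)))
    print(state.status_code, state.json())  # e.g. 200 {'state': 'running'}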