Skip to content

Commit

Permalink
[AIRFLOW-2424] Add dagrun status endpoint and increased k8s test coverage
Browse files Browse the repository at this point in the history

[AIRFLOW-2424] Add dagrun status endpoint and
increase k8s test coverage

[AIRFLOW-2424] Added minikube fixes by @kimoonkim

[AIRFLOW-2424] modify endpoint to remove 'status'

Closes apache#3320 from dimberman/add-kubernetes-test
  • Loading branch information
dimberman authored and Fokko Driesprong committed May 10, 2018
1 parent 74027c9 commit 5de22d7
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 39 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ before_script:
- sudo service mysql restart
- psql -c 'create database airflow;' -U postgres
- export PATH=${PATH}:/tmp/hive/bin
# Required for K8s v1.10.x. See
# https://github.com/kubernetes/kubernetes/issues/61058#issuecomment-372764783
- sudo mount --make-shared / && sudo service docker restart
script:
- ./scripts/ci/travis_script.sh
after_success:
Expand Down
44 changes: 44 additions & 0 deletions airflow/api/common/experimental/get_dag_run_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.models import DagBag


def get_dag_run_state(dag_id, execution_date):
    """Return the state of the DagRun identified by dag_id and execution_date.

    :param dag_id: ID of the DAG containing the run.
    :param execution_date: execution date (datetime) identifying the DagRun.
    :return: a dict of the form ``{'state': <dagrun state>}``.
    :raises AirflowException: if the DAG is unknown or no DagRun exists for
        the given execution date.
    """

    dagbag = DagBag()

    # Check DAG exists.
    if dag_id not in dagbag.dags:
        error_message = "Dag id {} not found".format(dag_id)
        raise AirflowException(error_message)

    # Fetch the DAG object so its runs can be queried.
    dag = dagbag.get_dag(dag_id)

    # Get DagRun object and check that it exists
    dagrun = dag.get_dagrun(execution_date=execution_date)
    if not dagrun:
        error_message = ('Dag Run for date {} not found in dag {}'
                         .format(execution_date, dag_id))
        raise AirflowException(error_message)

    return {'state': dagrun.get_state()}
1 change: 0 additions & 1 deletion airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
annotations = {
'iam.cloud.google.com/service-account': gcp_sa_key
} if gcp_sa_key else {}
airflow_command = airflow_command.replace("-sd", "-i -sd")
airflow_path = airflow_command.split('-sd')[-1]
airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path
Expand Down
4 changes: 2 additions & 2 deletions airflow/www/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
37 changes: 37 additions & 0 deletions airflow/www_rbac/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airflow.api.common.experimental import trigger_dag as trigger
from airflow.api.common.experimental.get_task import get_task
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils import timezone
Expand Down Expand Up @@ -176,6 +177,42 @@ def task_instance_info(dag_id, execution_date, task_id):
return jsonify(fields)


@api_experimental.route(
    '/dags/<string:dag_id>/dag_runs/<string:execution_date>',
    methods=['GET'])
@requires_authentication
def dag_run_status(dag_id, execution_date):
    """
    Return a JSON document with the state of the DagRun identified by the
    given dag_id and execution_date, e.g. ``{"state": "running"}``.

    The format for the execution_date is expected to be
    "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
    of course need to have been encoded for URL in the request.
    """

    # Convert string datetime into actual datetime
    try:
        execution_date = timezone.parse(execution_date)
    except ValueError:
        # Unparseable date supplied by the caller: respond with 400.
        error_message = (
            'Given execution date, {}, could not be identified '
            'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
                execution_date))
        _log.info(error_message)
        response = jsonify({'error': error_message})
        response.status_code = 400

        return response

    try:
        info = get_dag_run_state(dag_id, execution_date)
    except AirflowException as err:
        # Unknown DAG or no run for this date: respond with 404.
        _log.info(err)
        response = jsonify(error="{}".format(err))
        response.status_code = 404
        return response

    return jsonify(info)

@api_experimental.route('/latest_runs', methods=['GET'])
@requires_authentication
def latest_dag_runs():
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/kubernetes/docker/airflow-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ cd /usr/local/lib/python2.7/dist-packages/airflow && \
cp -R example_dags/* /root/airflow/dags/ && \
airflow initdb && \
alembic upgrade heads && \
airflow create_user -u airflow -l airflow -f jon -e [email protected] -r Admin -p airflow
(airflow create_user -u airflow -l airflow -f jon -e [email protected] -r Admin -p airflow || true)
4 changes: 2 additions & 2 deletions scripts/ci/kubernetes/kube/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ kubectl delete -f $DIRNAME/postgres.yaml
kubectl delete -f $DIRNAME/airflow.yaml
kubectl delete -f $DIRNAME/secrets.yaml

kubectl apply -f $DIRNAME/postgres.yaml
kubectl apply -f $DIRNAME/volumes.yaml
kubectl apply -f $DIRNAME/secrets.yaml
kubectl apply -f $DIRNAME/configmaps.yaml
kubectl apply -f $DIRNAME/postgres.yaml
kubectl apply -f $DIRNAME/volumes.yaml
kubectl apply -f $DIRNAME/airflow.yaml

# wait for up to 10 minutes for everything to be deployed
Expand Down
69 changes: 69 additions & 0 deletions scripts/ci/kubernetes/minikube/_k8s.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This script was based on one made by @kimoonkim for kubernetes-hdfs

# Helper bash functions.

# Wait for Kubernetes resources to be up and ready.
#
# Usage: _wait_for_ready <count> <evidence> <command...>
#   count    - number of matching output lines expected from the command
#   evidence - grep argument(s) used to select matching lines; deliberately
#              expanded UNQUOTED below so multi-word values such as
#              "-e 1/1 -e 2/2" split into separate grep arguments
#   command  - the command (and args) to poll, e.g. "kubectl get nodes"
#
# Polls up to 40 times with a 5-second sleep between attempts. On the final
# attempt it dumps the raw output plus a get->describe variant of the command
# for debugging, then returns non-zero if the count is still not reached.
function _wait_for_ready () {
  local count="$1"
  shift
  local evidence="$1"
  shift
  local attempts=40
  echo "Waiting till ready (count: $count): $@"
  # tail -n +2 skips the kubectl header row; $evidence intentionally unquoted.
  while [[ "$count" != $("$@" 2>&1 | tail -n +2 | grep -c $evidence) ]];
  do
    if [[ "$attempts" = "1" ]]; then
      echo "Last run: $@"
      "$@" || true
      # Re-run the same command with "get" replaced by "describe" for detail;
      # $command intentionally unquoted so it word-splits back into a command.
      local command="$@"
      command="${command/get/describe}"
      $command || true
    fi
    ((attempts--)) || return 1
    sleep 5
  done
  "$@" || true
}

# Block until the expected number of cluster nodes is ready: first require
# that no node still reports NotReady, then require the expected Ready count.
function k8s_all_nodes_ready () {
  local expected_nodes="$1"
  shift
  _wait_for_ready "$expected_nodes" "-v NotReady" kubectl get nodes
  _wait_for_ready "$expected_nodes" Ready kubectl get nodes
}

# Convenience wrapper: wait for a one-node cluster to become ready.
function k8s_single_node_ready () {
  k8s_all_nodes_ready 1
}

# Block until the expected number of pods report ready. Works for pods with
# up to 4 containers by matching "1/1" through "4/4" in `kubectl get pods`
# output. Any extra arguments are forwarded to kubectl (e.g. a namespace
# flag or a label selector).
function k8s_all_pods_ready () {
  local expected_pods="$1"
  shift
  local ready_patterns="-e 1/1 -e 2/2 -e 3/3 -e 4/4"
  _wait_for_ready "$expected_pods" "$ready_patterns" kubectl get pods "$@"
}

# Convenience wrapper: wait for exactly one matching pod to become ready.
function k8s_single_pod_ready () {
  k8s_all_pods_ready 1 "$@"
}
25 changes: 24 additions & 1 deletion scripts/ci/kubernetes/minikube/start_minikube.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

#!/usr/bin/env bash


_MY_SCRIPT="${BASH_SOURCE[0]}"
_MY_DIR=$(cd "$(dirname "$_MY_SCRIPT")" && pwd)
# Avoids 1.7.x because of https://github.com/kubernetes/minikube/issues/2240
_KUBERNETES_VERSION="${KUBERNETES_VERSION}"

echo "setting up kubernetes ${_KUBERNETES_VERSION}"

_MINIKUBE_VERSION="v0.25.2"
_MINIKUBE_VERSION="v0.26.0"
_HELM_VERSION=v2.8.1
_VM_DRIVER=none
USE_MINIKUBE_DRIVER_NONE=true
Expand All @@ -45,6 +46,8 @@ export CHANGE_MINIKUBE_NONE_USER=true

cd $_MY_DIR

source _k8s.sh

rm -rf tmp
mkdir -p bin tmp

Expand Down Expand Up @@ -104,3 +107,23 @@ _MINIKUBE="sudo PATH=$PATH minikube"
$_MINIKUBE config set bootstrapper localkube
$_MINIKUBE start --kubernetes-version=${_KUBERNETES_VERSION} --vm-driver=none
$_MINIKUBE update-context

# Wait for Kubernetes to be up and ready.
k8s_single_node_ready

echo Minikube addons:
$_MINIKUBE addons list
kubectl get storageclass
echo Showing kube-system pods
kubectl get -n kube-system pods

(k8s_single_pod_ready -n kube-system -l component=kube-addon-manager) ||
(_ADDON=$(kubectl get pod -n kube-system -l component=kube-addon-manager
--no-headers -o name| cut -d/ -f2);
echo Addon-manager describe:;
kubectl describe pod -n kube-system $_ADDON;
echo Addon-manager log:;
kubectl logs -n kube-system $_ADDON;
exit 1)
k8s_single_pod_ready -n kube-system -l k8s-app=kube-dns
k8s_single_pod_ready -n kube-system storage-provisioner
Loading

0 comments on commit 5de22d7

Please sign in to comment.