[AIRFLOW-1899] Fix Kubernetes tests
[AIRFLOW-1899] Add full deployment

- Made home directory configurable
- Documentation fix
- Add licenses

[AIRFLOW-1899] Tests for the Kubernetes Executor

Add an integration test for the Kubernetes
executor. This is done by spinning up different
versions of Kubernetes and running a DAG by
invoking the REST API.

Closes apache#3301 from Fokko/fix-kubernetes-executor
Fokko Driesprong committed May 4, 2018
1 parent d1f7af3 commit 16bae56
Showing 29 changed files with 795 additions and 473 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -137,4 +137,3 @@ rat-results.txt
*.generated
*.tar.gz
scripts/ci/kubernetes/kube/.generated/airflow.yaml

3 changes: 0 additions & 3 deletions .travis.yml
@@ -80,9 +80,6 @@ matrix:
env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- python: "2.7"
env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
allow_failures:
- env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
cache:
directories:
- $HOME/.wheelhouse/
8 changes: 8 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -502,6 +504,7 @@ hide_sensitive_variable_fields = True

[elasticsearch]
elasticsearch_host =

[kubernetes]
# The repository and tag of the Kubernetes Image for the Worker to Run
worker_container_repository =
@@ -550,6 +553,11 @@ image_pull_secrets =
# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
gcp_service_account_keys =

# Use the service account Kubernetes gives to pods to connect to the Kubernetes cluster.
# It is intended for clients that expect to be running inside a pod on Kubernetes.
# It will raise an exception if called from a process not running in a Kubernetes environment.
in_cluster = True

[kubernetes_secrets]
# The scheduler mounts the following secrets into your workers as they are launched by the
# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the
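
For context, the new `in_cluster` option maps onto the two credential-loading paths of the kubernetes Python client. A minimal sketch of that distinction (illustrative only, not the committed code):

```python
from kubernetes import client, config

def make_core_v1(in_cluster=True):
    """Build a CoreV1Api client the way the two in_cluster modes imply."""
    if in_cluster:
        # Uses the token and CA certificate that Kubernetes mounts into
        # every pod under /var/run/secrets/kubernetes.io/serviceaccount/.
        # Raises kubernetes.config.ConfigException outside a cluster.
        config.load_incluster_config()
    else:
        # Falls back to the local kube config (~/.kube/config), e.g. when
        # running outside the cluster during development.
        config.load_kube_config()
    return client.CoreV1Api()
```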
9 changes: 6 additions & 3 deletions airflow/configuration.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -442,7 +442,10 @@ def parameterized_config(template):
)
with open(AIRFLOW_CONFIG, 'w') as f:
cfg = parameterized_config(DEFAULT_CONFIG)
f.write(cfg.split(TEMPLATE_START)[-1].strip())
cfg = cfg.split(TEMPLATE_START)[-1].strip()
if six.PY2:
cfg = cfg.encode('utf8')
f.write(cfg)

log.info("Reading the config from %s", AIRFLOW_CONFIG)

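
The write fix above is the standard Python 2/3 pattern for files opened in text mode: on Python 2, a unicode string containing non-ASCII characters (such as the coding cookie added to the config template) must be encoded to bytes before writing. A self-contained sketch of the same pattern:

```python
import six

def write_rendered_config(path, cfg):
    # cfg is a unicode string on both Python 2 and 3 after rendering.
    with open(path, 'w') as f:
        if six.PY2:
            cfg = cfg.encode('utf8')  # unicode -> utf-8 bytes on Python 2
        f.write(cfg)
```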
80 changes: 30 additions & 50 deletions airflow/contrib/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@
import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from airflow.configuration import conf
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
@@ -87,20 +88,6 @@ class KubeConfig:
core_section = 'core'
kubernetes_section = 'kubernetes'

@staticmethod
def safe_get(section, option, default):
try:
return configuration.get(section, option)
except AirflowConfigException:
return default

@staticmethod
def safe_getboolean(section, option, default):
try:
return configuration.getboolean(section, option)
except AirflowConfigException:
return default

def __init__(self):
configuration_dict = configuration.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict['core']
@@ -114,40 +101,37 @@ def __init__(self):
self.kubernetes_section, 'worker_container_tag')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
self.delete_worker_pods = self.safe_getboolean(
self.kubernetes_section, 'delete_worker_pods', True)
self.delete_worker_pods = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods')

self.worker_service_account_name = self.safe_get(
self.kubernetes_section, 'worker_service_account_name', 'default')
self.image_pull_secrets = self.safe_get(
self.kubernetes_section, 'image_pull_secrets', '')
self.worker_service_account_name = conf.get(
self.kubernetes_section, 'worker_service_account_name')
self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')

# NOTE: `git_repo` and `git_branch` must be specified together as a pair
# The http URL of the git repository to clone from
self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
# The branch of the repository to be checked out
self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
# Optionally, the directory in the git repository containing the dags
self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')

# Optionally a user may supply a `git_user` and `git_password` for private
# repositories
self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
self.git_user = conf.get(self.kubernetes_section, 'git_user')
self.git_password = conf.get(self.kubernetes_section, 'git_password')

# NOTE: The user may optionally use a volume claim to mount a PV containing
# DAGs directly
self.dags_volume_claim = self.safe_get(self.kubernetes_section,
'dags_volume_claim', None)
self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')

# This prop may optionally be set for PV Claims and is used to write logs
self.logs_volume_claim = self.safe_get(
self.kubernetes_section, 'logs_volume_claim', None)
self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')

# This prop may optionally be set for PV Claims and is used to locate DAGs
# on a SubPath
self.dags_volume_subpath = self.safe_get(
self.kubernetes_section, 'dags_volume_subpath', None)
self.dags_volume_subpath = conf.get(
self.kubernetes_section, 'dags_volume_subpath')

# This prop may optionally be set for PV Claims and is used to write logs
self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
@@ -156,36 +140,32 @@ def __init__(self):
# that if your
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace',
'default')
self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
# interact with cluster components.
self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace',
'default')
self.executor_namespace = conf.get(self.kubernetes_section, 'namespace')
# Task secrets managed by KubernetesExecutor.
self.gcp_service_account_keys = self.safe_get(
self.kubernetes_section, 'gcp_service_account_keys', None)
self.gcp_service_account_keys = conf.get(self.kubernetes_section,
'gcp_service_account_keys')

# If the user is using the git-sync container to clone their repository via git,
# allow them to specify repository, tag, and pod name for the init container.
self.git_sync_container_repository = self.safe_get(
self.kubernetes_section, 'git_sync_container_repository',
'gcr.io/google-containers/git-sync-amd64')
self.git_sync_container_repository = conf.get(
self.kubernetes_section, 'git_sync_container_repository')

self.git_sync_container_tag = self.safe_get(
self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
self.git_sync_container_tag = conf.get(
self.kubernetes_section, 'git_sync_container_tag')
self.git_sync_container = '{}:{}'.format(
self.git_sync_container_repository, self.git_sync_container_tag)

self.git_sync_init_container_name = self.safe_get(
self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
self.git_sync_init_container_name = conf.get(
self.kubernetes_section, 'git_sync_init_container_name')

# The worker pod may optionally have a valid Airflow config loaded via a
# configmap
self.airflow_configmap = self.safe_get(self.kubernetes_section,
'airflow_configmap', None)
self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')

self._validate()

@@ -272,7 +252,7 @@ def process_status(self, pod_id, status, labels, resource_version):
self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
elif status == 'Succeeded':
self.log.info('Event: %s Succeeded', pod_id)
self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version))
self.watcher_queue.put((pod_id, None, labels, resource_version))
elif status == 'Running':
self.log.info('Event: %s is Running', pod_id)
else:
@@ -552,7 +532,8 @@ def start(self):
self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
# always need to reset resource version since we don't know
# when we last started, note for behavior below
# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod
# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
# /CoreV1Api.md#list_namespaced_pod
KubeResourceVersion.reset_resource_version(self._session)
self.task_queue = Queue()
self.result_queue = Queue()
@@ -610,8 +591,7 @@ def _change_state(self, key, state, pod_id):
task_id=task_id,
execution_date=ex_time
).one()

if item.state == State.RUNNING or item.state == State.QUEUED:
if state:
item.state = state
self._session.add(item)
self._session.commit()
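
The recurring change in this file replaces the `safe_get`/`safe_getboolean` wrappers, which silently fell back to hard-coded defaults, with plain `conf.get`/`conf.getboolean` calls; the defaults now live in one place, the `[kubernetes]` section of `default_airflow.cfg` shown earlier. A sketch contrasting the two styles (assuming Airflow's `AirflowConfigException`):

```python
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException

def old_style(section, option, default):
    # Pre-change: every call site carried its own fallback value.
    try:
        return conf.get(section, option)
    except AirflowConfigException:
        return default

def new_style(section, option):
    # Post-change: the default must be declared in default_airflow.cfg,
    # so a missing option fails loudly instead of being papered over.
    return conf.get(section, option)
```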
4 changes: 2 additions & 2 deletions airflow/contrib/kubernetes/kube_client.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.configuration import conf

def _load_kube_config(in_cluster):
from kubernetes import config, client
@@ -26,6 +26,6 @@ def _load_kube_config(in_cluster):
return client.CoreV1Api()


def get_kube_client(in_cluster=True):
def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
# TODO: This should also allow people to point to a cluster.
return _load_kube_config(in_cluster)
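
One subtlety worth noting about the new signature: a Python default argument is evaluated once, at import time, so `conf.getboolean('kubernetes', 'in_cluster')` is read when the module loads rather than on each call. A usage sketch (the namespace is an assumption for illustration):

```python
from airflow.contrib.kubernetes.kube_client import get_kube_client

kube = get_kube_client()                        # honours [kubernetes] in_cluster
local_kube = get_kube_client(in_cluster=False)  # e.g. local development
pods = kube.list_namespaced_pod(namespace='default')
```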
19 changes: 14 additions & 5 deletions airflow/contrib/kubernetes/worker_configuration.py
@@ -21,13 +21,18 @@

from airflow.contrib.kubernetes.pod import Pod, Resources
from airflow.contrib.kubernetes.secret import Secret
from airflow.utils.log.logging_mixin import LoggingMixin


class WorkerConfiguration:
class WorkerConfiguration(LoggingMixin):
"""Contains Kubernetes Airflow Worker configuration logic"""

def __init__(self, kube_config):
self.kube_config = kube_config
self.worker_airflow_home = self.kube_config.airflow_home
self.worker_airflow_dags = self.kube_config.dags_folder
self.worker_airflow_logs = self.kube_config.base_log_folder
super(WorkerConfiguration, self).__init__()

def _get_init_containers(self, volume_mounts):
"""When using git to retrieve the DAGs, use the GitSync Init Container"""
@@ -79,7 +84,7 @@ def _get_environment(self):
'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
}
if self.kube_config.airflow_configmap:
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
return env

def _get_secrets(self):
@@ -129,19 +134,19 @@ def _construct_volume(name, claim, subpath=None):
volume_mounts = [{
'name': dags_volume_name,
'mountPath': os.path.join(
self.kube_config.dags_folder,
self.worker_airflow_dags,
self.kube_config.git_subpath
),
'readOnly': True
}, {
'name': logs_volume_name,
'mountPath': self.kube_config.base_log_folder
'mountPath': self.worker_airflow_logs
}]

# Mount the airflow.cfg file via a configmap the user has specified
if self.kube_config.airflow_configmap:
config_volume_name = 'airflow-config'
config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
volumes.append({
'name': config_volume_name,
'configMap': {
@@ -172,6 +177,10 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
annotations = {
'iam.cloud.google.com/service-account': gcp_sa_key
} if gcp_sa_key else {}
airflow_command = airflow_command.replace("-sd", "-i -sd")
airflow_path = airflow_command.split('-sd')[-1]
airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path

return Pod(
namespace=namespace,
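
The `make_pod` change above rewrites the scheduler-built `airflow run ... -sd <path>` command so the worker pod resolves the DAG file under its own Airflow home, and inserts the `-i` flag into the run command. A traced sketch of the string handling, with an assumed command and home directory (note the concatenation expects the home path to end with a separator):

```python
def rebase_airflow_command(airflow_command, worker_airflow_home):
    # Mirrors the string surgery in make_pod; illustrative only.
    airflow_command = airflow_command.replace('-sd', '-i -sd')
    scheduler_path = airflow_command.split('-sd')[-1]  # ' /home/u/dags/my_dag.py'
    dag_file = scheduler_path.split('/')[-1]           # 'my_dag.py'
    worker_path = worker_airflow_home + dag_file
    return airflow_command.split('-sd')[0] + '-sd ' + worker_path

print(rebase_airflow_command(
    'airflow run my_dag my_task 2018-01-01T00:00:00 -sd /home/u/dags/my_dag.py',
    '/usr/local/airflow/'))
# airflow run my_dag my_task 2018-01-01T00:00:00 -i -sd /usr/local/airflow/my_dag.py
```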
4 changes: 2 additions & 2 deletions airflow/jobs.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
27 changes: 25 additions & 2 deletions airflow/www_rbac/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,8 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils import timezone
from airflow.www_rbac.app import csrf
from airflow import models
from airflow.utils.db import create_session

from flask import g, Blueprint, jsonify, request, url_for

@@ -112,6 +114,27 @@ def task_info(dag_id, task_id):
return jsonify(fields)


@api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
@requires_authentication
def dag_paused(dag_id, paused):
"""(Un)pauses a dag"""

DagModel = models.DagModel
with create_session() as session:
orm_dag = (
session.query(DagModel)
.filter(DagModel.dag_id == dag_id).first()
)
if paused == 'true':
orm_dag.is_paused = True
else:
orm_dag.is_paused = False
session.merge(orm_dag)
session.commit()

return jsonify({'response': 'ok'})


@api_experimental.route(
'/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>',
methods=['GET'])
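
The new `dag_paused` endpoint is what lets the integration test drive Airflow purely over REST, as the commit message describes. A usage sketch, assuming the default (open) experimental API auth backend; the host, port, and DAG id are illustrative:

```python
import requests

base = 'http://localhost:8080/api/experimental'

# Pause, then unpause, a DAG by id.
resp = requests.get(base + '/dags/example_dag/paused/true')
assert resp.json() == {'response': 'ok'}
requests.get(base + '/dags/example_dag/paused/false')
```

Using GET for a state change departs from REST conventions, but it keeps the test harness a simple sequence of URL fetches.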
5 changes: 5 additions & 0 deletions scripts/ci/kubernetes/docker/Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \
unzip \
&& apt-get clean


RUN pip install --upgrade pip

# Since we install vanilla Airflow, we also want to have support for Postgres and Kubernetes
RUN pip install -U setuptools && \
pip install kubernetes && \
@@ -43,6 +46,8 @@ RUN pip install -U setuptools && \
COPY airflow.tar.gz /tmp/airflow.tar.gz
RUN pip install /tmp/airflow.tar.gz

COPY airflow-init.sh /tmp/airflow-init.sh

COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh
