diff --git a/.gitignore b/.gitignore index 04c408f85c70f..a42962f7d8283 100644 --- a/.gitignore +++ b/.gitignore @@ -137,4 +137,3 @@ rat-results.txt *.generated *.tar.gz scripts/ci/kubernetes/kube/.generated/airflow.yaml - diff --git a/.travis.yml b/.travis.yml index 6d29a7aed4a7e..77033df486ced 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,9 +80,6 @@ matrix: env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 - python: "2.7" env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0 - allow_failures: - - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 - - env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0 cache: directories: - $HOME/.wheelhouse/ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index fa5eea081069a..6da4287535371 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -502,6 +504,7 @@ hide_sensitive_variable_fields = True [elasticsearch] elasticsearch_host = + [kubernetes] # The repository and tag of the Kubernetes Image for the Worker to Run worker_container_repository = @@ -550,6 +553,11 @@ image_pull_secrets = # Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2 gcp_service_account_keys = +# Use the service account kubernetes gives to pods to connect to kubernetes cluster. +# It’s intended for clients that expect to be running inside a pod running on kubernetes. +# It will raise an exception if called from a process not running in a kubernetes environment. +in_cluster = True + [kubernetes_secrets] # The scheduler mounts the following secrets into your workers as they are launched by the # scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the diff --git a/airflow/configuration.py b/airflow/configuration.py index 130356ce547e9..20ef0674be54a 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -442,7 +442,10 @@ def parameterized_config(template): ) with open(AIRFLOW_CONFIG, 'w') as f: cfg = parameterized_config(DEFAULT_CONFIG) - f.write(cfg.split(TEMPLATE_START)[-1].strip()) + cfg = cfg.split(TEMPLATE_START)[-1].strip() + if six.PY2: + cfg = cfg.encode('utf8') + f.write(cfg) log.info("Reading the config from %s", AIRFLOW_CONFIG) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index cdce95f6da8e7..17b290842957e 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -23,6 +23,7 @@ import kubernetes from kubernetes import watch, client from kubernetes.client.rest import ApiException +from airflow.configuration import conf from airflow.contrib.kubernetes.pod_launcher import PodLauncher from airflow.contrib.kubernetes.kube_client import get_kube_client from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration @@ -87,20 +88,6 @@ class KubeConfig: core_section = 'core' kubernetes_section = 'kubernetes' - @staticmethod - def safe_get(section, option, default): - try: - return configuration.get(section, option) - except AirflowConfigException: - return default - - @staticmethod - def safe_getboolean(section, option, default): - try: - return configuration.getboolean(section, option) - except AirflowConfigException: - return default - def __init__(self): configuration_dict = configuration.as_dict(display_sensitive=True) self.core_configuration = configuration_dict['core'] @@ -114,40 +101,37 @@ def __init__(self): self.kubernetes_section, 'worker_container_tag') self.kube_image = '{}:{}'.format( self.worker_container_repository, self.worker_container_tag) - self.delete_worker_pods = self.safe_getboolean( - self.kubernetes_section, 'delete_worker_pods', True) + self.delete_worker_pods = conf.getboolean( + self.kubernetes_section, 'delete_worker_pods') - self.worker_service_account_name = self.safe_get( - self.kubernetes_section, 'worker_service_account_name', 'default') - self.image_pull_secrets = self.safe_get( - self.kubernetes_section, 'image_pull_secrets', '') + self.worker_service_account_name = conf.get( + self.kubernetes_section, 'worker_service_account_name') + self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets') # NOTE: `git_repo` and `git_branch` must be specified together as a pair # The http URL of the git repository to clone from - self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None) + self.git_repo = conf.get(self.kubernetes_section, 'git_repo') # The branch of the repository to be checked out - self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None) + self.git_branch = conf.get(self.kubernetes_section, 'git_branch') # Optionally, the directory in the git repository containing the dags - self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '') + self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath') # Optionally a user may supply a `git_user` and `git_password` for private # repositories - self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None) - self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None) + 
self.git_user = conf.get(self.kubernetes_section, 'git_user') + self.git_password = conf.get(self.kubernetes_section, 'git_password') # NOTE: The user may optionally use a volume claim to mount a PV containing # DAGs directly - self.dags_volume_claim = self.safe_get(self.kubernetes_section, - 'dags_volume_claim', None) + self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim') # This prop may optionally be set for PV Claims and is used to write logs - self.logs_volume_claim = self.safe_get( - self.kubernetes_section, 'logs_volume_claim', None) + self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim') # This prop may optionally be set for PV Claims and is used to locate DAGs # on a SubPath - self.dags_volume_subpath = self.safe_get( - self.kubernetes_section, 'dags_volume_subpath', None) + self.dags_volume_subpath = conf.get( + self.kubernetes_section, 'dags_volume_subpath') # This prop may optionally be set for PV Claims and is used to write logs self.base_log_folder = configuration.get(self.core_section, 'base_log_folder') @@ -156,36 +140,32 @@ def __init__(self): # that if your # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. - self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', - 'default') + self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to # interact with cluster components. - self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', - 'default') + self.executor_namespace = conf.get(self.kubernetes_section, 'namespace') # Task secrets managed by KubernetesExecutor. - self.gcp_service_account_keys = self.safe_get( - self.kubernetes_section, 'gcp_service_account_keys', None) + self.gcp_service_account_keys = conf.get(self.kubernetes_section, + 'gcp_service_account_keys') # If the user is using the git-sync container to clone their repository via git, # allow them to specify repository, tag, and pod name for the init container. 
- self.git_sync_container_repository = self.safe_get( - self.kubernetes_section, 'git_sync_container_repository', - 'gcr.io/google-containers/git-sync-amd64') + self.git_sync_container_repository = conf.get( + self.kubernetes_section, 'git_sync_container_repository') - self.git_sync_container_tag = self.safe_get( - self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5') + self.git_sync_container_tag = conf.get( + self.kubernetes_section, 'git_sync_container_tag') self.git_sync_container = '{}:{}'.format( self.git_sync_container_repository, self.git_sync_container_tag) - self.git_sync_init_container_name = self.safe_get( - self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone') + self.git_sync_init_container_name = conf.get( + self.kubernetes_section, 'git_sync_init_container_name') # The worker pod may optionally have a valid Airflow config loaded via a # configmap - self.airflow_configmap = self.safe_get(self.kubernetes_section, - 'airflow_configmap', None) + self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap') self._validate() @@ -272,7 +252,7 @@ def process_status(self, pod_id, status, labels, resource_version): self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version)) elif status == 'Succeeded': self.log.info('Event: %s Succeeded', pod_id) - self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version)) + self.watcher_queue.put((pod_id, None, labels, resource_version)) elif status == 'Running': self.log.info('Event: %s is Running', pod_id) else: @@ -552,7 +532,8 @@ def start(self): self.log.debug('Start with worker_uuid: %s', self.worker_uuid) # always need to reset resource version since we don't know # when we last started, note for behavior below - # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod + # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs + # /CoreV1Api.md#list_namespaced_pod KubeResourceVersion.reset_resource_version(self._session) self.task_queue = Queue() self.result_queue = Queue() @@ -610,8 +591,7 @@ def _change_state(self, key, state, pod_id): task_id=task_id, execution_date=ex_time ).one() - - if item.state == State.RUNNING or item.state == State.QUEUED: + if state: item.state = state self._session.add(item) self._session.commit() diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index d1a63a2853943..1d3cc9ddfe3c5 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +from airflow.configuration import conf def _load_kube_config(in_cluster): from kubernetes import config, client @@ -26,6 +26,6 @@ def _load_kube_config(in_cluster): return client.CoreV1Api() -def get_kube_client(in_cluster=True): +def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')): # TODO: This should also allow people to point to a cluster. 
return _load_kube_config(in_cluster) diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index cd2cb9f11cce0..ac4dacf1b691e 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -21,13 +21,18 @@ from airflow.contrib.kubernetes.pod import Pod, Resources from airflow.contrib.kubernetes.secret import Secret +from airflow.utils.log.logging_mixin import LoggingMixin -class WorkerConfiguration: +class WorkerConfiguration(LoggingMixin): """Contains Kubernetes Airflow Worker configuration logic""" def __init__(self, kube_config): self.kube_config = kube_config + self.worker_airflow_home = self.kube_config.airflow_home + self.worker_airflow_dags = self.kube_config.dags_folder + self.worker_airflow_logs = self.kube_config.base_log_folder + super(WorkerConfiguration, self).__init__() def _get_init_containers(self, volume_mounts): """When using git to retrieve the DAGs, use the GitSync Init Container""" @@ -79,7 +84,7 @@ def _get_environment(self): 'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor' } if self.kube_config.airflow_configmap: - env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home + env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home return env def _get_secrets(self): @@ -129,19 +134,19 @@ def _construct_volume(name, claim, subpath=None): volume_mounts = [{ 'name': dags_volume_name, 'mountPath': os.path.join( - self.kube_config.dags_folder, + self.worker_airflow_dags, self.kube_config.git_subpath ), 'readOnly': True }, { 'name': logs_volume_name, - 'mountPath': self.kube_config.base_log_folder + 'mountPath': self.worker_airflow_logs }] # Mount the airflow.cfg file via a configmap the user has specified if self.kube_config.airflow_configmap: config_volume_name = 'airflow-config' - config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home) + config_path = '{}/airflow.cfg'.format(self.worker_airflow_home) volumes.append({ 'name': config_volume_name, 'configMap': { @@ -172,6 +177,10 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da annotations = { 'iam.cloud.google.com/service-account': gcp_sa_key } if gcp_sa_key else {} + airflow_command = airflow_command.replace("-sd", "-i -sd") + airflow_path = airflow_command.split('-sd')[-1] + airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1] + airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path return Pod( namespace=namespace, diff --git a/airflow/jobs.py b/airflow/jobs.py index ecbfef899f52d..e5a07b76d9814 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py index 5bc052984e817..bddf0c18b4e75 100644 --- a/airflow/www_rbac/api/experimental/endpoints.py +++ b/airflow/www_rbac/api/experimental/endpoints.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils import timezone from airflow.www_rbac.app import csrf +from airflow import models +from airflow.utils.db import create_session from flask import g, Blueprint, jsonify, request, url_for @@ -112,6 +114,27 @@ def task_info(dag_id, task_id): return jsonify(fields) +@api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET']) +@requires_authentication +def dag_paused(dag_id, paused): + """(Un)pauses a dag""" + + DagModel = models.DagModel + with create_session() as session: + orm_dag = ( + session.query(DagModel) + .filter(DagModel.dag_id == dag_id).first() + ) + if paused == 'true': + orm_dag.is_paused = True + else: + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + return jsonify({'response': 'ok'}) + + @api_experimental.route( '/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>', methods=['GET']) diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile index 967b6980cba19..6d2c62da8b1bc 100644 --- a/scripts/ci/kubernetes/docker/Dockerfile +++ b/scripts/ci/kubernetes/docker/Dockerfile @@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \ unzip \ && apt-get clean + +RUN pip install --upgrade pip + # Since we install vanilla Airflow, we also want to have support for Postgres and Kubernetes RUN pip install -U setuptools && \ pip install kubernetes && \ @@ -43,6 +46,8 @@ RUN pip install -U setuptools && \ COPY airflow.tar.gz /tmp/airflow.tar.gz RUN pip install /tmp/airflow.tar.gz +COPY airflow-init.sh /tmp/airflow-init.sh + COPY bootstrap.sh /bootstrap.sh RUN chmod +x /bootstrap.sh diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh new file mode 100755 index 0000000000000..dc33625653979 --- /dev/null +++ b/scripts/ci/kubernetes/docker/airflow-init.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. + +cd /usr/local/lib/python2.7/dist-packages/airflow && \ +cp -R example_dags/* /root/airflow/dags/ && \ +airflow initdb && \ +alembic upgrade heads && \ +airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh index 6f14c4d766bfa..b93c6b1715ddb 100755 --- a/scripts/ci/kubernetes/docker/build.sh +++ b/scripts/ci/kubernetes/docker/build.sh @@ -27,7 +27,12 @@ if [ $?
-eq 0 ]; then eval $ENVCONFIG fi -cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \ -cd $DIRNAME && \ -docker build --pull $DIRNAME --tag=${IMAGE}:${TAG} && \ +echo "Airflow directory $AIRFLOW_ROOT" +echo "Airflow Docker directory $DIRNAME" + +cd $AIRFLOW_ROOT +python setup.py sdist -q +echo "Copy distro $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz" +cp $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz +cd $DIRNAME && docker build --pull $DIRNAME --tag=${IMAGE}:${TAG} rm $DIRNAME/airflow.tar.gz diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml index 77566aed95fe4..09bbcd8c4b404 100644 --- a/scripts/ci/kubernetes/kube/airflow.yaml +++ b/scripts/ci/kubernetes/kube/airflow.yaml @@ -61,7 +61,7 @@ spec: - "bash" args: - "-cx" - - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads" + - "./tmp/airflow-init.sh" containers: - name: webserver image: airflow @@ -88,20 +88,20 @@ spec: mountPath: /root/airflow/dags - name: airflow-logs mountPath: /root/airflow/logs - readinessProbe: - initialDelaySeconds: 5 - timeoutSeconds: 5 - periodSeconds: 5 - httpGet: - path: /admin - port: 8080 - livenessProbe: - initialDelaySeconds: 5 - timeoutSeconds: 5 - failureThreshold: 5 - httpGet: - path: /admin - port: 8080 +# readinessProbe: +# initialDelaySeconds: 5 +# timeoutSeconds: 5 +# periodSeconds: 5 +# httpGet: +# path: /login +# port: 8080 +# livenessProbe: +# initialDelaySeconds: 5 +# timeoutSeconds: 5 +# failureThreshold: 5 +# httpGet: +# path: /login +# port: 8080 - name: scheduler image: airflow imagePullPolicy: IfNotPresent @@ -146,76 +146,4 @@ spec: nodePort: 30809 selector: name: airflow ---- -apiVersion: v1 -kind: Secret -metadata: - name: airflow-secrets -type: Opaque -data: - # The sql_alchemy_conn value is a base64 encoded represenation of this connection string: - # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow - sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo= ---- -apiVersion: v1 -kind: ConfigMap -metadata: - name: airflow-configmap -data: - airflow.cfg: | - [core] - airflow_home = /root/airflow - dags_folder = /root/airflow/dags - base_log_folder = /root/airflow/logs - logging_level = INFO - executor = KubernetesExecutor - parallelism = 32 - plugins_folder = /root/airflow/plugins - sql_alchemy_conn = $SQL_ALCHEMY_CONN - - [scheduler] - dag_dir_list_interval = 300 - child_process_log_directory = /root/airflow/logs/scheduler - # Task instances listen for external kill signal (when you clear tasks - # from the CLI or the UI), this defines the frequency at which they should - # listen (in seconds). - job_heartbeat_sec = 5 - max_threads = 16 - - # The scheduler constantly tries to trigger new tasks (look at the - # scheduler section in the docs for more information). This defines - # how often the scheduler should run (in seconds). - scheduler_heartbeat_sec = 5 - - # after how much time should the scheduler terminate in seconds - # -1 indicates to run continuously (see also num_runs) - run_duration = -1 - - # after how much time a new DAGs should be picked up from the filesystem - min_file_process_interval = 0 - - statsd_on = False - statsd_host = localhost - statsd_port = 8125 - statsd_prefix = airflow - - # How many seconds to wait between file-parsing loops to prevent the logs from being spammed. 
- min_file_parsing_loop_time = 1 - - print_stats_interval = 30 - scheduler_zombie_task_threshold = 300 - max_tis_per_query = 0 - authenticate = False - - [kubernetes] - airflow_configmap = airflow-configmap - worker_container_repository = airflow - worker_container_tag = latest - delete_worker_pods = True - git_repo = https://github.com/grantnicholas/testdags.git - git_branch = master - dags_volume_claim = airflow-dags - logs_volume_claim = airflow-logs - [kubernetes_secrets] - SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml new file mode 100644 index 0000000000000..ddba09883ae2d --- /dev/null +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -0,0 +1,359 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * +apiVersion: v1 +kind: ConfigMap +metadata: + name: airflow-configmap +data: + airflow.cfg: | + [core] + airflow_home = /root/airflow + dags_folder = /root/airflow/dags + base_log_folder = /root/airflow/logs + logging_level = INFO + executor = KubernetesExecutor + parallelism = 32 + load_examples = True + plugins_folder = /root/airflow/plugins + sql_alchemy_conn = $SQL_ALCHEMY_CONN + + [scheduler] + dag_dir_list_interval = 300 + child_process_log_directory = /root/airflow/logs/scheduler + # Task instances listen for external kill signal (when you clear tasks + # from the CLI or the UI), this defines the frequency at which they should + # listen (in seconds). + job_heartbeat_sec = 5 + max_threads = 2 + + # The scheduler constantly tries to trigger new tasks (look at the + # scheduler section in the docs for more information). This defines + # how often the scheduler should run (in seconds). + scheduler_heartbeat_sec = 5 + + # after how much time should the scheduler terminate in seconds + # -1 indicates to run continuously (see also num_runs) + run_duration = -1 + + # after how much time a new DAGs should be picked up from the filesystem + min_file_process_interval = 0 + + statsd_on = False + statsd_host = localhost + statsd_port = 8125 + statsd_prefix = airflow + + # How many seconds to wait between file-parsing loops to prevent the logs from being spammed. + min_file_parsing_loop_time = 1 + + print_stats_interval = 30 + scheduler_zombie_task_threshold = 300 + max_tis_per_query = 0 + authenticate = False + + # Turn off scheduler catchup by setting this to False. 
+ # Default behavior is unchanged and + # Command Line Backfills still work, but the scheduler + # will not do scheduler catchup if this is False, + # however it can be set on a per DAG basis in the + # DAG definition (catchup) + catchup_by_default = True + + [webserver] + # The base url of your website as airflow cannot guess what domain or + # cname you are using. This is used in automated emails that + # airflow sends to point links to the right web server + base_url = http://localhost:8080 + + # The ip specified when starting the web server + web_server_host = 0.0.0.0 + + # The port on which to run the web server + web_server_port = 8080 + + # Paths to the SSL certificate and key for the web server. When both are + # provided SSL will be enabled. This does not change the web server port. + web_server_ssl_cert = + web_server_ssl_key = + + # Number of seconds the webserver waits before killing gunicorn master that doesn't respond + web_server_master_timeout = 120 + + # Number of seconds the gunicorn webserver waits before timing out on a worker + web_server_worker_timeout = 120 + + # Number of workers to refresh at a time. When set to 0, worker refresh is + # disabled. When nonzero, airflow periodically refreshes webserver workers by + # bringing up new ones and killing old ones. + worker_refresh_batch_size = 1 + + # Number of seconds to wait before refreshing a batch of workers. + worker_refresh_interval = 30 + + # Secret key used to run your flask app + secret_key = temporary_key + + # Number of workers to run the Gunicorn web server + workers = 4 + + # The worker class gunicorn should use. Choices include + # sync (default), eventlet, gevent + worker_class = sync + + # Log files for the gunicorn webserver. '-' means log to stderr. + access_logfile = - + error_logfile = - + + # Expose the configuration file in the web server + expose_config = False + + # Set to true to turn on authentication: + # https://airflow.incubator.apache.org/security.html#web-authentication + authenticate = False + + # Filter the list of dags by owner name (requires authentication to be enabled) + filter_by_owner = False + + # Filtering mode. Choices include user (default) and ldapgroup. + # Ldap group filtering requires using the ldap backend + # + # Note that the ldap server needs the "memberOf" overlay to be set up + # in order to user the ldapgroup mode. + owner_mode = user + + # Default DAG view. Valid values are: + # tree, graph, duration, gantt, landing_times + dag_default_view = tree + + # Default DAG orientation. Valid values are: + # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top) + dag_orientation = LR + + # Puts the webserver in demonstration mode; blurs the names of Operators for + # privacy. + demo_mode = False + + # The amount of time (in secs) webserver will wait for initial handshake + # while fetching logs from other worker machine + log_fetch_timeout_sec = 5 + + # By default, the webserver shows paused DAGs. 
Flip this to hide paused + # DAGs by default + hide_paused_dags_by_default = False + + # Consistent page size across all listing views in the UI + page_size = 100 + + # Use FAB-based webserver with RBAC feature + rbac = True + + [smtp] + # If you want airflow to send emails on retries, failure, and you want to use + # the airflow.utils.email.send_email_smtp function, you have to configure an + # smtp server here + smtp_host = localhost + smtp_starttls = True + smtp_ssl = False + # Uncomment and set the user/pass settings if you want to use SMTP AUTH + # smtp_user = airflow + # smtp_password = airflow + smtp_port = 25 + smtp_mail_from = airflow@example.com + + [kubernetes] + airflow_configmap = airflow-configmap + worker_container_repository = airflow + worker_container_tag = latest + delete_worker_pods = True + git_repo = https://github.com/apache/incubator-airflow.git + git_branch = master + git_subpath = airflow/example_dags/ + git_user = + git_password = + dags_volume_claim = airflow-dags + logs_volume_claim = airflow-logs + in_cluster = True + namespace = default + gcp_service_account_keys = + + # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync + git_sync_container_repository = gcr.io/google-containers/git-sync-amd64 + git_sync_container_tag = v2.0.5 + git_sync_init_container_name = git-sync-clone + + [kubernetes_secrets] + SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn + + [hive] + # Default mapreduce queue for HiveOperator tasks + default_hive_mapred_queue = + + [celery] + # This section only applies if you are using the CeleryExecutor in + # [core] section above + + # The app name that will be used by celery + celery_app_name = airflow.executors.celery_executor + + # The concurrency that will be used when starting workers with the + # "airflow worker" command. This defines the number of task instances that + # a worker will take, so size up your workers based on the resources on + # your worker box and the nature of your tasks + worker_concurrency = 16 + + # When you start an airflow worker, airflow starts a tiny web server + # subprocess to serve the workers local log files to the airflow main + # web server, who then builds pages and sends them to users. This defines + # the port on which the logs are served. It needs to be unused, and open + # visible from the main web server to connect into the workers. + worker_log_server_port = 8793 + + # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally + # a sqlalchemy database. Refer to the Celery documentation for more + # information. + # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings + broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow + + # The Celery result_backend. When a job finishes, it needs to update the + # metadata of the job. Therefore it will post a message on a message bus, + # or insert it into a database (depending of the backend) + # This status is used by the scheduler to update the state of the task + # The use of a database is highly recommended + # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings + result_backend = db+mysql://airflow:airflow@localhost:3306/airflow + + # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start + # it `airflow flower`. 
This defines the IP that Celery Flower runs on + flower_host = 0.0.0.0 + + # The root URL for Flower + # Ex: flower_url_prefix = /flower + flower_url_prefix = + + # This defines the port that Celery Flower runs on + flower_port = 5555 + + # Default queue that tasks get assigned to and that worker listen on. + default_queue = default + + # Import path for celery configuration options + celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG + + [celery_broker_transport_options] + # The visibility timeout defines the number of seconds to wait for the worker + # to acknowledge the task before the message is redelivered to another worker. + # Make sure to increase the visibility timeout to match the time of the longest + # ETA you're planning to use. Especially important in case of using Redis or SQS + visibility_timeout = 21600 + + # In case of using SSL + ssl_active = False + ssl_key = + ssl_cert = + ssl_cacert = + + [dask] + # This section only applies if you are using the DaskExecutor in + # [core] section above + + # The IP address and port of the Dask cluster's scheduler. + cluster_address = 127.0.0.1:8786 + # TLS/ SSL settings to access a secured Dask scheduler. + tls_ca = + tls_cert = + tls_key = + + [ldap] + # set this to ldaps://<your.ldap.server>:<port> + uri = + user_filter = objectClass=* + user_name_attr = uid + group_member_attr = memberOf + superuser_filter = + data_profiler_filter = + bind_user = cn=Manager,dc=example,dc=com + bind_password = insecure + basedn = dc=example,dc=com + cacert = /etc/ca/ldap_ca.crt + search_scope = LEVEL + + [mesos] + # Mesos master address which MesosExecutor will connect to. + master = localhost:5050 + + # The framework name which Airflow scheduler will register itself as on mesos + framework_name = Airflow + + # Number of cpu cores required for running one task instance using + # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>' + # command on a mesos slave + task_cpu = 1 + + # Memory in MB required for running one task instance using + # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>' + # command on a mesos slave + task_memory = 256 + + # Enable framework checkpointing for mesos + # See http://mesos.apache.org/documentation/latest/slave-recovery/ + checkpoint = False + + # Failover timeout in milliseconds. + # When checkpointing is enabled and this option is set, Mesos waits + # until the configured timeout for + # the MesosExecutor framework to re-register after a failover. Mesos + # shuts down running tasks if the + # MesosExecutor framework fails to re-register within this timeframe. + # failover_timeout = 604800 + + # Enable framework authentication for mesos + # See http://mesos.apache.org/documentation/latest/configuration/ + authenticate = False + + # Mesos credentials, if authentication is enabled + # default_principal = admin + # default_secret = admin + + # Optional Docker Image to run on slave before running the command + # This image should be accessible from mesos slave i.e mesos slave + # should be able to pull this docker image before executing the command.
+ # docker_image_slave = puckel/docker-airflow + + [kerberos] + ccache = /tmp/airflow_krb5_ccache + # gets augmented with fqdn + principal = airflow + reinit_frequency = 3600 + kinit_path = kinit + keytab = airflow.keytab + + [cli] + api_client = airflow.api.client.json_client + endpoint_url = http://localhost:8080 + + [api] + auth_backend = airflow.api.auth.backend.default + + [github_enterprise] + api_rev = v3 + + [admin] + # UI to hide sensitive variable fields when set to True + hide_sensitive_variable_fields = True + + [elasticsearch] + elasticsearch_host = diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh index a5adcf8a1d113..e585d87e73512 100755 --- a/scripts/ci/kubernetes/kube/deploy.sh +++ b/scripts/ci/kubernetes/kube/deploy.sh @@ -21,8 +21,14 @@ IMAGE=${1:-airflow/ci} TAG=${2:-latest} DIRNAME=$(cd "$(dirname "$0")"; pwd) +kubectl delete -f $DIRNAME/postgres.yaml +kubectl delete -f $DIRNAME/airflow.yaml +kubectl delete -f $DIRNAME/secrets.yaml + kubectl apply -f $DIRNAME/postgres.yaml kubectl apply -f $DIRNAME/volumes.yaml +kubectl apply -f $DIRNAME/secrets.yaml +kubectl apply -f $DIRNAME/configmaps.yaml kubectl apply -f $DIRNAME/airflow.yaml # wait for up to 10 minutes for everything to be deployed diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml index 67a06359b520e..1130921ee9d1b 100644 --- a/scripts/ci/kubernetes/kube/postgres.yaml +++ b/scripts/ci/kubernetes/kube/postgres.yaml @@ -30,6 +30,7 @@ spec: containers: - name: postgres image: postgres + imagePullPolicy: IfNotPresent ports: - containerPort: 5432 protocol: TCP diff --git a/scripts/ci/kubernetes/kube/secrets.yaml b/scripts/ci/kubernetes/kube/secrets.yaml new file mode 100644 index 0000000000000..4d533b36636e6 --- /dev/null +++ b/scripts/ci/kubernetes/kube/secrets.yaml @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. 
* +apiVersion: v1 +kind: Secret +metadata: + name: airflow-secrets +type: Opaque +data: + # The sql_alchemy_conn value is a base64 encoded represenation of this connection string: + # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo= diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml index 073e98c580587..58ad36813e230 100644 --- a/scripts/ci/kubernetes/kube/volumes.yaml +++ b/scripts/ci/kubernetes/kube/volumes.yaml @@ -62,3 +62,4 @@ spec: resources: requests: storage: 2Gi + diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh index 8766e9482c7f5..52571cce14a35 100755 --- a/scripts/ci/travis_script.sh +++ b/scripts/ci/travis_script.sh @@ -26,7 +26,7 @@ then tox -e $TOX_ENV else KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \ - tox -e $TOX_ENV -- tests.contrib.minikube_tests \ + tox -e $TOX_ENV -- tests.contrib.minikube \ --with-coverage \ --cover-erase \ --cover-html \ diff --git a/setup.cfg b/setup.cfg index 7b3479c635afd..622cc1303a173 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -6,16 +5,15 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - [metadata] name = Airflow summary = Airflow is a system to programmatically author, schedule and monitor data pipelines. @@ -29,8 +27,11 @@ packages = airflow [build_sphinx] source-dir = docs/ -build-dir = docs/_build -all_files = 1 +build-dir = docs/_build +all_files = 1 [upload_sphinx] upload-dir = docs/_build/html + +[easy_install] + diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 9c795675044c3..34c82bcf9b683 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,15 +22,84 @@ from mock import patch, Mock, MagicMock from time import sleep - import psutil - +from argparse import Namespace from airflow import settings -from airflow.bin.cli import get_num_ready_workers_running +from airflow.bin.cli import get_num_ready_workers_running, run, get_dag +from airflow.models import TaskInstance +from airflow.utils import timezone +from airflow.utils.state import State +from airflow.settings import Session +from airflow import models + +import os + +dag_folder_path = '/'.join(os.path.realpath(__file__).split('/')[:-1]) + +TEST_DAG_FOLDER = os.path.join( + os.path.dirname(dag_folder_path), 'dags') +TEST_DAG_ID = 'unit_tests' + + +def reset(dag_id): + session = Session() + tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id) + tis.delete() + session.commit() + session.close() + + +def create_mock_args( + task_id, + dag_id, + subdir, + execution_date, + task_params=None, + dry_run=False, + queue=None, + pool=None, + priority_weight_total=None, + retries=0, + local=True, + mark_success=False, + ignore_all_dependencies=False, + ignore_depends_on_past=False, + ignore_dependencies=False, + force=False, + run_as_user=None, + executor_config={}, + cfg_path=None, + pickle=None, + raw=None, + interactive=None, +): + args = MagicMock(spec=Namespace) + args.task_id = task_id + args.dag_id = dag_id + args.subdir = subdir + args.task_params = task_params + args.execution_date = execution_date + args.dry_run = dry_run + args.queue = queue + args.pool = pool + args.priority_weight_total = priority_weight_total + args.retries = retries + args.local = local + args.run_as_user = run_as_user + args.executor_config = executor_config + args.cfg_path = cfg_path + args.pickle = pickle + args.raw = raw + args.mark_success = mark_success + args.ignore_all_dependencies = ignore_all_dependencies + args.ignore_depends_on_past = ignore_depends_on_past + args.ignore_dependencies = ignore_dependencies + args.force = force + args.interactive = interactive + return args class TestCLI(unittest.TestCase): - def setUp(self): self.gunicorn_master_proc = Mock(pid=None) self.children = MagicMock() @@ -74,3 +143,23 @@ def test_cli_webserver_debug(self): "webserver terminated with return code {} in debug mode".format(return_code)) p.terminate() p.wait() + + def test_local_run(self): + args = create_mock_args( + task_id='print_the_context', + dag_id='example_python_operator', + subdir='/root/dags/example_python_operator.py', + interactive=True, + execution_date=timezone.parse('2018-04-27T08:39:51.298439+00:00') + ) + + reset(args.dag_id) + + with patch('argparse.Namespace', args) as mock_args: + run(mock_args) + dag = get_dag(mock_args) + task = dag.get_task(task_id=args.task_id) + ti = TaskInstance(task, args.execution_date) + ti.refresh_from_db() + state = ti.current_state() + self.assertEqual(state, State.SUCCESS) diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py deleted file mode 100644 index 759b563511c1c..0000000000000 --- a/tests/contrib/kubernetes/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube/__init__.py similarity index 99% rename from tests/contrib/minikube_tests/__init__.py rename to tests/contrib/minikube/__init__.py index 4067cc78ee9a2..114d189da14ab 100644 --- a/tests/contrib/minikube_tests/__init__.py +++ b/tests/contrib/minikube/__init__.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py new file mode 100644 index 0000000000000..9827bc8e906a3 --- /dev/null +++ b/tests/contrib/minikube/test_kubernetes_executor.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +import unittest +from subprocess import check_call, check_output + +import requests +import time +import six + +try: + check_call(["kubectl", "get", "pods"]) +except Exception as e: + raise unittest.SkipTest( + "Kubernetes integration tests require a minikube cluster;" + "Skipping tests {}".format(e) + ) + + +class KubernetesExecutorTest(unittest.TestCase): + + def test_integration_run_dag(self): + host_ip = check_output(['minikube', 'ip']) + if six.PY3: + host_ip = host_ip.decode('UTF-8') + host = '{}:30809'.format(host_ip.strip()) + + # Enable the dag + result = requests.get( + 'http://{}/api/experimental/' + 'dags/example_python_operator/paused/false'.format(host) + ) + self.assertEqual(result.status_code, 200, "Could not enable DAG") + + # Trigger a new dagrun + result = requests.post( + 'http://{}/api/experimental/' + 'dags/example_python_operator/dag_runs'.format(host), + json={} + ) + self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run") + + time.sleep(1) + + result = requests.get( + 'http://{}/api/experimental/latest_runs'.format(host) + ) + self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run") + result_json = result.json() + + self.assertGreater(len(result_json['items']), 0) + + execution_date = result_json['items'][0]['execution_date'] + print("Found the job with execution date {}".format(execution_date)) + + tries = 0 + state = '' + # Wait 100 seconds for the operator to complete + while tries < 20: + time.sleep(5) + + # Trigger a new dagrun + result = requests.get( + 'http://{}/api/experimental/dags/example_python_operator/' + 'dag_runs/{}/tasks/print_the_context'.format(host, execution_date) + ) + self.assertEqual(result.status_code, 200, "Could not get the status") + result_json = result.json() + state = result_json['state'] + print("Attempt {}: Current state of operator is {}".format(tries, state)) + + if state == 'success': + break + tries += 1 + + self.assertEqual(state, 'success') + + # Maybe check if we can retrieve the logs, but then we need to extend the API + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py similarity index 50% rename from tests/contrib/minikube_tests/test_kubernetes_pod_operator.py rename to tests/contrib/minikube/test_kubernetes_pod_operator.py index 321f01f4f3d0a..081fc047f0d28 100644 --- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py @@ -33,42 +33,44 @@ class KubernetesPodOperatorTest(unittest.TestCase): def test_working_pod(self): - k = KubernetesPodOperator(namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - name="test", - task_id="task" - ) - + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task" + ) k.execute(None) def test_logging(self): with mock.patch.object(PodLauncher, 'log') as mock_logger: - k = KubernetesPodOperator(namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - name="test", - task_id="task", - get_logs=True - ) + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + get_logs=True + 
) k.execute(None) - mock_logger.info.assert_any_call("+ echo\n") + mock_logger.info.assert_any_call(b"+ echo\n") def test_faulty_image(self): bad_image_name = "foobar" - k = KubernetesPodOperator(namespace='default', - image=bad_image_name, - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - name="test", - task_id="task", - startup_timeout_seconds=5 - ) + k = KubernetesPodOperator( + namespace='default', + image=bad_image_name, + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + startup_timeout_seconds=5 + ) with self.assertRaises(AirflowException) as cm: k.execute(None), @@ -78,16 +80,19 @@ def test_pod_failure(self): """ Tests that the task fails when a pod reports a failure """ - bad_internal_command = "foobar" - k = KubernetesPodOperator(namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=[bad_internal_command, "10"], - labels={"foo": "bar"}, - name="test", - task_id="task" - ) - + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=[bad_internal_command, "10"], + labels={"foo": "bar"}, + name="test", + task_id="task" + ) with self.assertRaises(AirflowException): k.execute(None) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py deleted file mode 100644 index 9d7677a99b293..0000000000000 --- a/tests/contrib/minikube_tests/integration/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py deleted file mode 100644 index 5604652fead0e..0000000000000 --- a/tests/contrib/minikube_tests/integration/airflow_controller.py +++ /dev/null @@ -1,166 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import subprocess -import time - - -class RunCommandError(Exception): - pass - - -class TimeoutError(Exception): - pass - - -class DagRunState: - SUCCESS = "success" - FAILED = "failed" - RUNNING = "running" - - -def run_command(command): - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - if process.returncode != 0: - raise RunCommandError( - "Error while running command: {}; Stdout: {}; Stderr: {}".format( - command, stdout, stderr - )) - return stdout, stderr - - -def run_command_in_pod(pod_name, container_name, command): - return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format( - pod_name=pod_name, container_name=container_name, command=command - )) - - -def get_scheduler_logs(airflow_pod=None): - airflow_pod = airflow_pod or _get_airflow_pod() - - return run_command("kubectl logs {pod_name} scheduler" - .format(pod_name=airflow_pod)) - - -def _unpause_dag(dag_id, airflow_pod=None): - airflow_pod = airflow_pod or _get_airflow_pod() - return run_command_in_pod(airflow_pod, "scheduler", - "airflow unpause {dag_id}".format(dag_id=dag_id)) - - -def run_dag(dag_id, run_id, airflow_pod=None): - airflow_pod = airflow_pod or _get_airflow_pod() - _unpause_dag(dag_id, airflow_pod) - return run_command_in_pod(airflow_pod, "scheduler", - "airflow trigger_dag {dag_id} -r {run_id}".format( - dag_id=dag_id, run_id=run_id - )) - - -def _get_pod_by_grep(grep_phrase): - stdout, stderr = run_command( - "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format( - grep_phrase=grep_phrase - )) - pod_name = stdout.strip() - return pod_name - - -def _get_airflow_pod(): - return _get_pod_by_grep("^airflow") - - -def _get_postgres_pod(): - return _get_pod_by_grep("^postgres") - - -def _parse_state(stdout): - end_line = "(1 row)" - prev_line = None - for line in stdout.split("\n"): - if end_line in line: - return prev_line.strip() - prev_line = line - - raise Exception("Unknown psql output: {}".format(stdout)) - - -def get_dag_run_table(postgres_pod=None): - postgres_pod = postgres_pod or _get_postgres_pod() - stdout, stderr = run_command_in_pod( - postgres_pod, "postgres", - """psql airflow -c "select * from dag_run" """ - ) - return stdout - - -def get_task_instance_table(postgres_pod=None): - postgres_pod = postgres_pod or _get_postgres_pod() - stdout, stderr = run_command_in_pod( - postgres_pod, "postgres", - """psql airflow -c "select * from task_instance" """ - ) - return stdout - - -def get_dag_run_state(dag_id, run_id, postgres_pod=None): - postgres_pod = postgres_pod or _get_postgres_pod() - stdout, stderr = run_command_in_pod( - postgres_pod, "postgres", - """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and - run_id='{run_id}'" """.format( - dag_id=dag_id, run_id=run_id - ) - ) - return _parse_state(stdout) - - -def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120): - postgres_pod = postgres_pod or _get_postgres_pod() - for _ in range(0, timeout / poll_interval): - dag_state = get_dag_run_state(dag_id, run_id, postgres_pod) - if dag_state != DagRunState.RUNNING: - capture_logs_for_failure(dag_state) - return dag_state - time.sleep(poll_interval) - - raise TimeoutError( - "Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id, - run_id)) - - -def _kill_pod(pod_name): - return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name)) - - -def kill_scheduler(): - airflow_pod = 
_get_pod_by_grep("^airflow") - return _kill_pod(airflow_pod) - - -def capture_logs_for_failure(state): - if state != DagRunState.SUCCESS: - stdout, stderr = get_scheduler_logs() - print("stdout:") - for line in stdout.split('\n'): - print(line) - print("stderr:") - for line in stderr.split('\n'): - print(line) - print("dag_run:") - print(get_dag_run_table()) - print("task_instance") - print(get_task_instance_table()) diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py deleted file mode 100644 index 602a717724b41..0000000000000 --- a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time -import unittest -from uuid import uuid4 - -from tests.contrib.minikube_tests.integration.airflow_controller\ - import DagRunState, RunCommandError, \ - dag_final_state, get_dag_run_state, kill_scheduler, run_command, run_dag - -try: - run_command("kubectl get pods") -except RunCommandError: - SKIP_KUBE = True -else: - SKIP_KUBE = False - - -class KubernetesExecutorTest(unittest.TestCase): - @unittest.skipIf(SKIP_KUBE, - 'Kubernetes integration tests are unsupported by this configuration') - def test_kubernetes_executor_dag_runs_successfully(self): - dag_id, run_id = "example_python_operator", uuid4().hex - run_dag(dag_id, run_id) - state = dag_final_state(dag_id, run_id, timeout=120) - self.assertEquals(state, DagRunState.SUCCESS) - - @unittest.skipIf(SKIP_KUBE, - 'Kubernetes integration tests are unsupported by this configuration') - def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self): - dag_id, run_id = "example_python_operator", uuid4().hex - run_dag(dag_id, run_id) - - self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) - - time.sleep(10) - - kill_scheduler() - - self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), - DagRunState.SUCCESS) - - @unittest.skipIf(SKIP_KUBE, - 'Kubernetes integration tests are unsupported by this configuration') - def test_kubernetes_executor_config_works(self): - dag_id, run_id = "example_kubernetes_executor", uuid4().hex - run_dag(dag_id, run_id) - - self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) - self.assertEquals(dag_final_state(dag_id, run_id, timeout=300), - DagRunState.SUCCESS) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py index 7bcbb8e37bf77..a19492ee7ec7a 100644 --- a/tests/www_rbac/api/experimental/test_endpoints.py +++ b/tests/www_rbac/api/experimental/test_endpoints.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -77,6 +77,23 @@ def test_task_info(self): self.assertIn('error', response.data.decode('utf-8')) self.assertEqual(404, response.status_code) + def test_task_paused(self): + url_template = '/api/experimental/dags/{}/paused/{}' + + response = self.app.get( + url_template.format('example_bash_operator', 'true') + ) + self.assertIn('ok', response.data.decode('utf-8')) + self.assertEqual(200, response.status_code) + + url_template = '/api/experimental/dags/{}/paused/{}' + + response = self.app.get( + url_template.format('example_bash_operator', 'false') + ) + self.assertIn('ok', response.data.decode('utf-8')) + self.assertEqual(200, response.status_code) + def test_trigger_dag(self): url_template = '/api/experimental/dags/{}/dag_runs' response = self.app.post(