diff --git a/.travis.yml b/.travis.yml index 90f33e3fcb9ab..d3cd216ba1252 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,7 @@ jdk: services: - mysql - postgresql + - rabbitmq addons: apt: packages: diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 94efe605590ca..ef586b891a693 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -296,6 +296,8 @@ flower_port = 5555 # Default queue that tasks get assigned to and that worker listen on. default_queue = default +# Import path for celery configuration options +celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG [dask] # This section only applies if you are using the DaskExecutor in diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py new file mode 100644 index 0000000000000..48611cb1897b0 --- /dev/null +++ b/airflow/config_templates/default_celery.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import ssl + +from airflow.exceptions import AirflowConfigException, AirflowException +from airflow import configuration +from airflow.utils.log.logging_mixin import LoggingMixin + + +DEFAULT_CELERY_CONFIG = { + 'accept_content': ['json', 'pickle'], + 'event_serializer': 'json', + 'result_serializer': 'pickle', + 'worker_prefetch_multiplier': 1, + 'task_acks_late': True, + 'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'), + 'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'), + 'broker_url': configuration.get('celery', 'BROKER_URL'), + 'broker_transport_options': {'visibility_timeout': 21600}, + 'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'), + 'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'), +} + +celery_ssl_active = False +try: + celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE') +except AirflowConfigException as e: + log = LoggingMixin().log + log.warning("Celery Executor will run without SSL") + +try: + if celery_ssl_active: + broker_use_ssl = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'), + 'certfile': configuration.get('celery', 'CELERY_SSL_CERT'), + 'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'), + 'cert_reqs': ssl.CERT_REQUIRED} + DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl +except AirflowConfigException as e: + raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, ' + 'please ensure CELERY_SSL_KEY, ' + 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set') +except Exception as e: + raise AirflowException('Exception: There was an unknown Celery SSL Error. 
' + 'Please ensure you want to use ' + 'SSL and/or have all necessary certs and key ({}).'.format(e)) + diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 360a276983cf8..7e363dba7939a 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -12,20 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from builtins import object import subprocess -import ssl import time -import traceback from celery import Celery from celery import states as celery_states -from airflow.exceptions import AirflowConfigException, AirflowException +from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG +from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor from airflow import configuration from airflow.utils.log.logging_mixin import LoggingMixin + PARALLELISM = configuration.get('core', 'PARALLELISM') ''' @@ -33,45 +32,14 @@ airflow worker ''' -DEFAULT_QUEUE = configuration.get('celery', 'DEFAULT_QUEUE') - - -class CeleryConfig(object): - CELERY_ACCEPT_CONTENT = ['json', 'pickle'] - CELERY_EVENT_SERIALIZER = 'json' - CELERY_RESULT_SERIALIZER = 'pickle' - CELERY_TASK_SERIALIZER = 'pickle' - CELERYD_PREFETCH_MULTIPLIER = 1 - CELERY_ACKS_LATE = True - BROKER_URL = configuration.get('celery', 'BROKER_URL') - CELERY_RESULT_BACKEND = configuration.get('celery', 'CELERY_RESULT_BACKEND') - CELERYD_CONCURRENCY = configuration.getint('celery', 'CELERYD_CONCURRENCY') - CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE - CELERY_DEFAULT_EXCHANGE = DEFAULT_QUEUE - - celery_ssl_active = False - try: - celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE') - except AirflowConfigException as e: - log = LoggingMixin().log - log.warning("Celery Executor will run without SSL") - - try: - if celery_ssl_active: - BROKER_USE_SSL = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'), - 
'certfile': configuration.get('celery', 'CELERY_SSL_CERT'), - 'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'), - 'cert_reqs': ssl.CERT_REQUIRED} - except AirflowConfigException as e: - raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, please ensure CELERY_SSL_KEY, ' - 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set') - except Exception as e: - raise AirflowException('Exception: There was an unknown Celery SSL Error. Please ensure you want to use ' - 'SSL and/or have all necessary certs and key.') +if configuration.has_option('celery', 'celery_config_options'): + celery_configuration = configuration.get('celery', 'celery_config_options') +else: + celery_configuration = DEFAULT_CELERY_CONFIG app = Celery( configuration.get('celery', 'CELERY_APP_NAME'), - config_source=CeleryConfig) + config_source=celery_configuration) @app.task @@ -98,8 +66,10 @@ def start(self): self.tasks = {} self.last_state = {} - def execute_async(self, key, command, queue=DEFAULT_QUEUE): - self.log.info("[celery] queuing {key} through celery, queue={queue}".format(**locals())) + def execute_async(self, key, command, + queue=DEFAULT_CELERY_CONFIG['task_default_queue']): + self.log.info( "[celery] queuing {key} through celery, " + "queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( args=[command], queue=queue) self.last_state[key] = celery_states.PENDING diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg index 01bf3a47cd1b9..68271389b2152 100644 --- a/scripts/ci/airflow_travis.cfg +++ b/scripts/ci/airflow_travis.cfg @@ -44,8 +44,8 @@ smtp_mail_from = airflow@example.com celery_app_name = airflow.executors.celery_executor celeryd_concurrency = 16 worker_log_server_port = 8793 -broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow -celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow +broker_url = amqp://guest:guest@localhost:5672/ +celery_result_backend = 
db+mysql://root@localhost/airflow flower_port = 5555 default_queue = default diff --git a/setup.py b/setup.py index a97abfd0cc4c4..d52bd3b37e687 100644 --- a/setup.py +++ b/setup.py @@ -106,7 +106,7 @@ def write_version(filename=os.path.join(*['airflow', ] azure = ['azure-storage>=0.34.0'] celery = [ - 'celery>=3.1.17', + 'celery>=4.0.0', 'flower>=0.7.3' ] cgroups = [ diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py new file mode 100644 index 0000000000000..1c411e7e8c97e --- /dev/null +++ b/tests/executors/test_celery_executor.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import unittest
+import sys
+
+from airflow.executors.celery_executor import app
+from airflow.executors.celery_executor import CeleryExecutor
+from airflow.utils.state import State
+from celery.contrib.testing.worker import start_worker
+
+# leave this import in place: it is used by the test worker
+import celery.contrib.testing.tasks  # noqa: F401
+
+
+class CeleryExecutorTest(unittest.TestCase):
+    """Integration test: runs CeleryExecutor with a celery test worker."""
+
+    def test_celery_integration(self):
+        executor = CeleryExecutor()
+        executor.start()
+        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+            success_command = 'echo 1'
+            fail_command = 'exit 1'
+
+            executor.execute_async(key='success', command=success_command)
+            # errors are propagated for some reason; outcome is checked below
+            try:
+                executor.execute_async(key='fail', command=fail_command)
+            except Exception:
+                pass
+            executor.running['success'] = True
+            executor.running['fail'] = True
+
+            executor.end(synchronous=True)
+
+        self.assertEqual(executor.event_buffer['success'], State.SUCCESS)
+        self.assertEqual(executor.event_buffer['fail'], State.FAILED)
+
+        self.assertNotIn('success', executor.tasks)
+        self.assertNotIn('fail', executor.tasks)
+
+
+if __name__ == '__main__':
+    unittest.main()