Skip to content

Commit

Permalink
[AIRFLOW-1527] Refactor celery config
Browse files Browse the repository at this point in the history
The celery config is currently part of the celery executor definition.
This is really inflexible for users wanting to change it. In addition,
Celery 4 is moving to lowercase setting names.

Closes apache#2542 from bolkedebruin/upgrade_celery
  • Loading branch information
bolkedebruin authored and criccomini committed Sep 25, 2017
1 parent 87df670 commit f4437be
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 45 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jdk:
services:
- mysql
- postgresql
- rabbitmq
addons:
apt:
packages:
Expand Down
2 changes: 2 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ flower_port = 5555
# Default queue that tasks get assigned to and that workers listen on.
default_queue = default

# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

[dask]
# This section only applies if you are using the DaskExecutor in
Expand Down
58 changes: 58 additions & 0 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ssl

from airflow.exceptions import AirflowConfigException, AirflowException
from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin


# Baseline Celery settings used by Airflow, keyed by lowercase setting names.
# Deployment-specific values are read from the [celery] section of the
# Airflow configuration at import time.
DEFAULT_CELERY_CONFIG = dict(
    accept_content=['json', 'pickle'],
    event_serializer='json',
    result_serializer='pickle',
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    # The configured default queue name is also used as the default exchange.
    task_default_queue=configuration.get('celery', 'DEFAULT_QUEUE'),
    task_default_exchange=configuration.get('celery', 'DEFAULT_QUEUE'),
    broker_url=configuration.get('celery', 'BROKER_URL'),
    broker_transport_options=dict(visibility_timeout=21600),
    result_backend=configuration.get('celery', 'CELERY_RESULT_BACKEND'),
    worker_concurrency=configuration.getint('celery', 'CELERYD_CONCURRENCY'),
)

# SSL is opt-in: when reading CELERY_SSL_ACTIVE raises AirflowConfigException
# the flag falls back to False and a warning is logged.
try:
    celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
except AirflowConfigException:
    celery_ssl_active = False
    log = LoggingMixin().log
    log.warning("Celery Executor will run without SSL")

# With SSL enabled, the key, certificate and CA certificate options are all
# read from the configuration; any failure is re-raised as AirflowException.
if celery_ssl_active:
    try:
        broker_use_ssl = dict(
            keyfile=configuration.get('celery', 'CELERY_SSL_KEY'),
            certfile=configuration.get('celery', 'CELERY_SSL_CERT'),
            ca_certs=configuration.get('celery', 'CELERY_SSL_CACERT'),
            cert_reqs=ssl.CERT_REQUIRED,
        )
        DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl
    except AirflowConfigException:
        raise AirflowException(
            'AirflowConfigException: CELERY_SSL_ACTIVE is True, please ensure '
            'CELERY_SSL_KEY, CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
    except Exception as e:
        raise AirflowException(
            'Exception: There was an unknown Celery SSL Error. Please ensure '
            'you want to use SSL and/or have all necessary certs and key ({}).'
            .format(e))

54 changes: 12 additions & 42 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,66 +12,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from builtins import object
import subprocess
import ssl
import time
import traceback

from celery import Celery
from celery import states as celery_states

from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin


PARALLELISM = configuration.get('core', 'PARALLELISM')

'''
To start the celery worker, run the command:
airflow worker
'''

DEFAULT_QUEUE = configuration.get('celery', 'DEFAULT_QUEUE')


class CeleryConfig(object):
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_EVENT_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_TASK_SERIALIZER = 'pickle'
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True
BROKER_URL = configuration.get('celery', 'BROKER_URL')
CELERY_RESULT_BACKEND = configuration.get('celery', 'CELERY_RESULT_BACKEND')
CELERYD_CONCURRENCY = configuration.getint('celery', 'CELERYD_CONCURRENCY')
CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE
CELERY_DEFAULT_EXCHANGE = DEFAULT_QUEUE

celery_ssl_active = False
try:
celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
except AirflowConfigException as e:
log = LoggingMixin().log
log.warning("Celery Executor will run without SSL")

try:
if celery_ssl_active:
BROKER_USE_SSL = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'),
'certfile': configuration.get('celery', 'CELERY_SSL_CERT'),
'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'),
'cert_reqs': ssl.CERT_REQUIRED}
except AirflowConfigException as e:
raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, please ensure CELERY_SSL_KEY, '
'CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
except Exception as e:
raise AirflowException('Exception: There was an unknown Celery SSL Error. Please ensure you want to use '
'SSL and/or have all necessary certs and key.')
if configuration.has_option('celery', 'celery_config_options'):
celery_configuration = configuration.get('celery', 'celery_config_options')
else:
celery_configuration = DEFAULT_CELERY_CONFIG

app = Celery(
configuration.get('celery', 'CELERY_APP_NAME'),
config_source=CeleryConfig)
config_source=celery_configuration)


@app.task
Expand All @@ -98,8 +66,10 @@ def start(self):
self.tasks = {}
self.last_state = {}

def execute_async(self, key, command, queue=DEFAULT_QUEUE):
self.log.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
def execute_async(self, key, command,
queue=DEFAULT_CELERY_CONFIG['task_default_queue']):
self.log.info( "[celery] queuing {key} through celery, "
"queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING
Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/airflow_travis.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ smtp_mail_from = [email protected]
celery_app_name = airflow.executors.celery_executor
celeryd_concurrency = 16
worker_log_server_port = 8793
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://guest:guest@localhost:5672/
celery_result_backend = db+mysql://root@localhost/airflow
flower_port = 5555
default_queue = default

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def write_version(filename=os.path.join(*['airflow',
]
azure = ['azure-storage>=0.34.0']
celery = [
'celery>=3.1.17',
'celery>=4.0.0',
'flower>=0.7.3'
]
cgroups = [
Expand Down
55 changes: 55 additions & 0 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import sys

from airflow.executors.celery_executor import app
from airflow.executors.celery_executor import CeleryExecutor
from airflow.utils.state import State
from celery.contrib.testing.worker import start_worker

# Leave this import in place: it is used by the test worker.
import celery.contrib.testing.tasks


class CeleryExecutorTest(unittest.TestCase):
    """Integration test for CeleryExecutor using a worker from ``start_worker``."""

    def test_celery_integration(self):
        """Queue one succeeding and one failing command and check the outcome.

        Both commands are submitted through ``execute_async``; after a
        synchronous ``end`` the event buffer must hold the matching final
        state for each key, and neither key may remain in ``executor.tasks``.
        """
        executor = CeleryExecutor()
        executor.start()
        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):

            success_command = 'echo 1'
            fail_command = 'exit 1'

            executor.execute_async(key='success', command=success_command)
            # errors are propagated for some reason
            try:
                executor.execute_async(key='fail', command=fail_command)
            except Exception:
                # Deliberately best-effort: the failure is asserted through
                # the event buffer below. Only Exception is caught so that
                # KeyboardInterrupt/SystemExit still abort the test run.
                pass
            executor.running['success'] = True
            executor.running['fail'] = True

            executor.end(synchronous=True)

            # assertEqual compares the recorded state with the expected one;
            # assertTrue(value, msg) would treat the expected state as the
            # failure message and pass for any truthy value.
            self.assertEqual(executor.event_buffer['success'], State.SUCCESS)
            self.assertEqual(executor.event_buffer['fail'], State.FAILED)

            self.assertNotIn('success', executor.tasks)
            self.assertNotIn('fail', executor.tasks)


if __name__ == '__main__':
unittest.main()

0 comments on commit f4437be

Please sign in to comment.