# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Example DAG showing how to use the JenkinsJobTriggerOperator.

This DAG shouldn't be executed and is only here to provide an example of how
to use the JenkinsJobTriggerOperator (it requires a Jenkins server to run).
"""

# ``datetime`` and ``timedelta`` must be imported as names: the module object
# ``datetime`` itself is not callable.
from datetime import datetime, timedelta

import jenkins
from six.moves.urllib.request import Request

from airflow import DAG
from airflow.contrib.hooks.jenkins_hook import JenkinsHook
from airflow.contrib.operators.jenkins_job_trigger_operator import JenkinsJobTriggerOperator
from airflow.operators.python_operator import PythonOperator


datetime_start_date = datetime(2017, 6, 1)
default_args = {
    "owner": "airflow",
    "start_date": datetime_start_date,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "depends_on_past": False,
    "concurrency": 8,
    "max_active_runs": 8
}


dag = DAG("test_jenkins", default_args=default_args, schedule_interval=None)

job_trigger = JenkinsJobTriggerOperator(
    dag=dag,
    task_id="trigger_job",
    job_name="generate-merlin-config",
    parameters={"first_parameter": "a_value", "second_parameter": "18"},
    # parameters="resources/parameter.json",
    # You can also pass a path to a json file containing your parameters.
    jenkins_connection_id="your_jenkins_connection"  # The connection must be configured first
)


def grabArtifactFromJenkins(**context):
    """
    Grab an artifact from the previous job.

    The python-jenkins library doesn't expose a method for that,
    but it's possible to build the request manually.

    :param context: The task context provided by Airflow
        (the task is declared with ``provide_context=True``)
    :return: The answer of the Jenkins server for the artifact url;
        as the callable's return value, it is stored in a xcom variable
        for later use
    """
    hook = JenkinsHook("your_jenkins_connection")
    jenkins_server = hook.get_jenkins_server()
    # The JenkinsJobTriggerOperator stores the job url in the xcom variable
    # corresponding to the task. You can then use it to access things or to
    # get the job number.
    # This url looks like : http://jenkins_url/job/job_name/job_number/
    url = context['task_instance'].xcom_pull(task_ids='trigger_job')
    url = url + "artifact/myartifact.xml"  # Or any other artifact name
    request = Request(url)
    response = jenkins_server.jenkins_open(request)
    return response


artifact_grabber = PythonOperator(
    task_id='artifact_grabber',
    provide_context=True,
    python_callable=grabArtifactFromJenkins,
    dag=dag)

artifact_grabber.set_upstream(job_trigger)
# Importing the submodule explicitly is required: a bare ``import distutils``
# does not guarantee that ``distutils.util`` is loaded.
import distutils.util

import jenkins

from airflow.hooks.base_hook import BaseHook


class JenkinsHook(BaseHook):
    """
    Hook to manage connection to jenkins server

    The ``extra`` field of the Airflow connection selects the protocol:
    a true-ish value (as understood by ``distutils.util.strtobool``) means
    https, a false-ish, empty or missing value means http.

    :param conn_id: The id of the Airflow connection describing the server
    :type conn_id: string
    :raises ValueError: if the connection ``extra`` field is set but is not
        a value accepted by ``distutils.util.strtobool``
    """

    def __init__(self, conn_id='jenkins_default'):
        connection = self.get_connection(conn_id)
        self.connection = connection

        # connection.extra contains info about using https (true) or http (false).
        # An empty/missing value defaults to http; it is handled here, without
        # touching the connection object, to avoid a ValueError in strtobool.
        use_https = bool(connection.extra) and \
            bool(distutils.util.strtobool(connection.extra))
        connection_prefix = 'https' if use_https else 'http'

        url = '%s://%s' % (connection_prefix, connection.host)
        # The port is optional: '%d' % None would raise a TypeError.
        if connection.port is not None:
            url = '%s:%d' % (url, connection.port)

        self.log.info('Trying to connect to %s', url)
        self.jenkins_server = jenkins.Jenkins(url, connection.login, connection.password)

    def get_jenkins_server(self):
        """
        :return: The ``jenkins.Jenkins`` client built in the constructor
        """
        return self.jenkins_server
import ast
import json
import socket
import time

import jenkins
from jenkins import JenkinsException
from six.moves.urllib.error import HTTPError, URLError
from six.moves.urllib.request import Request, urlopen

from airflow.contrib.hooks.jenkins_hook import JenkinsHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

try:
    basestring
except NameError:
    basestring = str  # For python3 compatibility


# TODO Use jenkins_urlopen instead when it will be available
# in the stable python-jenkins version (> 0.4.15)
def jenkins_request_with_headers(jenkins_server, req, add_crumb=True):
    """
    We need to get the headers in addition to the body answer
    to get the location from them
    This function is just a copy of the one present in python-jenkins library
    with just the return call changed

    :param jenkins_server: The server to query
    :param req: The request to execute
    :param add_crumb: Boolean to indicate if it should add crumb to the request
    :return: Dict with the decoded response body (key ``body``)
        and the response headers (key ``headers``)
    :raises jenkins.JenkinsException: on HTTP 401/403/500 or on a URL error
    :raises jenkins.NotFoundException: on HTTP 404
    :raises jenkins.TimeoutException: when the request times out
    """
    try:
        if jenkins_server.auth:
            req.add_header('Authorization', jenkins_server.auth)
        if add_crumb:
            jenkins_server.maybe_add_crumb(req)
        response = urlopen(req, timeout=jenkins_server.timeout)
        response_body = response.read()
        response_headers = response.info()
        if response_body is None:
            raise jenkins.EmptyResponseException(
                "Error communicating with server[%s]: "
                "empty response" % jenkins_server.server)
        return {'body': response_body.decode('utf-8'), 'headers': response_headers}
    except HTTPError as e:
        # Jenkins's funky authentication means its nigh impossible to
        # distinguish errors.
        if e.code in [401, 403, 500]:
            # six.moves.urllib.error.HTTPError provides a 'reason'
            # attribute for all python version except for ver 2.6
            # Falling back to HTTPError.msg since it contains the
            # same info as reason
            raise JenkinsException(
                'Error in request. '
                'Possibly authentication failed [%s]: %s' % (e.code, e.msg))
        elif e.code == 404:
            raise jenkins.NotFoundException('Requested item could not be found')
        else:
            raise
    except socket.timeout as e:
        raise jenkins.TimeoutException('Error in request: %s' % e)
    except URLError as e:
        # python 2.6 compatibility to ensure same exception raised
        # since URLError wraps a socket timeout on python 2.6.
        if str(e.reason) == "timed out":
            raise jenkins.TimeoutException('Error in request: %s' % e.reason)
        raise JenkinsException('Error in request: %s' % e.reason)


class JenkinsJobTriggerOperator(BaseOperator):
    """
    Trigger a Jenkins Job and monitor its execution.
    This operator depends on the python-jenkins library,
    version >= 0.4.15 to communicate with the jenkins server.
    You'll also need to configure a Jenkins connection in the connections screen.

    :param jenkins_connection_id: The jenkins connection to use for this job
    :type jenkins_connection_id: string
    :param job_name: The name of the job to trigger
    :type job_name: string
    :param parameters: The parameters block to provide to jenkins, either as
        a dict or as a string holding a Python literal / JSON document
    :type parameters: dict or string
    :param sleep_time: How long will the operator sleep between each status
        request for the job (min 1, default 10)
    :type sleep_time: int
    :param max_try_before_job_appears: The maximum number of requests to make
        while waiting for the job to appear on the jenkins server (default 10)
    :type max_try_before_job_appears: int
    """
    template_fields = ('parameters',)
    template_ext = ('.json',)
    ui_color = '#f9ec86'

    @apply_defaults
    def __init__(self,
                 jenkins_connection_id,
                 job_name,
                 parameters="",
                 sleep_time=10,
                 max_try_before_job_appears=10,
                 *args,
                 **kwargs):
        super(JenkinsJobTriggerOperator, self).__init__(*args, **kwargs)
        self.job_name = job_name
        self.parameters = parameters
        # Never poll more often than once per second
        if sleep_time < 1:
            sleep_time = 1
        self.sleep_time = sleep_time
        self.jenkins_connection_id = jenkins_connection_id
        self.max_try_before_job_appears = max_try_before_job_appears

    @staticmethod
    def _parse_parameters(raw_parameters):
        """
        Turn a parameters string into a Python object.

        The string is first read as a Python literal; when that fails it is
        read as JSON, so documents using true/false/null are accepted too.

        :param raw_parameters: The parameters as a string
        :return: The parsed parameters
        """
        try:
            return ast.literal_eval(raw_parameters)
        except (ValueError, SyntaxError):
            try:
                return json.loads(raw_parameters)
            except ValueError:
                pass
            # Neither a Python literal nor JSON: let the error propagate
            raise

    def build_job(self, jenkins_server):
        """
        This function makes an API call to Jenkins to trigger a build for 'job_name'
        It returns a dict with 2 keys : body and headers.
        headers contains also a dict-like object which can be queried to get
        the location to poll in the queue.

        :param jenkins_server: The jenkins server where the job should be triggered
        :return: Dict containing the response body (key body)
            and the headers coming along (headers)
        """
        # Warning if the parameter is too long, the URL can be longer than
        # the maximum allowed size
        if self.parameters and isinstance(self.parameters, basestring):
            self.parameters = self._parse_parameters(self.parameters)

        if not self.parameters:
            # We need a None to call the non parametrized jenkins api end point
            self.parameters = None

        request = Request(jenkins_server.build_job_url(self.job_name,
                                                       self.parameters, None), b'')
        return jenkins_request_with_headers(jenkins_server, request)

    def poll_job_in_queue(self, location, jenkins_server):
        """
        This method polls the jenkins queue until the job is executed.
        When we trigger a job through an API call,
        the job is first put in the queue without having a build number assigned.
        Thus we have to wait until the job exits the queue to know its build number.
        To do so, we have to add /api/json (or /api/xml) to the location
        returned by the build_job call and poll this file.
        When an 'executable' block appears in the json, it means the job execution
        started and the field 'number' then contains the build number.

        :param location: Location to poll, returned in the header of the build_job call
        :param jenkins_server: The jenkins server to poll
        :return: The build_number corresponding to the triggered job
        :raises AirflowException: when the job did not leave the queue after
            ``max_try_before_job_appears`` polls
        """
        try_count = 0
        location = location + '/api/json'
        # TODO Use get_queue_info instead
        # once it will be available in python-jenkins (v > 0.4.15)
        self.log.info('Polling jenkins queue at the url %s', location)
        while try_count < self.max_try_before_job_appears:
            location_answer = jenkins_request_with_headers(jenkins_server,
                                                           Request(location))
            if location_answer is not None:
                json_response = json.loads(location_answer['body'])
                if 'executable' in json_response:
                    build_number = json_response['executable']['number']
                    self.log.info('Job executed on Jenkins side with the build number %s',
                                  build_number)
                    return build_number
            try_count += 1
            time.sleep(self.sleep_time)
        # The message has to be formatted here: exceptions do not interpolate
        # extra positional arguments the way logging calls do.
        raise AirflowException("The job hasn't been executed"
                               " after polling the queue %d times"
                               % self.max_try_before_job_appears)

    def get_hook(self):
        """
        :return: A JenkinsHook built from ``jenkins_connection_id``
        """
        return JenkinsHook(self.jenkins_connection_id)

    def execute(self, context):
        """
        Trigger the job, wait for it to leave the queue, then poll the build
        until Jenkins reports a result.

        :param context: The task context (not used by this operator)
        :return: The url of the build, for later use
            (like retrieving an artifact)
        :raises AirflowException: when a mandatory parameter is missing, when
            a Jenkins call fails or when the build result is not SUCCESS
        """
        if not self.jenkins_connection_id:
            self.log.error(
                'Please specify the jenkins connection id to use. '
                'You must create a Jenkins connection before'
                ' being able to use this operator')
            raise AirflowException('The jenkins_connection_id parameter is missing, '
                                   'impossible to trigger the job')

        if not self.job_name:
            self.log.error("Please specify the job name to use in the job_name parameter")
            raise AirflowException('The job_name parameter is missing, '
                                   'impossible to trigger the job')

        self.log.info(
            'Triggering the job %s on the jenkins : %s with the parameters : %s',
            self.job_name, self.jenkins_connection_id, self.parameters)
        jenkins_server = self.get_hook().get_jenkins_server()
        jenkins_response = self.build_job(jenkins_server)
        build_number = self.poll_job_in_queue(
            jenkins_response['headers']['Location'], jenkins_server)

        time.sleep(self.sleep_time)
        keep_polling_job = True
        build_info = None
        while keep_polling_job:
            try:
                build_info = jenkins_server.get_build_info(name=self.job_name,
                                                           number=build_number)
            except jenkins.NotFoundException as err:
                # NotFoundException must be handled before its parent class
                # JenkinsException.
                raise AirflowException(
                    'Jenkins job status check failed. Final error was: %s' % err)
            except jenkins.JenkinsException as err:
                raise AirflowException(
                    'Jenkins call failed with error : %s, if you have parameters '
                    'double check them, jenkins sends back '
                    'this exception for unknown parameters. '
                    'You can also check logs for more details on this exception '
                    '(jenkins_url/log/rss)' % err)

            if build_info['result'] is not None:
                # Jenkins reported a final state: stop polling
                keep_polling_job = False
                # Check if job had errors.
                if build_info['result'] != 'SUCCESS':
                    raise AirflowException(
                        'Jenkins job failed, final state : %s. '
                        'Find more information on job url : %s'
                        % (build_info['result'], build_info['url']))
            else:
                self.log.info('Waiting for job to complete : %s , build %s',
                              self.job_name, build_number)
                time.sleep(self.sleep_time)

        if build_info:
            # If we can we return the url of the job
            # for later use (like retrieving an artifact)
            return build_info['url']
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import jenkins

from airflow.contrib.operators.jenkins_job_trigger_operator \
    import JenkinsJobTriggerOperator
from airflow.contrib.hooks.jenkins_hook import JenkinsHook

from airflow.exceptions import AirflowException

try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None


class JenkinsOperatorTestCase(unittest.TestCase):
    """
    Tests of JenkinsJobTriggerOperator.execute with the hook, the jenkins
    client and the module-level request helper all mocked.
    """

    @unittest.skipIf(mock is None, 'mock package not present')
    def test_execute(self):
        """The build is already successful on the first status request."""
        jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
        jenkins_mock.get_build_info.return_value = \
            {'result': 'SUCCESS',
             'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}
        jenkins_mock.build_job_url.return_value = \
            'http://www.jenkins.url/somewhere/in/the/universe'

        hook_mock = mock.Mock(spec=JenkinsHook)
        hook_mock.get_jenkins_server.return_value = jenkins_mock

        the_parameters = {'a_param': 'blip', 'another_param': '42'}

        with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
            mock.patch('airflow.contrib.operators'
                       '.jenkins_job_trigger_operator.jenkins_request_with_headers') \
                as mock_make_request:
            # First answer: the build trigger, second answer: the queue poll
            mock_make_request.side_effect = \
                [{'body': '', 'headers': {'Location': 'http://what-a-strange.url/18'}},
                 {'body': '{"executable":{"number":"1"}}', 'headers': {}}]
            get_hook_mocked.return_value = hook_mock
            operator = JenkinsJobTriggerOperator(
                dag=None,
                jenkins_connection_id="fake_jenkins_connection",
                # The hook is mocked, this connection won't be used
                task_id="operator_test",
                job_name="a_job_on_jenkins",
                parameters=the_parameters,
                sleep_time=1)

            operator.execute(None)

            self.assertEqual(jenkins_mock.get_build_info.call_count, 1)
            jenkins_mock.get_build_info.assert_called_with(name='a_job_on_jenkins',
                                                           number='1')

    @unittest.skipIf(mock is None, 'mock package not present')
    def test_execute_job_polling_loop(self):
        """The build has no result at first, then succeeds on the second request."""
        jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
        jenkins_mock.get_build_info.side_effect = \
            [{'result': None},
             {'result': 'SUCCESS',
              'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}]
        jenkins_mock.build_job_url.return_value = \
            'http://www.jenkins.url/somewhere/in/the/universe'

        hook_mock = mock.Mock(spec=JenkinsHook)
        hook_mock.get_jenkins_server.return_value = jenkins_mock

        the_parameters = {'a_param': 'blip', 'another_param': '42'}

        with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
            mock.patch('airflow.contrib.operators.jenkins_job_trigger_operator'
                       '.jenkins_request_with_headers') as mock_make_request:
            mock_make_request.side_effect = \
                [{'body': '', 'headers': {'Location': 'http://what-a-strange.url/18'}},
                 {'body': '{"executable":{"number":"1"}}', 'headers': {}}]
            get_hook_mocked.return_value = hook_mock
            operator = JenkinsJobTriggerOperator(
                dag=None,
                task_id="operator_test",
                job_name="a_job_on_jenkins",
                jenkins_connection_id="fake_jenkins_connection",
                # The hook is mocked, this connection won't be used
                parameters=the_parameters,
                sleep_time=1)

            operator.execute(None)
            self.assertEqual(jenkins_mock.get_build_info.call_count, 2)

    @unittest.skipIf(mock is None, 'mock package not present')
    def test_execute_job_failure(self):
        """A build whose result is not SUCCESS makes the operator raise."""
        jenkins_mock = mock.Mock(spec=jenkins.Jenkins, auth='secret')
        jenkins_mock.get_build_info.return_value = {
            'result': 'FAILURE',
            'url': 'http://aaa.fake-url.com/congratulation/its-a-job'}
        jenkins_mock.build_job_url.return_value = \
            'http://www.jenkins.url/somewhere/in/the/universe'

        hook_mock = mock.Mock(spec=JenkinsHook)
        hook_mock.get_jenkins_server.return_value = jenkins_mock

        the_parameters = {'a_param': 'blip', 'another_param': '42'}

        with mock.patch.object(JenkinsJobTriggerOperator, "get_hook") as get_hook_mocked,\
            mock.patch('airflow.contrib.operators.'
                       'jenkins_job_trigger_operator.jenkins_request_with_headers') \
                as mock_make_request:
            mock_make_request.side_effect = \
                [{'body': '', 'headers': {'Location': 'http://what-a-strange.url/18'}},
                 {'body': '{"executable":{"number":"1"}}', 'headers': {}}]
            get_hook_mocked.return_value = hook_mock
            operator = JenkinsJobTriggerOperator(
                dag=None,
                task_id="operator_test",
                job_name="a_job_on_jenkins",
                parameters=the_parameters,
                jenkins_connection_id="fake_jenkins_connection",
                # The hook is mocked, this connection won't be used
                sleep_time=1)

            self.assertRaises(AirflowException, operator.execute, None)


if __name__ == "__main__":
    unittest.main()