Skip to content

Commit

Permalink
[AIRFLOW-1551] Add operator to trigger Jenkins job
Browse files Browse the repository at this point in the history
Closes apache#2553 from moe-nadal-ck/AIRFLOW-1551/AddJenkinsOperator
  • Loading branch information
moe-nadal-ck authored and Fokko Driesprong committed Feb 27, 2018
1 parent 6efe2e3 commit 667a26c
Show file tree
Hide file tree
Showing 10 changed files with 521 additions and 1 deletion.
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ requirements.txt
.*log
.travis.yml
.*pyc
.*lock
docs
.*md
dist
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.



from airflow import DAG
from airflow.contrib.operators.jenkins_job_trigger_operator import JenkinsJobTriggerOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.jenkins_hook import JenkinsHook

from six.moves.urllib.request import Request

import jenkins
import datetime


datetime_start_date = datetime(2017, 6, 1)
default_args = {
"owner": "airflow",
"start_date": datetime_start_date,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"depends_on_past": False,
"concurrency": 8,
"max_active_runs": 8

}


dag = DAG("test_jenkins", default_args=default_args, schedule_interval=None)
#This DAG shouldn't be executed and is only here to provide example of how to use the JenkinsJobTriggerOperator
#(it requires a jenkins server to be executed)

job_trigger = JenkinsJobTriggerOperator(
dag=dag,
task_id="trigger_job",
job_name="generate-merlin-config",
parameters={"first_parameter":"a_value", "second_parameter":"18"},
#parameters="resources/paremeter.json", You can also pass a path to a json file containing your param
jenkins_connection_id="your_jenkins_connection" #The connection must be configured first
)

def grabArtifactFromJenkins(**context):
"""
Grab an artifact from the previous job
The python-jenkins library doesn't expose a method for that
But it's totally possible to build manually the request for that
"""
hook = JenkinsHook("your_jenkins_connection")
jenkins_server = hook.get_jenkins_server()
url = context['task_instance'].xcom_pull(task_ids='trigger_job')
#The JenkinsJobTriggerOperator store the job url in the xcom variable corresponding to the task
#You can then use it to access things or to get the job number
#This url looks like : http://jenkins_url/job/job_name/job_number/
url = url + "artifact/myartifact.xml" #Or any other artifact name
request = Request(url)
response = jenkins_server.jenkins_open(request)
return response #We store the artifact content in a xcom variable for later use

artifact_grabber = PythonOperator(
task_id='artifact_grabber',
provide_context=True,
python_callable=grabArtifactFromJenkins,
dag=dag)

artifact_grabber.set_upstream(job_trigger)
1 change: 1 addition & 0 deletions airflow/contrib/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
'fs_hook': ['FSHook'],
'wasb_hook': ['WasbHook'],
'gcp_pubsub_hook': ['PubSubHook'],
'jenkins_hook': ['JenkinsHook'],
'aws_dynamodb_hook': ['AwsDynamoDBHook']
}

Expand Down
43 changes: 43 additions & 0 deletions airflow/contrib/hooks/jenkins_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from airflow.hooks.base_hook import BaseHook

import jenkins
import distutils


class JenkinsHook(BaseHook):
"""
Hook to manage connection to jenkins server
"""

def __init__(self, conn_id='jenkins_default'):
connection = self.get_connection(conn_id)
self.connection = connection
connectionPrefix = 'http'
# connection.extra contains info about using https (true) or http (false)
if connection.extra is None or connection.extra == '':
connection.extra = 'false'
# set a default value to connection.extra
# to avoid rising ValueError in strtobool
if distutils.util.strtobool(connection.extra):
connectionPrefix = 'https'
url = '%s://%s:%d' % (connectionPrefix, connection.host, connection.port)
self.log.info('Trying to connect to %s', url)
self.jenkins_server = jenkins.Jenkins(url, connection.login, connection.password)

def get_jenkins_server(self):
return self.jenkins_server
244 changes: 244 additions & 0 deletions airflow/contrib/operators/jenkins_job_trigger_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import socket
import json
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.jenkins_hook import JenkinsHook
import jenkins
from jenkins import JenkinsException
from six.moves.urllib.request import Request, urlopen
from six.moves.urllib.error import HTTPError, URLError

try:
basestring
except NameError:
basestring = str # For python3 compatibility


# TODO Use jenkins_urlopen instead when it will be available
# in the stable python-jenkins version (> 0.4.15)
def jenkins_request_with_headers(jenkins_server, req, add_crumb=True):
"""
We need to get the headers in addition to the body answer
to get the location from them
This function is just a copy of the one present in python-jenkins library
with just the return call changed
:param jenkins_server: The server to query
:param req: The request to execute
:param add_crumb: Boolean to indicate if it should add crumb to the request
:return:
"""
try:
if jenkins_server.auth:
req.add_header('Authorization', jenkins_server.auth)
if add_crumb:
jenkins_server.maybe_add_crumb(req)
response = urlopen(req, timeout=jenkins_server.timeout)
response_body = response.read()
response_headers = response.info()
if response_body is None:
raise jenkins.EmptyResponseException(
"Error communicating with server[%s]: "
"empty response" % jenkins_server.server)
return {'body': response_body.decode('utf-8'), 'headers': response_headers}
except HTTPError as e:
# Jenkins's funky authentication means its nigh impossible to
# distinguish errors.
if e.code in [401, 403, 500]:
# six.moves.urllib.error.HTTPError provides a 'reason'
# attribute for all python version except for ver 2.6
# Falling back to HTTPError.msg since it contains the
# same info as reason
raise JenkinsException(
'Error in request. ' +
'Possibly authentication failed [%s]: %s' % (
e.code, e.msg)
)
elif e.code == 404:
raise jenkins.NotFoundException('Requested item could not be found')
else:
raise
except socket.timeout as e:
raise jenkins.TimeoutException('Error in request: %s' % e)
except URLError as e:
# python 2.6 compatibility to ensure same exception raised
# since URLError wraps a socket timeout on python 2.6.
if str(e.reason) == "timed out":
raise jenkins.TimeoutException('Error in request: %s' % e.reason)
raise JenkinsException('Error in request: %s' % e.reason)


class JenkinsJobTriggerOperator(BaseOperator):
"""
Trigger a Jenkins Job and monitor it's execution.
This operator depend on python-jenkins library,
version >= 0.4.15 to communicate with jenkins server.
You'll also need to configure a Jenkins connection in the connections screen.
:param jenkins_connection_id: The jenkins connection to use for this job
:type jenkins_connection_id: string
:param job_name: The name of the job to trigger
:type job_name: string
:param parameters: The parameters block to provide to jenkins
:type parameters: string
:param sleep_time: How long will the operator sleep between each status
request for the job (min 1, default 10)
:type sleep_time: int
:param max_try_before_job_appears: The maximum number of requests to make
while waiting for the job to appears on jenkins server (default 10)
:type max_try_before_job_appears: int
"""
template_fields = ('parameters',)
template_ext = ('.json',)
ui_color = '#f9ec86'

@apply_defaults
def __init__(self,
jenkins_connection_id,
job_name,
parameters="",
sleep_time=10,
max_try_before_job_appears=10,
*args,
**kwargs):
super(JenkinsJobTriggerOperator, self).__init__(*args, **kwargs)
self.job_name = job_name
self.parameters = parameters
if sleep_time < 1:
sleep_time = 1
self.sleep_time = sleep_time
self.jenkins_connection_id = jenkins_connection_id
self.max_try_before_job_appears = max_try_before_job_appears

def build_job(self, jenkins_server):
"""
This function makes an API call to Jenkins to trigger a build for 'job_name'
It returned a dict with 2 keys : body and headers.
headers contains also a dict-like object which can be queried to get
the location to poll in the queue.
:param jenkins_server: The jenkins server where the job should be triggered
:return: Dict containing the response body (key body)
and the headers coming along (headers)
"""
# Warning if the parameter is too long, the URL can be longer than
# the maximum allowed size
if self.parameters and isinstance(self.parameters, basestring):
import ast
self.parameters = ast.literal_eval(self.parameters)

if not self.parameters:
# We need a None to call the non parametrized jenkins api end point
self.parameters = None

request = Request(jenkins_server.build_job_url(self.job_name,
self.parameters, None), b'')
return jenkins_request_with_headers(jenkins_server, request)

def poll_job_in_queue(self, location, jenkins_server):
"""
This method poll the jenkins queue until the job is executed.
When we trigger a job through an API call,
the job is first put in the queue without having a build number assigned.
Thus we have to wait the job exit the queue to know its build number.
To do so, we have to add /api/json (or /api/xml) to the location
returned by the build_job call and poll this file.
When a 'executable' block appears in the json, it means the job execution started
and the field 'number' then contains the build number.
:param location: Location to poll, returned in the header of the build_job call
:param jenkins_server: The jenkins server to poll
:return: The build_number corresponding to the triggered job
"""
try_count = 0
location = location + '/api/json'
# TODO Use get_queue_info instead
# once it will be available in python-jenkins (v > 0.4.15)
self.log.info('Polling jenkins queue at the url %s', location)
while try_count < self.max_try_before_job_appears:
location_answer = jenkins_request_with_headers(jenkins_server,
Request(location))
if location_answer is not None:
json_response = json.loads(location_answer['body'])
if 'executable' in json_response:
build_number = json_response['executable']['number']
self.log.info('Job executed on Jenkins side with the build number %s',
build_number)
return build_number
try_count += 1
time.sleep(self.sleep_time)
raise AirflowException("The job hasn't been executed"
" after polling the queue %d times",
self.max_try_before_job_appears)

def get_hook(self):
return JenkinsHook(self.jenkins_connection_id)

def execute(self, context):
if not self.jenkins_connection_id:
self.log.error(
'Please specify the jenkins connection id to use.'
'You must create a Jenkins connection before'
' being able to use this operator')
raise AirflowException('The jenkins_connection_id parameter is missing,'
'impossible to trigger the job')

if not self.job_name:
self.log.error("Please specify the job name to use in the job_name parameter")
raise AirflowException('The job_name parameter is missing,'
'impossible to trigger the job')

self.log.info(
'Triggering the job %s on the jenkins : %s with the parameters : %s',
self.job_name, self.jenkins_connection_id, self.parameters)
jenkins_server = self.get_hook().get_jenkins_server()
jenkins_response = self.build_job(jenkins_server)
build_number = self.poll_job_in_queue(
jenkins_response['headers']['Location'], jenkins_server)

time.sleep(self.sleep_time)
keep_polling_job = True
build_info = None
while keep_polling_job:
try:
build_info = jenkins_server.get_build_info(name=self.job_name,
number=build_number)
if build_info['result'] is not None:
keep_polling_job = False
# Check if job had errors.
if build_info['result'] != 'SUCCESS':
raise AirflowException(
'Jenkins job failed, final state : %s.'
'Find more information on job url : %s'
% (build_info['result'], build_info['url']))
else:
self.log.info('Waiting for job to complete : %s , build %s',
self.job_name, build_number)
time.sleep(self.sleep_time)
except jenkins.NotFoundException as err:
raise AirflowException(
'Jenkins job status check failed. Final error was: %s'
% err.resp.status)
except jenkins.JenkinsException as err:
raise AirflowException(
'Jenkins call failed with error : %s, if you have parameters '
'double check them, jenkins sends back '
'this exception for unknown parameters'
'You can also check logs for more details on this exception '
'(jenkins_url/log/rss)', str(err))
if build_info:
# If we can we return the url of the job
# for later use (like retrieving an artifact)
return build_info['url']
1 change: 1 addition & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ class Connection(Base, LoggingMixin):
('hive_metastore', 'Hive Metastore Thrift',),
('hiveserver2', 'Hive Server 2 Thrift',),
('jdbc', 'Jdbc Connection',),
('jenkins', 'Jenkins'),
('mysql', 'MySQL',),
('postgres', 'Postgres',),
('oracle', 'Oracle',),
Expand Down
Loading

0 comments on commit 667a26c

Please sign in to comment.