Skip to content

Commit

Permalink
[AIRFLOW-71] Add support for private Docker images
Browse files Browse the repository at this point in the history
Pulling images from private Docker registries requires authentication,
so additional parameters are added in order to perform the login step.

(cherry picked from commit f101ff0)
Signed-off-by: Bolke de Bruin <[email protected]>
  • Loading branch information
moertel authored and bolkedebruin committed Nov 2, 2017
1 parent d2f9d18 commit d4406c0
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 21 deletions.
1 change: 1 addition & 0 deletions airflow/contrib/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#
# ------------------------------------------------------------------------
_hooks = {
'docker_hook': ['DockerHook'],
'ftp_hook': ['FTPHook'],
'ftps_hook': ['FTPSHook'],
'vertica_hook': ['VerticaHook'],
Expand Down
79 changes: 79 additions & 0 deletions airflow/hooks/docker_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from docker import Client
from docker.errors import APIError

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin

class DockerHook(BaseHook, LoggingMixin):
"""
Interact with a private Docker registry.
:param docker_conn_id: ID of the Airflow connection where
credentials and extra configuration are stored
:type docker_conn_id: str
"""
def __init__(self,
docker_conn_id='docker_default',
base_url=None,
version=None,
tls=None
):
if not base_url:
raise AirflowException('No Docker base URL provided')
if not version:
raise AirflowException('No Docker API version provided')

conn = self.get_connection(docker_conn_id)
if not conn.host:
raise AirflowException('No Docker registry URL provided')
if not conn.login:
raise AirflowException('No username provided')
extra_options = conn.extra_dejson

self.__base_url = base_url
self.__version = version
self.__tls = tls
self.__registry = conn.host
self.__username = conn.login
self.__password = conn.password
self.__email = extra_options.get('email')
self.__reauth = False if extra_options.get('reauth') == 'no' else True

def get_conn(self):
client = Client(
base_url=self.__base_url,
version=self.__version,
tls=self.__tls
)
self.__login(client)
return client

def __login(self, client):
self.log.debug('Logging into Docker registry')
try:
client.login(
username=self.__username,
password=self.__password,
registry=self.__registry,
email=self.__email,
reauth=self.__reauth
)
self.log.debug('Login successful')
except APIError as docker_error:
self.log.error('Docker registry login failed: %s', str(docker_error))
raise AirflowException('Docker registry login failed: %s', str(docker_error))
4 changes: 4 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ class Connection(Base, LoggingMixin):
_extra = Column('extra', String(5000))

_types = [
('docker', 'Docker Registry',),
('fs', 'File (path)'),
('ftp', 'FTP',),
('google_cloud_platform', 'Google Cloud Platform'),
Expand Down Expand Up @@ -696,6 +697,9 @@ def get_hook(self):
elif self.conn_type == 'wasb':
from airflow.contrib.hooks.wasb_hook import WasbHook
return WasbHook(wasb_conn_id=self.conn_id)
elif self.conn_type == 'docker':
from airflow.hooks.docker_hook import DockerHook
return DockerHook(docker_conn_id=self.conn_id)
except:
pass

Expand Down
58 changes: 44 additions & 14 deletions airflow/operators/docker_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

import json

from airflow.hooks.docker_hook import DockerHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory
from docker import APIClient as Client, tls
from docker import Client, tls
import ast


Expand All @@ -30,9 +31,14 @@ class DockerOperator(BaseOperator):
that together exceed the default disk size of 10GB in a container. The path to the mounted
directory can be accessed via the environment variable ``AIRFLOW_TMP_DIR``.
If a login to a private registry is required prior to pulling the image, a
Docker connection needs to be configured in Airflow and the connection ID
be provided with the parameter ``docker_conn_id``.
:param image: Docker image from which to create the container.
:type image: str
:param api_version: Remote API version.
:param api_version: Remote API version. Set to ``auto`` to automatically
detect the server's version.
:type api_version: str
:param command: Command to be run in the container.
:type command: str or list
Expand All @@ -41,10 +47,11 @@ class DockerOperator(BaseOperator):
https://docs.docker.com/engine/reference/run/#cpu-share-constraint
:type cpus: float
:param docker_url: URL of the host running the docker daemon.
Default is unix://var/run/docker.sock
:type docker_url: str
:param environment: Environment variables to set in the container.
:type environment: dict
:param force_pull: Pull the docker image on every run.
:param force_pull: Pull the docker image on every run. Default is false.
:type force_pull: bool
:param mem_limit: Maximum amount of memory the container can use. Either a float value, which
represents the limit in bytes, or a string like ``128m`` or ``1g``.
Expand Down Expand Up @@ -78,6 +85,8 @@ class DockerOperator(BaseOperator):
:type xcom_push: bool
:param xcom_all: Push all the stdout or just the last line. The default is False (last line).
:type xcom_all: bool
:param docker_conn_id: ID of the Airflow connection to use
:type docker_conn_id: str
"""
template_fields = ('command',)
template_ext = ('.sh', '.bash',)
Expand Down Expand Up @@ -105,6 +114,7 @@ def __init__(
working_dir=None,
xcom_push=False,
xcom_all=False,
docker_conn_id=None,
*args,
**kwargs):

Expand All @@ -129,25 +139,32 @@ def __init__(
self.working_dir = working_dir
self.xcom_push_flag = xcom_push
self.xcom_all = xcom_all
self.docker_conn_id = docker_conn_id

self.cli = None
self.container = None

def get_hook(self):
return DockerHook(
docker_conn_id=self.docker_conn_id,
base_url=self.base_url,
version=self.api_version,
tls=self.__get_tls_config()
)

def execute(self, context):
self.log.info('Starting docker container from image %s', self.image)

tls_config = None
if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
tls_config = tls.TLSConfig(
ca_cert=self.tls_ca_cert,
client_cert=(self.tls_client_cert, self.tls_client_key),
verify=True,
ssl_version=self.tls_ssl_version,
assert_hostname=self.tls_hostname
)
self.docker_url = self.docker_url.replace('tcp://', 'https://')
tls_config = self.__get_tls_config()

self.cli = Client(base_url=self.docker_url, version=self.api_version, tls=tls_config)
if self.docker_conn_id:
self.cli = self.get_hook().get_conn()
else:
self.cli = Client(
base_url=self.docker_url,
version=self.api_version,
tls=tls_config
)

if ':' not in self.image:
image = self.image + ':latest'
Expand Down Expand Up @@ -204,3 +221,16 @@ def on_kill(self):
if self.cli is not None:
self.log.info('Stopping docker container')
self.cli.stop(self.container['Id'])

def __get_tls_config(self):
tls_config = None
if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
tls_config = tls.TLSConfig(
ca_cert=self.tls_ca_cert,
client_cert=(self.tls_client_cert, self.tls_client_key),
verify=True,
ssl_version=self.tls_ssl_version,
assert_hostname=self.tls_hostname
)
self.docker_url = self.docker_url.replace('tcp://', 'https://')
return tls_config
9 changes: 8 additions & 1 deletion airflow/www/static/connection_form.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@
'login': 'Username (or API Key)',
'schema': 'Database'
}
}
},
docker: {
hidden_fields: ['port', 'schema'],
relabeling: {
'host': 'Registry URL',
'login': 'Username',
},
},
}
function connTypeChange(connectionType) {
$("div.form-group").removeClass("hide");
Expand Down
1 change: 1 addition & 0 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ Hooks
:show-inheritance:
:members:
DbApiHook,
DockerHook,
HiveCliHook,
HiveMetastoreHook,
HiveServer2Hook,
Expand Down
Loading

0 comments on commit d4406c0

Please sign in to comment.