diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ee28cc5a71811..60d11562c3f94 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -177,6 +177,17 @@ endpoint_url = http://localhost:8080
 # How to authenticate users of the API
 auth_backend = airflow.api.auth.backend.default
 
+[lineage]
+# What lineage backend to use
+backend =
+
+[atlas]
+sasl_enabled = False
+host =
+port = 21000
+username =
+password =
+
 [operators]
 # The default owner assigned to each new operator, unless
 # provided explicitly or passed via `default_args`
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index b100816fa8910..c0cb09dee47c4 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
new file mode 100644
index 0000000000000..3e2af478bb5e0
--- /dev/null
+++ b/airflow/lineage/__init__.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import wraps
+
+from airflow import configuration as conf
+from airflow.lineage.datasets import DataSet
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string, prepare_classpath
+
+from itertools import chain
+
+PIPELINE_OUTLETS = "pipeline_outlets"
+PIPELINE_INLETS = "pipeline_inlets"
+
+log = LoggingMixin().log
+
+
+def _get_backend():
+    backend = None
+
+    try:
+        _backend_str = conf.get("lineage", "backend")
+        prepare_classpath()
+        backend_cls = import_string(_backend_str)
+        backend = backend_cls()  # send_lineage is an instance method
+    except ImportError as ie:
+        log.debug("Cannot import %s due to %s", _backend_str, ie)
+    except conf.AirflowConfigException:
+        log.debug("Could not find lineage backend key in config")
+
+    return backend
+
+
+def apply_lineage(func):
+    """
+    Saves the lineage to XCom and, if configured to do so, sends it
+    to the backend.
+    """
+    backend = _get_backend()
+
+    @wraps(func)
+    def wrapper(self, context, *args, **kwargs):
+        self.log.debug("Lineage called with inlets: %s, outlets: %s",
+                       self.inlets, self.outlets)
+        ret_val = func(self, context, *args, **kwargs)
+
+        outlets = [x.as_dict() for x in self.outlets]
+        inlets = [x.as_dict() for x in self.inlets]
+
+        if len(self.outlets) > 0:
+            self.xcom_push(context,
+                           key=PIPELINE_OUTLETS,
+                           value=outlets,
+                           execution_date=context['ti'].execution_date)
+
+        if len(self.inlets) > 0:
+            self.xcom_push(context,
+                           key=PIPELINE_INLETS,
+                           value=inlets,
+                           execution_date=context['ti'].execution_date)
+
+        if backend:
+            backend.send_lineage(operator=self, inlets=self.inlets,
+                                 outlets=self.outlets, context=context)
+
+        return ret_val
+
+    return wrapper
+
+
+def prepare_lineage(func):
+    """
+    Prepares the lineage inlets and outlets. Inlets can be:
+
+    * "auto" -> picks up any outlets from direct upstream tasks that have
+      outlets defined, such that if A -> B -> C and B does not have outlets
+      but A does, these are provided as inlets
+    * "list of task_ids" -> picks up outlets from the upstream task_ids
+    * "list of datasets" -> manually defined list of DataSet
+    """
+    @wraps(func)
+    def wrapper(self, context, *args, **kwargs):
+        self.log.debug("Preparing lineage inlets and outlets")
+
+        task_ids = set(self._inlets['task_ids']).intersection(
+            self.get_flat_relative_ids(upstream=True)
+        )
+        if task_ids:
+            inlets = self.xcom_pull(context,
+                                    task_ids=task_ids,
+                                    dag_id=self.dag_id,
+                                    key=PIPELINE_OUTLETS)
+            inlets = [item for sublist in inlets if sublist for item in sublist]
+            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
+                      for i in inlets]
+            self.inlets.extend(inlets)
+
+        if self._inlets['auto']:
+            # don't add the outlets of explicitly requested task_ids twice
+            task_ids = set(self._inlets['task_ids']).symmetric_difference(
+                self.upstream_task_ids
+            )
+            inlets = self.xcom_pull(context,
+                                    task_ids=task_ids,
+                                    dag_id=self.dag_id,
+                                    key=PIPELINE_OUTLETS)
+            inlets = [item for sublist in inlets if sublist for item in sublist]
+            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
+                      for i in inlets]
+            self.inlets.extend(inlets)
+
+        if len(self._inlets['datasets']) > 0:
+            self.inlets.extend(self._inlets['datasets'])
+
+        # outlets can only be defined as a static list of datasets
+        if len(self._outlets['datasets']) > 0:
+            self.outlets.extend(self._outlets['datasets'])
+
+        self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)
+
+        for dataset in chain(self.inlets, self.outlets):
+            dataset.set_context(context)
+
+        return func(self, context, *args, **kwargs)
+
+    return wrapper
diff --git a/airflow/lineage/backend/__init__.py b/airflow/lineage/backend/__init__.py
new file mode 100644
index 0000000000000..7913021a20ecb
--- /dev/null
+++ b/airflow/lineage/backend/__init__.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +class LineageBackend(object): + def send_lineage(self, + operator=None, inlets=None, outlets=None, context=None): + """ + Sends lineage metadata to a backend + :param operator: the operator executing a transformation on the inlets and outlets + :param inlets: the inlets to this operator + :param outlets: the outlets from this operator + :param context: the current context of the task instance + """ + raise NotImplementedError() diff --git a/airflow/lineage/backend/atlas/__init__.py b/airflow/lineage/backend/atlas/__init__.py new file mode 100644 index 0000000000000..69335becd8688 --- /dev/null +++ b/airflow/lineage/backend/atlas/__init__.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from airflow import configuration as conf +from airflow.lineage import datasets +from airflow.lineage.backend import LineageBackend +from airflow.lineage.backend.atlas.typedefs import operator_typedef +from airflow.utils.timezone import convert_to_utc + +from atlasclient.client import Atlas +from atlasclient.exceptions import HttpError + +SERIALIZED_DATE_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%fZ" + +_username = conf.get("atlas", "username") +_password = conf.get("atlas", "password") +_port = conf.get("atlas", "port") +_host = conf.get("atlas", "host") + + +class AtlasBackend(LineageBackend): + def send_lineage(self, operator, inlets, outlets, context): + client = Atlas(_host, port=_port, username=_username, password=_password) + try: + client.typedefs.create(data=operator_typedef) + except HttpError: + client.typedefs.update(data=operator_typedef) + + _execution_date = convert_to_utc(context['ti'].execution_date) + _start_date = convert_to_utc(context['ti'].start_date) + _end_date = convert_to_utc(context['ti'].end_date) + + inlet_list = [] + if inlets: + for entity in inlets: + if entity is None: + continue + + entity.set_context(context) + client.entity_post.create(data={"entity": entity.as_dict()}) + inlet_list.append({"typeName": entity.type_name, + "uniqueAttributes": { + "qualifiedName": entity.qualified_name + }}) + + outlet_list = [] + if outlets: + for entity in outlets: + if not entity: + continue + + entity.set_context(context) + client.entity_post.create(data={"entity": entity.as_dict()}) + outlet_list.append({"typeName": entity.type_name, + "uniqueAttributes": { + "qualifiedName": entity.qualified_name + }}) + + operator_name = operator.__class__.__name__ + name = "{} {} ({})".format(operator.dag_id, operator.task_id, operator_name) + qualified_name = "{}_{}_{}@{}".format(operator.dag_id, + operator.task_id, + _execution_date, + operator_name) + + data = { + "dag_id": operator.dag_id, + "task_id": 
operator.task_id, + "execution_date": _execution_date.strftime(SERIALIZED_DATE_FORMAT_STR), + "name": name, + "inputs": inlet_list, + "outputs": outlet_list, + "command": operator.lineage_data, + } + + if _start_date: + data["start_date"] = _start_date.strftime(SERIALIZED_DATE_FORMAT_STR) + if _end_date: + data["end_date"] = _end_date.strftime(SERIALIZED_DATE_FORMAT_STR) + + process = datasets.Operator(qualified_name=qualified_name, data=data) + client.entity_post.create(data={"entity": process.as_dict()}) diff --git a/airflow/lineage/backend/atlas/typedefs.py b/airflow/lineage/backend/atlas/typedefs.py new file mode 100644 index 0000000000000..1df22bb6166e7 --- /dev/null +++ b/airflow/lineage/backend/atlas/typedefs.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +operator_typedef = { + "enumDefs": [], + "structDefs": [], + "classificationDefs": [], + "entityDefs": [ + { + "superTypes": [ + "Process" + ], + "name": "airflow_operator", + "description": "Airflow Operator", + "createdBy": "airflow", + "updatedBy": "airflow", + "attributeDefs": [ + # "name" will be set to Operator name + # "qualifiedName" will be set to dag_id_task_id@operator_name + { + "name": "dag_id", + "isOptional": False, + "isUnique": False, + "isIndexable": True, + "typeName": "string", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + { + "name": "task_id", + "isOptional": False, + "isUnique": False, + "isIndexable": True, + "typeName": "string", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + { + "name": "command", + "isOptional": True, + "isUnique": False, + "isIndexable": False, + "typeName": "string", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + { + "name": "conn_id", + "isOptional": True, + "isUnique": False, + "isIndexable": False, + "typeName": "string", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + { + "name": "execution_date", + "isOptional": False, + "isUnique": False, + "isIndexable": True, + "typeName": "date", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + { + "name": "start_date", + "isOptional": True, + "isUnique": False, + "isIndexable": False, + "typeName": "date", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + { + "name": "end_date", + "isOptional": True, + "isUnique": False, + "isIndexable": False, + "typeName": "date", + "valuesMaxCount": 1, + "cardinality": "SINGLE", + "valuesMinCount": 0 + }, + ], + }, + ], +} diff --git a/airflow/lineage/datasets.py b/airflow/lineage/datasets.py new file mode 100644 index 0000000000000..40c8edc9a8f9d --- /dev/null +++ b/airflow/lineage/datasets.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- 
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import six
+
+from jinja2 import Environment
+
+
+def _inherited(cls):
+    return set(cls.__subclasses__()).union(
+        [s for c in cls.__subclasses__() for s in _inherited(c)]
+    )
+
+
+class DataSet(object):
+    attributes = []
+    type_name = "dataSet"
+
+    def __init__(self, qualified_name=None, data=None, **kwargs):
+        self._qualified_name = qualified_name
+        self.context = None
+        self._data = dict()
+
+        self._data.update(dict((key, value) for key, value in six.iteritems(kwargs)
+                               if key in set(self.attributes)))
+
+        if data:
+            if "qualifiedName" in data:
+                self._qualified_name = data.pop("qualifiedName")
+
+            self._data = dict((key, value) for key, value in six.iteritems(data)
+                              if key in set(self.attributes))
+
+    def set_context(self, context):
+        self.context = context
+
+    @property
+    def qualified_name(self):
+        if self.context:
+            env = Environment()
+            return env.from_string(self._qualified_name).render(**self.context)
+
+        return self._qualified_name
+
+    def __getattr__(self, attr):
+        if attr in self.attributes:
+            if self.context:
+                env = Environment()
+                return env.from_string(self._data.get(attr)).render(**self.context)
+
+            return self._data.get(attr)
+
+        raise AttributeError(attr)
+
+    def __getitem__(self, item):
+        return self.__getattr__(item)
+
+    def __iter__(self):
+        for key, value in six.iteritems(self._data):
+            yield (key, value)
+
+    def as_dict(self):
+        attributes = dict(self._data)
+        attributes.update({"qualifiedName": self.qualified_name})
+
+        env = Environment()
+        if self.context:
+            for key, value in six.iteritems(attributes):
+                attributes[key] = env.from_string(value).render(**self.context)
+
+        d = {
+            "typeName": self.type_name,
+            "attributes": attributes,
+        }
+
+        return d
+
+    @staticmethod
+    def map_type(name):
+        for cls in _inherited(DataSet):
+            if cls.type_name == name:
+                return cls
+
+        raise NotImplementedError("No known mapping for {}".format(name))
+
+
+class DataBase(DataSet):
+    type_name = "dbStore"
+    attributes = ["dbStoreType", "storeUse", "source", "description", "userName",
+                  "storeUri", "operation", "startTime", "endTime", "commandlineOpts",
+                  "attribute_db"]
+
+
+class File(DataSet):
+    type_name = "fs_path"
+    attributes = ["name", "path", "isFile", "isSymlink"]
+
+    def __init__(self, name=None, data=None):
+        super(File, self).__init__(name=name, data=data)
+
+        self._qualified_name = 'file://' + self.name
+        self._data['path'] = self.name
+
+
+class HadoopFile(File):
+    cluster_name = "none"
+    attributes = ["name", "path", "clusterName"]
+
+    type_name = "hdfs_file"
+
+    def __init__(self, name=None, data=None):
+        super(File, self).__init__(name=name, data=data)
+
+        self._qualified_name = "{}@{}".format(self.name, self.cluster_name)
+        self._data['path'] = self.name
+
+        self._data['clusterName'] = 
self.cluster_name + + +class Operator(DataSet): + type_name = "airflow_operator" + + # todo we can derive this from the spec + attributes = ["dag_id", "task_id", "command", "conn_id", "name", "execution_date", + "start_date", "end_date", "inputs", "outputs"] diff --git a/airflow/logging_config.py b/airflow/logging_config.py index cf1a275ba44cf..33c2dc82e1be5 100644 --- a/airflow/logging_config.py +++ b/airflow/logging_config.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,25 +18,15 @@ # under the License. # import logging -import os -import sys from logging.config import dictConfig from airflow import configuration as conf from airflow.exceptions import AirflowConfigException -from airflow.utils.module_loading import import_string +from airflow.utils.module_loading import import_string, prepare_classpath log = logging.getLogger(__name__) -def prepare_classpath(): - config_path = os.path.join(conf.get('core', 'airflow_home'), 'config') - config_path = os.path.expanduser(config_path) - - if config_path not in sys.path: - sys.path.append(config_path) - - def configure_logging(): logging_class_path = '' try: diff --git a/airflow/models.py b/airflow/models.py index 5903075b8612e..ec4d2bb9d0010 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -73,6 +73,7 @@ AirflowDagCycleException, AirflowException, AirflowSkipException, AirflowTaskTimeout ) from airflow.dag.base_dag import BaseDag, BaseDagBag +from airflow.lineage import apply_lineage, prepare_lineage from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep @@ -1832,7 +1833,9 @@ def __repr__(self): 'var': { 'value': VariableAccessor(), 'json': VariableJsonAccessor() - } + }, + 'inlets': task.inlets, + 'outlets': task.outlets, } def render_templates(self): @@ -2282,6 +2285,8 @@ def __init__( run_as_user=None, task_concurrency=None, executor_config=None, + inlets=None, + outlets=None, *args, **kwargs): @@ -2369,6 +2374,27 @@ def __init__( self._log = logging.getLogger("airflow.task.operators") + # lineage + self.inlets = [] + self.outlets = [] + self.lineage_data = None + + self._inlets = { + "auto": False, + "task_ids": [], + "datasets": [], + } + + self._outlets = { + "datasets": [], + } + + if inlets: + self._inlets.update(inlets) + + if outlets: + self._outlets.update(outlets) + self._comps = { 'task_id', 'dag_id', @@ -2546,6 +2572,7 @@ def priority_weight_total(self): self.get_flat_relative_ids(upstream=upstream)) ) + @prepare_lineage def pre_execute(self, context): """ This hook is triggered right before self.execute() is called. @@ -2561,6 +2588,7 @@ def execute(self, context): """ raise NotImplementedError() + @apply_lineage def post_execute(self, context, result=None): """ This hook is triggered right after self.execute() is called. 
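The changes above wire lineage into ``BaseOperator``: ``pre_execute`` is wrapped by ``prepare_lineage`` and ``post_execute`` by ``apply_lineage``, so every operator resolves its inlets before running and reports inlets and outlets afterwards. To illustrate the contract a third-party backend has to fulfil, here is a minimal reviewer's sketch of a custom ``LineageBackend`` subclass. The module ``my_company/lineage.py`` and the print-based delivery are hypothetical, not part of this change:

.. code:: python

    # my_company/lineage.py -- hypothetical module dropped into $AIRFLOW_HOME/config
    from airflow.lineage.backend import LineageBackend


    class EchoBackend(LineageBackend):
        """Toy backend that only logs what a real backend would ship elsewhere."""

        def send_lineage(self, operator=None, inlets=None, outlets=None,
                         context=None):
            # as_dict() returns {"typeName": ..., "attributes": ...} per dataset
            print("task %s inlets: %s" % (
                operator.task_id, [i.as_dict() for i in (inlets or [])]))
            print("task %s outlets: %s" % (
                operator.task_id, [o.as_dict() for o in (outlets or [])]))

Setting ``backend = my_company.lineage.EchoBackend`` in the ``[lineage]`` section of ``airflow.cfg`` would then route the metadata through this class; ``prepare_classpath()`` (see ``airflow/utils/module_loading.py`` below) is what makes modules in ``$AIRFLOW_HOME/config`` importable.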
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 4b16d69b4e734..7db562e46c3b5 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -89,6 +89,7 @@ def execute(self, context):
         bash_command = ('export {}={}; '.format(AIRFLOW_HOME_VAR, airflow_home_value) +
                         'export {}={}; '.format(PYTHONPATH_VAR, pythonpath_value) +
                         self.bash_command)
+        self.lineage_data = bash_command
 
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py
index 846c8e0334a5c..6e638b00d3318 100644
--- a/airflow/utils/module_loading.py
+++ b/airflow/utils/module_loading.py
@@ -7,19 +7,33 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
+import sys
+
+from airflow import configuration as conf
 from importlib import import_module
 
 
+def prepare_classpath():
+    """
+    Ensures that the folder $AIRFLOW_HOME/config is on sys.path, so user modules can be imported
+    """
+    config_path = os.path.join(conf.get('core', 'airflow_home'), 'config')
+    config_path = os.path.expanduser(config_path)
+
+    if config_path not in sys.path:
+        sys.path.append(config_path)
+
+
 def import_string(dotted_path):
     """
     Import a dotted module path and return the attribute/class designated by the
diff --git a/docs/index.rst b/docs/index.rst
index 42349eabb6f90..125a1fb13a828 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -86,5 +86,6 @@ Content
     timezone
     api
     integration
+    lineage
     faq
     code
diff --git a/docs/lineage.rst b/docs/lineage.rst
new file mode 100644
index 0000000000000..719ef0115eb0c
--- /dev/null
+++ b/docs/lineage.rst
@@ -0,0 +1,86 @@
+Lineage
+=======
+
+.. note:: Lineage support is very experimental and subject to change.
+
+Airflow can help track the origins of data, what happens to it, and where it moves over time. This aids
+in creating audit trails and implementing data governance, and it also helps with debugging data flows.
+
+Airflow tracks data by means of the inlets and outlets of tasks. Let's work through an example to see how
+this works.
+
+.. code:: python
+
+    import airflow
+    from airflow.operators.bash_operator import BashOperator
+    from airflow.operators.dummy_operator import DummyOperator
+    from airflow.lineage.datasets import File
+    from airflow.models import DAG
+    from datetime import timedelta
+
+    FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
+
+    args = {
+        'owner': 'airflow',
+        'start_date': airflow.utils.dates.days_ago(2)
+    }
+
+    dag = DAG(
+        dag_id='example_lineage', default_args=args,
+        schedule_interval='0 0 * * *',
+        dagrun_timeout=timedelta(minutes=60))
+
+    f_final = File("/tmp/final")
+    run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
+                                  inlets={"auto": True},
+                                  outlets={"datasets": [f_final,]})
+
+    f_in = File("/tmp/whole_directory/")
+    outlets = []
+    for file in FILE_CATEGORIES:
+        f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
+        outlets.append(f_out)
+    run_this = BashOperator(
+        task_id='run_me_first', bash_command='echo 1', dag=dag,
+        inlets={"datasets": [f_in,]},
+        outlets={"datasets": outlets}
+    )
+    run_this.set_downstream(run_this_last)
+
+
+Tasks take the parameters ``inlets`` and ``outlets``. Inlets can be defined manually as a list of datasets
+(``{"datasets": [dataset1, dataset2]}``), configured to look for outlets of specific upstream tasks
+(``{"task_ids": ["task_id1", "task_id2"]}``), configured to pick up the outlets of all direct upstream
+tasks (``{"auto": True}``), or any combination of these. Outlets are defined as a list of datasets
+(``{"datasets": [dataset1, dataset2]}``). Any fields of a dataset are templated with the context when
+the task is being executed.
+
+.. note:: Operators can add inlets and outlets automatically if the operator supports it.
+
+In the example DAG, the task ``run_me_first`` is a BashOperator that produces 3 outlets, ``CAT1``, ``CAT2``
+and ``CAT3``, generated from a list. Note that ``execution_date`` is a templated field that will be
+rendered when the task is running.
+
+.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task
+    has finished execution ``post_execute`` is called and lineage metadata is pushed into XCom. Thus if you are creating
+    your own operators that override these methods, make sure to decorate them with ``prepare_lineage`` and ``apply_lineage``
+    respectively.
+
+
+Apache Atlas
+------------
+
+Airflow can send its lineage metadata to Apache Atlas. To do so, enable the ``atlas`` backend and configure it
+properly, e.g. in your ``airflow.cfg``:
+
+.. code:: ini
+
+    [lineage]
+    backend = airflow.lineage.backend.atlas.AtlasBackend
+
+    [atlas]
+    username = my_username
+    password = my_password
+    host = host
+    port = 21000
+
+
+Please make sure to have the ``atlasclient`` package installed.
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index a2a78df81c85d..8ab52fc3e672e 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -12,6 +12,7 @@
 # limitations under the License.
alembic +atlasclient azure-storage>=0.34.0 bcrypt bleach diff --git a/setup.py b/setup.py index 7e78439f489c2..8999695cc206d 100644 --- a/setup.py +++ b/setup.py @@ -107,6 +107,7 @@ def write_version(filename=os.path.join(*['airflow', 'eventlet>= 0.9.7', 'gevent>=0.13' ] +atlas = ['atlasclient>=0.1.2'] azure = ['azure-storage>=0.34.0'] sendgrid = ['sendgrid>=5.2.0'] celery = [ @@ -212,7 +213,7 @@ def write_version(filename=os.path.join(*['airflow', devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + - druid + pinot + segment + snowflake + elasticsearch) + druid + pinot + segment + snowflake + elasticsearch + atlas) # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'( if PY3: @@ -279,6 +280,7 @@ def do_setup(): 'all': devel_all, 'devel_ci': devel_ci, 'all_dbs': all_dbs, + 'atlas': atlas, 'async': async, 'azure': azure, 'celery': celery, diff --git a/tests/__init__.py b/tests/__init__.py index 59c97e51f0e94..eff9d4b9f311d 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,6 +28,7 @@ from .executors import * from .jobs import * from .impersonation import * +from .lineage import * from .models import * from .operators import * from .security import * diff --git a/tests/lineage/__init__.py b/tests/lineage/__init__.py new file mode 100644 index 0000000000000..114d189da14ab --- /dev/null +++ b/tests/lineage/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/lineage/backend/__init__.py b/tests/lineage/backend/__init__.py new file mode 100644 index 0000000000000..114d189da14ab --- /dev/null +++ b/tests/lineage/backend/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/lineage/backend/test_atlas.py b/tests/lineage/backend/test_atlas.py new file mode 100644 index 0000000000000..9c98334a9a634 --- /dev/null +++ b/tests/lineage/backend/test_atlas.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from airflow import configuration as conf +from airflow.configuration import AirflowConfigException +from airflow.lineage.backend.atlas import AtlasBackend +from airflow.lineage.datasets import File +from airflow.models import DAG, TaskInstance as TI +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils import timezone + +from backports.configparser import DuplicateSectionError + +DEFAULT_DATE = timezone.datetime(2016, 1, 1) + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestAtlas(unittest.TestCase): + def setUp(self): + conf.load_test_config() + try: + conf.conf.add_section("atlas") + except AirflowConfigException: + pass + except DuplicateSectionError: + pass + + conf.conf.set("atlas", "username", "none") + conf.conf.set("atlas", "password", "none") + conf.conf.set("atlas", "host", "none") + conf.conf.set("atlas", "port", "0") + + self.atlas = AtlasBackend() + + @mock.patch("airflow.lineage.backend.atlas.Atlas") + def test_lineage_send(self, atlas_mock): + td = mock.MagicMock() + en = mock.MagicMock() + atlas_mock.return_value = mock.Mock(typedefs=td, entity_post=en) + + dag = DAG( + dag_id='test_prepare_lineage', + start_date=DEFAULT_DATE + ) + + f1 = File("/tmp/does_not_exist_1") + f2 = File("/tmp/does_not_exist_2") + + inlets_d = [f1, ] + outlets_d = [f2, ] + + with dag: + op1 = DummyOperator(task_id='leave1', + inlets={"datasets": inlets_d}, + outlets={"datasets": outlets_d}) + + ctx = {"ti": TI(task=op1, execution_date=DEFAULT_DATE)} + + self.atlas.send_lineage(operator=op1, inlets=inlets_d, + outlets=outlets_d, context=ctx) + + self.assertEqual(td.create.call_count, 1) + self.assertTrue(en.create.called) + self.assertEqual(len(en.mock_calls), 3) + + # test can be broader diff --git a/tests/lineage/test_lineage.py b/tests/lineage/test_lineage.py new file mode 100644 index 0000000000000..92b74fe869ed4 --- /dev/null +++ b/tests/lineage/test_lineage.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software 
Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from airflow.lineage import apply_lineage, prepare_lineage
+from airflow.lineage.datasets import File
+from airflow.models import DAG, TaskInstance as TI
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestLineage(unittest.TestCase):
+
+    @mock.patch("airflow.lineage._get_backend")
+    def test_lineage(self, _get_backend):
+        backend = mock.Mock()
+        send_mock = mock.Mock()
+        backend.send_lineage = send_mock
+
+        _get_backend.return_value = backend
+
+        dag = DAG(
+            dag_id='test_prepare_lineage',
+            start_date=DEFAULT_DATE
+        )
+
+        f1 = File("/tmp/does_not_exist_1")
+        f2 = File("/tmp/does_not_exist_2")
+        f3 = File("/tmp/does_not_exist_3")
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1',
+                                inlets={"datasets": [f1, ]},
+                                outlets={"datasets": [f2, ]})
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1',
+                                inlets={"auto": True},
+                                outlets={"datasets": [f3, ]})
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3',
+                                inlets={"task_ids": ["leave1", "upstream_level_1"]})
+
+            op1.set_downstream(op3)
+            op2.set_downstream(op3)
+            op3.set_downstream(op4)
+            op4.set_downstream(op5)
+
+        ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE)}
+        ctx2 = {"ti": TI(task=op2, execution_date=DEFAULT_DATE)}
+        ctx3 = {"ti": TI(task=op3, execution_date=DEFAULT_DATE)}
+        ctx5 = {"ti": TI(task=op5, execution_date=DEFAULT_DATE)}
+
+        func = mock.Mock()
+        func.__name__ = 'foo'
+
+        # prepare with manual inlets and outlets
+        prep = prepare_lineage(func)
+        prep(op1, ctx1)
+
+        self.assertEqual(len(op1.inlets), 1)
+        self.assertEqual(op1.inlets[0], f1)
+
+        self.assertEqual(len(op1.outlets), 1)
+        self.assertEqual(op1.outlets[0], f2)
+
+        # post process; the mocked backend should receive the lineage
+        post = apply_lineage(func)
+        post(op1, ctx1)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
+
+        prep(op2, ctx2)
+        self.assertEqual(len(op2.inlets), 0)
+        post(op2, ctx2)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
+
+        prep(op3, ctx3)
+        self.assertEqual(len(op3.inlets), 1)
+        self.assertEqual(op3.inlets[0].qualified_name, f2.qualified_name)
+        post(op3, ctx3)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
+
+        # op4 has no lineage configuration, so skip straight to op5
+
+        prep(op5, ctx5)
+        self.assertEqual(len(op5.inlets), 2)
+        post(op5, ctx5)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
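Since the test above hinges on dataset equality and ``qualified_name`` resolution, a short, self-contained illustration of how ``DataSet`` templating behaves may help reviewers; the path and context values here are made up:

.. code:: python

    from airflow.lineage.datasets import File

    # the qualified name and attributes are stored as Jinja templates ...
    f = File("/tmp/{{ ds }}/output.csv")
    print(f.qualified_name)  # -> file:///tmp/{{ ds }}/output.csv

    # ... and once a task instance context is attached, they render against it
    f.set_context({"ds": "2018-01-01"})
    print(f.qualified_name)  # -> file:///tmp/2018-01-01/output.csv

This is why the DAG example in ``docs/lineage.rst`` can embed ``{{ execution_date }}`` in a file path: the value is only resolved when ``prepare_lineage`` calls ``set_context()`` with the task instance's context.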