[AIRFLOW-2425] Add lineage support
Add lineage support by having inlets and outlets that are made
available to upstream and downstream tasks.

If configured to do so, Airflow can send the lineage data to a
backend. Apache Atlas is supported out of the box.

Closes apache#3321 from bolkedebruin/lineage_exp
bolkedebruin committed May 14, 2018
1 parent 042c3f2 commit 648e1e6
Showing 20 changed files with 920 additions and 23 deletions.
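
For orientation, a minimal usage sketch (not part of this diff). It assumes the BaseOperator changes elsewhere in this commit accept inlets/outlets dictionaries with "datasets", "auto" and "task_ids" keys, as described by the prepare_lineage docstring below, and that airflow/lineage/datasets.py (changed in this commit but not shown here) provides a File dataset class.

from datetime import datetime

from airflow import DAG
from airflow.lineage.datasets import File  # assumed dataset class from this commit
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id="lineage_example",
          start_date=datetime(2018, 5, 1),
          schedule_interval="@daily")

f_in = File("/tmp/input.csv")          # dataset consumed by the first task
f_out = File("/tmp/output/{{ ds }}")   # dataset produced by the first task

extract = BashOperator(task_id="extract",
                       bash_command="echo extract",
                       inlets={"datasets": [f_in]},
                       outlets={"datasets": [f_out]},
                       dag=dag)

# "auto" picks up the outlets of the direct upstream task(s) as inlets
load = BashOperator(task_id="load",
                    bash_command="echo load",
                    inlets={"auto": True},
                    dag=dag)

extract >> load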
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -177,6 +177,17 @@ endpoint_url = http://localhost:8080
# How to authenticate users of the API
auth_backend = airflow.api.auth.backend.default

[lineage]
# what lineage backend to use
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
# The default owner assigned to each new operator, unless
# provided explicitly or passed via `default_args`
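
To enable lineage shipping to Atlas, these new sections would be filled in roughly as follows. This is a sketch: the backend option takes the import path of a LineageBackend implementation (here the AtlasBackend added later in this commit), and the host and credentials are placeholders.

[lineage]
# Import path of the lineage backend class
backend = airflow.lineage.backend.atlas.AtlasBackend

[atlas]
sasl_enabled = False
# Placeholder connection details
host = atlas.example.com
port = 21000
username = airflow
password = changeme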
4 changes: 2 additions & 2 deletions airflow/contrib/operators/druid_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
141 changes: 141 additions & 0 deletions airflow/lineage/__init__.py
@@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from functools import wraps

from airflow import configuration as conf
from airflow.lineage.datasets import DataSet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string, prepare_classpath

from itertools import chain

PIPELINE_OUTLETS = "pipeline_outlets"
PIPELINE_INLETS = "pipeline_inlets"

log = LoggingMixin().log


def _get_backend():
    backend = None

    try:
        _backend_str = conf.get("lineage", "backend")
        prepare_classpath()
        backend = import_string(_backend_str)
    except ImportError as ie:
        log.debug("Cannot import %s due to %s", _backend_str, ie)
    except conf.AirflowConfigException:
        log.debug("Could not find lineage backend key in config")

    return backend


def apply_lineage(func):
    """
    Saves the lineage to XCom and if configured to do so sends it
    to the backend.
    """
    backend = _get_backend()

    @wraps(func)
    def wrapper(self, context, *args, **kwargs):
        self.log.debug("Lineage called with inlets: %s, outlets: %s",
                       self.inlets, self.outlets)
        ret_val = func(self, context, *args, **kwargs)

        outlets = [x.as_dict() for x in self.outlets]
        inlets = [x.as_dict() for x in self.inlets]

        if len(self.outlets) > 0:
            self.xcom_push(context,
                           key=PIPELINE_OUTLETS,
                           value=outlets,
                           execution_date=context['ti'].execution_date)

        if len(self.inlets) > 0:
            self.xcom_push(context,
                           key=PIPELINE_INLETS,
                           value=inlets,
                           execution_date=context['ti'].execution_date)

        if backend:
            backend.send_lineage(operator=self, inlets=self.inlets,
                                 outlets=self.outlets, context=context)

        return ret_val

    return wrapper


def prepare_lineage(func):
    """
    Prepares the lineage inlets and outlets. Inlets can be:

    "auto" -> picks up any outlets from direct upstream tasks that have outlets
    defined, such that if A -> B -> C and B does not have outlets but A does,
    these are provided as inlets.
    "list of task_ids" -> picks up outlets from the upstream task_ids
    "list of datasets" -> manually defined list of DataSet
    """
    @wraps(func)
    def wrapper(self, context, *args, **kwargs):
        self.log.debug("Preparing lineage inlets and outlets")

        task_ids = set(self._inlets['task_ids']).intersection(
            self.get_flat_relative_ids(upstream=True)
        )
        if task_ids:
            inlets = self.xcom_pull(context,
                                    task_ids=task_ids,
                                    dag_id=self.dag_id,
                                    key=PIPELINE_OUTLETS)
            inlets = [item for sublist in inlets if sublist for item in sublist]
            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
                      for i in inlets]
            self.inlets.extend(inlets)

        if self._inlets['auto']:
            # don't append twice
            task_ids = set(self._inlets['task_ids']).symmetric_difference(
                self.upstream_task_ids
            )
            inlets = self.xcom_pull(context,
                                    task_ids=task_ids,
                                    dag_id=self.dag_id,
                                    key=PIPELINE_OUTLETS)
            inlets = [item for sublist in inlets if sublist for item in sublist]
            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
                      for i in inlets]
            self.inlets.extend(inlets)

        if len(self._inlets['datasets']) > 0:
            self.inlets.extend(self._inlets['datasets'])

        # outlets
        if len(self._outlets['datasets']) > 0:
            self.outlets.extend(self._outlets['datasets'])

        self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)

        for dataset in chain(self.inlets, self.outlets):
            dataset.set_context(context)

        return func(self, context, *args, **kwargs)

    return wrapper
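
The decorators themselves do not show where they are applied; the operator-side wiring lives in files not included in this excerpt. A hedged sketch of how they are intended to be used, assuming BaseOperator gains the _inlets/_outlets bookkeeping and inlets/outlets lists elsewhere in this commit:

from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models import BaseOperator


class MyOperator(BaseOperator):

    @prepare_lineage
    def pre_execute(self, context):
        # inlets/outlets are resolved from upstream XCom, "auto" discovery or
        # manually supplied datasets before execute() runs
        pass

    def execute(self, context):
        self.log.info("inlets: %s, outlets: %s", self.inlets, self.outlets)

    @apply_lineage
    def post_execute(self, context, result=None):
        # lineage is pushed to XCom and, if configured, to the backend
        # after execute() returns
        pass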
31 changes: 31 additions & 0 deletions airflow/lineage/backend/__init__.py
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


class LineageBackend(object):
    def send_lineage(self,
                     operator=None, inlets=None, outlets=None, context=None):
        """
        Sends lineage metadata to a backend

        :param operator: the operator executing a transformation on the inlets and outlets
        :param inlets: the inlets to this operator
        :param outlets: the outlets from this operator
        :param context: the current context of the task instance
        """
        raise NotImplementedError()
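
Any class implementing this interface can be referenced from the [lineage] backend option. A minimal sketch of a custom backend that only writes lineage to the task log (the class name is illustrative, not part of the commit):

from airflow.lineage.backend import LineageBackend


class LoggingLineageBackend(LineageBackend):
    """Illustrative backend: log lineage instead of shipping it anywhere."""

    def send_lineage(self, operator=None, inlets=None, outlets=None, context=None):
        # operators expose a logger via LoggingMixin
        operator.log.info(
            "lineage %s.%s inlets=%s outlets=%s",
            operator.dag_id, operator.task_id,
            [x.as_dict() for x in (inlets or [])],
            [x.as_dict() for x in (outlets or [])],
        )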
98 changes: 98 additions & 0 deletions airflow/lineage/backend/atlas/__init__.py
@@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow import configuration as conf
from airflow.lineage import datasets
from airflow.lineage.backend import LineageBackend
from airflow.lineage.backend.atlas.typedefs import operator_typedef
from airflow.utils.timezone import convert_to_utc

from atlasclient.client import Atlas
from atlasclient.exceptions import HttpError

SERIALIZED_DATE_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%fZ"

_username = conf.get("atlas", "username")
_password = conf.get("atlas", "password")
_port = conf.get("atlas", "port")
_host = conf.get("atlas", "host")


class AtlasBackend(LineageBackend):
    def send_lineage(self, operator, inlets, outlets, context):
        client = Atlas(_host, port=_port, username=_username, password=_password)
        try:
            client.typedefs.create(data=operator_typedef)
        except HttpError:
            client.typedefs.update(data=operator_typedef)

        _execution_date = convert_to_utc(context['ti'].execution_date)
        _start_date = convert_to_utc(context['ti'].start_date)
        _end_date = convert_to_utc(context['ti'].end_date)

        inlet_list = []
        if inlets:
            for entity in inlets:
                if entity is None:
                    continue

                entity.set_context(context)
                client.entity_post.create(data={"entity": entity.as_dict()})
                inlet_list.append({"typeName": entity.type_name,
                                   "uniqueAttributes": {
                                       "qualifiedName": entity.qualified_name
                                   }})

        outlet_list = []
        if outlets:
            for entity in outlets:
                if not entity:
                    continue

                entity.set_context(context)
                client.entity_post.create(data={"entity": entity.as_dict()})
                outlet_list.append({"typeName": entity.type_name,
                                    "uniqueAttributes": {
                                        "qualifiedName": entity.qualified_name
                                    }})

        operator_name = operator.__class__.__name__
        name = "{} {} ({})".format(operator.dag_id, operator.task_id, operator_name)
        qualified_name = "{}_{}_{}@{}".format(operator.dag_id,
                                              operator.task_id,
                                              _execution_date,
                                              operator_name)

        data = {
            "dag_id": operator.dag_id,
            "task_id": operator.task_id,
            "execution_date": _execution_date.strftime(SERIALIZED_DATE_FORMAT_STR),
            "name": name,
            "inputs": inlet_list,
            "outputs": outlet_list,
            "command": operator.lineage_data,
        }

        if _start_date:
            data["start_date"] = _start_date.strftime(SERIALIZED_DATE_FORMAT_STR)
        if _end_date:
            data["end_date"] = _end_date.strftime(SERIALIZED_DATE_FORMAT_STR)

        process = datasets.Operator(qualified_name=qualified_name, data=data)
        client.entity_post.create(data={"entity": process.as_dict()})