
Commit 16de3b4

dsdintertswast authored and committed
Adding sample python DAG and gcs_to_gcs operator as plugin (GoogleCloudPlatform#1678)
* Add gcs_to_gcs operator from current Airflow master as a plugin, because it is not available in the current Composer Airflow version (1.9.0)
* Author DAG, including master CSV file sample and test:
  1. Uses gcs_to_gcs operator from plugins
  2. Uses Cloud logging features
  3. Dynamically generates tasks based on the master CSV file
* Fix PEP8 warnings
* Change sample tables to copy, using NYC Taxi trips
* Replace ":" with a valid character for Airflow task IDs; enable export and import of multiple Avro files
* The GCS hook must be downloaded from the Airflow repository
* Rename config file to "table_list_file_path"
* Remove unnecessary logging code
* Refactor master to table_file in code and filename
* Add gcs_to_gcs module from Airflow 1.10 as third party before adapting it for the Composer DAG as a plugin
* Wrap lines in the gcs_to_gcs module from Airflow 1.10 and import the hook from plugins
* Add notes on installing the module
* Remove the gcs_to_gcs module after moving it to the third_party folder
* Refactor the test and include an actual sample table list file
* Add license and instructions in the third_party folder for hooks and operators
* Fix tests and update CSV parsing for Python 3 compatibility. I moved the plugins directory to third_party to keep nox from running the linter and tests on Apache Airflow code.
1 parent e55438a commit 16de3b4

File tree

14 files changed: +1274 -4 lines changed

composer/rest/README.rst

+8-2
@@ -31,10 +31,16 @@ credentials for applications.
 Install Dependencies
 ++++++++++++++++++++
 
+#. Clone python-docs-samples and change directory to the sample directory you want to use.
+
+    .. code-block:: bash
+
+        $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
+
 #. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions.
 
-   .. _Python Development Environment Setup Guide:
-       https://cloud.google.com/python/setup
+    .. _Python Development Environment Setup Guide:
+        https://cloud.google.com/python/setup
 
 #. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.
 

@@ -0,0 +1,178 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example Airflow DAG that performs an export from BQ tables listed in
config file to GCS, copies GCS objects across locations (e.g., from US to
EU) then imports from GCS to BQ. The DAG imports the gcs_to_gcs operator
from plugins and dynamically builds the tasks based on the list of tables.
Lastly, the DAG defines a specific application logger to generate logs.

This DAG relies on three Airflow variables
(https://airflow.apache.org/concepts.html#variables):
* table_list_file_path - CSV file listing source and target tables, including
  datasets.
* gcs_source_bucket - Google Cloud Storage bucket to use for exporting
  BigQuery tables in source.
* gcs_dest_bucket - Google Cloud Storage bucket to use for importing
  BigQuery tables in destination.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""

# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------

import csv
import datetime
import io
import logging

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators import dummy_operator
# Import operator from plugins
from gcs_plugin.operators import gcs_to_gcs


# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime.today(),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# --------------------------------------------------------------------------------
# Set variables
# --------------------------------------------------------------------------------

# 'table_list_file_path': This variable will contain the location of the master
# file.
table_list_file_path = models.Variable.get('table_list_file_path')

# Source Bucket
source_bucket = models.Variable.get('gcs_source_bucket')

# Destination Bucket
dest_bucket = models.Variable.get('gcs_dest_bucket')

# --------------------------------------------------------------------------------
# Set GCP logging
# --------------------------------------------------------------------------------

logger = logging.getLogger('bq_copy_us_to_eu_01')

# --------------------------------------------------------------------------------
# Functions
# --------------------------------------------------------------------------------


def read_table_list(table_list_file):
    """
    Reads the table list file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param table_list_file: (String) The file location of the table list file,
        e.g. '/home/airflow/framework/table_list.csv'
    :return table_list: (List) List of dictionaries containing the source and
        target tables.
    """
    table_list = []
    logger.info('Reading table_list_file from: %s', table_list_file)
    try:
        with io.open(table_list_file, 'rt', encoding='utf-8') as csv_file:
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # skip the headers
            for row in csv_reader:
                logger.info(row)
                table_tuple = {
                    'table_source': row[0],
                    'table_dest': row[1]
                }
                table_list.append(table_tuple)
            return table_list
    except IOError as e:
        logger.error('Error opening table_list_file %s: %s', table_list_file, e)


# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG('bq_copy_us_to_eu_01',
                default_args=default_args,
                schedule_interval=None) as dag:
    start = dummy_operator.DummyOperator(
        task_id='start',
        trigger_rule='all_success'
    )

    end = dummy_operator.DummyOperator(
        task_id='end',
        trigger_rule='all_success'
    )

    # Get the table list from the master file
    all_records = read_table_list(table_list_file_path)

    # Loop over each record in the 'all_records' python list to build up
    # Airflow tasks
    for record in all_records:
        logger.info('Generating tasks to transfer table: {}'.format(record))

        table_source = record['table_source']
        table_dest = record['table_dest']

        BQ_to_GCS = bigquery_to_gcs.BigQueryToCloudStorageOperator(
            # Replace ":" with a character that is valid in an Airflow task ID
            task_id='{}_BQ_to_GCS'.format(table_source.replace(":", "_")),
            source_project_dataset_table=table_source,
            destination_cloud_storage_uris=['{}-*.avro'.format(
                'gs://' + source_bucket + '/' + table_source)],
            export_format='AVRO'
        )

        GCS_to_GCS = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
            # Replace ":" with a character that is valid in an Airflow task ID
            task_id='{}_GCS_to_GCS'.format(table_source.replace(":", "_")),
            source_bucket=source_bucket,
            source_object='{}-*.avro'.format(table_source),
            destination_bucket=dest_bucket,
            # destination_object='{}-*.avro'.format(table_dest)
        )

        GCS_to_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
            # Replace ":" with a character that is valid in an Airflow task ID
            task_id='{}_GCS_to_BQ'.format(table_dest.replace(":", "_")),
            bucket=dest_bucket,
            source_objects=['{}-*.avro'.format(table_source)],
            destination_project_dataset_table=table_dest,
            source_format='AVRO',
            write_disposition='WRITE_TRUNCATE'
        )

        start >> BQ_to_GCS >> GCS_to_GCS >> GCS_to_BQ >> end
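The DAG above cannot even be parsed until the three Airflow variables it reads with models.Variable.get exist in the Airflow metadata database. Below is a minimal sketch of seeding them from Python, mirroring what the test further down does with example values; the file path and bucket names here are placeholders, not values taken from this commit.

# Sketch only: seed the Airflow variables that bq_copy_us_to_eu_01 reads at
# parse time. Run where the Airflow metadata DB is initialized; the values
# below are placeholders.
from airflow import models

models.Variable.set(
    'table_list_file_path',
    '/home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv')  # placeholder path
models.Variable.set('gcs_source_bucket', 'my-us-source-bucket')  # placeholder
models.Variable.set('gcs_dest_bucket', 'my-eu-dest-bucket')      # placeholder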
@@ -0,0 +1,51 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import os.path
import sys

from airflow import models
import pytest


@pytest.fixture(scope='module', autouse=True)
def gcs_plugin():
    plugins_dir = os.path.abspath(os.path.join(
        os.path.dirname(__file__),
        '..',
        '..',
        'third_party',
        'apache-airflow',
        'plugins',
    ))
    sys.path.append(plugins_dir)
    yield
    sys.path.remove(plugins_dir)


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a sanity check recommended by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    example_file_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        'bq_copy_eu_to_us_sample.csv')
    models.Variable.set('table_list_file_path', example_file_path)
    models.Variable.set('gcs_source_bucket', 'example-project')
    models.Variable.set('gcs_dest_bucket', 'us-central1-f')
    from . import bq_copy_across_locations  # noqa
@@ -0,0 +1,3 @@
Source, Target
nyc-tlc:green.trips_2014,nyc_tlc_EU.trips_2014
nyc-tlc:green.trips_2015,nyc_tlc_EU.trips_2015
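For reference, the rows above are what the DAG's read_table_list() helper turns into task parameters. The short standalone sketch below re-implements that parsing inline (so it does not depend on an Airflow installation) and prints the task IDs the DAG's loop would generate after replacing ":" with "_", as the commit message describes.

# Standalone sketch: parse the sample table-list CSV the same way
# read_table_list() does and show the resulting Airflow task IDs.
import csv

SAMPLE_CSV = """Source, Target
nyc-tlc:green.trips_2014,nyc_tlc_EU.trips_2014
nyc-tlc:green.trips_2015,nyc_tlc_EU.trips_2015
"""

reader = csv.reader(SAMPLE_CSV.splitlines())
next(reader)  # skip the header row, as the DAG does
for row in reader:
    record = {'table_source': row[0], 'table_dest': row[1]}
    # ":" is not a valid character in an Airflow task ID, so it becomes "_".
    source_id = record['table_source'].replace(':', '_')
    dest_id = record['table_dest'].replace(':', '_')
    print(record)
    print('  tasks: {}_BQ_to_GCS -> {}_GCS_to_GCS -> {}_GCS_to_BQ'.format(
        source_id, source_id, dest_id))

For the first row this prints task IDs such as nyc-tlc_green.trips_2014_BQ_to_GCS, nyc-tlc_green.trips_2014_GCS_to_GCS and nyc_tlc_EU.trips_2014_GCS_to_BQ, which matches how the DAG names its per-table tasks.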

composer/workflows/requirements.txt

+2-1
@@ -1,3 +1,4 @@
 apache-airflow[gcp_api]==1.10.0
+kubernetes==7.0.0
 scipy==1.1.0
-numpy==1.15.1
+numpy==1.15.1

nox.py

+1-1
@@ -40,7 +40,7 @@ def _list_files(folder, pattern):
 
 def _collect_dirs(
         start_dir,
-        blacklist=set(['conftest.py', 'nox.py', 'lib']),
+        blacklist=set(['conftest.py', 'nox.py', 'lib', 'third_party']),
         suffix='_test.py',
         recurse_further=False):
     """Recursively collects a list of dirs that contain a file matching the
