[AIRFLOW-2301] Sync files of an S3 key with a GCS path
Closes apache#3216 from wileeam/s3-to-gcs-operator
Guillermo Rodriguez Cano authored and Fokko Driesprong committed Apr 13, 2018
1 parent 5cb530b commit 34f827f
Showing 4 changed files with 304 additions and 13 deletions.
186 changes: 186 additions & 0 deletions airflow/contrib/operators/s3_to_gcs_operator.py
@@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from tempfile import NamedTemporaryFile

from airflow.contrib.hooks.gcs_hook import (GoogleCloudStorageHook,
_parse_gcs_url)
from airflow.contrib.operators.s3_list_operator import S3ListOperator
from airflow.exceptions import AirflowException
from airflow.hooks.S3_hook import S3Hook
from airflow.utils.decorators import apply_defaults


class S3ToGoogleCloudStorageOperator(S3ListOperator):
"""
Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage
destination path.
:param bucket: The S3 bucket where to find the objects.
:type bucket: string
    :param prefix: Prefix string which filters objects whose names begin with
        that prefix.
:type prefix: string
    :param delimiter: The delimiter by which you want to filter the objects.
        E.g. to list CSV files from an S3 key you would use
        `delimiter='.csv'`.
:type delimiter: string
:param aws_conn_id: The source S3 connection
:type aws_conn_id: str
:param dest_gcs_conn_id: The destination connection ID to use
when connecting to Google Cloud Storage.
:type dest_gcs_conn_id: string
:param dest_gcs: The destination Google Cloud Storage bucket and prefix
where you want to store the files.
:type dest_gcs: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: string
:param replace: Whether you want to replace existing destination files
or not.
:type replace: bool
**Example**:
.. code-block:: python
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id='s3_to_gcs_example',
bucket='my-s3-bucket',
prefix='data/customers-201804',
dest_gcs_conn_id='google_cloud_default',
dest_gcs='gs://my.gcs.bucket/some/customers/',
replace=False,
            dag=my_dag)
Note that ``bucket``, ``prefix``, ``delimiter`` and ``dest_gcs`` are
templated, so you can use variables in them if you wish.
"""

template_fields = ('bucket', 'prefix', 'delimiter', 'dest_gcs')
ui_color = '#e09411'

@apply_defaults
def __init__(self,
bucket,
prefix='',
delimiter='',
aws_conn_id='aws_default',
dest_gcs_conn_id=None,
dest_gcs=None,
delegate_to=None,
replace=False,
*args,
**kwargs):

super(S3ToGoogleCloudStorageOperator, self).__init__(
bucket=bucket,
prefix=prefix,
delimiter=delimiter,
aws_conn_id=aws_conn_id,
*args,
**kwargs)
self.dest_gcs_conn_id = dest_gcs_conn_id
self.dest_gcs = dest_gcs
self.delegate_to = delegate_to
self.replace = replace

if dest_gcs and not self._gcs_object_is_directory(self.dest_gcs):
self.log.info('Destination Google Cloud Storage path is not a '
'valid "directory", define one and end the path '
'with a slash: "/".')
raise AirflowException('The destination Google Cloud Storage path '
'must end with a slash "/".')

def execute(self, context):
# use the super method to list all the files in an S3 bucket/key
files = super(S3ToGoogleCloudStorageOperator, self).execute(context)

gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.dest_gcs_conn_id,
delegate_to=self.delegate_to)

if not self.replace:
# if we are not replacing -> list all files in the GCS bucket
# and only keep those files which are present in
# S3 and not in Google Cloud Storage
bucket_name, object_prefix = _parse_gcs_url(self.dest_gcs)
existing_files_prefixed = gcs_hook.list(
bucket_name, prefix=object_prefix)

existing_files = []

if existing_files_prefixed:
                # Remove the bare object prefix itself; it appears in the
                # listing as an "empty directory" placeholder
if object_prefix in existing_files_prefixed:
existing_files_prefixed.remove(object_prefix)

# Remove the object prefix from all object string paths
for f in existing_files_prefixed:
if f.startswith(object_prefix):
existing_files.append(f[len(object_prefix):])
else:
existing_files.append(f)

files = set(files) - set(existing_files)
if len(files) > 0:
self.log.info('{0} files are going to be synced: {1}.'.format(
len(files), files))
else:
self.log.info(
'There are no new files to sync. Have a nice day!')

if files:
hook = S3Hook(aws_conn_id=self.aws_conn_id)

for file in files:
                # The GCS hook uploads from a local file path, so download the
                # S3 object into a temporary file and pass that path along
file_object = hook.get_key(file, self.bucket)
with NamedTemporaryFile(mode='wb', delete=True) as f:
file_object.download_fileobj(f)
f.flush()

dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(
self.dest_gcs)
                    # dest_gcs_object_prefix always ends with a '/' because
                    # that is enforced at instantiation time
dest_gcs_object = dest_gcs_object_prefix + file

# Sync is sequential and the hook already logs too much
# so skip this for now
# self.log.info(
# 'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
# ' as object {3}'.format(file, self.bucket,
# dest_gcs_bucket,
# dest_gcs_object))

gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name)

self.log.info(
"All done, uploaded %d files to Google Cloud Storage",
len(files))
else:
self.log.info(
                'In sync, no files needed to be uploaded to Google Cloud '
                'Storage')

return files

# Following functionality may be better suited in
# airflow/contrib/hooks/gcs_hook.py
def _gcs_object_is_directory(self, object):
bucket, blob = _parse_gcs_url(object)

return blob.endswith('/')
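
As an aside for readers tracing the replace=False branch in execute() above: the core of it is a prefix-stripping set difference between the S3 listing and the existing GCS objects. The snippet below is a minimal, self-contained sketch of that idea (plain Python; the helper name files_to_sync is hypothetical and not part of the committed operator):

def files_to_sync(s3_keys, gcs_objects, gcs_prefix):
    # Skip the bare prefix entry (an "empty directory" placeholder) and strip
    # the prefix from every listed object so names are comparable to S3 keys.
    existing = set()
    for obj in gcs_objects:
        if obj == gcs_prefix:
            continue
        existing.add(obj[len(gcs_prefix):] if obj.startswith(gcs_prefix) else obj)
    # Only keys present in S3 but missing from GCS still need to be uploaded.
    return set(s3_keys) - existing

# Example: files_to_sync(['a.csv', 'b.csv'], ['data/', 'data/a.csv'], 'data/')
# returns {'b.csv'}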
1 change: 1 addition & 0 deletions docs/code.rst
@@ -170,6 +170,7 @@ Operators
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
.. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator
.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.sftp_operator.SFTPOperator
.. autoclass:: airflow.contrib.operators.slack_webhook_operator.SlackWebhookOperator
.. autoclass:: airflow.contrib.operators.snowflake_operator.SnowflakeOperator
34 changes: 21 additions & 13 deletions docs/integration.rst
@@ -195,17 +195,18 @@ EmrHook
AWS S3
''''''

- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
- :ref:`S3Hook` : Interact with AWS S3.
- :ref:`S3FileTransformOperator` : Copies data from a source S3 location to a temporary location on the local filesystem.
- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
- :ref:`S3ToGoogleCloudStorageOperator` : Syncs an S3 location with a Google Cloud Storage bucket.
- :ref:`S3ToHiveTransfer` : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.
- :ref:`S3Hook` : Interact with AWS S3.

.. _S3ListOperator:
.. _S3Hook:

S3ListOperator
""""""""""""""
S3Hook
""""""

.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
.. autoclass:: airflow.hooks.S3_hook.S3Hook

.. _S3FileTransformOperator:

@@ -214,20 +215,27 @@ S3FileTransformOperator

.. autoclass:: airflow.operators.s3_file_transform_operator.S3FileTransformOperator

.. _S3ListOperator:

S3ListOperator
""""""""""""""

.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator

.. _S3ToGoogleCloudStorageOperator:

S3ToGoogleCloudStorageOperator
""""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator

.. _S3ToHiveTransfer:

S3ToHiveTransfer
""""""""""""""""

.. autoclass:: airflow.operators.s3_to_hive_operator.S3ToHiveTransfer

.. _S3Hook:

S3Hook
"""""""

.. autoclass:: airflow.hooks.S3_hook.S3Hook


AWS EC2 Container Service
'''''''''''''''''''''''''
96 changes: 96 additions & 0 deletions tests/contrib/operators/test_s3_to_gcs_operator.py
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from airflow.contrib.hooks.gcs_hook import _parse_gcs_url
from airflow.contrib.operators.s3_to_gcs_operator import \
S3ToGoogleCloudStorageOperator

try:
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None

TASK_ID = 'test-s3-gcs-operator'
S3_BUCKET = 'test-bucket'
S3_PREFIX = 'TEST'
S3_DELIMITER = '/'
GCS_PATH_PREFIX = 'gs://gcs-bucket/data/'
MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
AWS_CONN_ID = 'aws_default'
GCS_CONN_ID = 'google_cloud_default'


class S3ToGoogleCloudStorageOperatorTest(unittest.TestCase):
def test_init(self):
"""Test S3ToGoogleCloudStorageOperator instance is properly initialized."""

operator = S3ToGoogleCloudStorageOperator(
task_id=TASK_ID,
bucket=S3_BUCKET,
prefix=S3_PREFIX,
delimiter=S3_DELIMITER,
dest_gcs_conn_id=GCS_CONN_ID,
dest_gcs=GCS_PATH_PREFIX)

self.assertEqual(operator.task_id, TASK_ID)
self.assertEqual(operator.bucket, S3_BUCKET)
self.assertEqual(operator.prefix, S3_PREFIX)
self.assertEqual(operator.delimiter, S3_DELIMITER)
self.assertEqual(operator.dest_gcs_conn_id, GCS_CONN_ID)
self.assertEqual(operator.dest_gcs, GCS_PATH_PREFIX)

@mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook')
@mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook')
@mock.patch(
'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook')
def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
"""Test the execute function when the run is successful."""

operator = S3ToGoogleCloudStorageOperator(
task_id=TASK_ID,
bucket=S3_BUCKET,
prefix=S3_PREFIX,
delimiter=S3_DELIMITER,
dest_gcs_conn_id=GCS_CONN_ID,
dest_gcs=GCS_PATH_PREFIX)

s3_one_mock_hook.return_value.list_keys.return_value = MOCK_FILES
s3_two_mock_hook.return_value.list_keys.return_value = MOCK_FILES

def _assert_upload(bucket, object, tmp_filename):
gcs_bucket, gcs_object_path = _parse_gcs_url(GCS_PATH_PREFIX)

self.assertEqual(gcs_bucket, bucket)
self.assertIn(object[len(gcs_object_path):], MOCK_FILES)

gcs_mock_hook.return_value.upload.side_effect = _assert_upload

uploaded_files = operator.execute(None)

s3_one_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
s3_two_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
gcs_mock_hook.assert_called_once_with(
google_cloud_storage_conn_id=GCS_CONN_ID, delegate_to=None)

# we expect MOCK_FILES to be uploaded
self.assertEqual(sorted(MOCK_FILES), sorted(uploaded_files))


if __name__ == '__main__':
unittest.main()

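A note on the assumption baked into _assert_upload above: the test expects _parse_gcs_url to split the destination URL into the bucket name and an object prefix that keeps its trailing slash, which is also what the operator's _gcs_object_is_directory check relies on. A minimal sketch of that expectation (illustrative only):

from airflow.contrib.hooks.gcs_hook import _parse_gcs_url

bucket, prefix = _parse_gcs_url('gs://gcs-bucket/data/')
assert bucket == 'gcs-bucket'  # bucket comes from the URL's netloc
assert prefix == 'data/'       # object prefix, trailing slash preserved
# The operator then builds each destination object as prefix + s3_key,
# e.g. 'data/' + 'TEST1.csv' -> 'data/TEST1.csv'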