Skip to content

Commit 5a8409c

Browse files
authored
Add S3CopyPrefixOperator to copy all objects under a prefix (#68946)
S3CopyObjectOperator handles a single object at a time. Users who need to copy all objects sharing a prefix must implement their own pagination, error handling, and encryption support. This operator encapsulates that pattern so it can be used directly in a Dag.
1 parent 930d96e commit 5a8409c

4 files changed

Lines changed: 430 additions & 0 deletions

File tree

providers/amazon/docs/operators/s3/s3.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,24 @@ Ensure the role or user has the necessary permissions to use the key.
132132
:start-after: [START howto_operator_s3_copy_object]
133133
:end-before: [END howto_operator_s3_copy_object]
134134

135+
.. _howto/operator:S3CopyPrefixOperator:
136+
137+
Copy Amazon S3 objects by prefix
138+
================================
139+
140+
To copy all Amazon S3 objects under a prefix from one bucket to another you can use
141+
:class:`~airflow.providers.amazon.aws.operators.s3.S3CopyPrefixOperator`.
142+
The Amazon S3 connection used here needs to have access to both source and destination bucket/prefix.
143+
You can also specify server-side encryption using AWS KMS if you do not want to use destination buckets default key.
144+
When using KMS, you must provide both the ``kms_key_id`` and ``kms_encryption_type`` parameters.
145+
Ensure the role or user has the necessary permissions to use the key.
146+
147+
.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_s3.py
148+
:language: python
149+
:dedent: 4
150+
:start-after: [START howto_operator_s3_copy_prefix]
151+
:end-before: [END howto_operator_s3_copy_prefix]
152+
135153
.. _howto/operator:S3DeleteObjectsOperator:
136154

137155
Delete Amazon S3 objects

providers/amazon/src/airflow/providers/amazon/aws/operators/s3.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,161 @@ def get_openlineage_facets_on_start(self):
385385
)
386386

387387

388+
class S3CopyPrefixOperator(AwsBaseOperator[S3Hook]):
389+
"""
390+
Creates a copy of all objects under a prefix already stored in S3.
391+
392+
Note: the S3 connection used here needs to have access to both
393+
source and destination bucket/prefix.
394+
395+
.. seealso::
396+
For more information on how to use this operator, take a look at the guide:
397+
:ref:`howto/operator:S3CopyPrefixOperator`
398+
399+
:param source_bucket_prefix: The prefix in the source bucket. (templated)
400+
It can be either full s3:// style url or relative path from root level.
401+
When it's specified as a full s3:// url, please omit source_bucket_name.
402+
:param dest_bucket_prefix: The prefix in the destination to copy to. (templated)
403+
The convention to specify `dest_bucket_prefix` is the same as `source_bucket_prefix`.
404+
:param source_bucket_name: Name of the S3 bucket where the source objects are in. (templated)
405+
It should be omitted when `source_bucket_prefix` is provided as a full s3:// url.
406+
:param dest_bucket_name: Name of the S3 bucket to where the objects are copied. (templated)
407+
It should be omitted when `dest_bucket_prefix` is provided as a full s3:// url.
408+
:param kms_key_id: The ARN, id or alias of the AWS KMS key to use for encrypting the destination object.
409+
Required if using KMS-based server-side encryption with a non-default key. (templated)
410+
:param kms_encryption_type: Type of KMS encryption to use for the object.
411+
Can be either "aws:kms" (standard KMS) or "aws:kms:dsse" (double-shielded KMS).
412+
:param continue_on_failure: If False, stop and fail the task on the first copy error.
413+
If True, try to copy every object in the prefix and then fail the task on any error.
414+
Default is False.
415+
:param acl_policy: String specifying the canned ACL policy for the file being
416+
uploaded to the S3 bucket.
417+
:param meta_data_directive: Whether to `COPY` the metadata from the source object or `REPLACE` it with
418+
metadata that's provided in the request.
419+
:param aws_conn_id: The Airflow connection used for AWS credentials.
420+
If this is ``None`` or empty then the default boto3 behaviour is used. If
421+
running Airflow in a distributed manner and aws_conn_id is None or
422+
empty, then default boto3 configuration would be used (and must be
423+
maintained on each worker node).
424+
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
425+
:param verify: Whether or not to verify SSL certificates. See:
426+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
427+
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
428+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
429+
"""
430+
431+
template_fields: Sequence[str] = aws_template_fields(
432+
"source_bucket_prefix",
433+
"dest_bucket_prefix",
434+
"source_bucket_name",
435+
"dest_bucket_name",
436+
"kms_key_id",
437+
)
438+
aws_hook_class = S3Hook
439+
440+
def __init__(
441+
self,
442+
*,
443+
source_bucket_prefix: str,
444+
dest_bucket_prefix: str,
445+
source_bucket_name: str | None = None,
446+
dest_bucket_name: str | None = None,
447+
kms_key_id: str | None = None,
448+
kms_encryption_type: str | None = None,
449+
continue_on_failure: bool = False,
450+
acl_policy: str | None = None,
451+
meta_data_directive: str | None = None,
452+
**kwargs,
453+
):
454+
super().__init__(**kwargs)
455+
456+
self.source_bucket_prefix = source_bucket_prefix
457+
self.dest_bucket_prefix = dest_bucket_prefix
458+
self.source_bucket_name = source_bucket_name
459+
self.dest_bucket_name = dest_bucket_name
460+
self.kms_key_id = kms_key_id
461+
self.kms_encryption_type = kms_encryption_type
462+
self.continue_on_failure = continue_on_failure
463+
self.acl_policy = acl_policy
464+
self.meta_data_directive = meta_data_directive
465+
466+
def execute(self, context: Context):
467+
source_bucket_name, source_bucket_prefix = self.hook.get_s3_bucket_key(
468+
self.source_bucket_name, self.source_bucket_prefix, "source_bucket_name", "source_bucket_prefix"
469+
)
470+
471+
dest_bucket_name, dest_bucket_prefix = self.hook.get_s3_bucket_key(
472+
self.dest_bucket_name, self.dest_bucket_prefix, "dest_bucket_name", "dest_bucket_prefix"
473+
)
474+
475+
s3_client = self.hook.get_conn()
476+
477+
paginator = s3_client.get_paginator("list_objects_v2")
478+
pages = paginator.paginate(
479+
Bucket=source_bucket_name,
480+
Prefix=source_bucket_prefix,
481+
)
482+
483+
copied_object_count = 0
484+
failed_object_count = 0
485+
for page in pages:
486+
if "Contents" in page:
487+
for obj in page["Contents"]:
488+
source_key = obj["Key"]
489+
dest_key = dest_bucket_prefix + source_key[len(source_bucket_prefix) :]
490+
491+
try:
492+
self.hook.copy_object(
493+
source_bucket_key=source_key,
494+
dest_bucket_key=dest_key,
495+
source_bucket_name=source_bucket_name,
496+
dest_bucket_name=dest_bucket_name,
497+
kms_key_id=self.kms_key_id,
498+
kms_encryption_type=self.kms_encryption_type,
499+
acl_policy=self.acl_policy,
500+
meta_data_directive=self.meta_data_directive,
501+
)
502+
503+
copied_object_count += 1
504+
except Exception as e:
505+
if self.continue_on_failure:
506+
self.log.error("Failed to copy %s: %s", source_key, e)
507+
failed_object_count += 1
508+
else:
509+
raise RuntimeError(f"Failed to copy {source_key}: {e}") from e
510+
511+
self.log.info("Successfully copied %s object(s)", copied_object_count)
512+
513+
if failed_object_count > 0:
514+
raise RuntimeError(f"Failed to copy {failed_object_count} object(s)")
515+
516+
def get_openlineage_facets_on_start(self):
517+
from airflow.providers.common.compat.openlineage.facet import Dataset
518+
from airflow.providers.openlineage.extractors import OperatorLineage
519+
520+
source_bucket_name, source_bucket_prefix = self.hook.get_s3_bucket_key(
521+
self.source_bucket_name, self.source_bucket_prefix, "source_bucket_name", "source_bucket_prefix"
522+
)
523+
524+
dest_bucket_name, dest_bucket_prefix = self.hook.get_s3_bucket_key(
525+
self.dest_bucket_name, self.dest_bucket_prefix, "dest_bucket_name", "dest_bucket_prefix"
526+
)
527+
528+
input_dataset = Dataset(
529+
namespace=f"s3://{source_bucket_name}",
530+
name=source_bucket_prefix,
531+
)
532+
output_dataset = Dataset(
533+
namespace=f"s3://{dest_bucket_name}",
534+
name=dest_bucket_prefix,
535+
)
536+
537+
return OperatorLineage(
538+
inputs=[input_dataset],
539+
outputs=[output_dataset],
540+
)
541+
542+
388543
class S3CreateObjectOperator(AwsBaseOperator[S3Hook]):
389544
"""
390545
Creates a new object from `data` as string or bytes.

providers/amazon/tests/system/amazon/aws/example_s3.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from airflow.providers.amazon.aws.operators.s3 import (
2222
S3CopyObjectOperator,
23+
S3CopyPrefixOperator,
2324
S3CreateBucketOperator,
2425
S3CreateObjectOperator,
2526
S3DeleteBucketOperator,
@@ -254,6 +255,16 @@ def check_fn(files: list, **kwargs) -> bool:
254255
)
255256
# [END howto_operator_s3_copy_object]
256257

258+
# [START howto_operator_s3_copy_prefix]
259+
copy_prefix = S3CopyPrefixOperator(
260+
task_id="copy_prefix",
261+
source_bucket_name=bucket_name,
262+
source_bucket_prefix=f"{env_id}-",
263+
dest_bucket_name=bucket_name_2,
264+
dest_bucket_prefix=f"{env_id}-copied-",
265+
)
266+
# [END howto_operator_s3_copy_prefix]
267+
257268
# [START howto_operator_s3_file_transform]
258269
file_transform = S3FileTransformOperator(
259270
task_id="file_transform",
@@ -321,6 +332,7 @@ def check_fn(files: list, **kwargs) -> bool:
321332
sensor_key_with_regex_deferrable,
322333
],
323334
copy_object,
335+
copy_prefix,
324336
file_transform,
325337
sensor_keys_unchanged,
326338
# TEST TEARDOWN

0 commit comments

Comments
 (0)