Add S3CopyPrefixOperator to copy all objects under a prefix #68946
o-nikolas
left a comment
Looks quite good, thanks for the contribution!
In the future we might want to parallelize the copy since prefixes may include thousands or millions of objects depending on the situation. But this is a great start.
Thank you all for taking the time to check this out :) @o-nikolas does your approval mean we're good to go, or would you still prefer that I address your comments?
I think they are nice to have, so if you have a bit of time to address them, that would be awesome :)
Thanks for the heads up, makes sense. @vincbeck I just pushed another commit with the adjustments.
S3CopyObjectOperator handles a single object at a time. Users who need to copy all objects sharing a prefix must implement their own pagination, error handling, and encryption support. This operator encapsulates that pattern so it can be used directly in a Dag.
Chain RuntimeError from the original exception to preserve the traceback. Verify the succeeding object is attempted when continue_on_failure=True.
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
Description
This PR introduces a new S3CopyPrefixOperator that enables copying all S3 objects under a specified prefix from a source bucket to a destination bucket. This operator fills a gap in the current S3 operators by providing prefix-based bulk copy functionality.
What does this operator do?
• Copies all objects matching a specified prefix from source to destination S3 bucket
• Supports cross-bucket copying
• Provides configurable error handling (continue on failure or stop on first error)
• Integrates with OpenLineage for data lineage tracking
• Supports Airflow templating for dynamic parameter values
Why is this needed?
Currently, Airflow's S3 operators allow copying individual objects. For use cases involving copying entire "directory" structures or large numbers of objects sharing a common prefix, users must implement custom solutions or use multiple operator instances.
This operator provides a native, efficient solution for prefix-based bulk operations.
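For context, the kind of custom solution described above might look roughly like the sketch below. It is an illustration only, not the operator's implementation: `copy_prefix` and `FakeS3Client` are hypothetical names, and the fake stands in for a boto3 S3 client so the example runs without AWS credentials. The only client calls it assumes are boto3's `get_paginator("list_objects_v2")` and `copy_object`; encryption options and error handling are left out.

```python
from typing import Any, Iterator


class FakeS3Client:
    """Hypothetical in-memory stand-in for a boto3 S3 client (illustration only)."""

    def __init__(self, objects: dict[str, list[str]], page_size: int = 2) -> None:
        self.objects = objects  # bucket name -> list of keys
        self.page_size = page_size

    def get_paginator(self, operation_name: str) -> "FakeS3Client":
        assert operation_name == "list_objects_v2"
        return self

    def paginate(self, Bucket: str, Prefix: str = "") -> Iterator[dict[str, Any]]:
        keys = sorted(k for k in self.objects.get(Bucket, []) if k.startswith(Prefix))
        if not keys:
            # Like S3, an empty listing has no "Contents" entry.
            yield {"KeyCount": 0}
            return
        for i in range(0, len(keys), self.page_size):
            yield {"Contents": [{"Key": k} for k in keys[i : i + self.page_size]]}

    def copy_object(self, Bucket: str, Key: str, CopySource: dict[str, str]) -> None:
        self.objects.setdefault(Bucket, []).append(Key)


def copy_prefix(client: Any, source_bucket: str, dest_bucket: str, prefix: str) -> list[str]:
    """Copy every object under `prefix`, one listing page at a time."""
    copied: list[str] = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=source_bucket, Prefix=prefix):
        # Pages without matches carry no "Contents" key, so default to an empty list.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            client.copy_object(
                Bucket=dest_bucket,
                Key=key,
                CopySource={"Bucket": source_bucket, "Key": key},
            )
            copied.append(key)
    return copied


client = FakeS3Client({"src": ["data/a.csv", "data/b.csv", "data/c.csv", "logs/x.txt"]})
copied = copy_prefix(client, "src", "dst", "data/")
```

With a real boto3 client in place of the fake, the same function would issue one `copy_object` call per key, which is the per-object loop the reviewer suggests parallelizing later.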
Key Features
• Error Handling: Configurable continue_on_failure parameter for resilient operations
• Template Fields: All dynamic parameters support Jinja templating
• OpenLineage Integration: Automatic data lineage tracking for copied objects
• Standard Exception Handling: Uses RuntimeError instead of AirflowException per project conventions
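The error-handling behaviour listed above can be sketched as follows. This is a simplified illustration under stated assumptions, not the operator's code: `copy_all` and `flaky_copy` are hypothetical helpers, and returning the list of failed keys is a choice made for this sketch only. It shows the two modes, stopping on the first error with a `RuntimeError` chained from the original exception, or recording the failure and moving on to the next object.

```python
from typing import Callable, Iterable


def copy_all(
    keys: Iterable[str],
    copy_one: Callable[[str], None],
    continue_on_failure: bool = False,
) -> list[str]:
    """Attempt to copy each key; return the keys that failed (sketch only)."""
    failed: list[str] = []
    for key in keys:
        try:
            copy_one(key)
        except Exception as exc:
            if not continue_on_failure:
                # Chain from the original exception so its traceback is preserved.
                raise RuntimeError(f"Failed to copy {key}") from exc
            failed.append(key)
    return failed


attempted: list[str] = []


def flaky_copy(key: str) -> None:
    """Hypothetical copy function that fails for one specific key."""
    attempted.append(key)
    if key == "data/b.csv":
        raise ValueError("simulated failure")


failed = copy_all(
    ["data/a.csv", "data/b.csv", "data/c.csv"],
    flaky_copy,
    continue_on_failure=True,
)
```

In this run the object after the failing one is still attempted, which is the behaviour the unit tests for continue_on_failure=True are meant to verify.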
Testing
Includes 10 new unit tests (11 test cases) covering:
• s3:// URL inputs and invalid bucket/URL combinations
• continue_on_failure behaviour
• (… s3:// URL variants)
• System test integration in providers/amazon/tests/system/amazon/aws/example_s3.py
• All tests pass in Breeze testing environment
Usage Example
Checklist
• [x] Tests included (10 comprehensive unit tests)
• [x] Documentation updated
• [x] Code follows project coding standards
• [x] All static code checks pass
• [x] Apache license headers added
• [x] PR is focused on single feature
• [x] Local tests pass
• [x] No unrelated changes included
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (claude-sonnet-4-6) following the guidelines