From 47e610d7cbbea9823da7fe618d6758a362b35c66 Mon Sep 17 00:00:00 2001
From: Alex Revetchi
Date: Tue, 21 Sep 2021 23:17:41 +0100
Subject: [PATCH 1/2] Add --marker option to start listing from a specified
 key

---
 S3/Config.py |  1 +
 S3/S3.py     | 10 +++++++---
 s3cmd        |  7 ++++---
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/S3/Config.py b/S3/Config.py
index 8bafce44..cbbdabf7 100644
--- a/S3/Config.py
+++ b/S3/Config.py
@@ -166,6 +166,7 @@ class Config(object):
     delete_after_fetch = False
     max_delete = -1
     limit = -1
+    marker = u''
     _doc['delete_removed'] = u"[sync] Remove remote S3 objects when local file has been deleted"
     delay_updates = False # OBSOLETE
     gpg_passphrase = u""
diff --git a/S3/S3.py b/S3/S3.py
index 4d60eba4..c2a80ba7 100644
--- a/S3/S3.py
+++ b/S3/S3.py
@@ -327,10 +327,10 @@ def list_all_buckets(self):
         response["list"] = getListFromXml(response["data"], "Bucket")
         return response
 
-    def bucket_list(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1):
+    def bucket_list(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1, marker = None):
         item_list = []
         prefixes = []
-        for truncated, dirs, objects in self.bucket_list_streaming(bucket, prefix, recursive, uri_params, limit):
+        for truncated, dirs, objects in self.bucket_list_streaming(bucket, prefix, recursive, uri_params, limit, marker):
             item_list.extend(objects)
             prefixes.extend(dirs)
 
@@ -340,7 +340,7 @@ def bucket_list(self, bucket, prefix = None, recursive = None, uri_params = None,
         response['truncated'] = truncated
         return response
 
-    def bucket_list_streaming(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1):
+    def bucket_list_streaming(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1, marker = None):
         """ Generator that produces <dir_list>, <object_list> pairs of groups of content of a specified bucket. """
         def _list_truncated(data):
             ## <IsTruncated> can either be "true" or "false" or be missing completely
@@ -363,6 +363,10 @@ def _get_next_marker(data, current_list):
         num_objects = 0
         num_prefixes = 0
         max_keys = limit
+
+        if marker:
+            uri_params['marker'] = marker
+
         while truncated:
             response = self.bucket_list_noparse(bucket, prefix, recursive,
                                                 uri_params, max_keys)
diff --git a/s3cmd b/s3cmd
index 77b4e74f..9cb0e5dc 100755
--- a/s3cmd
+++ b/s3cmd
@@ -157,7 +157,7 @@ def cmd_ls(args):
     if len(args) > 0:
         uri = S3Uri(args[0])
         if uri.type == "s3" and uri.has_bucket():
-            subcmd_bucket_list(s3, uri, cfg.limit)
+            subcmd_bucket_list(s3, uri, cfg.limit, cfg.marker)
             return EX_OK
 
     # If not a s3 type uri or no bucket was provided, list all the buckets
@@ -183,7 +183,7 @@ def cmd_all_buckets_list_all_content(args):
         output(u"")
     return EX_OK
 
-def subcmd_bucket_list(s3, uri, limit):
+def subcmd_bucket_list(s3, uri, limit, marker = None):
     cfg = Config()
 
     bucket = uri.bucket()
@@ -193,7 +193,7 @@ def subcmd_bucket_list(s3, uri, limit):
     if prefix.endswith('*'):
         prefix = prefix[:-1]
     try:
-        response = s3.bucket_list(bucket, prefix = prefix, limit = limit)
+        response = s3.bucket_list(bucket, prefix = prefix, limit = limit, marker = marker)
     except S3Error as e:
         if e.info["Code"] in S3.codes:
             error(S3.codes[e.info["Code"]] % bucket)
@@ -2769,6 +2769,7 @@ def main():
     optparser.add_option( "--delay-updates", dest="delay_updates", action="store_true", help="*OBSOLETE* Put all updated files into place at end [sync]") # OBSOLETE
     optparser.add_option( "--max-delete", dest="max_delete", action="store", help="Do not delete more than NUM files. [del] and [sync]", metavar="NUM")
     optparser.add_option( "--limit", dest="limit", action="store", help="Limit number of objects returned in the response body (only for [ls] and [la] commands)", metavar="NUM")
+    optparser.add_option( "--marker", dest="marker", action="store", help="Start listing after this specified key; the marker can be any key in the bucket (only for [ls] command)", metavar="KEY")
     optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.")
     optparser.add_option( "--delete-after-fetch", dest="delete_after_fetch", action="store_true", help="Delete remote objects after fetching to local file (only for [get] and [sync] commands).")
     optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")

Note that marker is added after limit in each signature, with a None default, so
that existing positional callers (such as the [la] code path, which still calls
subcmd_bucket_list(s3, uri, cfg.limit)) keep working unchanged.
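The first patch on its own already lets a caller resume a listing from a known
key. A minimal sketch of exercising it from Python follows; the bucket and key
names are illustrative, and a Config instance already loaded with valid
credentials is assumed:

    from S3.Config import Config
    from S3.S3 import S3

    s3 = S3(Config())

    # Ask for at most 100 keys under 'logs/', starting after the marker key.
    # S3 only returns keys that sort lexicographically after the marker, so
    # feeding the last key of one page back in fetches the next page.
    response = s3.bucket_list('mybucket', prefix = 'logs/',
                              limit = 100, marker = 'logs/2021-09-21.gz')
    for entry in response['list']:
        print(entry['Key'])

The command-line equivalent would be something like:
s3cmd ls --limit=100 --marker='logs/2021-09-21.gz' s3://mybucket/logs/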
[del] and [sync]", metavar="NUM") optparser.add_option( "--limit", dest="limit", action="store", help="Limit number of objects returned in the response body (only for [ls] and [la] commands)", metavar="NUM") + optparser.add_option( "--marker", dest="marker", action="store", help="Marker is where you want S3 to start listing from. S3 starts listing after this specified key. Marker can be any key in the bucket.") optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.") optparser.add_option( "--delete-after-fetch", dest="delete_after_fetch", action="store_true", help="Delete remote objects after fetching to local file (only for [get] and [sync] commands).") optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.") From e5a4e538cbc7ab43862a6e8f75b55d9b6774b12d Mon Sep 17 00:00:00 2001 From: Alex Revetchi Date: Wed, 22 Sep 2021 00:47:10 +0100 Subject: [PATCH 2/2] Use streaming API, as it fails on big buckets, eventually it runs out of memory --- s3cmd | 86 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/s3cmd b/s3cmd index 77b4e74f..5b748f7a 100755 --- a/s3cmd +++ b/s3cmd @@ -157,7 +157,7 @@ def cmd_ls(args): if len(args) > 0: uri = S3Uri(args[0]) if uri.type == "s3" and uri.has_bucket(): - subcmd_bucket_list(s3, uri, cfg.limit) + subcmd_bucket_list(s3, uri, cfg.marker, cfg.limit) return EX_OK # If not a s3 type uri or no bucket was provided, list all the buckets @@ -183,7 +183,7 @@ def cmd_all_buckets_list_all_content(args): output(u"") return EX_OK -def subcmd_bucket_list(s3, uri, limit): +def subcmd_bucket_list(s3, uri, marker, limit): cfg = Config() bucket = uri.bucket() @@ -192,12 +192,6 @@ def subcmd_bucket_list(s3, uri, limit): debug(u"Bucket 's3://%s':" % bucket) if prefix.endswith('*'): prefix = prefix[:-1] - try: - response = s3.bucket_list(bucket, prefix = prefix, limit = limit) - except S3Error as e: - if e.info["Code"] in S3.codes: - error(S3.codes[e.info["Code"]] % bucket) - raise # md5 are 32 char long, but for multipart there could be a suffix if Config().human_readable_sizes: @@ -214,38 +208,51 @@ def subcmd_bucket_list(s3, uri, limit): else: format_string = u"%(timestamp)16s %(size)s %(uri)s" - for prefix in response['common_prefixes']: - output(format_string % { - "timestamp": "", - "size": dir_str, - "md5": "", - "storageclass": "", - "uri": uri.compose_uri(bucket, prefix["Prefix"])}) - - for object in response["list"]: - md5 = object.get('ETag', '').strip('"\'') - storageclass = object.get('StorageClass','') - - if cfg.list_md5: - if '-' in md5: # need to get md5 from the object - object_uri = uri.compose_uri(bucket, object["Key"]) - info_response = s3.object_info(S3Uri(object_uri)) - try: - md5 = info_response['s3cmd-attrs']['md5'] - except KeyError: - pass + truncated = False + common_prefixes = [] + + try: + for _trunkated, _common_prefixes, _objects in s3.bucket_list_streaming(bucket, prefix = prefix, marker=marker, limit = limit): + truncated = _trunkated + + if common_prefixes != _common_prefixes: + common_prefixes = _common_prefixes + for prefix in common_prefixes: + output(format_string % { + "timestamp": "", + "size": dir_str, + "md5": "", + "storageclass": "", + "uri": uri.compose_uri(bucket, prefix["Prefix"])}) + + for object in _objects: 
+ md5 = object.get('ETag', '').strip('"\'') + storageclass = object.get('StorageClass','') + + if cfg.list_md5: + if '-' in md5: # need to get md5 from the object + object_uri = uri.compose_uri(bucket, object["Key"]) + info_response = s3.object_info(S3Uri(object_uri)) + try: + md5 = info_response['s3cmd-attrs']['md5'] + except KeyError: + pass + + size_and_coeff = formatSize(object["Size"], + Config().human_readable_sizes) + output(format_string % { + "timestamp": formatDateTime(object["LastModified"]), + "size" : format_size % size_and_coeff, + "md5" : md5, + "storageclass" : storageclass, + "uri": uri.compose_uri(bucket, object["Key"]), + }) + except S3Error as e: + if e.info["Code"] in S3.codes: + error(S3.codes[e.info["Code"]] % bucket) + raise - size_and_coeff = formatSize(object["Size"], - Config().human_readable_sizes) - output(format_string % { - "timestamp": formatDateTime(object["LastModified"]), - "size" : format_size % size_and_coeff, - "md5" : md5, - "storageclass" : storageclass, - "uri": uri.compose_uri(bucket, object["Key"]), - }) - - if response["truncated"]: + if truncated: warning(u"The list is truncated because the settings limit was reached.") def cmd_bucket_create(args): @@ -2769,6 +2776,7 @@ def main(): optparser.add_option( "--delay-updates", dest="delay_updates", action="store_true", help="*OBSOLETE* Put all updated files into place at end [sync]") # OBSOLETE optparser.add_option( "--max-delete", dest="max_delete", action="store", help="Do not delete more than NUM files. [del] and [sync]", metavar="NUM") optparser.add_option( "--limit", dest="limit", action="store", help="Limit number of objects returned in the response body (only for [ls] and [la] commands)", metavar="NUM") + optparser.add_option( "--marker", dest="marker", action="store", help="Marker is where you want S3 to start listing from. S3 starts listing after this specified key. Marker can be any key in the bucket.") optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.") optparser.add_option( "--delete-after-fetch", dest="delete_after_fetch", action="store_true", help="Delete remote objects after fetching to local file (only for [get] and [sync] commands).") optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")
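For comparison, a minimal sketch of consuming the streaming generator that the
second patch switches ls to; names are again illustrative and credentials are
assumed configured. Each iteration yields one page of results, so peak memory
is bounded by the page size rather than by the bucket size, which is the point
of the change:

    from S3.Config import Config
    from S3.S3 import S3

    s3 = S3(Config())

    count = 0
    # Each yielded triple is (truncated, common_prefixes, objects) for a
    # single ListObjects response; nothing accumulates across pages.
    for truncated, prefixes, objects in s3.bucket_list_streaming(
            'mybucket', prefix = 'logs/', marker = 'logs/2021-09-01'):
        count += len(objects)
    print('listed %d objects' % count)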