1
1
"""S3 Mirror pathing management."""
2
2
from __future__ import annotations
3
+ import concurrent .futures
3
4
from typing import Callable , Dict , List
4
5
from pathlib import Path
6
+ from tqdm import tqdm
7
+ import shutil
8
+ import psutil
5
9
from S3MP .global_config import S3MPConfig
6
10
from S3MP .keys import KeySegment , get_matching_s3_keys
7
11
from S3MP .utils .local_file_utils import (
@@ -142,7 +146,7 @@ def get_sibling(self, sibling_name: str) -> MirrorPath:
142
146
)
143
147
144
148
def get_child (self , child_name : str ) -> MirrorPath :
145
- """Get a file with the same parent as this file."""
149
+ """Get a child of this file."""
146
150
return self .replace_key_segments_at_relative_depth ([KeySegment (1 , child_name )])
147
151
148
152
def get_children_on_s3 (self ) -> List [MirrorPath ]:
@@ -208,11 +212,64 @@ def save_local(
208
212
if upload :
209
213
self .upload_from_mirror (overwrite )
210
214
215
+ def copy_to_mp_s3_only (self , dest_mp : MirrorPath ):
216
+ """Copy this file from S3 to a destination on S3."""
217
+ S3MPConfig .s3_client .copy_object (
218
+ CopySource = {"Bucket" : S3MPConfig .default_bucket_key , "Key" : self .s3_key },
219
+ Bucket = S3MPConfig .default_bucket_key ,
220
+ Key = dest_mp .s3_key ,
221
+ )
222
+
223
+ def copy_to_mp_mirror_only (self , dest_mp : MirrorPath ):
224
+ """Copy this file from the mirror to a destination on the mirror."""
225
+ shutil .copy (self .local_path , dest_mp .local_path )
226
+
227
+ def copy_to_mp (self , dest_mp : MirrorPath , use_mirror_as_src : bool = False ):
228
+ """Copy this file to a destination, on S3 and in the mirror.
229
+
230
+ By default, assumes the S3 copy is the source of truth.
231
+ If use_mirror_as_src is True, assumes the mirror is the source of truth.
232
+ """
233
+ if use_mirror_as_src :
234
+ # If we're using the mirror as the source of truth, we copy the file
235
+ # to the dest mirror, then upload it to S3.
236
+ self .copy_to_mp_mirror_only (dest_mp )
237
+ dest_mp .upload_from_mirror (overwrite = True )
238
+ else :
239
+ # If we're using S3 as the source of truth, we copy the file from S3
240
+ # to the dest S3 path, then download it to the dest mirror.
241
+ self .copy_to_mp_s3_only (dest_mp )
242
+ dest_mp .download_to_mirror (overwrite = True )
243
+
244
+
211
245
def get_matching_s3_mirror_paths (
212
246
segments : List [KeySegment ]
213
247
):
214
248
"""Get matching S3 mirror paths."""
215
249
return [
216
250
MirrorPath .from_s3_key (key )
217
251
for key in get_matching_s3_keys (segments )
218
- ]
252
+ ]
253
+
254
+
255
+ def multithread_download_mps_to_mirror (
256
+ mps : list [MirrorPath ], overwrite : bool = False
257
+ ):
258
+ """Download a list of MirrorPaths to the local mirror."""
259
+ n_procs = psutil .cpu_count (logical = False )
260
+ proc_executor = concurrent .futures .ProcessPoolExecutor (max_workers = n_procs )
261
+ all_proc_futures : list [concurrent .futures .Future ] = []
262
+ pbar = tqdm (total = len (mps ), desc = "Downloading to mirror" ) # Init pbar
263
+ for mp in mps :
264
+ pf = proc_executor .submit (mp .download_to_mirror , overwrite = overwrite )
265
+ all_proc_futures .append (pf )
266
+
267
+ # Increment pbar as processes finish
268
+ for _ in concurrent .futures .as_completed (all_proc_futures ):
269
+ pbar .update (n = 1 )
270
+
271
+ all_proc_futures_except = [pf for pf in all_proc_futures if pf .exception ()]
272
+ for pf in all_proc_futures_except :
273
+ raise pf .exception ()
274
+
275
+ proc_executor .shutdown (wait = True )
0 commit comments