 from multiprocessing import get_start_method, set_start_method
 from pathlib import Path
 import platform
+from typing import Callable
+
+from _manifest import PathMetadata, DigestAlgorithm, Hashed
 
 # Use for testing while keeping disk size low.
 allow_symlinks = False
@@ -111,7 +114,7 @@ def remove_prefix(text, prefix):
     return text
 
 
-def validate_signature_path(model_path: Path, sig_path: Path):
+def _validate_signature_path(model_path: Path, sig_path: Path):
     if model_path.is_file():
         return
     # Note: Only allow top-level folder to have the signature for simplicity.
@@ -131,7 +134,7 @@ def is_relative_to(p: Path, path_list: [Path]) -> bool:
 class Serializer:
     @staticmethod
     # TODO: type of returned value.
-    def _ordered_files(path: Path, ignorepaths: [Path]) -> []:
+    def _ordered_files(path: Path, ignorepaths: [Path], ignore_folder: bool = False) -> []:
         children: [Path]
         if path.is_file():
             children = [path]
@@ -158,6 +161,9 @@ def _ordered_files(path: Path, ignorepaths: [Path]) -> []:
 
             if not child.is_file() and not child.is_dir():
                 raise ValueError(f"{str(child)} is not a dir or file")
+
+            if ignore_folder and child.is_dir():
+                continue
 
             # The recorded path must *not* contain the folder name,
             # since users may rename it.
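A minimal sketch (not from this commit; `list_children` is a hypothetical helper) of what the new `ignore_folder` flag changes: directories are still traversed for the files they contain, but no directory entry itself is recorded.

```python
from pathlib import Path

def list_children(path: Path, ignore_folder: bool = False) -> list:
    # Walk the tree as _ordered_files does: collect every descendant,
    # then optionally drop the directory entries themselves.
    children = sorted(path.glob("**/*"))
    if ignore_folder:
        children = [c for c in children if not c.is_dir()]
    return children

# With a tree model/ -> {weights.bin, config/ -> params.json}:
# ignore_folder=False records "config" as its own entry;
# ignore_folder=True records only weights.bin and config/params.json.
```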
@@ -226,7 +232,7 @@ def _create_tasks(children: [], shard_size: int) -> [[]]:
 
     @staticmethod
     # TODO: type of tasks
-    def _run_tasks(path: Path, chunk: int, tasks: []) -> bytes:
+    def _run_tasks(path: Path, chunk: int, tasks: [], fn: Callable[[any], bytes]) -> bytes:
         # See https://superfastpython.com/processpoolexecutor-in-python/
         # NOTE: 32 = length of sha256 digest.
         digest_len = 32
@@ -237,7 +243,7 @@ def _run_tasks(path: Path, chunk: int, tasks: []) -> bytes:
         if platform.system() == "Linux" and get_start_method() != "fork":
             set_start_method('fork')
         with ProcessPoolExecutor() as ppe:
-            futures = [ppe.submit(Serializer.task, (path, chunk, task))
+            futures = [ppe.submit(fn, (path, chunk, task))
                        for task in tasks]
             results = [f.result() for f in futures]
             for i, result in enumerate(results):
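The new `fn` parameter turns `_run_tasks` into a generic driver: v1 and v2 submit different task bodies to the same process pool. A self-contained sketch of the pattern, with a toy task function standing in for `_task_v1`/`_task_v2` (names here are illustrative only, not part of this commit):

```python
import hashlib
from concurrent.futures import ProcessPoolExecutor

def toy_task(task_info) -> bytes:
    # Stand-in for _task_v1/_task_v2: hash a description of the shard.
    path, chunk, task = task_info
    return hashlib.sha256(repr((path, chunk, task)).encode()).digest()

def run_tasks(path, chunk, tasks, fn) -> bytes:
    digest_len = 32  # length of a sha256 digest
    all_hashes = bytearray(digest_len * len(tasks))
    with ProcessPoolExecutor() as ppe:
        futures = [ppe.submit(fn, (path, chunk, task)) for task in tasks]
        for i, f in enumerate(futures):
            # Each result lands at a fixed offset, so the concatenation
            # is deterministic regardless of worker scheduling.
            all_hashes[i * digest_len:(i + 1) * digest_len] = f.result()
    return bytes(all_hashes)

if __name__ == "__main__":  # guard required when using process pools
    print(run_tasks("model", 8, [("a", 0, 1), ("b", 1, 2)], toy_task).hex())
```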
@@ -249,7 +255,7 @@ def _run_tasks(path: Path, chunk: int, tasks: []) -> bytes:
 
     @staticmethod
     # TODO: type of task_info.
-    def task(task_info: []):
+    def _task_v1(task_info: any) -> bytes:
         # NOTE: we can get process info using:
         # from multiprocessing import current_process
         # worker = current_process()
@@ -303,7 +309,7 @@ def _serialize_v1(path: Path, chunk: int, shard: int, signature_path: Path,
             raise ValueError(f"{str(path)} is not a dir or file")
 
         # Validate the signature path.
-        validate_signature_path(path, signature_path)
+        _validate_signature_path(path, signature_path)
 
         # Children to hash.
         children = Serializer._ordered_files(path,
@@ -317,11 +323,101 @@ def _serialize_v1(path: Path, chunk: int, shard: int, signature_path: Path,
         # Share the computation of hashes.
         # For simplicity, we pre-allocate the entire array that will hold
         # the concatenation of all hashes.
-        all_hashes = Serializer._run_tasks(path, chunk, tasks)
+        all_hashes = Serializer._run_tasks(path, chunk, tasks, Serializer._task_v1)
 
         # Finally, we hash everything.
         return hashlib.sha256(bytes(all_hashes)).digest()
 
+    @staticmethod
+    # TODO: type of task_info.
+    def _task_v2(task_info: any) -> bytes:
+        # NOTE: we can get process info using:
+        # from multiprocessing import current_process
+        # worker = current_process()
+        # print(f'Task {task_info},
+        #       worker name={worker.name}, pid={worker.pid}', flush=True)
+        _, chunk, (name, ty, start_pos, end_pos) = task_info
+        # Only files are recorded.
+        if ty != "file":
+            raise ValueError(f"internal: got a non-file path {name}")
+
+        return Hasher._node_file_compute_v1(name,
+                                            b'', start_pos, end_pos, chunk)
+
+    @staticmethod
+    def _to_path_metadata(task_info: [any], all_hashes: bytes) -> [PathMetadata]:
+        if not task_info:
+            raise ValueError("internal: task_info is empty")
+
+        paths: [PathMetadata] = []
+        # Iterate over all tasks. Tasks are ordered, so all shards of the
+        # same file form a contiguous group of digests in all_hashes.
+        # NOTE: each digest is 32-byte long.
+        digest_len = 32
+        prev_name, _, _, _ = task_info[0]
+        prev_i = 0
+        for curr_i, curr_task in enumerate(task_info[1:], start=1):
+            curr_name, _, _, _ = curr_task
+            if prev_name == curr_name:
+                continue
+            # End of a group of sharded digests for the same file.
+            h = hashlib.sha256(bytes(all_hashes[prev_i * digest_len: curr_i * digest_len])).digest()
+            paths += [PathMetadata(prev_name, Hashed(DigestAlgorithm.SHA256_P1, h))]
+            prev_i = curr_i
+            prev_name = curr_name
+
+        # Compute the digest for the last group of shards.
+        h = hashlib.sha256(bytes(all_hashes[prev_i * digest_len:])).digest()
+        paths += [PathMetadata(prev_name, Hashed(DigestAlgorithm.SHA256_P1, h))]
+        return paths
+
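A small worked example of the grouping performed by `_to_path_metadata` (hypothetical file names and shard tuples): adjacent 32-byte shard digests of the same file are concatenated and re-hashed into one digest per file.

```python
import hashlib

# Each task is (name, type, start, end); each shard digest is 32 bytes.
tasks = [("weights.bin", "file", 0, 10), ("weights.bin", "file", 10, 20),
         ("config.json", "file", 0, 5)]
digests = [hashlib.sha256(bytes([i])).digest() for i in range(3)]
all_hashes = b"".join(digests)  # 96 bytes, in task order

# weights.bin has two shards, so its digest covers bytes [0, 64);
# config.json has one shard, covering bytes [64, 96).
h_weights = hashlib.sha256(all_hashes[0:64]).digest()
h_config = hashlib.sha256(all_hashes[64:]).digest()
```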
+    @staticmethod
+    def _serialize_v2(path: Path, chunk: int, shard: int, signature_path: Path,
+                      ignorepaths: [Path] = []) -> [PathMetadata]:
+        if not path.exists():
+            raise ValueError(f"{str(path)} does not exist")
+
+        if not allow_symlinks and path.is_symlink():
+            raise ValueError(f"{str(path)} is a symlink")
+
+        if chunk < 0:
+            raise ValueError(f"{str(chunk)} is invalid")
+
+        if not path.is_file() and not path.is_dir():
+            raise ValueError(f"{str(path)} is not a dir or file")
+
+        # Validate the signature path.
+        _validate_signature_path(path, signature_path)
+
+        # Children to hash. Folders themselves are skipped: v2 records
+        # files only.
+        children = Serializer._ordered_files(path,
+                                             [signature_path] + ignorepaths,
+                                             True)
+
+        # We shard the computation by creating independent "tasks".
+        if shard < 0:
+            raise ValueError(f"{str(shard)} is invalid")
+        tasks = Serializer._create_tasks(children, shard)
+
+        # Share the computation of hashes.
+        # For simplicity, we pre-allocate the entire array that will hold
+        # the concatenation of all hashes.
+        all_hashes = Serializer._run_tasks(path, chunk, tasks, Serializer._task_v2)
+
+        # Turn the hashes into PathMetadata.
+        return Serializer._to_path_metadata(tasks, all_hashes)
+
+def serialize_v2(path: Path, chunk: int, signature_path: Path,
+                 ignorepaths: [Path] = []) -> [PathMetadata]:
+    # NOTE: The shard size must be the same for all clients for
+    # compatibility. We could make it configurable; but in this
+    # case the signature file must contain the value used by the signer.
+    shard_size = 1000000000  # 1GB
+    return Serializer._serialize_v2(path, chunk, shard_size,
+                                    signature_path, ignorepaths)
+
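For reference, a hypothetical invocation of the new v2 entry point (paths invented; `serialize_v2` and the `_manifest` types as introduced in this commit):

```python
from pathlib import Path

model = Path("finetuned-model")     # illustrative model directory
sig = model / "model.sig"           # signature file excluded from hashing
chunk = 1048576                     # 1MB read chunk

# Unlike serialize_v1, which returns a single digest for the whole tree,
# serialize_v2 returns one PathMetadata entry per file.
for pm in serialize_v2(model, chunk, sig, ignorepaths=[model / "logs"]):
    print(pm)
```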
 def serialize_v1(path: Path, chunk: int, signature_path: Path,
                  ignorepaths: [Path] = []) -> bytes:
     # NOTE: The shard size must be the same for all clients for
@@ -350,7 +446,7 @@ def serialize_v0(path: Path, chunk: int, signature_path: Path,
         raise ValueError(f"{str(path)} is not a dir")
 
     # Validate the signature path.
-    validate_signature_path(path, signature_path)
+    _validate_signature_path(path, signature_path)
 
     children = sorted([x for x in path.iterdir()
                        if x != signature_path and x not in ignorepaths])