1
1
from __future__ import annotations
2
2
3
3
import argparse
4
+ import base64
4
5
import email .parser
5
6
import email .policy
7
+ import hashlib
6
8
import logging
7
- import os
8
9
import pathlib
9
- import re
10
- import tempfile
11
-
12
- import wheel .cli .pack as whl_pck
13
- from wheel .cli .unpack import unpack as wheel_unpack
10
+ import shutil
11
+ import zipfile
14
12
15
13
from variantlib .api import VariantDescription
16
14
from variantlib .api import VariantProperty
17
15
from variantlib .api import set_variant_metadata
18
16
from variantlib .api import validate_variant
19
- from variantlib .constants import VARIANT_HASH_LEN
20
17
from variantlib .constants import WHEEL_NAME_VALIDATION_REGEX
21
18
from variantlib .errors import ValidationError
22
19
30
27
)
31
28
32
29
33
- def wheel_variant_pack (
34
- directory : str | pathlib .Path ,
35
- dest_dir : str | pathlib .Path ,
36
- variant_hash : str ,
37
- build_number : str | None = None ,
38
- ) -> str :
39
- """Repack a previously unpacked wheel directory into a new wheel file.
40
-
41
- The .dist-info/WHEEL file must contain one or more tags so that the target
42
- wheel file name can be determined.
43
-
44
- This function is heavily taken from:
45
- https://github.com/pypa/wheel/blob/main/src/wheel/_commands/pack.py#L14
46
-
47
- Minimal changes tried to be applied to make it work with the Variant Hash.
48
-
49
- :param directory: The unpacked wheel directory
50
- :param dest_dir: Destination directory (defaults to the current directory)
51
- :param variant_hash: The hash of the variant to be stored
52
- """
53
-
54
- # Input Validation
55
- variant_hash_pattern = rf"^[a-fA-F0-9]{{{ VARIANT_HASH_LEN } }}$"
56
- if not re .match (variant_hash_pattern , variant_hash ):
57
- raise ValidationError (f"Invalid Variant Hash Value `{ variant_hash } ` ..." )
58
-
59
- # Find the .dist-info directory
60
- dist_info_dirs = [
61
- fn
62
- for fn in os .listdir (directory ) # noqa: PTH208
63
- if os .path .isdir (os .path .join (directory , fn )) and whl_pck .DIST_INFO_RE .match (fn ) # noqa: PTH112, PTH118
64
- ]
65
- if len (dist_info_dirs ) > 1 :
66
- raise whl_pck .WheelError (
67
- f"Multiple .dist-info directories found in { directory } "
68
- )
69
- if not dist_info_dirs :
70
- raise whl_pck .WheelError (f"No .dist-info directories found in { directory } " )
71
-
72
- # Determine the target wheel filename
73
- dist_info_dir = dist_info_dirs [0 ]
74
- name_version = whl_pck .DIST_INFO_RE .match (dist_info_dir ).group ("namever" )
75
-
76
- # Read the tags and the existing build number from .dist-info/WHEEL
77
- wheel_file_path = os .path .join (directory , dist_info_dir , "WHEEL" ) # noqa: PTH118
78
- with open (wheel_file_path , "rb" ) as f : # noqa: PTH123
79
- info = whl_pck .BytesParser (policy = whl_pck .email .policy .compat32 ).parse (f )
80
- tags : list [str ] = info .get_all ("Tag" , [])
81
- existing_build_number = info .get ("Build" )
82
-
83
- if not tags :
84
- raise whl_pck .WheelError (
85
- f"No tags present in { dist_info_dir } /WHEEL; cannot determine target "
86
- f"wheel filename"
87
- )
88
-
89
- # Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL
90
- build_number = build_number if build_number is not None else existing_build_number
91
- if build_number is not None :
92
- del info ["Build" ]
93
- if build_number :
94
- info ["Build" ] = build_number
95
- name_version += "-" + build_number
96
-
97
- if build_number != existing_build_number :
98
- with open (wheel_file_path , "wb" ) as f : # noqa: PTH123
99
- whl_pck .BytesGenerator (f , maxheaderlen = 0 ).flatten (info )
100
-
101
- # Reassemble the tags for the wheel file
102
- tagline = whl_pck .compute_tagline (tags )
103
-
104
- # Repack the wheel
105
- wheel_path = os .path .join (dest_dir , f"{ name_version } -{ tagline } -{ variant_hash } .whl" ) # noqa: PTH118
106
- with whl_pck .WheelFile (wheel_path , "w" ) as wf :
107
- logging .info (
108
- "Repacking wheel as `%(wheel_path)s` ..." , {"wheel_path" : wheel_path }
109
- )
110
- wf .write_files (directory )
111
-
112
- return wheel_path
113
-
114
-
115
30
def make_variant (args : list [str ]) -> None :
116
31
parser = argparse .ArgumentParser (
117
32
prog = "make_variant" ,
@@ -165,7 +80,7 @@ def make_variant(args: list[str]) -> None:
165
80
166
81
# Input Validation - Wheel Filename is valid and non variant already.
167
82
wheel_info = WHEEL_NAME_VALIDATION_REGEX .match (input_filepath .name )
168
- if not wheel_info :
83
+ if wheel_info is None :
169
84
raise ValueError (f"{ input_filepath .name !r} is not a valid wheel filename." )
170
85
171
86
# Transform properties into a VariantDescription
@@ -185,45 +100,66 @@ def make_variant(args: list[str]) -> None:
185
100
f"{ ', ' .join (x .to_str () for x in vdesc_valid .unknown_properties )} "
186
101
)
187
102
188
- with tempfile . TemporaryDirectory () as _tmpdir :
189
- tempdir = pathlib . Path ( _tmpdir )
190
- wheel_unpack ( input_filepath , tempdir )
191
-
192
- wheel_dir = next ( tempdir . iterdir () )
103
+ # Determine output wheel filename
104
+ output_filepath = (
105
+ output_directory
106
+ / f" { wheel_info . group ( 'base_wheel_name' ) } - { vdesc . hexdigest } .whl"
107
+ )
193
108
194
- for _dir in wheel_dir .iterdir ():
195
- if _dir .is_dir () and _dir .name .endswith (".dist-info" ):
196
- distinfo_dir = _dir
197
- break
109
+ with zipfile .ZipFile (input_filepath , "r" ) as input_zip :
110
+ # First, find METADATA file
111
+ for filename in input_zip .namelist ():
112
+ components = filename .split ("/" , 2 )
113
+ if (
114
+ len (components ) == 2
115
+ and components [0 ].endswith (".dist-info" )
116
+ and components [1 ] == "METADATA"
117
+ ):
118
+ metadata_filename = filename .encode ()
119
+ with input_zip .open (filename , "r" ) as input_file :
120
+ # Parse the metadata
121
+ metadata_parser = email .parser .BytesParser ()
122
+ metadata = metadata_parser .parse (input_file )
123
+
124
+ # Update the metadata
125
+ set_variant_metadata (metadata , vdesc )
126
+
127
+ # Write the serialized metadata
128
+ new_metadata = metadata .as_bytes (policy = METADATA_POLICY )
129
+ break
198
130
else :
199
- raise FileNotFoundError ("Impossible to find the .dist-info directory." )
200
-
201
- if not (metadata_f := distinfo_dir / "METADATA" ).exists ():
202
- raise FileNotFoundError (metadata_f )
203
-
204
- with metadata_f .open (mode = "r+b" ) as file :
205
- # Parse the metadata
206
- metadata_parser = email .parser .BytesParser ()
207
- metadata = metadata_parser .parse (file )
208
-
209
- # Update the metadata
210
- set_variant_metadata (metadata , vdesc )
211
-
212
- # Move the file pointer to the beginning
213
- file .seek (0 )
214
-
215
- # Write back the serialized metadata
216
- file .write (metadata .as_bytes (policy = METADATA_POLICY ))
217
-
218
- # Truncate the file to remove any remaining old content
219
- file .truncate ()
220
-
221
- dest_whl_path = wheel_variant_pack (
222
- directory = wheel_dir ,
223
- dest_dir = output_directory ,
224
- variant_hash = vdesc .hexdigest ,
225
- )
226
-
227
- logger .info (
228
- "Variant Wheel Created: `%s`" , pathlib .Path (dest_whl_path ).resolve ()
229
- )
131
+ raise FileNotFoundError ("No *.dist-info/METADATA file found in wheel" )
132
+
133
+ with zipfile .ZipFile (output_filepath , "w" ) as output_zip :
134
+ for file_info in input_zip .infolist ():
135
+ components = file_info .filename .split ("/" , 2 )
136
+ with (
137
+ input_zip .open (file_info , "r" ) as input_file ,
138
+ output_zip .open (file_info , "w" ) as output_file ,
139
+ ):
140
+ if (
141
+ len (components ) != 2
142
+ or not components [0 ].endswith (".dist-info" )
143
+ or components [1 ] not in ("METADATA" , "RECORD" )
144
+ ):
145
+ shutil .copyfileobj (input_file , output_file )
146
+ elif components [1 ] == "METADATA" :
147
+ # Write the new metadata
148
+ output_file .write (new_metadata )
149
+ else :
150
+ # Update RECORD for the new metadata checksum
151
+ for line in input_file :
152
+ new_line = line
153
+ rec_filename , sha256 , size = line .split (b"," )
154
+ if rec_filename == metadata_filename :
155
+ new_sha256 = base64 .urlsafe_b64encode (
156
+ hashlib .sha256 (new_metadata ).digest ()
157
+ ).rstrip (b"=" )
158
+ new_line = (
159
+ f"{ rec_filename .decode ()} ,"
160
+ f"sha256={ new_sha256 .decode ()} ,"
161
+ f"{ len (new_metadata )} \n "
162
+ ).encode ()
163
+ output_file .write (new_line )
164
+
165
+ logger .info ("Variant Wheel Created: `%s`" , output_filepath .resolve ())
0 commit comments