-
-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathinfra.py
167 lines (131 loc) · 5.62 KB
/
infra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Create folder structure for index."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import re
from typing import Final
from awesomeversion import AwesomeVersion
from packaging.tags import Tag
from packaging.utils import NormalizedName, canonicalize_name, parse_wheel_filename
import requests
from .wheel import check_abi_platform
# Splits a requirement such as "name==1.2.3" into its package and version
# parts. The two-character operators are listed before ">" and "<" because
# regex alternation takes the first alternative that matches: with ">" first,
# "name>=1.0" would match ">" and leave "=1.0" as the captured version.
_RE_REQUIREMENT: Final = re.compile(
    r"(?P<package>.+)(?:==|<=|>=|~=|>|<)(?P<version>.+)"
)
# Captures every double-quoted string ending in ".whl" from the index page.
_RE_PACKAGE_INDEX: Final = re.compile(r"\"(.+\.whl)\"")
# Name used for the wheels folder and for the index URL path components.
_MUSLLINUX: Final = "musllinux"
@dataclass
class WhlPackage:
    """Metadata of one wheel file as listed in the index."""

    # Normalized project name of the wheel.
    name: NormalizedName
    # Version of the wheel.
    version: AwesomeVersion
    # Tags of the wheel; each tag's ABI and platform decide whether the wheel
    # is considered supported.
    tags: frozenset[Tag]
def create_wheels_folder(base_folder: Path) -> Path:
    """Ensure the wheels directory exists below ``base_folder`` and return it.

    The directory is named after ``_MUSLLINUX``. Missing parent directories
    are created, and an already existing directory is not an error.
    """
    target = Path(base_folder) / _MUSLLINUX
    target.mkdir(parents=True, exist_ok=True)
    return target
def create_wheels_index(base_index: str) -> str:
    """Return the URL of the PEP 503 wheels index below ``base_index``."""
    index_folder = f"{_MUSLLINUX}-index"
    return f"{base_index}/{index_folder}/"
def create_wheels_list(base_index: str) -> str:
    """Return the URL below ``base_index`` that lists all wheel files."""
    return "{}/{}/".format(base_index, _MUSLLINUX)
def create_package_map(packages: list[str]) -> dict[NormalizedName, AwesomeVersion]:
    """Build a lookup from normalized package name to its requested version.

    Entries that do not match ``_RE_REQUIREMENT`` are skipped. When the same
    normalized name occurs more than once, the last occurrence wins.
    """
    return {
        canonicalize_name(parsed["package"]): AwesomeVersion(parsed["version"])
        for requirement in packages
        if (parsed := _RE_REQUIREMENT.match(requirement))
    }
def extract_packages_from_index(index: str) -> dict[NormalizedName, list[WhlPackage]]:
    """Download the index page and group its supported wheels by package name.

    Every quoted ``*.whl`` filename found in the response body is parsed. A
    wheel is kept only if at least one of its tags passes
    ``check_abi_platform``.
    """
    response = requests.get(index, allow_redirects=True, timeout=60)
    page = response.text

    supported: dict[NormalizedName, list[WhlPackage]] = {}
    for found in _RE_PACKAGE_INDEX.finditer(page):
        name, version, _build, tags = parse_wheel_filename(found.group(1))
        wheel = WhlPackage(name, AwesomeVersion(str(version)), tags)

        # Skip wheels for which no tag is accepted.
        if not any(check_abi_platform(tag.abi, tag.platform) for tag in wheel.tags):
            continue

        if wheel.name not in supported:
            supported[wheel.name] = []
        supported[wheel.name].append(wheel)

    return supported
def extract_package_names_from_wheels(
    wheels_dir: Path,
) -> dict[NormalizedName, list[Path]]:
    """Group the ``*.whl`` files directly inside ``wheels_dir`` by package name."""
    by_name: dict[NormalizedName, list[Path]] = {}
    for wheel_path in wheels_dir.glob("*.whl"):
        # Only the name component of the parsed filename is needed here.
        package_name = parse_wheel_filename(wheel_path.name)[0]
        if package_name not in by_name:
            by_name[package_name] = []
        by_name[package_name].append(wheel_path)
    return by_name
def check_existing_packages(
    package_index: dict[NormalizedName, list[WhlPackage]],
    package_map: dict[NormalizedName, AwesomeVersion],
) -> set[NormalizedName]:
    """Return the names from ``package_map`` whose version is in the index.

    A name is reported when ``package_index`` has at least one wheel for it
    whose version equals the version requested in ``package_map``.
    """
    return {
        name
        for name, wanted in package_map.items()
        if any(
            candidate.version == wanted
            for candidate in package_index.get(name, ())
        )
    }
def check_available_binary(
    package_index: dict[NormalizedName, list[WhlPackage]],
    skip_binary: str,
    packages: list[str],
    constraints: list[str],
) -> str:
    """Reduce the skip-binary list to packages still missing from the index.

    ``skip_binary`` is either the literal ``":none:"`` (returned unchanged) or
    a ``;``-separated list of package names. Each name is looked up in the
    versions given by ``packages`` and ``constraints``; names without a known
    version are reported and dropped. Names whose requested version already
    exists in ``package_index`` are dropped as well.

    Returns ``":none:"`` when nothing is left, otherwise the remaining names
    joined with ``,`` in sorted order.
    """
    if skip_binary == ":none:":
        return skip_binary

    list_binary = list(map(canonicalize_name, skip_binary.split(";")))

    # Map of package basename to the desired package version
    package_map = create_package_map(packages + constraints)

    # View of package map limited to packages in --skip-binary
    binary_package_map: dict[NormalizedName, AwesomeVersion] = {}
    for binary in list_binary:
        if not (version := package_map.get(binary)):
            print(
                f"Skip binary '{binary}' not in packages/constraints; Can't determine desired version",
                flush=True,
            )
            continue
        binary_package_map[binary] = version

    print(f"Checking if binaries already exist for packages {binary_package_map}")
    list_found = check_existing_packages(package_index, binary_package_map)
    print(f"Packages already exist: {list_found}")
    list_needed = binary_package_map.keys() - list_found

    # Generate needed list of skip binary
    if not list_needed:
        return ":none:"

    print(f"Will force binary build for {list_needed}")
    # list_needed is a set of strings, whose iteration order can change from
    # one interpreter run to the next; sorting keeps the result reproducible.
    return ",".join(sorted(list_needed))
def remove_local_wheels(
    package_index: dict[NormalizedName, list[WhlPackage]],
    skip_exists: str,
    packages: list[str],
    wheels_dir: Path,
) -> None:
    """Delete local wheels whose requested version is already in the index.

    ``skip_exists`` is a ``;``-separated list of package names. Only names
    that also appear in ``packages`` (with a parsable version) are checked.
    For each such package found in ``package_index`` at the requested
    version, every matching wheel file in ``wheels_dir`` is removed.
    """
    requested = create_package_map(packages)

    # Restrict the requested versions to the names listed in skip_exists.
    binary_package_map = {}
    for raw_name in skip_exists.split(";"):
        normalized = canonicalize_name(raw_name)
        if normalized in requested:
            binary_package_map[normalized] = requested[normalized]

    print(f"Checking if binaries already exist for packages {binary_package_map}")
    already_indexed = check_existing_packages(package_index, binary_package_map)
    local_wheels = extract_package_names_from_wheels(wheels_dir)

    for binary in already_indexed:
        version = binary_package_map[binary]
        print(f"Found existing wheels for {binary}, removing local copy {version}")
        for wheel in local_wheels.get(binary, ()):
            print(f"Removing local wheel {wheel}")
            wheel.unlink()