diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..b7f04efd --- /dev/null +++ b/setup.cfg @@ -0,0 +1,47 @@ +[flake8] +count = True +ignore = W503 +max-line-length = 120 +exclude = stubs/* +show-source = True +statistics = True + + +[mypy] +mypy_path = ./stubs/ + +python_version = 3.8 + +ignore_missing_imports = False + +disallow_any_generics = True + +disallow_untyped_defs = True +disallow_incomplete_defs = True +check_untyped_defs = True +disallow_untyped_decorators = True + +no_implicit_optional = True +strict_optional = True + +warn_redundant_casts = True +warn_unused_ignores = True +warn_no_return = True +warn_return_any = True +warn_unreachable = True + +show_none_errors = True +ignore_errors = False +disable_error_code=valid-type + +allow_untyped_globals = False +allow_redefinition = False +implicit_reexport = False +strict_equality = True + +show_error_context = False +show_column_numbers = True +show_error_codes = True +color_output = True +error_summary = True +pretty = True diff --git a/setup.py b/setup.py index c5eb4ce5..c6f190e0 100644 --- a/setup.py +++ b/setup.py @@ -1,50 +1,56 @@ -from setuptools import setup, find_packages -from os import path +#!/usr/bin/env python3 import platform +from pathlib import Path -modules = [] -packages = [ - "vsgenstubs", "vsgenstubs4" -] -requirements = [] -entrypoints = [ - "vsgenstubs=vsgenstubs:main", - "vsgenstubs4=vsgenstubs4:main", -] - -if platform.platform().startswith("Windows"): - modules.extend(["vsrepo", "vsrupdate"]) - entrypoints.extend([ - "vsrepo=vsrepo:noop", - "vsrupdate=vsrupdate:noop" - ]) - requirements.append("tqdm") - -here = path.abspath(path.dirname(__file__)) -with open(path.join(here, 'README.md'), encoding='utf-8') as f: - long_description = f.read() - -setup( - name="vsrepo", - version="50", - description="A simple package repository for VapourSynth.", - long_description = long_description, - long_description_content_type = "text/markdown", - url="http://www.vapoursynth.com/", - author = "Myrsloik", - packages=packages, - py_modules = modules, +import setuptools + +package_name = 'vsrepo' + +is_windows = platform.platform().startswith('Windows') + +packages = ['vsgenstubs', 'vsgenstubs4'] +modules = requirements = [] + +if is_windows: + modules = ['vsrepo', 'vsrupdate'] + requirements = ['tqdm'] + +entrypoints = [*packages, *modules] + + +setuptools.setup( + name=package_name, + version='50', + author='Myrsloik', + author_email='fredrik.mellbin@gmail.com', + description='A simple package repository for VapourSynth', + long_description=Path('README.md').read_text(), + long_description_content_type='text/markdown', + url='https://www.vapoursynth.com/', + project_urls={ + 'Issues': 'https://github.com/vapoursynth/vsrepo/issues', + 'Source': 'https://github.com/vapoursynth/vsrepo' + }, install_requires=requirements, + python_requires='>=3.8', + py_modules=modules, + packages=[ + *packages, *modules, 'utils' + ], include_package_data=True, package_data={ - package: ["*.pyi"] for package in packages + package: ['*.pyi', 'py.typed'] for package in packages }, - entry_points = { - 'console_scripts': entrypoints - }, - project_urls={ - "Issues": "https://github.com/vapoursynth/vsrepo/issues", - "Source": "https://github.com/vapoursynth/vsrepo" + classifiers=[ + 'Programming Language :: Python :: 3', + 'License :: OSI Approved :: MIT License', + 'Operating System :: OS Independent', + ], + entry_points={ + 'console_scripts': [ + f'{module}={module}:main' + for module in entrypoints + ] } ) diff --git a/vsgenstubs/py.typed b/vsgenstubs/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/vsgenstubs4/py.typed b/vsgenstubs4/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/vsrepo.py b/vsrepo.py deleted file mode 100644 index 0c271650..00000000 --- a/vsrepo.py +++ /dev/null @@ -1,988 +0,0 @@ -## MIT License -## -## Copyright (c) 2018-2020 Fredrik Mellbin -## -## Permission is hereby granted, free of charge, to any person obtaining a copy -## of this software and associated documentation files (the "Software"), to deal -## in the Software without restriction, including without limitation the rights -## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -## copies of the Software, and to permit persons to whom the Software is -## furnished to do so, subject to the following conditions: -## -## The above copyright notice and this permission notice shall be included in all -## copies or substantial portions of the Software. -## -## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -## SOFTWARE. - -import argparse -import base64 -import binascii -import csv -import email.utils -import glob -import hashlib -import importlib.util as imputil -import io -import json -import os -import os.path -import re -import subprocess -import sys -import tempfile -import urllib.request -import zipfile -from typing import Iterator, List, MutableMapping, Optional, Tuple - -try: - import winreg -except ImportError: - print('{} is only supported on Windows.'.format(__file__)) - sys.exit(1) - -try: - import tqdm # type: ignore -except ImportError: - pass - -bundled_api3_plugins = ['com.vapoursynth.avisource', 'com.vapoursynth.eedi3', 'com.vapoursynth.imwri', 'com.vapoursynth.misc', 'com.vapoursynth.morpho', 'com.vapoursynth.removegrainvs', 'com.vapoursynth.subtext', 'com.vapoursynth.vinverse', 'org.ivtc.v', 'com.nodame.histogram'] - - -def is_venv() -> bool: - return hasattr(sys, "real_prefix") or (hasattr(sys, "base_prefix") and sys.base_prefix != sys.prefix) - - -def detect_vapoursynth_installation() -> str: - try: - spec = imputil.find_spec("vapoursynth") - if spec is None: - raise ModuleNotFoundError() - except (ValueError, ModuleNotFoundError): - print("Could not detect vapoursynth.") - sys.exit(1) - - if not spec.has_location: - try: - import vapoursynth - except ImportError: - print("The vapoursynth-module could not be found or imported.") - else: - return vapoursynth.__file__ - if spec.origin is None: - print("VapourSynth's origin could not be determined.") - sys.exit(1) - return spec.origin - - -def is_sitepackage_install_portable() -> bool: - if args.portable: - return False - - vapoursynth_path = detect_vapoursynth_installation() - return os.path.exists(os.path.join(os.path.dirname(vapoursynth_path), 'portable.vs')) - -def is_sitepackage_install() -> bool: - if args.portable: - return False - - vapoursynth_path = detect_vapoursynth_installation() - base_path = os.path.dirname(vapoursynth_path) - - # We reside in a venv. - if is_venv(): - # VapourSynth has not been installed as a package. - # Assume no site-package install - if len(glob.glob(os.path.join(base_path, 'VapourSynth-*.dist-info'))) == 0: - return False - - if os.path.exists(os.path.join(base_path, "portable.vs")): - return True - - # Assume this is not a global install. - return False - - # We do not reside in a venv. - else: - # pip install vapoursynth-portable - # Install all packages to site-packages and treat them as packages. - if len(glob.glob(os.path.join(base_path, 'VapourSynth_portable-*.dist-info'))) > 0: - return True - - # This is a portable installation, this cannot be a site-package install. - if os.path.exists(os.path.join(base_path, "portable.vs")): - return False - - # This is a global install. Install dist-info files. - return True - -def get_vs_installation_site() -> str: - if is_venv(): - try: - return os.path.dirname(detect_vapoursynth_installation()) - except ImportError: - import setuptools - return os.path.dirname(os.path.dirname(setuptools.__file__)) - - import site - return site.getusersitepackages() - - -is_64bits: bool = sys.maxsize > 2**32 - -parser = argparse.ArgumentParser(description='A simple VapourSynth package manager') -parser.add_argument('operation', choices=['install', 'update', 'upgrade', 'upgrade-all', 'uninstall', 'installed', 'available', 'paths', "genstubs", "gendistinfo"]) -parser.add_argument('package', nargs='*', help='identifier, namespace or module to install, upgrade or uninstall') -parser.add_argument('-f', action='store_true', dest='force', help='force upgrade for packages where the current version is unknown') -parser.add_argument('-p', action='store_true', dest='portable', help='use paths suitable for portable installs') -parser.add_argument('-d', action='store_true', dest='skip_deps', help='skip installing dependencies') -parser.add_argument('-t', choices=['win32', 'win64'], default='win64' if is_64bits else 'win32', dest='target', help='binaries to install, defaults to python\'s architecture') -parser.add_argument('-b', dest='binary_path', help='custom binary install path') -parser.add_argument('-s', dest='script_path', help='custom script install path') -parser.add_argument("--stub-output-file", default="", help="Don't update the typestubs generated by vsrepo.") -parser.add_argument("--force-dist-info", action="store_true", default=False, help="") -args = parser.parse_args() - -is_64bits = args.target == 'win64' - -file_dirname: str = os.path.dirname(os.path.abspath(__file__)) - -# VSRepo is installed to the site-packages. -if os.path.abspath(file_dirname).startswith(os.path.abspath(sys.prefix)): - file_dirname = os.getcwd() - -if args.portable: - plugin32_path = os.path.join(file_dirname, 'vapoursynth32', 'plugins') - plugin64_path = os.path.join(file_dirname, 'vapoursynth64', 'plugins') -elif is_sitepackage_install_portable(): - vapoursynth_path = detect_vapoursynth_installation() - base_path = os.path.dirname(vapoursynth_path) - plugin32_path = os.path.join(base_path, 'vapoursynth32', 'plugins') - plugin64_path = os.path.join(base_path, 'vapoursynth64', 'plugins') - del vapoursynth_path -else: - pluginparent = [str(os.getenv("APPDATA")), 'VapourSynth'] - plugin32_path = os.path.join(*pluginparent, 'plugins32') - plugin64_path = os.path.join(*pluginparent, 'plugins64') - -if (args.operation in ['install', 'upgrade', 'uninstall']) and ((args.package is None) or len(args.package) == 0): - print('Package argument required for install, upgrade and uninstall operations') - sys.exit(1) - -package_json_path = os.path.join(file_dirname, 'vspackages3.json') if args.portable else os.path.join(*pluginparent, 'vsrepo', 'vspackages3.json') - -if args.force_dist_info or is_sitepackage_install(): - if is_venv(): - try: - import setuptools - site_package_dir: Optional[str] = os.path.dirname(os.path.dirname(setuptools.__file__)) - del setuptools - except ImportError: - site_package_dir = None - else: - import site - site_package_dir = site.getusersitepackages() -else: - site_package_dir = None - -py_script_path: str = file_dirname if args.portable else (site_package_dir if site_package_dir is not None else get_vs_installation_site()) -if args.script_path is not None: - py_script_path = args.script_path - - -plugin_path: str = plugin64_path if is_64bits else plugin32_path -if args.binary_path is not None: - plugin_path = args.binary_path - -os.makedirs(py_script_path, exist_ok=True) -os.makedirs(plugin_path, exist_ok=True) -os.makedirs(os.path.dirname(package_json_path), exist_ok=True) - - -cmd7zip_path: str = os.path.join(file_dirname, '7z.exe') -if not os.path.isfile(cmd7zip_path): - try: - with winreg.OpenKeyEx(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\7-Zip', reserved=0, access=winreg.KEY_READ) as regkey: - cmd7zip_path = os.path.join(winreg.QueryValueEx(regkey, 'Path')[0], '7z.exe') - except: - cmd7zip_path = '7z.exe' - -installed_packages: MutableMapping = {} -download_cache: MutableMapping = {} - -def fetch_ur1(url: str, desc: Optional[str] = None) -> bytearray: - with urllib.request.urlopen(url) as urlreq: - if ('tqdm' in sys.modules) and (urlreq.headers['content-length'] is not None): - size = int(urlreq.headers['content-length']) - remaining = size - data = bytearray() - with tqdm.tqdm(total=size, unit='B', unit_scale=True, unit_divisor=1024, desc=desc) as t: - while remaining > 0: - blocksize = min(remaining, 1024*128) - data.extend(urlreq.read(blocksize)) - remaining = remaining - blocksize - t.update(blocksize) - return data - else: - print('Fetching: ' + url) - return urlreq.read() - -def fetch_url_cached(url: str, desc: str = "") -> bytearray: - data = download_cache.get(url, None) - if data is None: - data = fetch_ur1(url, desc) - download_cache[url] = data - return data - -package_print_string = "{:25s} {:15s} {:11s} {:11s} {:s}" - -package_list: Optional[MutableMapping] = None -try: - with open(package_json_path, 'r', encoding='utf-8') as pl: - package_list = json.load(pl) - if package_list is None: - raise ValueError() - if package_list['file-format'] != 3: - print('Package definition format is {} but only version 3 is supported'.format(package_list['file-format'])) - raise ValueError() - package_list = package_list.get('packages') -except (OSError, FileExistsError, ValueError): - pass - -def check_hash(data: bytes, ref_hash: str) -> Tuple[bool, str, str]: - data_hash = hashlib.sha256(data).hexdigest() - return (data_hash == ref_hash, data_hash, ref_hash) - -def get_bin_name(p: MutableMapping): - if p['type'] == 'PyScript': - return 'script' - if p['type'] == 'PyWheel': - return 'wheel' - elif p['type'] == 'VSPlugin': - if is_64bits: - return 'win64' - else: - return 'win32' - else: - raise ValueError('Unknown install type') - -def get_install_path(p: MutableMapping) -> str: - if p['type'] == 'PyScript' or p['type'] == 'PyWheel': - return py_script_path - elif p['type'] == 'VSPlugin': - return plugin_path - else: - raise ValueError('Unknown install type') - -def get_package_from_id(id: str, required: bool = False) -> Optional[MutableMapping]: - if package_list is None: - return None - for p in package_list: - if p['identifier'] == id: - return p - if required: - raise ValueError(f'No package with the identifier {id} found') - return None - -def get_package_from_plugin_name(name: str, required: bool = False) -> Optional[MutableMapping]: - if package_list is None: - return None - for p in package_list: - if p['name'].casefold() == name.casefold(): - return p - if required: - raise ValueError(f'No package with the name {name} found') - return None - -def get_package_from_namespace(namespace: str, required: bool = False) -> Optional[MutableMapping]: - if package_list is None: - return None - for p in package_list: - if 'namespace' in p: - if p['namespace'] == namespace: - return p - if required: - raise ValueError(f'No package with the namespace {namespace} found') - return None - -def get_package_from_modulename(modulename: str, required: bool = False) -> Optional[MutableMapping]: - if package_list is None: - return None - for p in package_list: - if 'modulename' in p: - if p['modulename'] == modulename: - return p - if required: - raise ValueError(f'No package with the modulename {modulename} found') - return None - -def get_package_from_name(name: str) -> MutableMapping: - p = get_package_from_id(name) - if p is None: - p = get_package_from_namespace(name) - if p is None: - p = get_package_from_modulename(name) - if p is None: - p = get_package_from_plugin_name(name) - if p is None: - raise ValueError(f'Package {name} not found') - return p - -def is_package_installed(id: str) -> bool: - return id in installed_packages - -def is_package_upgradable(id: str, force: bool) -> bool: - pkg = get_package_from_id(id, True) - if pkg is None: - return False - lastest_installable = get_latest_installable_release(pkg) - if force: - return (is_package_installed(id) and (lastest_installable is not None) and (installed_packages[id] != lastest_installable['version'])) - else: - return (is_package_installed(id) and (lastest_installable is not None) and (installed_packages[id] != 'Unknown') and (installed_packages[id] != lastest_installable['version'])) - -def get_python_package_name(pkg: MutableMapping) -> str: - if "wheelname" in pkg: - return pkg["wheelname"].replace(".", "_").replace(" ", "_").replace("(", "_").replace(")", "") - else: - return pkg["name"].replace(".", "_").replace(" ", "_").replace("(", "_").replace(")", "") - -def find_dist_version(pkg: MutableMapping, path: Optional[str]) -> Optional[str]: - if path is None: - return None - - name = get_python_package_name(pkg) - versions: List[str] = [] - - for targetname in os.listdir(path): - if (targetname.startswith(f"{name}-") and targetname.endswith(".dist-info")): - # only bother with dist-info dirs that actually have a usable record in case a package uninstall failed to delete the dir - if os.path.isfile(os.path.join(path, targetname, 'RECORD')): - versions.append(targetname[len(name)+1:-10]) - - versions.sort(reverse=True) - return versions[0] if len(versions) > 0 else None - -def detect_installed_packages() -> None: - if package_list is not None: - for p in package_list: - dest_path = get_install_path(p) - if p['type'] == 'PyWheel': - version = find_dist_version(p, dest_path) - if version is not None: - installed_packages[p['identifier']] = version - else: - for v in p['releases']: - matched = True - exists = True - bin_name = get_bin_name(p) - if bin_name in v: - for f in v[bin_name]['files']: - try: - with open(os.path.join(dest_path, f), 'rb') as fh: - if not check_hash(fh.read(), v[bin_name]['files'][f][1])[0]: - matched = False - except FileNotFoundError: - exists = False - matched = False - if matched: - installed_packages[p['identifier']] = v['version'] - break - elif exists: - installed_packages[p['identifier']] = 'Unknown' - else: - print('No valid package definitions found. Run update command first!') - sys.exit(1) - -def print_package_status(p: MutableMapping) -> None: - lastest_installable = get_latest_installable_release(p) - name = p['name'] - if is_package_upgradable(p['identifier'], False): - name = '*' + name - elif is_package_upgradable(p['identifier'], True): - name = '+' + name - print(package_print_string.format(name, p['namespace'] if p['type'] == 'VSPlugin' else p['modulename'], installed_packages.get(p['identifier'], ""), lastest_installable.get('version') if lastest_installable is not None else '', p['identifier'])) - -def list_installed_packages() -> None: - print(package_print_string.format('Name', 'Namespace', 'Installed', 'Latest', 'Identifier')) - for id in installed_packages: - pkg = get_package_from_id(id, True) - if pkg is not None: - print_package_status(pkg) - -def list_available_packages() -> None: - print(package_print_string.format('Name', 'Namespace', 'Installed', 'Latest', 'Identifier')) - if package_list is None: - print("Nothing available to list, please try updating first.") - return - for p in package_list: - print_package_status(p) - -def get_latest_installable_release_with_index(p: MutableMapping) -> Tuple[int, Optional[MutableMapping]]: - max_api = get_vapoursynth_api_version() - package_api: int = 3 - if 'api' in p: - package_api = int(p['api']) - bin_name = get_bin_name(p) - for idx, rel in enumerate(p['releases']): - if not isinstance(rel, MutableMapping): - continue - if bin_name in rel: - bin_api: int = package_api - if 'api' in rel[bin_name]: - bin_api = int(rel[bin_name]['api']) - if bin_api <= max_api and bin_api >= 3: - return (idx, rel) - return (-1, None) - -def get_latest_installable_release(p: MutableMapping) -> Optional[MutableMapping]: - return get_latest_installable_release_with_index(p)[1] - -def can_install(p: MutableMapping) -> bool: - return get_latest_installable_release(p) is not None - - -def make_pyversion(version: str, index: int) -> str: - PEP440REGEX = re.compile(r"(\d+!)?\d+(\.\d+)*((?:a|b|rc)\d+)?(\.post\d+)?(\.dev\d+)?(\+[a-zA-Z0-9]+)?") - - version = version.lower().replace("-", ".") - - if version.startswith("rev"): - return make_pyversion(version[3:], index) - - elif version.startswith("release_"): - return make_pyversion(version[8:], index) - - elif version.startswith("r") or version.startswith("v"): - return make_pyversion(version[1:], index) - - elif version.startswith("test"): - return make_pyversion(version[4:], index) - - elif version.startswith("git:"): - version = version[4:] - return f"{index}+{version}" - - elif PEP440REGEX.match(version): - return version - - else: - return str(index) - - -def rmdir(path: str) -> None: - for path, _, fnames in os.walk(path, topdown=False): - for fname in fnames: - os.remove(os.path.join(path, fname)) - os.rmdir(path) - -# Annotated as Iterator due to https://docs.python.org/3/library/typing.html#typing.Generator -# See the portion about only yielding values, it's an alternative to Generator[str, None, None] -def find_dist_dirs(name: str, path: Optional[str] = site_package_dir) -> Iterator[str]: - if path is None: - return - - for targetname in os.listdir(path): - if not (targetname.startswith(f"{name}-") and targetname.endswith(".dist-info")): - continue - yield os.path.join(path, targetname) - - -def remove_package_meta(pkg: MutableMapping) -> None: - if site_package_dir is None: - return - - name = get_python_package_name(pkg) - - for dist_dir in find_dist_dirs(name): - rmdir(dist_dir) - - -def install_package_meta(files: List[Tuple[str, str, str]], pkg: MutableMapping, rel: MutableMapping, index: int) -> None: - if site_package_dir is None: - return - - name = get_python_package_name(pkg) - - version = make_pyversion(rel["version"], index) - dist_dir = os.path.join(site_package_dir, f"{name}-{version}.dist-info") - - remove_package_meta(pkg) - - os.mkdir(dist_dir) - with open(os.path.join(dist_dir, "INSTALLER"), "w") as f: - files.append((os.path.join(dist_dir, "INSTALLER"), "" ,"")) - f.write("vsrepo") - with open(os.path.join(dist_dir, "METADATA"), "w") as f: - files.append((os.path.join(dist_dir, "METADATA"), "" ,"")) - f.write(f"""Metadata-Version: 2.1 -Name: {name} -Version: {version} -Summary: {pkg.get('description', name)} -Platform: All""") - - with open(os.path.join(dist_dir, "RECORD"), "w", newline="") as f: - files.append((os.path.join(dist_dir, "RECORD"), "", "")) - w = csv.writer(f) - for filename, sha256hex, length in files: - if sha256hex: - sha256hex = "sha256=" + base64.urlsafe_b64encode(binascii.unhexlify(sha256hex.encode("ascii"))).rstrip(b"=").decode("ascii") - try: - filename = os.path.relpath(filename, site_package_dir) - except ValueError: - pass - w.writerow([filename, sha256hex, length]) - - -def install_files(p: MutableMapping) -> Tuple[int, int]: - err = (0, 1) - dest_path = get_install_path(p) - bin_name = get_bin_name(p) - idx, install_rel = get_latest_installable_release_with_index(p) - if install_rel is None: - return err - url = install_rel[bin_name]['url'] - data: Optional[bytearray] = None - try: - data = fetch_url_cached(url, p['name'] + ' ' + install_rel['version']) - except: - print('Failed to download ' + p['name'] + ' ' + install_rel['version'] + ', skipping installation and moving on') - return err - - files: List[Tuple[str, str, str]] = [] - - if bin_name == 'wheel': - try: - hash_result = check_hash(data, install_rel[bin_name]['hash']) - if not hash_result[0]: - raise ValueError('Hash mismatch for ' + url + ' got ' + hash_result[1] + ' but expected ' + hash_result[2]) - with zipfile.ZipFile(io.BytesIO(data), 'r') as zf: - basename: Optional[str] = None - for fn in zf.namelist(): - if fn.endswith('.dist-info/WHEEL'): - basename = fn[:-len('.dist-info/WHEEL')] - break - if basename is None: - raise Exception('Wheel: failed to determine package base name') - for fn in zf.namelist(): - if fn.startswith(basename + '.data'): - raise Exception('Wheel: .data dir mapping not supported') - wheelfile = zf.read(basename + '.dist-info/WHEEL').decode().splitlines() - wheeldict = {} - for line in wheelfile: - tmp = line.split(': ', 2) - if len(tmp) == 2: - wheeldict[tmp[0]] = tmp[1] - if wheeldict['Wheel-Version'] != '1.0': - raise Exception('Wheel: only version 1.0 supported') - if wheeldict['Root-Is-Purelib'] != 'true': - raise Exception('Wheel: only purelib root supported') - zf.extractall(path=dest_path) - with open(os.path.join(dest_path, basename + '.dist-info', 'INSTALLER'), mode='w') as f: - f.write("vsrepo") - with open(os.path.join(dest_path, basename + '.dist-info', 'RECORD')) as f: - contents = f.read() - with open(os.path.join(dest_path, basename + '.dist-info', 'RECORD'), mode='a') as f: - if not contents.endswith("\n"): - f.write("\n") - f.write(basename + '.dist-info/INSTALLER,,\n') - except BaseException as e: - print('Failed to decompress ' + p['name'] + ' ' + install_rel['version'] + ' with error: ' + str(e) + ', skipping installation and moving on') - return err - else: - single_file: Optional[Tuple[str, str, str]] = None - if len(install_rel[bin_name]['files']) == 1: - for key in install_rel[bin_name]['files']: - single_file = (key, install_rel[bin_name]['files'][key][0], install_rel[bin_name]['files'][key][1]) - if (single_file is not None) and (single_file[1] == url.rsplit('/', 2)[-1]): - install_fn = single_file[0] - hash_result = check_hash(data, single_file[2]) - if not hash_result[0]: - raise Exception('Hash mismatch for ' + install_fn + ' got ' + hash_result[1] + ' but expected ' + hash_result[2]) - uninstall_files(p) - os.makedirs(os.path.join(dest_path, os.path.split(install_fn)[0]), exist_ok=True) - with open(os.path.join(dest_path, install_fn), 'wb') as outfile: - files.append((os.path.join(dest_path, install_fn), single_file[2], str(len(data)))) - outfile.write(data) - else: - tffd, tfpath = tempfile.mkstemp(prefix='vsm') - with open(tffd, mode='wb') as tf: - tf.write(data) - result_cache = {} - for install_fn in install_rel[bin_name]['files']: - fn_props = install_rel[bin_name]['files'][install_fn] - result = subprocess.run([cmd7zip_path, "e", "-so", tfpath, fn_props[0]], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - result.check_returncode() - hash_result = check_hash(result.stdout, fn_props[1]) - if not hash_result[0]: - raise Exception('Hash mismatch for ' + install_fn + ' got ' + hash_result[1] + ' but expected ' + hash_result[2]) - result_cache[install_fn] = (result.stdout, fn_props[1]) - uninstall_files(p) - for install_fn in install_rel[bin_name]['files']: - os.makedirs(os.path.join(dest_path, os.path.split(install_fn)[0]), exist_ok=True) - with open(os.path.join(dest_path, install_fn), 'wb') as outfile: - files.append((os.path.join(dest_path, install_fn), str(result_cache[install_fn][1]), str(len(result_cache[install_fn][0])))) - outfile.write(result_cache[install_fn][0]) - os.remove(tfpath) - - install_package_meta(files, p, install_rel, idx) - - installed_packages[p['identifier']] = install_rel['version'] - print('Successfully installed ' + p['name'] + ' ' + install_rel['version']) - return (1, 0) - -def install_package(name: str) -> Tuple[int, int, int]: - p = get_package_from_name(name) - if get_vapoursynth_api_version() <= 3: - if p['identifier'] in bundled_api3_plugins: - print('Binaries are already bundled for ' + p['name'] + ', skipping installation') - return (0, 0, 0) - if can_install(p): - inst = (0, 0, 0) - if not args.skip_deps: - if 'dependencies' in p: - for dep in p['dependencies']: - if not isinstance(dep, str): - continue - res = install_package(dep) - inst = (inst[0], inst[1] + res[0] + res[1], inst[2] + res[2]) - if not is_package_installed(p['identifier']): - fres = install_files(p) - inst = (inst[0] + fres[0], inst[1], inst[2] + fres[1]) - return inst - else: - print('No binaries available for ' + args.target + ' in package ' + p['name'] + ', skipping installation') - return (0, 0, 1) - -def upgrade_files(p: MutableMapping) -> Tuple[int, int, int]: - if can_install(p): - inst = (0, 0, 0) - if 'dependencies' in p: - for dep in p['dependencies']: - if not is_package_installed(dep): - res = install_package(dep) - inst = (inst[0], inst[1] + res[0] + res[1], inst[2] + res[2]) - fres = install_files(p) - return (inst[0] + fres[0], inst[1], inst[2] + fres[1]) - else: - print('No binaries available for ' + args.target + ' in package ' + p['name'] + ', skipping installation') - return (0, 0, 1) - -def upgrade_package(name, force) -> Tuple[int, int, int]: - inst = (0, 0, 0) - p = get_package_from_name(name) - if not is_package_installed(p['identifier']): - print('Package ' + p['name'] + ' not installed, can\'t upgrade') - elif is_package_upgradable(p['identifier'], force): - res = upgrade_files(p) - return (res[0], 0, res[1]) - elif not is_package_upgradable(p['identifier'], True): - print('Package ' + p['name'] + ' not upgraded, latest version installed') - else: - print('Package ' + p['name'] + ' not upgraded, unknown version must use -f to force replacement') - return inst - -def upgrade_all_packages(force: bool) -> Tuple[int, int, int]: - inst = (0, 0, 0) - installed_ids: List[str] = list(installed_packages.keys()) - for id in installed_ids: - if is_package_upgradable(id, force): - pkg = get_package_from_id(id, True) - if pkg is None: - return inst - res = upgrade_files(pkg) - inst = (inst[0] + res[0], inst[1] + res[1], inst[2] + res[2]) - return inst - -def uninstall_files(p: MutableMapping) -> None: - dest_path = get_install_path(p) - bin_name = get_bin_name(p) - - if p['type'] == 'PyWheel': - files: List[str] = [] - pyname = get_python_package_name(p) - for dist_dir in find_dist_dirs(pyname, dest_path): - with open(os.path.join(dest_path, dist_dir, 'RECORD'), mode='r') as rec: - lines = rec.read().splitlines() - for line in lines: - tmp = line.split(',') - if len(tmp) > 0 and len(tmp[0]) > 0: - files.append(tmp[0]) - print(files) - for f in files: - try: - os.remove(os.path.join(dest_path, f)) - except BaseException as e: - print('File removal error: ' + str(e)) - for dist_dir in find_dist_dirs(pyname, dest_path): - rmdir(dist_dir) - else: - installed_rel: Optional[MutableMapping] = None - if p['identifier'] in installed_packages: - for rel in p['releases']: - if rel['version'] == installed_packages[p['identifier']]: - installed_rel = rel - break - if installed_rel is not None: - for f in installed_rel[bin_name]['files']: - os.remove(os.path.join(dest_path, f)) - - remove_package_meta(p) - -def uninstall_package(name: str) -> Tuple[int, int]: - p = get_package_from_name(name) - if is_package_installed(p['identifier']): - if installed_packages[p['identifier']] == 'Unknown': - print('Can\'t uninstall unknown version of package: ' + p['name']) - return (0, 0) - else: - uninstall_files(p) - print('Uninstalled package: ' + p['name'] + ' ' + installed_packages[p['identifier']]) - return (1, 0) - else: - print('No files installed for ' + p['name'] + ', skipping uninstall') - return (0, 0) - -def update_package_definition(url: str) -> None: - localmtimeval = 0.0 - try: - localmtimeval = os.path.getmtime(package_json_path) - except: - pass - localmtime = email.utils.formatdate(localmtimeval + 10, usegmt=True) - req_obj = urllib.request.Request(url, headers={ 'If-Modified-Since': localmtime, 'User-Agent': 'VSRepo' }) - try: - with urllib.request.urlopen(req_obj) as urlreq: - remote_modtime = email.utils.mktime_tz(email.utils.parsedate_tz(urlreq.info()['Last-Modified'])) - data = urlreq.read() - with zipfile.ZipFile(io.BytesIO(data), 'r') as zf: - with zf.open('vspackages3.json') as pkgfile: - with open(package_json_path, 'wb') as dstfile: - dstfile.write(pkgfile.read()) - os.utime(package_json_path, times=(remote_modtime, remote_modtime)) - except urllib.error.HTTPError as httperr: - if httperr.code == 304: - print('Local definitions already up to date: ' + email.utils.formatdate(localmtimeval, usegmt=True)) - else: - raise - else: - print('Local definitions updated to: ' + email.utils.formatdate(remote_modtime, usegmt=True)) - - -def get_vapoursynth_version() -> int: - try: - import vapoursynth - except ImportError: - return 1 - - if hasattr(vapoursynth, "__version__"): - return vapoursynth.__version__[0] - return vapoursynth.core.version_number() - -def get_vapoursynth_api_version() -> int: - try: - import vapoursynth - except ImportError: - return 1 - - if hasattr(vapoursynth, "__api_version__"): - return vapoursynth.__api_version__[0] - # assume lowest widespread api version, will probably error out somewhere else - return 3 - - -def update_genstubs() -> None: - print("Updating VapourSynth stubs") - - if get_vapoursynth_api_version() > 3: - genstubs = os.path.join(os.path.dirname(__file__), "vsgenstubs4/__init__.py") - else: - genstubs = os.path.join(os.path.dirname(__file__), "vsgenstubs/__init__.py") - contents = subprocess.getoutput([sys.executable, genstubs, '-o', '-']) # type: ignore - - site_package = False - stubpath: Optional[str] = args.stub_output_file - if stubpath == "-": - stubpath = None - fp = sys.stdout - elif stubpath == "--": - stubpath = None - fp = sys.stderr - else: - if not stubpath: - if site_package_dir: - stubpath = site_package_dir - site_package = True - else: - stubpath = "." - - if os.path.isdir(stubpath): - stubpath = os.path.join(stubpath, "vapoursynth.pyi") - - fp = open(stubpath, "w") - - with fp: - fp.write(contents) - - if site_package: - if site_package_dir is None: - return - vs_stub_pkg = os.path.join(site_package_dir, "vapoursynth-stubs") - if os.path.exists(vs_stub_pkg): - rmdir(vs_stub_pkg) - - os.makedirs(vs_stub_pkg) - - if stubpath is None: - return - with open(stubpath, "rb") as src: - with open(os.path.join(vs_stub_pkg, "__init__.pyi"), "wb") as dst: - dst.write(src.read()) - os.remove(stubpath) - stubpath = os.path.join(vs_stub_pkg, "__init__.pyi") - - for dist_dir in find_dist_dirs("VapourSynth"): - with open(os.path.join(dist_dir, "RECORD")) as f: - contents = f.read() - - try: - filename = os.path.relpath(stubpath, site_package_dir) - except ValueError: - filename = stubpath - - if "__init__.pyi" not in contents or "vapoursynth.pyi" not in contents: - with open(os.path.join(dist_dir, "RECORD"), "a") as f: - if not contents.endswith("\n"): - f.write("\n") - f.write(f"{filename},,\n") - break - -def rebuild_distinfo() -> None: - print("Rebuilding dist-info dirs for other python package installers") - for pkg_id, pkg_ver in installed_packages.items(): - pkg = get_package_from_id(pkg_id) - if pkg is None: - continue - if pkg['type'] == 'PyWheel': - continue - - for idx, rel in enumerate(pkg["releases"]): - if rel["version"] == pkg_ver: - break - else: - remove_package_meta(pkg) - continue - - dest_path = get_install_path(pkg) - bin_name = get_bin_name(pkg) - files = [ - (os.path.join(dest_path, fn), fd[1], str(os.stat(os.path.join(dest_path, fn)).st_size)) - for fn, fd in rel[bin_name]["files"].items() - ] - - install_package_meta(files, pkg, rel, idx) - - -def print_paths(): - print('Paths:') - print('Definitions: ' + package_json_path) - print('Binaries: ' + plugin_path) - print('Scripts: ' + py_script_path) - - if site_package_dir is not None: - print("Dist-Infos: " + site_package_dir) - else: - print("Dist-Infos: ") - -if args.operation != 'update' and package_list is None: - print('Failed to open vspackages3.json. Run update command.') - sys.exit(1) - -for name in args.package: - try: - assert isinstance(name, str) - get_package_from_name(name) - except Exception as e: - print(e) - sys.exit(1) - -if args.operation == 'install': - detect_installed_packages() - rebuild_distinfo() - - inst = (0, 0, 0) - for name in args.package: - res = install_package(name) - inst = (inst[0] + res[0], inst[1] + res[1], inst[2] + res[2]) - - update_genstubs() - - if (inst[0] == 0) and (inst[1] == 0): - print('Nothing done') - elif (inst[0] > 0) and (inst[1] == 0): - print('{} {} installed'.format(inst[0], 'package' if inst[0] == 1 else 'packages')) - elif (inst[0] == 0) and (inst[1] > 0): - print('{} missing {} installed'.format(inst[1], 'dependency' if inst[1] == 1 else 'dependencies')) - else: - print('{} {} and {} additional {} installed'.format(inst[0], 'package' if inst[0] == 1 else 'packages', inst[1], 'dependency' if inst[1] == 1 else 'dependencies')) - if (inst[2] > 0): - print('{} {} failed'.format(inst[2], 'package' if inst[0] == 1 else 'packages')) -elif args.operation in ('upgrade', 'upgrade-all'): - detect_installed_packages() - rebuild_distinfo() - - inst = (0, 0, 0) - if args.operation == 'upgrade-all': - inst = upgrade_all_packages(args.force) - else: - for name in args.package: - res = upgrade_package(name, args.force) - inst = (inst[0] + res[0], inst[1] + res[1], inst[2] + res[2]) - - update_genstubs() - - if (inst[0] == 0) and (inst[1] == 0): - print('Nothing done') - elif (inst[0] > 0) and (inst[1] == 0): - print('{} {} upgraded'.format(inst[0], 'package' if inst[0] == 1 else 'packages')) - elif (inst[0] == 0) and (inst[1] > 0): - print('{} missing {} installed'.format(inst[1], 'dependency' if inst[1] == 1 else 'dependencies')) - else: - print('{} {} upgraded and {} additional {} installed'.format(inst[0], 'package' if inst[0] == 1 else 'packages', inst[1], 'dependency' if inst[1] == 1 else 'dependencies')) - if (inst[2] > 0): - print('{} {} failed'.format(inst[2], 'package' if inst[0] == 1 else 'packages')) -elif args.operation == 'uninstall': - detect_installed_packages() - uninst = (0, 0) - for name in args.package: - uninst_res = uninstall_package(name) - uninst = (uninst[0] + uninst_res[0], uninst[1] + uninst_res[1]) - if uninst[0] == 0: - print('No packages uninstalled') - else: - print('{} {} uninstalled'.format(uninst[0], 'package' if uninst[0] == 1 else 'packages')) - update_genstubs() -elif args.operation == 'installed': - detect_installed_packages() - list_installed_packages() -elif args.operation == 'available': - detect_installed_packages() - list_available_packages() -elif args.operation == 'update': - update_package_definition('http://www.vapoursynth.com/vsrepo/vspackages3.zip') -elif args.operation == 'paths': - print_paths() -elif args.operation == "genstubs": - update_genstubs() -elif args.operation == "gendistinfo": - detect_installed_packages() - rebuild_distinfo() - - -def noop(): - pass diff --git a/vsrepo/__init__.py b/vsrepo/__init__.py new file mode 100644 index 00000000..79ee058d --- /dev/null +++ b/vsrepo/__init__.py @@ -0,0 +1,171 @@ +# MIT License +# +# Copyright (c) 2018-2020 Fredrik Mellbin +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import sys +from argparse import ArgumentParser +from pathlib import Path +from typing import NoReturn, Sequence, Union + +from vsrepo.utils.types import VSRepoNamespace + +from .actions import ( + detect_installed_packages, install_package, list_available_packages, list_installed_packages, rebuild_distinfo, + set_variables, uninstall_package, update_genstubs, update_package_definition, upgrade_files, upgrade_package +) +from .utils import InstalledPackages, InstallPackageResult, VSPackages, get_installation_info + +__all__ = [ + 'main' +] + + +class NoExitArgumentParser(ArgumentParser): + exit_on_error: bool + + def error(self, message: str) -> NoReturn: + if self.exit_on_error: + super().error(message) + raise ValueError(message) + + +def main(argv: Union[str, Sequence[str], None] = None, exit_on_error: bool = False) -> None: + parser = NoExitArgumentParser(description='A simple VapourSynth package manager') + parser.exit_on_error = exit_on_error + + parser.add_argument( + 'operation', choices=[ + 'install', 'update', 'upgrade', 'upgrade-all', 'uninstall', + 'installed', 'available', 'paths', 'genstubs', 'gendistinfo' + ] + ) + parser.add_argument('package', nargs='*', help='identifier, namespace or module to install, upgrade or uninstall') + parser.add_argument( + '-f', action='store_true', dest='force', help='force upgrade for packages where the current version is unknown' + ) + parser.add_argument('-p', action='store_true', dest='portable', help='use paths suitable for portable installs') + parser.add_argument('-d', action='store_true', dest='skip_deps', help='skip installing dependencies') + parser.add_argument( + '-t', choices=['win32', 'win64'], + default='win64' if sys.maxsize > 2**32 else 'win32', dest='target', + help='binaries to install, defaults to python\'s architecture' + ) + parser.add_argument('-b', dest='binary_path', type=Path, help='custom binary install path') + parser.add_argument('-s', dest='script_path', type=Path, help='custom script install path') + parser.add_argument( + '--stub-output-file', default='', type=Path, help='Don\'t update the typestubs generated by vsrepo.' + ) + parser.add_argument('--force-dist-info', action='store_true', default=False, help='') + + args = parser.parse_args(argv.split(' ') if isinstance(argv, str) else argv, VSRepoNamespace()) + + if (args.operation in ['install', 'upgrade', 'uninstall']) and not args.package: + e = 'Package argument required for install, upgrade and uninstall operations' + if exit: + print(e) + sys.exit(1) + raise ValueError(e) + + info = get_installation_info(args) + + package_list = VSPackages.from_file(Path(info.package_json_path)) + + installed_packages = InstalledPackages(info, package_list) + + for name in args.package: + try: + assert isinstance(name, str) + installed_packages.get_package_from_name(name) + except Exception as e: + if exit: + print(e) + sys.exit(1) + raise e + + set_variables(args, info, package_list, installed_packages) + + if args.operation == 'update': + return update_package_definition('http://www.vapoursynth.com/vsrepo/vspackages3.zip') + elif args.operation == 'paths': + return info.print_info() + elif args.operation == 'genstubs': + return update_genstubs() + + detect_installed_packages() + + if args.operation == 'installed': + return list_installed_packages() + elif args.operation == 'available': + return list_available_packages() + elif args.operation == 'gendistinfo': + return rebuild_distinfo() + + if args.operation == 'uninstall': + uninst = InstallPackageResult() + for name in args.package: + uninst += uninstall_package(name) + + if not uninst.success: + print('No packages uninstalled') + else: + print(f"{uninst.success} package{uninst.success > 1 and 's' or 's'} uninstalled") + + return update_genstubs() + + rebuild_distinfo() + + inst = InstallPackageResult() + if args.operation == 'install': + action = 'installed' + + for name in args.package: + inst += install_package(name) + elif args.operation in {'upgrade', 'upgrade-all'}: + action = 'upgraded' + if args.operation == 'upgrade-all': + for id in installed_packages: + if installed_packages.is_package_upgradable(id, args.force): + pkg = installed_packages.get_package_from_id(id, True) + inst += upgrade_files(pkg) + else: + for name in args.package: + inst += upgrade_package(name, args.force) + + update_genstubs() + + pack_str = f"package{inst.success > 1 and 's' or 's'}" + deps_str = f"dependenc{inst.success > 1 and 'ies' or 'y'}" + + if not inst.success and not inst.success_dependecies: + print('Nothing done') + elif inst.success and not inst.success_dependecies: + print(f'{inst.success} {pack_str} {action}') + elif not inst.success and inst.success_dependecies: + print(f'{inst.success_dependecies} missing {deps_str} {action}') + else: + print(f'{inst.success} {pack_str} {action} and {inst.success_dependecies} additional {deps_str} {action}') + + if inst.error: + print(f'{inst.error} {pack_str} failed') + + +if __name__ == '__main__': + main(exit_on_error=True) diff --git a/vsrepo/actions.py b/vsrepo/actions.py new file mode 100644 index 00000000..16a44e6c --- /dev/null +++ b/vsrepo/actions.py @@ -0,0 +1,632 @@ +import base64 +import binascii +import csv +import hashlib +import io +import os +import re +import subprocess +import sys +import tempfile +import zipfile +from email.utils import formatdate, mktime_tz, parsedate_tz +from pathlib import Path +from typing import Iterator, List, Optional, Tuple +from urllib.error import HTTPError +from urllib.request import Request, urlopen + +from .utils import ( + InstallationInfo, InstalledPackages, InstallFileResult, InstallPackageResult, VSPackage, + VSPackagePlatformReleaseWheel, VSPackageRel, VSPackages, VSPackageType, VSRepoNamespace, fetch_url_cached, + get_vapoursynth_api_version +) + +args: VSRepoNamespace = None # type: ignore +info: InstallationInfo = None # type: ignore +package_list: VSPackages = None # type: ignore +installed_packages: InstalledPackages = None # type: ignore + + +def set_variables( + _args: VSRepoNamespace, _info: InstallationInfo, _package_list: VSPackages, _installed_packages: InstalledPackages +) -> None: + global args, info, package_list, installed_packages + args = _args + info = _info + package_list = _package_list + installed_packages = _installed_packages + + +package_print_string = '{:30s} {:15s} {:11s} {:11s} {:s}' + +bundled_api3_plugins = { + 'com.vapoursynth.avisource', 'com.vapoursynth.eedi3', 'com.vapoursynth.imwri', + 'com.vapoursynth.misc', 'com.vapoursynth.morpho', 'com.vapoursynth.removegrainvs', + 'com.vapoursynth.subtext', 'com.vapoursynth.vinverse', 'org.ivtc.v', 'com.nodame.histogram' +} + +MISSING = object() + + +def check_hash(data: bytes, ref_hash: str) -> Tuple[bool, str, str]: + data_hash = hashlib.sha256(data).hexdigest() + return (data_hash == ref_hash, data_hash, ref_hash) + + +def find_dist_version(pkg: VSPackage, path: Optional[Path]) -> Optional[str]: + if path is None: + return None + + name = pkg.get_python_name() + versions = [ + targetname.name[len(name) + 1:-10] + for targetname in path.iterdir() if ( + # only bother with dist-info dirs that actually have a usable + # record in case a package uninstall failed to delete the dir + targetname.name.startswith(f'{name}-') and targetname.name.endswith('.dist-info') + and (targetname / 'RECORD').is_file() + ) + ] + + return max(versions, default=None) + + +def detect_installed_packages() -> None: + for p in package_list: + dest_path = p.get_install_path(info) + + if p.pkg_type is VSPackageType.WHEEL: + if (version := find_dist_version(p, dest_path)) is not None: + installed_packages[p.identifier] = version + + continue + + for v in p.releases: + matched = True + exists = True + + if (release := v.get_release(p.pkg_type)) and release.files: + for f, rel_file in release.files.items(): + try: + if not check_hash((dest_path / f).read_bytes(), rel_file.filename)[0]: + matched = False + except FileNotFoundError: + exists = False + matched = False + + if matched: + installed_packages[p.identifier] = v.version + break + + if exists: + installed_packages[p.identifier] = 'Unknown' + + +def print_package_status(p: VSPackage) -> None: + lastest_installable = p.get_latest_installable_release() + name = p.name + if installed_packages.is_package_upgradable(p.identifier, False): + name = f'*{name}' + elif installed_packages.is_package_upgradable(p.identifier, True): + name = f'+{name}' + print( + package_print_string.format( + name, p.namespace if p.pkg_type is VSPackageType.PLUGIN else p.modulename, + installed_packages.get(p.identifier, ''), + lastest_installable.version if lastest_installable is not None else '', + p.identifier + ) + ) + + +def list_installed_packages() -> None: + print(package_print_string.format('Name', 'Namespace', 'Installed', 'Latest', 'Identifier')) + for id in installed_packages: + pkg = installed_packages.get_package_from_id(id, True) + if pkg is not None: + print_package_status(pkg) + + +def list_available_packages() -> None: + print(package_print_string.format('Name', 'Namespace', 'Installed', 'Latest', 'Identifier')) + + for p in package_list: + print_package_status(p) + + +def can_install(p: VSPackage) -> bool: + return p.get_latest_installable_release() is not None + + +def make_pyversion(version: str, index: int) -> str: + PEP440REGEX = re.compile(r'(\d+!)?\d+(\.\d+)*((?:a|b|rc)\d+)?(\.post\d+)?(\.dev\d+)?(\+[a-zA-Z0-9]+)?') + + version = version.lower().replace('-', '.') + + if version.startswith('rev'): + return make_pyversion(version[3:], index) + + elif version.startswith('release_'): + return make_pyversion(version[8:], index) + + elif version.startswith('r') or version.startswith('v'): + return make_pyversion(version[1:], index) + + elif version.startswith('test'): + return make_pyversion(version[4:], index) + + elif version.startswith('git:'): + version = version[4:] + return f'{index}+{version}' + + elif PEP440REGEX.match(version): + return version + + else: + return str(index) + + +def rmdir(path: Path) -> None: + for file in path.iterdir(): + if file.is_dir(): + return rmdir(file) + + file.unlink(True) + + path.rmdir() + + +def find_dist_dirs(name: str, path: Optional[Path] = MISSING) -> Iterator[Path]: # type: ignore[assignment] + if path is MISSING: + path = info.site_package_dir + + if path is None: + return + + for targetname in path.iterdir(): + if not (targetname.name.startswith(f'{name}-') and targetname.name.endswith('.dist-info')): + continue + yield targetname + + +def remove_package_meta(pkg: VSPackage) -> None: + if info.site_package_dir is None: + return + + name = pkg.get_python_name() + + for dist_dir in find_dist_dirs(name): + rmdir(dist_dir) + + +def install_package_meta(files: List[Tuple[Path, str, str]], pkg: VSPackage, rel: VSPackageRel, index: int) -> None: + if info.site_package_dir is None: + return + + name = pkg.get_python_name() + + version = make_pyversion(rel.version, index) + dist_dir = info.site_package_dir / f'{name}-{version}.dist-info' + + remove_package_meta(pkg) + + dist_dir.mkdir(parents=True, exist_ok=True) + + inst_path, meta_path, recd_path = [dist_dir / name for name in ('INSTALLER', 'METADATA', 'RECORD')] + + files.extend([(path, '', '') for path in (inst_path, meta_path, recd_path)]) + + inst_path.write_text('vsrepo') + meta_path.write_text( + f'Metadata-Version: 2.1\nName: {name}\nVersion: {version}\nSummary: {pkg.description or name}\nPlatform: All' + ) + + with open(recd_path, 'w', newline='') as f: + w = csv.writer(f) + for filename, sha256hex, length in files: + if sha256hex: + sha256hex = 'sha256=' + base64.urlsafe_b64encode(binascii.unhexlify( + sha256hex.encode('ascii'))).rstrip(b'=').decode('ascii') + try: + filename = filename.relative_to(info.site_package_dir) + except ValueError: + ... + + w.writerow([filename, sha256hex, length]) + + +def install_files(p: VSPackage) -> InstallFileResult: + err = InstallFileResult(0, 1) + dest_path = p.get_install_path(info) + + idx, install_rel = p.get_latest_installable_release_with_index() + + if install_rel is None: + return err + + release = install_rel.get_release(p.pkg_type) + assert release + + data: Optional[bytearray] = None + + try: + data = fetch_url_cached(release.url, f'{p.name} {install_rel.version}') + except BaseException: + print( + f'Failed to download {p.name} {install_rel.version}, skipping installation and moving on' + ) + + return err + + files: List[Tuple[Path, str, str]] = [] + + if p.pkg_type is VSPackageType.WHEEL: + assert isinstance(release, VSPackagePlatformReleaseWheel) + + try: + hash_result = check_hash(data, release.hash) + + if not hash_result[0]: + raise ValueError(f'Hash mismatch for {release.url} got {hash_result[1]} but expected {hash_result[2]}') + + with zipfile.ZipFile(io.BytesIO(data), 'r') as zf: + basename: Optional[str] = None + + for fn in zf.namelist(): + if fn.endswith('.dist-info/WHEEL'): + basename = fn[:-len('.dist-info/WHEEL')] + break + + if basename is None: + raise Exception('Wheel: failed to determine package base name') + + for fn in zf.namelist(): + if fn.startswith(f'{basename}.data'): + raise Exception('Wheel: .data dir mapping not supported') + + wheelfile = zf.read(f'{basename}.dist-info/WHEEL').decode().splitlines() + wheeldict = {} + + for line in wheelfile: + tmp = line.split(': ', 2) + if len(tmp) == 2: + wheeldict[tmp[0]] = tmp[1] + + if wheeldict['Wheel-Version'] != '1.0': + raise Exception('Wheel: only version 1.0 supported') + + if wheeldict['Root-Is-Purelib'] != 'true': + raise Exception('Wheel: only purelib root supported') + + zf.extractall(path=dest_path) + + recd_path = dest_path / f'{basename}.dist-info' / 'RECORD' + + recd_path.write_text('vsrepo') + + newline_end = recd_path.read_text().endswith('\n') + + with open(recd_path, mode='a') as f: + if not newline_end: + f.write('\n') + f.write(f'{basename}.dist-info/INSTALLER,,\n') + except BaseException as e: + print( + f'Failed to decompress {p.name} {install_rel.version} with error: {e},' + ' skipping installation and moving on' + ) + + return err + else: + single_file: Optional[Tuple[Path, str, str]] = None + + if len(release.files) == 1: + key, file = next(iter(release.files.items())) + single_file = (key, file[0], file[1]) + + if (single_file is not None) and (single_file[1] == release.url.rsplit('/', 2)[-1]): + install_fn = single_file[0] + hash_result = check_hash(data, single_file[2]) + + if not hash_result[0]: + raise Exception( + f'Hash mismatch for {install_fn} got {hash_result[1]} but expected {hash_result[2]}' + ) + + uninstall_files(p) + + (dest_path / install_fn.parent).mkdir(parents=True, exist_ok=True) + + out_file = dest_path / install_fn + + files.append((out_file, single_file[2], str(len(data)))) + out_file.write_bytes(data) + else: + tffd, _tfpath = tempfile.mkstemp(prefix='vsm') + tfpath = Path(_tfpath) + + with open(tffd, mode='wb') as tf: + tf.write(data) + + result_cache = {} + + for install_fn in release.files: + fn_props = release.files[install_fn] + result = subprocess.run( + [info.cmd7zip_path, 'e', '-so', tfpath, fn_props[0]], + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + result.check_returncode() + hash_result = check_hash(result.stdout, fn_props[1]) + + if not hash_result[0]: + raise Exception( + f'Hash mismatch for {install_fn} got {hash_result[1]} but expected {hash_result[2]}' + ) + + result_cache[install_fn] = (result.stdout, fn_props[1]) + + uninstall_files(p) + + for install_fn in release.files: + (dest_path / install_fn.parent).mkdir(parents=True, exist_ok=True) + files.append(( + dest_path / install_fn, + str(result_cache[install_fn][1]), + str(len(result_cache[install_fn][0])) + )) + (dest_path / install_fn).write_bytes(result_cache[install_fn][0]) + tfpath.unlink() + + install_package_meta(files, p, install_rel, idx) + + installed_packages[p.identifier] = install_rel.version + + print(f'Successfully installed {p.name} {install_rel.version}') + + return InstallFileResult(1, 0) + + +def install_package(name: str) -> InstallPackageResult: + p = installed_packages.get_package_from_name(name) + + if get_vapoursynth_api_version() <= 3: + if p.identifier in bundled_api3_plugins: + print(f'Binaries are already bundled for {p.name}, skipping installation') + return InstallPackageResult() + + if can_install(p): + inst = InstallPackageResult() + + if not args.skip_deps: + for dep in p.dependencies: + inst += install_package(dep) + + if not installed_packages.is_package_installed(p.identifier): + inst += install_files(p) + + return inst + + print(f'No binaries available for {args.target} in package {p.name}, skipping installation') + + return InstallPackageResult(error=1) + + +def upgrade_files(p: VSPackage) -> InstallPackageResult: + if can_install(p): + inst = InstallPackageResult() + + for dep in p.dependencies: + if not installed_packages.is_package_installed(dep): + inst += install_package(dep) + + return inst + install_files(p) + + print(f'No binaries available for {args.target} in package {p.name}, skipping installation') + + return InstallPackageResult(error=1) + + +def upgrade_package(name: str, force: bool) -> InstallPackageResult: + p = installed_packages.get_package_from_name(name) + + if not installed_packages.is_package_installed(p.identifier): + print(f'Package {p.name} not installed, can\'t upgrade') + elif installed_packages.is_package_upgradable(p.identifier, force): + return upgrade_files(p) + elif not installed_packages.is_package_upgradable(p.identifier, True): + print(f'Package {p.name} not upgraded, latest version installed') + else: + print(f'Package {p.name} not upgraded, unknown version must use -f to force replacement') + + return InstallPackageResult() + + +def uninstall_files(p: VSPackage) -> None: + dest_path = p.get_install_path(info) + + if p.pkg_type is VSPackageType.WHEEL: + files: List[Path] = [] + pyname = p.get_python_name() + + for dist_dir in find_dist_dirs(pyname, dest_path): + lines = (dest_path / dist_dir / 'RECORD').read_text().splitlines() + for line in lines: + tmp = line.split(',') + if tmp and len(tmp[0]) > 0: + files.append(Path(tmp[0])) + + print(files) + + for f in files: + try: + (dest_path / f).unlink(True) + except BaseException as e: + print(f'File removal error: {e}') + + for dist_dir in find_dist_dirs(pyname, dest_path): + rmdir(dist_dir) + else: + installed_rel: Optional[VSPackageRel] = None + if p.identifier in installed_packages: + for rel in p.releases: + if rel.version == installed_packages[p.identifier]: + installed_rel = rel + break + if installed_rel is not None: + prel = installed_rel.get_release(p.pkg_type) + assert prel + for f in prel.files: + (dest_path / f).unlink(True) + + remove_package_meta(p) + + +def uninstall_package(name: str) -> InstallPackageResult: + p = installed_packages.get_package_from_name(name) + if installed_packages.is_package_installed(p.identifier): + if installed_packages[p.identifier] == 'Unknown': + print(f'Can\'t uninstall unknown version of package: {p.name}') + return InstallPackageResult() + + uninstall_files(p) + + print(f'Uninstalled package: {p.name} {installed_packages[p.identifier]}') + return InstallPackageResult(1) + + print(f'No files installed for {p.name}, skipping uninstall') + + return InstallPackageResult() + + +def update_package_definition(url: str) -> None: + localmtimeval = 0.0 + + try: + localmtimeval = os.path.getmtime(info.package_json_path) + except OSError: + pass + + localmtime = formatdate(localmtimeval + 10, usegmt=True) + req_obj = Request(url, headers={'If-Modified-Since': localmtime, 'User-Agent': 'VSRepo'}) + + try: + with urlopen(req_obj) as urlreq: + remote_modtime = mktime_tz(parsedate_tz(urlreq.info()['Last-Modified'])) + + data = urlreq.read() + + with zipfile.ZipFile(io.BytesIO(data), 'r') as zf: + with zf.open('vspackages3.json') as pkgfile: + with open(info.package_json_path, 'wb') as dstfile: + dstfile.write(pkgfile.read()) + + os.utime(info.package_json_path, times=(remote_modtime, remote_modtime)) + except HTTPError as httperr: + if httperr.code == 304: + print(f'Local definitions already up to date: {formatdate(localmtimeval, usegmt=True)}') + else: + raise + else: + print(f'Local definitions updated to: {formatdate(remote_modtime, usegmt=True)}') + + +def update_genstubs() -> None: + print('Updating VapourSynth stubs') + + version = str(api_ver) if (api_ver := get_vapoursynth_api_version()) > 3 else '' + + genstubs = info.file_dirname / f'vsgenstubs{version}' / '__init__.py' + + contents = subprocess.getoutput([sys.executable, genstubs, '-o', '-']) # type: ignore + + site_package = False + stubpath = Path(args.stub_output_file) if args.stub_output_file else None + if stubpath == '-': + stubpath = None + fp = sys.stdout + elif stubpath == '--': + stubpath = None + fp = sys.stderr + else: + if not stubpath: + if info.site_package_dir: + stubpath = info.site_package_dir + site_package = True + else: + stubpath = Path('.') + + if stubpath.is_dir(): + stubpath = stubpath / 'vapoursynth.pyi' + + fp = open(stubpath, 'w') + + with fp: + fp.write(contents) + + if site_package: + if info.site_package_dir is None: + return + + vs_stub_pkg = info.site_package_dir / 'vapoursynth-stubs' + + if vs_stub_pkg.exists(): + rmdir(vs_stub_pkg) + + vs_stub_pkg.mkdir(parents=True, exist_ok=True) + + if stubpath is None: + return + + (vs_stub_pkg / '__init__.pyi').write_bytes(stubpath.read_bytes()) + + stubpath.unlink() + stubpath = vs_stub_pkg / '__init__.pyi' + + for dist_dir in find_dist_dirs('VapourSynth'): + with open(dist_dir / 'RECORD') as f: + contents = f.read() + + try: + filename = stubpath.relative_to(info.site_package_dir) + except ValueError: + filename = stubpath + + if '__init__.pyi' not in contents or 'vapoursynth.pyi' not in contents: + with open(dist_dir / 'RECORD', 'a') as f: + if not contents.endswith('\n'): + f.write('\n') + f.write(f'{filename},,\n') + break + + +def rebuild_distinfo() -> None: + print('Rebuilding dist-info dirs for other python package installers') + + for pkg_id, pkg_ver in installed_packages.items(): + pkg = installed_packages.get_package_from_id(pkg_id) + + if pkg is None: + continue + + if pkg.pkg_type is VSPackageType.WHEEL: + continue + + for idx, rel in enumerate(pkg.releases): + if rel.version == pkg_ver: + break + else: + remove_package_meta(pkg) + continue + + dest_path = pkg.get_install_path(info) + + prel = rel.get_release(pkg.pkg_type) + assert prel + + files = [ + ((path := (dest_path / fn)), fd[1], str(path.stat().st_size)) + for fn, fd in prel.files.items() + ] + + install_package_meta(files, pkg, rel, idx) diff --git a/vsrepo/py.typed b/vsrepo/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/vsrepo/utils/__init__.py b/vsrepo/utils/__init__.py new file mode 100644 index 00000000..a212e20b --- /dev/null +++ b/vsrepo/utils/__init__.py @@ -0,0 +1,7 @@ +from .install import * # noqa: F401, F403 +from .installations import * # noqa: F401, F403 +from .net import * # noqa: F401, F403 +from .packages import * # noqa: F401, F403 +from .site import * # noqa: F401, F403 +from .types import * # noqa: F401, F403 +from .utils import * # noqa: F401, F403 diff --git a/vsrepo/utils/install.py b/vsrepo/utils/install.py new file mode 100644 index 00000000..3db10003 --- /dev/null +++ b/vsrepo/utils/install.py @@ -0,0 +1,88 @@ +from typing import NamedTuple, Union, overload + + +class InstallFileResult(NamedTuple): + success: int = 0 + error: int = 0 + + @overload # type: ignore + def __add__(self, other: 'InstallFileResult') -> 'InstallFileResult': + ... + + @overload + def __add__(self, other: 'InstallPackageResult') -> 'InstallPackageResult': + ... + + def __add__( + self, other: 'Union[InstallFileResult, InstallPackageResult]' + ) -> 'Union[InstallFileResult, InstallPackageResult]': + return add_result(self, other) + + @overload # type: ignore + def __iadd__(self, other: 'InstallFileResult') -> 'InstallFileResult': + ... + + @overload + def __iadd__(self, other: 'InstallPackageResult') -> 'InstallPackageResult': + ... + + def __iadd__( # type: ignore + self, other: 'Union[InstallFileResult, InstallPackageResult]' + ) -> 'Union[InstallFileResult, InstallPackageResult]': + return add_result(self, other) + + +class InstallPackageResult(NamedTuple): + success: int = 0 + success_dependecies: int = 0 + error: int = 0 + + def __add__( # type: ignore[override] + self, other: 'Union[InstallFileResult, InstallPackageResult]' + ) -> 'InstallPackageResult': + return add_result(self, other) + + def __iadd__( # type: ignore[override] + self, other: 'Union[InstallFileResult, InstallPackageResult]' + ) -> 'InstallPackageResult': + return add_result(self, other) + + +@overload +def add_result( + self: InstallPackageResult, other: Union[InstallFileResult, InstallPackageResult] +) -> InstallPackageResult: + ... + + +@overload +def add_result( + self: Union[InstallFileResult, InstallPackageResult], other: InstallPackageResult +) -> InstallPackageResult: + ... + + +@overload +def add_result( + self: Union[InstallFileResult, InstallPackageResult], other: Union[InstallFileResult, InstallPackageResult] +) -> Union[InstallFileResult, InstallPackageResult]: + ... + + +def add_result( + self: Union[InstallFileResult, InstallPackageResult], other: Union[InstallFileResult, InstallPackageResult] +) -> Union[InstallFileResult, InstallPackageResult]: + if isinstance(other, InstallPackageResult): + deps = other.success_dependecies + + if isinstance(self, InstallPackageResult): + deps += self.success_dependecies + + return InstallPackageResult(self.error + other.success, deps, self.error + other.error) + + if isinstance(self, InstallFileResult): + return InstallFileResult( + self.success + other.success, self.error + other.error + ) + + return add_result(other, self) diff --git a/vsrepo/utils/installations.py b/vsrepo/utils/installations.py new file mode 100644 index 00000000..54f5088b --- /dev/null +++ b/vsrepo/utils/installations.py @@ -0,0 +1,76 @@ +import sys +from glob import glob +from importlib.util import find_spec +from pathlib import Path + + +def get_vapoursynth_version() -> int: + try: + import vapoursynth + except ImportError: + return 1 + + if hasattr(vapoursynth, '__version__'): + return vapoursynth.__version__[0] + return vapoursynth.core.version_number() + + +def get_vapoursynth_api_version() -> int: + try: + import vapoursynth + except ImportError: + return 1 + + if hasattr(vapoursynth, '__api_version__'): + return vapoursynth.__api_version__[0] + # assume lowest widespread api version, will probably error out somewhere else + return 3 + + +def detect_vapoursynth_installation() -> Path: + try: + spec = find_spec('vapoursynth') + + if spec is None: + raise ModuleNotFoundError + except (ValueError, ModuleNotFoundError): + print('Could not detect vapoursynth.') + sys.exit(1) + + if not spec.has_location: + try: + import vapoursynth + except ImportError: + print('The vapoursynth-module could not be found or imported.') + else: + return Path(vapoursynth.__file__) + + if spec.origin is None: + print('VapourSynth\'s origin could not be determined.') + sys.exit(1) + + return Path(spec.origin) + + +def is_sitepackage_install_portable(portable: bool) -> bool: + if portable: + return False + + vapoursynth_path = detect_vapoursynth_installation() + return vapoursynth_path.with_name('portable.vs').exists() + + +def is_sitepackage_install(portable: bool) -> bool: + from .site import is_venv + + if portable: + return False + + base_path = detect_vapoursynth_installation().parent + + vs_path = 'VapourSynth' if is_venv() else 'VapourSynth_portable' + + if not glob(str(base_path / f'{vs_path}-*.dist-info')): + return False + + return (base_path / 'portable.vs').exists() diff --git a/vsrepo/utils/net.py b/vsrepo/utils/net.py new file mode 100644 index 00000000..32ac37c2 --- /dev/null +++ b/vsrepo/utils/net.py @@ -0,0 +1,38 @@ + +from typing import Dict, Union, cast +from urllib.request import urlopen + +try: + import tqdm # type: ignore + tqdm_available = True +except ImportError: + tqdm_available = False + + +def fetch_url(url: str, desc: Union[str, None] = None) -> bytearray: + with urlopen(url) as urlreq: + if tqdm_available and (urlreq.headers['content-length'] is not None): + size = int(urlreq.headers['content-length']) + remaining = size + data = bytearray() + with tqdm.tqdm(total=size, unit='B', unit_scale=True, unit_divisor=1024, desc=desc) as t: + while remaining > 0: + blocksize = min(remaining, 1024 * 128) + data.extend(urlreq.read(blocksize)) + remaining = remaining - blocksize + t.update(blocksize) + return data + else: + print('Fetching: ' + url) + return cast(bytearray, urlreq.read()) + + +download_cache: Dict[str, bytearray] = {} + + +def fetch_url_cached(url: str, desc: str = '') -> bytearray: + data = download_cache.get(url, None) + if data is None: + data = fetch_url(url, desc) + download_cache[url] = data + return data diff --git a/vsrepo/utils/packages.py b/vsrepo/utils/packages.py new file mode 100644 index 00000000..29a0cf99 --- /dev/null +++ b/vsrepo/utils/packages.py @@ -0,0 +1,121 @@ +from typing import Dict, Literal, Optional, overload + +from .site import InstallationInfo +from .types import VSPackage, VSPackages + + +class InstalledPackages(Dict[str, str]): + def __init__(self, info: InstallationInfo, packages: VSPackages) -> None: + self.info = info + self.packages = packages + + @overload + def get_package_from_id(self, id: str, required: Literal[True]) -> VSPackage: + ... + + @overload + def get_package_from_id(self, id: str, required: bool = False) -> Optional[VSPackage]: + ... + + def get_package_from_id(self, id: str, required: bool = False) -> Optional[VSPackage]: + for p in self.packages: + if p.identifier == id: + return p + + if required: + raise ValueError(f'No package with the identifier {id} found') + + return None + + @overload + def get_package_from_plugin_name(self, name: str, required: Literal[True]) -> VSPackage: + ... + + @overload + def get_package_from_plugin_name(self, name: str, required: bool = False) -> Optional[VSPackage]: + ... + + def get_package_from_plugin_name(self, name: str, required: bool = False) -> Optional[VSPackage]: + for p in self.packages: + if p.name.casefold() == name.casefold(): + return p + + if required: + raise ValueError(f'No package with the name {name} found') + + return None + + @overload + def get_package_from_namespace(self, namespace: str, required: Literal[True]) -> VSPackage: + ... + + @overload + def get_package_from_namespace(self, namespace: str, required: bool = False) -> Optional[VSPackage]: + ... + + def get_package_from_namespace(self, namespace: str, required: bool = False) -> Optional[VSPackage]: + for p in self.packages: + if p.namespace == namespace: + return p + + if required: + raise ValueError(f'No package with the namespace {namespace} found') + + return None + + @overload + def get_package_from_modulename(self, modulename: str, required: Literal[True]) -> VSPackage: + ... + + @overload + def get_package_from_modulename(self, modulename: str, required: bool = False) -> Optional[VSPackage]: + ... + + def get_package_from_modulename(self, modulename: str, required: bool = False) -> Optional[VSPackage]: + for p in self.packages: + if p.modulename == modulename: + return p + + if required: + raise ValueError(f'No package with the modulename {modulename} found') + + return None + + def get_package_from_name(self, name: str) -> VSPackage: + p = self.get_package_from_id(name) + + if p is None: + p = self.get_package_from_namespace(name) + + if p is None: + p = self.get_package_from_modulename(name) + + if p is None: + p = self.get_package_from_plugin_name(name) + + if p is None: + raise ValueError(f'Package {name} not found') + + return p + + def is_package_installed(self, id: str) -> bool: + return id in self + + def is_package_upgradable(self, id: str, force: bool) -> bool: + pkg = self.get_package_from_id(id, True) + + lastest_installable = pkg.get_latest_installable_release() + + if force: + return ( + self.is_package_installed(id) + and (lastest_installable is not None) + and (self[id] != lastest_installable.version) + ) + + return ( + self.is_package_installed(id) + and (lastest_installable is not None) + and (self[id] != 'Unknown') + and (self[id] != lastest_installable.version) + ) diff --git a/vsrepo/utils/py.typed b/vsrepo/utils/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/vsrepo/utils/site.py b/vsrepo/utils/site.py new file mode 100644 index 00000000..7abc581c --- /dev/null +++ b/vsrepo/utils/site.py @@ -0,0 +1,123 @@ +import sys +from os import getenv +from pathlib import Path +from typing import TYPE_CHECKING, NamedTuple, Optional, Union + +from .installations import detect_vapoursynth_installation, is_sitepackage_install, is_sitepackage_install_portable + +if TYPE_CHECKING: + from .types import VSRepoNamespace +else: + from argparse import Namespace as VSRepoNamespace + +try: + import winreg +except ImportError: + print('{} is only supported on Windows.'.format(__file__)) + sys.exit(1) + + +def is_venv() -> bool: + return hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) + + +def get_vs_installation_site() -> Path: + if is_venv(): + try: + return detect_vapoursynth_installation().parent + except ImportError: + import setuptools + return Path(setuptools.__file__).parent.parent + + import site + return Path(site.getusersitepackages()) + + +class InstallationInfo(NamedTuple): + is_64bits: bool + file_dirname: Path + package_json_path: Path + plugin_path: Path + site_package_dir: Union[Path, None] + py_script_path: Path + cmd7zip_path: Path + + def print_info(self) -> None: + print('Paths:') + print(f'Definitions: {self.package_json_path}') + print(f'Binaries: {self.plugin_path}') + print(f'Scripts: {self.py_script_path}') + + if self.site_package_dir is not None: + print(f'Dist-Infos: {self.site_package_dir}') + else: + print('Dist-Infos: ') + + +def get_installation_info(args: VSRepoNamespace) -> InstallationInfo: + is_64bits = str(args.target) == 'win64' + + file_dirname = Path(__file__).parent.parent.absolute() + + # VSRepo is installed to the site-packages. + if Path(sys.prefix) in file_dirname.parents: + file_dirname = Path.cwd() + + if args.portable: + plugin32_path = file_dirname / 'vapoursynth32' / 'plugins' + plugin64_path = file_dirname / 'vapoursynth64' / 'plugins' + package_json_path = file_dirname / 'vspackages3.json' + elif is_sitepackage_install_portable(args.portable): + base_path = detect_vapoursynth_installation().parent + + plugin32_path = base_path / 'vapoursynth32' / 'plugins' + plugin64_path = base_path / 'vapoursynth64' / 'plugins' + package_json_path = base_path / 'vspackages3.json' + else: + pluginparent = Path(str(getenv('APPDATA'))) / 'VapourSynth' + + plugin32_path = pluginparent / 'plugins32' + plugin64_path = pluginparent / 'plugins64' + package_json_path = pluginparent / 'vsrepo' / 'vspackages3.json' + + plugin_path = plugin64_path if is_64bits else plugin32_path + + if args.force_dist_info or is_sitepackage_install(args.portable): + if is_venv(): + try: + import setuptools + site_package_dir: Optional[Path] = Path(setuptools.__file__).parent.parent + del setuptools + except ImportError: + site_package_dir = None + else: + import site + site_package_dir = Path(site.getusersitepackages()) + else: + site_package_dir = None + + py_script_path = file_dirname if args.portable else (site_package_dir or get_vs_installation_site()) + + if args.script_path is not None: + py_script_path = Path(args.script_path) + + if args.binary_path is not None: + plugin_path = Path(args.binary_path) + + py_script_path.mkdir(parents=True, exist_ok=True) + plugin_path.mkdir(parents=True, exist_ok=True) + package_json_path.parent.mkdir(parents=True, exist_ok=True) + + cmd7zip_path = file_dirname / '7z.exe' + if not cmd7zip_path.is_file(): + try: + with winreg.OpenKeyEx( + winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\7-Zip', reserved=0, access=winreg.KEY_READ + ) as regkey: + cmd7zip_path = Path(winreg.QueryValueEx(regkey, 'Path')[0]) / '7z.exe' + except BaseException: + cmd7zip_path = Path('7z.exe') + + return InstallationInfo( + is_64bits, file_dirname, package_json_path, plugin_path, site_package_dir, py_script_path, cmd7zip_path + ) diff --git a/vsrepo/utils/types.py b/vsrepo/utils/types.py new file mode 100644 index 00000000..54b027ee --- /dev/null +++ b/vsrepo/utils/types.py @@ -0,0 +1,289 @@ +import json +import logging +import sys +from argparse import Namespace +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import ( + Any, Dict, Generic, Iterator, List, Literal, NamedTuple, Sequence, Tuple, Type, TypeVar, Union, overload +) + +from .installations import get_vapoursynth_api_version +from .site import InstallationInfo +from .utils import sanitize_dict + +T = TypeVar('T') + + +class DescriptorBase(Generic[T]): + cls_type: Type[T] + + def __class_getitem__(cls, new_cls_type: Type[T]) -> 'DescriptorBase[T]': + class inner_Descriptor(cls): # type: ignore + if isinstance(new_cls_type, type): + cls_type = new_cls_type + else: + @property # type: ignore + def cls_type(cls) -> Type[T]: + if not hasattr(cls, '_cls_type'): + cls._cls_type = eval(new_cls_type) + return cls._cls_type + + return inner_Descriptor # type: ignore + + def __init__(self, default: T) -> None: + self._default = default + + def __set_name__(self, owner: object, name: str) -> None: + self._name = '_' + name + + def __get__(self, obj: object, ctype: type) -> T: + if obj is None: + return self._default + + return getattr(obj, self._name, self._default) + + def __set__(self, obj: object, value: T) -> None: + setattr(obj, self._name, self.cls_type(value)) # type: ignore + + +class CustomEnum(Enum): + @classmethod + def Descriptor(cls: Type[T], default: T) -> T: + return DescriptorBase[cls](default) # type: ignore + + +class VSPackageUpdateMode(str, CustomEnum): + MANUAL = 'manual' + GIT = 'git-commits' + + +class VSPackageDeviceType(str, CustomEnum): + CPU = 'cpu' + CUDA = 'cuda' + OPENCL = 'opencl' + VULKAN = 'vulkan' + + +class VSPackageType(str, CustomEnum): + SCRIPT = 'PyScript' + WHEEL = 'PyWheel' + PLUGIN = 'VSPlugin' + + def get_package_key(self, n_bits: int = 64) -> str: + if self is VSPackageType.SCRIPT: + return 'script' + + if self is VSPackageType.WHEEL: + return 'wheel' + + if self is VSPackageType.PLUGIN: + return f'win{n_bits}' + + raise ValueError + + +class VSPackagePlatformReleaseFile(NamedTuple): + filename: str + hash: str + + +@dataclass +class VSPackagePlatformReleaseWheel: + url: str + hash: str + api: int = 3 + + +@dataclass +class VSPackagePlatformRelease: + url: str + files: Dict[Path, VSPackagePlatformReleaseFile] + api: int = 3 + + +@dataclass +class VSPackageRel: + version: str + published: str + + @staticmethod + def from_dict(obj: Dict[str, Any]) -> 'VSPackageRel': + cls: Type[VSPackageRel] = VSPackageRel + if (key := VSPackageType.SCRIPT.get_package_key()) in obj: + cls = VSPackageRelPyScript + elif (whl_key := VSPackageType.WHEEL.get_package_key()) in obj: + cls = VSPackageRelPyWheel + else: + win32 = (win32_key := VSPackageType.PLUGIN.get_package_key(32)) in obj + win64 = (win64_key := VSPackageType.PLUGIN.get_package_key(64)) in obj + + if win64 and win32: + cls = VSPackageRelWin + elif win64: + cls = VSPackageRelWin64 + key = win64_key + elif win32: + cls = VSPackageRelWin32 + key = win32_key + + if cls is VSPackageRelPyWheel: + obj[whl_key] = VSPackagePlatformReleaseWheel(**obj[whl_key]) + elif cls is not VSPackageRel: + keys = [win32_key, win64_key] if cls is VSPackageRelWin else [key] + for key in keys: + obj[key] = VSPackagePlatformRelease(**sanitize_dict(obj[key], 'platform_release')) + + # kw_only is Py3.10 only... + if 'published' not in obj: + obj['published'] = '2000-01-01T00:00:00Z' + + return cls(**sanitize_dict(obj, 'release')) + + @overload + def get_release(self, pkg_type: Literal[VSPackageType.WHEEL]) -> Union[ # type: ignore + VSPackagePlatformReleaseWheel, None + ]: + ... + + @overload + def get_release(self, pkg_type: VSPackageType) -> Union[VSPackagePlatformRelease, None]: + ... + + def get_release(self, pkg_type: VSPackageType) -> Union[ + VSPackagePlatformRelease, VSPackagePlatformReleaseWheel, None + ]: + return getattr(self, pkg_type.get_package_key(), None) + + +@dataclass +class VSPackageRelPyScript(VSPackageRel): + script: VSPackagePlatformRelease + + +@dataclass +class VSPackageRelPyWheel(VSPackageRel): + wheel: VSPackagePlatformReleaseWheel + + +@dataclass +class VSPackageRelWin32(VSPackageRel): + win32: VSPackagePlatformRelease + + +@dataclass +class VSPackageRelWin64(VSPackageRel): + win64: VSPackagePlatformRelease + + +@dataclass +class VSPackageRelWin(VSPackageRelWin32, VSPackageRelWin64): + ... + + +@dataclass +class VSPackage: + name: str + category: str + description: str + identifier: str + github: str = '' + namespace: str = '' + website: str = '' + doom9: str = '' + api: int = 3 + pkg_type: VSPackageType = VSPackageType.Descriptor(VSPackageType.SCRIPT) + updatemode: VSPackageUpdateMode = VSPackageUpdateMode.Descriptor(VSPackageUpdateMode.MANUAL) + releases: List[VSPackageRel] = field(default_factory=list) + dependencies: List[str] = field(default_factory=list) + device: List[VSPackageDeviceType] = field(default_factory=list) + modulename: Union[str, None] = None + wheelname: Union[str, None] = None + + def is_type(self, pkg_type: VSPackageType) -> bool: + return self.pkg_type is pkg_type + + def get_latest_installable_release_with_index(self) -> Tuple[int, Union[VSPackageRel, None]]: + max_api = get_vapoursynth_api_version() + + for idx, rel in enumerate(self.releases): + prel = rel.get_release(self.pkg_type) + + if prel: + bin_api = max(self.api, prel.api) + + if 3 <= bin_api <= max_api: + return (idx, rel) + + return (-1, None) + + def get_latest_installable_release(self) -> Union[VSPackageRel, None]: + return self.get_latest_installable_release_with_index()[1] + + def get_install_path(self, info: InstallationInfo) -> Path: + if self.pkg_type in {VSPackageType.SCRIPT, VSPackageType.WHEEL}: + return info.py_script_path + + if self.pkg_type is VSPackageType.PLUGIN: + return info.plugin_path + + raise ValueError('Unknown install type') + + def get_python_name(self) -> str: + package_name = self.wheelname or self.name + + return package_name.replace('.', '_').replace(' ', '_').replace('(', '_').replace(')', '') + + +@dataclass +class VSPackages: + file_format: int + packages: List[VSPackage] + + def __iter__(self) -> Iterator[VSPackage]: + return iter(self.packages) + + @classmethod + def from_file(cls, filepath: Path, /) -> 'VSPackages': + try: + vspackages = json.loads(filepath.read_text()) + + if vspackages is None: + raise ValueError + + file_format = int(vspackages['file-format']) + + if file_format != 3: + raise ValueError( + f'Package definition format is {file_format} but only version 3 is supported' + ) + + packages = [ + VSPackage(**sanitize_dict(package, 'package')) + for package in vspackages['packages'] + ] + except (OSError, FileExistsError, ValueError) as e: + message = str(e) or 'No valid package definitions found. Run update command first!' + + logging.error(message) + + sys.exit(1) + + return VSPackages(file_format, packages) + + +class VSRepoNamespace(Namespace): + operation: Literal[ + 'install', 'update', 'upgrade', 'upgrade-all', 'uninstall', + 'installed', 'available', 'paths', 'genstubs', 'gendistinfo' + ] + package: Sequence[str] + force: bool + portable: bool + skip_deps: bool + target: Literal['win32', 'win64'] + binary_path: Path + script_path: Path + stub_output_file: Path + force_dist_info: bool diff --git a/vsrepo/utils/utils.py b/vsrepo/utils/utils.py new file mode 100644 index 00000000..9b2232d6 --- /dev/null +++ b/vsrepo/utils/utils.py @@ -0,0 +1,33 @@ +from pathlib import Path +from typing import Any, Dict, Mapping + + +def sanitize_key(key: str, obj_type: str) -> str: + if obj_type == 'package': + if key == 'type': + return 'pkg_type' + + return key + + +def sanitize_value(key: str, value: Any, obj_type: str) -> Any: + if obj_type == 'package': + if key == 'releases': + from .types import VSPackageRel + return [VSPackageRel.from_dict(release) for release in value] + elif obj_type == 'platform_release': + if key == 'files': + from .types import VSPackagePlatformReleaseFile + return { + Path(filename): VSPackagePlatformReleaseFile(filename, file_hash) + for filename, (filename, file_hash) in value.items() + } + + return value + + +def sanitize_dict(obj: Mapping[str, Any], obj_type: str) -> Dict[str, Any]: + return { + sanitize_key(key, obj_type): sanitize_value(key, value, obj_type) + for key, value in obj.items() + } diff --git a/vsrupdate.py b/vsrupdate/__init__.py similarity index 100% rename from vsrupdate.py rename to vsrupdate/__init__.py diff --git a/vsrupdate/py.typed b/vsrupdate/py.typed new file mode 100644 index 00000000..e69de29b