diff --git a/repogen/cache.py b/repogen/cache.py new file mode 100644 index 0000000..2c33d64 --- /dev/null +++ b/repogen/cache.py @@ -0,0 +1,26 @@ +import os +from pathlib import Path +from typing import IO, TypeVar, Callable + +_approot = Path(__file__).parent.parent + +assert _approot.samefile(os.getcwd()) + +_cachepath = _approot.joinpath('cache') + +F = TypeVar("F", bound=Callable) + + +def copy_signature(_: F) -> Callable[..., F]: + return lambda f: f + + +def path(name: str) -> Path: + return _cachepath.joinpath(name) + + +@copy_signature(open) +def open_file(file: str, *args, **kwargs) -> IO: + if not _cachepath.exists(): + _cachepath.mkdir() + return open(_cachepath.joinpath(file), *args, **kwargs) diff --git a/repogen/common.py b/repogen/common.py index 994ef9e..ef8e652 100644 --- a/repogen/common.py +++ b/repogen/common.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import urllib +from os import path from urllib.parse import urljoin import requests @@ -9,6 +10,8 @@ def url_fixup(u: str) -> str: parsed = urllib.parse.urlparse(u) + if parsed.scheme != 'https': + return u segs = parsed.path.split('/') if parsed.hostname == 'github.com' and len(segs) == 7 and segs[3] == 'releases' and segs[4] == 'latest': resp = requests.get(u, allow_redirects=False) diff --git a/repogen/downloadipk.py b/repogen/downloadipk.py index 0400684..bd7c5ed 100644 --- a/repogen/downloadipk.py +++ b/repogen/downloadipk.py @@ -10,7 +10,7 @@ parser.add_argument('-o', '--output', required=True) args = parser.parse_args() - pkginfo = pkg_info.parse_package_info(args.info) + pkginfo = pkg_info.from_package_info_file(args.info) with requests.get(pkginfo['manifest']['ipkUrl'], allow_redirects=True) as resp: with open(args.output, 'wb') as f: f.write(resp.content) diff --git a/repogen/ipk_file.py b/repogen/ipk_file.py new file mode 100644 index 0000000..6b20ac1 --- /dev/null +++ b/repogen/ipk_file.py @@ -0,0 +1,28 @@ +import json +import re +import tarfile +from typing import Optional, TypedDict, NotRequired + +import ar +import io + + +class AppInfo(TypedDict): + id: str + title: str + version: str + type: str + appDescription: NotRequired[str] + + +def get_appinfo(ipk_path: str) -> AppInfo: + with open(ipk_path, 'rb') as f: + archive = ar.Archive(f) + control_file = io.BytesIO(archive.open('control.tar.gz', mode='rb').read()) + with tarfile.open(fileobj=control_file, mode='r:gz') as control: + with control.extractfile(control.getmember('control')) as cf: + package_name = re.compile(r'Package: (.+)\n').match(cf.readline().decode('utf-8')).group(1) + data_file = io.BytesIO(archive.open('data.tar.gz', mode='rb').read()) + with tarfile.open(fileobj=data_file, mode='r:gz') as data: + with data.extractfile(data.getmember(f'usr/palm/applications/{package_name}/appinfo.json')) as af: + return json.load(af) diff --git a/repogen/lintpkg.py b/repogen/lintpkg.py index 1e90f15..443abd5 100644 --- a/repogen/lintpkg.py +++ b/repogen/lintpkg.py @@ -1,4 +1,6 @@ +from os import path from urllib.parse import urlparse +from urllib.request import url2pathname from xml.etree import ElementTree import requests @@ -6,6 +8,7 @@ from markdown.treeprocessors import Treeprocessor from repogen import pkg_info +from repogen.pkg_info import PackageInfo class PackageInfoLinter: @@ -18,6 +21,7 @@ def _assert(errors: [str], condition, message): class ImageProcessor(Treeprocessor): def __init__(self, errors: [str]): + super().__init__() self.errors = errors def run(self, root: ElementTree.Element): @@ -27,19 +31,19 @@ def run(self, root: ElementTree.Element): self.errors.append("Use HTTPS URL for %s" % src) return None - def lint(self, pkginfo) -> [str]: + def lint(self, info: PackageInfo) -> [str]: errors: [str] = [] # Pool property - if pkginfo.get('nopool', False): + if info.get('nopool', False): errors.append('pool property is required (`main` or `non-free`)') - elif pkginfo['pool'] not in ['main', 'non-free']: + elif info['pool'] not in ['main', 'non-free']: errors.append('pool property must be `main` or `non-free`') # Process icon - icon_uri = urlparse(pkginfo['iconUri']) + icon_uri = urlparse(info['iconUri']) if icon_uri.scheme == 'data' or icon_uri.scheme == 'https': - with requests.get(pkginfo['iconUri']) as resp: + with requests.get(info['iconUri']) as resp: if resp.status_code == 200: pass else: @@ -48,12 +52,15 @@ def lint(self, pkginfo) -> [str]: errors.append('iconUrl must be data URI or use HTTPS') # Process manifest - PackageInfoLinter._validate_manifest_url(pkginfo['manifestUrl'], 'manifestUrl', errors) + if 'manifestUrl' in info: + PackageInfoLinter._validate_manifest_url(info['manifestUrl'], 'manifestUrl', errors) + elif 'manifest' not in info: + errors.append('Either `manifestUrl` or `manifest` is required') - if 'manifestUrlBeta' in pkginfo: - PackageInfoLinter._validate_manifest_url(pkginfo['manifestUrlBeta'], 'manifestUrlBeta', errors) + if 'manifestUrlBeta' in info: + PackageInfoLinter._validate_manifest_url(info['manifestUrlBeta'], 'manifestUrlBeta', errors) - description = pkginfo.get('description', '') + description = info.get('description', '') mk = Markdown() # patch in the customized image pattern matcher with url checking mk.treeprocessors.register( @@ -64,14 +71,17 @@ def lint(self, pkginfo) -> [str]: @staticmethod def _validate_manifest_url(url: str, key: str, e: [str]): manifest_url_pre = urlparse(url) - if manifest_url_pre.scheme == 'https': - with requests.get(url) as resp: - if resp.status_code == 200: - resp.json() - else: - e.append(f"{key} must be accessible") - else: - e.append(f"{key} must be HTTPS URL") + match manifest_url_pre.scheme: + case 'https': + with requests.get(url) as resp: + if resp.status_code == 200: + resp.json() + else: + e.append(f"{key} must be accessible") + case 'file': + assert path.isfile(url2pathname(manifest_url_pre.path)) + case _: + e.append(f"{key} must be HTTPS URL") if __name__ == '__main__': @@ -81,14 +91,16 @@ def _validate_manifest_url(url: str, key: str, e: [str]): parser.add_argument('-f', '--file', required=True) args = parser.parse_args() - pkginfo = pkg_info.parse_package_info(args.file) + lint_pkginfo = pkg_info.from_package_info_file(args.file) + if lint_pkginfo is None: + raise ValueError('No package info') linter = PackageInfoLinter() - errors = linter.lint(pkginfo) + lint_errors = linter.lint(lint_pkginfo) - if len(errors): + if len(lint_errors): print('#### Issue:') - for err in errors: + for err in lint_errors: print(' * %s' % err) exit(1) else: diff --git a/repogen/pkg_info.py b/repogen/pkg_info.py index 4948ba4..470a372 100644 --- a/repogen/pkg_info.py +++ b/repogen/pkg_info.py @@ -1,7 +1,7 @@ import locale import os from datetime import datetime -from os.path import basename, isfile, join +from os.path import isfile, join from typing import TypedDict, Optional, List, NotRequired import bleach @@ -31,18 +31,21 @@ class PackageInfo(TypedDict): lastmodified_str: str -def parse_package_info(info_path: str, offline=False) -> Optional[PackageInfo]: +def from_package_info_file(info_path: str, offline=False) -> Optional[PackageInfo]: extension = os.path.splitext(info_path)[1] content: PackageRegistry if extension == '.yml': - content = parse_yml_package(info_path) + pkgid, content = parse_yml_package(info_path) elif extension == '.py': - content = load_py_package(info_path) + pkgid, content = load_py_package(info_path) else: return None - if not ('title' in content) and ('iconUri' in content) and ('manifestUrl' in content): + if not ('title' in content and 'iconUri' in content and 'manifestUrl' in content): return None - pkgid = os.path.splitext(basename(info_path))[0] + return from_package_info(pkgid, content, offline) + + +def from_package_info(pkgid: str, content: PackageRegistry, offline=False): manifest_url = url_fixup(content['manifestUrl']) pkginfo: PackageInfo = { 'id': pkgid, @@ -82,7 +85,8 @@ def parse_package_info(info_path: str, offline=False) -> Optional[PackageInfo]: def list_packages(pkgdir: str, offline: bool = False) -> List[PackageInfo]: paths = [join(pkgdir, f) for f in os.listdir(pkgdir) if isfile(join(pkgdir, f))] - return sorted(filter(lambda x: x, map(lambda p: parse_package_info(p, offline), paths)), key=lambda x: x['title']) + return sorted(filter(lambda x: x, map(lambda p: from_package_info_file(p, offline), paths)), + key=lambda x: x['title']) def valid_pool(value: str) -> str: diff --git a/repogen/pkg_manifest.py b/repogen/pkg_manifest.py index 930ec3f..ed4b2c7 100644 --- a/repogen/pkg_manifest.py +++ b/repogen/pkg_manifest.py @@ -1,52 +1,71 @@ import json import os +import urllib.parse from datetime import datetime from email.utils import parsedate_to_datetime from json import JSONDecodeError from os import path -from typing import Tuple, TypedDict, Optional +from typing import Tuple, TypedDict, Optional, NotRequired, Literal from urllib.parse import urljoin +from urllib.request import url2pathname import requests +from repogen import cache from repogen.common import url_fixup, url_size +class PackageHash(TypedDict): + sha256: str + + class PackageManifest(TypedDict): id: str title: str version: str type: str appDescription: Optional[str] + iconUri: str + sourceUrl: NotRequired[str] + rootRequired: NotRequired[bool | Literal['optional']] ipkUrl: str + ipkHash: PackageHash ipkSize: int -def obtain_manifest(pkgid: str, channel: str, url: str, offline: bool = False) -> Tuple[PackageManifest, datetime]: - if not path.exists('cache'): - os.mkdir('cache') - cache_file = path.join('cache', f'manifest_{pkgid}_{channel}.json') - try: - if offline: - raise requests.exceptions.ConnectionError('Offline') - url = url_fixup(url) - resp = requests.get(url=url, allow_redirects=True) - manifest = resp.json() - manifest['ipkUrl'] = urljoin(url, manifest['ipkUrl']) - manifest['ipkSize'] = url_size(manifest['ipkUrl']) - with open(cache_file, 'w', encoding='utf-8') as f: - json.dump(manifest, f) - last_modified = datetime.now() - if 'last-modified' in resp.headers: - last_modified = parsedate_to_datetime( - resp.headers['last-modified']) - os.utime(cache_file, (last_modified.timestamp(), last_modified.timestamp())) - return manifest, last_modified - except requests.exceptions.RequestException as e: - if path.exists(cache_file): - try: - with open(cache_file, encoding='utf-8') as f: - return json.load(f), datetime.fromtimestamp(os.stat(cache_file).st_mtime) - except IOError or JSONDecodeError: - os.unlink(cache_file) - raise e +def obtain_manifest(pkgid: str, channel: str, uri: str, offline: bool = False) -> Tuple[PackageManifest, datetime]: + parsed = urllib.parse.urlparse(uri) + if parsed.scheme == 'file': + manifest_path = url2pathname(parsed.path) + try: + with open(manifest_path, encoding='utf-8') as f: + return json.load(f), datetime.fromtimestamp(os.stat(manifest_path).st_mtime) + except IOError or JSONDecodeError: + os.unlink(manifest_path) + else: + cache_name = f'manifest_{pkgid}_{channel}.json' + cache_file = cache.path(cache_name) + try: + if offline: + raise requests.exceptions.ConnectionError('Offline') + uri = url_fixup(uri) + resp = requests.get(url=uri, allow_redirects=True) + manifest = resp.json() + manifest['ipkUrl'] = urljoin(uri, manifest['ipkUrl']) + manifest['ipkSize'] = url_size(manifest['ipkUrl']) + with cache.open_file(cache_name, mode='w', encoding='utf-8') as f: + json.dump(manifest, f) + last_modified = datetime.now() + if 'last-modified' in resp.headers: + last_modified = parsedate_to_datetime( + resp.headers['last-modified']) + os.utime(cache_file, (last_modified.timestamp(), last_modified.timestamp())) + return manifest, last_modified + except requests.exceptions.RequestException as e: + if cache_file.exists(): + try: + with cache.open_file(cache_name, encoding='utf-8') as f: + return json.load(f), datetime.fromtimestamp(cache_file.stat().st_mtime) + except IOError or JSONDecodeError: + cache_file.unlink() + raise e diff --git a/repogen/pkg_registery.py b/repogen/pkg_registery.py index 59415a8..6e05c14 100644 --- a/repogen/pkg_registery.py +++ b/repogen/pkg_registery.py @@ -1,4 +1,6 @@ -from typing import TypedDict, Optional +import importlib +from os import path +from typing import TypedDict, NotRequired, Literal import yaml @@ -7,21 +9,25 @@ class PackageRegistry(TypedDict): title: str iconUri: str manifestUrl: str + manifestUrlBeta: NotRequired[str] category: str - detailIconUri: Optional[str] - funding: Optional[dict] + description: str + pool: Literal['main', 'non-free'] + detailIconUri: NotRequired[str] + funding: NotRequired[dict] -def parse_yml_package(p: str) -> PackageRegistry: +def parse_yml_package(p: str) -> (str, PackageRegistry): + pkgid = path.splitext(path.basename(p))[0] with open(p, encoding='utf-8') as f: - content = yaml.safe_load(f) - return content + content: PackageRegistry = yaml.safe_load(f) + return pkgid, content # noinspection PyUnresolvedReferences -def load_py_package(p: str) -> PackageRegistry: - pkgid = os.path.splitext(basename(p))[0] +def load_py_package(p: str) -> (str, PackageRegistry): + pkgid = path.splitext(path.basename(p))[0] spec = importlib.util.spec_from_file_location(f"pkg.{pkgid}", p) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) - return module.load() + return pkgid, module.load() diff --git a/repogen/plugin.py b/repogen/plugin.py index 9863911..625c3e6 100644 --- a/repogen/plugin.py +++ b/repogen/plugin.py @@ -23,7 +23,7 @@ def __init__(self, *args, **kwargs): self._md = Markdown(**self.settings['MARKDOWN']) def read(self, filename): - info = pkg_info.parse_package_info(filename, offline='CI' not in os.environ) + info = pkg_info.from_package_info_file(filename, offline='CI' not in os.environ) metadata = { 'title': info['title'], 'override_save_as': f'apps/{info["id"]}.html', diff --git a/requirements.txt b/requirements.txt index bf858ca..6ed9631 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,5 @@ pyscss~=1.4.0 cssmin~=0.2.0 livereload~=2.6.3 invoke~=2.0.0 +ar~=0.3.2 git+https://github.com/webosbrew/pelican-theme-webosbrew.git@v1.0.4#egg=pelican-theme-webosbrew \ No newline at end of file