From cb4f948678a1ee1bd8aa24d3cf9a4a8241167bec Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 1 Jun 2023 23:17:18 +0200 Subject: [PATCH 01/36] start: allow multiple mirrors per package, extensive include/exclude pattern matching --- quetz/cli.py | 22 +++------ quetz/dao.py | 24 ++++++++- quetz/db_models.py | 6 +++ quetz/frontend.py | 4 +- quetz/main.py | 42 +++++++++------- quetz/rest_models.py | 13 +++-- quetz/tasks/assertions.py | 2 +- quetz/tasks/mirror.py | 100 +++++++++++++++++++++++--------------- quetz/utils.py | 57 ++++++++++++++++++++++ 9 files changed, 189 insertions(+), 81 deletions(-) diff --git a/quetz/cli.py b/quetz/cli.py index 5cc2a82c..bffe161e 100644 --- a/quetz/cli.py +++ b/quetz/cli.py @@ -72,8 +72,7 @@ def _alembic_config(db_url: str) -> AlembicConfig: script_location = "quetz:migrations" migration_modules = [ - f"{ep.module}:versions" - for ep in entry_points().select(group='quetz.migrations') + f"{ep.module}:versions" for ep in entry_points().select(group='quetz.migrations') ] migration_modules.append("quetz:migrations/versions") @@ -128,9 +127,7 @@ def _make_migrations( found = True if plugin_name != "quetz" and not found: - raise Exception( - f"models entrypoint (quetz.models) for plugin {plugin_name} not registered" - ) + raise Exception(f"models entrypoint (quetz.models) for plugin {plugin_name} not registered") logger.info('Making DB migrations on %r for %r', db_url, plugin_name) if not alembic_config and db_url: @@ -140,9 +137,7 @@ def _make_migrations( if plugin_name == "quetz": version_path = None # Path(quetz.__file__).parent / 'migrations' / 'versions' else: - entry_point = tuple( - entry_points().select(group='quetz.migrations', name=plugin_name) - )[0] + entry_point = tuple(entry_points().select(group='quetz.migrations', name=plugin_name))[0] module = entry_point.load() version_path = str(Path(module.__file__).parent / "versions") @@ -203,9 +198,7 @@ def _set_user_roles(db: Session, config: Config): f"with identity from provider '{provider}'" ) elif user.role is not None and user.role != default_role: - logger.warning( - f"user has already role {user.role} not assigning a new role" - ) + logger.warning(f"user has already role {user.role} not assigning a new role") else: user.role = role @@ -436,8 +429,7 @@ def create( if _is_deployment(deployment_folder): if exists_ok: logger.info( - f'Quetz deployment already exists at {deployment_folder}.\n' - f'Skipping creation.' + f'Quetz deployment already exists at {deployment_folder}.\n' f'Skipping creation.' 
) return if delete and (copy_conf or create_conf): @@ -733,9 +725,7 @@ def plugin( # Try to install pip if it's missing if conda_exe_path is not None: print("pip is missing, installing...") - subprocess.call( - [conda_exe_path, 'install', '--channel', 'conda-forge', 'pip'] - ) + subprocess.call([conda_exe_path, 'install', '--channel', 'conda-forge', 'pip']) pip_exe_path = find_executable('pip') if pip_exe_path is None: diff --git a/quetz/dao.py b/quetz/dao.py index 3e61aa0b..d5085d9f 100644 --- a/quetz/dao.py +++ b/quetz/dao.py @@ -311,10 +311,17 @@ def create_channel( "only ASCII characters should be used in channel name" ) + if isinstance(data.mirror_channel_url, str): + mirror_channel_urls = data.mirror_channel_url + elif isinstance(data.mirror_channel_url, list): + mirror_channel_urls = ";".join(data.mirror_channel_url) + else: + mirror_channel_urls = None # type: ignore + channel = Channel( name=data.name, description=data.description, - mirror_channel_url=data.mirror_channel_url, + mirror_channel_url=mirror_channel_urls, mirror_mode=data.mirror_mode, private=data.private, ttl=data.ttl, @@ -332,6 +339,21 @@ def create_channel( return channel + def remove_package(self, channel_name: str, package_name: str): + self.db.query(Package).filter(Package.channel_name == channel_name).filter( + Package.name == package_name + ).delete() + self.db.query(PackageVersion).filter( + PackageVersion.channel_name == channel_name + ).filter(PackageVersion.package_name == package_name).delete() + + # remove PackageMember entries as well + self.db.query(PackageMember).filter( + PackageMember.channel_name == channel_name + ).filter(PackageMember.package_name == package_name).delete() + + self.db.commit() + def cleanup_channel_db( self, channel_name: str, diff --git a/quetz/db_models.py b/quetz/db_models.py index 10ece686..7b621731 100644 --- a/quetz/db_models.py +++ b/quetz/db_models.py @@ -208,6 +208,12 @@ class Channel(Base): size_limit = Column(BigInteger, default=None) ttl = Column(Integer, server_default=f'{60 * 60 * 10}', nullable=False) # 10 hours + def get_mirror_channel_urls(self) -> list[str]: + if self.mirror_channel_url: + return self.mirror_channel_url.split(";") + else: + return [] + packages = relationship( 'Package', back_populates='channel', cascade="all,delete", uselist=True ) diff --git a/quetz/frontend.py b/quetz/frontend.py index 02f0a695..c1b0b7ad 100644 --- a/quetz/frontend.py +++ b/quetz/frontend.py @@ -96,6 +96,4 @@ def register(app): frontend_dir = f"{sys.prefix}/share/quetz/frontend/" else: logger.info("Using basic fallback frontend") - frontend_dir = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "basic_frontend" - ) + frontend_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "basic_frontend") diff --git a/quetz/main.py b/quetz/main.py index fe7d0c53..710ebaab 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -749,22 +749,24 @@ def post_channel( # register mirror if is_mirror and register_mirror: - mirror_url = str(new_channel.mirror_channel_url) - mirror_url = mirror_url.replace("get", "api/channels") - headers = {"x-api-key": mirror_api_key} if mirror_api_key else {} - api_endpoint = str(request.url.replace(query=None)) + '/' + new_channel.name - request.url - response = session.post( - mirror_url + '/mirrors', - json={ - "url": api_endpoint.replace("api/channels", "get"), - "api_endpoint": api_endpoint, - "metrics_endpoint": api_endpoint.replace("api", "metrics"), - }, - headers=headers, - ) - if response.status_code != 201: - 
logger.warning(f"could not register mirror due to error {response.text}") + for mirror_url in new_channel.get_mirror_channel_urls(): + mirror_url = mirror_url.replace("get", "api/channels") + headers = {"x-api-key": mirror_api_key} if mirror_api_key else {} + api_endpoint = str(request.url.replace(query=None)) + '/' + new_channel.name + request.url + response = session.post( + mirror_url + '/mirrors', + json={ + "url": api_endpoint.replace("api/channels", "get"), + "api_endpoint": api_endpoint, + "metrics_endpoint": api_endpoint.replace("api", "metrics"), + }, + headers=headers, + ) + if response.status_code != 201: + logger.warning( + f"could not register mirror due to error {response.text}" + ) for action in actions: task.execute_channel_action( @@ -1800,12 +1802,16 @@ def serve_path( if is_package_request and channel.mirror_channel_url: # if we exclude the package from syncing, redirect to original URL + # use the first mirror url entry for the redirect channel_proxylist = json.loads(channel.channel_metadata).get('proxylist', []) if channel_proxylist and package_name and package_name in channel_proxylist: - return RedirectResponse(f"{channel.mirror_channel_url}/{path}") + redirect_url = channel.get_mirror_channel_urls()[0] + return RedirectResponse(f"{redirect_url}/{path}") + # note: you can only proxy, when you have exactly one remote channel in your config if channel.mirror_channel_url and channel.mirror_mode == "proxy": - repository = RemoteRepository(channel.mirror_channel_url, session) + proxy_url = channel.get_mirror_channel_urls()[0] + repository = RemoteRepository(proxy_url, session) if not pkgstore.file_exists(channel.name, path): download_remote_file(repository, pkgstore, channel.name, path) elif path.endswith(".json"): diff --git a/quetz/rest_models.py b/quetz/rest_models.py index ac6d8e0c..5ee6d2e6 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -7,7 +7,7 @@ import uuid from datetime import date, datetime from enum import Enum -from typing import Generic, List, Optional, TypeVar +from typing import Dict, Generic, List, Optional, TypeVar, Union from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator @@ -65,8 +65,10 @@ class ChannelBase(BaseModel): private: bool = Field(True, title="channel should be private") size_limit: Optional[int] = Field(None, title="size limit of the channel") ttl: int = Field(36000, title="ttl of the channel") - mirror_channel_url: Optional[str] = Field(None, pattern="^(http|https)://.+") - mirror_mode: Optional[MirrorMode] = Field(None) + mirror_channel_url: Optional[Union[str, List[str]]] = Field( + None, pattern="^(http|https)://.+", nullable=True + ) + mirror_mode: Optional[MirrorMode] = Field(None, nullable=True) @field_validator("size_limit") @classmethod @@ -121,6 +123,11 @@ class ChannelMetadata(BaseModel): None, title="list of packages to include while creating a channel", ) + include_pattern_list: Optional[Dict[str, List]] = Field( + None, + title="{'remote0': ['numpy*', 'pandas*'], 'remote1': ['*.tar.bz2']}", + nullable=True, + ) excludelist: Optional[List[str]] = Field( None, title="list of packages to exclude while creating a channel", diff --git a/quetz/tasks/assertions.py b/quetz/tasks/assertions.py index dd14fcfb..491639dd 100644 --- a/quetz/tasks/assertions.py +++ b/quetz/tasks/assertions.py @@ -1,5 +1,5 @@ def can_channel_synchronize(channel): - return channel.mirror_channel_url and (channel.mirror_mode == "mirror") + return channel.mirror_channel_url and (channel.mirror_mode in ["mirror", 
"proxy"]) def can_channel_synchronize_metrics(channel): diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index cc20f749..c83788f9 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -348,6 +348,7 @@ def initial_sync_mirror( update_batch = [] update_size = 0 + remove_batch = [] def handle_batch(update_batch): # i_batch += 1 @@ -404,15 +405,15 @@ def handle_batch(update_batch): return False - for package_name, metadata in packages.items(): - if check_package_membership(package_name, includelist, excludelist): - path = os.path.join(arch, package_name) + for repo_package_name, metadata in packages.items(): + if check_package_membership(repo_package_name, includelist, excludelist): + path = os.path.join(arch, repo_package_name) # try to find out whether it's a new package version is_uptodate = None for _check in version_checks: - is_uptodate = _check(package_name, metadata) + is_uptodate = _check(repo_package_name, metadata) if is_uptodate is not None: break @@ -420,11 +421,17 @@ def handle_batch(update_batch): if is_uptodate: continue else: - logger.debug(f"updating package {package_name} from {arch}") + logger.debug(f"updating package {repo_package_name} from {arch}") - update_batch.append((path, package_name, metadata)) + update_batch.append((path, repo_package_name, metadata)) update_size += metadata.get('size', 100_000) + else: + logger.debug( + f"package {repo_package_name} not member of channel anymore." + ) + remove_batch.append((arch, repo_package_name)) + if len(update_batch) >= max_batch_length or update_size >= max_batch_size: logger.debug(f"Executing batch with {update_size}") any_updated |= handle_batch(update_batch) @@ -434,8 +441,21 @@ def handle_batch(update_batch): # handle final batch any_updated |= handle_batch(update_batch) + if remove_batch: + logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}") + package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) + # TODO: reuse route [DELETE /api/channels/{channel_name}/packages] logic + for package_specs in package_specs_remove: + dao.remove_package(channel_name, package_name=package_specs) + dao.cleanup_channel_db(channel_name, package_name=package_specs) + pkgstore.delete_file(channel.name, destination=package_specs) + + any_updated |= True + if any_updated: - indexing.update_indexes(dao, pkgstore, channel_name, subdirs=[arch]) + indexing.update_indexes( + dao, pkgstore, channel_name, subdirs=[arch] + ) # build repodata def create_packages_from_channeldata( @@ -540,37 +560,39 @@ def synchronize_packages( logger.error(f"channel {channel_name} not found") return - host = new_channel.mirror_channel_url - - remote_repo = RemoteRepository(new_channel.mirror_channel_url, session) + mirror_channel_urls = new_channel.get_mirror_channel_urls() + for mirror_channel_url in mirror_channel_urls: + remote_repo = RemoteRepository(mirror_channel_url, session) - user_id = auth.assert_user() + user_id = auth.assert_user() - try: - channel_data = remote_repo.open("channeldata.json").json() - if use_repodata: - create_packages_from_channeldata(channel_name, user_id, channel_data, dao) - subdirs = channel_data.get("subdirs", []) - except (RemoteFileNotFound, json.JSONDecodeError): - subdirs = None - except RemoteServerError: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=f"Remote channel {host} unavailable", - ) - # if no channel data use known architectures - if subdirs is None: - subdirs = KNOWN_SUBDIRS - - for arch in subdirs: - initial_sync_mirror( - 
new_channel.name, - remote_repo, - arch, - dao, - pkgstore, - auth, - includelist, - excludelist, - use_repodata=use_repodata, - ) + try: + channel_data = remote_repo.open("channeldata.json").json() + if use_repodata: + create_packages_from_channeldata( + channel_name, user_id, channel_data, dao + ) + subdirs = channel_data.get("subdirs", []) + except (RemoteFileNotFound, json.JSONDecodeError): + subdirs = None + except RemoteServerError: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Remote channel {mirror_channel_url} unavailable", + ) + # if no channel data use known architectures + if subdirs is None: + subdirs = KNOWN_SUBDIRS + + for arch in subdirs: + initial_sync_mirror( + new_channel.name, + remote_repo, + arch, + dao, + pkgstore, + auth, + includelist, + excludelist, + use_repodata=use_repodata, + ) diff --git a/quetz/utils.py b/quetz/utils.py index 447f2952..a22465da 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -39,6 +39,47 @@ def check_package_membership(package_name, includelist, excludelist): return True +def _include_pattern_match(name, version, build, pattern): + """ + 3 possible formats + + name + name=version + name=version=build_string + Note that if a given regular expression is given, it must match the full name/version/build string. + """ + import re + + eq_count = pattern.count("=") + if eq_count == 0: + return re.match(pattern, name) + + elif eq_count == 1: + name_pattern, version_pattern = pattern.split("=") + return re.match(name_pattern, name) and re.match(version_pattern, version) + + elif eq_count == 2: + name_pattern, version_pattern, build_pattern = pattern.split("=") + return ( + re.match(name_pattern, name) + and re.match(version_pattern, version) + and re.match(build_pattern, build) + ) + else: + raise ValueError(f"Invalid pattern: {pattern}") + + +def check_package_membership_pattern( + package_spec, include_pattern_list, exclude_pattern_list +): + name, version, build = _parse_package_spec(package_spec) + for include_pattern in include_pattern_list: + if _include_pattern_match(name, version, build, include_pattern): + return True + else: + return False + + def add_static_file(contents, channel_name, subdir, fname, pkgstore, file_index=None): if type(contents) is not bytes: raw_file = contents.encode("utf-8") @@ -279,3 +320,19 @@ async def wrapper(*args: Any, **kwargs: Any) -> None: ) return wrapper + + +def _parse_package_spec(package_spec: str) -> tuple: + """ + Determine name, version and build number string from package spec. + + Package specs are allowed in three formats: + 1. `name` + 2. `name=version` + 3. `name=version=build` + + This function disassembles the spec into its parts. If version + and / or build are missing (cases 1 and 2), the missing field + is filled with a regular expression matching all versions / builds. 
+ """ + raise NotImplementedError() From 267b94ab397a6ebde7c7a669d30a7d6ccb66e287 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 9 Jun 2023 12:35:52 +0200 Subject: [PATCH 02/36] use condas MatchSpec utility --- quetz/utils.py | 60 +++++++++++++------------------------------------- 1 file changed, 15 insertions(+), 45 deletions(-) diff --git a/quetz/utils.py b/quetz/utils.py index a22465da..94de0a8d 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -20,6 +20,7 @@ from typing import Any, Callable from urllib.parse import unquote +from conda.models.match_spec import MatchSpec, _parse_spec_str from sqlalchemy import String, and_, cast, collate, not_, or_ from .db_models import Channel, Package, PackageVersion, User @@ -39,43 +40,28 @@ def check_package_membership(package_name, includelist, excludelist): return True -def _include_pattern_match(name, version, build, pattern): - """ - 3 possible formats +def _parse_package_spec(package_spec: str) -> tuple[str, str, str]: + name, version, build, _ = _parse_spec_str(package_spec).values() + # version, build = _parse_version_plus_build(version) - name - name=version - name=version=build_string - Note that if a given regular expression is given, it must match the full name/version/build string. - """ - import re - - eq_count = pattern.count("=") - if eq_count == 0: - return re.match(pattern, name) - - elif eq_count == 1: - name_pattern, version_pattern = pattern.split("=") - return re.match(name_pattern, name) and re.match(version_pattern, version) - - elif eq_count == 2: - name_pattern, version_pattern, build_pattern = pattern.split("=") - return ( - re.match(name_pattern, name) - and re.match(version_pattern, version) - and re.match(build_pattern, build) - ) - else: - raise ValueError(f"Invalid pattern: {pattern}") + return name, version, build def check_package_membership_pattern( - package_spec, include_pattern_list, exclude_pattern_list + package_spec, include_pattern_list=[], exclude_pattern_list=[] ): + # TODO: validate performance, can we save the MatchSpec instances between calls? + # might be okay for <100 packages to check against, but what about 1000s? name, version, build = _parse_package_spec(package_spec) for include_pattern in include_pattern_list: - if _include_pattern_match(name, version, build, include_pattern): + # TODO: how do we get the build number? + include = MatchSpec(include_pattern).match( + {"name": name, "version": version, "build": build, "build_number": 0} + ) + exclude = False # TODO + if include and not exclude: return True + else: return False @@ -320,19 +306,3 @@ async def wrapper(*args: Any, **kwargs: Any) -> None: ) return wrapper - - -def _parse_package_spec(package_spec: str) -> tuple: - """ - Determine name, version and build number string from package spec. - - Package specs are allowed in three formats: - 1. `name` - 2. `name=version` - 3. `name=version=build` - - This function disassembles the spec into its parts. If version - and / or build are missing (cases 1 and 2), the missing field - is filled with a regular expression matching all versions / builds. 
- """ - raise NotImplementedError() From 557cec018fec3649c314b3931318ca17b8a735c5 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 9 Jun 2023 13:43:01 +0200 Subject: [PATCH 03/36] package spec relies on = while our repo package names use '-' --- quetz/utils.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/quetz/utils.py b/quetz/utils.py index 94de0a8d..68f7ecf3 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -41,10 +41,12 @@ def check_package_membership(package_name, includelist, excludelist): def _parse_package_spec(package_spec: str) -> tuple[str, str, str]: - name, version, build, _ = _parse_spec_str(package_spec).values() - # version, build = _parse_version_plus_build(version) - - return name, version, build + # spec = _parse_spec_str(package_spec) + # return spec.get("name", ""), spec.get("version", ""), spec.get("build", "") + # TODO: the package spec here looks like "numpy-1.23.4-py39hefdcf20_0.tar.bz2" + # and does not have "=" + spec = package_spec.split("-") + return spec[0], spec[1] if len(spec) > 1 else "", spec[2] if len(spec) > 2 else "" def check_package_membership_pattern( From df8a217dc94ac0cce67e0f4e5c0c9af6efe5a9bc Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 9 Jun 2023 13:43:29 +0200 Subject: [PATCH 04/36] validate that package was therefore before delete attempt is issued --- quetz/tasks/mirror.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index c83788f9..46b21097 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -23,7 +23,12 @@ from quetz.errors import DBError from quetz.pkgstores import PackageStore from quetz.tasks import indexing -from quetz.utils import TicToc, add_static_file, check_package_membership +from quetz.utils import ( + TicToc, + add_static_file, + check_package_membership, + check_package_membership_pattern, +) # copy common subdirs from conda: # https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63 @@ -406,7 +411,10 @@ def handle_batch(update_batch): return False for repo_package_name, metadata in packages.items(): - if check_package_membership(repo_package_name, includelist, excludelist): + # if check_package_membership(repo_package_name, includelist, excludelist): + if check_package_membership_pattern( + repo_package_name, includelist, excludelist + ): path = os.path.join(arch, repo_package_name) # try to find out whether it's a new package version @@ -446,9 +454,12 @@ def handle_batch(update_batch): package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) # TODO: reuse route [DELETE /api/channels/{channel_name}/packages] logic for package_specs in package_specs_remove: + # TODO: only remove if it already exists of course dao.remove_package(channel_name, package_name=package_specs) + # TODO: is this needed every time? 
dao.cleanup_channel_db(channel_name, package_name=package_specs) - pkgstore.delete_file(channel.name, destination=package_specs) + if pkgstore.file_exists(channel.name, package_specs): + pkgstore.delete_file(channel.name, destination=package_specs) any_updated |= True From 5815902996c34a0c9899c1b1e2a92c6e46c14e35 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 15 Jun 2023 13:44:55 +0200 Subject: [PATCH 05/36] update: include condas matchspec --- quetz/tasks/mirror.py | 10 ++++------ quetz/utils.py | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 46b21097..5affdd79 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -23,12 +23,7 @@ from quetz.errors import DBError from quetz.pkgstores import PackageStore from quetz.tasks import indexing -from quetz.utils import ( - TicToc, - add_static_file, - check_package_membership, - check_package_membership_pattern, -) +from quetz.utils import TicToc, add_static_file, check_package_membership_pattern # copy common subdirs from conda: # https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63 @@ -410,6 +405,9 @@ def handle_batch(update_batch): return False + # TODO: also remove all packages that are not in the remote repository anymore + # practically re-write the complete sync mechanism? + for repo_package_name, metadata in packages.items(): # if check_package_membership(repo_package_name, includelist, excludelist): if check_package_membership_pattern( diff --git a/quetz/utils.py b/quetz/utils.py index 68f7ecf3..fe722340 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -20,7 +20,7 @@ from typing import Any, Callable from urllib.parse import unquote -from conda.models.match_spec import MatchSpec, _parse_spec_str +from conda.models.match_spec import MatchSpec from sqlalchemy import String, and_, cast, collate, not_, or_ from .db_models import Channel, Package, PackageVersion, User @@ -54,6 +54,7 @@ def check_package_membership_pattern( ): # TODO: validate performance, can we save the MatchSpec instances between calls? # might be okay for <100 packages to check against, but what about 1000s? + # TODO: matchspec vs package spec and build string matching with * name, version, build = _parse_package_spec(package_spec) for include_pattern in include_pattern_list: # TODO: how do we get the build number? 
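# --- Note (not part of the patch series) --------------------------------------
# A minimal, self-contained sketch of the MatchSpec-based filter that patches
# 02-05 above build up, assuming the filename-splitting approach from patch 03
# and the match() call shape used in check_package_membership_pattern. The
# helper names below are illustrative only, not the final quetz API. Splitting
# on "-" is naive and misbehaves for package names that themselves contain
# dashes (e.g. "r-base"); that is one of the open TODOs in the patches.
from conda.models.match_spec import MatchSpec


def parse_filename(package_filename: str) -> tuple[str, str, str]:
    # "numpy-1.23.4-py39hefdcf20_0.tar.bz2" -> ("numpy", "1.23.4", "py39hefdcf20_0.tar.bz2")
    parts = package_filename.split("-")
    name = parts[0]
    version = parts[1] if len(parts) > 1 else ""
    build = parts[2] if len(parts) > 2 else ""
    return name, version, build


def matches_any(package_filename: str, patterns: list[str]) -> bool:
    # Evaluate each conda spec string (e.g. "numpy", "numpy >=1.20") against a
    # record assembled from the filename; the build number is unknown at this
    # point, so 0 is used as a placeholder, exactly as in the patch above.
    name, version, build = parse_filename(package_filename)
    record = {"name": name, "version": version, "build": build, "build_number": 0}
    return any(MatchSpec(p).match(record) for p in patterns)


def should_mirror(package_filename: str, includelist=None, excludelist=None) -> bool:
    # No includelist means "include everything", mirroring the behaviour of the
    # original check_package_membership; the exclude side is still a TODO in
    # patch 05, so symmetric handling here is an assumption of this sketch.
    included = matches_any(package_filename, includelist) if includelist else True
    excluded = matches_any(package_filename, excludelist) if excludelist else False
    return included and not excluded


# usage:
#   should_mirror("numpy-1.23.4-py39hefdcf20_0.tar.bz2", includelist=["numpy >=1.20"])  -> True
#   should_mirror("numpy-1.23.4-py39hefdcf20_0.tar.bz2", excludelist=["numpy"])         -> False
# -------------------------------------------------------------------------------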
From de2796938c7e3ec97b55c53a5b35fcc9c63bf654 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 15 Jun 2023 23:59:25 +0200 Subject: [PATCH 06/36] refactor: extract repodata retrieval || add: package matching on more complicated patterns --- quetz/cli.py | 22 +++++++--- quetz/frontend.py | 4 +- quetz/rest_models.py | 2 +- quetz/tasks/mirror.py | 80 ++++++++++++++++++++------------- quetz/utils.py | 100 +++++++++++++++++++++++++++++------------- 5 files changed, 138 insertions(+), 70 deletions(-) diff --git a/quetz/cli.py b/quetz/cli.py index bffe161e..5cc2a82c 100644 --- a/quetz/cli.py +++ b/quetz/cli.py @@ -72,7 +72,8 @@ def _alembic_config(db_url: str) -> AlembicConfig: script_location = "quetz:migrations" migration_modules = [ - f"{ep.module}:versions" for ep in entry_points().select(group='quetz.migrations') + f"{ep.module}:versions" + for ep in entry_points().select(group='quetz.migrations') ] migration_modules.append("quetz:migrations/versions") @@ -127,7 +128,9 @@ def _make_migrations( found = True if plugin_name != "quetz" and not found: - raise Exception(f"models entrypoint (quetz.models) for plugin {plugin_name} not registered") + raise Exception( + f"models entrypoint (quetz.models) for plugin {plugin_name} not registered" + ) logger.info('Making DB migrations on %r for %r', db_url, plugin_name) if not alembic_config and db_url: @@ -137,7 +140,9 @@ def _make_migrations( if plugin_name == "quetz": version_path = None # Path(quetz.__file__).parent / 'migrations' / 'versions' else: - entry_point = tuple(entry_points().select(group='quetz.migrations', name=plugin_name))[0] + entry_point = tuple( + entry_points().select(group='quetz.migrations', name=plugin_name) + )[0] module = entry_point.load() version_path = str(Path(module.__file__).parent / "versions") @@ -198,7 +203,9 @@ def _set_user_roles(db: Session, config: Config): f"with identity from provider '{provider}'" ) elif user.role is not None and user.role != default_role: - logger.warning(f"user has already role {user.role} not assigning a new role") + logger.warning( + f"user has already role {user.role} not assigning a new role" + ) else: user.role = role @@ -429,7 +436,8 @@ def create( if _is_deployment(deployment_folder): if exists_ok: logger.info( - f'Quetz deployment already exists at {deployment_folder}.\n' f'Skipping creation.' + f'Quetz deployment already exists at {deployment_folder}.\n' + f'Skipping creation.' 
) return if delete and (copy_conf or create_conf): @@ -725,7 +733,9 @@ def plugin( # Try to install pip if it's missing if conda_exe_path is not None: print("pip is missing, installing...") - subprocess.call([conda_exe_path, 'install', '--channel', 'conda-forge', 'pip']) + subprocess.call( + [conda_exe_path, 'install', '--channel', 'conda-forge', 'pip'] + ) pip_exe_path = find_executable('pip') if pip_exe_path is None: diff --git a/quetz/frontend.py b/quetz/frontend.py index c1b0b7ad..02f0a695 100644 --- a/quetz/frontend.py +++ b/quetz/frontend.py @@ -96,4 +96,6 @@ def register(app): frontend_dir = f"{sys.prefix}/share/quetz/frontend/" else: logger.info("Using basic fallback frontend") - frontend_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "basic_frontend") + frontend_dir = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "basic_frontend" + ) diff --git a/quetz/rest_models.py b/quetz/rest_models.py index 5ee6d2e6..c65921b8 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -119,7 +119,7 @@ class ChannelActionEnum(str, Enum): class ChannelMetadata(BaseModel): - includelist: Optional[List[str]] = Field( + includelist: Optional[Union[List[str], Dict[str, List]]] = Field( None, title="list of packages to include while creating a channel", ) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 5affdd79..952f998c 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -6,7 +6,7 @@ from concurrent.futures import ThreadPoolExecutor from http.client import IncompleteRead from tempfile import SpooledTemporaryFile -from typing import List +from typing import List, Union import requests from fastapi import HTTPException, status @@ -23,7 +23,7 @@ from quetz.errors import DBError from quetz.pkgstores import PackageStore from quetz.tasks import indexing -from quetz.utils import TicToc, add_static_file, check_package_membership_pattern +from quetz.utils import TicToc, add_static_file, check_package_membership # copy common subdirs from conda: # https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63 @@ -278,36 +278,22 @@ def _upload_package(file, channel_name, subdir): file.file.close() -def initial_sync_mirror( - channel_name: str, - remote_repository: RemoteRepository, - arch: str, - dao: Dao, - pkgstore: PackageStore, - auth: authorization.Rules, - includelist: List[str] = None, - excludelist: List[str] = None, - skip_errors: bool = True, - use_repodata: bool = False, -): - force = True # needed for updating packages - logger.info( - f"Running channel mirroring {channel_name}/{arch} from {remote_repository.host}" - ) - +def get_remote_repodata( + channel_name: str, arch: str, remote_repository: RemoteRepository +) -> Union[dict, None]: repodata = {} for repodata_fn in ["repodata_from_packages.json", "repodata.json"]: try: repo_file = remote_repository.open(os.path.join(arch, repodata_fn)) repodata = json.load(repo_file.file) - break + return repodata except RemoteServerError: logger.error( f"can not get {repodata_fn} for channel {arch}/{channel_name}." 
) if repodata_fn == "repodata.json": logger.error(f"Giving up for {channel_name}/{arch}.") - return + return None else: logger.error("Trying next filename.") continue @@ -317,10 +303,35 @@ def initial_sync_mirror( f"in channel {channel_name}" ) if repodata_fn == "repodata.json": - return + return None + + return {} - channel = dao.get_channel(channel_name) +def initial_sync_mirror( + channel_name: str, + remote_repository: RemoteRepository, + arch: str, + dao: Dao, + pkgstore: PackageStore, + auth: authorization.Rules, + includelist: List[str] = None, + excludelist: List[str] = None, + skip_errors: bool = True, + use_repodata: bool = False, +): + force = True # needed for updating packages + logger.info( + f"Running channel mirroring {channel_name}/{arch} from {remote_repository.host}" + ) + + repodata = get_remote_repodata(channel_name, arch, remote_repository) + if not repodata: + return # quit; error has already been logged. + + packages = repodata.get("packages", {}) + + channel = dao.get_channel(channel_name) if not channel: logger.error(f"channel {channel_name} not found") return @@ -408,10 +419,13 @@ def handle_batch(update_batch): # TODO: also remove all packages that are not in the remote repository anymore # practically re-write the complete sync mechanism? + # SYNC: Remote -> Local + # for each package in the remote repository: + # validate if it should be downloaded to this channel + # also: remove packages if they are not supposed to in this channel anymore for repo_package_name, metadata in packages.items(): - # if check_package_membership(repo_package_name, includelist, excludelist): - if check_package_membership_pattern( - repo_package_name, includelist, excludelist + if check_package_membership( + channel, repo_package_name, metadata, remote_host=remote_repository.host ): path = os.path.join(arch, repo_package_name) @@ -447,24 +461,28 @@ def handle_batch(update_batch): # handle final batch any_updated |= handle_batch(update_batch) + # SYNC: Local checks Remote + # Validate if all packages in this channel are still + # also present in the remote channel + # if not: add them to the remove batch as well + # TODO + if remove_batch: logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}") package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) - # TODO: reuse route [DELETE /api/channels/{channel_name}/packages] logic for package_specs in package_specs_remove: - # TODO: only remove if it already exists of course dao.remove_package(channel_name, package_name=package_specs) # TODO: is this needed every time? 
dao.cleanup_channel_db(channel_name, package_name=package_specs) + # only remove if exists if pkgstore.file_exists(channel.name, package_specs): pkgstore.delete_file(channel.name, destination=package_specs) any_updated |= True if any_updated: - indexing.update_indexes( - dao, pkgstore, channel_name, subdirs=[arch] - ) # build repodata + # build local repodata + indexing.update_indexes(dao, pkgstore, channel_name, subdirs=[arch]) def create_packages_from_channeldata( diff --git a/quetz/utils.py b/quetz/utils.py index fe722340..78ee38c8 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -26,47 +26,85 @@ from .db_models import Channel, Package, PackageVersion, User -def check_package_membership(package_name, includelist, excludelist): - if includelist: - for each_package in includelist: - if package_name.startswith(each_package): - return True - return False - elif excludelist: - for each_package in excludelist: - if package_name.startswith(each_package): - return False - return True - return True - - -def _parse_package_spec(package_spec: str) -> tuple[str, str, str]: +def _parse_package_spec(package_name: str, package_metadata) -> tuple[str, str, str]: + """Given a package name and metadata, return the package spec. + + Args: + package_name (str): The package name in file format, + e.g. "numpy-1.23.4-py39hefdcf20_0.tar.bz2" + package_metadata (_type_): Metadata of the package, + e.g. from repodata.json + + Returns: + tuple[str, str, str]: (name, version, build-string) + """ + # spec = _parse_spec_str(package_spec) # return spec.get("name", ""), spec.get("version", ""), spec.get("build", "") # TODO: the package spec here looks like "numpy-1.23.4-py39hefdcf20_0.tar.bz2" # and does not have "=" - spec = package_spec.split("-") + spec = package_name.split("-") return spec[0], spec[1] if len(spec) > 1 else "", spec[2] if len(spec) > 2 else "" -def check_package_membership_pattern( - package_spec, include_pattern_list=[], exclude_pattern_list=[] -): - # TODO: validate performance, can we save the MatchSpec instances between calls? - # might be okay for <100 packages to check against, but what about 1000s? - # TODO: matchspec vs package spec and build string matching with * - name, version, build = _parse_package_spec(package_spec) - for include_pattern in include_pattern_list: - # TODO: how do we get the build number? - include = MatchSpec(include_pattern).match( +def _check_package_match( + package_spec: tuple[str, str, str], + include_or_exclude_list: list[str], +) -> bool: + """ + Check if the given package specification matches + with the given include or exclude list. + Returns true if a match is found. + """ + name, version, build = package_spec + for pattern in include_or_exclude_list: + # TODO: validate if this matches with our current implementation + if MatchSpec(pattern).match( {"name": name, "version": version, "build": build, "build_number": 0} - ) - exclude = False # TODO - if include and not exclude: + ): return True - else: - return False + return False + + +def check_package_membership( + channel: Channel, + package_name: str, + package_metadata: dict, + remote_host: str, +): + """ + Check if a package should be in a channel according + to the rules defined in the channel metadata. + + Args: + channel (Channel): Channel object returned from the database + package_name (str): name of the package in file format, + e.g. 
"numpy-1.23.4-py39hefdcf20_0.tar.bz2" + package_metadata (dict): package metadata, + information that can be found in repodata.json for example + includelist (Union[list[str], dict, None], optional): + excludelist (Union[list[str], dict, None], optional): + + Returns: + bool: if the package should be in this channel or not according to the rules. + """ + package_spec = _parse_package_spec(package_name, package_metadata) + metadata = channel.load_channel_metadata() + if (includelist := metadata['includelist']) is not None: + # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} + if isinstance(includelist, dict): + if channel.name not in includelist: + include_package = False + channel_includelist = includelist[remote_host.split("/")[-1]] + include_package = _check_package_match(package_spec, channel_includelist) + # Example: ["numpy", "pandas", "r-base"] + elif isinstance(includelist, list): + include_package = _check_package_match(package_spec, includelist) + + # TODO: implement excludelist + + return include_package def add_static_file(contents, channel_name, subdir, fname, pkgstore, file_index=None): From dbd3f75cdb95e01e63fb11e33e1dcaf87e47f1ce Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 29 Jun 2023 20:16:22 +0200 Subject: [PATCH 07/36] add: support for excludelist --- quetz/utils.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/quetz/utils.py b/quetz/utils.py index 78ee38c8..a19d2bc4 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -91,6 +91,8 @@ def check_package_membership( """ package_spec = _parse_package_spec(package_name, package_metadata) metadata = channel.load_channel_metadata() + include_package = True + exclude_package = False if (includelist := metadata['includelist']) is not None: # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(includelist, dict): @@ -102,9 +104,21 @@ def check_package_membership( elif isinstance(includelist, list): include_package = _check_package_match(package_spec, includelist) - # TODO: implement excludelist + if (excludelist := metadata['excludelist']) is not None: + # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} + if isinstance(excludelist, dict): + if channel.name in excludelist: + channel_excludelist = excludelist[remote_host.split("/")[-1]] + exclude_package = _check_package_match( + package_spec, channel_excludelist + ) + else: + exclude_package = False + # Example: ["numpy", "pandas", "r-base"] + elif isinstance(excludelist, list): + exclude_package = _check_package_match(package_spec, excludelist) - return include_package + return include_package and not exclude_package def add_static_file(contents, channel_name, subdir, fname, pkgstore, file_index=None): From 1d4af06c740112a1900e81a49494a0490e7baf81 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 29 Jun 2023 20:53:22 +0200 Subject: [PATCH 08/36] add: new database model for accessing channel urls --- quetz/dao.py | 11 +++-------- quetz/db_models.py | 20 +++++++++++++++----- quetz/deps.py | 2 +- quetz/main.py | 16 +++++++++++----- quetz/tasks/mirror.py | 3 +-- quetz/tests/test_mirror.py | 4 ++-- 6 files changed, 33 insertions(+), 23 deletions(-) diff --git a/quetz/dao.py b/quetz/dao.py index d5085d9f..c048b8af 100644 --- a/quetz/dao.py +++ b/quetz/dao.py @@ -311,17 +311,12 @@ def create_channel( "only ASCII characters should be used in channel name" ) - if isinstance(data.mirror_channel_url, str): - mirror_channel_urls = data.mirror_channel_url - elif isinstance(data.mirror_channel_url, list): - 
mirror_channel_urls = ";".join(data.mirror_channel_url) - else: - mirror_channel_urls = None # type: ignore - + # note: due to backwards compatibility, the rest model still calls it + # 'mirror_channel_url' but the db model calls it 'mirror_channel_urls' channel = Channel( name=data.name, description=data.description, - mirror_channel_url=mirror_channel_urls, + mirror_channel_urls=data.mirror_channel_url, mirror_mode=data.mirror_mode, private=data.private, ttl=data.ttl, diff --git a/quetz/db_models.py b/quetz/db_models.py index 7b621731..cd4491b5 100644 --- a/quetz/db_models.py +++ b/quetz/db_models.py @@ -200,7 +200,7 @@ class Channel(Base): ) description = Column(String) private = Column(Boolean, default=False) - mirror_channel_url = Column(String) + _mirror_channel_url = Column(String) mirror_mode = Column(String) channel_metadata = Column(String, server_default='{}', nullable=False) timestamp_mirror_sync = Column(Integer, default=0) @@ -208,11 +208,21 @@ class Channel(Base): size_limit = Column(BigInteger, default=None) ttl = Column(Integer, server_default=f'{60 * 60 * 10}', nullable=False) # 10 hours - def get_mirror_channel_urls(self) -> list[str]: - if self.mirror_channel_url: - return self.mirror_channel_url.split(";") + @property + def mirror_channel_urls(self): + if ";" in self._mirror_channel_url: + return self._mirror_channel_url.split(";") + else: + return [self._mirror_channel_url] + + @mirror_channel_urls.setter + def mirror_channel_urls(self, value): + if isinstance(value, str): + self._mirror_channel_url = value + elif isinstance(value, list): + self._mirror_channel_url = ";".join(value) else: - return [] + self._mirror_channel_url = None # type: ignore packages = relationship( 'Package', back_populates='channel', cascade="all,delete", uselist=True diff --git a/quetz/deps.py b/quetz/deps.py index 3ef97665..aae77cc5 100644 --- a/quetz/deps.py +++ b/quetz/deps.py @@ -125,7 +125,7 @@ def __call__( auth.assert_channel_read(channel) - mirror_url = channel.mirror_channel_url + mirror_url = channel.mirror_channel_urls is_proxy = mirror_url and channel.mirror_mode == "proxy" is_mirror = mirror_url and channel.mirror_mode == "mirror" diff --git a/quetz/main.py b/quetz/main.py index 710ebaab..7711fa54 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -742,6 +742,7 @@ def post_channel( else: size_limit = None + # create database model (channel) from rest model (new_channel) channel = dao.create_channel(new_channel, user_id, authorization.OWNER, size_limit) pkgstore.create_channel(new_channel.name) if not is_proxy: @@ -749,7 +750,7 @@ def post_channel( # register mirror if is_mirror and register_mirror: - for mirror_url in new_channel.get_mirror_channel_urls(): + for mirror_url in channel.mirror_channel_urls: mirror_url = mirror_url.replace("get", "api/channels") headers = {"x-api-key": mirror_api_key} if mirror_api_key else {} api_endpoint = str(request.url.replace(query=None)) + '/' + new_channel.name @@ -1800,17 +1801,22 @@ def serve_path( except ValueError: pass - if is_package_request and channel.mirror_channel_url: + if is_package_request and channel.mirror_channel_urls: # if we exclude the package from syncing, redirect to original URL # use the first mirror url entry for the redirect channel_proxylist = json.loads(channel.channel_metadata).get('proxylist', []) if channel_proxylist and package_name and package_name in channel_proxylist: - redirect_url = channel.get_mirror_channel_urls()[0] + redirect_url = channel.mirror_channel_urls[0] return 
RedirectResponse(f"{redirect_url}/{path}") # note: you can only proxy, when you have exactly one remote channel in your config - if channel.mirror_channel_url and channel.mirror_mode == "proxy": - proxy_url = channel.get_mirror_channel_urls()[0] + if channel.mirror_channel_urls and channel.mirror_mode == "proxy": + proxy_url = channel.mirror_channel_urls[0] + if len(channel.mirror_channel_urls) > 1: + logger.warning( + "More than one mirror channel url configured " + f"for channel {channel.name}. Proxying only to the first one." + ) repository = RemoteRepository(proxy_url, session) if not pkgstore.file_exists(channel.name, path): download_remote_file(repository, pkgstore, channel.name, path) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 952f998c..13e143df 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -587,8 +587,7 @@ def synchronize_packages( logger.error(f"channel {channel_name} not found") return - mirror_channel_urls = new_channel.get_mirror_channel_urls() - for mirror_channel_url in mirror_channel_urls: + for mirror_channel_url in new_channel.mirror_channel_urls: remote_repo = RemoteRepository(mirror_channel_url, session) user_id = auth.assert_user() diff --git a/quetz/tests/test_mirror.py b/quetz/tests/test_mirror.py index e715a253..5e8ea677 100644 --- a/quetz/tests/test_mirror.py +++ b/quetz/tests/test_mirror.py @@ -508,14 +508,14 @@ def close(self): ) def test_synchronisation_no_checksums_in_db( repo_content, + arch, + n_new_packages, mirror_channel, dao, config, dummy_response, db, user, - n_new_packages, - arch, package_version, mocker, ): From 0f0783879add62b58073c33adbb3bc9522556ce8 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 29 Jun 2023 21:22:26 +0200 Subject: [PATCH 09/36] fix: not set mirror_channel_url --- quetz/db_models.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quetz/db_models.py b/quetz/db_models.py index cd4491b5..29439a7f 100644 --- a/quetz/db_models.py +++ b/quetz/db_models.py @@ -210,7 +210,9 @@ class Channel(Base): @property def mirror_channel_urls(self): - if ";" in self._mirror_channel_url: + if self._mirror_channel_url is None: + return [] + elif ";" in self._mirror_channel_url: return self._mirror_channel_url.split(";") else: return [self._mirror_channel_url] From a5e3847f68ebe1bea07ae289f0d4141ae2167d41 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 29 Jun 2023 21:25:11 +0200 Subject: [PATCH 10/36] fix: mirror_channel_urlS --- quetz/tasks/assertions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quetz/tasks/assertions.py b/quetz/tasks/assertions.py index 491639dd..c6d6ea78 100644 --- a/quetz/tasks/assertions.py +++ b/quetz/tasks/assertions.py @@ -3,7 +3,7 @@ def can_channel_synchronize(channel): def can_channel_synchronize_metrics(channel): - return not channel.mirror_channel_url + return not channel.mirror_channel_urls def can_channel_generate_indexes(channel): From 821736864489230a0e5c1e94aa62cdd65b3d62e6 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 29 Jun 2023 22:16:29 +0200 Subject: [PATCH 11/36] add: new tests for validating pattern checking --- quetz/rest_models.py | 21 ++++++++++++--------- quetz/tasks/assertions.py | 2 +- quetz/tests/test_mirror.py | 21 +++++++++++++++------ quetz/utils.py | 8 +++++--- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/quetz/rest_models.py b/quetz/rest_models.py index c65921b8..bd398cd0 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -65,8 +65,8 @@ class ChannelBase(BaseModel): 
private: bool = Field(True, title="channel should be private") size_limit: Optional[int] = Field(None, title="size limit of the channel") ttl: int = Field(36000, title="ttl of the channel") - mirror_channel_url: Optional[Union[str, List[str]]] = Field( - None, pattern="^(http|https)://.+", nullable=True + mirror_channel_url: Optional[Union[str, List[str], List[dict]]] = Field( + None, pattern="^(http|https)://.+|None|^(\\[.*\\])+$", nullable=True ) mirror_mode: Optional[MirrorMode] = Field(None, nullable=True) @@ -119,16 +119,19 @@ class ChannelActionEnum(str, Enum): class ChannelMetadata(BaseModel): - includelist: Optional[Union[List[str], Dict[str, List]]] = Field( + + """ + examples: + - includelist: "numpy" + - includelist: ["numpy", "pandas"] + - includelist: {"channel1: ["numpy", "pandas"]}, {"channel2": ["scipy"]} + """ + + includelist: List[str] | str | Dict[str, List[str]] = Field( None, title="list of packages to include while creating a channel", ) - include_pattern_list: Optional[Dict[str, List]] = Field( - None, - title="{'remote0': ['numpy*', 'pandas*'], 'remote1': ['*.tar.bz2']}", - nullable=True, - ) - excludelist: Optional[List[str]] = Field( + excludelist: List[str] | str | List[Dict[str, List[str] | str]] = Field( None, title="list of packages to exclude while creating a channel", ) diff --git a/quetz/tasks/assertions.py b/quetz/tasks/assertions.py index c6d6ea78..8789ea8f 100644 --- a/quetz/tasks/assertions.py +++ b/quetz/tasks/assertions.py @@ -1,5 +1,5 @@ def can_channel_synchronize(channel): - return channel.mirror_channel_url and (channel.mirror_mode in ["mirror", "proxy"]) + return channel.mirror_channel_urls and (channel.mirror_mode in ["mirror", "proxy"]) def can_channel_synchronize_metrics(channel): diff --git a/quetz/tests/test_mirror.py b/quetz/tests/test_mirror.py index 5e8ea677..08421749 100644 --- a/quetz/tests/test_mirror.py +++ b/quetz/tests/test_mirror.py @@ -1006,11 +1006,20 @@ def test_sync_mirror_channel(mirror_channel, user, client, dummy_repo): ], ) @pytest.mark.parametrize( - "package_list_type,expected_package", - [("includelist", "nrnpython"), ("excludelist", "test-package")], + "package_list_type,list_values, expected_packages", + [ + ("includelist", ["nrnpython"], ["nrnpython"]), + ( + "includelist", + {"https://conda.anaconda.org/btel": ["nrnpython"]}, + ["nrnpython"], + ), + ("excludelist", ["nrnpython"], ["test-package"]), + (None, None, ["nrnpython", "test-package"]), + ], ) def test_packagelist_mirror_channel( - owner, client, package_list_type, expected_package, db, job_supervisor + owner, client, package_list_type, list_values, expected_packages, db, job_supervisor ): response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 @@ -1022,7 +1031,7 @@ def test_packagelist_mirror_channel( "private": False, "mirror_channel_url": "https://conda.anaconda.org/btel", "mirror_mode": "mirror", - "metadata": {package_list_type: ["nrnpython"]}, + "metadata": {package_list_type: list_values}, }, ) assert response.status_code == 201 @@ -1030,8 +1039,8 @@ def test_packagelist_mirror_channel( response = client.get("/api/channels/mirror-channel-btel/packages") assert response.status_code == 200 - assert len(response.json()) == 1 - assert response.json()[0]['name'] == expected_package + assert len(response.json()) == len(expected_packages) + assert [p['name'] for p in response.json()] == expected_packages def test_includelist_and_excludelist_mirror_channel(owner, client): diff --git a/quetz/utils.py b/quetz/utils.py index 
a19d2bc4..bea2ccb0 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -94,17 +94,19 @@ def check_package_membership( include_package = True exclude_package = False if (includelist := metadata['includelist']) is not None: + include_package = False # default to False if includelist is defined # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(includelist, dict): - if channel.name not in includelist: - include_package = False - channel_includelist = includelist[remote_host.split("/")[-1]] + channel_includelist = includelist.get( + remote_host.split("/")[-1], [] + ) or includelist.get(remote_host, []) include_package = _check_package_match(package_spec, channel_includelist) # Example: ["numpy", "pandas", "r-base"] elif isinstance(includelist, list): include_package = _check_package_match(package_spec, includelist) if (excludelist := metadata['excludelist']) is not None: + exclude_package = False # default to False if excludelist is defined # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(excludelist, dict): if channel.name in excludelist: From c3c546706cbcef4aab98c54455d899766c2e8ec8 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Tue, 18 Jul 2023 22:18:40 +0200 Subject: [PATCH 12/36] refactor: channel membership is completely refactored --- quetz/db_models.py | 2 +- quetz/tasks/mirror.py | 22 ++++++++++---- quetz/utils.py | 68 +++++++++++++++++++++++++++++++------------ 3 files changed, 67 insertions(+), 25 deletions(-) diff --git a/quetz/db_models.py b/quetz/db_models.py index 29439a7f..7004c115 100644 --- a/quetz/db_models.py +++ b/quetz/db_models.py @@ -200,7 +200,7 @@ class Channel(Base): ) description = Column(String) private = Column(Boolean, default=False) - _mirror_channel_url = Column(String) + _mirror_channel_url = Column("mirror_channel_url", String) mirror_mode = Column(String) channel_metadata = Column(String, server_default='{}', nullable=False) timestamp_mirror_sync = Column(Integer, default=0) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 13e143df..60baf4e4 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -23,7 +23,12 @@ from quetz.errors import DBError from quetz.pkgstores import PackageStore from quetz.tasks import indexing -from quetz.utils import TicToc, add_static_file, check_package_membership +from quetz.utils import ( + TicToc, + add_static_file, + check_package_membership, + MembershipAction, +) # copy common subdirs from conda: # https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63 @@ -424,9 +429,11 @@ def handle_batch(update_batch): # validate if it should be downloaded to this channel # also: remove packages if they are not supposed to in this channel anymore for repo_package_name, metadata in packages.items(): - if check_package_membership( + # find action to do with package + action = check_package_membership( channel, repo_package_name, metadata, remote_host=remote_repository.host - ): + ) + if action == MembershipAction.INCLUDE: path = os.path.join(arch, repo_package_name) # try to find out whether it's a new package version @@ -445,13 +452,18 @@ def handle_batch(update_batch): update_batch.append((path, repo_package_name, metadata)) update_size += metadata.get('size', 100_000) - + elif action == MembershipAction.NOTHING: + logger.debug( + f"package {repo_package_name} not needed by {remote_repository.host} but other channels." + ) else: logger.debug( - f"package {repo_package_name} not member of channel anymore." 
+ f"package {repo_package_name} not needed by {remote_repository.host} and no other channels." ) + # TODO: only add to remove if exists. remove_batch.append((arch, repo_package_name)) + # perform either downloads or removals if len(update_batch) >= max_batch_length or update_size >= max_batch_size: logger.debug(f"Executing batch with {update_size}") any_updated |= handle_batch(update_batch) diff --git a/quetz/utils.py b/quetz/utils.py index bea2ccb0..5cebd076 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -14,6 +14,7 @@ import time import traceback import uuid +from enum import Enum from datetime import datetime, timezone from functools import wraps from pathlib import Path @@ -67,18 +68,39 @@ def _check_package_match( return False +class MembershipAction(Enum): + INCLUDE = "include" # package should be added to the channel + NOTHING = "nothing" # package is not member of this channel but of another + REMOVE = "remove" # package is not member of any channel + + +def _all_matching_hosts( + include_or_exclude_list: dict, package_spec: tuple[str, str, str] +) -> list[str]: + """ + Return the names of all matching hosts from the includelist that whould allow _this_ package spec. + include_or_exclude_list: e.g. { "remote1": ["numpy", "pandas"], "remote2": ["r-base"]} + """ + name, version, build = package_spec + matching_hosts = [] + for host, patterns in include_or_exclude_list.items(): + if _check_package_match(package_spec, patterns): + matching_hosts.append(host) + return matching_hosts + + def check_package_membership( channel: Channel, package_name: str, package_metadata: dict, remote_host: str, -): +) -> MembershipAction: """ Check if a package should be in a channel according to the rules defined in the channel metadata. Args: - channel (Channel): Channel object returned from the database + channel (Channel): mirror Channel object returned from the database package_name (str): name of the package in file format, e.g. 
"numpy-1.23.4-py39hefdcf20_0.tar.bz2" package_metadata (dict): package metadata, @@ -91,36 +113,44 @@ def check_package_membership( """ package_spec = _parse_package_spec(package_name, package_metadata) metadata = channel.load_channel_metadata() - include_package = True - exclude_package = False + incl_act = MembershipAction.NOTHING + exclude_now = False if (includelist := metadata['includelist']) is not None: - include_package = False # default to False if includelist is defined + incl_act = False # default to False if includelist is defined # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(includelist, dict): - channel_includelist = includelist.get( - remote_host.split("/")[-1], [] - ) or includelist.get(remote_host, []) - include_package = _check_package_match(package_spec, channel_includelist) + matches = _all_matching_hosts(includelist, package_spec) + if remote_host in matches or remote_host.split("/")[-1] in matches: + incl_act = MembershipAction.INCLUDE + elif len(matches) > 0: # we have a match but not for this host + incl_act = MembershipAction.NOTHING + else: + incl_act = MembershipAction.REMOVE + # Example: ["numpy", "pandas", "r-base"] elif isinstance(includelist, list): - include_package = _check_package_match(package_spec, includelist) + if _check_package_match(package_spec, includelist): + incl_act = MembershipAction.INCLUDE + else: + incl_act = MembershipAction.REMOVE + # for exclude list, we only check the current host if (excludelist := metadata['excludelist']) is not None: - exclude_package = False # default to False if excludelist is defined - # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} + exclude_now = False if isinstance(excludelist, dict): if channel.name in excludelist: channel_excludelist = excludelist[remote_host.split("/")[-1]] - exclude_package = _check_package_match( - package_spec, channel_excludelist - ) + exclude_now = _check_package_match(package_spec, channel_excludelist) else: - exclude_package = False - # Example: ["numpy", "pandas", "r-base"] + exclude_now = False elif isinstance(excludelist, list): - exclude_package = _check_package_match(package_spec, excludelist) + exclude_now = _check_package_match(package_spec, excludelist) - return include_package and not exclude_package + # as long as it does not need to be removed here, we can include it + if not exclude_now: + return incl_act + else: + return MembershipAction.REMOVE def add_static_file(contents, channel_name, subdir, fname, pkgstore, file_index=None): From b2af3393f78e2e0795d392d68b8a25c9765ab6c9 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Tue, 18 Jul 2023 23:28:20 +0200 Subject: [PATCH 13/36] add + refactor: option to remove local packages that would _not_ fit includelist/excludelist --- quetz/tasks/mirror.py | 107 ++++++++++++++++++++++++++++-------------- quetz/utils.py | 9 ++-- 2 files changed, 77 insertions(+), 39 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 60baf4e4..b4bb0b95 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -19,15 +19,15 @@ from quetz.condainfo import CondaInfo, get_subdir_compat from quetz.config import Config from quetz.dao import Dao -from quetz.db_models import PackageVersion +from quetz.db_models import Channel, PackageVersion from quetz.errors import DBError from quetz.pkgstores import PackageStore from quetz.tasks import indexing from quetz.utils import ( + MembershipAction, TicToc, add_static_file, check_package_membership, - MembershipAction, ) # copy common subdirs from 
conda: @@ -421,23 +421,13 @@ def handle_batch(update_batch): return False - # TODO: also remove all packages that are not in the remote repository anymore - # practically re-write the complete sync mechanism? - - # SYNC: Remote -> Local - # for each package in the remote repository: - # validate if it should be downloaded to this channel - # also: remove packages if they are not supposed to in this channel anymore + # go through all packages from remote channel for repo_package_name, metadata in packages.items(): - # find action to do with package action = check_package_membership( channel, repo_package_name, metadata, remote_host=remote_repository.host ) if action == MembershipAction.INCLUDE: - path = os.path.join(arch, repo_package_name) - # try to find out whether it's a new package version - is_uptodate = None for _check in version_checks: is_uptodate = _check(repo_package_name, metadata) @@ -447,20 +437,23 @@ def handle_batch(update_batch): # if package is up-to-date skip uploading file if is_uptodate: continue - else: - logger.debug(f"updating package {repo_package_name} from {arch}") + logger.debug(f"updating package {repo_package_name} from {arch}") + + path = os.path.join(arch, repo_package_name) update_batch.append((path, repo_package_name, metadata)) update_size += metadata.get('size', 100_000) elif action == MembershipAction.NOTHING: logger.debug( - f"package {repo_package_name} not needed by {remote_repository.host} but other channels." + f"package {repo_package_name} not needed by " + f"{remote_repository.host} but other channels" ) else: logger.debug( - f"package {repo_package_name} not needed by {remote_repository.host} and no other channels." + f"package {repo_package_name} not needed by " + f"{remote_repository.host} and no other channels." ) - # TODO: only add to remove if exists. + # TODO: only add to remove if exists in this (mirror) channel. remove_batch.append((arch, repo_package_name)) # perform either downloads or removals @@ -473,30 +466,74 @@ def handle_batch(update_batch): # handle final batch any_updated |= handle_batch(update_batch) - # SYNC: Local checks Remote - # Validate if all packages in this channel are still - # also present in the remote channel - # if not: add them to the remove batch as well - # TODO - + # remove packages marked for removal if remove_batch: - logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}") - package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) - for package_specs in package_specs_remove: - dao.remove_package(channel_name, package_name=package_specs) - # TODO: is this needed every time? - dao.cleanup_channel_db(channel_name, package_name=package_specs) - # only remove if exists - if pkgstore.file_exists(channel.name, package_specs): - pkgstore.delete_file(channel.name, destination=package_specs) - - any_updated |= True + any_updated |= remove_packages(remove_batch, channel, dao, pkgstore) if any_updated: # build local repodata indexing.update_indexes(dao, pkgstore, channel_name, subdirs=[arch]) +def remove_packages(remove_batch, channel: Channel, dao, pkgstore) -> bool: + logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}") + removal_performed = False + package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) + for package_specs in package_specs_remove: + dao.remove_package(channel.name, package_name=package_specs) + # TODO: is this needed every time? 
+ dao.cleanup_channel_db(channel.name, package_name=package_specs) + if pkgstore.file_exists(channel.name, package_specs): + pkgstore.delete_file(channel.name, destination=package_specs) + removal_performed |= True + + return removal_performed + + +def remove_local_packages( + channel_name: str, + dao: Dao, + pkgstore: PackageStore, + auth: authorization.Rules, + includelist: List[str | dict] = None, + excludelist: List[str | dict] = None, +): + """ + For each package in the channel, check if it is in the includelist and + not in the excludelist. If not, remove it from the channel and the package store. + + We assume that the includelist and excludelist are well-formed, + e.g. they don't overlap. + """ + from utils import _all_matching_hosts + + channel = dao.get_channel(channel_name) + if not channel: + logger.error(f"channel {channel_name} not found") + return + + if not includelist: + includelist = [] + if not excludelist: + excludelist = [] + + packages_to_remove = [] + + for package in channel.packages: + info = package.current_package_version + name, version, build_string = info.package_name, info.version, info.build_string + # check: does this package match _any_ of the includelist patterns? + if not _all_matching_hosts(name, version, build_string, includelist): + packages_to_remove.append((None, name)) + + # check: does this package match _any_ of the excludelist patterns? + if _all_matching_hosts(name, version, build_string, excludelist): + packages_to_remove.append((None, name)) + + if packages_to_remove: + remove_packages(packages_to_remove, channel, dao, pkgstore) + + def create_packages_from_channeldata( channel_name: str, user_id: bytes, channeldata: dict, dao: Dao ): diff --git a/quetz/utils.py b/quetz/utils.py index 5cebd076..c1639888 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -14,8 +14,8 @@ import time import traceback import uuid -from enum import Enum from datetime import datetime, timezone +from enum import Enum from functools import wraps from pathlib import Path from typing import Any, Callable @@ -78,8 +78,10 @@ def _all_matching_hosts( include_or_exclude_list: dict, package_spec: tuple[str, str, str] ) -> list[str]: """ - Return the names of all matching hosts from the includelist that whould allow _this_ package spec. - include_or_exclude_list: e.g. { "remote1": ["numpy", "pandas"], "remote2": ["r-base"]} + Return the names of all matching hosts from the includelist + that whould allow _this_ package spec. + include_or_exclude_list: + e.g. 
{ "remote1": ["numpy", "pandas"], "remote2": ["r-base"]} """ name, version, build = package_spec matching_hosts = [] @@ -116,7 +118,6 @@ def check_package_membership( incl_act = MembershipAction.NOTHING exclude_now = False if (includelist := metadata['includelist']) is not None: - incl_act = False # default to False if includelist is defined # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(includelist, dict): matches = _all_matching_hosts(includelist, package_spec) From 76e7c29f7682881ef54b0ad12c9734b4ba9f3dd6 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Thu, 17 Aug 2023 14:49:56 +0200 Subject: [PATCH 14/36] refactoring: clean up code, extract methods, document some others --- quetz/dao.py | 12 ++++-- quetz/pkgstores.py | 4 ++ quetz/tasks/mirror.py | 95 +++++++++++++++++++++++++++---------------- 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/quetz/dao.py b/quetz/dao.py index c048b8af..19064875 100644 --- a/quetz/dao.py +++ b/quetz/dao.py @@ -361,7 +361,7 @@ def cleanup_channel_db( Package.channel_name == channel_name ) if package_name: - all_packages = all_packages.filter( + all_packages = all_packages.join(PackageVersion).filter( PackageVersion.package_name == package_name ) for each_package in all_packages: @@ -396,7 +396,7 @@ def cleanup_channel_db( Package.channel_name == channel_name ) if package_name: - all_packages = all_packages.filter( + all_packages = all_packages.join(PackageVersion).filter( PackageVersion.package_name == package_name ) for each_package in all_packages: @@ -429,7 +429,7 @@ def cleanup_channel_db( Package.channel_name == channel_name ) if package_name: - all_packages = all_packages.filter( + all_packages = all_packages.join(PackageVersion).filter( PackageVersion.package_name == package_name ) for x, each_package in enumerate(all_packages): @@ -595,6 +595,7 @@ def get_channel(self, channel_name: str) -> Optional[Channel]: return self.db.query(Channel).filter(Channel.name == channel_name).one_or_none() def get_package(self, channel_name: str, package_name: str) -> Optional[Package]: + print(f"get_package: {channel_name}{package_name}") return ( self.db.query(Package) .join(Channel) @@ -1026,6 +1027,11 @@ def get_channel_datas(self, channel_name: str): ) def assert_size_limits(self, channel_name: str, size: int): + """ + validate that adding a package of size `size` to channel `channel_name` + does not exceed the channel size limit. + raises: QuotaError + """ channel_size, channel_size_limit = ( self.db.query(Channel.size, Channel.size_limit) .filter(Channel.name == channel_name) diff --git a/quetz/pkgstores.py b/quetz/pkgstores.py index 5ce49497..6802dc0a 100644 --- a/quetz/pkgstores.py +++ b/quetz/pkgstores.py @@ -67,6 +67,10 @@ def support_redirect(self) -> bool: @abc.abstractmethod def create_channel(self, name): + """ + create channel with given name, depending on backend, + e.g. 
create a directory or bucket + """ pass @abc.abstractmethod diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index b4bb0b95..2799c451 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -202,6 +202,26 @@ def download_file(remote_repository, path_metadata): return f, package_name, metadata +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + after=after_log(logger, logging.WARNING), +) +def _upload_package(file, channel_name, subdir, pkgstore): + dest = os.path.join(subdir, file.filename) + + try: + file.file.seek(0) + logger.debug( + f"uploading file {dest} from channel {channel_name} to package store" + ) + pkgstore.add_package(file.file, channel_name, dest) + + except AttributeError as e: + logger.error(f"Could not upload {file}, {file.filename}. {str(e)}") + raise TryAgain + + def handle_repodata_package( channel, files_metadata, @@ -217,6 +237,7 @@ def handle_repodata_package( proxylist = channel.load_channel_metadata().get('proxylist', []) user_id = auth.assert_user() + # check package format and permissions, calculate total size total_size = 0 for file, package_name, metadata in files_metadata: parts = file.filename.rsplit("-", 2) @@ -240,38 +261,24 @@ def handle_repodata_package( total_size += size file.file.seek(0) - dao.assert_size_limits(channel_name, total_size) - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - after=after_log(logger, logging.WARNING), - ) - def _upload_package(file, channel_name, subdir): - dest = os.path.join(subdir, file.filename) - - try: - file.file.seek(0) - logger.debug( - f"uploading file {dest} from channel {channel_name} to package store" - ) - pkgstore.add_package(file.file, channel_name, dest) + # create package in database + # channel_data = _load_remote_channel_data(remote_repository) + # create_packages_from_channeldata(channel_name, user_id, channel_data, dao) - except AttributeError as e: - logger.error(f"Could not upload {file}, {file.filename}. 
{str(e)}") - raise TryAgain + # validate quota + dao.assert_size_limits(channel_name, total_size) pkgstore.create_channel(channel_name) - nthreads = config.general_package_unpack_threads with TicToc("upload file without extracting"): + nthreads = config.general_package_unpack_threads with ThreadPoolExecutor(max_workers=nthreads) as executor: for file, package_name, metadata in files_metadata: if proxylist and package_name in proxylist: # skip packages that should only ever be proxied continue subdir = get_subdir_compat(metadata) - executor.submit(_upload_package, file, channel_name, subdir) + executor.submit(_upload_package, file, channel_name, subdir, pkgstore) with TicToc("add versions to the db"): for file, package_name, metadata in files_metadata: @@ -618,6 +625,24 @@ def create_versions_from_repodata( create_version_from_metadata(channel_name, user_id, filename, metadata, dao) +def _load_remote_channel_data(remote_repository: RemoteRepository) -> dict: + """ + given the remote repository, load the channeldata.json file + raises: HTTPException if the remote server is unavailable + """ + try: + channel_data = remote_repository.open("channeldata.json").json() + except (RemoteFileNotFound, json.JSONDecodeError): + channel_data = {} + except RemoteServerError as e: + logger.error(f"Remote server error: {e}") + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Remote channel {remote_repository.host} unavailable", + ) + return channel_data + + def synchronize_packages( channel_name: str, dao: Dao, @@ -628,6 +653,14 @@ def synchronize_packages( excludelist: List[str] = None, use_repodata: bool = False, ): + """synchronize package from a remote channel. + + Args: + channel_name (str): the channel to be updated, e.g. the mirror channel + dao (Dao): database access object + pkgstore (PackageStore): the target channels package store + use_repodata (bool, optional): wether to create packages from repodata.json + """ logger.info(f"executing synchronize_packages task in a process {os.getpid()}") new_channel = dao.get_channel(channel_name) @@ -639,22 +672,14 @@ def synchronize_packages( for mirror_channel_url in new_channel.mirror_channel_urls: remote_repo = RemoteRepository(mirror_channel_url, session) - user_id = auth.assert_user() + auth.assert_user() - try: - channel_data = remote_repo.open("channeldata.json").json() - if use_repodata: - create_packages_from_channeldata( - channel_name, user_id, channel_data, dao - ) + channel_data = _load_remote_channel_data(remote_repo) + subdirs = None + if use_repodata: + # create_packages_from_channeldata(channel_name, user_id, channel_data, dao) subdirs = channel_data.get("subdirs", []) - except (RemoteFileNotFound, json.JSONDecodeError): - subdirs = None - except RemoteServerError: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=f"Remote channel {mirror_channel_url} unavailable", - ) + # if no channel data use known architectures if subdirs is None: subdirs = KNOWN_SUBDIRS From 1f6be80d871fc5519c9fe6a853687cadffb2e4bb Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Tue, 22 Aug 2023 15:35:14 +0200 Subject: [PATCH 15/36] debug: update version to validate deployment --- pyproject.toml | 2 +- quetz/_version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 95bd793c..45b31f5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ check-imports = ["quetz"] ignore = ["W004"] [tool.tbump.version] -current = "0.9.2" +current = 
"0.10.0" regex = ''' (?P\d+)\.(?P\d+)\.(?P\d+) ((?Pa|b|rc|.dev)(?P\d+))? diff --git a/quetz/_version.py b/quetz/_version.py index 7e2da70f..310cf9d1 100644 --- a/quetz/_version.py +++ b/quetz/_version.py @@ -1,2 +1,2 @@ -version_info = (0, 9, 2, "", "") +version_info = (0, 10, 0, "", "") __version__ = '.'.join(filter(lambda s: len(s) > 0, map(str, version_info))) From 8b4b96576e5e1ea22cbe94379e291e2d459683e0 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Tue, 29 Aug 2023 09:08:57 +0200 Subject: [PATCH 16/36] add custom URLType for validating urls --- docs/source/using/mirroring.rst | 20 +++++++++++++--- quetz/main.py | 1 + quetz/rest_models.py | 23 ++++++++++++++---- quetz/tasks/mirror.py | 19 +++++++++++---- quetz/utils.py | 42 ++++++++++++++++----------------- 5 files changed, 72 insertions(+), 33 deletions(-) diff --git a/docs/source/using/mirroring.rst b/docs/source/using/mirroring.rst index c12cfc07..7ea7dc96 100644 --- a/docs/source/using/mirroring.rst +++ b/docs/source/using/mirroring.rst @@ -61,6 +61,18 @@ Creating a mirror channel is similar to creating proxy channels except that you "mirror_mode": "mirror" } +To mirror packages from multiple source channels, provide a list of URLs in the ``mirror_channel_url`` attribute: + +.. code:: json + + { + "name": "mirror-channel", + "private": false, + "mirror_channel_url": ["https://conda.anaconda.org/btel", "https://conda.anaconda.org/conda-forge"], + "mirror_mode": "mirror" + } + +Note that setting multiple mirror channel urls will work for mirror channels only. Proxy channels can only mirror a single channel and will therefore use only the first url in the list. .. code:: bash @@ -80,7 +92,7 @@ Mirror channels are read only (you can not add or change packages in these chann curl ${QUETZ_HOST}/api/channels/mirror-channel/packages -You can also postpone the synchronising the channel by adding ``{"actions": []}`` to the request: +You can also postpone the synchronisation of the channel by adding ``{"actions": []}`` to the request: .. code:: bash @@ -94,6 +106,8 @@ You can also postpone the synchronising the channel by adding ``{"actions": []}` "mirror_mode":"mirror", "actions": []}' +Otherwise, this will be done automatically after the channel is created. + Synchronising mirror channel ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -118,8 +132,8 @@ If you don't want to mirror all packages for a channel or can't mirror packages :excludelist: Don't download packages in list. :proxylist: Parse package metadata, but redirect downloads to upstream server for packages in list. - -It's possible to change metadata after creating a channel using the PATCH ``/api​/channels​/{channel_name}`` endpoint: +After creating a mirror channel you can only modify some attributes, e.g. ``private``, ``size_limit``, ``metadata`` and ``ttl``. +It's possible to change metadata, and therefore modify the ``includelist``, ``excludelist`` or ``proxylist`` attributes, after creating a channel using the PATCH ``/api​/channels​/{channel_name}`` endpoint: .. 
code:: json diff --git a/quetz/main.py b/quetz/main.py index 7711fa54..6a027e19 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -1806,6 +1806,7 @@ def serve_path( # use the first mirror url entry for the redirect channel_proxylist = json.loads(channel.channel_metadata).get('proxylist', []) if channel_proxylist and package_name and package_name in channel_proxylist: + # proxying packages only works for the first url in the list redirect_url = channel.mirror_channel_urls[0] return RedirectResponse(f"{redirect_url}/{path}") diff --git a/quetz/rest_models.py b/quetz/rest_models.py index bd398cd0..df73f7be 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -7,11 +7,12 @@ import uuid from datetime import date, datetime from enum import Enum -from typing import Dict, Generic, List, Optional, TypeVar, Union +from typing import Annotated, Dict, Generic, List, Optional, TypeVar, Union from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator T = TypeVar('T') +URLType = Annotated[str, Field(pattern="^(http|https)://.+")] class BaseProfile(BaseModel): @@ -65,8 +66,9 @@ class ChannelBase(BaseModel): private: bool = Field(True, title="channel should be private") size_limit: Optional[int] = Field(None, title="size limit of the channel") ttl: int = Field(36000, title="ttl of the channel") - mirror_channel_url: Optional[Union[str, List[str], List[dict]]] = Field( - None, pattern="^(http|https)://.+|None|^(\\[.*\\])+$", nullable=True + mirror_channel_url: Optional[Union[URLType, List[URLType], List[dict]]] = Field( + None, + nullable=True, ) mirror_mode: Optional[MirrorMode] = Field(None, nullable=True) @@ -144,7 +146,20 @@ class ChannelMetadata(BaseModel): class Channel(ChannelBase): metadata: ChannelMetadata = Field( - default_factory=ChannelMetadata, title="channel metadata", examples={} + default_factory=ChannelMetadata, + title="channel metadata", + examples=[ + {}, + { + "includelist": ["numpy", "pandas"], + }, + { + "includelist": { + "channel1": ["numpy", "pandas"], + "channel2": ["scipy"], + }, + }, + ], ) actions: Optional[List[ChannelActionEnum]] = Field( diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 2799c451..1bbaa892 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -450,7 +450,7 @@ def handle_batch(update_batch): path = os.path.join(arch, repo_package_name) update_batch.append((path, repo_package_name, metadata)) update_size += metadata.get('size', 100_000) - elif action == MembershipAction.NOTHING: + elif action == MembershipAction.IGNORE: logger.debug( f"package {repo_package_name} not needed by " f"{remote_repository.host} but other channels" @@ -460,7 +460,6 @@ def handle_batch(update_batch): f"package {repo_package_name} not needed by " f"{remote_repository.host} and no other channels." ) - # TODO: only add to remove if exists in this (mirror) channel. remove_batch.append((arch, repo_package_name)) # perform either downloads or removals @@ -483,6 +482,16 @@ def handle_batch(update_batch): def remove_packages(remove_batch, channel: Channel, dao, pkgstore) -> bool: + """ + Remove packages from the channel and the package store. + Args: + remove_batch: list of (arch, repo_package_name) tuples + e.g. [('linux-64', 'foo-1.0-0.tar.bz2'), ...] + channel: the channel to remove packages from + dao + pkgstore + Returns True if any removals were performed. 
+ """ logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}") removal_performed = False package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) @@ -512,7 +521,7 @@ def remove_local_packages( We assume that the includelist and excludelist are well-formed, e.g. they don't overlap. """ - from utils import _all_matching_hosts + from utils import get_matching_hosts channel = dao.get_channel(channel_name) if not channel: @@ -530,11 +539,11 @@ def remove_local_packages( info = package.current_package_version name, version, build_string = info.package_name, info.version, info.build_string # check: does this package match _any_ of the includelist patterns? - if not _all_matching_hosts(name, version, build_string, includelist): + if not get_matching_hosts(name, version, build_string, includelist): packages_to_remove.append((None, name)) # check: does this package match _any_ of the excludelist patterns? - if _all_matching_hosts(name, version, build_string, excludelist): + if get_matching_hosts(name, version, build_string, excludelist): packages_to_remove.append((None, name)) if packages_to_remove: diff --git a/quetz/utils.py b/quetz/utils.py index c1639888..5db233a5 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -27,28 +27,22 @@ from .db_models import Channel, Package, PackageVersion, User -def _parse_package_spec(package_name: str, package_metadata) -> tuple[str, str, str]: +def parse_package_filename(package_name: str) -> tuple[str, str, str]: """Given a package name and metadata, return the package spec. Args: package_name (str): The package name in file format, e.g. "numpy-1.23.4-py39hefdcf20_0.tar.bz2" - package_metadata (_type_): Metadata of the package, - e.g. from repodata.json Returns: tuple[str, str, str]: (name, version, build-string) """ - # spec = _parse_spec_str(package_spec) - # return spec.get("name", ""), spec.get("version", ""), spec.get("build", "") - # TODO: the package spec here looks like "numpy-1.23.4-py39hefdcf20_0.tar.bz2" - # and does not have "=" spec = package_name.split("-") return spec[0], spec[1] if len(spec) > 1 else "", spec[2] if len(spec) > 2 else "" -def _check_package_match( +def check_package_match( package_spec: tuple[str, str, str], include_or_exclude_list: list[str], ) -> bool: @@ -59,7 +53,6 @@ def _check_package_match( """ name, version, build = package_spec for pattern in include_or_exclude_list: - # TODO: validate if this matches with our current implementation if MatchSpec(pattern).match( {"name": name, "version": version, "build": build, "build_number": 0} ): @@ -70,11 +63,11 @@ def _check_package_match( class MembershipAction(Enum): INCLUDE = "include" # package should be added to the channel - NOTHING = "nothing" # package is not member of this channel but of another + IGNORE = "ignore" # package is not member of this channel but of another REMOVE = "remove" # package is not member of any channel -def _all_matching_hosts( +def get_matching_hosts( include_or_exclude_list: dict, package_spec: tuple[str, str, str] ) -> list[str]: """ @@ -86,7 +79,7 @@ def _all_matching_hosts( name, version, build = package_spec matching_hosts = [] for host, patterns in include_or_exclude_list.items(): - if _check_package_match(package_spec, patterns): + if check_package_match(package_spec, patterns): matching_hosts.append(host) return matching_hosts @@ -101,6 +94,10 @@ def check_package_membership( Check if a package should be in a channel according to the rules defined in the channel metadata. 
+ Since for a given package and includelist, a package can be not in the includelist + of one remote channel but in the includelist of another remote channel, the result + of this check can also be to ignore the current package. + Args: channel (Channel): mirror Channel object returned from the database package_name (str): name of the package in file format, @@ -108,29 +105,32 @@ def check_package_membership( package_metadata (dict): package metadata, information that can be found in repodata.json for example includelist (Union[list[str], dict, None], optional): + list of packages or dict of {channel: [packages]} that should be included excludelist (Union[list[str], dict, None], optional): + list of packages or dict of {channel: [packages]} that should be excluded Returns: - bool: if the package should be in this channel or not according to the rules. + MembershipAction: this determines if the package should be included, + ignored or removed from the channel """ - package_spec = _parse_package_spec(package_name, package_metadata) + package_spec = parse_package_filename(package_name) metadata = channel.load_channel_metadata() - incl_act = MembershipAction.NOTHING + incl_act = MembershipAction.IGNORE exclude_now = False if (includelist := metadata['includelist']) is not None: # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(includelist, dict): - matches = _all_matching_hosts(includelist, package_spec) + matches = get_matching_hosts(includelist, package_spec) if remote_host in matches or remote_host.split("/")[-1] in matches: incl_act = MembershipAction.INCLUDE elif len(matches) > 0: # we have a match but not for this host - incl_act = MembershipAction.NOTHING + incl_act = MembershipAction.IGNORE else: incl_act = MembershipAction.REMOVE # Example: ["numpy", "pandas", "r-base"] elif isinstance(includelist, list): - if _check_package_match(package_spec, includelist): + if check_package_match(package_spec, includelist): incl_act = MembershipAction.INCLUDE else: incl_act = MembershipAction.REMOVE @@ -141,13 +141,13 @@ def check_package_membership( if isinstance(excludelist, dict): if channel.name in excludelist: channel_excludelist = excludelist[remote_host.split("/")[-1]] - exclude_now = _check_package_match(package_spec, channel_excludelist) + exclude_now = check_package_match(package_spec, channel_excludelist) else: exclude_now = False elif isinstance(excludelist, list): - exclude_now = _check_package_match(package_spec, excludelist) + exclude_now = check_package_match(package_spec, excludelist) - # as long as it does not need to be removed here, we can include it + # package not explicitly excluded? 
-> listen to include action if not exclude_now: return incl_act else: From 223e29a11280959ce00fc8210fab5c0d2486b8ca Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Tue, 29 Aug 2023 22:22:03 +0200 Subject: [PATCH 17/36] fix rest models --- quetz/db_models.py | 16 ++++++++-------- quetz/rest_models.py | 12 +++++++----- quetz/tasks/mirror.py | 23 ++++++++++++++++++++++- quetz/tests/test_mirror.py | 18 ++++++++++++++---- quetz/utils.py | 9 +++++---- 5 files changed, 56 insertions(+), 22 deletions(-) diff --git a/quetz/db_models.py b/quetz/db_models.py index 7004c115..9c4944b1 100644 --- a/quetz/db_models.py +++ b/quetz/db_models.py @@ -200,7 +200,7 @@ class Channel(Base): ) description = Column(String) private = Column(Boolean, default=False) - _mirror_channel_url = Column("mirror_channel_url", String) + mirror_channel_url = Column("mirror_channel_url", String) mirror_mode = Column(String) channel_metadata = Column(String, server_default='{}', nullable=False) timestamp_mirror_sync = Column(Integer, default=0) @@ -210,21 +210,21 @@ class Channel(Base): @property def mirror_channel_urls(self): - if self._mirror_channel_url is None: + if self.mirror_channel_url is None: return [] - elif ";" in self._mirror_channel_url: - return self._mirror_channel_url.split(";") + elif ";" in self.mirror_channel_url: + return self.mirror_channel_url.split(";") else: - return [self._mirror_channel_url] + return [self.mirror_channel_url] @mirror_channel_urls.setter def mirror_channel_urls(self, value): if isinstance(value, str): - self._mirror_channel_url = value + self.mirror_channel_url = value elif isinstance(value, list): - self._mirror_channel_url = ";".join(value) + self.mirror_channel_url = ";".join(value) else: - self._mirror_channel_url = None # type: ignore + self.mirror_channel_url = None # type: ignore packages = relationship( 'Package', back_populates='channel', cascade="all,delete", uselist=True diff --git a/quetz/rest_models.py b/quetz/rest_models.py index df73f7be..c23d04d8 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -66,9 +66,8 @@ class ChannelBase(BaseModel): private: bool = Field(True, title="channel should be private") size_limit: Optional[int] = Field(None, title="size limit of the channel") ttl: int = Field(36000, title="ttl of the channel") - mirror_channel_url: Optional[Union[URLType, List[URLType], List[dict]]] = Field( + mirror_channel_url: Optional[Union[URLType, List[URLType]]] = Field( None, - nullable=True, ) mirror_mode: Optional[MirrorMode] = Field(None, nullable=True) @@ -124,16 +123,19 @@ class ChannelMetadata(BaseModel): """ examples: - - includelist: "numpy" - includelist: ["numpy", "pandas"] - includelist: {"channel1: ["numpy", "pandas"]}, {"channel2": ["scipy"]} """ - includelist: List[str] | str | Dict[str, List[str]] = Field( + includelist: Optional[ + Union[List[str], Dict[Union[URLType, str], List[str]]] + ] = Field( None, title="list of packages to include while creating a channel", ) - excludelist: List[str] | str | List[Dict[str, List[str] | str]] = Field( + excludelist: Optional[ + Union[List[str], Dict[Union[URLType, str], List[str]]] + ] = Field( None, title="list of packages to exclude while creating a channel", ) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 1bbaa892..d9ab8032 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -332,6 +332,22 @@ def initial_sync_mirror( skip_errors: bool = True, use_repodata: bool = False, ): + """ + Synchronize a mirror channel with a remote repository. 
+ + Args: + channel_name: name of the channel to synchronize + remote_repository: RemoteRepository object + arch: architecture to synchronize + dao: Dao object + pkgstore + auth + includelist: list of package names to include + excludelist: list of package names to exclude + skip_errors: if True, continue processing packages even if an error occurs + use_repodata: if True, use repodata.json to process packages + + """ force = True # needed for updating packages logger.info( f"Running channel mirroring {channel_name}/{arch} from {remote_repository.host}" @@ -429,9 +445,14 @@ def handle_batch(update_batch): return False # go through all packages from remote channel + channel_metadata = channel.load_channel_metadata() for repo_package_name, metadata in packages.items(): action = check_package_membership( - channel, repo_package_name, metadata, remote_host=remote_repository.host + channel, + channel_metadata, + repo_package_name, + metadata, + remote_host=remote_repository.host, ) if action == MembershipAction.INCLUDE: # try to find out whether it's a new package version diff --git a/quetz/tests/test_mirror.py b/quetz/tests/test_mirror.py index 08421749..068e83dd 100644 --- a/quetz/tests/test_mirror.py +++ b/quetz/tests/test_mirror.py @@ -194,7 +194,17 @@ def mirror_package(mirror_channel, db): db.commit() -def test_set_mirror_url(db, client, owner): +@pytest.mark.parametrize( + "mirror_url_req, mirror_url_resp", + [ + ("http://host", "http://host"), + ( + ["http://my_remote_host", "http://my_remote_host2"], + "http://my_remote_host;http://my_remote_host2", + ), + ], +) +def test_set_mirror_url(db, client, owner, mirror_url_req, mirror_url_resp): response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 @@ -203,7 +213,7 @@ def test_set_mirror_url(db, client, owner): json={ "name": "test-create-channel", "private": False, - "mirror_channel_url": "http://my_remote_host", + "mirror_channel_url": mirror_url_req, "mirror_mode": "proxy", }, ) @@ -212,7 +222,7 @@ def test_set_mirror_url(db, client, owner): response = client.get("/api/channels/test-create-channel") assert response.status_code == 200 - assert response.json()["mirror_channel_url"] == "http://my_remote_host" + assert response.json()["mirror_channel_url"] == mirror_url_resp @pytest.mark.parametrize("mirror_mode", ["proxy", "mirror"]) @@ -1043,7 +1053,7 @@ def test_packagelist_mirror_channel( assert [p['name'] for p in response.json()] == expected_packages -def test_includelist_and_excludelist_mirror_channel(owner, client): +def test_cannot_use_includelist_and_excludelist_mirror_channel(owner, client): response = client.get("/api/dummylogin/bartosz") assert response.status_code == 200 diff --git a/quetz/utils.py b/quetz/utils.py index 5db233a5..058f95b3 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -86,6 +86,7 @@ def get_matching_hosts( def check_package_membership( channel: Channel, + channel_metadata: dict, package_name: str, package_metadata: dict, remote_host: str, @@ -114,10 +115,10 @@ def check_package_membership( ignored or removed from the channel """ package_spec = parse_package_filename(package_name) - metadata = channel.load_channel_metadata() - incl_act = MembershipAction.IGNORE + + incl_act = MembershipAction.INCLUDE exclude_now = False - if (includelist := metadata['includelist']) is not None: + if (includelist := channel_metadata['includelist']) is not None: # Example: { "main": ["numpy", "pandas"], "r": ["r-base"]} if isinstance(includelist, dict): matches = get_matching_hosts(includelist, 
package_spec) @@ -136,7 +137,7 @@ def check_package_membership( incl_act = MembershipAction.REMOVE # for exclude list, we only check the current host - if (excludelist := metadata['excludelist']) is not None: + if (excludelist := channel_metadata['excludelist']) is not None: exclude_now = False if isinstance(excludelist, dict): if channel.name in excludelist: From 3ca604c6e76aad58c7eb55588d4f90158e488447 Mon Sep 17 00:00:00 2001 From: Yasin Date: Fri, 8 Sep 2023 10:20:53 +0200 Subject: [PATCH 18/36] Update docs/source/using/mirroring.rst Co-authored-by: Andreas Albert <103571926+AndreasAlbertQC@users.noreply.github.com> --- docs/source/using/mirroring.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/using/mirroring.rst b/docs/source/using/mirroring.rst index 7ea7dc96..fc69ddd5 100644 --- a/docs/source/using/mirroring.rst +++ b/docs/source/using/mirroring.rst @@ -133,7 +133,7 @@ If you don't want to mirror all packages for a channel or can't mirror packages :proxylist: Parse package metadata, but redirect downloads to upstream server for packages in list. After creating a mirror channel you can only modify some attributes, e.g. ``private``, ``size_limit``, ``metadata`` and ``ttl``. -It's possible to change metadata, and therefore modify the ``includelist``, ``excludelist`` or ``proxylist`` attributes, after creating a channel using the PATCH ``/api​/channels​/{channel_name}`` endpoint: +The ``metadata`` attribute` holds the ``includelist``, ``excludelist`` or ``proxylist`` attributes, which can therefore be changed after creating a channel using the PATCH ``/api​/channels​/{channel_name}`` endpoint: .. code:: json From 854d4716554232876a1a4a1608008c145339e245 Mon Sep 17 00:00:00 2001 From: Yasin Date: Fri, 8 Sep 2023 10:21:15 +0200 Subject: [PATCH 19/36] Update quetz/dao.py Co-authored-by: Andreas Albert <103571926+AndreasAlbertQC@users.noreply.github.com> --- quetz/dao.py | 1 + 1 file changed, 1 insertion(+) diff --git a/quetz/dao.py b/quetz/dao.py index 19064875..eae6fe2a 100644 --- a/quetz/dao.py +++ b/quetz/dao.py @@ -335,6 +335,7 @@ def create_channel( return channel def remove_package(self, channel_name: str, package_name: str): + """Deletes a package and all package versions and members associated to it.""" self.db.query(Package).filter(Package.channel_name == channel_name).filter( Package.name == package_name ).delete() From cf5cc7af400c65c0329d21ed369e99ffc5abded0 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 10:27:03 +0200 Subject: [PATCH 20/36] fix --- quetz/dao.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/quetz/dao.py b/quetz/dao.py index eae6fe2a..8924fd5d 100644 --- a/quetz/dao.py +++ b/quetz/dao.py @@ -335,7 +335,7 @@ def create_channel( return channel def remove_package(self, channel_name: str, package_name: str): - """Deletes a package and all package versions and members associated to it.""" + """Deletes a package and all package versions and members associated to it.""" self.db.query(Package).filter(Package.channel_name == channel_name).filter( Package.name == package_name ).delete() @@ -596,7 +596,6 @@ def get_channel(self, channel_name: str) -> Optional[Channel]: return self.db.query(Channel).filter(Channel.name == channel_name).one_or_none() def get_package(self, channel_name: str, package_name: str) -> Optional[Package]: - print(f"get_package: {channel_name}{package_name}") return ( self.db.query(Package) .join(Channel) From f6480b758e4417284abd630020fa364f9ac8d16f Mon Sep 17 00:00:00 2001 
From: Yasin Tatar Date: Fri, 8 Sep 2023 11:13:33 +0200 Subject: [PATCH 21/36] validate that proxy mode only allows a single mirror url --- quetz/main.py | 7 +------ quetz/rest_models.py | 4 ++++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/quetz/main.py b/quetz/main.py index 6a027e19..a34fc22e 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -1810,14 +1810,9 @@ def serve_path( redirect_url = channel.mirror_channel_urls[0] return RedirectResponse(f"{redirect_url}/{path}") - # note: you can only proxy, when you have exactly one remote channel in your config + # note: proxy mode only works with one mirror url (checked on channel creation) if channel.mirror_channel_urls and channel.mirror_mode == "proxy": proxy_url = channel.mirror_channel_urls[0] - if len(channel.mirror_channel_urls) > 1: - logger.warning( - "More than one mirror channel url configured " - f"for channel {channel.name}. Proxying only to the first one." - ) repository = RemoteRepository(proxy_url, session) if not pkgstore.file_exists(channel.name, path): download_remote_file(repository, pkgstore, channel.name, path) diff --git a/quetz/rest_models.py b/quetz/rest_models.py index c23d04d8..117f3552 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -184,6 +184,10 @@ def check_mirror_params(self) -> "Channel": "'mirror_mode' provided but 'mirror_channel_url' is undefined" ) + if mirror_mode == MirrorMode.proxy: + if isinstance(mirror_url, list) and len(mirror_url) > 1: + raise ValueError("'proxy' mode requires a single 'mirror_channel_url'") + return self From e0f3bf15dfcf74012e7d4254388699d998f6c322 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 11:19:59 +0200 Subject: [PATCH 22/36] add more commments --- quetz/rest_models.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/quetz/rest_models.py b/quetz/rest_models.py index 117f3552..f9c57f47 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -124,7 +124,12 @@ class ChannelMetadata(BaseModel): """ examples: - includelist: ["numpy", "pandas"] + this allows to mirror only numpy and pandas + irrespective of the channel they come from - includelist: {"channel1: ["numpy", "pandas"]}, {"channel2": ["scipy"]} + this allows to mirror numpy and pandas from channel1 and scipy from channel2 + - proxylist: ["numpy", "pandas"] + this will redirect all download for these packages to the original channel """ includelist: Optional[ From ce554b6dabcc485c7c461202fea44357403d12ee Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 11:39:45 +0200 Subject: [PATCH 23/36] add docs --- quetz/tasks/mirror.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index d9ab8032..b19f9e42 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -293,6 +293,24 @@ def handle_repodata_package( def get_remote_repodata( channel_name: str, arch: str, remote_repository: RemoteRepository ) -> Union[dict, None]: + """ + Fetches the repodata.json file from a remote repository + for a given channel and architecture. + + The function tries to fetch two types of repodata files: + "repodata_from_packages.json" and "repodata.json". + If both files are not found or are not properly formatted, + the function returns None. + + Args: + channel_name (str) + arch (str) + remote_repository (RemoteRepository): remote repo to fetch from + + Returns: + dict or None: A dictionary containing the repodata + if the file is found and properly formatted, None otherwise. 
+ """ repodata = {} for repodata_fn in ["repodata_from_packages.json", "repodata.json"]: try: From 299d89b36befe568aaf714a870f7821248492c18 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 14:14:56 +0200 Subject: [PATCH 24/36] add comment --- quetz/utils.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/quetz/utils.py b/quetz/utils.py index 058f95b3..dc7827bb 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -95,9 +95,16 @@ def check_package_membership( Check if a package should be in a channel according to the rules defined in the channel metadata. - Since for a given package and includelist, a package can be not in the includelist - of one remote channel but in the includelist of another remote channel, the result - of this check can also be to ignore the current package. + The function returns a representation of the treatment the package + should receive (include / exclude / ignore). + + A package should be: + * included if is in the includelist (for this channel) + * excluded if is in the excludelist (for this channel) + this means that existing versions of the package will be removed + * ignored if it does not match the includelist for this channel + this does not remove the package since it might + match the includelist of another channel Args: channel (Channel): mirror Channel object returned from the database From 1afd522ba6a24f4cdc623ff5e600fefe929319f8 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 14:16:22 +0200 Subject: [PATCH 25/36] rename method --- quetz/tasks/mirror.py | 4 ++-- quetz/tests/test_mirror.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index b19f9e42..e1dcfef5 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -338,7 +338,7 @@ def get_remote_repodata( return {} -def initial_sync_mirror( +def sync_mirror( channel_name: str, remote_repository: RemoteRepository, arch: str, @@ -733,7 +733,7 @@ def synchronize_packages( subdirs = KNOWN_SUBDIRS for arch in subdirs: - initial_sync_mirror( + sync_mirror( new_channel.name, remote_repo, arch, diff --git a/quetz/tests/test_mirror.py b/quetz/tests/test_mirror.py index 068e83dd..1296ccac 100644 --- a/quetz/tests/test_mirror.py +++ b/quetz/tests/test_mirror.py @@ -22,7 +22,7 @@ create_packages_from_channeldata, create_versions_from_repodata, handle_repodata_package, - initial_sync_mirror, + sync_mirror, ) from quetz.testing.mockups import MockWorker @@ -482,7 +482,7 @@ def close(self): dummy_repo = RemoteRepository("", DummySession()) - initial_sync_mirror( + sync_mirror( mirror_channel.name, dummy_repo, arch, @@ -548,7 +548,7 @@ def close(self): dummy_repo = RemoteRepository("", DummySession()) - initial_sync_mirror( + sync_mirror( mirror_channel.name, dummy_repo, arch, From 0c837798b6a88c57fbcb82ecacd15a4724dc1008 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 14:20:42 +0200 Subject: [PATCH 26/36] add type hints --- quetz/tasks/mirror.py | 59 ++++++++----------------------------------- 1 file changed, 10 insertions(+), 49 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index e1dcfef5..f4807e94 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -6,7 +6,7 @@ from concurrent.futures import ThreadPoolExecutor from http.client import IncompleteRead from tempfile import SpooledTemporaryFile -from typing import List, Union +from typing import List, Tuple, Union import requests from fastapi import HTTPException, 
status @@ -520,7 +520,12 @@ def handle_batch(update_batch): indexing.update_indexes(dao, pkgstore, channel_name, subdirs=[arch]) -def remove_packages(remove_batch, channel: Channel, dao, pkgstore) -> bool: +def remove_packages( + remove_batch: List[Tuple[str, str]], + channel: Channel, + dao: Dao, + pkgstore: PackageStore, +) -> bool: """ Remove packages from the channel and the package store. Args: @@ -536,57 +541,13 @@ def remove_packages(remove_batch, channel: Channel, dao, pkgstore) -> bool: package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) for package_specs in package_specs_remove: dao.remove_package(channel.name, package_name=package_specs) - # TODO: is this needed every time? - dao.cleanup_channel_db(channel.name, package_name=package_specs) if pkgstore.file_exists(channel.name, package_specs): pkgstore.delete_file(channel.name, destination=package_specs) - removal_performed |= True - - return removal_performed - + removal_performed = True -def remove_local_packages( - channel_name: str, - dao: Dao, - pkgstore: PackageStore, - auth: authorization.Rules, - includelist: List[str | dict] = None, - excludelist: List[str | dict] = None, -): - """ - For each package in the channel, check if it is in the includelist and - not in the excludelist. If not, remove it from the channel and the package store. + dao.cleanup_channel_db(channel.name, package_name=package_specs) - We assume that the includelist and excludelist are well-formed, - e.g. they don't overlap. - """ - from utils import get_matching_hosts - - channel = dao.get_channel(channel_name) - if not channel: - logger.error(f"channel {channel_name} not found") - return - - if not includelist: - includelist = [] - if not excludelist: - excludelist = [] - - packages_to_remove = [] - - for package in channel.packages: - info = package.current_package_version - name, version, build_string = info.package_name, info.version, info.build_string - # check: does this package match _any_ of the includelist patterns? - if not get_matching_hosts(name, version, build_string, includelist): - packages_to_remove.append((None, name)) - - # check: does this package match _any_ of the excludelist patterns? - if get_matching_hosts(name, version, build_string, excludelist): - packages_to_remove.append((None, name)) - - if packages_to_remove: - remove_packages(packages_to_remove, channel, dao, pkgstore) + return removal_performed def create_packages_from_channeldata( From a3b39dbaf2c49d54cae95948313638a01f0e06f0 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 15:55:04 +0200 Subject: [PATCH 27/36] use dist for parsing --- quetz/tasks/mirror.py | 15 +++++++++------ quetz/utils.py | 5 +++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index f4807e94..8c84cf56 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -29,6 +29,7 @@ add_static_file, check_package_membership, ) +from utils import parse_package_filename # copy common subdirs from conda: # https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63 @@ -536,16 +537,18 @@ def remove_packages( pkgstore Returns True if any removals were performed. 
""" + logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}") removal_performed = False - package_specs_remove = set([p[1].split("-")[0] for p in remove_batch]) - for package_specs in package_specs_remove: - dao.remove_package(channel.name, package_name=package_specs) - if pkgstore.file_exists(channel.name, package_specs): - pkgstore.delete_file(channel.name, destination=package_specs) + + for package_spec in set(p[1] for p in remove_batch): + package_name, version, build_string = parse_package_filename(package_spec) + dao.remove_package(channel.name, package_name=package_name) + if pkgstore.file_exists(channel.name, package_spec): + pkgstore.delete_file(channel.name, destination=package_spec) removal_performed = True - dao.cleanup_channel_db(channel.name, package_name=package_specs) + dao.cleanup_channel_db(channel.name) return removal_performed diff --git a/quetz/utils.py b/quetz/utils.py index dc7827bb..be1c1687 100644 --- a/quetz/utils.py +++ b/quetz/utils.py @@ -21,6 +21,7 @@ from typing import Any, Callable from urllib.parse import unquote +from conda.models.dist import Dist from conda.models.match_spec import MatchSpec from sqlalchemy import String, and_, cast, collate, not_, or_ @@ -37,9 +38,9 @@ def parse_package_filename(package_name: str) -> tuple[str, str, str]: Returns: tuple[str, str, str]: (name, version, build-string) """ + dist_obj = Dist.from_string(package_name) - spec = package_name.split("-") - return spec[0], spec[1] if len(spec) > 1 else "", spec[2] if len(spec) > 2 else "" + return dist_obj.name, dist_obj.version, dist_obj.build_string def check_package_match( From fc5c4bf5e839c776ec63209fad93640549034370 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 15:57:29 +0200 Subject: [PATCH 28/36] sooner package creation based on cond --- quetz/tasks/mirror.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 8c84cf56..626c5a21 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -684,12 +684,12 @@ def synchronize_packages( for mirror_channel_url in new_channel.mirror_channel_urls: remote_repo = RemoteRepository(mirror_channel_url, session) - auth.assert_user() + user_id = auth.assert_user() channel_data = _load_remote_channel_data(remote_repo) subdirs = None - if use_repodata: - # create_packages_from_channeldata(channel_name, user_id, channel_data, dao) + if use_repodata and includelist is None and excludelist is None: + create_packages_from_channeldata(channel_name, user_id, channel_data, dao) subdirs = channel_data.get("subdirs", []) # if no channel data use known architectures From ff909380d7601fb1d5522928436941d88348d17c Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 16:09:26 +0200 Subject: [PATCH 29/36] simplify package removal from database --- quetz/dao.py | 23 ++++++++++------------- quetz/db_models.py | 1 + quetz/tasks/mirror.py | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/quetz/dao.py b/quetz/dao.py index 8924fd5d..3615d932 100644 --- a/quetz/dao.py +++ b/quetz/dao.py @@ -334,20 +334,17 @@ def create_channel( return channel - def remove_package(self, channel_name: str, package_name: str): - """Deletes a package and all package versions and members associated to it.""" - self.db.query(Package).filter(Package.channel_name == channel_name).filter( - Package.name == package_name - ).delete() - self.db.query(PackageVersion).filter( - PackageVersion.channel_name == channel_name - 
).filter(PackageVersion.package_name == package_name).delete() - - # remove PackageMember entries as well - self.db.query(PackageMember).filter( - PackageMember.channel_name == channel_name - ).filter(PackageMember.package_name == package_name).delete() + def remove_package(self, package_name: str, channel_name: Optional[str] = None): + """ + Remove package from database but only from the given channel if specified. + Due to cascading behaviour, this will also remove all package versions and + package members. + """ + query = self.db.query(Package).filter(Package.name == package_name) + if channel_name: + query = query.filter(Package.channel_name == channel_name) + query.delete() self.db.commit() def cleanup_channel_db( diff --git a/quetz/db_models.py b/quetz/db_models.py index 9c4944b1..cae88025 100644 --- a/quetz/db_models.py +++ b/quetz/db_models.py @@ -173,6 +173,7 @@ class Package(Base): ), viewonly=True, lazy="select", + cascade="all,delete-orphan", ) @property diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py index 626c5a21..7fe0f30c 100644 --- a/quetz/tasks/mirror.py +++ b/quetz/tasks/mirror.py @@ -543,7 +543,7 @@ def remove_packages( for package_spec in set(p[1] for p in remove_batch): package_name, version, build_string = parse_package_filename(package_spec) - dao.remove_package(channel.name, package_name=package_name) + dao.remove_package(package_name=package_name, channel_name=channel.name) if pkgstore.file_exists(channel.name, package_spec): pkgstore.delete_file(channel.name, destination=package_spec) removal_performed = True From 5f951d0846807da6225a8edff28d238712b6c8eb Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 16:11:43 +0200 Subject: [PATCH 30/36] update comments --- quetz/pkgstores.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quetz/pkgstores.py b/quetz/pkgstores.py index 6802dc0a..cdf4819a 100644 --- a/quetz/pkgstores.py +++ b/quetz/pkgstores.py @@ -68,8 +68,8 @@ def support_redirect(self) -> bool: @abc.abstractmethod def create_channel(self, name): """ - create channel with given name, depending on backend, - e.g. create a directory or bucket + create backend resources for a channel with a given name, + e.g. create a directory or bucket. 
""" pass From 4108791962b310a227d587efd3713449532ee412 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 18:29:48 +0200 Subject: [PATCH 31/36] replaced Pydantic Field Validation with Annotated by constr --- quetz/rest_models.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/quetz/rest_models.py b/quetz/rest_models.py index f9c57f47..b454e0a9 100644 --- a/quetz/rest_models.py +++ b/quetz/rest_models.py @@ -7,12 +7,19 @@ import uuid from datetime import date, datetime from enum import Enum -from typing import Annotated, Dict, Generic, List, Optional, TypeVar, Union +from typing import Dict, Generic, List, NewType, Optional, TypeVar, Union -from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + constr, + field_validator, + model_validator, +) T = TypeVar('T') -URLType = Annotated[str, Field(pattern="^(http|https)://.+")] +URLType = NewType('URLType', constr(pattern="^(http|https)://.+|None|^(\\[.*\\])+$")) class BaseProfile(BaseModel): @@ -66,9 +73,7 @@ class ChannelBase(BaseModel): private: bool = Field(True, title="channel should be private") size_limit: Optional[int] = Field(None, title="size limit of the channel") ttl: int = Field(36000, title="ttl of the channel") - mirror_channel_url: Optional[Union[URLType, List[URLType]]] = Field( - None, - ) + mirror_channel_url: Optional[Union[URLType, List[URLType]]] = Field(None) mirror_mode: Optional[MirrorMode] = Field(None, nullable=True) @field_validator("size_limit") From 2ed0e86158abf136e4e1e96348683053739fee6e Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 19:28:20 +0200 Subject: [PATCH 32/36] use enum instead of concrete values for mirror mode --- quetz/main.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/quetz/main.py b/quetz/main.py index a34fc22e..2f7dbbd8 100644 --- a/quetz/main.py +++ b/quetz/main.py @@ -704,9 +704,15 @@ def post_channel( if not new_channel.mirror_channel_url: auth.assert_create_channel() - is_mirror = new_channel.mirror_channel_url and new_channel.mirror_mode == "mirror" + is_mirror = ( + new_channel.mirror_channel_url + and new_channel.mirror_mode == rest_models.MirrorMode.mirror + ) - is_proxy = new_channel.mirror_channel_url and new_channel.mirror_mode == "proxy" + is_proxy = ( + new_channel.mirror_channel_url + and new_channel.mirror_mode == rest_models.MirrorMode.proxy + ) if is_mirror: auth.assert_create_mirror_channel() @@ -1811,7 +1817,10 @@ def serve_path( return RedirectResponse(f"{redirect_url}/{path}") # note: proxy mode only works with one mirror url (checked on channel creation) - if channel.mirror_channel_urls and channel.mirror_mode == "proxy": + if ( + channel.mirror_channel_urls + and channel.mirror_mode == rest_models.MirrorMode.proxy + ): proxy_url = channel.mirror_channel_urls[0] repository = RemoteRepository(proxy_url, session) if not pkgstore.file_exists(channel.name, path): From 183f482d9d1cb5022e696070cd563184c9e70256 Mon Sep 17 00:00:00 2001 From: Yasin Tatar Date: Fri, 8 Sep 2023 19:29:58 +0200 Subject: [PATCH 33/36] use enum instead of concrete values for mirror mode --- quetz/tasks/assertions.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/quetz/tasks/assertions.py b/quetz/tasks/assertions.py index 8789ea8f..6daa90db 100644 --- a/quetz/tasks/assertions.py +++ b/quetz/tasks/assertions.py @@ -1,5 +1,10 @@ +from quetz.rest_models import MirrorMode + + def 
can_channel_synchronize(channel):
-    return channel.mirror_channel_urls and (channel.mirror_mode in ["mirror", "proxy"])
+    return channel.mirror_channel_urls and (
+        channel.mirror_mode in [MirrorMode.proxy, MirrorMode.mirror]
+    )
 
 
 def can_channel_synchronize_metrics(channel):

From b51ef67bb44292550aa8464a5b9ebcf97d347aec Mon Sep 17 00:00:00 2001
From: Yasin Tatar
Date: Fri, 8 Sep 2023 19:38:11 +0200
Subject: [PATCH 34/36] add docstring

---
 quetz/main.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/quetz/main.py b/quetz/main.py
index 2f7dbbd8..9354656d 100644
--- a/quetz/main.py
+++ b/quetz/main.py
@@ -1488,6 +1488,25 @@ def _assert_filename_package_name_consistent(file_name: str, package_name: str):
     after=after_log(logger, logging.WARNING),
 )
 def _extract_and_upload_package(file, channel_name, channel_proxylist):
+    """
+    Extracts information from a conda package and uploads it to a package store.
+
+    This is automatically retried on error (indicating an unsuccessful upload).
+    The wait time between retries increases exponentially.
+
+    Parameters:
+        file (object): The conda package file object to be uploaded.
+        channel_name (str): The name of the channel where the package will be uploaded.
+        channel_proxylist (list): A list of package names that are proxied.
+            If the package is in this list, it will not be uploaded.
+
+    Returns:
+        conda_info: contains extracted information from the conda package.
+
+    Raises:
+        Exception: If there is an error in extracting conda info from the package
+            or in uploading the package to the store.
+    """
     try:
         conda_info = CondaInfo(file.file, file.filename)
     except Exception as e:

From 5ded7dde9eba859fb5c727edf1aa0bb299a1d31d Mon Sep 17 00:00:00 2001
From: Yasin Tatar
Date: Fri, 8 Sep 2023 19:40:30 +0200
Subject: [PATCH 35/36] add docstring

---
 quetz/tasks/mirror.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py
index 7fe0f30c..3aa49f17 100644
--- a/quetz/tasks/mirror.py
+++ b/quetz/tasks/mirror.py
@@ -209,6 +209,16 @@ def download_file(remote_repository, path_metadata):
     after=after_log(logger, logging.WARNING),
 )
 def _upload_package(file, channel_name, subdir, pkgstore):
+    """
+    Uploads a package to the package store, including retries with exponential
+    time increase on failure.
+
+    Parameters:
+        file (object): The conda package file object to be uploaded.
+        channel_name (str): The name of the channel where the package will be uploaded.
+        subdir: architecture/subdir to use for the package.
+        pkgstore: target package store to use
+    """
     dest = os.path.join(subdir, file.filename)
 
     try:

From c0acdbc5bcac99ca480a81146b55daab95d94f7b Mon Sep 17 00:00:00 2001
From: Yasin Tatar
Date: Fri, 8 Sep 2023 20:11:32 +0200
Subject: [PATCH 36/36] fix import

---
 quetz/tasks/mirror.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py
index 3aa49f17..f57c2d16 100644
--- a/quetz/tasks/mirror.py
+++ b/quetz/tasks/mirror.py
@@ -28,8 +28,8 @@
     TicToc,
     add_static_file,
     check_package_membership,
+    parse_package_filename,
 )
-from utils import parse_package_filename
 
 # copy common subdirs from conda:
 # https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63
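
As an illustration of how the pieces introduced in this series fit together, the sketch below creates a mirror channel that tracks two upstream channels and applies a per-remote includelist. It is not taken from the patches: the quetz host, the API key placeholder and the package selections are assumptions, and only the field names (``mirror_channel_url``, ``mirror_mode``, ``metadata.includelist``) come from ``quetz/rest_models.py`` above.

.. code:: python

    import requests

    payload = {
        "name": "mirror-channel",
        "private": False,
        "mirror_mode": "mirror",
        # several upstreams are allowed in "mirror" mode; "proxy" mode accepts only one
        "mirror_channel_url": [
            "https://conda.anaconda.org/btel",
            "https://conda.anaconda.org/conda-forge",
        ],
        "metadata": {
            # keys may be the full remote URL or just its last path segment
            "includelist": {
                "btel": ["numpy"],
                "conda-forge": ["scipy", "pandas"],
            }
        },
    }

    response = requests.post(
        "http://localhost:8000/api/channels",  # placeholder quetz host
        json=payload,
        headers={"x-api-key": "<api-key-with-permission-to-create-channels>"},
    )
    response.raise_for_status()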
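
How a single package file is then classified during synchronisation can be sketched as follows. This is not the quetz API: ``Action``, ``matches`` and ``decide`` are illustrative stand-ins for ``MembershipAction``, ``check_package_match``/``get_matching_hosts`` and ``check_package_membership`` from ``quetz/utils.py``, and the snippet assumes ``conda`` is importable, as it already is for the patched code.

.. code:: python

    from enum import Enum

    from conda.models.dist import Dist
    from conda.models.match_spec import MatchSpec


    class Action(Enum):
        INCLUDE = "include"  # download the file into this mirror channel
        IGNORE = "ignore"    # wanted, but from a different remote; leave local copies alone
        REMOVE = "remove"    # wanted by no remote; drop any local copy


    def matches(patterns, filename):
        """True if any MatchSpec pattern matches the given package file name."""
        dist = Dist.from_string(filename)
        record = {
            "name": dist.name,
            "version": dist.version,
            "build": dist.build_string,
            "build_number": 0,
        }
        return any(MatchSpec(p).match(record) for p in patterns)


    def decide(includelist, remote_host, filename):
        """includelist is either a flat list or a {remote: [patterns]} mapping."""
        if isinstance(includelist, dict):
            hosts = [h for h, pats in includelist.items() if matches(pats, filename)]
            if remote_host in hosts or remote_host.split("/")[-1] in hosts:
                return Action.INCLUDE
            return Action.IGNORE if hosts else Action.REMOVE
        return Action.INCLUDE if matches(includelist, filename) else Action.REMOVE


    includelist = {"btel": ["numpy"], "conda-forge": ["scipy", "pandas"]}
    fname = "numpy-1.23.4-py39hefdcf20_0.tar.bz2"
    print(decide(includelist, "https://conda.anaconda.org/btel", fname))         # Action.INCLUDE
    print(decide(includelist, "https://conda.anaconda.org/conda-forge", fname))  # Action.IGNORE
    print(decide(includelist, "https://conda.anaconda.org/btel",
                 "r-base-4.2.0-h1234567_0.tar.bz2"))                             # Action.REMOVE

An ``excludelist`` is evaluated the same way but only against the current remote host; a match there overrides the include decision and results in removal.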