Skip to content

Commit

Permalink
refactor: extract repodata retrieval || add: package matching on more complicated patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
YYYasin19 committed Jun 15, 2023
1 parent 0a37241 commit b708632
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 79 deletions.
48 changes: 35 additions & 13 deletions quetz/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def _alembic_config(db_url: str) -> AlembicConfig:
script_location = "quetz:migrations"

migration_modules = [
f"{ep.module}:versions" for ep in entry_points().select(group='quetz.migrations')
f"{ep.module}:versions"
for ep in entry_points().select(group='quetz.migrations')
]
migration_modules.append("quetz:migrations/versions")

Expand Down Expand Up @@ -126,7 +127,9 @@ def _make_migrations(
found = True

if plugin_name != "quetz" and not found:
raise Exception(f"models entrypoint (quetz.models) for plugin {plugin_name} not registered")
raise Exception(
f"models entrypoint (quetz.models) for plugin {plugin_name} not registered"
)

logger.info('Making DB migrations on %r for %r', db_url, plugin_name)
if not alembic_config and db_url:
Expand All @@ -136,7 +139,9 @@ def _make_migrations(
if plugin_name == "quetz":
version_path = None # Path(quetz.__file__).parent / 'migrations' / 'versions'
else:
entry_point = tuple(entry_points().select(group='quetz.migrations', name=plugin_name))[0]
entry_point = tuple(
entry_points().select(group='quetz.migrations', name=plugin_name)
)[0]
module = entry_point.load()
version_path = str(Path(module.__file__).parent / "versions")

Expand Down Expand Up @@ -197,7 +202,9 @@ def _set_user_roles(db: Session, config: Config):
f"with identity from provider '{provider}'"
)
elif user.role is not None and user.role != default_role:
logger.warning(f"user has already role {user.role} not assigning a new role")
logger.warning(
f"user has already role {user.role} not assigning a new role"
)
else:
user.role = role

Expand Down Expand Up @@ -393,11 +400,14 @@ def create(
help="Delete the the deployment if it exists. "
"Must be specified with --copy-conf or --create-conf",
),
exists_ok: bool = typer.Option(False, help="Skip the creation if deployment already exists."),
exists_ok: bool = typer.Option(
False, help="Skip the creation if deployment already exists."
),
dev: bool = typer.Option(
False,
help=(
"Enable/disable dev mode " "(fills the database with test data and allows http access)"
"Enable/disable dev mode "
"(fills the database with test data and allows http access)"
),
),
):
Expand All @@ -415,7 +425,8 @@ def create(
if _is_deployment(deployment_folder):
if exists_ok:
logger.info(
f'Quetz deployment already exists at {deployment_folder}.\n' f'Skipping creation.'
f'Quetz deployment already exists at {deployment_folder}.\n'
f'Skipping creation.'
)
return
if delete and (copy_conf or create_conf):
Expand Down Expand Up @@ -507,7 +518,9 @@ def start(
),
reload: bool = typer.Option(
False,
help=("Enable/disable automatic reloading of the server when sources are modified"),
help=(
"Enable/disable automatic reloading of the server when sources are modified"
),
),
supervisor: bool = typer.Option(
True,
Expand Down Expand Up @@ -590,7 +603,8 @@ def run(
dev: bool = typer.Option(
False,
help=(
"Enable/disable dev mode " "(fills the database with test data and allows http access)"
"Enable/disable dev mode "
"(fills the database with test data and allows http access)"
),
),
port: int = typer.Option(8000, help="The port to bind"),
Expand All @@ -602,7 +616,9 @@ def run(
),
reload: bool = typer.Option(
False,
help=("Enable/disable automatic reloading of the server when sources are modified"),
help=(
"Enable/disable automatic reloading of the server when sources are modified"
),
),
) -> NoReturn:
"""Run a Quetz deployment.
Expand All @@ -617,7 +633,9 @@ def run(
@app.command()
def delete(
path: str = typer.Argument(None, help="The path of the deployment"),
force: bool = typer.Option(False, help="Enable/disable removal without confirmation prompt"),
force: bool = typer.Option(
False, help="Enable/disable removal without confirmation prompt"
),
) -> None:
"""Delete a Quetz deployment."""

Expand All @@ -633,7 +651,9 @@ def delete(


@app.command()
def plugin(cmd: str, path: str = typer.Argument(None, help="Path to the plugin folder")) -> None:
def plugin(
cmd: str, path: str = typer.Argument(None, help="Path to the plugin folder")
) -> None:
if cmd == 'install':
abs_path = Path(path).absolute()
if not (abs_path / "setup.py").exists():
Expand Down Expand Up @@ -672,7 +692,9 @@ def plugin(cmd: str, path: str = typer.Argument(None, help="Path to the plugin f
# Try to install pip if it's missing
if conda_exe_path is not None:
print("pip is missing, installing...")
subprocess.call([conda_exe_path, 'install', '--channel', 'conda-forge', 'pip'])
subprocess.call(
[conda_exe_path, 'install', '--channel', 'conda-forge', 'pip']
)
pip_exe_path = find_executable('pip')

if pip_exe_path is None:
Expand Down
4 changes: 3 additions & 1 deletion quetz/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ def register(app):
frontend_dir = f"{sys.prefix}/share/quetz/frontend/"
else:
logger.info("Using basic fallback frontend")
frontend_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "basic_frontend")
frontend_dir = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "basic_frontend"
)
2 changes: 1 addition & 1 deletion quetz/rest_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class ChannelActionEnum(str, Enum):


class ChannelMetadata(BaseModel):
includelist: Optional[List[str]] = Field(
includelist: Optional[Union[List[str], Dict[str, List]]] = Field(
None,
title="list of packages to include while creating a channel",
nullable=True,
Expand Down
82 changes: 49 additions & 33 deletions quetz/tasks/mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from concurrent.futures import ThreadPoolExecutor
from http.client import IncompleteRead
from tempfile import SpooledTemporaryFile
from typing import List
from typing import List, Union

import requests
from fastapi import HTTPException, status
Expand All @@ -23,7 +23,7 @@
from quetz.errors import DBError
from quetz.pkgstores import PackageStore
from quetz.tasks import indexing
from quetz.utils import TicToc, add_static_file, check_package_membership_pattern
from quetz.utils import TicToc, add_static_file, check_package_membership

# copy common subdirs from conda:
# https://github.com/conda/conda/blob/a78a2387f26a188991d771967fc33aa1fb5bb810/conda/base/constants.py#L63
Expand Down Expand Up @@ -278,36 +278,22 @@ def _upload_package(file, channel_name, subdir):
file.file.close()


def initial_sync_mirror(
channel_name: str,
remote_repository: RemoteRepository,
arch: str,
dao: Dao,
pkgstore: PackageStore,
auth: authorization.Rules,
includelist: List[str] = None,
excludelist: List[str] = None,
skip_errors: bool = True,
use_repodata: bool = False,
):
force = True # needed for updating packages
logger.info(
f"Running channel mirroring {channel_name}/{arch} from {remote_repository.host}"
)

def get_remote_repodata(
channel_name: str, arch: str, remote_repository: RemoteRepository
) -> Union[dict, None]:
repodata = {}
for repodata_fn in ["repodata_from_packages.json", "repodata.json"]:
try:
repo_file = remote_repository.open(os.path.join(arch, repodata_fn))
repodata = json.load(repo_file.file)
break
return repodata
except RemoteServerError:
logger.error(
f"can not get {repodata_fn} for channel {arch}/{channel_name}."
)
if repodata_fn == "repodata.json":
logger.error(f"Giving up for {channel_name}/{arch}.")
return
return None
else:
logger.error("Trying next filename.")
continue
Expand All @@ -317,18 +303,41 @@ def initial_sync_mirror(
f"in channel {channel_name}"
)
if repodata_fn == "repodata.json":
return
return None

return {}

channel = dao.get_channel(channel_name)

def initial_sync_mirror(
channel_name: str,
remote_repository: RemoteRepository,
arch: str,
dao: Dao,
pkgstore: PackageStore,
auth: authorization.Rules,
includelist: List[str] = None,
excludelist: List[str] = None,
skip_errors: bool = True,
use_repodata: bool = False,
):
force = True # needed for updating packages
logger.info(
f"Running channel mirroring {channel_name}/{arch} from {remote_repository.host}"
)

repodata = get_remote_repodata(channel_name, arch, remote_repository)
if not repodata:
return # quit; error has already been logged.

packages = repodata.get("packages", {})

channel = dao.get_channel(channel_name)
if not channel:
logger.error(f"channel {channel_name} not found")
return

from quetz.main import handle_package_files

packages = repodata.get("packages", {})

version_methods = [
_check_checksum(dao, channel_name, arch, "sha256"),
_check_checksum(dao, channel_name, arch, "md5"),
Expand Down Expand Up @@ -408,10 +417,13 @@ def handle_batch(update_batch):
# TODO: also remove all packages that are not in the remote repository anymore
# practically re-write the complete sync mechanism?

# SYNC: Remote -> Local
# for each package in the remote repository:
# validate if it should be downloaded to this channel
# also: remove packages if they are not supposed to in this channel anymore
for repo_package_name, metadata in packages.items():
# if check_package_membership(repo_package_name, includelist, excludelist):
if check_package_membership_pattern(
repo_package_name, includelist, excludelist
if check_package_membership(
channel, repo_package_name, metadata, remote_host=remote_repository.host
):
path = os.path.join(arch, repo_package_name)

Expand Down Expand Up @@ -447,24 +459,28 @@ def handle_batch(update_batch):
# handle final batch
any_updated |= handle_batch(update_batch)

# SYNC: Local checks Remote
# Validate if all packages in this channel are still
# also present in the remote channel
# if not: add them to the remove batch as well
# TODO

if remove_batch:
logger.debug(f"Removing {len(remove_batch)} packages: {remove_batch}")
package_specs_remove = set([p[1].split("-")[0] for p in remove_batch])
# TODO: reuse route [DELETE /api/channels/{channel_name}/packages] logic
for package_specs in package_specs_remove:
# TODO: only remove if it already exists of course
dao.remove_package(channel_name, package_name=package_specs)
# TODO: is this needed every time?
dao.cleanup_channel_db(channel_name, package_name=package_specs)
# only remove if exists
if pkgstore.file_exists(channel.name, package_specs):
pkgstore.delete_file(channel.name, destination=package_specs)

any_updated |= True

if any_updated:
indexing.update_indexes(
dao, pkgstore, channel_name, subdirs=[arch]
) # build repodata
# build local repodata
indexing.update_indexes(dao, pkgstore, channel_name, subdirs=[arch])


def create_packages_from_channeldata(
Expand Down
Loading

0 comments on commit b708632

Please sign in to comment.