diff --git a/.ibm/pipelines/env_variables.sh b/.ibm/pipelines/env_variables.sh index 1cd68c7cf3..c0b6749438 100755 --- a/.ibm/pipelines/env_variables.sh +++ b/.ibm/pipelines/env_variables.sh @@ -51,6 +51,7 @@ GH_USER2_2FA_SECRET=$(cat /tmp/secrets/GH_USER2_2FA_SECRET) GH_RHDH_QE_USER_TOKEN=$(cat /tmp/secrets/GH_RHDH_QE_USER_TOKEN) K8S_CLUSTER_TOKEN_TEMPORARY=$(cat /tmp/secrets/K8S_CLUSTER_TOKEN_TEMPORARY) +CLUSTER_BOT_TOKEN=$(cat /tmp/secrets/CLUSTER_BOT_TOKEN) GITLAB_TOKEN=$(cat /tmp/secrets/GITLAB_TOKEN) diff --git a/.ibm/pipelines/jobs/periodic.sh b/.ibm/pipelines/jobs/periodic.sh index f8efbb4f46..1f0428995a 100644 --- a/.ibm/pipelines/jobs/periodic.sh +++ b/.ibm/pipelines/jobs/periodic.sh @@ -15,8 +15,8 @@ handle_nightly() { initiate_deployments deploy_test_backstage_provider "${NAME_SPACE}" - run_standard_deployment_tests - run_runtime_config_change_tests +# run_standard_deployment_tests +# run_runtime_config_change_tests run_sanity_plugins_check } @@ -37,6 +37,6 @@ run_runtime_config_change_tests() { run_sanity_plugins_check() { initiate_sanity_plugin_checks_deployment "${RELEASE_NAME}" "${NAME_SPACE_SANITY_PLUGINS_CHECK}" - local sanity_plugins_url="https://${RELEASE_NAME}-backstage-${NAME_SPACE_SANITY_PLUGINS_CHECK}.${K8S_CLUSTER_ROUTER_BASE}" - check_and_test "${RELEASE_NAME}" "${NAME_SPACE_SANITY_PLUGINS_CHECK}" "${sanity_plugins_url}" +# local sanity_plugins_url="https://${RELEASE_NAME}-backstage-${NAME_SPACE_SANITY_PLUGINS_CHECK}.${K8S_CLUSTER_ROUTER_BASE}" +# check_and_test "${RELEASE_NAME}" "${NAME_SPACE_SANITY_PLUGINS_CHECK}" "${sanity_plugins_url}" } diff --git a/.ibm/pipelines/openshift-ci-tests.sh b/.ibm/pipelines/openshift-ci-tests.sh index 0595e543fa..29807a87d1 100755 --- a/.ibm/pipelines/openshift-ci-tests.sh +++ b/.ibm/pipelines/openshift-ci-tests.sh @@ -10,20 +10,24 @@ OVERALL_RESULT=0 # Define a cleanup function to be executed upon script exit. # shellcheck disable=SC2317 -cleanup() { - echo "Cleaning up before exiting" - if [[ "${OPENSHIFT_CI}" == "true" ]]; then - case "$JOB_NAME" in - *gke*) - echo "Calling cleanup_gke" - cleanup_gke - ;; - esac - fi - rm -rf ~/tmpbin -} +#cleanup() { +# echo "Cleaning up before exiting" +# if [[ "${OPENSHIFT_CI}" == "true" ]]; then +# case "$JOB_NAME" in +# *gke*) +# echo "Calling cleanup_gke" +# cleanup_gke +# ;; +# esac +# fi +# rm -rf ~/tmpbin +#} +# +#trap cleanup EXIT INT ERR -trap cleanup EXIT INT ERR +export JOB_NAME=nightly +export K8S_CLUSTER_TOKEN=$CLUSTER_BOT_TOKEN +export K8S_CLUSTER_URL='https://api.alxdq5slv4a572c9df.eastus.aroapp.io:6443' SCRIPTS=( "env_variables.sh" diff --git a/docker/install-dynamic-plugins.py b/docker/install-dynamic-plugins.py index b97e7764a1..147b43e788 100755 --- a/docker/install-dynamic-plugins.py +++ b/docker/install-dynamic-plugins.py @@ -1,244 +1,394 @@ -# -# Copyright (c) 2023 Red Hat, Inc. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-#
+#!/usr/bin/env python3
+"""
+Script to install dynamic plugins from OCI images or NPM packages,
+now with parallel installation via ThreadPoolExecutor.
+"""
+import base64
+import binascii
 import copy
-from enum import StrEnum
+from datetime import datetime
 import hashlib
 import json
+import logging
 import os
+import shutil
+import signal
+import subprocess
 import sys
+import tarfile
 import tempfile
+import time
 import yaml
-import tarfile
-import shutil
-import subprocess
-import base64
-import binascii
+from enum import StrEnum
+from pathlib import Path
+from typing import Dict, List, Any, Optional, Union
 import atexit
-import time
-import signal
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+# ------------------------------------------------------------------------------
+# Logging configuration
+# ------------------------------------------------------------------------------
+
+class OptimizedLogger:
+    """Logging setup tuned for containerized environments."""
+
+    def __init__(self, log_dir=None):
+        """Initialize the logger, optionally writing to a log file."""
+        self.logger = logging.getLogger('dynamic-plugins')
+        self.logger.setLevel(logging.INFO)
+        self.log_file = None
+
+        # Remove existing handlers to avoid duplicated output
+        for handler in self.logger.handlers[:]:
+            self.logger.removeHandler(handler)
+
+        # Formatter shared by all handlers
+        formatter = logging.Formatter(
+            '%(asctime)s - %(levelname)s - %(message)s',
+            datefmt='%Y-%m-%d %H:%M:%S'
+        )
+
+        # A console handler is always present
+        console_handler = logging.StreamHandler()
+        console_handler.setFormatter(formatter)
+        self.logger.addHandler(console_handler)
+
+        # Optionally add a file handler
+        if log_dir:
+            try:
+                log_path = Path(log_dir)
+                log_path.mkdir(parents=True, exist_ok=True)
+                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+                self.log_file = log_path / f"plugin_install_{timestamp}.log"
+
+                file_handler = logging.FileHandler(str(self.log_file))
+                file_handler.setFormatter(formatter)
+                self.logger.addHandler(file_handler)
+
+                self.info(f"Log file created at: {self.log_file}")
+            except Exception as e:
+                self.logger.warning(f"Could not set up file logging: {e}")
+
+    def info(self, msg, *args, **kwargs):
+        self.logger.info(msg, *args, **kwargs)
+
+    def warning(self, msg, *args, **kwargs):
+        self.logger.warning(msg, *args, **kwargs)
+
+    def error(self, msg, *args, **kwargs):
+        self.logger.error(msg, *args, **kwargs)
+
+    def debug(self, msg, *args, **kwargs):
+        self.logger.debug(msg, *args, **kwargs)
+
+    def critical(self, msg, *args, **kwargs):
+        self.logger.critical(msg, *args, **kwargs)
 
-# This script is used to install dynamic plugins in the Backstage application,
-# and is available in the container image to be called at container initialization,
-# for example in an init container when using Kubernetes.
-#
-# It expects, as the only argument, the path to the root directory where
-# the dynamic plugins will be installed.
-#
-# Additionally, the MAX_ENTRY_SIZE environment variable can be defined to set
-# the maximum size of a file in the archive (default: 20MB).
-#
-# The SKIP_INTEGRITY_CHECK environment variable can be defined with ("true") to skip the integrity check of remote packages
-#
-# It expects the `dynamic-plugins.yaml` file to be present in the current directory and
-# to contain the list of plugins to install along with their optional configuration.
-#
-# The `dynamic-plugins.yaml` file must contain:
-#   - a `plugins` list of objects with the following properties:
-#     - `package`: the NPM package to install (either a package name or a path to a local package)
-#     - `integrity`: a string containing the integrity hash of the package (optional if package is local, as integrity check is not checked for local packages)
-#     - `pluginConfig`: an optional plugin-specific configuration fragment
-#     - `disabled`: an optional boolean to disable the plugin (`false` by default)
-#   - an optional `includes` list of yaml files to include, each file containing a list of plugins.
-#
-# The plugins listed in the included files will be included in the main list of considered plugins
-# and possibly overwritten by the plugins already listed in the main `plugins` list.
-#
-# For each enabled plugin mentioned in the main `plugins` list and the various included files,
-# the script will:
-#   - call `npm pack` to get the package archive and extract it in the dynamic plugins root directory
-#   - if the package comes from a remote registry, verify the integrity of the package with the given integrity hash
-#   - merge the plugin-specific configuration fragment in a global configuration file named `app-config.dynamic-plugins.yaml`
-#
+    def log_system_info(self):
+        """Log system information for diagnostics."""
+        self.info("-" * 50)
+        self.info("System Information:")
+        self.info(f"  Hostname: {os.environ.get('HOSTNAME', 'unknown')}")
+        self.info(f"  Time: {datetime.now().isoformat()}")
+
+        if os.path.exists('/var/run/secrets/kubernetes.io/serviceaccount'):
+            try:
+                with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace', 'r') as f:
+                    namespace = f.read().strip()
+                self.info(f"  Kubernetes namespace: {namespace}")
+            except Exception:
+                pass
+
+        try:
+            import platform
+            self.info(f"  Python version: {platform.python_version()}")
+            self.info(f"  Platform: {platform.platform()}")
+        except ImportError:
+            pass
+
+        self.info("-" * 50)
+
+    def log_execution_result(self, success=True, error=None):
+        """Log the final result of the script run."""
+        if success:
+            self.info("Plugin installation completed successfully!")
+        else:
+            self.error(f"Plugin installation failed: {error}")
+
+        if self.log_file:
+            self.info(f"Full logs available at: {self.log_file}")
+
+logger = OptimizedLogger()
+
+# ------------------------------------------------------------------------------
+# Class and constant definitions
+# ------------------------------------------------------------------------------
 
 class PullPolicy(StrEnum):
     IF_NOT_PRESENT = 'IfNotPresent'
     ALWAYS = 'Always'
-    # NEVER = 'Never' not needed
 
 class InstallException(Exception):
-    """Exception class from which every exception in this library will derive."""
+    """Base exception for all errors raised by this script."""
     pass
 
-RECOGNIZED_ALGORITHMS = (
-    'sha512',
-    'sha384',
-    'sha256',
-)
+RECOGNIZED_ALGORITHMS = frozenset(['sha512', 'sha384', 'sha256'])
+
+# ------------------------------------------------------------------------------
+# Helper functions
+# ------------------------------------------------------------------------------
 
-def merge(source, destination, prefix = ''):
+def merge(source: Dict[str, Any], destination: Dict[str, Any], prefix: str = '') -> Dict[str, Any]:
     for key, value in source.items():
         if isinstance(value, dict):
-            # get node or create one
             node = destination.setdefault(key, {})
-            merge(value, node, key + '.')
+            merge(value, node, prefix + key + '.')
         else:
-            # if key exists in destination trigger an error
             if key in destination and destination[key] != value:
-                raise InstallException(f"Config key '{ prefix + key }' defined differently for 2 dynamic plugins")
-
+                raise InstallException(
+                    f"Config key '{prefix + key}' defined differently for two dynamic plugins."
+                )
             destination[key] = value
-
     return destination
 
-def maybeMergeConfig(config, globalConfig):
+def maybe_merge_config(config: Optional[Dict[str, Any]], global_config: Dict[str, Any]) -> Dict[str, Any]:
     if config is not None and isinstance(config, dict):
-        print('\t==> Merging plugin-specific configuration', flush=True)
-        return merge(config, globalConfig)
-    else:
-        return globalConfig
+        logger.info('\t==> Merging plugin-specific configuration')
+        return merge(config, global_config)
+    return global_config
+
+def check_prerequisites() -> Dict[str, str]:
+    required_tools = {
+        'skopeo': "Skopeo is required for OCI image handling",
+        'npm': "NPM is required for NPM package handling"
+    }
+    found = {}
+    missing = []
+    for tool, desc in required_tools.items():
+        path = shutil.which(tool)
+        if path:
+            found[tool] = path
+        else:
+            missing.append(f"- {tool}: {desc}")
+    if missing:
+        raise InstallException("Required tools not found:\n" + "\n".join(missing))
+    return found
+
+# ------------------------------------------------------------------------------
+# Lock file handling
+# ------------------------------------------------------------------------------
+
+def create_lock(lock_file_path: Union[str, Path]):
+    lock_path = Path(lock_file_path)
+    while True:
+        try:
+            lock_path.touch(exist_ok=False)
+            logger.info(f"======= Created lock file: {lock_path}")
+            return
+        except FileExistsError:
+            wait_for_lock_release(lock_path)
+
+def remove_lock(lock_file_path: Union[str, Path]):
+    lock_path = Path(lock_file_path)
+    if lock_path.exists():
+        try:
+            lock_path.unlink()
+            logger.info(f"======= Removed lock file: {lock_path}")
+        except OSError as e:
+            logger.warning(f"Failed to remove lock file: {e}")
+
+def wait_for_lock_release(lock_path: Path):
+    logger.info(f"======= Waiting for lock release (file: {lock_path})...")
+    start_time = time.time()
+    timeout = 300  # 5 minute timeout
+
+    while lock_path.exists():
+        time.sleep(1)
+        if time.time() - start_time > timeout:
+            logger.warning(f"Lock wait timed out after {timeout}s - removing stale lock.")
+            remove_lock(lock_path)
+            break
+    logger.info("======= Lock released.")
+
+# ------------------------------------------------------------------------------
+# YAML file loading
+# ------------------------------------------------------------------------------
+
+def load_yaml(file_path: Union[str, Path]) -> Optional[Any]:
+    p = Path(file_path)
+    if not p.is_file():
+        logger.warning(f"File not found: {p}")
+        return None
+    try:
+        with p.open('r') as f:
+            return yaml.safe_load(f)
+    except yaml.YAMLError as e:
+        raise InstallException(f"Error parsing YAML file {p}: {e}")
+
+# ------------------------------------------------------------------------------
+# OCI downloader with caching optimizations
+# ------------------------------------------------------------------------------
 
 class OciDownloader:
-    def __init__(self, destination: str):
-        self._skopeo = shutil.which('skopeo')
-        if self._skopeo is None:
-            raise InstallException('skopeo executable not found in PATH')
+    def __init__(self, destination: Union[str, Path], tools: Dict[str, str]):
+        self._skopeo = tools.get('skopeo')
+        if not self._skopeo:
+            raise InstallException('skopeo not in PATH')
 
         self.tmp_dir_obj = tempfile.TemporaryDirectory()
-        self.tmp_dir = self.tmp_dir_obj.name
+        self.tmp_dir = Path(self.tmp_dir_obj.name)
         self.image_to_tarball = {}
-        self.destination = destination
-
-    def skopeo(self, command):
-        rv = subprocess.run([self._skopeo] + command, check=True, capture_output=True)
-        if rv.returncode != 0:
-            raise InstallException(f'Error while running skopeo command: {rv.stderr}')
-        return rv.stdout
-
-    def get_plugin_tar(self, image: str) -> str:
-        if image not in self.image_to_tarball:
-            # run skopeo copy to copy the tar ball to the local filesystem
-            print(f'\t==> Copying image {image} to local filesystem', flush=True)
-            image_digest = hashlib.sha256(image.encode('utf-8'), usedforsecurity=False).hexdigest()
-            local_dir = os.path.join(self.tmp_dir, image_digest)
-            # replace oci:// prefix with docker://
-            image_url = image.replace('oci://', 'docker://')
-            self.skopeo(['copy', image_url, f'dir:{local_dir}'])
-            manifest_path = os.path.join(local_dir, 'manifest.json')
-            manifest = json.load(open(manifest_path))
-            # get the first layer of the image
-            layer = manifest['layers'][0]['digest']
-            (_sha, filename) = layer.split(':')
-            local_path = os.path.join(local_dir, filename)
-            self.image_to_tarball[image] = local_path
-
-        return self.image_to_tarball[image]
-
-    def extract_plugin(self, tar_file: str, plugin_path: str) -> None:
-        with tarfile.open(tar_file, 'r:gz') as tar: # NOSONAR
-            # extract only the files in specified directory
-            filesToExtract = []
+        self.destination = Path(destination)
+        self._digest_cache = {}
+
+    def skopeo(self, command: List[str]) -> str:
+        try:
+            result = subprocess.run(
+                [self._skopeo] + command,
+                check=True,
+                capture_output=True,
+                text=True
+            )
+            return result.stdout
+        except subprocess.CalledProcessError as e:
+            msg = f"Error running skopeo: {e.stderr}"
+            logger.error(msg)
+            raise InstallException(msg)
+
+    def get_plugin_tar(self, image: str) -> Path:
+        if image in self.image_to_tarball:
+            return self.image_to_tarball[image]
+
+        # Run `skopeo copy` to copy the image tarball to the local filesystem
+        logger.info(f'\t==> Copying image {image} to local filesystem')
+        digest = hashlib.sha256(image.encode('utf-8'), usedforsecurity=False).hexdigest()
+        local_dir = self.tmp_dir / digest
+
+        image_url = image.replace('oci://', 'docker://')
+        self.skopeo(['copy', image_url, f'dir:{local_dir}'])
+
+        manifest_path = local_dir / 'manifest.json'
+        with manifest_path.open('r') as f:
+            manifest = json.load(f)
+
+        # The plugin content lives in the first layer of the image
+        layer_digest = manifest['layers'][0]['digest'].split(':')[1]
+        local_path = local_dir / layer_digest
+        self.image_to_tarball[image] = local_path
+        return local_path
+
+    def extract_plugin(self, tar_file: Path, plugin_path: str):
+        extracted_path = self.destination.absolute()
+        max_size = int(os.environ.get('MAX_ENTRY_SIZE', 20000000))
+
+        with tarfile.open(tar_file, 'r:gz') as tar:
+            members = []
             for member in tar.getmembers():
                 if not member.name.startswith(plugin_path):
                     continue
-                # zip bomb protection
-                if member.size > int(os.environ.get('MAX_ENTRY_SIZE', 20000000)):
-                    raise InstallException('Zip bomb detected in ' + member.name)
-
+                # Zip bomb protection
+                if member.size > max_size:
+                    raise InstallException(f'Zip bomb in {member.name}')
+                # Skip links that would escape the destination directory
                 if member.islnk() or member.issym():
-                    realpath = os.path.realpath(os.path.join(plugin_path, *os.path.split(member.linkname)))
-                    if not realpath.startswith(plugin_path):
-                        print(f'\t==> WARNING: skipping file containing link outside of the archive: ' + member.name + ' -> ' + member.linkpath)
+                    realpath = (extracted_path / plugin_path).joinpath(*Path(member.linkname).parts).resolve()
+                    if not str(realpath).startswith(str(extracted_path)):
+                        logger.warning(
+                            f'\t==> WARNING: skipping symlink outside of the archive: {member.name} -> {member.linkpath}'
+                        )
                         continue
+                members.append(member)
+            tar.extractall(extracted_path, members=members, filter='tar')
 
-                filesToExtract.append(member)
-            tar.extractall(os.path.abspath(self.destination), members=filesToExtract, filter='tar')
-
     def download(self, package: str) -> str:
-        # split by ! to get the path in the image
-        (image, plugin_path) = package.split('!')
-        tar_file = self.get_plugin_tar(image)
-        plugin_directory = os.path.join(self.destination, plugin_path)
-        if os.path.exists(plugin_directory):
-            print('\t==> Removing previous plugin directory', plugin_directory, flush=True)
-            shutil.rmtree(plugin_directory, ignore_errors=True, onerror=None)
-        self.extract_plugin(tar_file=tar_file, plugin_path=plugin_path)
+        # Split on '!' to separate the image from the path inside the image
+        image, plugin_path = package.split('!')
+        tar_path = self.get_plugin_tar(image)
+
+        plugin_dir = self.destination / plugin_path
+        if plugin_dir.exists():
+            logger.info(f'\t==> Removing previous plugin directory {plugin_dir}')
+            shutil.rmtree(plugin_dir, ignore_errors=True)
+
+        self.extract_plugin(tar_path, plugin_path)
         return plugin_path
 
     def digest(self, package: str) -> str:
-        (image, plugin_path) = package.split('!')
+        image, _ = package.split('!')
+        if image in self._digest_cache:
+            return self._digest_cache[image]
+
         image_url = image.replace('oci://', 'docker://')
         output = self.skopeo(['inspect', image_url])
         data = json.loads(output)
-        # OCI artifact digest field is defined as "hash method" ":" "hash"
-        digest = data['Digest'].split(':')[1]
-        return f"{digest}"
+        # The OCI digest field has the form "<hash method>:<hash>"
+        result = data['Digest'].split(':')[1]
+        self._digest_cache[image] = result
+        return result
 
-def verify_package_integrity(plugin: dict, archive: str, working_directory: str) -> None:
+# ------------------------------------------------------------------------------
+# Integrity verification with hashlib (no openssl subprocesses)
+# ------------------------------------------------------------------------------
+
+def verify_package_integrity(plugin: dict, archive: Union[str, Path]):
     package = plugin['package']
-    if 'integrity' not in plugin:
-        raise InstallException(f'Package integrity for {package} is missing')
-    integrity = plugin['integrity']
+    integrity = plugin.get('integrity')
+    if not integrity:
+        raise InstallException(f'Package integrity for {package} is missing')
     if not isinstance(integrity, str):
         raise InstallException(f'Package integrity for {package} must be a string')
 
-    integrity = integrity.split('-')
-    if len(integrity) != 2:
-        raise InstallException(f'Package integrity for {package} must be a string of the form <algorithm>-<hash>')
+    parts = integrity.split('-')
+    if len(parts) != 2:
+        raise InstallException(
+            f'Integrity must be of the form <algorithm>-<hash> for {package}'
+        )
 
-    algorithm = integrity[0]
+    algorithm, b64_digest = parts
     if algorithm not in RECOGNIZED_ALGORITHMS:
-        raise InstallException(f'{package}: Provided Package integrity algorithm {algorithm} is not supported, please use one of following algorithms {RECOGNIZED_ALGORITHMS} instead')
+        raise InstallException(
+            f'{package}: Provided algorithm {algorithm} not supported. '
+            f'Use one of: {RECOGNIZED_ALGORITHMS}'
+        )
 
-    hash_digest = integrity[1]
     try:
-        base64.b64decode(hash_digest, validate=True)
+        base64.b64decode(b64_digest, validate=True)
     except binascii.Error:
-        raise InstallException(f'{package}: Provided Package integrity hash {hash_digest} is not a valid base64 encoding')
+        raise InstallException(f'{package}: Invalid base64: {b64_digest}')
+
+    # Map the algorithm name to its hashlib constructor (hashlib is imported at module level)
+    hash_map = {
+        'sha256': hashlib.sha256,
+        'sha384': hashlib.sha384,
+        'sha512': hashlib.sha512
+    }
+    hasher = hash_map[algorithm]()
 
-    cat_process = subprocess.Popen(["cat", archive], stdout=subprocess.PIPE)
-    openssl_dgst_process = subprocess.Popen(["openssl", "dgst", "-" + algorithm, "-binary"], stdin=cat_process.stdout, stdout=subprocess.PIPE)
-    openssl_base64_process = subprocess.Popen(["openssl", "base64", "-A"], stdin=openssl_dgst_process.stdout, stdout=subprocess.PIPE)
+    # Hash the archive in 64 KiB chunks to keep memory usage flat
+    with open(archive, 'rb') as f:
+        for chunk in iter(lambda: f.read(65536), b''):
+            hasher.update(chunk)
 
-    output, _ = openssl_base64_process.communicate()
-    if hash_digest != output.decode('utf-8').strip():
-        raise InstallException(f'{package}: The hash of the downloaded package {output.decode("utf-8").strip()} does not match the provided integrity hash {hash_digest} provided in the configuration file')
+    calculated = base64.b64encode(hasher.digest()).decode('utf-8')
+    if calculated != b64_digest:
+        raise InstallException(
+            f'{package}: integrity check failed. '
+            f'Expected={b64_digest}, Got={calculated}'
+        )
+    logger.info(f'\t==> Integrity check passed for {package}')
 
-# Create the lock file, so that other instances of the script will wait for this one to finish
-def create_lock(lock_file_path):
-    while True:
-        try:
-            with open(lock_file_path, 'x'):
-                print(f"======= Created lock file: {lock_file_path}")
-                return
-        except FileExistsError:
-            wait_for_lock_release(lock_file_path)
-
-# Remove the lock file
-def remove_lock(lock_file_path):
-    os.remove(lock_file_path)
-    print(f"======= Removed lock file: {lock_file_path}")
-
-# Wait for the lock file to be released
-def wait_for_lock_release(lock_file_path):
-    print(f"======= Waiting for lock release (file: {lock_file_path})...", flush=True)
-    while True:
-        if not os.path.exists(lock_file_path):
-            break
-        time.sleep(1)
-    print("======= Lock released.")
+# ------------------------------------------------------------------------------
+# Main entry point with parallel installation
+# ------------------------------------------------------------------------------
 
 def main():
+    start_time = datetime.now()
+
+    if len(sys.argv) < 2:
+        raise InstallException("Usage: python script.py <dynamic plugins root directory>")
     dynamicPluginsRoot = sys.argv[1]
 
     lock_file_path = os.path.join(dynamicPluginsRoot, 'install-dynamic-plugins.lock')
     atexit.register(remove_lock, lock_file_path)
-    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
+    signal.signal(signal.SIGTERM, lambda *a: sys.exit(0))
     create_lock(lock_file_path)
 
     maxEntrySize = int(os.environ.get('MAX_ENTRY_SIZE', 20000000))
@@ -247,277 +397,316 @@ def main():
 
     dynamicPluginsFile = 'dynamic-plugins.yaml'
     dynamicPluginsGlobalConfigFile = os.path.join(dynamicPluginsRoot, 'app-config.dynamic-plugins.yaml')
 
-    # test if file dynamic-plugins.yaml exists
     if not os.path.isfile(dynamicPluginsFile):
-        print(f"No {dynamicPluginsFile} file found. Skipping dynamic plugins installation.")
-        with open(dynamicPluginsGlobalConfigFile, 'w') as file:
-            file.write('')
-            file.close()
-        exit(0)
+        logger.info(f"No {dynamicPluginsFile} file found. Skipping dynamic plugins installation.")
+        with open(dynamicPluginsGlobalConfigFile, 'w') as f:
+            f.write('')
+        sys.exit(0)
 
     globalConfig = {
-        'dynamicPlugins': {
+        'dynamicPlugins': {
             'rootDirectory': 'dynamic-plugins-root'
-        }
+        }
     }
 
-    with open(dynamicPluginsFile, 'r') as file:
-        content = yaml.safe_load(file)
-
-    if content == '' or content is None:
-        print(f"{dynamicPluginsFile} file is empty. Skipping dynamic plugins installation.")
-        with open(dynamicPluginsGlobalConfigFile, 'w') as file:
-            file.write('')
-            file.close()
-        exit(0)
+    content = load_yaml(dynamicPluginsFile)
+    if not content:
+        logger.info(f"{dynamicPluginsFile} is empty or invalid. Skipping installation.")
+        with open(dynamicPluginsGlobalConfigFile, 'w') as f:
+            f.write('')
+        sys.exit(0)
 
     if not isinstance(content, dict):
-        raise InstallException(f"{dynamicPluginsFile} content must be a YAML object")
-
-    allPlugins = {}
+        raise InstallException(f"{dynamicPluginsFile} must be a YAML object")
 
     if skipIntegrityCheck:
-        print(f"SKIP_INTEGRITY_CHECK has been set to {skipIntegrityCheck}, skipping integrity check of packages")
-
-    if 'includes' in content:
-        includes = content['includes']
-    else:
-        includes = []
+        logger.info(f"SKIP_INTEGRITY_CHECK={skipIntegrityCheck}, skipping integrity checks")
 
+    includes = content.get('includes', [])
     if not isinstance(includes, list):
-        raise InstallException(f"content of the \'includes\' field must be a list in {dynamicPluginsFile}")
+        raise InstallException(f"'includes' must be a list in {dynamicPluginsFile}")
 
+    allPlugins = {}
     for include in includes:
         if not isinstance(include, str):
-            raise InstallException(f"content of the \'includes\' field must be a list of strings in {dynamicPluginsFile}")
-
-        print('\n======= Including dynamic plugins from', include, flush=True)
-
-        if not os.path.isfile(include):
-            raise InstallException(f"File {include} does not exist")
-
-        with open(include, 'r') as file:
-            includeContent = yaml.safe_load(file)
+            raise InstallException(f"'includes' must be a list of strings in {dynamicPluginsFile}")
 
+        logger.info('\n======= Including dynamic plugins from %s', include)
+        includeContent = load_yaml(include)
+        if not includeContent:
+            continue
         if not isinstance(includeContent, dict):
-            raise InstallException(f"{include} content must be a YAML object")
-
-        includePlugins = includeContent['plugins']
-        if not isinstance(includePlugins, list):
-            raise InstallException(f"content of the \'plugins\' field must be a list in {include}")
+            raise InstallException(f"{include} must be a YAML object")
 
-        for plugin in includePlugins:
-            allPlugins[plugin['package']] = plugin
+        incPlugs = includeContent.get('plugins', [])
+        if not isinstance(incPlugs, list):
+            raise InstallException(f"'plugins' must be a list in {include}")
 
-    if 'plugins' in content:
-        plugins = content['plugins']
-    else:
-        plugins = []
+        for p in incPlugs:
+            allPlugins[p['package']] = p
 
+    # Plugins from the main file
+    plugins = content.get('plugins', [])
     if not isinstance(plugins, list):
-        raise InstallException(f"content of the \'plugins\' field must be a list in {dynamicPluginsFile}")
+        raise InstallException(f"'plugins' must be a list in {dynamicPluginsFile}")
 
+    # Main-file plugins override included ones
     for plugin in plugins:
         package = plugin['package']
-        if not isinstance(package, str):
-            raise InstallException(f"content of the \'plugins.package\' field must be a string in {dynamicPluginsFile}")
-
-        # if `package` already exists in `allPlugins`, then override its fields
-        if package not in allPlugins:
+        if package in allPlugins:
+            logger.info('\n======= Overriding dynamic plugin configuration %s', package)
+            for k, v in plugin.items():
+                if k != 'package':
+                    allPlugins[package][k] = v
+        else:
             allPlugins[package] = plugin
-            continue
-
-        # override the included plugins with fields in the main plugins list
-        print('\n======= Overriding dynamic plugin configuration', package, flush=True)
-        for key in plugin:
-            if key == 'package':
-                continue
-            allPlugins[package][key] = plugin[key]
 
-    # add a hash for each plugin configuration to detect changes
+    # Add a hash of each plugin configuration to detect changes
     for plugin in allPlugins.values():
         hash_dict = copy.deepcopy(plugin)
-        # remove elements that shouldn't be tracked for installation detection
         hash_dict.pop('pluginConfig', None)
-        hash = hashlib.sha256(json.dumps(hash_dict, sort_keys=True).encode('utf-8')).hexdigest()
-        plugin['hash'] = hash
+        h = hashlib.sha256(json.dumps(hash_dict, sort_keys=True).encode('utf-8')).hexdigest()
+        plugin['hash'] = h
 
-    # create a dict of all currently installed plugins in dynamicPluginsRoot
+    # Map of currently installed plugins in dynamicPluginsRoot, keyed by config hash
     plugin_path_by_hash = {}
     for dir_name in os.listdir(dynamicPluginsRoot):
        dir_path = os.path.join(dynamicPluginsRoot, dir_name)
        if os.path.isdir(dir_path):
-            hash_file_path = os.path.join(dir_path, 'dynamic-plugin-config.hash')
-            if os.path.isfile(hash_file_path):
-                with open(hash_file_path, 'r') as hash_file:
-                    hash_value = hash_file.read().strip()
-                    plugin_path_by_hash[hash_value] = dir_name
-
-    oci_downloader = OciDownloader(dynamicPluginsRoot)
-
-    # iterate through the list of plugins
-    for plugin in allPlugins.values():
+            hf = os.path.join(dir_path, 'dynamic-plugin-config.hash')
+            if os.path.isfile(hf):
+                with open(hf, 'r') as hf2:
+                    old_hash = hf2.read().strip()
+                    plugin_path_by_hash[old_hash] = dir_name
+
+    tools = check_prerequisites()
+    oci_downloader = OciDownloader(dynamicPluginsRoot, tools)
+
+    # Filter out disabled plugins
+    active_plugins = []
+    for p in allPlugins.values():
+        if not p.get('disabled'):
+            active_plugins.append(p)
+        else:
+            logger.info('\n======= Skipping disabled dynamic plugin %s', p['package'])
+
+    # -----------
+    # Step 1: worker that installs a single plugin (called in parallel)
+    # -----------
+    def install_one_plugin(plugin):
+        """
+        Returns (plugin, installed_path, error_or_None).
+        On error, installed_path is None, and vice versa.
+        """
         package = plugin['package']
+        plugin_hash = plugin['hash']
+        pull_policy = plugin.get(
+            'pullPolicy',
+            PullPolicy.ALWAYS if ':latest!' in package else PullPolicy.IF_NOT_PRESENT
+        )
+        if isinstance(pull_policy, str):
+            pull_policy = PullPolicy(pull_policy)
 
-        if 'disabled' in plugin and plugin['disabled'] is True:
-            print('\n======= Skipping disabled dynamic plugin', package, flush=True)
-            continue
+        installed_path = None
 
-        # Stores the relative path of the plugin directory once downloaded
-        plugin_path = ''
+        # OCI package
         if package.startswith('oci://'):
-            # The OCI downloader
             try:
-                pull_policy = plugin.get('pullPolicy', PullPolicy.ALWAYS if ':latest!' in package else PullPolicy.IF_NOT_PRESENT)
-
-            if plugin['hash'] in plugin_path_by_hash and pull_policy == PullPolicy.IF_NOT_PRESENT:
-                print('\n======= Skipping download of already installed dynamic plugin', package, flush=True)
-                plugin_path_by_hash.pop(plugin['hash'])
-                globalConfig = maybeMergeConfig(plugin.get('pluginConfig'), globalConfig)
-                continue
-
-            if plugin['hash'] in plugin_path_by_hash and pull_policy == PullPolicy.ALWAYS:
-                digest_file_path = os.path.join(dynamicPluginsRoot, plugin_path_by_hash.pop(plugin['hash']), 'dynamic-plugin-image.hash')
-                local_image_digest = None
-                if os.path.isfile(digest_file_path):
-                    with open(digest_file_path, 'r') as digest_file:
-                        digest_value = digest_file.read().strip()
-                        local_image_digest = digest_value
-                remote_image_digest = oci_downloader.digest(package)
-                if remote_image_digest == local_image_digest:
-                    print('\n======= Skipping download of already installed dynamic plugin', package, flush=True)
-                    globalConfig = maybeMergeConfig(plugin.get('pluginConfig'), globalConfig)
-                    continue
+                # Already installed & policy IF_NOT_PRESENT => skip
+                if plugin_hash in plugin_path_by_hash and pull_policy == PullPolicy.IF_NOT_PRESENT:
+                    logger.info('\n======= Skipping download of installed plugin %s', package)
+                    plugin_path_by_hash.pop(plugin_hash)
+                    return (plugin, None, None)
+
+                # Already installed & policy ALWAYS => compare digests first
+                if plugin_hash in plugin_path_by_hash and pull_policy == PullPolicy.ALWAYS:
+                    old_dir = plugin_path_by_hash.pop(plugin_hash)
+                    old_digest_file = os.path.join(dynamicPluginsRoot, old_dir, 'dynamic-plugin-image.hash')
+                    local_digest = None
+                    if os.path.isfile(old_digest_file):
+                        with open(old_digest_file, 'r') as df:
+                            local_digest = df.read().strip()
+                    remote_digest = oci_downloader.digest(package)
+                    if remote_digest == local_digest:
+                        logger.info('\n======= Skipping download (same digest) %s', package)
+                        return (plugin, None, None)
                     else:
-                    print('\n======= Installing dynamic plugin', package, flush=True)
-
-            else:
-                print('\n======= Installing dynamic plugin', package, flush=True)
-
-            plugin_path = oci_downloader.download(package)
-            digest_file_path = os.path.join(dynamicPluginsRoot, plugin_path, 'dynamic-plugin-image.hash')
-            with open(digest_file_path, 'w') as digest_file:
-                digest_file.write(oci_downloader.digest(package))
-            # remove any duplicate hashes which can occur when only the version is updated
-            for key in [k for k, v in plugin_path_by_hash.items() if v == plugin_path]:
-                plugin_path_by_hash.pop(key)
+                        logger.info('\n======= Installing dynamic plugin %s', package)
+                else:
+                    logger.info('\n======= Installing dynamic plugin %s', package)
+
+                installed_path = oci_downloader.download(package)
+                digest_path = os.path.join(dynamicPluginsRoot, installed_path, 'dynamic-plugin-image.hash')
+                with open(digest_path, 'w') as df:
+                    df.write(oci_downloader.digest(package))
+
+                # Remove duplicate hashes, which can occur when only the version is updated
+                duplicates = [k for k, v in plugin_path_by_hash.items() if v == installed_path]
+                for dup in duplicates:
+                    plugin_path_by_hash.pop(dup)
+
             except Exception as e:
-                raise InstallException(f"Error while adding OCI plugin {package} to downloader: {e}")
+                return (plugin, None, f"Error while adding OCI plugin {package}: {e}")
+
         else:
-            # The NPM downloader
+            # NPM package
             plugin_already_installed = False
-            pull_policy = plugin.get('pullPolicy', PullPolicy.IF_NOT_PRESENT)
-
-            if plugin['hash'] in plugin_path_by_hash:
-                force_download = plugin.get('forceDownload', False)
-                if pull_policy == PullPolicy.ALWAYS or force_download:
-                    print('\n======= Forcing download of already installed dynamic plugin', package, flush=True)
+            if plugin_hash in plugin_path_by_hash:
+                force_dl = plugin.get('forceDownload', False)
+                if pull_policy == PullPolicy.ALWAYS or force_dl:
+                    logger.info('\n======= Forcing download of installed plugin %s', package)
                 else:
-                    print('\n======= Skipping download of already installed dynamic plugin', package, flush=True)
+                    logger.info('\n======= Skipping download of installed plugin %s', package)
                     plugin_already_installed = True
-                # remove the hash from plugin_path_by_hash so that we can detect plugins that have been removed
-                plugin_path_by_hash.pop(plugin['hash'])
+                # Drop the hash so plugins removed from the configuration can be detected later
+                plugin_path_by_hash.pop(plugin_hash)
             else:
-                print('\n======= Installing dynamic plugin', package, flush=True)
+                logger.info('\n======= Installing dynamic plugin %s', package)
 
             if plugin_already_installed:
-                globalConfig = maybeMergeConfig(plugin.get('pluginConfig'), globalConfig)
-                continue
+                # Already installed; nothing to download
+                return (plugin, None, None)
 
             package_is_local = package.startswith('./')
-
-            # If package is not local, then integrity check is mandatory
-            if not package_is_local and not skipIntegrityCheck and not 'integrity' in plugin:
-                raise InstallException(f"No integrity hash provided for Package {package}")
+            # For remote packages, an integrity hash is mandatory
+            if (not package_is_local) and (not skipIntegrityCheck) and 'integrity' not in plugin:
+                return (plugin, None, f"No integrity hash for {package}")
 
             if package_is_local:
                 package = os.path.join(os.getcwd(), package[2:])
 
-            print('\t==> Grabbing package archive through `npm pack`', flush=True)
-            completed = subprocess.run(['npm', 'pack', package], capture_output=True, cwd=dynamicPluginsRoot)
+            logger.info('\t==> Grabbing package archive through `npm pack`')
+            completed = subprocess.run(
+                ['npm', 'pack', package],
+                cwd=dynamicPluginsRoot,
+                capture_output=True,
+                text=True
+            )
            if completed.returncode != 0:
-                raise InstallException(f'Error while installing plugin { package } with \'npm pack\' : ' + completed.stderr.decode('utf-8'))
+                return (plugin, None, f"Error installing plugin {package}: {completed.stderr}")
 
-            archive = os.path.join(dynamicPluginsRoot, completed.stdout.decode('utf-8').strip())
+            archive = os.path.join(dynamicPluginsRoot, completed.stdout.strip())
 
-            if not (package_is_local or skipIntegrityCheck):
-                print('\t==> Verifying package integrity', flush=True)
-                verify_package_integrity(plugin, archive, dynamicPluginsRoot)
+            if (not package_is_local) and (not skipIntegrityCheck):
+                logger.info('\t==> Verifying package integrity')
+                try:
+                    verify_package_integrity(plugin, archive)
+                except Exception as e:
+                    return (plugin, None, f"Integrity check failed for {package}: {e}")
 
             directory = archive.replace('.tgz', '')
-            directoryRealpath = os.path.realpath(directory)
-            plugin_path = os.path.basename(directoryRealpath)
+            directory_realpath = os.path.realpath(directory)
+            installed_path = os.path.basename(directory_realpath)
 
             if os.path.exists(directory):
-                print('\t==> Removing previous plugin directory', directory, flush=True)
-                shutil.rmtree(directory, ignore_errors=True, onerror=None)
+                logger.info('\t==> Removing previous plugin directory %s', directory)
+                shutil.rmtree(directory, ignore_errors=True)
             os.mkdir(directory)
 
-            print('\t==> Extracting package archive', archive, flush=True)
-            file = tarfile.open(archive, 'r:gz') # NOSONAR
-            # extract the archive content but take care of zip bombs
-            for member in file.getmembers():
-                if member.isreg():
-                    if not member.name.startswith('package/'):
-                        raise InstallException("NPM package archive does not start with 'package/' as it should: " + member.name)
-
-                    if member.size > maxEntrySize:
-                        raise InstallException('Zip bomb detected in ' + member.name)
-
-                    member.name = member.name.removeprefix('package/')
-                    file.extract(member, path=directory, filter='tar')
-                elif member.isdir():
-                    print('\t\tSkipping directory entry', member.name, flush=True)
-                elif member.islnk() or member.issym():
-                    if not member.linkpath.startswith('package/'):
-                        raise InstallException('NPM package archive contains a link outside of the archive: ' + member.name + ' -> ' + member.linkpath)
-
-                    member.name = member.name.removeprefix('package/')
-                    member.linkpath = member.linkpath.removeprefix('package/')
-
-                    realpath = os.path.realpath(os.path.join(directory, *os.path.split(member.linkname)))
-                    if not realpath.startswith(directoryRealpath):
-                        raise InstallException('NPM package archive contains a link outside of the archive: ' + member.name + ' -> ' + member.linkpath)
-
-                    file.extract(member, path=directory, filter='tar')
-                else:
-                    if member.type == tarfile.CHRTYPE:
-                        type_str = "character device"
-                    elif member.type == tarfile.BLKTYPE:
-                        type_str = "block device"
-                    elif member.type == tarfile.FIFOTYPE:
-                        type_str = "FIFO"
-                    else:
-                        type_str = "unknown"
-
-                    raise InstallException('NPM package archive contains a non regular file: ' + member.name + ' - ' + type_str)
-
-            file.close()
-
-            print('\t==> Removing package archive', archive, flush=True)
+            logger.info('\t==> Extracting package archive %s', archive)
+            try:
+                with tarfile.open(archive, 'r:gz') as f:
+                    for member in f.getmembers():
+                        if member.isreg():
+                            if not member.name.startswith('package/'):
+                                raise InstallException(
+                                    f"NPM package archive doesn't start with 'package/': {member.name}"
+                                )
+                            if member.size > maxEntrySize:
+                                raise InstallException('Zip bomb detected in ' + member.name)
+                            member.name = member.name.removeprefix('package/')
+                            f.extract(member, path=directory, filter='tar')
+                        elif member.isdir():
+                            logger.info('\t\tSkipping directory entry %s', member.name)
+                        elif member.islnk() or member.issym():
+                            if not member.linkpath.startswith('package/'):
+                                raise InstallException(
+                                    f"NPM package link outside: {member.name} -> {member.linkpath}"
+                                )
+                            member.name = member.name.removeprefix('package/')
+                            member.linkpath = member.linkpath.removeprefix('package/')
+
+                            rp = os.path.realpath(os.path.join(directory, *os.path.split(member.linkname)))
+                            if not rp.startswith(directory_realpath):
+                                raise InstallException(
+                                    f"NPM package link escapes archive: {member.name} -> {member.linkpath}"
+                                )
+                            f.extract(member, path=directory, filter='tar')
+                        else:
+                            t_str = 'unknown'
+                            if member.type == tarfile.CHRTYPE:
+                                t_str = 'character device'
+                            elif member.type == tarfile.BLKTYPE:
+                                t_str = 'block device'
+                            elif member.type == tarfile.FIFOTYPE:
+                                t_str = 'FIFO'
+                            raise InstallException(
+                                f'Archive has a non-regular file: {member.name} - {t_str}'
+                            )
+            except Exception as ex:
+                return (plugin, None, str(ex))
+
+            logger.info('\t==> Removing package archive %s', archive)
             os.remove(archive)
 
-            # create a hash file in the plugin directory
-            hash = plugin['hash']
-            hash_file_path = os.path.join(dynamicPluginsRoot, plugin_path, 'dynamic-plugin-config.hash')
-            with open(hash_file_path, 'w') as digest_file:
-                digest_file.write(hash)
-
-            if 'pluginConfig' not in plugin:
-                print('\t==> Successfully installed dynamic plugin', package, flush=True)
-                continue
-
-            # if some plugin configuration is defined, merge it with the global configuration
-            globalConfig = maybeMergeConfig(plugin.get('pluginConfig'), globalConfig)
-
-            print('\t==> Successfully installed dynamic plugin', package, flush=True)
-
-    yaml.safe_dump(globalConfig, open(dynamicPluginsGlobalConfigFile, 'w'))
-
-    # remove plugins that have been removed from the configuration
-    for hash_value in plugin_path_by_hash:
-        plugin_directory = os.path.join(dynamicPluginsRoot, plugin_path_by_hash[hash_value])
-        print('\n======= Removing previously installed dynamic plugin', plugin_path_by_hash[hash_value], flush=True)
-        shutil.rmtree(plugin_directory, ignore_errors=True, onerror=None)
-
-main()
+        # Write the config hash file into the plugin directory
+        if installed_path:
+            hash_file_path = os.path.join(dynamicPluginsRoot, installed_path, 'dynamic-plugin-config.hash')
+            with open(hash_file_path, 'w') as hf:
+                hf.write(plugin_hash)
+
+        return (plugin, installed_path, None)
+
+    # -----------
+    # Step 2: install the plugins in parallel
+    # -----------
+    results = []
+    exceptions = []
+
+    # Tune max_workers to match the target environment
+    with ThreadPoolExecutor(max_workers=2) as executor:
+        future_map = {}
+        for p in active_plugins:
+            f = executor.submit(install_one_plugin, p)
+            future_map[f] = p['package']
+
+        for f in as_completed(future_map):
+            pkg = future_map[f]
+            try:
+                plugin_obj, installed_path, error = f.result()
+                if error:
+                    exceptions.append((pkg, error))
+                else:
+                    results.append((plugin_obj, installed_path))
+            except Exception as e:
+                exceptions.append((pkg, str(e)))
+
+    # Abort if any plugin failed to install
+    if exceptions:
+        for pkg, err in exceptions:
+            logger.error(f"Error installing {pkg}: {err}")
+        raise InstallException("One or more plugins failed to install in parallel")
+
+    # -----------
+    # Step 3: merge plugin configs and log completion
+    # -----------
+    for plugin_obj, installed_path in results:
+        # installed_path is None when the plugin was already installed; that is not an error
+        if 'pluginConfig' in plugin_obj:
+            globalConfig = maybe_merge_config(plugin_obj.get('pluginConfig'), globalConfig)
+        logger.info('\t==> Successfully installed dynamic plugin %s', plugin_obj['package'])
+
+    # Save the final global config
+    with open(dynamicPluginsGlobalConfigFile, 'w') as gf:
+        yaml.safe_dump(globalConfig, gf)
+
+    # Hashes still present here belong to plugins no longer in the configuration; remove them
+    for old_hash, old_dir in plugin_path_by_hash.items():
+        logger.info('\n======= Removing previously installed dynamic plugin %s', old_dir)
+        shutil.rmtree(os.path.join(dynamicPluginsRoot, old_dir), ignore_errors=True)
+
+    end_time = datetime.now()
+    logger.info(f"Total Execution Time: {end_time - start_time}")
+
+
+if __name__ == "__main__":
+    main()
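
For context, the refactored installer keeps the `dynamic-plugins.yaml` contract described in the header comment it removes. Below is a minimal sketch of such a file: the field names (`includes`, `plugins`, `package`, `integrity`, `disabled`, `pullPolicy`, `pluginConfig`) are the ones the script reads, while the package references and the integrity value are hypothetical placeholders, not real artifacts:

    # dynamic-plugins.yaml (illustrative sketch only)
    includes:
      - dynamic-plugins.default.yaml                     # optional list of YAML files to merge in
    plugins:
      - package: '@example/backstage-plugin-foo@1.2.3'   # hypothetical NPM package
        integrity: sha512-AAAA...                        # placeholder <algorithm>-<base64 hash>; required for remote NPM packages
        pluginConfig:
          dynamicPlugins:
            frontend: {}
      - package: oci://quay.io/example/plugins:v1.0.0!backstage-plugin-bar   # hypothetical <image>!<path> OCI reference
        pullPolicy: IfNotPresent
      - package: ./dynamic-plugins/dist/backstage-plugin-baz   # hypothetical local package; integrity is not checked
        disabled: true

As before, the script takes the dynamic plugins root directory as its only argument (e.g. `python install-dynamic-plugins.py dynamic-plugins-root`) and honors the `MAX_ENTRY_SIZE` and `SKIP_INTEGRITY_CHECK` environment variables; `max_workers=2` in the `ThreadPoolExecutor` bounds how many of these plugins install concurrently.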