diff --git a/python/artifact-searcher/artifact_searcher/artifact.py b/python/artifact-searcher/artifact_searcher/artifact.py index f237d70e6..de53299e4 100644 --- a/python/artifact-searcher/artifact_searcher/artifact.py +++ b/python/artifact-searcher/artifact_searcher/artifact.py @@ -4,6 +4,7 @@ import shutil import tempfile import xml.etree.ElementTree as ET +from functools import partial from pathlib import Path from typing import Any, Optional from urllib.parse import urljoin, urlparse, urlunparse @@ -12,13 +13,30 @@ import aiohttp import requests from aiohttp import BasicAuth +from requests.auth import HTTPBasicAuth + from artifact_searcher.utils.constants import DEFAULT_REQUEST_TIMEOUT, TCP_CONNECTION_LIMIT, METADATA_XML from artifact_searcher.utils.models import Registry, Application, FileExtension, Credentials, ArtifactInfo from envgenehelper import logger -from requests.auth import HTTPBasicAuth + +try: + from qubership_pipelines_common_library.v1.maven_client import Artifact as MavenArtifact +except ImportError: + MavenArtifact = None + +try: + from artifact_searcher.cloud_auth_helper import CloudAuthHelper +except ImportError: + CloudAuthHelper = None WORKSPACE = os.getenv("WORKSPACE", Path(tempfile.gettempdir()) / "zips") +# V2 timeouts for cloud registries +V2_SEARCH_TIMEOUT = 60 # Timeout for find_artifact_urls +V2_DOWNLOAD_TIMEOUT = 120 # Timeout for download_artifact +V2_HTTP_TIMEOUT = (30, 60) # (connect, read) for HTTP requests + + def convert_nexus_repo_url_to_index_view(url: str) -> str: parsed = urlparse(url) @@ -27,7 +45,6 @@ def convert_nexus_repo_url_to_index_view(url: str) -> str: if not parts or parts[-1] != "repository": return url - # Build new path new_parts = parts[:-1] + ["service", "rest", "repository", "browse"] new_path = "/".join(new_parts) + "/" @@ -81,7 +98,8 @@ async def resolve_snapshot_version_async( stop_snapshot_event_for_others.set() logger.info( f"[Task {task_id}] [Application: {app.name}: {version}] - Successfully fetched 
maven-metadata.xml: {metadata_url}") - return resolved_version, task_id + return resolved_version, task_id + return None except Exception as e: logger.warning( f"[Task {task_id}] [Application: {app.name}: {version}] - Error resolving snapshot version from {metadata_url}: {e}") @@ -96,24 +114,43 @@ def _parse_snapshot_version( classifier: str = "" ) -> str | None: root = ET.fromstring(content) + + # Trying new-style first (Maven 3+) if its not found then its switched to Old Style snapshot_versions = root.findall(".//snapshotVersions/snapshotVersion") - if not snapshot_versions: - logger.warning(f"[Application: {app.name}: {version}] - No found") - return - - for node in snapshot_versions: - node_classifier = node.findtext("classifier", default="") - node_extension = node.findtext("extension", default="") - value = node.findtext("value") - if node_classifier == classifier and node_extension == extension: + if snapshot_versions: + for node in snapshot_versions: + node_classifier = node.findtext("classifier", default="") + node_extension = node.findtext("extension", default="") + value = node.findtext("value") + if node_classifier == classifier and node_extension == extension.value: + logger.info( + f"[Task {task_id}] [Application: {app.name}: {version}] - Resolved snapshot version '{value}'") + return value + logger.warning(f"[Task {task_id}] [Application: {app.name}: {version}] - No matching snapshotVersion found") + return None + + # Fallback to old-style metadata (Maven 2 / some Nexus repos) + snapshot_node = root.find(".//snapshot") + if snapshot_node is not None: + timestamp = snapshot_node.findtext("timestamp") + build_number = snapshot_node.findtext("buildNumber") + + if timestamp and build_number: + # Convert timestamp from "yyyyMMdd.HHmmss" format and build timestamped version + base_version = version.replace("-SNAPSHOT", "") + resolved = f"{base_version}-{timestamp}-{build_number}" logger.info( - f"[Task {task_id}] [Application: {app.name}: {version}] - 
Resolved snapshot version '{value}'") - return value - - logger.warning(f"[Task {task_id}] [Application: {app.name}: {version}] - No matching snapshotVersion found") + f"[Task {task_id}] [Application: {app.name}: {version}] - Resolved snapshot version '{resolved}' from old-style metadata") + return resolved + + logger.warning(f"[Task {task_id}] [Application: {app.name}: {version}] - found but missing timestamp or buildNumber") + return None + + logger.warning(f"[Application: {app.name}: {version}] - No or found in metadata") + return None -def version_to_folder_name(version: str): +def version_to_folder_name(version: str) -> str: """ Normalizes version string for folder naming. @@ -127,7 +164,6 @@ def version_to_folder_name(version: str): folder = version return folder - def clean_temp_dir(): if WORKSPACE.exists(): shutil.rmtree(WORKSPACE) @@ -168,6 +204,10 @@ async def download_async(session, artifact_info: ArtifactInfo) -> ArtifactInfo: Returns: ArtifactInfo: Object containing related information about the artifact """ + if artifact_info.local_path: + logger.info(f"Artifact already downloaded: {artifact_info.local_path}") + return artifact_info + url = artifact_info.url app_local_path = create_app_artifacts_local_path(artifact_info.app_name, artifact_info.app_version) artifact_local_path = os.path.join(app_local_path, os.path.basename(url)) @@ -181,7 +221,8 @@ async def download_async(session, artifact_info: ArtifactInfo) -> ArtifactInfo: artifact_info.local_path = artifact_local_path return artifact_info else: - logger.error(f"Download process with error {response.text}: {url}") + error_text = await response.text() + logger.error(f"Download process with error (status {response.status}): {url} - {error_text}") except Exception as e: logger.error(f"Download process with exception {url}: {e}") @@ -225,6 +266,15 @@ async def check_artifact_by_full_url_async( return full_url, repo logger.warning( f"[Task {task_id}] [Application: {app.name}: {version}] - Artifact not 
found at URL {full_url}, status: {response.status}") + + # Fallback: Try direct -SNAPSHOT filename if resolved version failed (Nexus compatibility) + if version.endswith("-SNAPSHOT") and resolved_version != version: + fallback_url = create_full_url(app, version, repo_value, artifact_extension, classifier) + async with session.head(fallback_url) as fallback_response: + if fallback_response.status == 200: + stop_artifact_event.set() + logger.info(f"[Task {task_id}] [Application: {app.name}: {version}] - Artifact found with direct SNAPSHOT fallback: {fallback_url}") + return fallback_url, repo except Exception as e: logger.warning( f"[Task {task_id}] [Application: {app.name}: {version}] - Error checking artifact URL {full_url}: {e}") @@ -256,6 +306,7 @@ async def _attempt_check( classifier: str = "" ) -> Optional[tuple[str, tuple[str, str]]]: repos_dict = get_repo_value_pointer_dict(app.registry) + original_domain = app.registry.maven_config.repository_domain_name if registry_url: app.registry.maven_config.repository_domain_name = registry_url @@ -263,64 +314,320 @@ async def _attempt_check( timeout = aiohttp.ClientTimeout(total=DEFAULT_REQUEST_TIMEOUT) stop_snapshot_event_for_others = asyncio.Event() stop_artifact_event = asyncio.Event() - async with aiohttp.ClientSession(timeout=timeout, auth=auth) as session: - async with asyncio.TaskGroup() as tg: - tasks = [ - tg.create_task( - check_artifact_by_full_url_async( - app, - version, - repo, - artifact_extension, - stop_snapshot_event_for_others, - stop_artifact_event, - session, - i, - classifier + try: + async with aiohttp.ClientSession(timeout=timeout, auth=auth) as session: + async with asyncio.TaskGroup() as tg: + tasks = [ + tg.create_task( + check_artifact_by_full_url_async( + app, + version, + repo, + artifact_extension, + stop_snapshot_event_for_others, + stop_artifact_event, + session, + i, + classifier + ) ) - ) - for i, repo in enumerate(repos_dict.items()) - ] + for i, repo in 
enumerate(repos_dict.items()) + ] - for task in tasks: - result = task.result() - if result is not None: - return result + for task in tasks: + result = task.result() + if result is not None: + return result + finally: + # Always restore original repository domain to avoid persisting browse-index URL + if registry_url: + app.registry.maven_config.repository_domain_name = original_domain async def check_artifact_async( app: Application, artifact_extension: FileExtension, version: str, cred: Credentials | None = None, - classifier: str = "") -> Optional[tuple[str, tuple[str, str]]] | None: + classifier: str = "", env_creds: Optional[dict] = None) -> Optional[tuple[str, tuple[str, str]]] | None: """ Resolves the full artifact URL and the first repository where it was found. Supports both release and snapshot versions. - Returns: Optional[tuple[str, tuple[str, str]]]: A tuple containing: - str: Full URL to the artifact. - tuple[str, str]: A pair of (repository name, repository pointer/alias in CMDB). Returns None if the artifact could not be resolved """ + registry_version = getattr(app.registry, 'version', "1.0") + if registry_version == "2.0": + logger.info(f"Detected RegDef V2 for {app.name}, attempting cloud-aware search") + try: + return await _check_artifact_v2_async(app, artifact_extension, version, env_creds) + except Exception as e: + logger.warning(f"V2 artifact search failed for {app.name}: {e}. 
Falling back to V1.") + return await _check_artifact_v1_async(app, artifact_extension, version, cred, classifier) + else: + logger.debug(f"Using V1 artifact search for {app.name} (version={registry_version})") + return await _check_artifact_v1_async(app, artifact_extension, version, cred, classifier) - result = await _attempt_check(app, version, artifact_extension, None, cred) - if result is not None: - return result - if not app.registry.maven_config.is_nexus: - return result +async def _check_artifact_v2_async(app: Application, artifact_extension: FileExtension, version: str, + env_creds: Optional[dict]) -> Optional[tuple[str, tuple[str, str]]]: + """Search for artifacts using V2 cloud registry approach. + + Supports AWS CodeArtifact, GCP Artifact Registry, Artifactory, and Nexus via + MavenArtifactSearcher library. Falls back to V1 on failure. + + Returns: + (artifact_url, download_info) tuple or None + """ + # V2 requires authConfig to know how to authenticate with the registry + if not getattr(app.registry.maven_config, 'auth_config', None): + logger.error(f"V2 fallback for '{app.name}': Registry '{app.registry.name}' version 2.0 missing maven_config.authConfig") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + # Check optional V2 dependencies + if CloudAuthHelper is None or MavenArtifact is None: + missing = [] + if CloudAuthHelper is None: + missing.append("artifact_searcher.cloud_auth_helper") + if MavenArtifact is None: + missing.append("qubership_pipelines_common_library.v1.maven_client.Artifact") + logger.error(f"V2 fallback for '{app.name}': Missing required libraries - {', '.join(missing)}") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + auth_config = CloudAuthHelper.resolve_auth_config(app.registry, "maven") + if not auth_config: + logger.error(f"V2 fallback for '{app.name}': Could not resolve authConfig for registry '{app.registry.name}'") 
+ return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + # AWS/GCP require credentials; Artifactory/Nexus support anonymous if provider is set + if auth_config.provider in ["aws", "gcp"]: + if not env_creds: + logger.error(f"V2 fallback for '{app.name}': {auth_config.provider} requires credentials but env_creds is empty") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + if auth_config.credentials_id and auth_config.credentials_id not in env_creds: + logger.error(f"V2 fallback for '{app.name}': {auth_config.provider} credential '{auth_config.credentials_id}' not found in env_creds") + logger.error(f"Available credentials: {list(env_creds.keys())}") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + logger.info(f"V2 search for {app.name} with provider={auth_config.provider}") + loop = asyncio.get_running_loop() + + # SNAPSHOT: search by base version, download by timestamped version + resolved_version = version # This will become the timestamped version if it's a SNAPSHOT + search_version = version # Always use base version for searching + + if version.endswith("-SNAPSHOT"): + logger.info(f"Resolving SNAPSHOT version for verification: {app.artifact_id}:{version}") + + # Credentials needed for maven-metadata.xml fetch + cred = None + if auth_config.credentials_id and env_creds: + cred_data = env_creds.get(auth_config.credentials_id) + if cred_data and cred_data.get('username'): + cred = Credentials(username=cred_data['username'], password=cred_data['password']) + + auth = BasicAuth(login=cred.username, password=cred.password) if cred else None + timeout = aiohttp.ClientTimeout(total=DEFAULT_REQUEST_TIMEOUT) + + # Resolve SNAPSHOT to timestamped version across repos + async with aiohttp.ClientSession(timeout=timeout, auth=auth) as session: + repos_dict = get_repo_value_pointer_dict(app.registry) + + # Loop through 
configured repositories until we find maven-metadata.xml + for repo_value, repo_pointer in repos_dict.items(): + if not repo_value: + continue + + try: + # Fetch maven-metadata.xml for timestamped version + result = await resolve_snapshot_version_async( + session, app, version, repo_value, 0, + asyncio.Event(), asyncio.Event(), + artifact_extension, classifier="" + ) + + if result: + resolved_version = result[0] + logger.info(f"V2 resolved SNAPSHOT: {version} -> {resolved_version}") + break + except Exception as e: + logger.debug(f"Failed to resolve SNAPSHOT from {repo_pointer}: {e}") + continue + + # If we couldn't resolve the SNAPSHOT version, fall back to V1 + if resolved_version == version: + logger.warning(f"Could not resolve SNAPSHOT, falling back to V1") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=cred, classifier="") + + # Create registry-specific searcher (AWS, GCP, Artifactory, Nexus) + try: + searcher = await loop.run_in_executor(None, CloudAuthHelper.create_maven_searcher, app.registry, env_creds) + except KeyError as e: + logger.error(f"V2 fallback for '{app.name}': Credential not found - {e}") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + except ValueError as e: + logger.error(f"V2 fallback for '{app.name}': Invalid configuration - {e}") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + except Exception as e: + logger.error(f"V2 fallback for '{app.name}': Failed to create searcher - {e}", exc_info=True) + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + # Build artifact identifier (use base SNAPSHOT version for search APIs) + artifact_string = f"{app.group_id}:{app.artifact_id}:{search_version}" + maven_artifact = MavenArtifact.from_string(artifact_string) + maven_artifact.extension = artifact_extension.value + + logger.info(f"V2 searching: 
{artifact_string}.{artifact_extension.value}") + if resolved_version != search_version: + logger.info(f"V2 resolved version for download: {resolved_version}") + + max_retries = 2 + last_error = None + local_path = None + maven_url = None + + # Retry on transient errors (401, timeout, expired) + for attempt in range(max_retries): + try: + if attempt > 0: + # On retry, recreate the searcher in case credentials expired + logger.info(f"V2 retry {attempt} for {app.name} after 5s delay...") + await asyncio.sleep(5) + searcher = await loop.run_in_executor(None, CloudAuthHelper.create_maven_searcher, app.registry, env_creds) + + urls = await asyncio.wait_for( + loop.run_in_executor(None, partial(searcher.find_artifact_urls, artifact=maven_artifact)), + timeout=V2_SEARCH_TIMEOUT + ) + if not urls: + logger.warning(f"V2 search returned no artifacts for {app.artifact_id}:{version}") + return None - # trying to edit url for nexus and repeat - original_domain = app.registry.maven_config.repository_domain_name - fixed_domain = convert_nexus_repo_url_to_index_view(original_domain) - if fixed_domain != original_domain: - logger.info(f"Retrying artifact check with edited domain: {fixed_domain}") - result = await _attempt_check(app, version, artifact_extension, fixed_domain, cred, classifier) - if result is not None: - return result + maven_url = urls[0] + logger.info(f"Found V2 artifact: {maven_url}") + + local_path = os.path.join(create_app_artifacts_local_path(app.name, version), os.path.basename(maven_url)) + os.makedirs(os.path.dirname(local_path), exist_ok=True) + + download_success = await _v2_download_with_fallback( + searcher, maven_url, local_path, auth_config, app.registry, env_creds + ) + + if download_success: + logger.info(f"V2 artifact downloaded: {local_path}") + break + raise TimeoutError(f"V2 download failed for {maven_url}") + + except asyncio.TimeoutError: + last_error = TimeoutError(f"V2 search timed out after {V2_SEARCH_TIMEOUT}s") + logger.warning(f"V2 search 
timed out for {app.name} (attempt {attempt + 1}/{max_retries})") + if attempt < max_retries - 1: + continue + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + except Exception as e: + last_error = e + error_str = str(e).lower() + + # Nexus: 404 means artifact not yet indexed in search + if "404" in error_str and "search request" in error_str: + logger.info(f"V2 search index miss for {app.name} - artifact may not be indexed in Nexus search DB") + logger.info(f"Falling back to V1 direct HTTP lookup") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + # Try to extract useful HTTP error details for debugging + if hasattr(e, 'response'): + try: + status = getattr(e.response, 'status_code', 'N/A') + url = getattr(e.response, 'url', 'N/A') + logger.error(f"V2 HTTP {status} from {url}") + except Exception: + pass + + # Retry on transient errors (401, timeout, expired) + if attempt < max_retries - 1 and any(x in error_str for x in ["401", "unauthorized", "forbidden", "expired", "timeout"]): + logger.warning(f"V2 transient error for {app.name}, retrying: {e}") + continue + + logger.error(f"V2 error for '{app.name}': {e}") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") else: - logger.debug("Domain is same after editing, skipping retry") + logger.error(f"V2 fallback for '{app.name}': All {max_retries} attempts exhausted - {last_error}") + return await _check_artifact_v1_async(app, artifact_extension, version, cred=None, classifier="") + + # AWS: construct full URL from resource ID + if auth_config.provider == "aws": + registry_domain = app.registry.maven_config.repository_domain_name + folder_name = version_to_folder_name(version) + repo_path = app.registry.maven_config.target_snapshot if folder_name.endswith("-SNAPSHOT") else app.registry.maven_config.target_release + full_url = 
f"{registry_domain.rstrip('/')}/{repo_path.rstrip('/')}/{maven_url}" + else: + # GCP/Artifactory/Nexus: URL ready-to-use + full_url = maven_url + + return full_url, ("v2_downloaded", local_path) + +async def _v2_download_with_fallback(searcher, url: str, local_path: str, auth_config, + registry: Registry, env_creds: Optional[dict]) -> bool: + """Download artifact via searcher with HTTP fallback. + + Tries searcher download first, falls back to HTTP GET for GCP/Artifactory/Nexus. + Adds GCP access tokens if needed. + """ + loop = asyncio.get_running_loop() + + try: + await asyncio.wait_for( + loop.run_in_executor(None, lambda: searcher.download_artifact(url, str(local_path))), + timeout=V2_DOWNLOAD_TIMEOUT + ) + return True + except asyncio.TimeoutError: + logger.warning(f"Searcher download timed out after {V2_DOWNLOAD_TIMEOUT}s") + except Exception as e: + logger.warning(f"Searcher download failed: {e}") + + if auth_config.provider not in ["gcp", "artifactory", "nexus"]: + return False + + try: + headers = {} + if auth_config.provider == "gcp": + sa_json = CloudAuthHelper.get_gcp_credentials_from_registry(registry, env_creds) + if sa_json: + token = CloudAuthHelper.get_gcp_access_token(sa_json) + if token: + headers["Authorization"] = f"Bearer {token}" + + response = requests.get(url, headers=headers, timeout=V2_HTTP_TIMEOUT, stream=True) + response.raise_for_status() + with open(local_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + logger.info(f"Direct HTTP download successful: {local_path}") + return True + except Exception as e: + logger.warning(f"Direct HTTP download failed for {url}: {e}", exc_info=True) + return False + + +async def _check_artifact_v1_async( + app: Application, + artifact_extension: FileExtension, + version: str, + cred: Credentials | None = None, # ADD from incoming + classifier: str = "" +) -> Optional[tuple[str, tuple[str, str]]]: + result = await _attempt_check(app, version, 
artifact_extension, cred=cred, classifier=classifier) + if result is not None: + return result + + # Browse URL retry removed - browse endpoints don't support downloads + # V1 fallback will rely on V2 search or direct repository URLs only logger.warning("Artifact not found") + return None def unzip_file(artifact_id: str, app_name: str, app_version: str, zip_url: str): @@ -333,7 +640,7 @@ def unzip_file(artifact_id: str, app_name: str, app_version: str, zip_url: str): zip_file.extract(file, create_app_artifacts_local_path(app_name, app_version)) extracted = True except Exception as e: - logger.error(f"Error unpacking {e}") + logger.error(f"Error unpacking artifact {artifact_id} for {app_name}:{app_version} from {zip_url}: {e}", exc_info=True) if not extracted: logger.warning(f"No files were extracted for application {app_name}:{app_version}") @@ -460,7 +767,7 @@ def resolve_snapshot_version(base_path, extension: FileExtension, cred: Credenti node_classifier = node.findtext("classifier", default="") node_extension = node.findtext("extension", default="") value = node.findtext("value") - if node_classifier == classifier and node_extension == extension: + if node_classifier == classifier and node_extension == extension.value: logger.info(f"Resolved snapshot version '{value}'") return value diff --git a/python/artifact-searcher/artifact_searcher/cloud_auth_helper.py b/python/artifact-searcher/artifact_searcher/cloud_auth_helper.py new file mode 100644 index 000000000..c0cd75489 --- /dev/null +++ b/python/artifact-searcher/artifact_searcher/cloud_auth_helper.py @@ -0,0 +1,332 @@ +import json +import re +from typing import Dict, Optional + +from envgenehelper import logger + +from artifact_searcher.utils.models import AuthConfig, Registry + +try: + from qubership_pipelines_common_library.v1.maven_client import MavenArtifactSearcher +except ImportError: + MavenArtifactSearcher = None + +try: + from google.oauth2 import service_account + from google.auth.transport.requests 
import Request + GCP_AUTH_AVAILABLE = True +except ImportError: + GCP_AUTH_AVAILABLE = False + + +# Timeout for MavenArtifactSearcher: (connect_timeout, read_timeout) +DEFAULT_SEARCHER_TIMEOUT = (30, 60) + + +class CloudAuthHelper: + """V2 authentication helper for cloud registries. + + Supports: AWS (access keys), GCP (SA JSON), Artifactory/Nexus (user/pass or anonymous). + Creates configured MavenArtifactSearcher per provider. + """ + + @staticmethod + def resolve_auth_config(registry: Registry, artifact_type: str = "maven") -> Optional[AuthConfig]: + """Find auth settings for this registry. + + Looks up authConfig based on maven_config reference. + Returns AuthConfig or None. + """ + if artifact_type != "maven": + return None + + auth_ref = getattr(registry.maven_config, 'auth_config', None) + if not auth_ref: + return None + + if not registry.auth_config: + logger.warning(f"No authConfig dict but maven config references '{auth_ref}'") + return None + + auth_config = registry.auth_config.get(auth_ref) + if not auth_config: + logger.error(f"AuthConfig '{auth_ref}' not found. Available: {list(registry.auth_config.keys())}") + return None + + logger.info(f"Resolved authConfig '{auth_ref}' -> provider: {auth_config.provider}") + return auth_config + + @staticmethod + def resolve_credentials(auth_config: AuthConfig, env_creds: Optional[Dict[str, dict]]) -> Optional[dict]: + """Get credentials from vault using authConfig's credentials ID. + + Handles: usernamePassword (returns dict), secret (GCP), empty creds (anonymous). + Returns dict or None for anonymous. 
+ """ + cred_id = auth_config.credentials_id + if not cred_id: + logger.info("No credentialsId specified, using anonymous access") + return None + + if not env_creds or cred_id not in env_creds: + raise KeyError(f"Credential '{cred_id}' not found in env_creds") + + cred_entry = env_creds[cred_id] + + # Credentials can be structured as {"type": "usernamePassword", "data": {"username": "..."}} + # or as a flat dict {"username": "...", "password": "..."} + cred_type = cred_entry.get("type") if isinstance(cred_entry, dict) else None + cred_data = cred_entry.get("data", cred_entry) if isinstance(cred_entry, dict) else cred_entry + + # For Nexus/Artifactory: empty username+password means anonymous/public access + if cred_type == "usernamePassword": + username = cred_data.get("username", "") + password = cred_data.get("password", "") + if not username and not password: + logger.info(f"Credential '{cred_id}' is anonymous (empty username/password)") + return None + creds = {"username": username, "password": password} + elif cred_type == "secret": + # For GCP service account JSON or other secret-based credentials + if "secret" in cred_data: + creds = cred_data + else: + # Handle case where data itself is the secret + creds = {"secret": cred_data} + else: + # Fallback for unknown credential types + creds = cred_data + + logger.info(f"Resolved credentials for '{cred_id}' (type: {cred_type})") + + # Validate credential format per provider + if auth_config.provider == "aws": + if "username" not in creds or "password" not in creds: + raise ValueError(f"AWS credentials must have 'username' and 'password'") + # GCP needs a service account JSON file (stored as 'secret') + elif auth_config.provider == "gcp" and auth_config.auth_method == "service_account": + if "secret" not in creds: + raise ValueError(f"GCP service_account credentials must have 'secret'") + + return creds + + @staticmethod + def _extract_repository_name(url: str) -> str: + """Extract repository name from registry 
URL (last path segment).""" + parts = [p for p in url.rstrip('/').split('/') if p] + if parts: + repo_name = parts[-1] + logger.debug(f"Extracted repository name: {repo_name} from URL: {url}") + return repo_name + raise ValueError(f"Could not extract repository name from URL: {url}") + + @staticmethod + def _extract_region(url: str, auth_config: AuthConfig) -> str: + """Get AWS region from authConfig, URL, or default to us-east-1.""" + if auth_config.provider == "aws" and auth_config.aws_region: + logger.debug(f"Using explicit AWS region: {auth_config.aws_region}") + return auth_config.aws_region + aws_match = re.search(r'\.([a-z0-9-]+)\.amazonaws\.com', url) + if aws_match: + region = aws_match.group(1) + logger.debug(f"Extracted AWS region from URL: {region}") + return region + logger.debug("AWS region not found in URL, defaulting to us-east-1") + return "us-east-1" + + @staticmethod + def _extract_gcp_region(url: str) -> str: + """Extract GCP region from URL (format: us-east1, not us-east-1).""" + match = re.search(r'https://([a-z0-9-]+)-maven\.pkg\.dev', url) + if match: + region = match.group(1) + logger.debug(f"Extracted GCP region from URL: {region}") + return region + logger.warning(f"Could not extract GCP region from URL: {url}, defaulting to us-central1") + return "us-central1" + + @staticmethod + def _detect_provider(url: str, auth_config: AuthConfig) -> Optional[str]: + """Auto-detect provider from URL (Nexus/Artifactory only; AWS/GCP need explicit).""" + # If provider is explicitly set, use it + if auth_config.provider: + logger.debug(f"Using explicit provider: {auth_config.provider}") + return auth_config.provider + + url_lower = url.lower() + + # Auto-detect ONLY for on-premise registries (Nexus and Artifactory) + # AWS and GCP must be explicitly specified + + # Artifactory patterns + if "artifactory" in url_lower or "/artifactory/" in url_lower: + logger.info(f"Auto-detected provider: artifactory from URL pattern") + return "artifactory" + + # Nexus 
patterns + if "nexus" in url_lower or "/nexus/" in url_lower or "/service/rest/" in url_lower: + logger.info(f"Auto-detected provider: nexus from URL pattern") + return "nexus" + + # AWS and GCP require explicit provider - no auto-detection + logger.warning(f"Could not auto-detect provider from URL: {url}. AWS and GCP require explicit provider specification.") + return None + + @staticmethod + def create_maven_searcher(registry: Registry, env_creds: Optional[Dict[str, dict]]) -> 'MavenArtifactSearcher': + """Create configured MavenArtifactSearcher for this registry. + + Resolves provider, loads credentials, configures searcher. + """ + if MavenArtifactSearcher is None: + raise ImportError("qubership_pipelines_common_library not available") + + auth_config = CloudAuthHelper.resolve_auth_config(registry, "maven") + if not auth_config: + raise ValueError("Could not resolve authConfig for maven artifacts") + + registry_url = registry.maven_config.repository_domain_name + + # Try to detect provider if not explicitly set + # Auto-detection works for Nexus and Artifactory (on-premise registries) + # AWS and GCP must be explicitly specified + provider = CloudAuthHelper._detect_provider(registry_url, auth_config) + if not provider: + logger.error(f"V2 fallback: Could not determine provider for registry '{registry.name}'. 
Please specify provider in authConfig or use recognizable URL pattern (nexus/artifactory)") + raise ValueError(f"Could not determine provider for registry '{registry.name}'") + + if provider not in ["aws", "gcp", "artifactory", "nexus"]: + raise ValueError(f"Unsupported provider: {provider}") + + # Nexus: remove /repository/ suffix for search API compatibility + if provider == "nexus" and registry_url.endswith("/repository/"): + registry_url = registry_url[:-len("repository/")] + logger.info(f"Nexus: adjusted registry URL to {registry_url} for search API") + + # Get the credentials (or None if anonymous access is allowed) + creds = CloudAuthHelper.resolve_credentials(auth_config, env_creds) + + # Create the base searcher object - provider-specific config comes next + searcher = MavenArtifactSearcher(registry_url, params={"timeout": DEFAULT_SEARCHER_TIMEOUT}) + + # AWS/GCP require authentication (no anonymous access) + if provider in ["aws", "gcp"] and creds is None: + raise ValueError(f"{provider.upper()} requires credentials - anonymous access not supported") + + if provider == "aws": + return CloudAuthHelper._configure_aws(searcher, auth_config, creds, registry_url) + elif provider == "gcp": + return CloudAuthHelper._configure_gcp(searcher, auth_config, creds, registry_url) + elif provider == "artifactory": + return CloudAuthHelper._configure_artifactory(searcher, creds) + else: # nexus + return CloudAuthHelper._configure_nexus(searcher, creds, registry) + + @staticmethod + def _configure_aws(searcher: 'MavenArtifactSearcher', auth_config: AuthConfig, + creds: dict, registry_url: str) -> 'MavenArtifactSearcher': + """Configure searcher for AWS CodeArtifact (access key, secret, domain, region, repo).""" + if not auth_config.aws_domain: + raise ValueError("AWS auth requires awsDomain in authConfig") + region = CloudAuthHelper._extract_region(registry_url, auth_config) + repo_name = CloudAuthHelper._extract_repository_name(registry_url) + logger.info(f"Configuring 
AWS CodeArtifact: domain={auth_config.aws_domain}, region={region}")
+        return searcher.with_aws_code_artifact(
+            access_key=creds["username"],
+            secret_key=creds["password"],
+            domain=auth_config.aws_domain,
+            region_name=region,
+            repository=repo_name
+        )
+
+    @staticmethod
+    def _configure_gcp(searcher: 'MavenArtifactSearcher', auth_config: AuthConfig,
+                       creds: dict, registry_url: str) -> 'MavenArtifactSearcher':
+        """Configure searcher for GCP Artifact Registry (SA JSON, project, region, repo)."""
+        if auth_config.auth_method != "service_account":
+            raise ValueError(f"GCP auth_method '{auth_config.auth_method}' not supported")
+
+        # Extract project from authConfig or URL
+        project = auth_config.gcp_reg_project
+        if not project:
+            # Extract from GCP URL pattern: https://{region}-maven.pkg.dev/{project}/{repo}
+            # The project is the first path segment after pkg.dev/
+            match = re.search(r'pkg\.dev/([^/]+)', registry_url)
+            if match:
+                project = match.group(1)
+                logger.info(f"Extracted GCP project from URL: {project}")
+            else:
+                raise ValueError("GCP auth requires gcpRegProject in authConfig or valid GCP URL format (https://{region}-maven.pkg.dev/{project}/{repo})")
+
+        sa_data = creds["secret"]
+        sa_json = json.dumps(sa_data) if isinstance(sa_data, dict) else sa_data
+        region = CloudAuthHelper._extract_gcp_region(registry_url)
+        repo_name = CloudAuthHelper._extract_repository_name(registry_url)
+
+        logger.info(f"Configuring GCP Artifact Registry: project={project}, region={region}")
+        return searcher.with_gcp_artifact_registry(
+            credential_params={"service_account_key": sa_json},
+            project=project,
+            region_name=region,
+            repository=repo_name
+        )
+
+    @staticmethod
+    def _configure_artifactory(searcher: 'MavenArtifactSearcher', creds: Optional[dict]) -> 'MavenArtifactSearcher':
+        """Set up the searcher to work with Artifactory.
+
+        Artifactory is simpler - just username and password.
+        Can work anonymously if the repository allows public access.
+ """ + if creds is None: + logger.info("Configuring Artifactory with anonymous access (no credentials)") + return searcher.with_artifactory(username=None, password=None) + + return searcher.with_artifactory( + username=creds.get("username", ""), + password=creds.get("password", "") + ) + + @staticmethod + def _configure_nexus(searcher: 'MavenArtifactSearcher', creds: Optional[dict], registry: Registry) -> 'MavenArtifactSearcher': + """Configure searcher for Nexus (username/password or anonymous). + + Note: Library searches all Nexus repos (cannot limit to specific repo). + """ + if creds is None: + logger.info("Configuring Nexus with anonymous access (no credentials)") + return searcher.with_nexus(username=None, password=None) + + return searcher.with_nexus( + username=creds.get("username", ""), + password=creds.get("password", "") + ) + + @staticmethod + def get_gcp_access_token(service_account_json: str) -> Optional[str]: + """Generate fresh GCP OAuth access token from service account JSON.""" + if not GCP_AUTH_AVAILABLE: + return None + try: + sa_info = json.loads(service_account_json) if isinstance(service_account_json, str) else service_account_json + credentials = service_account.Credentials.from_service_account_info( + sa_info, scopes=['https://www.googleapis.com/auth/cloud-platform'] + ) + credentials.refresh(Request()) + return credentials.token + except Exception as e: + logger.error(f"Failed to generate GCP access token: {e}") + return None + + @staticmethod + def get_gcp_credentials_from_registry(registry: Registry, env_creds: Optional[Dict[str, dict]]) -> Optional[str]: + """Extract GCP service account JSON from registry for token generation.""" + auth_config = CloudAuthHelper.resolve_auth_config(registry, "maven") + if not auth_config or auth_config.provider != "gcp": + return None + try: + creds = CloudAuthHelper.resolve_credentials(auth_config, env_creds) + sa_data = creds.get("secret") + return json.dumps(sa_data) if isinstance(sa_data, dict) else 
sa_data + except Exception: + return None diff --git a/python/artifact-searcher/artifact_searcher/test_artifact.py b/python/artifact-searcher/artifact_searcher/test_artifact.py index e02a46435..0255334d9 100644 --- a/python/artifact-searcher/artifact_searcher/test_artifact.py +++ b/python/artifact-searcher/artifact_searcher/test_artifact.py @@ -5,7 +5,7 @@ os.environ["DEFAULT_REQUEST_TIMEOUT"] = "0.2" # for test cases to run quicker from artifact_searcher.utils import models -from artifact_searcher.artifact import check_artifact_async +from artifact_searcher.artifact import check_artifact_async, _parse_snapshot_version class MockResponse: @@ -17,7 +17,6 @@ def __init__(self, status_code): "index_path", [ ("/repository/"), - ("/service/rest/repository/browse/"), ], ) async def test_resolve_snapshot_version(aiohttp_server, index_path, monkeypatch): @@ -93,3 +92,619 @@ def mock_get(url, *args, **kwargs): sample_url = f"{base_url.rstrip('/repository/')}{index_path}repo/com/example/app/1.0.0-SNAPSHOT/app-1.0.0-20240702.123456-1.json" assert full_url == sample_url, f"expected: {sample_url}, received: {full_url}" + + +async def test_v2_registry_routes_to_cloud_auth(monkeypatch): + auth_cfg = models.AuthConfig( + credentials_id="aws-creds", + provider="aws", + auth_method="secret", + aws_domain="test-domain", + aws_region="us-east-1", + ) + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://test.codeartifact.us-east-1.amazonaws.com/maven/repo/", + auth_config="aws-maven", + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="aws-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + version="2.0", + auth_config={"aws-maven": auth_cfg}, + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + solution_descriptor=False, + ) + env_creds = {"aws-creds": {"username": "key", "password": 
"secret"}} + + async def mock_v2_async(*args, **kwargs): + return ("http://url", ("v2_downloaded", "/tmp/artifact.json")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v2_async", mock_v2_async) + + result = await check_artifact_async(app, models.FileExtension.JSON, "1.0.0", env_creds=env_creds) + assert result is not None + assert result[1][0] == "v2_downloaded" + + +async def test_v2_registry_fallback_to_v1_on_error(monkeypatch): + """Test V2 falls back to V1 when V2 search fails""" + auth_cfg = models.AuthConfig( + credentials_id="aws-creds", + provider="aws", + auth_method="secret", + aws_domain="test-domain", + aws_region="us-east-1", + ) + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://test.codeartifact.us-east-1.amazonaws.com/maven/repo/", + auth_config="aws-maven", + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="aws-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + version="2.0", + auth_config={"aws-maven": auth_cfg}, + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + solution_descriptor=False, + ) + env_creds = {"aws-creds": {"username": "key", "password": "secret"}} + + async def mock_v2_async(*args, **kwargs): + raise Exception("V2 cloud auth failed") + + async def mock_v1_async(*args, **kwargs): + return ("http://v1-url", ("v1_repo", "v1_pointer")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v2_async", mock_v2_async) + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v1_async", mock_v1_async) + + result = await check_artifact_async(app, models.FileExtension.JSON, "1.0.0", env_creds=env_creds) + assert result is not None + assert result[0] == "http://v1-url" + assert result[1][0] == "v1_repo" + + +async def test_v1_registry_skips_v2(monkeypatch): + """Test V1 registry (version=1.0) goes 
directly to V1 search""" + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://nexus.example.com/repository/", + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="nexus-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + version="1.0", + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + solution_descriptor=False, + ) + + v2_called = False + v1_called = False + + async def mock_v2_async(*args, **kwargs): + nonlocal v2_called + v2_called = True + return None + + async def mock_v1_async(*args, **kwargs): + nonlocal v1_called + v1_called = True + return ("http://v1-url", ("v1_repo", "v1_pointer")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v2_async", mock_v2_async) + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v1_async", mock_v1_async) + + result = await check_artifact_async(app, models.FileExtension.JSON, "1.0.0") + + assert v1_called + assert not v2_called + assert result is not None + + +async def test_v2_missing_env_creds_fallback(monkeypatch): + """Test V2 with AWS/GCP but no env_creds falls back to V1""" + auth_cfg = models.AuthConfig( + credentials_id="aws-creds", + provider="aws", + auth_method="secret", + aws_domain="test-domain", + aws_region="us-east-1", + ) + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://test.codeartifact.us-east-1.amazonaws.com/maven/repo/", + auth_config="aws-maven", + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="aws-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + version="2.0", + auth_config={"aws-maven": auth_cfg}, + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + 
solution_descriptor=False, + ) + + async def mock_v1_async(*args, **kwargs): + return ("http://v1-url", ("v1_repo", "v1_pointer")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v1_async", mock_v1_async) + + # Call without env_creds + result = await check_artifact_async(app, models.FileExtension.JSON, "1.0.0") + + assert result is not None + assert result[0] == "http://v1-url" + + +async def test_v2_missing_auth_config_fallback(monkeypatch): + """Test V2 without auth_config falls back to V1""" + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://test.codeartifact.us-east-1.amazonaws.com/maven/repo/", + # No auth_config + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="aws-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + version="2.0", + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + solution_descriptor=False, + ) + env_creds = {"aws-creds": {"username": "key", "password": "secret"}} + + async def mock_v1_async(*args, **kwargs): + return ("http://v1-url", ("v1_repo", "v1_pointer")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v1_async", mock_v1_async) + + result = await check_artifact_async(app, models.FileExtension.JSON, "1.0.0", env_creds=env_creds) + + assert result is not None + assert result[0] == "http://v1-url" + + +async def test_v2_gcp_registry(monkeypatch): + """Test V2 with GCP Artifact Registry""" + auth_cfg = models.AuthConfig( + credentials_id="gcp-creds", + provider="gcp", + auth_method="service_account", + gcp_project="test-project", + gcp_location="us-central1", + gcp_repository="test-repo", + ) + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://us-central1-maven.pkg.dev/test-project/test-repo/", + 
auth_config="gcp-maven", + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="gcp-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + version="2.0", + auth_config={"gcp-maven": auth_cfg}, + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + solution_descriptor=False, + ) + env_creds = {"gcp-creds": {"username": "_json_key", "password": '{"type": "service_account"}'}} + + async def mock_v2_async(*args, **kwargs): + return ("http://gcp-url", ("v2_downloaded", "/tmp/artifact.json")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v2_async", mock_v2_async) + + result = await check_artifact_async(app, models.FileExtension.JSON, "1.0.0", env_creds=env_creds) + + assert result is not None + assert result[1][0] == "v2_downloaded" + + +async def test_check_artifact_async_with_classifier(monkeypatch): + """Test check_artifact_async passes classifier parameter correctly""" + mvn_cfg = models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="https://nexus.example.com/repository/", + ) + dcr_cfg = models.DockerConfig() + reg = models.Registry( + name="nexus-registry", + maven_config=mvn_cfg, + docker_config=dcr_cfg, + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=reg, + solution_descriptor=False, + ) + + classifier_passed = None + + async def mock_v1_async(*args, **kwargs): + nonlocal classifier_passed + classifier_passed = kwargs.get('classifier', args[4] if len(args) > 4 else "") + return ("http://url", ("repo", "pointer")) + + monkeypatch.setattr("artifact_searcher.artifact._check_artifact_v1_async", mock_v1_async) + + await check_artifact_async(app, models.FileExtension.JSON, "1.0.0", classifier="sources") + + assert classifier_passed == "sources" + + +def test_parse_snapshot_version_with_matching_extension(): + 
"""Test _parse_snapshot_version finds matching extension""" + metadata_xml = """ + + + + + + json + 1.0.0-20240702.123456-1 + + + + zip + 1.0.0-20240702.123456-2 + + + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT") + + assert result == "1.0.0-20240702.123456-1" + + +def test_parse_snapshot_version_with_classifier(): + """Test _parse_snapshot_version finds matching extension and classifier""" + metadata_xml = """ + + + + + + json + 1.0.0-20240702.123456-1 + + + sources + json + 1.0.0-20240702.123456-2 + + + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT", "sources") + + assert result == "1.0.0-20240702.123456-2" + + +def test_parse_snapshot_version_no_matching_version(): + """Test _parse_snapshot_version returns None when no match found""" + metadata_xml = """ + + + + + + zip + 1.0.0-20240702.123456-1 + + + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + 
repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + # Looking for JSON but only ZIP available + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT") + + assert result is None + + +def test_parse_snapshot_version_empty_snapshot_versions(): + """Test _parse_snapshot_version returns None when no snapshotVersions""" + metadata_xml = """ + + + + + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT") + + assert result is None + + +def test_parse_snapshot_version_old_style_metadata(): + """Test _parse_snapshot_version supports old-style metadata (Maven 2 format)""" + metadata_xml = """ + + com.netcracker.cloud.code2prod.deployment-descriptor + c2p-test-sd-1 + feature-sd_public_cloud_registry_testing-SNAPSHOT + + + 20260102.092159 + 1 + + 20260102092159 + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version( + metadata_xml, app, 1, 
models.FileExtension.JSON, + "feature-sd_public_cloud_registry_testing-SNAPSHOT" + ) + + assert result == "feature-sd_public_cloud_registry_testing-20260102.092159-1" + + +def test_parse_snapshot_version_old_style_missing_timestamp(): + """Test _parse_snapshot_version returns None when has no timestamp""" + metadata_xml = """ + + + + 1 + + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT") + + assert result is None + + +def test_parse_snapshot_version_old_style_missing_buildnumber(): + """Test _parse_snapshot_version returns None when has no buildNumber""" + metadata_xml = """ + + + + 20260102.092159 + + + + """ + + dummy_registry = models.Registry( + name="dummy", + maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT") + + assert result is None + + +def test_parse_snapshot_version_prefers_new_style_over_old(): + """Test _parse_snapshot_version prefers new-style when both are present""" + metadata_xml = """ + + + + 20240101.120000 + 99 + + + + + json + 1.0.0-20240702.123456-1 + + + + + """ + + dummy_registry = models.Registry( + name="dummy", + 
maven_config=models.MavenConfig( + target_snapshot="snapshots", + target_staging="staging", + target_release="releases", + repository_domain_name="http://dummy.repo/" + ), + docker_config=models.DockerConfig() + ) + app = models.Application( + name="test-app", + artifact_id="test-artifact", + group_id="com.test", + registry=dummy_registry, + solution_descriptor=False, + ) + + result = _parse_snapshot_version(metadata_xml, app, 1, models.FileExtension.JSON, "1.0.0-SNAPSHOT") + + # Should use new-style value, not old-style + assert result == "1.0.0-20240702.123456-1" diff --git a/python/artifact-searcher/artifact_searcher/utils/models.py b/python/artifact-searcher/artifact_searcher/utils/models.py index 8d76b6b87..1d187216f 100644 --- a/python/artifact-searcher/artifact_searcher/utils/models.py +++ b/python/artifact-searcher/artifact_searcher/utils/models.py @@ -1,9 +1,9 @@ from enum import Enum from typing import Optional +import requests from pydantic import BaseModel, ConfigDict, field_validator, Field, model_validator from pydantic.alias_generators import to_camel -import requests from artifact_searcher.utils.constants import DEFAULT_REQUEST_TIMEOUT @@ -25,6 +25,7 @@ class MavenConfig(BaseSchema): repository_domain_name: str = Field(json_schema_extra={"error_message": "Application registry does not define URL"}) snapshot_group: Optional[str] = "" release_group: Optional[str] = "" + auth_config: Optional[str] = None is_nexus: bool = False @@ -53,6 +54,21 @@ def detect_nexus(self): return self +class AuthConfig(BaseSchema): + credentials_id: Optional[str] = None + auth_type: Optional[str] = None + provider: Optional[str] = None + auth_method: Optional[str] = None + aws_region: Optional[str] = None + aws_domain: Optional[str] = None + aws_role_arn: Optional[str] = None + aws_role_session_prefix: Optional[str] = "nc-devops-m2m-session" + gcp_reg_project: Optional[str] = None + gcp_reg_pool_id: Optional[str] = None + gcp_reg_provider_id: Optional[str] = None + 
gcp_reg_sa_email: Optional[str] = None + + class DockerConfig(BaseSchema): snapshot_uri: Optional[str] = "" staging_uri: Optional[str] = "" @@ -114,6 +130,8 @@ class Registry(BaseSchema): npm_config: Optional[NpmConfig] = None helm_config: Optional[HelmConfig] = None helm_app_config: Optional[HelmAppConfig] = None + version: Optional[str] = "1.0" + auth_config: Optional[dict[str, AuthConfig]] = None # artifact definition diff --git a/python/artifact-searcher/pyproject.toml b/python/artifact-searcher/pyproject.toml index 131ac4b7c..f7e4bbb78 100644 --- a/python/artifact-searcher/pyproject.toml +++ b/python/artifact-searcher/pyproject.toml @@ -16,7 +16,12 @@ dependencies = [ "asyncio~=3.4.3", "aioresponses~=0.7.8", "pytest-asyncio~=1.0.0", - "pytest-aiohttp~=1.1.0" + "pytest-aiohttp~=1.1.0", + # V2 cloud registry support dependencies + "boto3>=1.39.4,<2.0.0", + "google-auth~=2.34.0", + # Maven Client library for V2 cloud registry support + "qubership-pipelines-common-library>=0.2.5,<1.0.0" ] [project.optional-dependencies] diff --git a/schemas/artifact-definition-v2.schema.json b/schemas/artifact-definition-v2.schema.json index dde0fd065..9f154cae9 100644 --- a/schemas/artifact-definition-v2.schema.json +++ b/schemas/artifact-definition-v2.schema.json @@ -109,7 +109,9 @@ "enum": [ "aws", "azure", - "gcp" + "gcp", + "artifactory", + "nexus" ], "title": "Provider", "description": "Public cloud registry type. 
Used in case of public cloud registries" diff --git a/schemas/regdef-v2.schema.json b/schemas/regdef-v2.schema.json index 7f1a30249..5b1b3d8f7 100644 --- a/schemas/regdef-v2.schema.json +++ b/schemas/regdef-v2.schema.json @@ -66,7 +66,9 @@ "enum": [ "aws", "azure", - "gcp" + "gcp", + "nexus", + "artifactory" ] }, "authMethod": { diff --git a/scripts/build_env/appregdef_render.py b/scripts/build_env/appregdef_render.py index 9cded545f..e43e4f4ef 100644 --- a/scripts/build_env/appregdef_render.py +++ b/scripts/build_env/appregdef_render.py @@ -36,9 +36,9 @@ src = Path(render_dir) / dir_name dst = Path(env_dir) / dir_name - if dst.exists(): - shutil.rmtree(dst) - if src.exists(): - shutil.move(src, dst) + if src.exists() and any(src.iterdir()): + dst.mkdir(parents=True, exist_ok=True) + for item in src.iterdir(): + shutil.copy2(item, dst / item.name) update_generated_versions(env_dir, BUILD_ENV_TAG, template_version) \ No newline at end of file diff --git a/scripts/build_env/env_template/process_env_template.py b/scripts/build_env/env_template/process_env_template.py index d4c91ff80..8c58df525 100644 --- a/scripts/build_env/env_template/process_env_template.py +++ b/scripts/build_env/env_template/process_env_template.py @@ -1,7 +1,9 @@ import asyncio import os +import shutil import tempfile from pathlib import Path +from typing import Optional from artifact_searcher import artifact from artifact_searcher.utils.models import FileExtension, Credentials, Registry, Application @@ -14,7 +16,8 @@ ARTIFACT_DEST = f"{tempfile.gettempdir()}/artifact.zip" -def parse_artifact_appver(env_definition: dict) -> [str, str]: +def parse_artifact_appver(env_definition: dict) -> tuple[str, str]: + """Extract artifact name and version from env_definition.yml.""" artifact_appver = env_definition.get('envTemplate', {}).get('artifact') if not artifact_appver: raise ValueError(f"Environment template artifact is empty or missing from env_definition: {env_definition}") @@ -22,7 +25,8 @@ def 
parse_artifact_appver(env_definition: dict) -> [str, str]: return artifact_appver.split(':') -def get_registry_creds(registry: Registry) -> Credentials: +def get_registry_creds(registry: Registry) -> Optional[Credentials]: + """Resolve V1 registry credentials. Returns None for registries without credentials.""" cred_config = render_creds() cred_id = registry.credentials_id if cred_id: @@ -36,11 +40,15 @@ def get_registry_creds(registry: Registry) -> Credentials: def parse_maven_coord_from_dd(dd_config: dict) -> tuple[str, str, str]: + """Extract Maven coordinates (groupId:artifactId:version) from deployment descriptor.""" artifact_str = dd_config['configurations'][0]['artifacts'][0].get('id') return artifact_str.split(':') def extract_snapshot_version(url: str, snapshot_version: str) -> str: + """Extract timestamped SNAPSHOT version from artifact URL. + Converts 'feature-branch-SNAPSHOT' to 'feature-branch-20250113.102430-45'. + """ base = snapshot_version.replace("-SNAPSHOT", "") filename = url.split("/")[-1] name = filename.rsplit(".", 1)[0] @@ -49,6 +57,7 @@ def extract_snapshot_version(url: str, snapshot_version: str) -> str: def validate_url(url, group_id, artifact_id, version): + """Validate artifact URL is not None/empty. Raises ValueError if invalid.""" if not url: raise ValueError( f"artifact not found group_id={group_id}, " @@ -58,18 +67,29 @@ def validate_url(url, group_id, artifact_id, version): # logic resolving template by artifact definition def resolve_artifact_new_logic(env_definition: dict, template_dest: str) -> str: + """Download environment template using artifact definition (V2-aware). + + Supports both V1 and V2 registries. For V2 cloud registries (AWS/GCP/Artifactory/Nexus), + uses CloudAuthHelper and MavenArtifactSearcher with automatic fallback to V1. 
+ + Returns: + Resolved version string (with SNAPSHOT timestamp if applicable) + """ app_name, app_version = parse_artifact_appver(env_definition) + # Load artifact definition and credentials base_dir = getenv_with_error('CI_PROJECT_DIR') artifact_path = getAppDefinitionPath(base_dir, app_name) if not artifact_path: raise FileNotFoundError(f"No artifact definition file found for {app_name} with .yaml or .yml extension") app_def = Application.model_validate(openYaml(artifact_path)) - cred = get_registry_creds(app_def.registry) + cred = get_registry_creds(app_def.registry) # V1 credentials + env_creds = get_cred_config() # V2 credentials (Jenkins credential store) template_url = None resolved_version = app_version - dd_artifact_info = asyncio.run(artifact.check_artifact_async(app_def, FileExtension.JSON, app_version, cred)) + # Try deployment descriptor first (multi-artifact solutions) + dd_artifact_info = asyncio.run(artifact.check_artifact_async(app_def, FileExtension.JSON, app_version, cred=cred, env_creds=env_creds)) if dd_artifact_info: logger.info("Loading environment template artifact info from deployment descriptor...") dd_url, dd_repo = dd_artifact_info @@ -88,14 +108,29 @@ def resolve_artifact_new_logic(env_definition: dict, template_dest: str) -> str: template_url = artifact.check_artifact(repo_url, group_id, artifact_id, version, FileExtension.ZIP, cred) validate_url(template_url, group_id, artifact_id, version) else: + # No deployment descriptor, download ZIP directly logger.info("Loading environment template artifact from zip directly...") group_id, artifact_id, version = app_def.group_id, app_def.artifact_id, app_version - artifact_info = asyncio.run(artifact.check_artifact_async(app_def, FileExtension.ZIP, app_version, cred)) + artifact_info = asyncio.run(artifact.check_artifact_async(app_def, FileExtension.ZIP, app_version, cred=cred, env_creds=env_creds)) if artifact_info: - template_url, _ = artifact_info + template_url, repo_info = artifact_info + 
# V2 optimization: artifact already downloaded by MavenArtifactSearcher + if isinstance(repo_info, tuple) and len(repo_info) == 2 and repo_info[0] == "v2_downloaded": + local_path = repo_info[1] + logger.info(f"V2 artifact already downloaded at: {local_path}") + shutil.copy(local_path, ARTIFACT_DEST) # Copy to standard location + logger.info(f"Copied V2 artifact to: {ARTIFACT_DEST}") + if "-SNAPSHOT" in app_version: + resolved_version = extract_snapshot_version(template_url, app_version) + unpack_archive(ARTIFACT_DEST, template_dest) + return resolved_version # Early return - artifact already available locally + + # Standard path: validate URL and continue to HTTP download validate_url(template_url, group_id, artifact_id, version) if "-SNAPSHOT" in app_version: resolved_version = extract_snapshot_version(template_url, app_version) + + # V1 path or V2 fallback: download via HTTP logger.info(f"Environment template url has been resolved: {template_url}") artifact.download(template_url, ARTIFACT_DEST, cred) unpack_archive(ARTIFACT_DEST, template_dest) @@ -103,6 +138,7 @@ def resolve_artifact_new_logic(env_definition: dict, template_dest: str) -> str: def render_creds() -> dict: + """Render credential templates with environment variables.""" cred_config = get_cred_config() context = Context() context.env_vars.update(dict(os.environ)) @@ -163,6 +199,7 @@ def resolve_artifact_old_logic(env_definition: dict, template_dest: str) -> str: def process_env_template() -> str: + """Main entry point for template download. 
Routes to new or old logic based on env_definition format.""" env_template_test = os.getenv("ENV_TEMPLATE_TEST", "").lower() == "true" if env_template_test: run_env_test_setup() @@ -175,6 +212,7 @@ def process_env_template() -> str: check_dir_exist_and_create(template_dest) + # New format: uses artifact definitions (V2-aware) if 'artifact' in env_definition.get('envTemplate', {}): logger.info("Use template resolving new logic") return resolve_artifact_new_logic(env_definition, template_dest) diff --git a/scripts/build_env/main.py b/scripts/build_env/main.py index 8205d9441..c02a65496 100644 --- a/scripts/build_env/main.py +++ b/scripts/build_env/main.py @@ -1,3 +1,6 @@ +import argparse +import os + from envgenehelper import * from envgenehelper.deployer import * @@ -15,6 +18,9 @@ CLOUD_SCHEMA = "schemas/cloud.schema.json" NAMESPACE_SCHEMA = "schemas/namespace.schema.json" ENV_SPECIFIC_RESOURCE_PROFILE_SCHEMA = "schemas/resource-profile.schema.json" +APPDEF_SCHEMA = "schemas/appdef.schema.json" +REGDEF_V1_SCHEMA = "schemas/regdef.schema.json" +REGDEF_V2_SCHEMA = "schemas/regdef-v2.schema.json" def prepare_folders_for_rendering(env_name, cluster_name, source_env_dir, templates_dir, render_dir, @@ -263,6 +269,30 @@ def validate_parameter_files(param_files): return errors +def validate_appregdefs(render_dir, env_name): + appdef_dir = f"{render_dir}/{env_name}/AppDefs" + regdef_dir = f"{render_dir}/{env_name}/RegDefs" + + if os.path.exists(appdef_dir): + appdef_files = findAllYamlsInDir(appdef_dir) + if not appdef_files: + logger.info(f"No AppDef YAMLs found in {appdef_dir}") + for file in appdef_files: + logger.info(f"AppDef file: {file}") + validate_yaml_by_scheme_or_fail(file, APPDEF_SCHEMA) + + if os.path.exists(regdef_dir): + regdef_files = findAllYamlsInDir(regdef_dir) + if not regdef_files: + logger.info(f"No RegDef YAMLs found in {regdef_dir}") + for file in regdef_files: + logger.info(f"Validating RegDef file: {file}") + regdef_content = openYaml(file) + 
version = str(regdef_content.get('version', '1.0')) + schema_path = REGDEF_V2_SCHEMA if version != '1.0' else REGDEF_V1_SCHEMA + validate_yaml_by_scheme_or_fail(file, schema_path) + + def render_environment(env_name, cluster_name, templates_dir, all_instances_dir, output_dir, work_dir): logger.info(f'env: {env_name}') logger.info(f'cluster_name: {cluster_name}') diff --git a/scripts/build_env/process_sd.py b/scripts/build_env/process_sd.py index 9aaba559b..2b3b73c2a 100644 --- a/scripts/build_env/process_sd.py +++ b/scripts/build_env/process_sd.py @@ -10,6 +10,7 @@ import yaml from artifact_searcher import artifact from artifact_searcher.utils import models as artifact_models +from envgenehelper import get_cred_config from envgenehelper.business_helper import getenv_and_log, getenv_with_error from envgenehelper.env_helper import Environment from envgenehelper.file_helper import identify_yaml_extension @@ -279,34 +280,65 @@ def download_sds_with_version(env, base_sd_path, sd_version, effective_merge_mod app_def_getter_plugins = PluginEngine(plugins_dir='/module/scripts/handle_sd_plugins/app_def_getter') sd_data_list = [] - for entry in sd_entries: # appvers + for entry in sd_entries: if ":" not in entry: logger.error(f"Invalid SD_VERSION format: '{entry}'. 
Expected 'name:version'") exit(1) source_name, version = entry.split(":", 1) - logger.info(f"Starting download of SD: {source_name}-{version}") - - sd_data = download_sd_by_appver(source_name, version, app_def_getter_plugins) - + sd_data = download_sd_by_appver(source_name, version, app_def_getter_plugins, env) sd_data_list.append(sd_data) sd_data_json = json.dumps(sd_data_list) extract_sds_from_json(env, base_sd_path, sd_data_json, effective_merge_mode) -def download_sd_by_appver(app_name: str, version: str, plugins: PluginEngine) -> dict[str, object]: - if 'SNAPSHOT' in version: - raise ValueError("SNAPSHOT is not supported version of Solution Descriptor artifacts") - # TODO: check if job would fail without plugins +def download_sd_by_appver(app_name: str, version: str, plugins: PluginEngine, env: Environment = None) -> dict[str, object]: + """Download SD by app name and version with V2 support. + + Args: + app_name: Application name + version: Application version + plugins: Plugin engine for app def resolution + env: Environment object (optional, for V2 support) + + Returns: + SD data dictionary + """ + logger.info(f"Starting download of SD: {app_name}-{version}") app_def = get_appdef_for_app(f"{app_name}:{version}", app_name, plugins) - artifact_info = asyncio.run(artifact.check_artifact_async(app_def, artifact.FileExtension.JSON, version)) + # Use existing get_cred_config() utility for credentials + env_creds = get_cred_config() + artifact_info = asyncio.run(artifact.check_artifact_async(app_def, artifact.FileExtension.JSON, version, env_creds=env_creds)) if not artifact_info: raise ValueError( f'Solution descriptor content was not received for {app_name}:{version}') - sd_url, _ = artifact_info - return artifact.download_json_content(sd_url) + sd_url, mvn_repo = artifact_info + mvn_repo_value, mvn_repo_extra = mvn_repo + + # V2 optimization: read from local file if already downloaded + if mvn_repo_value == "v2_downloaded": + logger.debug(f"Reading V2 solution 
descriptor from local file: {mvn_repo_extra}") + with open(mvn_repo_extra, 'r') as f: + sd_data = json.load(f) + logger.info(f"Successfully downloaded SD: {app_name}-{version}") + return sd_data + + # V1 fallback path or non-V2 registry - need credentials for HTTP download + cred = None + if app_def.registry.credentials_id and env_creds: + cred_data = env_creds.get(app_def.registry.credentials_id) + if cred_data and cred_data.get('username'): + cred = artifact_models.Credentials( + username=cred_data.get('username', ''), + password=cred_data.get('password', '') + ) + logger.debug(f"Using credentials '{app_def.registry.credentials_id}' for SD download") + + sd_data = artifact.download_json_content(sd_url, cred) + logger.info(f"Successfully downloaded SD: {app_name}-{version}") + return sd_data def get_appdef_for_app(appver: str, app_name: str, plugins: PluginEngine) -> artifact_models.Application: diff --git a/scripts/build_env/tests/env-template/test_env_template.py b/scripts/build_env/tests/env-template/test_env_template.py index 1d0256351..4e435669b 100644 --- a/scripts/build_env/tests/env-template/test_env_template.py +++ b/scripts/build_env/tests/env-template/test_env_template.py @@ -1,10 +1,14 @@ from os import environ from pathlib import Path +from unittest.mock import patch import pytest import responses from aioresponses import aioresponses -from env_template.process_env_template import process_env_template +from env_template.process_env_template import ( + process_env_template, + extract_snapshot_version +) from envgenehelper.test_helpers import TestHelpers GROUP_ID = "org.qubership" @@ -209,3 +213,47 @@ def test_old_logic_with_zip(self): assert len(responses.calls) == 4 assert responses.calls[3].request.url == tmpl_zip_url + + def test_extract_snapshot_version_with_snapshot(self): + """Test snapshot version extraction from URL""" + url = f"{SNAPSHOT_BASE}/{BASE_PATH}/{ARTIFACT_NAME}.json" + snapshot_version = VERSION + + result = 
extract_snapshot_version(url, snapshot_version) + + assert result == SNAPSHOT_VERSION + assert "-SNAPSHOT" not in result + assert SNAPSHOT_TIMESTAMP in result + + def test_extract_snapshot_version_without_snapshot(self): + """Test version extraction from non-snapshot URL""" + release_version = "1.0.0" + url = f"{STAGING_BASE}/{GROUP_PATH}/{ARTIFACT_ID}/{release_version}/{ARTIFACT_ID}-{release_version}.zip" + + result = extract_snapshot_version(url, release_version) + + assert result == release_version + + @responses.activate + @patch('env_template.process_env_template.get_cred_config') + def test_new_logic_with_v2_credentials(self, mock_get_creds, mock_aio_response): + """Test new logic with RegDef v2 credentials passed through""" + set_env("env-01") + + mock_get_creds.return_value = { + 'artifactory-cred': {'type': 'usernamePassword', 'data': {'username': 'test-user', 'password': 'test-pass'}}, + 'aws-creds': {'type': 'usernamePassword', 'data': {'username': 'key', 'password': 'secret'}} + } + + mock_metadata(mock_aio_response) + mock_dd_exists(mock_aio_response, exists=True) + mock_dd_response() + mock_zip(STAGING_ZIP_URL) + + with patch('env_template.process_env_template.artifact.check_artifact_async') as mock_check: + mock_check.return_value = (DD_URL, ("repo", "pointer")) + + process_env_template() + + # Verify get_cred_config was called + assert mock_get_creds.called diff --git a/scripts/build_env/tests/sd/test_process_sd_artifact.py b/scripts/build_env/tests/sd/test_process_sd_artifact.py index 071308c21..7a4485a59 100644 --- a/scripts/build_env/tests/sd/test_process_sd_artifact.py +++ b/scripts/build_env/tests/sd/test_process_sd_artifact.py @@ -50,6 +50,8 @@ def test_sd_positive(mock_download_sd, test_case_name): file_path = Path(TEST_SD_DIR, test_case_name, f"mock_sd.json") sd_data = openJson(file_path) + + # Mock synchronous function to return SD data directly mock_download_sd.return_value = sd_data handle_sd(env, sd_source_type, sd_version, sd_data, 
sd_delta, sd_merge_mode) @@ -72,6 +74,8 @@ def test_sd_negative(mock_download_sd, test_case_name, expected_exception): file_path = Path(TEST_SD_DIR, test_case_name, f"mock_sd.json") sd_data = openJson(file_path) + + # Mock synchronous function to return SD data directly mock_download_sd.return_value = sd_data with pytest.raises(expected_exception): diff --git a/scripts/build_env/tests/sd/test_process_sd_local.py b/scripts/build_env/tests/sd/test_process_sd_local.py index d6a4a8082..1a670fb81 100644 --- a/scripts/build_env/tests/sd/test_process_sd_local.py +++ b/scripts/build_env/tests/sd/test_process_sd_local.py @@ -1,6 +1,7 @@ import os import pytest +from unittest.mock import patch, MagicMock from ruamel.yaml import YAML from test_sd_helpers import do_prerequisites, assert_sd_contents, load_test_pipeline_sd_data @@ -9,7 +10,7 @@ os.environ['CLUSTER_NAME'] = "temporary" os.environ['CI_PROJECT_DIR'] = "temporary" -from process_sd import handle_sd +from process_sd import handle_sd, download_sd_by_appver from envgenehelper import * from envgenehelper.env_helper import Environment @@ -54,3 +55,32 @@ def test_sd_positive(test_case_name): assert_sd_contents(TEST_SD_DIR, OUTPUT_DIR, test_case_name, actual_dir, test_suits_map) logger.info(f"=====SUCCESS - {test_case_name}======") + + +@patch('process_sd.get_cred_config') +@patch('artifact_searcher.artifact.check_artifact_async') +@patch('process_sd.get_appdef_for_app') +def test_download_sd_uses_get_cred_config(mock_get_appdef, mock_check_artifact, mock_get_creds): + """Test that download_sd_by_appver uses existing get_cred_config utility""" + mock_get_creds.return_value = { + 'aws-creds': {'type': 'usernamePassword', 'data': {'username': 'key', 'password': 'secret'}} + } + mock_app_def = MagicMock() + mock_app_def.registry.credentials_id = None + mock_get_appdef.return_value = mock_app_def + + # Mock the async check_artifact_async to return a future + async def mock_check_return(): + return ("http://sd-url", ("repo", 
"/tmp/sd.json")) + mock_check_artifact.return_value = mock_check_return() + + with patch('process_sd.open', create=True) as mock_open: + mock_open.return_value.__enter__.return_value.read.return_value = '{"applications": []}' + + try: + env = Environment("/test/path", "test-cluster", "test-env") + download_sd_by_appver("test-app", "1.0.0", MagicMock(), env) + except: + pass + + assert mock_get_creds.called