diff --git a/cve_bin_tool/data_sources/alma_source.py b/cve_bin_tool/data_sources/alma_source.py
new file mode 100644
index 0000000000..178c068245
--- /dev/null
+++ b/cve_bin_tool/data_sources/alma_source.py
@@ -0,0 +1,331 @@
+# Copyright (C) 2025 Intel Corporation
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+"""
+Alma Linux Errata Data Source
+
+This module fetches security errata from AlmaLinux to help identify
+CVEs that have been patched through backports, reducing false positives.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import urllib.request
+from pathlib import Path
+
+import aiohttp
+
+from cve_bin_tool.async_utils import FileIO, RateLimiter
+from cve_bin_tool.data_sources import Data_Source
+from cve_bin_tool.database_defaults import (
+    DISK_LOCATION_BACKUP,
+    DISK_LOCATION_DEFAULT,
+)
+from cve_bin_tool.error_handler import ErrorMode
+from cve_bin_tool.log import LOGGER
+from cve_bin_tool.version import HTTP_HEADERS
+
+
+class Alma_Source(Data_Source):
+    """
+    Data source for fetching security errata from AlmaLinux.
+
+    AlmaLinux backports security fixes without changing version numbers.
+    This data source helps identify which CVEs have been patched,
+    reducing false positives when scanning AlmaLinux systems.
+    """
+
+    SOURCE = "ALMA"
+    CACHEDIR = DISK_LOCATION_DEFAULT
+    BACKUPCACHEDIR = DISK_LOCATION_BACKUP
+    LOGGER = LOGGER.getChild("CVEDB")
+
+    # Alma Linux errata URLs for different versions
+    ERRATA_URL_9 = "https://errata.almalinux.org/9/errata.json"
+    ERRATA_URL_8 = "https://errata.almalinux.org/8/errata.json"
+
+    def __init__(self, error_mode=ErrorMode.TruncTrace):
+        """
+        Initialize the Alma Linux data source.
+
+        Args:
+            error_mode: How to handle errors (default: truncated traceback)
+        """
+        self.cachedir = self.CACHEDIR
+        self.backup_cachedir = self.BACKUPCACHEDIR
+        self.error_mode = error_mode
+        self.source_name = self.SOURCE
+
+        # Will hold the raw downloaded data
+        self.errata_data = []
+
+        # Will hold the processed CVE data
+        self.severity_data = []
+        self.affected_data = []
+
+        # HTTP session for downloads
+        self.session = None
+
+    async def get_cve_data(self):
+        """
+        Main entry point. Fetches and processes Alma Linux errata.
+
+        Any failure is logged and reported as an empty result instead of
+        being raised.
+
+        Returns:
+            Tuple of ((severity_data, affected_data), source_name)
+        """
+        try:
+            await self.fetch_errata()
+            self.process_errata()
+        except Exception as e:
+            self.LOGGER.error(f"Error fetching Alma Linux errata: {e}")
+            return ([], []), self.source_name
+
+        return (self.severity_data, self.affected_data), self.source_name
+
+    async def fetch_errata(self):
+        """
+        Download errata JSON files from Alma Linux.
+
+        The HTTP session is created on demand and is always closed again,
+        even when a download raises.
+        """
+        self.LOGGER.info("Fetching Alma Linux errata...")
+
+        # Start clean so repeated calls do not accumulate duplicate advisories
+        self.errata_data = []
+
+        # Create HTTP session if needed
+        if not self.session:
+            connector = aiohttp.TCPConnector(limit_per_host=19)
+            self.session = RateLimiter(
+                aiohttp.ClientSession(
+                    connector=connector, headers=HTTP_HEADERS, trust_env=True
+                )
+            )
+
+        try:
+            await self.download_errata(self.ERRATA_URL_9, "alma9_errata.json")
+            await self.download_errata(self.ERRATA_URL_8, "alma8_errata.json")
+        finally:
+            # Drop the reference too: a closed session must never be reused
+            await self.session.close()
+            self.session = None
+
+    async def download_errata(self, url: str, filename: str):
+        """
+        Download a single errata file and save to cache.
+
+        Args:
+            url: The URL to download from
+            filename: Name to save the file as
+        """
+        self.LOGGER.debug(f"Downloading {url}")
+
+        try:
+            # Try aiohttp first
+            async with await self.session.get(url) as response:
+                response.raise_for_status()
+                data = await response.json()
+        except Exception as e:
+            # Connection/DNS problems (common on Windows), HTTP errors and bad
+            # payloads all get one more attempt through urllib.
+            self.LOGGER.warning(f"aiohttp failed with {e}, trying urllib fallback")
+            await self.download_with_urllib(url, filename)
+            return
+
+        count = await self._store_errata(data, filename)
+        self.LOGGER.info(f"Downloaded {count} advisories from {filename}")
+
+    async def download_with_urllib(self, url: str, filename: str):
+        """
+        Fallback download method using urllib.
+
+        Failures are logged as warnings and otherwise ignored, so a missing
+        feed only means fewer advisories.
+
+        Args:
+            url: The URL to download from
+            filename: Name to save the file as
+        """
+        self.LOGGER.debug(f"Downloading {url} with urllib fallback")
+
+        try:
+            # urlopen() blocks, so keep it off the event loop thread
+            loop = asyncio.get_running_loop()
+            data = await loop.run_in_executor(None, self._read_json, url)
+        except Exception as fallback_error:
+            self.LOGGER.warning(f"Failed to download {url}: {fallback_error}")
+            return
+
+        count = await self._store_errata(data, filename)
+        self.LOGGER.info(f"Downloaded {count} advisories from {filename} (urllib)")
+
+    @staticmethod
+    def _read_json(url: str):
+        """Blocking helper: fetch ``url`` with urllib and parse it as JSON."""
+        with urllib.request.urlopen(url, timeout=120) as response:  # nosec B310
+            return json.loads(response.read().decode("utf-8"))
+
+    async def _store_errata(self, data, filename: str) -> int:
+        """
+        Cache one downloaded errata document and queue its advisories.
+
+        Args:
+            data: Parsed JSON document (expected to be a list of advisories)
+            filename: Name of the cache file inside the cache directory
+
+        Returns:
+            Number of advisories added to ``self.errata_data``
+        """
+        if not isinstance(data, list):
+            self.LOGGER.warning(f"Unexpected errata format in {filename}, skipping")
+            return 0
+
+        # The cache copy is a convenience; losing it must not lose the data
+        filepath = Path(self.cachedir) / filename
+        try:
+            async with FileIO(filepath, "w") as f:
+                await f.write(json.dumps(data, indent=2))
+        except OSError as e:
+            self.LOGGER.warning(f"Could not write {filepath}: {e}")
+
+        self.errata_data.extend(data)
+        return len(data)
+
+    def process_errata(self):
+        """
+        Process the downloaded errata and extract CVE information.
+
+        This converts Alma Linux errata format to cve-bin-tool format.
+        Advisories without CVE references are skipped.
+        """
+        total = len(self.errata_data)
+        self.LOGGER.info(f"Processing {total} Alma Linux advisories...")
+
+        self.severity_data = []
+        self.affected_data = []
+
+        for advisory in self.errata_data:
+            if not isinstance(advisory, dict):
+                continue
+
+            # Get CVEs from the references
+            cve_ids = self.extract_cves(advisory)
+            if not cve_ids:
+                continue
+
+            # Get packages affected by this advisory
+            packages = self.extract_packages(advisory)
+
+            # A missing, null or empty severity all map to "UNKNOWN"
+            severity = str(advisory.get("severity") or "unknown").upper()
+
+            # "updated_date" is {"$date": <value>} in the sample data; tolerate
+            # a bare value or a missing field as well
+            updated = advisory.get("updated_date")
+            if isinstance(updated, dict):
+                updated = updated.get("$date")
+            last_modified = "" if updated is None else str(updated)
+
+            # Create entries for each CVE
+            for cve_id in cve_ids:
+                # Add severity data
+                self.severity_data.append(
+                    {
+                        "ID": cve_id,
+                        "severity": severity,
+                        "description": advisory.get("description", "unknown"),
+                        "score": "unknown",  # Alma doesn't provide scores
+                        "CVSS_version": "unknown",
+                        "CVSS_vector": "unknown",
+                        "last_modified": last_modified,
+                    }
+                )
+
+                # Add affected data for each package
+                # NOTE(review): pkglist appears to list the packages shipped
+                # with the advisory (i.e. the fixed builds) - confirm that
+                # recording them as an exact affected "version" is intended.
+                for pkg in packages:
+                    self.affected_data.append(
+                        {
+                            "cve_id": cve_id,
+                            "vendor": "almalinux",
+                            "product": pkg["name"],
+                            "version": pkg["version"],
+                            "versionStartIncluding": "",
+                            "versionStartExcluding": "",
+                            "versionEndIncluding": "",
+                            "versionEndExcluding": "",
+                        }
+                    )
+
+        self.LOGGER.info(
+            f"Processed {len(self.severity_data)} CVE entries, "
+            f"{len(self.affected_data)} affected packages"
+        )
+
+    def extract_cves(self, advisory: dict) -> list:
+        """
+        Extract CVE IDs from an advisory's references.
+
+        Args:
+            advisory: The advisory dictionary
+
+        Returns:
+            List of unique CVE IDs in reference order
+            (e.g., ["CVE-2022-1271", "CVE-2022-1272"])
+        """
+        cve_ids = []
+
+        for ref in advisory.get("references") or []:
+            if not isinstance(ref, dict) or ref.get("type") != "cve":
+                continue
+            cve_id = ref.get("id")
+            # Repeated references would otherwise produce duplicate rows
+            if cve_id and cve_id not in cve_ids:
+                cve_ids.append(cve_id)
+
+        return cve_ids
+
+    def extract_packages(self, advisory: dict) -> list:
+        """
+        Extract package information from an advisory.
+
+        Entries that differ only in release or architecture collapse into a
+        single name/version pair.
+
+        Args:
+            advisory: The advisory dictionary
+
+        Returns:
+            List of package dicts with 'name' and 'version' keys
+        """
+        packages = []
+        seen = set()  # Avoid duplicates
+
+        pkglist = advisory.get("pkglist")
+        if not isinstance(pkglist, dict):
+            return packages
+
+        for pkg in pkglist.get("packages") or []:
+            if not isinstance(pkg, dict):
+                continue
+            name = pkg.get("name")
+            version = pkg.get("version")
+            if not (name and version):
+                continue
+
+            # A tuple key cannot collide the way "name-version" strings can
+            key = (name, version)
+            if key in seen:
+                continue
+            seen.add(key)
+            packages.append({"name": name, "version": version})
+
+        return packages
diff --git a/test/alma/sample_errata.json b/test/alma/sample_errata.json
new file mode 100644
index 0000000000..07cd5bfb43
--- /dev/null
+++ b/test/alma/sample_errata.json
@@ -0,0 +1,118 @@
+[
+  {
+    "updateinfo_id": "ALSA-2022:4940",
+    "title": "Important: xz security update",
+    "type": "security",
+    "severity": "Important",
+    "description": "XZ Utils security fix for CVE-2022-1271",
+    "updated_date": {
+      "$date": 1678210340000
+    },
+    "references": [
+      {
+        "type": "bugzilla",
+        "id": "2073310",
+        "href": "https://bugzilla.redhat.com/2073310"
+      },
+      {
+        "type": "cve",
+        "id": "CVE-2022-1271",
+        "title": "CVE-2022-1271"
+      },
+      {
+        "type": "self",
+        "id": "ALSA-2022:4940"
+      }
+    ],
+    "pkglist": {
+      "name": "almalinux-9",
+      "packages": [
+        {
+          "name": "xz",
+          "version": "5.2.5",
+          "release": "8.el9_0",
+          "arch": "x86_64"
+        },
+        {
+          "name": "xz-libs",
+          "version": "5.2.5",
+          "release": "8.el9_0",
+          "arch": "x86_64"
+        }
+      ]
+    }
+  },
+  {
+    "updateinfo_id": "ALSA-2022:5244",
+    "title": "Moderate: expat security update",
+    "type": "security",
+    "severity": "Moderate",
+    "description": "Expat security fix",
+    "updated_date": {
+      "$date": 1658424899000
+    },
+    "references": [
+      {
+        "type": "cve",
+        "id": "CVE-2022-25313",
+        "title": "CVE-2022-25313"
+      },
+      {
+        "type": "cve",
+        "id": "CVE-2022-25314",
+        "title": "CVE-2022-25314"
+      }
+    ],
+    "pkglist": {
+      "name": "almalinux-9",
+      "packages": [
+        {
+          "name": "expat",
+          "version": "2.2.10",
+          "release": "12.el9_0.2",
+          "arch": "x86_64"
+        },
+        {
+          "name": "expat-devel",
+          "version": "2.2.10",
+          "release": "12.el9_0.2",
+          "arch": "x86_64"
+        }
+      ]
+    }
+  },
+  {
+    "updateinfo_id": "ALSA-2023:6745",
+    "title": "Critical: curl security update",
+    "type": "security",
+    "severity": "Critical",
+    "description": "Curl security fix for heap buffer overflow",
+    "updated_date": {
+      "$date": 1699401600000
+    },
+    "references": [
+      {
+        "type": "cve",
+        "id": "CVE-2023-38545",
+        "title": "CVE-2023-38545"
+      }
+    ],
+    "pkglist": {
+      "name": "almalinux-9",
+      "packages": [
+        {
+          "name": "curl",
+          "version": "7.76.1",
+          "release": "26.el9_3.2",
+          "arch": "x86_64"
+        },
+        {
+          "name": "libcurl",
+          "version": "7.76.1",
+          "release": "26.el9_3.2",
+          "arch": "x86_64"
+        }
+      ]
+    }
+  }
+]
\ No newline at end of file
diff --git a/test/test_source_alma.py b/test/test_source_alma.py
new file mode 100644
index 0000000000..e0b7e32f3c
--- /dev/null
+++ b/test/test_source_alma.py
@@ -0,0 +1,181 @@
+# Copyright (C) 2025 Intel Corporation
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+"""Tests for Alma Linux data source."""
+
+import json
+from pathlib import Path
+
+import pytest
+
+from cve_bin_tool.data_sources import alma_source
+
+
+class TestSourceAlma:
+    """Test cases for Alma_Source class."""
+
+    @classmethod
+    def setup_class(cls):
+        """Set up test fixtures."""
+        cls.alma = alma_source.Alma_Source()
+        cls.test_data_path = Path(__file__).parent.resolve() / "alma"
+
+        # Load test data directly instead of downloading
+        sample = cls.test_data_path / "sample_errata.json"
+        with open(sample, encoding="utf-8") as f:
+            cls.alma.errata_data = json.load(f)
+
+    def test_extract_cves(self):
+        """Test CVE extraction from advisory references."""
+        # Test advisory with single CVE
+        advisory_single = {
+            "references": [
+                {"type": "bugzilla", "id": "12345"},
+                {"type": "cve", "id": "CVE-2022-1271"},
+                {"type": "self", "id": "ALSA-2022:4940"},
+            ]
+        }
+        cves = self.alma.extract_cves(advisory_single)
+        assert cves == ["CVE-2022-1271"]
+
+        # Test advisory with multiple CVEs
+        advisory_multiple = {
+            "references": [
+                {"type": "cve", "id": "CVE-2022-25313"},
+                {"type": "cve", "id": "CVE-2022-25314"},
+            ]
+        }
+        cves = self.alma.extract_cves(advisory_multiple)
+        assert cves == ["CVE-2022-25313", "CVE-2022-25314"]
+
+        # Test advisory with no CVEs
+        advisory_none = {"references": [{"type": "bugzilla", "id": "12345"}]}
+        cves = self.alma.extract_cves(advisory_none)
+        assert cves == []
+
+        # Test advisory without a references field at all
+        assert self.alma.extract_cves({}) == []
+
+    def test_extract_packages(self):
+        """Test package extraction from advisory."""
+        advisory = {
+            "pkglist": {
+                "name": "almalinux-9",
+                "packages": [
+                    {
+                        "name": "xz",
+                        "version": "5.2.5",
+                        "release": "8.el9_0",
+                        "arch": "x86_64",
+                    },
+                    {
+                        "name": "xz-libs",
+                        "version": "5.2.5",
+                        "release": "8.el9_0",
+                        "arch": "x86_64",
+                    },
+                    # Duplicate should be filtered out
+                    {
+                        "name": "xz",
+                        "version": "5.2.5",
+                        "release": "8.el9_0",
+                        "arch": "i686",
+                    },
+                ],
+            }
+        }
+        packages = self.alma.extract_packages(advisory)
+
+        # Should have 2 unique packages (xz and xz-libs, not duplicate xz)
+        assert packages == [
+            {"name": "xz", "version": "5.2.5"},
+            {"name": "xz-libs", "version": "5.2.5"},
+        ]
+
+        # An advisory without a package list yields nothing
+        assert self.alma.extract_packages({}) == []
+
+    def test_process_errata(self):
+        """Test full errata processing."""
+        # Process the test data
+        self.alma.process_errata()
+
+        # Test data has 3 advisories with 4 total CVEs
+        # Advisory 1: CVE-2022-1271 (1 CVE)
+        # Advisory 2: CVE-2022-25313, CVE-2022-25314 (2 CVEs)
+        # Advisory 3: CVE-2023-38545 (1 CVE)
+        assert len(self.alma.severity_data) == 4
+
+        # Each advisory ships 2 packages, so every CVE has 2 affected rows
+        assert len(self.alma.affected_data) == 8
+
+    def test_severity_mapping(self):
+        """Test that severities are properly uppercased."""
+        self.alma.process_errata()
+
+        for entry in self.alma.severity_data:
+            # Severity should be uppercase
+            assert entry["severity"] == entry["severity"].upper()
+            # Should be one of the expected values
+            assert entry["severity"] in [
+                "CRITICAL",
+                "IMPORTANT",
+                "MODERATE",
+                "LOW",
+                "UNKNOWN",
+            ]
+
+    def test_affected_data_structure(self):
+        """Test that affected data has correct structure."""
+        self.alma.process_errata()
+
+        for entry in self.alma.affected_data:
+            # Check required fields
+            assert "cve_id" in entry
+            assert "vendor" in entry
+            assert "product" in entry
+            assert "version" in entry
+
+            # Vendor should be almalinux
+            assert entry["vendor"] == "almalinux"
+
+            # CVE ID should start with CVE-
+            assert entry["cve_id"].startswith("CVE-")
+
+            # Version should not be empty
+            assert entry["version"] is not None
+            assert entry["version"] != ""
+
+    def test_cve_entry_structure(self):
+        """Test that CVE entries have correct structure."""
+        self.alma.process_errata()
+
+        for entry in self.alma.severity_data:
+            # Check required fields
+            assert "ID" in entry
+            assert "severity" in entry
+            assert "description" in entry
+
+            # ID should be a CVE
+            assert entry["ID"].startswith("CVE-")
+
+    @pytest.mark.asyncio
+    async def test_get_cve_data_with_local_data(self):
+        """Test get_cve_data returns correct format."""
+        alma = alma_source.Alma_Source()
+        with open(self.test_data_path / "sample_errata.json", encoding="utf-8") as f:
+            sample = json.load(f)
+
+        # Replace the network download with the local fixture
+        async def fake_fetch_errata():
+            alma.errata_data = sample
+
+        alma.fetch_errata = fake_fetch_errata
+
+        (severity_data, affected_data), source_name = await alma.get_cve_data()
+
+        assert source_name == "ALMA"
+        assert isinstance(severity_data, list)
+        assert isinstance(affected_data, list)
+        assert len(severity_data) == 4
+        assert len(affected_data) == 8