diff --git a/src/rhsm/profile.py b/src/rhsm/profile.py index 374df83b52..5f3cdfa37c 100644 --- a/src/rhsm/profile.py +++ b/src/rhsm/profile.py @@ -22,6 +22,8 @@ from iniparse import SafeConfigParser, ConfigParser from cloud_what import provider +from hawkey import split_nevra + try: import dnf except ImportError: @@ -37,6 +39,16 @@ except ImportError: yum = None +try: + import gi + + gi.require_version("OSTree", "1.0") + from gi.repository import OSTree + + ostree_available = True +except (ImportError, ValueError): + ostree_available = False + use_zypper: bool = importlib.util.find_spec("zypp_plugin") is not None if use_zypper: @@ -304,7 +316,14 @@ class Package: """ def __init__( - self, name: str, version: str, release: str, arch: str, epoch: int = 0, vendor: str = None + self, + name: str, + version: str, + release: str, + arch: str, + epoch: int = 0, + vendor: str = None, + persistence: str = None, ) -> None: self.name: str = name self.version: str = version @@ -312,17 +331,20 @@ def __init__( self.arch: str = arch self.epoch: int = epoch self.vendor: str = vendor + self.persistence: str = persistence def to_dict(self) -> dict: """Returns a dict representation of this package info.""" - return { + result = { "name": self._normalize_string(self.name), "version": self._normalize_string(self.version), "release": self._normalize_string(self.release), "arch": self._normalize_string(self.arch), "epoch": self._normalize_string(self.epoch), "vendor": self._normalize_string(self.vendor), # bz1519512 handle vendors that aren't utf-8 + "persistence": self._normalize_string(self.persistence), } + return result def __eq__(self, other: "Package") -> bool: """ @@ -338,6 +360,7 @@ def __eq__(self, other: "Package") -> bool: and self.arch == other.arch and self.epoch == other.epoch and self._normalize_string(self.vendor) == self._normalize_string(other.vendor) + and self.persistence == other.persistence ): return True @@ -354,6 +377,132 @@ def _normalize_string(value: Union[str, bytes]) -> str: return value +def parse_rpm_string(rpm_string: str) -> dict | None: + """ + Parses a standard RPM package string into its NVR components using hawkey. + + Args: + rpm_string (str): The full package string to parse. + Example: "NetworkManager-cloud-setup-1:1.54.0-2.fc43.x86_64" + + Returns: + dict | None: A dictionary with the following keys if the match is successful: + - 'name': The package name (including any internal hyphens). + - 'version': The version string (e.g., '1.54.0'). + - 'epoch': The epoch string (e.g., '1'). + - 'release': The release string (e.g., '2.fc43'). + - 'arch': The architecture (e.g., 'x86_64', 'noarch'). + Returns None if the string does not follow the expected RPM format. + """ + if not rpm_string or not isinstance(rpm_string, str): + return None + + try: + nevra = split_nevra(rpm_string.strip()) + + if not nevra.name or not nevra.version: + return None + + return { + "name": str(nevra.name), + "version": str(nevra.version), + "epoch": int(nevra.epoch), + "release": str(nevra.release), + "arch": str(nevra.arch), + } + + except Exception as e: + logging.debug(f"Failed to parse rpm nevra string '{rpm_string}': {e}") + return None + + +def _is_ostree_system() -> bool: + """ + Check if the current system is running on ostree (bootc/silverblue/coreos). + """ + if not ostree_available: + return False + try: + sysroot = OSTree.Sysroot.new_default() + sysroot.load(None) + return sysroot.get_booted_deployment() is not None + except Exception as e: + log.debug(f"Failed to detect ostree system: {e}") + return False + + +def _get_immutable_packages() -> set: + """ + Get the set of packages from the immutable ostree deployment. + For bootc systems, uses rpm-ostree to get the true base commit packages. + Returns a set of tuples (name, version, epoch, release). + + The Python OSTree API does not provide information abot packages, this is why + this function calls the rpm-ostree tool and parses its output to get the need information. + """ + immutable_packages = set() + + try: + import subprocess + + # Get rpm-ostree status to find the base commits + result = subprocess.run( + ["rpm-ostree", "status", "--json"], capture_output=True, text=True, check=True + ) + status = json.loads(result.stdout) + + deployments = status.get("deployments", []) + if not deployments: + log.debug("No deployments found in rpm-ostree status") + return immutable_packages + + # Use deployments[0] since it's the most recent + base_checksum = deployments[0].get("checksum") + if not base_checksum: + log.debug("No base checksum found") + return immutable_packages + + log.debug(f"Using checksum: {base_checksum[:10]} to get for immutable packages") + + # Use rpm-ostree db list to get packages from the base_checksum + result = subprocess.run( + ["rpm-ostree", "db", "list", base_checksum], capture_output=True, text=True, check=True + ) + + # Skip first line since there is returned the consulted base_checksum + for line in result.stdout.strip().split("\n")[1:]: + line = line.strip() + + try: + package_dict = parse_rpm_string(line) + # parse_rpm_string returned None or an empty dict; skip this malformed line + if not package_dict: + continue + immutable_packages.add( + ( + package_dict["name"], + package_dict["version"], + package_dict["epoch"], + package_dict["release"], + ) + ) + + except (ValueError, IndexError) as e: + log.debug(f"Failed to parse package line '{line}': {e}") + continue + + log.debug(f"Found {len(immutable_packages)} packages in base ostree commit {base_checksum[:10]}") + + except subprocess.CalledProcessError as e: + log.debug(f"rpm-ostree command failed: {e}") + except ImportError: + log.debug("subprocess module not available") + except Exception as e: + log.debug(f"Failed to get immutable packages via rpm-ostree: {e}") + + return immutable_packages + + class RPMProfile: def __init__(self, from_file: _io.TextIOWrapper = None) -> None: """ @@ -375,6 +524,7 @@ def __init__(self, from_file: _io.TextIOWrapper = None) -> None: arch=pkg_dict["arch"], epoch=pkg_dict["epoch"], vendor=pkg_dict["vendor"], + persistence=pkg_dict.get("persistence") or "persistent", ) ) else: @@ -393,20 +543,35 @@ def _accumulate_profile(rpm_header_list: List[dict]) -> List[Package]: """ pkg_list = [] + + # Check if we're on an ostree system and get immutable packages if so + is_ostree = _is_ostree_system() + immutable_packages = set() + if is_ostree: + immutable_packages = _get_immutable_packages() + log.debug(f"Running on ostree system with {len(immutable_packages)} persistent packages") + for h in rpm_header_list: if h["name"] == "gpg-pubkey": # dbMatch includes imported gpg keys as well # skip these for now as there isn't compelling # reason for server to know this info continue + + epoch = h["epoch"] or 0 + package_info = (h["name"], h["version"], epoch, h["release"]) + pkg_list.append( Package( name=h["name"], version=h["version"], release=h["release"], arch=h["arch"], - epoch=h["epoch"] or 0, + epoch=epoch, vendor=h["vendor"] or None, + persistence=( + "persistent" if (not is_ostree or package_info in immutable_packages) else "transient" + ), ) ) return pkg_list diff --git a/test/rhsm/functional/test_profile.py b/test/rhsm/functional/test_profile.py index 7c005fd69b..aa5a0aeabd 100644 --- a/test/rhsm/functional/test_profile.py +++ b/test/rhsm/functional/test_profile.py @@ -38,6 +38,7 @@ def test_get_rpm_profile(self): self.assertTrue("epoch" in pkg) self.assertTrue("arch" in pkg) self.assertTrue("vendor" in pkg) + self.assertTrue("persistence" in pkg) def test_package_objects(self): profile = get_profile("rpm") @@ -49,8 +50,8 @@ def test_get_profile_bad_type(self): def test_load_profile_from_file(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) self.assertEqual(2, len(profile.packages)) @@ -72,16 +73,16 @@ def _mock_pkg_profile(self, packages): def test_equality_different_object_type(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) self.assertFalse(profile == "hello") def test_equality_no_change(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) @@ -90,19 +91,21 @@ def test_equality_no_change(self): def test_equality_packages_added(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) - dummy_pkgs.append(Package(name="package3", version="3.0.0", release="2", arch="x86_64")) + dummy_pkgs.append( + Package(name="package3", version="3.0.0", release="2", arch="x86_64", persistence="persistent") + ) other = self._mock_pkg_profile(dummy_pkgs) self.assertFalse(profile == other) def test_equality_packages_removed(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) @@ -112,8 +115,8 @@ def test_equality_packages_removed(self): def test_equality_packages_updated(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) @@ -124,13 +127,15 @@ def test_equality_packages_updated(self): def test_equality_packages_replaced(self): dummy_pkgs = [ - Package(name="package1", version="1.0.0", release="1", arch="x86_64"), - Package(name="package2", version="2.0.0", release="2", arch="x86_64"), + Package(name="package1", version="1.0.0", release="1", arch="x86_64", persistence="persistent"), + Package(name="package2", version="2.0.0", release="2", arch="x86_64", persistence="persistent"), ] profile = self._mock_pkg_profile(dummy_pkgs) # Remove package2, add package3: dummy_pkgs.pop() - dummy_pkgs.append(Package(name="package3", version="3.0.0", release="2", arch="x86_64")) + dummy_pkgs.append( + Package(name="package3", version="3.0.0", release="2", arch="x86_64", persistence="persistent") + ) other = self._mock_pkg_profile(dummy_pkgs) self.assertFalse(profile == other) diff --git a/test/rhsm/unit/test_profile.py b/test/rhsm/unit/test_profile.py index e883570b55..b3c80a71fd 100644 --- a/test/rhsm/unit/test_profile.py +++ b/test/rhsm/unit/test_profile.py @@ -17,7 +17,13 @@ from cloud_what.providers import aws, azure, gcp -from rhsm.profile import ModulesProfile, EnabledReposProfile +from rhsm.profile import ( + ModulesProfile, + EnabledReposProfile, + parse_rpm_string, + _is_ostree_system, + _get_immutable_packages, +) class TestModulesProfile(unittest.TestCase): @@ -225,3 +231,177 @@ def test_enabled_repos(self): self.assertEqual( repo_list[0]["baseurl"], ["http://cdn.foo.com/content/dist/snakes/1.0/x86_64/os"] ) + + +class TestParseRpmString(unittest.TestCase): + """ + Test case for parse_rpm_string function + """ + + def test_parse_valid_rpm_string_with_epoch(self): + """ + Test parsing a valid RPM string with epoch + """ + rpm_string = "NetworkManager-cloud-setup-1:1.54.0-2.fc43.x86_64" + result = parse_rpm_string(rpm_string) + self.assertIsNotNone(result) + self.assertEqual(result["name"], "NetworkManager-cloud-setup") + self.assertEqual(result["version"], "1.54.0") + self.assertEqual(result["epoch"], 1) + self.assertEqual(result["release"], "2.fc43") + self.assertEqual(result["arch"], "x86_64") + + def test_parse_valid_rpm_string_without_epoch(self): + """ + Test parsing a valid RPM string without epoch + """ + rpm_string = "bash-completion-2.16-2.fc43.noarch" + result = parse_rpm_string(rpm_string) + self.assertIsNotNone(result) + self.assertEqual(result["name"], "bash-completion") + self.assertEqual(result["version"], "2.16") + self.assertEqual(result["epoch"], 0) + self.assertEqual(result["release"], "2.fc43") + self.assertEqual(result["arch"], "noarch") + + def test_parse_rpm_string_with_hyphens_in_name(self): + """ + Test parsing RPM string where package name contains multiple hyphens + """ + rpm_string = "amd-ucode-firmware-20241210-164.fc42.noarch" + result = parse_rpm_string(rpm_string) + self.assertIsNotNone(result) + self.assertEqual(result["name"], "amd-ucode-firmware") + self.assertEqual(result["version"], "20241210") + self.assertEqual(result["epoch"], 0) + self.assertEqual(result["release"], "164.fc42") + self.assertEqual(result["arch"], "noarch") + + def test_parse_rpm_string_with_leading_whitespace(self): + """ + Test parsing RPM string with leading whitespace + """ + rpm_string = " bash-5.2.32-1.fc42.x86_64" + result = parse_rpm_string(rpm_string) + self.assertIsNotNone(result) + self.assertEqual(result["name"], "bash") + self.assertEqual(result["version"], "5.2.32") + self.assertEqual(result["epoch"], 0) + self.assertEqual(result["release"], "1.fc42") + self.assertEqual(result["arch"], "x86_64") + + def test_parse_invalid_rpm_string(self): + """ + Test parsing an invalid RPM string returns None + """ + rpm_string = "invalid-package-string" + result = parse_rpm_string(rpm_string) + self.assertIsNone(result) + + def test_parse_empty_string(self): + """ + Test parsing an empty string returns None + """ + rpm_string = "" + result = parse_rpm_string(rpm_string) + self.assertIsNone(result) + + +class TestOstreeSystemDetection(unittest.TestCase): + """ + Test case for ostree system detection functions + """ + + @patch("rhsm.profile.ostree_available", False) + def test_is_ostree_system_without_ostree_library(self): + """ + Test detection when ostree library is not available + """ + result = _is_ostree_system() + self.assertFalse(result) + + @patch("rhsm.profile.ostree_available", True) + @patch("rhsm.profile.OSTree", create=True) + def test_is_ostree_system_with_booted_deployment(self, mock_ostree): + """ + Test detection when there is a booted ostree deployment + """ + mock_sysroot = mock.Mock() + mock_sysroot.load = mock.Mock() + mock_sysroot.get_booted_deployment = mock.Mock(return_value=mock.Mock()) + + mock_ostree.Sysroot.new_default.return_value = mock_sysroot + + result = _is_ostree_system() + self.assertTrue(result) + mock_sysroot.load.assert_called_once_with(None) + mock_sysroot.get_booted_deployment.assert_called_once() + + @patch("rhsm.profile.ostree_available", True) + @patch("rhsm.profile.OSTree", create=True) + def test_is_ostree_system_without_booted_deployment(self, mock_ostree): + """ + Test detection when there is no booted ostree deployment + """ + mock_sysroot = mock.Mock() + mock_sysroot.load = mock.Mock() + mock_sysroot.get_booted_deployment = mock.Mock(return_value=None) + mock_ostree.Sysroot.new_default.return_value = mock_sysroot + result = _is_ostree_system() + self.assertFalse(result) + + @patch("rhsm.profile.ostree_available", True) + @patch("rhsm.profile.OSTree", create=True) + def test_is_ostree_system_with_ostree_api_exception(self, mock_ostree): + """ + Test detection when OSTree API raises an exception + """ + mock_ostree.Sysroot.new_default.side_effect = Exception("OSTree API error") + + result = _is_ostree_system() + self.assertFalse(result) + + @patch("subprocess.run") + @patch("rhsm.profile.json.loads") + def test_get_immutable_packages(self, mock_json_loads, mock_subprocess_run): + """ + Test getting immutable packages from ostree system + """ + # Mock rpm-ostree status output + mock_status = {"deployments": [{"checksum": "abc123def456"}]} + + # Mock rpm-ostree db list output + mock_db_list_output = """ostree commit: abc123def456 +bash-5.2.32-1.fc42.x86_64 +systemd-257.3-1.fc42.x86_64 +NetworkManager-1:1.54.0-2.fc43.x86_64""" + + mock_subprocess_run.side_effect = [ + mock.Mock(stdout='{"deployments": [{"checksum": "abc123def456"}]}', returncode=0), + mock.Mock(stdout=mock_db_list_output, returncode=0), + ] + + mock_json_loads.return_value = mock_status + + result = _get_immutable_packages() + + self.assertIsInstance(result, set) + # Check that result contains tuples with (name, version, epoch, release) + self.assertIn(("bash", "5.2.32", 0, "1.fc42"), result) + self.assertIn(("systemd", "257.3", 0, "1.fc42"), result) + self.assertIn(("NetworkManager", "1.54.0", 1, "2.fc43"), result) + self.assertEqual(len(result), 3) + + @patch("subprocess.run") + def test_get_immutable_packages_command_failure(self, mock_run): + """ + Test handling of rpm-ostree command failure + """ + from subprocess import CalledProcessError + + mock_run.side_effect = CalledProcessError(1, "rpm-ostree") + + result = _get_immutable_packages() + + self.assertIsInstance(result, set) + self.assertEqual(len(result), 0)