diff --git a/src/rhsmlib/facts/all.py b/src/rhsmlib/facts/all.py index 887cbac3c9..dc3b8b6f04 100644 --- a/src/rhsmlib/facts/all.py +++ b/src/rhsmlib/facts/all.py @@ -13,6 +13,7 @@ from rhsmlib.facts import collector from rhsmlib.facts import custom +from rhsmlib.facts import disk from rhsmlib.facts import host_collector from rhsmlib.facts import hwprobe from rhsmlib.facts import insights @@ -29,6 +30,7 @@ def __init__(self): host_collector.HostCollector, hwprobe.HardwareCollector, network.NetworkCollector, + disk.DiskCollector, custom.CustomFactsCollector, insights.InsightsCollector, kpatch.KPatchCollector, diff --git a/src/rhsmlib/facts/disk.py b/src/rhsmlib/facts/disk.py new file mode 100644 index 0000000000..6bf112bc0a --- /dev/null +++ b/src/rhsmlib/facts/disk.py @@ -0,0 +1,80 @@ +# Copyright (c) 2023 Red Hat, Inc. +# +# This software is licensed to you under the GNU General Public License, +# version 2 (GPLv2). There is NO WARRANTY for this software, express or +# implied, including the implied warranties of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 +# along with this software; if not, see +# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. +# +# Red Hat trademarks are not licensed under GPLv2. No permission is +# granted to use or replicate Red Hat trademarks that are incorporated +# in this software or its documentation. +import logging +import os +import re +from typing import Callable, Dict, List, Union + +from rhsmlib.facts import collector + +log = logging.getLogger(__name__) + + +class DiskCollector(collector.FactsCollector): + def __init__( + self, + arch: str = None, + prefix: str = None, + testing: bool = None, + collected_hw_info: Dict[str, Union[str, int, bool, None]] = None, + ): + super().__init__(arch=arch, prefix=prefix, testing=testing, collected_hw_info=None) + + self.hardware_methods: List[Callable] = [ + self.get_disk_size_info, + ] + + def _get_block_devices(self) -> List[str]: + """Get list of block devices from /sys/block/""" + block_devices: List[str] = [] + sys_block_path: str = f"{self.prefix}/sys/block" + + try: + if os.path.exists(sys_block_path): + for device in os.listdir(sys_block_path): + # Skip loop devices, ram devices, and other virtual devices + # Focus on actual disk devices (sd*, vd*, nvme*, hd*, xvd*) + if re.match(r'^(sd[a-z]+|vd[a-z]+|nvme[0-9]+n[0-9]+|hd[a-z]+|xvd[a-z]+)$', device): + block_devices.append(device) + except OSError as e: + log.debug(f"Could not read /sys/block directory: {e}") + + return sorted(block_devices) + + def _get_device_size_bytes(self, device: str) -> int: + """Get the size of a block device in bytes""" + size_file: str = f"{self.prefix}/sys/block/{device}/size" + try: + with open(size_file, 'r') as f: + # The size file contains the number of 512-byte sectors + sectors = int(f.read().strip()) + return sectors * 512 + except (OSError, ValueError) as e: + log.debug(f"Could not read size for device {device}: {e}") + return 0 + + def get_disk_size_info(self) -> Dict[str, Union[str, int]]: + """Get disk size information for all block devices. + + Resulting facts have 'disk..size_bytes' format. + """ + result: Dict[str, Union[str, int]] = {} + + block_devices = self._get_block_devices() + + for device in block_devices: + size_bytes = self._get_device_size_bytes(device) + if size_bytes > 0: + result[f"disk.{device}.size_bytes"] = size_bytes + + return result \ No newline at end of file diff --git a/test/rhsmlib/facts/test_disk.py b/test/rhsmlib/facts/test_disk.py new file mode 100644 index 0000000000..29b1b02e3d --- /dev/null +++ b/test/rhsmlib/facts/test_disk.py @@ -0,0 +1,173 @@ +# Copyright (c) 2023 Red Hat, Inc. +# +# This software is licensed to you under the GNU General Public License, +# version 2 (GPLv2). There is NO WARRANTY for this software, express or +# implied, including the implied warranties of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 +# along with this software; if not, see +# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. +# +# Red Hat trademarks are not licensed under GPLv2. No permission is +# granted to use or replicate Red Hat trademarks that are incorporated +# in this software or its documentation. + +import unittest +import os +import tempfile +from unittest import mock +from unittest.mock import patch, MagicMock + +from rhsmlib.facts import disk + + +class TestDiskCollector(unittest.TestCase): + def setUp(self): + self.collector = disk.DiskCollector() + + def test_init(self): + """Test DiskCollector initialization""" + self.assertIsInstance(self.collector.hardware_methods, list) + self.assertEqual(len(self.collector.hardware_methods), 1) + self.assertEqual(self.collector.hardware_methods[0], self.collector.get_disk_size_info) + + @patch('os.listdir') + @patch('os.path.exists') + def test_get_block_devices_no_sys_block(self, mock_exists, mock_listdir): + """Test _get_block_devices when /sys/block doesn't exist""" + mock_exists.return_value = False + result = self.collector._get_block_devices() + self.assertEqual(result, []) + + @patch('os.listdir') + @patch('os.path.exists') + def test_get_block_devices_empty(self, mock_exists, mock_listdir): + """Test _get_block_devices with empty directory""" + mock_exists.return_value = True + mock_listdir.return_value = [] + result = self.collector._get_block_devices() + self.assertEqual(result, []) + + @patch('os.listdir') + @patch('os.path.exists') + def test_get_block_devices_mixed(self, mock_exists, mock_listdir): + """Test _get_block_devices with mixed device types""" + mock_exists.return_value = True + mock_listdir.return_value = [ + 'sda', # Should be included + 'sdb1', # Should be excluded (partition) + 'vda', # Should be included + 'nvme0n1', # Should be included + 'loop0', # Should be excluded + 'ram0', # Should be excluded + 'dm-0', # Should be excluded + 'hda', # Should be included (legacy IDE) + 'xvda', # Should be included (Xen virtual) + ] + result = self.collector._get_block_devices() + expected = ['hda', 'nvme0n1', 'sda', 'vda', 'xvda'] # sorted + self.assertEqual(result, expected) + + def test_get_device_size_bytes_missing_file(self): + """Test _get_device_size_bytes with missing size file""" + result = self.collector._get_device_size_bytes('nonexistent') + self.assertEqual(result, 0) + + def test_get_device_size_bytes_invalid_content(self): + """Test _get_device_size_bytes with invalid file content""" + with tempfile.TemporaryDirectory() as temp_dir: + # Set up a temporary sys/block structure + self.collector.prefix = temp_dir + device_dir = os.path.join(temp_dir, 'sys', 'block', 'testdev') + os.makedirs(device_dir) + + size_file = os.path.join(device_dir, 'size') + with open(size_file, 'w') as f: + f.write('invalid') + + result = self.collector._get_device_size_bytes('testdev') + self.assertEqual(result, 0) + + def test_get_device_size_bytes_valid(self): + """Test _get_device_size_bytes with valid size file""" + with tempfile.TemporaryDirectory() as temp_dir: + # Set up a temporary sys/block structure + self.collector.prefix = temp_dir + device_dir = os.path.join(temp_dir, 'sys', 'block', 'testdev') + os.makedirs(device_dir) + + size_file = os.path.join(device_dir, 'size') + sectors = 2048000 # 1 GB in 512-byte sectors + with open(size_file, 'w') as f: + f.write(str(sectors)) + + result = self.collector._get_device_size_bytes('testdev') + expected = sectors * 512 # Convert sectors to bytes + self.assertEqual(result, expected) + + @patch.object(disk.DiskCollector, '_get_device_size_bytes') + @patch.object(disk.DiskCollector, '_get_block_devices') + def test_get_disk_size_info(self, mock_get_devices, mock_get_size): + """Test get_disk_size_info method""" + mock_get_devices.return_value = ['sda', 'nvme0n1'] + mock_get_size.side_effect = [1000000000000, 500000000000] # 1TB and 500GB + + result = self.collector.get_disk_size_info() + + expected = { + 'disk.sda.size_bytes': 1000000000000, + 'disk.nvme0n1.size_bytes': 500000000000 + } + self.assertEqual(result, expected) + + @patch.object(disk.DiskCollector, '_get_device_size_bytes') + @patch.object(disk.DiskCollector, '_get_block_devices') + def test_get_disk_size_info_zero_size(self, mock_get_devices, mock_get_size): + """Test get_disk_size_info filters out zero-size devices""" + mock_get_devices.return_value = ['sda', 'sdb'] + mock_get_size.side_effect = [1000000000000, 0] # One valid, one zero + + result = self.collector.get_disk_size_info() + + expected = { + 'disk.sda.size_bytes': 1000000000000 + } + self.assertEqual(result, expected) + + def test_get_all(self): + """Test get_all method returns proper dictionary""" + # Mock the hardware_methods to use our mock function + mock_method = MagicMock(return_value={'disk.sda.size_bytes': 1000000000000}) + self.collector.hardware_methods = [mock_method] + + result = self.collector.get_all() + + self.assertIsInstance(result, dict) + self.assertEqual(result, {'disk.sda.size_bytes': 1000000000000}) + mock_method.assert_called_once() + + def test_collect(self): + """Test collect method returns FactsCollection""" + with patch.object(self.collector, 'get_all') as mock_get_all: + mock_get_all.return_value = {'disk.sda.size_bytes': 1000000000000} + + result = self.collector.collect() + + # Check that it returns a FactsCollection object + from rhsmlib.facts.collection import FactsCollection + self.assertIsInstance(result, FactsCollection) + + def test_collect_empty(self): + """Test collect method returns FactsCollection for empty input""" + with patch.object(self.collector, 'get_all') as mock_get_all: + mock_get_all.return_value = {} + + result = self.collector.collect() + + from rhsmlib.facts.collection import FactsCollection + self.assertIsInstance(result, FactsCollection) + # Optionally, check that the FactsCollection is empty + self.assertEqual(len(result), 0) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file