Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/rhsmlib/facts/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from rhsmlib.facts import collector
from rhsmlib.facts import custom
from rhsmlib.facts import disk
from rhsmlib.facts import host_collector
from rhsmlib.facts import hwprobe
from rhsmlib.facts import insights
Expand All @@ -29,6 +30,7 @@ def __init__(self):
host_collector.HostCollector,
hwprobe.HardwareCollector,
network.NetworkCollector,
disk.DiskCollector,
custom.CustomFactsCollector,
insights.InsightsCollector,
kpatch.KPatchCollector,
Expand Down
80 changes: 80 additions & 0 deletions src/rhsmlib/facts/disk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright (c) 2023 Red Hat, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2023?

#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
import logging
import os
import re
from typing import Callable, Dict, List, Union

from rhsmlib.facts import collector

log = logging.getLogger(__name__)


class DiskCollector(collector.FactsCollector):
def __init__(
self,
arch: str = None,
prefix: str = None,
testing: bool = None,
collected_hw_info: Dict[str, Union[str, int, bool, None]] = None,
):
super().__init__(arch=arch, prefix=prefix, testing=testing, collected_hw_info=None)

self.hardware_methods: List[Callable] = [
self.get_disk_size_info,
]

def _get_block_devices(self) -> List[str]:
"""Get list of block devices from /sys/block/"""
block_devices: List[str] = []
sys_block_path: str = f"{self.prefix}/sys/block"

try:
if os.path.exists(sys_block_path):
for device in os.listdir(sys_block_path):
# Skip loop devices, ram devices, and other virtual devices
# Focus on actual disk devices (sd*, vd*, nvme*, hd*, xvd*)
if re.match(r'^(sd[a-z]+|vd[a-z]+|nvme[0-9]+n[0-9]+|hd[a-z]+|xvd[a-z]+)$', device):
block_devices.append(device)
except OSError as e:
log.debug(f"Could not read /sys/block directory: {e}")

return sorted(block_devices)

def _get_device_size_bytes(self, device: str) -> int:
"""Get the size of a block device in bytes"""
size_file: str = f"{self.prefix}/sys/block/{device}/size"
try:
with open(size_file, 'r') as f:
# The size file contains the number of 512-byte sectors
sectors = int(f.read().strip())
return sectors * 512
Comment on lines +59 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

512 might be wrong. You should check the content of /sys/block/*/queue/logical_block_size instead.

except (OSError, ValueError) as e:
log.debug(f"Could not read size for device {device}: {e}")
return 0

def get_disk_size_info(self) -> Dict[str, Union[str, int]]:
"""Get disk size information for all block devices.

Resulting facts have 'disk.<device_name>.size_bytes' format.
"""
result: Dict[str, Union[str, int]] = {}

block_devices = self._get_block_devices()

for device in block_devices:
size_bytes = self._get_device_size_bytes(device)
if size_bytes > 0:
result[f"disk.{device}.size_bytes"] = size_bytes

return result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing newline.

173 changes: 173 additions & 0 deletions test/rhsmlib/facts/test_disk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright (c) 2023 Red Hat, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2025

#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.

import unittest
import os
import tempfile
from unittest import mock
from unittest.mock import patch, MagicMock

from rhsmlib.facts import disk


class TestDiskCollector(unittest.TestCase):
def setUp(self):
self.collector = disk.DiskCollector()

def test_init(self):
"""Test DiskCollector initialization"""
self.assertIsInstance(self.collector.hardware_methods, list)
self.assertEqual(len(self.collector.hardware_methods), 1)
self.assertEqual(self.collector.hardware_methods[0], self.collector.get_disk_size_info)

@patch('os.listdir')
@patch('os.path.exists')
def test_get_block_devices_no_sys_block(self, mock_exists, mock_listdir):
"""Test _get_block_devices when /sys/block doesn't exist"""
mock_exists.return_value = False
result = self.collector._get_block_devices()
self.assertEqual(result, [])

@patch('os.listdir')
@patch('os.path.exists')
def test_get_block_devices_empty(self, mock_exists, mock_listdir):
"""Test _get_block_devices with empty directory"""
mock_exists.return_value = True
mock_listdir.return_value = []
result = self.collector._get_block_devices()
self.assertEqual(result, [])

@patch('os.listdir')
@patch('os.path.exists')
def test_get_block_devices_mixed(self, mock_exists, mock_listdir):
"""Test _get_block_devices with mixed device types"""
mock_exists.return_value = True
mock_listdir.return_value = [
'sda', # Should be included
'sdb1', # Should be excluded (partition)
'vda', # Should be included
'nvme0n1', # Should be included
'loop0', # Should be excluded
'ram0', # Should be excluded
'dm-0', # Should be excluded
'hda', # Should be included (legacy IDE)
'xvda', # Should be included (Xen virtual)
]
result = self.collector._get_block_devices()
expected = ['hda', 'nvme0n1', 'sda', 'vda', 'xvda'] # sorted
self.assertEqual(result, expected)

def test_get_device_size_bytes_missing_file(self):
"""Test _get_device_size_bytes with missing size file"""
result = self.collector._get_device_size_bytes('nonexistent')
self.assertEqual(result, 0)

def test_get_device_size_bytes_invalid_content(self):
"""Test _get_device_size_bytes with invalid file content"""
with tempfile.TemporaryDirectory() as temp_dir:
# Set up a temporary sys/block structure
self.collector.prefix = temp_dir
device_dir = os.path.join(temp_dir, 'sys', 'block', 'testdev')
os.makedirs(device_dir)

size_file = os.path.join(device_dir, 'size')
with open(size_file, 'w') as f:
f.write('invalid')

result = self.collector._get_device_size_bytes('testdev')
self.assertEqual(result, 0)

def test_get_device_size_bytes_valid(self):
"""Test _get_device_size_bytes with valid size file"""
with tempfile.TemporaryDirectory() as temp_dir:
# Set up a temporary sys/block structure
self.collector.prefix = temp_dir
device_dir = os.path.join(temp_dir, 'sys', 'block', 'testdev')
os.makedirs(device_dir)

size_file = os.path.join(device_dir, 'size')
sectors = 2048000 # 1 GB in 512-byte sectors
with open(size_file, 'w') as f:
f.write(str(sectors))

result = self.collector._get_device_size_bytes('testdev')
expected = sectors * 512 # Convert sectors to bytes
self.assertEqual(result, expected)

@patch.object(disk.DiskCollector, '_get_device_size_bytes')
@patch.object(disk.DiskCollector, '_get_block_devices')
def test_get_disk_size_info(self, mock_get_devices, mock_get_size):
"""Test get_disk_size_info method"""
mock_get_devices.return_value = ['sda', 'nvme0n1']
mock_get_size.side_effect = [1000000000000, 500000000000] # 1TB and 500GB

result = self.collector.get_disk_size_info()

expected = {
'disk.sda.size_bytes': 1000000000000,
'disk.nvme0n1.size_bytes': 500000000000
}
self.assertEqual(result, expected)

@patch.object(disk.DiskCollector, '_get_device_size_bytes')
@patch.object(disk.DiskCollector, '_get_block_devices')
def test_get_disk_size_info_zero_size(self, mock_get_devices, mock_get_size):
"""Test get_disk_size_info filters out zero-size devices"""
mock_get_devices.return_value = ['sda', 'sdb']
mock_get_size.side_effect = [1000000000000, 0] # One valid, one zero

result = self.collector.get_disk_size_info()

expected = {
'disk.sda.size_bytes': 1000000000000
}
self.assertEqual(result, expected)

def test_get_all(self):
"""Test get_all method returns proper dictionary"""
# Mock the hardware_methods to use our mock function
mock_method = MagicMock(return_value={'disk.sda.size_bytes': 1000000000000})
self.collector.hardware_methods = [mock_method]

result = self.collector.get_all()

self.assertIsInstance(result, dict)
self.assertEqual(result, {'disk.sda.size_bytes': 1000000000000})
mock_method.assert_called_once()

def test_collect(self):
"""Test collect method returns FactsCollection"""
with patch.object(self.collector, 'get_all') as mock_get_all:
mock_get_all.return_value = {'disk.sda.size_bytes': 1000000000000}

result = self.collector.collect()

# Check that it returns a FactsCollection object
from rhsmlib.facts.collection import FactsCollection
self.assertIsInstance(result, FactsCollection)

def test_collect_empty(self):
"""Test collect method returns FactsCollection for empty input"""
with patch.object(self.collector, 'get_all') as mock_get_all:
mock_get_all.return_value = {}

result = self.collector.collect()

from rhsmlib.facts.collection import FactsCollection
self.assertIsInstance(result, FactsCollection)
# Optionally, check that the FactsCollection is empty
self.assertEqual(len(result), 0)


if __name__ == '__main__':
unittest.main()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing newline.

Loading