From 311bb82f8e0eeffaa19640d65254ed63b9d9e7b9 Mon Sep 17 00:00:00 2001 From: erikhuck Date: Wed, 22 Jan 2025 12:15:51 -0500 Subject: [PATCH] Measures combined RAM and threads by summing the measurements of the main and descendant processes that were measured beforehand rather than re-measuring --- src/gpu_tracker/tracker.py | 42 ++++++++++++++++++++++---------------- tests/test_tracker.py | 22 ++++++++------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/gpu_tracker/tracker.py b/src/gpu_tracker/tracker.py index a1d8f73..6142257 100644 --- a/src/gpu_tracker/tracker.py +++ b/src/gpu_tracker/tracker.py @@ -107,6 +107,9 @@ def run(self): """ start_time = time.time() self._extraneous_process_ids.add(self.pid) + get_memory_maps = lambda process: process.memory_maps(grouped=False) + get_rss = lambda process: process.memory_info().rss + get_n_threads = lambda process: process.num_threads() get_cpu_percent = lambda process: process.cpu_percent() try: main_process = psutil.Process(self._main_process_id) @@ -124,15 +127,19 @@ def run(self): try: descendent_processes = [ process for process in main_process.children(recursive=True) if process.pid not in self._extraneous_process_ids] - combined_processes = [main_process] + descendent_processes # The first call to cpu_percent returns a meaningless value of 0.0 and should be ignored. # And it's recommended to wait a specified amount of time after the first call to cpu_percent. # See https://psutil.readthedocs.io/en/latest/#psutil.Process.cpu_percent - self._map_processes(processes=combined_processes, map_func=get_cpu_percent) + self._map_processes(processes=[main_process] + descendent_processes, map_func=get_cpu_percent) # Get the maximum RAM usage. - self._update_ram(rss_values=self._resource_usage.max_ram.main, processes=[main_process]) - self._update_ram(rss_values=self._resource_usage.max_ram.descendents, processes=descendent_processes) - self._update_ram(rss_values=self._resource_usage.max_ram.combined, processes=combined_processes) + ram_map_func = get_memory_maps if self._is_linux else get_rss + main_ram = self._map_processes([main_process], map_func=ram_map_func) + descendents_ram = self._map_processes(descendent_processes, map_func=ram_map_func) + combined_ram = main_ram + descendents_ram + kwarg = 'memory_maps_list' if self._is_linux else 'rss_list' + self._update_ram(rss_values=self._resource_usage.max_ram.main, **{kwarg: main_ram}) + self._update_ram(rss_values=self._resource_usage.max_ram.descendents, **{kwarg: descendents_ram}) + self._update_ram(rss_values=self._resource_usage.max_ram.combined, **{kwarg: combined_ram}) self._resource_usage.max_ram.system = max( self._resource_usage.max_ram.system, psutil.virtual_memory().used * self._ram_coefficient) # Get the maximum GPU RAM usage if available. @@ -156,9 +163,11 @@ def run(self): n_hardware_units=self._resource_usage.gpu_utilization.n_expected_gpus) # Get the mean and maximum CPU usages. - self._update_n_threads(processes=[main_process], attr='main') - self._update_n_threads(processes=descendent_processes, attr='descendents') - self._update_n_threads(processes=combined_processes, attr='combined') + main_n_threads = self._map_processes([main_process], map_func=get_n_threads) + descendent_n_threads = self._map_processes(descendent_processes, map_func=get_n_threads) + self._update_n_threads(n_threads_list=main_n_threads, attr='main') + self._update_n_threads(n_threads_list=descendent_n_threads, attr='descendents') + self._update_n_threads(n_threads_list=main_n_threads + descendent_n_threads, attr='combined') # noinspection PyTypeChecker system_core_percentages: list[float] = psutil.cpu_percent(percpu=True) cpu_utilization = self._resource_usage.cpu_utilization @@ -166,16 +175,16 @@ def run(self): current_percentages=system_core_percentages, processing_unit_percentages=cpu_utilization.system, percent_key='cpu_system', n_hardware_units=cpu_utilization.system_core_count) time.sleep(_TrackingProcess._CPU_PERCENT_INTERVAL) - main_percentage = main_process.cpu_percent() + main_percentage = self._map_processes([main_process], map_func=get_cpu_percent) descendent_percentages = self._map_processes(processes=descendent_processes, map_func=get_cpu_percent) self._update_processing_unit_utilization( - current_percentages=[main_percentage], processing_unit_percentages=cpu_utilization.main, percent_key='cpu_main', + current_percentages=main_percentage, processing_unit_percentages=cpu_utilization.main, percent_key='cpu_main', n_hardware_units=cpu_utilization.n_expected_cores) self._update_processing_unit_utilization( current_percentages=descendent_percentages, processing_unit_percentages=cpu_utilization.descendents, percent_key='cpu_descendents', n_hardware_units=cpu_utilization.n_expected_cores) self._update_processing_unit_utilization( - current_percentages=[main_percentage] + descendent_percentages, processing_unit_percentages=cpu_utilization.combined, + current_percentages=main_percentage + descendent_percentages, processing_unit_percentages=cpu_utilization.combined, percent_key='cpu_combined', n_hardware_units=cpu_utilization.n_expected_cores) # Update compute time. self._resource_usage.compute_time.time = (time.time() - start_time) * self._time_coefficient @@ -197,10 +206,8 @@ def _map_processes(self, processes: list[psutil.Process], map_func: typ.Callable self._log_warning('Attempted to obtain usage information of a process that no longer exists.') # pragma: nocover return mapped_list - def _update_ram(self, rss_values: RSSValues, processes: list[psutil.Process]): - if self._is_linux: - memory_maps_list: list[list] = self._map_processes( - processes, map_func=lambda process: process.memory_maps(grouped=False)) + def _update_ram(self, rss_values: RSSValues, memory_maps_list: list[list] | None = None, rss_list: list[int] | None = None): + if memory_maps_list is not None: private_rss = 0 path_to_shared_rss = dict[str, float]() for memory_maps in memory_maps_list: @@ -218,7 +225,7 @@ def _update_ram(self, rss_values: RSSValues, processes: list[psutil.Process]): rss_values.shared_rss = max(rss_values.shared_rss, shared_rss) total_rss = private_rss + shared_rss else: - total_rss = sum(self._map_processes(processes, map_func=lambda process: process.memory_info().rss)) + total_rss = sum(rss_list) total_rss *= self._ram_coefficient rss_values.total_rss = max(rss_values.total_rss, total_rss) @@ -254,8 +261,7 @@ def _update_processing_unit_utilization( max_percent: float = getattr(processing_unit_percentages, f'max_{percent_type}_percent') setattr(processing_unit_percentages, f'max_{percent_type}_percent', max(max_percent, percent)) - def _update_n_threads(self, processes: list[psutil.Process], attr: str): - n_threads_list = self._map_processes(processes, map_func=lambda process: process.num_threads()) + def _update_n_threads(self, n_threads_list: list[int], attr: str): n_threads = sum(n_threads_list) attr = f'{attr}_n_threads' max_n_threads = getattr(self._resource_usage.cpu_utilization, attr) diff --git a/tests/test_tracker.py b/tests/test_tracker.py index 10cbc7b..6d923ed 100644 --- a/tests/test_tracker.py +++ b/tests/test_tracker.py @@ -19,10 +19,6 @@ def get_use_context_manager(request) -> bool: yield request.param -def multiply_list(_list: list, multiple=2) -> list: - return [item for item in _list for _ in range(multiple)] - - test_tracker_data = [ ('bytes', 'megabytes', 'seconds', None, 3), ('kilobytes', 'gigabytes', 'minutes', {'gpu-id1'}, 2), @@ -83,10 +79,8 @@ def get_process_mock( private_dirty=private_dirty, private_clean=private_clean, shared_dirty=shared_dirty, shared_clean=shared_clean, path=path) memory_map_mocks.append(memory_map_mock) - memory_maps_side_effect.extend([memory_map_mocks, memory_map_mocks]) - rams = multiply_list(rams) - cpu_percentages = multiply_list(cpu_percentages) - num_threads = multiply_list(num_threads) + memory_maps_side_effect.append(memory_map_mocks) + cpu_percentages = [percent for percent in cpu_percentages for percent in [0.0, percent]] return mocker.MagicMock( pid=pid, memory_info=mocker.MagicMock(side_effect=[mocker.MagicMock(rss=ram) for ram in rams]), @@ -172,13 +166,13 @@ def start_mock(self): utils.assert_args_list(current_process_mock.children, [()] * 2) utils.assert_args_list(mock=main_process_mock.children, expected_args_list=[{'recursive': True}] * 3, use_kwargs=True) if operating_system == 'Linux': - utils.assert_args_list(mock=main_process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) - utils.assert_args_list(mock=child1_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) - utils.assert_args_list(mock=child2_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) + utils.assert_args_list(mock=main_process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 3, use_kwargs=True) + utils.assert_args_list(mock=child1_mock.memory_maps, expected_args_list=[{'grouped': False}] * 3, use_kwargs=True) + utils.assert_args_list(mock=child2_mock.memory_maps, expected_args_list=[{'grouped': False}] * 3, use_kwargs=True) else: - utils.assert_args_list(mock=main_process_mock.memory_info, expected_args_list=[()] * 6) - utils.assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 6) - utils.assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 6) + utils.assert_args_list(mock=main_process_mock.memory_info, expected_args_list=[()] * 3) + utils.assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 3) + utils.assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 3) assert len(check_output_mock.call_args_list) == 8 os_mock.getpid.assert_called_once_with() utils.assert_args_list(mock=time_mock.time, expected_args_list=[()] * 5)