Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measures combined RAM and threads by summing the measurements of the main and descendant processes that were measured beforehand rather than re-measuring #49

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions src/gpu_tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def run(self):
"""
start_time = time.time()
self._extraneous_process_ids.add(self.pid)
get_memory_maps = lambda process: process.memory_maps(grouped=False)
get_rss = lambda process: process.memory_info().rss
get_n_threads = lambda process: process.num_threads()
get_cpu_percent = lambda process: process.cpu_percent()
try:
main_process = psutil.Process(self._main_process_id)
Expand All @@ -124,15 +127,19 @@ def run(self):
try:
descendent_processes = [
process for process in main_process.children(recursive=True) if process.pid not in self._extraneous_process_ids]
combined_processes = [main_process] + descendent_processes
# The first call to cpu_percent returns a meaningless value of 0.0 and should be ignored.
# And it's recommended to wait a specified amount of time after the first call to cpu_percent.
# See https://psutil.readthedocs.io/en/latest/#psutil.Process.cpu_percent
self._map_processes(processes=combined_processes, map_func=get_cpu_percent)
self._map_processes(processes=[main_process] + descendent_processes, map_func=get_cpu_percent)
# Get the maximum RAM usage.
self._update_ram(rss_values=self._resource_usage.max_ram.main, processes=[main_process])
self._update_ram(rss_values=self._resource_usage.max_ram.descendents, processes=descendent_processes)
self._update_ram(rss_values=self._resource_usage.max_ram.combined, processes=combined_processes)
ram_map_func = get_memory_maps if self._is_linux else get_rss
main_ram = self._map_processes([main_process], map_func=ram_map_func)
descendents_ram = self._map_processes(descendent_processes, map_func=ram_map_func)
combined_ram = main_ram + descendents_ram
kwarg = 'memory_maps_list' if self._is_linux else 'rss_list'
self._update_ram(rss_values=self._resource_usage.max_ram.main, **{kwarg: main_ram})
self._update_ram(rss_values=self._resource_usage.max_ram.descendents, **{kwarg: descendents_ram})
self._update_ram(rss_values=self._resource_usage.max_ram.combined, **{kwarg: combined_ram})
self._resource_usage.max_ram.system = max(
self._resource_usage.max_ram.system, psutil.virtual_memory().used * self._ram_coefficient)
# Get the maximum GPU RAM usage if available.
Expand All @@ -156,26 +163,28 @@ def run(self):
n_hardware_units=self._resource_usage.gpu_utilization.n_expected_gpus)

# Get the mean and maximum CPU usages.
self._update_n_threads(processes=[main_process], attr='main')
self._update_n_threads(processes=descendent_processes, attr='descendents')
self._update_n_threads(processes=combined_processes, attr='combined')
main_n_threads = self._map_processes([main_process], map_func=get_n_threads)
descendent_n_threads = self._map_processes(descendent_processes, map_func=get_n_threads)
self._update_n_threads(n_threads_list=main_n_threads, attr='main')
self._update_n_threads(n_threads_list=descendent_n_threads, attr='descendents')
self._update_n_threads(n_threads_list=main_n_threads + descendent_n_threads, attr='combined')
# noinspection PyTypeChecker
system_core_percentages: list[float] = psutil.cpu_percent(percpu=True)
cpu_utilization = self._resource_usage.cpu_utilization
self._update_processing_unit_utilization(
current_percentages=system_core_percentages, processing_unit_percentages=cpu_utilization.system,
percent_key='cpu_system', n_hardware_units=cpu_utilization.system_core_count)
time.sleep(_TrackingProcess._CPU_PERCENT_INTERVAL)
main_percentage = main_process.cpu_percent()
main_percentage = self._map_processes([main_process], map_func=get_cpu_percent)
descendent_percentages = self._map_processes(processes=descendent_processes, map_func=get_cpu_percent)
self._update_processing_unit_utilization(
current_percentages=[main_percentage], processing_unit_percentages=cpu_utilization.main, percent_key='cpu_main',
current_percentages=main_percentage, processing_unit_percentages=cpu_utilization.main, percent_key='cpu_main',
n_hardware_units=cpu_utilization.n_expected_cores)
self._update_processing_unit_utilization(
current_percentages=descendent_percentages, processing_unit_percentages=cpu_utilization.descendents,
percent_key='cpu_descendents', n_hardware_units=cpu_utilization.n_expected_cores)
self._update_processing_unit_utilization(
current_percentages=[main_percentage] + descendent_percentages, processing_unit_percentages=cpu_utilization.combined,
current_percentages=main_percentage + descendent_percentages, processing_unit_percentages=cpu_utilization.combined,
percent_key='cpu_combined', n_hardware_units=cpu_utilization.n_expected_cores)
# Update compute time.
self._resource_usage.compute_time.time = (time.time() - start_time) * self._time_coefficient
Expand All @@ -197,10 +206,8 @@ def _map_processes(self, processes: list[psutil.Process], map_func: typ.Callable
self._log_warning('Attempted to obtain usage information of a process that no longer exists.') # pragma: nocover
return mapped_list

def _update_ram(self, rss_values: RSSValues, processes: list[psutil.Process]):
if self._is_linux:
memory_maps_list: list[list] = self._map_processes(
processes, map_func=lambda process: process.memory_maps(grouped=False))
def _update_ram(self, rss_values: RSSValues, memory_maps_list: list[list] | None = None, rss_list: list[int] | None = None):
if memory_maps_list is not None:
private_rss = 0
path_to_shared_rss = dict[str, float]()
for memory_maps in memory_maps_list:
Expand All @@ -218,7 +225,7 @@ def _update_ram(self, rss_values: RSSValues, processes: list[psutil.Process]):
rss_values.shared_rss = max(rss_values.shared_rss, shared_rss)
total_rss = private_rss + shared_rss
else:
total_rss = sum(self._map_processes(processes, map_func=lambda process: process.memory_info().rss))
total_rss = sum(rss_list)
total_rss *= self._ram_coefficient
rss_values.total_rss = max(rss_values.total_rss, total_rss)

Expand Down Expand Up @@ -254,8 +261,7 @@ def _update_processing_unit_utilization(
max_percent: float = getattr(processing_unit_percentages, f'max_{percent_type}_percent')
setattr(processing_unit_percentages, f'max_{percent_type}_percent', max(max_percent, percent))

def _update_n_threads(self, processes: list[psutil.Process], attr: str):
n_threads_list = self._map_processes(processes, map_func=lambda process: process.num_threads())
def _update_n_threads(self, n_threads_list: list[int], attr: str):
n_threads = sum(n_threads_list)
attr = f'{attr}_n_threads'
max_n_threads = getattr(self._resource_usage.cpu_utilization, attr)
Expand Down
22 changes: 8 additions & 14 deletions tests/test_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ def get_use_context_manager(request) -> bool:
yield request.param


def multiply_list(_list: list, multiple=2) -> list:
return [item for item in _list for _ in range(multiple)]


test_tracker_data = [
('bytes', 'megabytes', 'seconds', None, 3),
('kilobytes', 'gigabytes', 'minutes', {'gpu-id1'}, 2),
Expand Down Expand Up @@ -83,10 +79,8 @@ def get_process_mock(
private_dirty=private_dirty, private_clean=private_clean, shared_dirty=shared_dirty, shared_clean=shared_clean,
path=path)
memory_map_mocks.append(memory_map_mock)
memory_maps_side_effect.extend([memory_map_mocks, memory_map_mocks])
rams = multiply_list(rams)
cpu_percentages = multiply_list(cpu_percentages)
num_threads = multiply_list(num_threads)
memory_maps_side_effect.append(memory_map_mocks)
cpu_percentages = [percent for percent in cpu_percentages for percent in [0.0, percent]]
return mocker.MagicMock(
pid=pid,
memory_info=mocker.MagicMock(side_effect=[mocker.MagicMock(rss=ram) for ram in rams]),
Expand Down Expand Up @@ -172,13 +166,13 @@ def start_mock(self):
utils.assert_args_list(current_process_mock.children, [()] * 2)
utils.assert_args_list(mock=main_process_mock.children, expected_args_list=[{'recursive': True}] * 3, use_kwargs=True)
if operating_system == 'Linux':
utils.assert_args_list(mock=main_process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
utils.assert_args_list(mock=child1_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
utils.assert_args_list(mock=child2_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
utils.assert_args_list(mock=main_process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 3, use_kwargs=True)
utils.assert_args_list(mock=child1_mock.memory_maps, expected_args_list=[{'grouped': False}] * 3, use_kwargs=True)
utils.assert_args_list(mock=child2_mock.memory_maps, expected_args_list=[{'grouped': False}] * 3, use_kwargs=True)
else:
utils.assert_args_list(mock=main_process_mock.memory_info, expected_args_list=[()] * 6)
utils.assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 6)
utils.assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 6)
utils.assert_args_list(mock=main_process_mock.memory_info, expected_args_list=[()] * 3)
utils.assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 3)
utils.assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 3)
assert len(check_output_mock.call_args_list) == 8
os_mock.getpid.assert_called_once_with()
utils.assert_args_list(mock=time_mock.time, expected_args_list=[()] * 5)
Expand Down
Loading