Skip to content

Commit 436032f

Browse files
authored
fix: Use PSS instead of RSS to estimate children process memory usage on Linux (#1210)
### Description To estimate process memory usage use `Proportional Set Size (PSS)` to estimate process memory usage of the process and all it's children to avoid overestimation of used memory due to same shared memory being counted multiple times when using `Resident Set Size (RSS)`. `PSS` is available only on Linux, so this improved estimation will work only there. Add test. ### Issues - Closes: #1206
1 parent 3103d54 commit 436032f

File tree

2 files changed

+131
-5
lines changed

2 files changed

+131
-5
lines changed

src/crawlee/_utils/system.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from contextlib import suppress
55
from datetime import datetime, timezone
66
from logging import getLogger
7-
from typing import Annotated
7+
from typing import Annotated, Any
88

99
import psutil
1010
from pydantic import BaseModel, ConfigDict, Field, PlainSerializer, PlainValidator
@@ -79,19 +79,36 @@ def get_memory_info() -> MemoryInfo:
7979
logger.debug('Calling get_memory_info()...')
8080
current_process = psutil.Process(os.getpid())
8181

82-
# Retrieve the Resident Set Size (RSS) of the current process. RSS is the portion of memory
83-
# occupied by a process that is held in RAM.
84-
current_size_bytes = int(current_process.memory_info().rss)
82+
# Retrieve estimated memory usage of the current process.
83+
current_size_bytes = int(_get_used_memory(current_process.memory_full_info()))
8584

85+
# Sum memory usage by all children processes, try to exclude shared memory from the sum if allowed by OS.
8686
for child in current_process.children(recursive=True):
8787
# Ignore any NoSuchProcess exception that might occur if a child process ends before we retrieve
8888
# its memory usage.
8989
with suppress(psutil.NoSuchProcess):
90-
current_size_bytes += int(child.memory_info().rss)
90+
current_size_bytes += _get_used_memory(child.memory_full_info())
9191

9292
total_size_bytes = psutil.virtual_memory().total
9393

9494
return MemoryInfo(
9595
total_size=ByteSize(total_size_bytes),
9696
current_size=ByteSize(current_size_bytes),
9797
)
98+
99+
100+
def _get_used_memory(memory_full_info: Any) -> int:
101+
"""Get the most suitable available used memory metric.
102+
103+
`Proportional Set Size (PSS)`, is the amount of own memory and memory shared with other processes, accounted in a
104+
way that the shared amount is divided evenly between the processes that share it. Available on Linux. Suitable for
105+
avoiding overestimation by counting the same shared memory used by children processes multiple times.
106+
107+
`Resident Set Size (RSS)` is the non-swapped physical memory a process has used; it includes shared memory. It
108+
should be available everywhere.
109+
"""
110+
try:
111+
# Linux
112+
return int(memory_full_info.pss)
113+
except AttributeError:
114+
return int(memory_full_info.rss)

tests/unit/_utils/test_system.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
from __future__ import annotations
22

3+
import os
4+
from multiprocessing import Barrier, Process, Value, synchronize
5+
from multiprocessing.shared_memory import SharedMemory
6+
from typing import Callable
7+
8+
import pytest
9+
310
from crawlee._utils.byte_size import ByteSize
411
from crawlee._utils.system import get_cpu_info, get_memory_info
512

@@ -14,3 +21,105 @@ def test_get_memory_info_returns_valid_values() -> None:
1421
def test_get_cpu_info_returns_valid_values() -> None:
1522
cpu_info = get_cpu_info()
1623
assert 0 <= cpu_info.used_ratio <= 1
24+
25+
26+
@pytest.mark.skipif(os.name == 'nt', reason='Improved estimation not available on Windows')
27+
def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None:
28+
"""Test that memory usage estimation is not overestimating memory usage by counting shared memory multiple times.
29+
30+
In this test, the parent process is started and its memory usage is measured in situations where it is running
31+
child processes without additional memory, with shared additional memory and with own unshared additional memory.
32+
Child process without additional memory are used to estimate baseline memory usage of any child process.
33+
The following estimation is asserted by the test:
34+
additional_memory_size_estimate_per_shared_memory_child * number_of_sharing_children_processes is approximately
35+
equal to additional_memory_size_estimate_per_unshared_memory_child where the additional shared memory is exactly
36+
the same as the unshared memory.
37+
"""
38+
estimated_memory_expectation = Value('b', False) # noqa: FBT003 # Common usage pattern for multiprocessing.Value
39+
40+
def parent_process() -> None:
41+
extra_memory_size = 1024 * 1024 * 100 # 100 MB
42+
children_count = 4
43+
# Memory calculation is not exact, so allow for some tolerance.
44+
test_tolerance = 0.1
45+
46+
def no_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
47+
ready.wait()
48+
measured.wait()
49+
50+
def extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
51+
memory = SharedMemory(size=extra_memory_size, create=True)
52+
memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)])
53+
ready.wait()
54+
measured.wait()
55+
memory.close()
56+
memory.unlink()
57+
58+
def shared_extra_memory_child(
59+
ready: synchronize.Barrier, measured: synchronize.Barrier, memory: SharedMemory
60+
) -> None:
61+
print(memory.buf[-1])
62+
ready.wait()
63+
measured.wait()
64+
65+
def get_additional_memory_estimation_while_running_processes(
66+
*, target: Callable, count: int = 1, use_shared_memory: bool = False
67+
) -> float:
68+
processes = []
69+
ready = Barrier(parties=count + 1)
70+
measured = Barrier(parties=count + 1)
71+
shared_memory: None | SharedMemory = None
72+
memory_before = get_memory_info().current_size
73+
74+
if use_shared_memory:
75+
shared_memory = SharedMemory(size=extra_memory_size, create=True)
76+
shared_memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)])
77+
extra_args = [shared_memory]
78+
else:
79+
extra_args = []
80+
81+
for _ in range(count):
82+
p = Process(target=target, args=[ready, measured, *extra_args])
83+
p.start()
84+
processes.append(p)
85+
86+
ready.wait()
87+
memory_during = get_memory_info().current_size
88+
measured.wait()
89+
90+
for p in processes:
91+
p.join()
92+
93+
if shared_memory:
94+
shared_memory.close()
95+
shared_memory.unlink()
96+
97+
return (memory_during - memory_before).to_mb() / count
98+
99+
additional_memory_simple_child = get_additional_memory_estimation_while_running_processes(
100+
target=no_extra_memory_child, count=children_count
101+
)
102+
additional_memory_extra_memory_child = (
103+
get_additional_memory_estimation_while_running_processes(target=extra_memory_child, count=children_count)
104+
- additional_memory_simple_child
105+
)
106+
additional_memory_shared_extra_memory_child = (
107+
get_additional_memory_estimation_while_running_processes(
108+
target=shared_extra_memory_child, count=children_count, use_shared_memory=True
109+
)
110+
- additional_memory_simple_child
111+
)
112+
113+
estimated_memory_expectation.value = (
114+
abs((additional_memory_shared_extra_memory_child * children_count) - additional_memory_extra_memory_child)
115+
/ additional_memory_extra_memory_child
116+
< test_tolerance
117+
)
118+
119+
process = Process(target=parent_process)
120+
process.start()
121+
process.join()
122+
123+
assert estimated_memory_expectation.value, (
124+
'Estimated memory usage for process with shared memory does not meet the expectation.'
125+
)

0 commit comments

Comments
 (0)