1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
import multiprocessing
import sys
from multiprocessing import Pool
from pathlib import Path
import pytest
from memray import AllocatorType
from memray import FileReader
from memray import Tracker
from memray._test import MemoryAllocator
from memray._test import PymallocDomain
from memray._test import PymallocMemoryAllocator
from tests.utils import filter_relevant_allocations
from tests.utils import filter_relevant_pymalloc_allocations
@pytest.fixture(scope="module", autouse=True)
def set_multiprocessing_to_fork():
    """Run every test in this module with the "fork" start method.

    The start method that was active beforehand is remembered and put back
    once the module's tests have finished.
    """
    previous_method = multiprocessing.get_start_method()
    multiprocessing.set_start_method("fork", force=True)
    yield
    # Teardown: restore whatever start method was configured before.
    multiprocessing.set_start_method(previous_method, force=True)
def multiproc_func(repetitions):  # pragma: no cover
    """Perform `repetitions` valloc(1234)/free pairs with a fresh allocator.

    Used as the work item handed to the multiprocessing pool by the tests
    in this module.
    """
    mem = MemoryAllocator()
    for _iteration in range(repetitions):
        mem.valloc(1234)
        mem.free()
def pymalloc_multiproc_func():  # pragma: no cover
    """Do a single calloc(1234)/free pair in the PYMALLOC_RAW domain.

    Used as the work item handed to the multiprocessing pool by
    `test_pymalloc_allocations_after_fork`.
    """
    raw_allocator = PymallocMemoryAllocator(PymallocDomain.PYMALLOC_RAW)
    raw_allocator.calloc(1234)
    raw_allocator.free()
@pytest.mark.no_cover
def test_allocations_with_multiprocessing(tmpdir):
    """Check tracking with a worker pool when forks are not followed.

    The parent's capture file is expected to hold exactly the parent's own
    valloc/free pair, and no per-child capture files may appear.
    """
    # GIVEN
    output = Path(tmpdir) / "test.bin"
    allocator = MemoryAllocator()

    # WHEN
    with Tracker(output):
        with Pool(3) as p:
            p.map(multiproc_func, [1, 10, 100, 1000, 2000, 3000, 4000, 5000])

        # The only allocation the parent itself makes while tracking.
        allocator.valloc(1234)
        allocator.free()

    # THEN
    all_records = FileReader(output).get_allocation_records()
    relevant_records = list(filter_relevant_allocations(all_records))
    assert len(relevant_records) == 2

    vallocs = [
        rec for rec in relevant_records if rec.allocator == AllocatorType.VALLOC
    ]
    assert len(vallocs) == 1
    assert vallocs[0].size == 1234

    frees = [rec for rec in relevant_records if rec.allocator == AllocatorType.FREE]
    assert len(frees) == 1

    # No files created by child processes
    assert not list(Path(tmpdir).glob("test.bin.*"))
@pytest.mark.no_cover
def test_allocations_with_multiprocessing_following_fork(tmpdir):
    """Check tracking with a worker pool when ``follow_fork=True``.

    The parent's capture file must hold only the parent's valloc/free pair,
    while the ``test.bin.*`` child files together must hold one valloc/free
    pair for every repetition requested from the workers.
    """
    # GIVEN
    output = Path(tmpdir) / "test.bin"
    allocator = MemoryAllocator()

    # WHEN
    with Tracker(output, follow_fork=True):
        with Pool(3) as p:
            p.map(multiproc_func, [1, 10, 100, 1000, 2000, 3000, 4000, 5000])

        # The only allocation the parent itself makes while tracking.
        allocator.valloc(1234)
        allocator.free()

    # THEN
    parent_records = FileReader(output).get_allocation_records()
    relevant_records = list(filter_relevant_allocations(parent_records))
    assert len(relevant_records) == 2

    vallocs = [
        rec for rec in relevant_records if rec.allocator == AllocatorType.VALLOC
    ]
    assert len(vallocs) == 1
    assert vallocs[0].size == 1234

    frees = [rec for rec in relevant_records if rec.allocator == AllocatorType.FREE]
    assert len(frees) == 1

    # Gather the relevant records from every child capture file.
    child_records = [
        rec
        for child_file in Path(tmpdir).glob("test.bin.*")
        for rec in filter_relevant_allocations(
            FileReader(child_file).get_allocation_records()
        )
    ]
    child_vallocs = [
        rec for rec in child_records if rec.allocator == AllocatorType.VALLOC
    ]
    child_frees = [
        rec for rec in child_records if rec.allocator == AllocatorType.FREE
    ]

    # One valloc and one free per repetition, summed over all work items.
    num_expected = sum((1, 10, 100, 1000, 2000, 3000, 4000, 5000))
    assert len(child_vallocs) == num_expected
    assert len(child_frees) == num_expected
    assert all(rec.size == 1234 for rec in child_vallocs)
@pytest.mark.no_cover
def test_pymalloc_allocations_after_fork(tmpdir):
    """Check that pymalloc allocations made in forked workers are captured.

    Ten work items each perform one calloc(1234), so the child capture files
    together must contain ten PYMALLOC_CALLOC records of that size.
    """
    # GIVEN
    output = Path(tmpdir) / "test.bin"

    # WHEN
    with Tracker(output, follow_fork=True, trace_python_allocators=True):
        with Pool(3) as p:
            p.starmap(pymalloc_multiproc_func, [()] * 10)

    # THEN
    child_records = [
        rec
        for child_file in Path(tmpdir).glob("test.bin.*")
        for rec in filter_relevant_pymalloc_allocations(
            FileReader(child_file).get_allocation_records(), size=1234
        )
    ]
    # Emit the records to stdout (pytest reports captured output on failure).
    print(child_records)

    def is_expected_calloc(rec):
        return rec.allocator == AllocatorType.PYMALLOC_CALLOC and rec.size == 1234

    child_callocs = [rec for rec in child_records if is_expected_calloc(rec)]
    assert len(child_callocs) == 10
@pytest.mark.no_cover
def test_stack_cleanup_after_fork(tmpdir):
    """Guard against a crash while flushing pending Python frames after a fork.

    With `follow_fork=False` the tracker is destroyed after a fork; when the
    profile function is then deactivated, the attempt to write the pending
    Python frames must not bring the process down.
    """
    # GIVEN
    output = Path(tmpdir) / "test.bin"

    def foo():
        with Pool() as workers:
            # A single task that calls sys.setprofile(None) in a worker.
            pending = workers.map_async(sys.setprofile, [None])
            # Wait at most one second for the worker to answer.
            return pending.get(timeout=1)

    # WHEN
    with Tracker(output, follow_fork=False):
        outcome = foo()

    # THEN
    assert outcome == [None]
|