File: test_threads.py

package info (click to toggle)
python-memray 1.17.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 24,396 kB
  • sloc: python: 28,451; ansic: 16,507; sh: 10,586; cpp: 8,494; javascript: 1,474; makefile: 822; awk: 12
file content (146 lines) | stat: -rw-r--r-- 3,868 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import threading
from pathlib import Path

from memray import AllocatorType
from memray import FileReader
from memray import Tracker
from memray._test import MemoryAllocator
from memray._test import set_thread_name
from tests.utils import filter_relevant_allocations
from tests.utils import skip_if_macos

# Directory that contains this test module.
HERE = Path(__file__).parent
# Path named "multithreaded_extension" located next to this test module.
TEST_MULTITHREADED_EXTENSION = HERE.joinpath("multithreaded_extension")


def allocating_function(allocator, flag_event, wait_event):
    """Run one valloc/free pair before and one after a two-event handshake.

    The first pair runs immediately. ``flag_event`` is then set to signal the
    caller, and the function blocks on ``wait_event`` until it is released,
    after which the second pair runs.
    """

    def valloc_then_free():
        allocator.valloc(1234)
        allocator.free()

    valloc_then_free()
    # Signal that the first pair is done, then pause until the caller lets
    # this function continue.
    flag_event.set()
    wait_event.wait()
    valloc_then_free()


def test_thread_allocations_after_tracker_is_deactivated(tmpdir):
    """Expect only the thread's allocations made under the tracker to be recorded.

    The worker thread performs one valloc/free pair while the ``Tracker``
    context is open and a second pair after the context has exited; the
    capture file is expected to hold exactly one valloc and one free.
    """
    # GIVEN
    output = Path(tmpdir, "test.bin")
    flag_event = threading.Event()
    wait_event = threading.Event()
    allocator = MemoryAllocator()

    # WHEN
    with Tracker(output):
        worker = threading.Thread(
            target=allocating_function,
            args=(allocator, flag_event, wait_event),
        )
        worker.start()
        # Stay inside the tracker until the thread reports that its first
        # valloc/free pair has been performed.
        flag_event.wait()

    # The tracker context has exited: release the thread so that its second
    # valloc/free pair happens while the tracker is not active.
    wait_event.set()
    worker.join()

    # THEN
    tracked = list(
        filter_relevant_allocations(FileReader(output).get_allocation_records())
    )

    def records_of(kind):
        return [rec for rec in tracked if rec.allocator == kind]

    assert len(tracked) == 2

    valloc_records = records_of(AllocatorType.VALLOC)
    assert len(valloc_records) == 1
    assert valloc_records[0].size == 1234

    assert len(records_of(AllocatorType.FREE)) == 1


@skip_if_macos
def test_thread_name(tmpdir):
    """Expect a name given via ``set_thread_name`` to be reported on the record.

    A thread names itself with ``set_thread_name`` and then performs a single
    valloc/free pair under the tracker; the recorded valloc must carry that
    name in ``thread_name``.
    """
    # GIVEN
    output = Path(tmpdir, "test.bin")
    allocator = MemoryAllocator()

    def name_thread_then_allocate():
        set_thread_name("my thread name")
        allocator.valloc(1234)
        allocator.free()

    # WHEN
    with Tracker(output):
        worker = threading.Thread(target=name_thread_then_allocate)
        worker.start()
        worker.join()

    # THEN
    tracked = list(
        filter_relevant_allocations(FileReader(output).get_allocation_records())
    )
    assert len(tracked) == 2

    valloc_records = [rec for rec in tracked if rec.allocator == AllocatorType.VALLOC]
    assert len(valloc_records) == 1
    only_valloc = valloc_records[0]
    assert only_valloc.size == 1234
    assert only_valloc.thread_name == "my thread name"


def test_setting_python_thread_name(tmpdir):
    """Expect each valloc to be reported under the thread name current at that time.

    The worker thread allocates four times, with its name changed between
    allocations: first through the ``Thread`` constructor, then from inside
    the thread, then from the main thread, and finally via
    ``set_thread_name``. The last name is only expected when
    ``set_thread_name`` returned 0; otherwise the previous name is expected.
    """
    # GIVEN
    output = Path(tmpdir, "test.bin")
    allocator = MemoryAllocator()
    name_set_inside_thread = threading.Event()
    name_set_outside_thread = threading.Event()
    prctl_rc = -1

    def worker_body():
        nonlocal prctl_rc

        def allocate_once():
            allocator.valloc(1234)
            allocator.free()

        # Allocation made under the name passed to the Thread constructor.
        allocate_once()

        # Allocation made after the thread renames itself.
        threading.current_thread().name = "set inside thread"
        allocate_once()

        # Allocation made after the main thread has renamed this thread.
        name_set_inside_thread.set()
        name_set_outside_thread.wait()
        allocate_once()

        # Allocation made after set_thread_name; keep its return code so the
        # expectation below can depend on whether the call reported success.
        prctl_rc = set_thread_name("set by prctl")
        allocate_once()

    # WHEN
    with Tracker(output):
        worker = threading.Thread(target=worker_body, name="set before start")
        worker.start()
        name_set_inside_thread.wait()
        worker.name = "set outside running thread"
        name_set_outside_thread.set()
        worker.join()

    # THEN
    final_name = "set outside running thread"
    if prctl_rc == 0:
        final_name = "set by prctl"
    expected_names = [
        "set before start",
        "set inside thread",
        "set outside running thread",
        final_name,
    ]

    all_records = FileReader(output).get_allocation_records()
    names = [
        record.thread_name
        for record in all_records
        if record.allocator == AllocatorType.VALLOC
    ]
    assert names == expected_names