File: threadlocals.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (195 lines) | stat: -rw-r--r-- 7,775 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import weakref
from rpython.rlib import rthread, rshrinklist
from rpython.rlib.objectmodel import we_are_translated
from rpython.rlib.rarithmetic import r_ulonglong
from pypy.module.thread.error import wrap_thread_error
from pypy.interpreter.executioncontext import ExecutionContext


# Class-level defaults installed on ExecutionContext by this module.
# '_signals_enabled' is the per-ExecutionContext counter that OSThreadLocals
# (below) reads and updates: enable_signals() increments it,
# disable_signals() decrements it, and _set_ec() sets it to 1 for the
# main thread.
ExecutionContext._signals_enabled = 0     # default value
# '_sentinel_lock' is not referenced anywhere else in this file.
# NOTE(review): presumably assigned by other thread-module code -- confirm.
ExecutionContext._sentinel_lock = None


class OSThreadLocals:
    """Storage of one ExecutionContext per OS-level thread.
    Memory management relies on being told explicitly when a thread
    finishes, which holds for every thread started through
    os_thread.bootstrap()."""

    def __init__(self, space):
        "NOT_RPYTHON"
        #
        # This object keeps track of code entering and leaving threads,
        # through two different APIs:
        #
        # * Python-level threads: their start and end are known, so
        #   enter_thread() and leave_thread() are called around them.
        #
        # * Other cases, like callbacks, which may run in a thread that
        #   was never seen before: the callback logic calls
        #   try_enter_thread() first and, only if that returned True,
        #   calls leave_thread() when it is done.
        #
        # The second API is optimized (only when translating with a
        # framework GC and with rweakref).  When try_enter_thread() runs
        # in a never-seen-before thread, it still returns False, and the
        # ExecutionContext is remembered through 'self._weaklist'.  A
        # later try_enter_thread() in the same thread reuses that
        # ExecutionContext.  The user can observe the optimization:
        # 'thread._local()' values remain.  Arguably that is the correct
        # behavior, and what happens with the optimization disabled is
        # the buggy one (but it is hard to do better in that case).
        #
        # 'self._valuedict' maps thread idents to ExecutionContexts.
        # The ExecutionContexts reachable from 'self._weaklist' are not
        # listed in it.  (More precisely, 'self._weaklist' is a list of
        # AutoFreeECWrapper objects, defined below, each of which
        # references its ExecutionContext.)
        #
        self._valuedict = {}
        self.space = space
        self._cleanup_()
        self.raw_thread_local = rthread.ThreadLocalReference(
            ExecutionContext, loop_invariant=True)

    def can_optimize_with_weaklist(self):
        # Both conditions are needed: weakref support in the translation
        # config, and automatic keepalive of the thread-local references.
        config = self.space.config
        has_rweakref = config.translation.rweakref
        return (has_rweakref and
                rthread.ThreadLocalReference.automatic_keepalive(config))

    def _cleanup_(self):
        # Reset to the "no thread known" state.  The dict is emptied in
        # place rather than replaced.
        self._mainthreadident = 0
        self._weaklist = None
        self._valuedict.clear()

    def enter_thread(self, space):
        "Called when the current thread is about to start running."
        fresh_ec = space.createexecutioncontext()
        self._set_ec(fresh_ec)

    def try_enter_thread(self, space):
        # Fast path: this thread already has an ExecutionContext.
        if self.raw_thread_local.get() is not None:
            return False

        # Slow path: build a new ExecutionContext and attach it.
        ec = space.createexecutioncontext()
        if self.can_optimize_with_weaklist():
            # Here 'rthread' keeps the thread-local values alive until
            # the thread ends.  The AutoFreeECWrapper is an object with
            # a __del__; once that __del__ runs we know the thread is
            # really finished.  leave_thread() must then not be called
            # explicitly, which is why the answer is False.
            if self._weaklist is None:
                self._weaklist = ListECWrappers()
            wrapper = AutoFreeECWrapper(ec)
            self._weaklist.append(weakref.ref(wrapper))
            self._set_ec(ec, register_in_valuedict=False)
            return False

        # No optimization: the caller has to call leave_thread() later.
        self._set_ec(ec)
        return True

    def _set_ec(self, ec, register_in_valuedict=True):
        ident = rthread.get_ident()
        main_ident = self._mainthreadident
        if main_ident == 0 or main_ident == ident:
            # Either no main thread is recorded yet, or we are in it:
            # record it and enable signals for its ExecutionContext.
            self._mainthreadident = ident
            ec._signals_enabled = 1
        if register_in_valuedict:
            self._valuedict[ident] = ec
        self.raw_thread_local.set(ec)

    def leave_thread(self, space):
        "Called when the current thread is about to stop."
        from pypy.module.thread.os_local import thread_is_stopping
        ec = self.get_ec()
        if ec is None:
            return
        try:
            thread_is_stopping(ec)
        finally:
            # Always detach the ExecutionContext, even if the call
            # above raised.
            self.raw_thread_local.set(None)
            current_ident = rthread.get_ident()
            try:
                del self._valuedict[current_ident]
            except KeyError:
                pass

    def get_ec(self):
        current = self.raw_thread_local.get()
        if we_are_translated():
            return current
        # Untranslated only: cross-check against 'self._valuedict'.
        assert current is self._valuedict.get(rthread.get_ident(), None)
        return current

    def signals_enabled(self):
        # False without an ExecutionContext, else its signal counter.
        ec = self.get_ec()
        if ec is None:
            return False
        return ec._signals_enabled

    def enable_signals(self, space):
        ec = self.get_ec()
        assert ec is not None
        ec._signals_enabled = ec._signals_enabled + 1

    def disable_signals(self, space):
        ec = self.get_ec()
        assert ec is not None
        remaining = ec._signals_enabled - 1
        if remaining >= 0:
            ec._signals_enabled = remaining
            return
        # The counter would become negative: refuse.
        raise wrap_thread_error(space,
            "cannot disable signals in thread not enabled for signals")

    def getallvalues(self):
        weaklist = self._weaklist
        if weaklist is None:
            return self._valuedict
        # Collect the ExecutionContexts reachable from the weak list.
        # Two AutoFreeECWrappers in the list may share the same 'ident';
        # the most recent one must win (the older one should be deleted
        # soon), which is what overwriting in list order achieves.
        # Entries of 'self._valuedict' are never outdated, so they are
        # merged last and take priority.
        merged = {}
        for wref in weaklist.items():
            wrapper = wref()
            if wrapper is None or wrapper.deleted:
                continue
            merged[wrapper.ident] = wrapper.ec
        merged.update(self._valuedict)
        return merged

    def reinit_threads(self, space):
        "Called in the child process after a fork()"
        ident = rthread.get_ident()
        ec = self.get_ec()
        assert ec is not None
        signals = ec._signals_enabled
        if ident != self._mainthreadident:
            # The forking thread was not the main thread: it becomes the
            # main thread below, so it gets one more level of signals.
            signals += 1
        self._cleanup_()      # also empties self._valuedict
        self._mainthreadident = ident
        self._set_ec(ec)      # sets ec._signals_enabled to 1 ...
        ec._signals_enabled = signals      # ... so fix it up afterwards


class AutoFreeECWrapper(object):
    """Object with a __del__ tied to one ExecutionContext: when the
    __del__ runs, the ExecutionContext's thread is reported as stopping."""

    # Class-level default; __del__ sets it to True on the instance.
    deleted = False

    def __init__(self, ec):
        # Remember the ident of the thread running this constructor.
        self.ident = rthread.get_ident()
        # From here on 'ec' and 'self' reference each other.  That loop
        # should not stop the __del__ method below from being called.
        ec._threadlocals_auto_free = self
        self.ec = ec

    def __del__(self):
        from pypy.module.thread.os_local import thread_is_stopping
        # This always runs in another thread: by now the thread that
        # 'self.ec' belongs to has finished, and we are right after the
        # GC noticed that nothing references 'ec' (and therefore 'self')
        # any more.
        ec = self.ec
        self.deleted = True
        thread_is_stopping(ec)

class ListECWrappers(rshrinklist.AbstractShrinkList):
    """Shrink list of weak references; an entry is kept only while its
    weak reference is still alive."""

    def must_keep(self, wref):
        target = wref()
        return target is not None