File: threadutil.py

package info (click to toggle)
offlineimap3 0.0~git20210225.1e7ef9e%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,328 kB
  • sloc: python: 7,974; sh: 548; makefile: 81
file content (214 lines) | stat: -rw-r--r-- 7,126 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# Copyright (C) 2002-2016 John Goerzen & contributors
# Thread support module
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA

from threading import Lock, Thread, BoundedSemaphore
from queue import Queue, Empty
import traceback
from offlineimap.ui import getglobalui

# Sentinel value for a thread's `exit_message`: monitor() leaves its loop when
# it dequeues a thread carrying this message.
STOP_MONITOR = 'STOP_MONITOR'


# General utilities


def semaphorereset(semaphore, originalstate):
    """Wait until `semaphore` is back at its original state.

    Every one of the `originalstate` slots is acquired in turn, which blocks
    for as long as other holders have not released theirs.  Once all slots
    have been obtained they are handed straight back, so the semaphore ends
    up with the same count it started with."""

    slots = range(originalstate)

    # Grabbing every slot proves nobody else is holding one anymore.
    for _ in slots:
        semaphore.acquire()

    # Give everything back.
    for _ in slots:
        semaphore.release()


class accountThreads:
    """Lock-protected registry of threads.

    Keeps track of the threads handed to it so that callers can find out
    what is still registered and wait for all of them to finish."""

    def __init__(self):
        # `lock` guards every access to `list`.
        self.lock = Lock()
        self.list = []

    def add(self, thread):
        """Register `thread` at the end of the list."""

        with self.lock:
            self.list.append(thread)

    def remove(self, thread):
        """Unregister `thread`; list.remove() raises ValueError if absent."""

        with self.lock:
            self.list.remove(thread)

    def pop(self):
        """Remove and return the most recently added thread, or None when
        nothing is registered."""

        with self.lock:
            return self.list.pop() if self.list else None

    def wait(self):
        """Pop and join registered threads until the list is empty."""

        current = self.pop()
        while current is not None:
            current.join()
            current = self.pop()


######################################################################
# Exit-notify threads
######################################################################

# Queue of finished threads: ExitNotifyThread.run() puts the thread itself on
# it once done, and monitor() takes the entries off again.
exitedThreads = Queue()


def monitor():
    """Consume the `exitedThreads` queue and react to each finished thread.

    This one is supposed to run in the main thread.  For every thread taken
    off the queue:

    - no exit exception and exit_message == STOP_MONITOR: leave the loop;
    - no exit exception otherwise: report it through ui.threadExited();
    - exit exception is a SystemExit: raise a SystemExit here;
    - any other exit exception: hand the thread to ui.threadException(),
      which is expected to terminate the program; an AssertionError is
      raised if it returns anyway.
    """

    ui = getglobalui()

    while True:
        try:
            # The timeout on get() lets ctrl-c deliver its SIGINT
            # (http://bugs.python.org/issue1360).  When nothing shows up in
            # time, get() raises `Empty` and we simply poll again.
            #
            # ExitNotifyThread instances enqueue themselves once they are
            # done (normally or with exception).
            finished = exitedThreads.get(True, 60)
            failure = finished.exit_exception

            if failure is None:
                # Only the monitor thread has this exit message set.
                if finished.exit_message == STOP_MONITOR:
                    break  # Exit the loop here.
                ui.threadExited(finished)
                continue

            if isinstance(failure, SystemExit):
                # Bring a SystemExit into the main thread.
                # Do not send it back to UI layer right now.
                # Maybe later send it to ui.terminate?
                raise SystemExit

            ui.threadException(finished)  # Expected to terminate the program.
            # Should never hit this line.
            raise AssertionError("thread has 'exit_exception' set to"
                                 " '%s' [%s] but this value is unexpected"
                                 " and the ui did not stop the program." %
                                 (repr(failure), type(failure)))
        except Empty:
            # Nothing finished within the timeout; keep waiting.
            pass


class ExitNotifyThread(Thread):
    """Thread that alerts the "monitor" to the fact that it has exited and
    lets it find out why.

    When run() is over -- normally or because of an Exception -- the thread
    puts itself on the module-level `exitedThreads` queue.  All instances
    are made daemon threads, so we bail out when the mainloop dies.

    The thread can set the instance variable self.exit_message to a human
    readable reason of the thread exit.  The exception that ended the thread
    (if any) and its formatted stack trace are available through the
    read-only properties `exit_exception` and `exit_stacktrace`."""

    def __init__(self, *args, **kwargs):
        """Build the thread; all arguments are passed on to Thread."""

        super().__init__(*args, **kwargs)
        # These are all child threads that are supposed to go away when
        # the main thread is killed.  The `daemon` attribute is used because
        # Thread.setDaemon() is deprecated since Python 3.10.
        self.daemon = True
        self.exit_message = None
        self._exit_exc = None
        self._exit_stacktrace = None

    def run(self):
        """Run the thread's target, store any Exception it raises, then
        enqueue this thread on `exitedThreads`."""

        global exitedThreads
        try:
            Thread.run(self)
        except Exception as e:
            # Thread exited with Exception, store it together with the
            # formatted traceback.
            # NOTE: only `Exception` is caught.  A BaseException such as
            # SystemExit propagates out of run() and skips the put() below.
            tb = traceback.format_exc()
            self.set_exit_exception(e, tb)

        # Blocking put; tells whoever reads the queue that we are done.
        exitedThreads.put(self, True)

    def set_exit_exception(self, exc, st=None):
        """Sets Exception and stacktrace of a thread, so that other
        threads can query its exit status.

        :param exc: the exception that ended the thread.
        :param st: optional string holding the formatted stack trace."""

        self._exit_exc = exc
        self._exit_stacktrace = st

    @property
    def exit_exception(self):
        """Returns the cause of the exit, one of:
        Exception() -- the thread aborted with this exception
        None -- normal termination."""

        return self._exit_exc

    @property
    def exit_stacktrace(self):
        """Returns a string representing the stack trace if set, else None."""

        return self._exit_stacktrace


######################################################################
# Instance-limited threads
######################################################################

# Maps a limit namespace to its BoundedSemaphore: filled by
# initInstanceLimit(), acquired in InstanceLimitedThread.start() and released
# at the end of InstanceLimitedThread.run().
limitedNamespaces = {}


def initInstanceLimit(limitNamespace, instancemax):
    """Set up the thread limit for `limitNamespace`.

    Registers a BoundedSemaphore with `instancemax` slots under
    `limitNamespace`, so that at most instancemax threads run for that
    namespace.  This allows to honor maxsyncaccounts and maxconnections.

    A namespace that has already been initialized keeps its existing
    semaphore; the call is then a no-op."""

    if limitNamespace in limitedNamespaces:
        return

    limitedNamespaces[limitNamespace] = BoundedSemaphore(instancemax)


class InstanceLimitedThread(ExitNotifyThread):
    """ExitNotifyThread whose number of concurrently running instances is
    capped per limit namespace.

    The cap is the BoundedSemaphore stored in `limitedNamespaces` under
    `limitNamespace`: start() takes one slot and run() gives it back when
    the thread is over.  The namespace must already be present in
    `limitedNamespaces` (see initInstanceLimit()), otherwise start() raises
    KeyError."""

    def __init__(self, limitNamespace, *args, **kwargs):
        """Remember the namespace; other arguments go to ExitNotifyThread."""

        self.limitNamespace = limitNamespace
        super().__init__(*args, **kwargs)

    def start(self):
        """Take a slot of the namespace semaphore, then start the thread.

        Blocks in the caller's thread while the namespace has no free slot.
        If starting the thread fails, the slot is released again before the
        exception is re-raised."""

        slots = limitedNamespaces[self.limitNamespace]

        # Will block until the semaphore has free slots.
        slots.acquire()
        try:
            ExitNotifyThread.start(self)
        except Exception:
            # This start attempt did not launch run(), so nothing else would
            # return the slot taken just above: give it back here instead of
            # leaking it.
            slots.release()
            raise

    def run(self):
        """Run the thread and always free the semaphore slot afterwards."""

        try:
            ExitNotifyThread.run(self)
        finally:
            # Release only while the registry still holds something for us.
            # NOTE(review): presumably a guard for interpreter shutdown, when
            # module globals may already be cleared -- confirm.
            if limitedNamespaces and limitedNamespaces[self.limitNamespace]:
                limitedNamespaces[self.limitNamespace].release()