File: mutex.py

package info (click to toggle)
python-pyqtgraph 0.13.1-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 6,520 kB
  • sloc: python: 52,773; makefile: 115; ansic: 40; sh: 2
file content (113 lines) | stat: -rw-r--r-- 3,442 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import traceback

from ..Qt import QtCore


class Mutex(QtCore.QMutex):
    """
    Subclass of QMutex that provides useful debugging information during
    deadlocks--tracebacks are printed for both the code location that is 
    attempting to lock the mutex as well as the location that has already
    acquired the lock.
    
    Also provides __enter__ and __exit__ methods for use in "with" statements.
    """    
    def __init__(self, *args, **kargs):
        if kargs.get('recursive', False):
            args = (QtCore.QMutex.Recursive,)
        QtCore.QMutex.__init__(self, *args)
        self.l = QtCore.QMutex()  ## for serializing access to self.tb
        self.tb = []
        self.debug = kargs.pop('debug', False) ## True to enable debugging functions

    def tryLock(self, timeout=None, id=None):
        if timeout is None:
            locked = QtCore.QMutex.tryLock(self)
        else:
            locked = QtCore.QMutex.tryLock(self, timeout)

        if self.debug and locked:
            self.l.lock()
            try:
                if id is None:
                    self.tb.append(''.join(traceback.format_stack()[:-1]))
                else:
                    self.tb.append("  " + str(id))
                #print 'trylock', self, len(self.tb)
            finally:
                self.l.unlock()
        return locked
        
    def lock(self, id=None):
        c = 0
        waitTime = 5000  # in ms
        while True:
            if self.tryLock(waitTime, id):
                break
            c += 1
            if self.debug:
                self.l.lock()
                try:
                    print("Waiting for mutex lock (%0.1f sec). Traceback follows:" 
                          % (c*waitTime/1000.))
                    traceback.print_stack()
                    if len(self.tb) > 0:
                        print("Mutex is currently locked from:\n")
                        print(self.tb[-1])
                    else:
                        print("Mutex is currently locked from [???]")
                finally:
                    self.l.unlock()
        #print 'lock', self, len(self.tb)

    def unlock(self):
        QtCore.QMutex.unlock(self)
        if self.debug:
            self.l.lock()
            try:
                #print 'unlock', self, len(self.tb)
                if len(self.tb) > 0:
                    self.tb.pop()
                else:
                    raise Exception("Attempt to unlock mutex before it has been locked")
            finally:
                self.l.unlock()

    def acquire(self, blocking=True):
        """Mimics threading.Lock.acquire() to allow this class as a drop-in replacement.
        """
        return self.tryLock()
        
    def release(self): 
        """Mimics threading.Lock.release() to allow this class as a drop-in replacement.
        """
        self.unlock()

    def depth(self):
        self.l.lock()
        n = len(self.tb)
        self.l.unlock()
        return n

    def traceback(self):
        self.l.lock()
        try:
            ret = self.tb[:]
        finally:
            self.l.unlock()
        return ret

    def __exit__(self, *args):
        self.unlock()

    def __enter__(self):
        self.lock()
        return self


class RecursiveMutex(Mutex):
    """Mimics threading.RLock class.
    """
    def __init__(self, **kwds):
        kwds['recursive'] = True
        Mutex.__init__(self, **kwds)