1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
import logging
import threading
# Module-level logger; ReadWriteMutex reports lock acquisition and release
# through it at DEBUG level.
log = logging.getLogger(__name__)
class LockError(Exception):
    """Raised when a lock is released by a caller that does not hold it."""
class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    :class:`.ReadWriteMutex` uses a Python ``threading.Condition``
    to provide this functionality across threads within a process.

    The Beaker package also contained a file-lock based version
    of this concept, so that readers/writers could be synchronized
    across processes with a common filesystem.  A future Dogpile
    release may include this additional class at some point.

    """

    def __init__(self):
        # number of readers ("asynchronous" operations) currently holding
        # the read lock
        self.async_ = 0

        # thread that holds, or is in the middle of acquiring, the write
        # lock; None when there is no writer
        self.current_sync_operation = None

        # condition object guarding the two attributes above
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the 'read' lock.

        :param wait: when True (the default), block until no writer holds
         or is acquiring the write lock.  When False, give up immediately
         if a writer is present.
        :return: with ``wait=False``, True if the lock was acquired and
         False otherwise.  With ``wait=True`` the method returns None once
         the lock has been acquired.
        """
        with self.condition:
            # a writer that is running, or waiting for readers to drain,
            # takes priority: either wait for it or give up
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.async_ += 1
            log.debug("%s acquired read lock", self)

        if not wait:
            return True

    def release_read_lock(self):
        """Release the 'read' lock.

        :raises LockError: if no read lock is currently held.  The reader
         count is left untouched in that case, so the mutex stays usable.
        """
        with self.condition:
            # validate before decrementing; otherwise an unbalanced release
            # would leave the counter negative and break every later
            # acquire/release pair
            if self.async_ <= 0:
                raise LockError(
                    "Synchronizer error - too many "
                    "release_read_locks called"
                )

            self.async_ -= 1

            # last reader out the door wakes up a writer that is waiting
            # for the readers to drain
            if self.async_ == 0 and self.current_sync_operation is not None:
                self.condition.notify_all()

            log.debug("%s released read lock", self)

    def acquire_write_lock(self, wait=True):
        """Acquire the 'write' lock.

        :param wait: when True (the default), block until any other writer
         has finished and all readers have released.  When False, give up
         immediately if the lock is not available.
        :return: with ``wait=False``, True if the lock was acquired and
         False otherwise.  With ``wait=True`` the method returns None once
         the lock has been acquired.
        """
        with self.condition:
            # only one writer at a time: wait for the current one, or
            # give up
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            # establish ourselves as the current writer; from now on new
            # readers and writers hold off until this is None again
            self.current_sync_operation = threading.current_thread()

            if self.async_ > 0:
                if not wait:
                    # readers are still active and we must not block:
                    # withdraw our claim.  The condition lock was held
                    # the whole time, so nobody observed it.
                    self.current_sync_operation = None
                    return False

                # re-check the predicate after every wakeup so that the
                # write lock is never granted while readers remain
                while self.async_ > 0:
                    self.condition.wait()

            log.debug("%s acquired write lock", self)

        if not wait:
            return True

    def release_write_lock(self):
        """Release the 'write' lock.

        :raises LockError: if the calling thread is not the thread that
         holds the write lock.
        """
        with self.condition:
            if self.current_sync_operation is not threading.current_thread():
                raise LockError(
                    "Synchronizer error - current thread doesn't "
                    "have the write lock"
                )

            # reset the current writer so another can take over
            self.current_sync_operation = None

            # wake every waiting reader and writer; they proceed once the
            # condition lock is released on leaving this block
            self.condition.notify_all()

            log.debug("%s released write lock", self)
|