1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
|
import multiprocessing
import threading
import weakref
from typing import Any, MutableMapping, Optional
try:
from dask.utils import SerializableLock
except ImportError:
# no need to worry about serializing the lock
SerializableLock = threading.Lock
try:
from dask.distributed import Lock as DistributedLock
except ImportError:
DistributedLock = None
# Locks used by multiple backends.
# Neither HDF5 nor the netCDF-C library is thread-safe.
HDF5_LOCK = SerializableLock()
NETCDFC_LOCK = SerializableLock()
# Registry of per-key thread locks used by _get_threaded_lock. Values are
# held weakly, so an entry vanishes once nothing else references its lock.
_FILE_LOCKS: MutableMapping[Any, threading.Lock] = weakref.WeakValueDictionary()
def _get_threaded_lock(key):
    """Return the ``threading.Lock`` registered for ``key``.

    A lock already stored in ``_FILE_LOCKS`` under ``key`` is reused;
    otherwise a new lock is created, registered under ``key`` and returned.
    """
    cached = _FILE_LOCKS.get(key)
    if cached is not None:
        return cached
    fresh = threading.Lock()
    _FILE_LOCKS[key] = fresh
    return fresh
def _get_multiprocessing_lock(key):
# TODO: make use of the key -- maybe use locket.py?
# https://github.com/mwilliamson/locket.py
del key # unused
return multiprocessing.Lock()
# Maps a scheduler name (None means no dask scheduler) to a callable that
# takes a resource key and returns a lock suited to that scheduler.
# NOTE: the "distributed" entry is None when dask.distributed could not be
# imported (see the import fallback near the top of this module).
_LOCK_MAKERS = {
    None: _get_threaded_lock,
    "threaded": _get_threaded_lock,
    "multiprocessing": _get_multiprocessing_lock,
    "distributed": DistributedLock,
}
def _get_lock_maker(scheduler=None):
    """Look up the lock factory registered for a dask scheduler.

    Parameters
    ----------
    scheduler : str or None
        Name of the dask scheduler being used; must be a key of
        ``_LOCK_MAKERS``.

    Returns
    -------
    The ``_LOCK_MAKERS`` entry stored under ``scheduler``.

    Raises
    ------
    KeyError
        If ``scheduler`` is not a key of ``_LOCK_MAKERS``.

    See Also
    --------
    dask.utils.get_scheduler_lock
    """
    maker = _LOCK_MAKERS[scheduler]
    return maker
def _get_scheduler(get=None, collection=None) -> Optional[str]:
"""Determine the dask scheduler that is being used.
None is returned if no dask scheduler is active.
See also
--------
dask.base.get_scheduler
"""
try:
# Fix for bug caused by dask installation that doesn't involve the toolz library
# Issue: 4164
import dask
from dask.base import get_scheduler # noqa: F401
actual_get = get_scheduler(get, collection)
except ImportError:
return None
try:
from dask.distributed import Client
if isinstance(actual_get.__self__, Client):
return "distributed"
except (ImportError, AttributeError):
pass
try:
# As of dask=2.6, dask.multiprocessing requires cloudpickle to be installed
# Dependency removed in https://github.com/dask/dask/pull/5511
if actual_get is dask.multiprocessing.get:
return "multiprocessing"
except AttributeError:
pass
return "threaded"
def get_write_lock(key):
    """Build a lock for writing to ``key`` that suits the active scheduler.

    Parameters
    ----------
    key : str
        Name of the resource for which to acquire a lock. Typically a filename.

    Returns
    -------
    Lock object that can be used like a threading.Lock object.
    """
    make_lock = _get_lock_maker(_get_scheduler())
    return make_lock(key)
def acquire(lock, blocking=True):
    """Acquire ``lock``, optionally without blocking.

    Papers over differences between lock implementations (old versions of
    Python, dask and dask-distributed) in how a non-blocking acquire is
    requested. Returns whatever the underlying ``lock.acquire`` call returns.
    """
    if blocking:
        # no arguments needed
        return lock.acquire()

    is_distributed = DistributedLock is not None and isinstance(lock, DistributedLock)
    if is_distributed:
        # distributed.Lock doesn't support the blocking argument yet:
        # https://github.com/dask/distributed/pull/2412
        return lock.acquire(timeout=0)

    # Pass the flag positionally; the "blocking" keyword is not supported for:
    # - threading.Lock on Python 2.
    # - dask.SerializableLock with dask v1.0.0 or earlier.
    # - multiprocessing.Lock calls the argument "block" instead.
    return lock.acquire(blocking)
class CombinedLock:
    """A combination of multiple locks.

    Like a locked door, a CombinedLock is locked if any of its constituent
    locks are locked.

    Parameters
    ----------
    locks : iterable of hashable lock objects
        Locks to combine. Duplicates are dropped.
    """

    def __init__(self, locks):
        # Going through a set removes duplicates so no lock is taken twice;
        # the resulting order is arbitrary.
        self.locks = tuple(set(locks))

    def acquire(self, blocking=True):
        """Acquire every constituent lock; return True only if all succeed."""
        # NOTE(review): all() stops at the first falsy result, so a failed
        # non-blocking attempt leaves the locks acquired before it held.
        return all(acquire(lock, blocking=blocking) for lock in self.locks)

    def release(self):
        """Release every constituent lock."""
        for lock in self.locks:
            lock.release()

    def __enter__(self):
        for lock in self.locks:
            lock.__enter__()

    def __exit__(self, *args):
        for lock in self.locks:
            lock.__exit__(*args)

    def locked(self):
        """Return True if any constituent lock is currently locked."""
        # ``locked`` is a method: it must be called. Testing the bound method
        # itself is always truthy and would report "locked" unconditionally.
        return any(lock.locked() for lock in self.locks)

    def __repr__(self):
        return f"CombinedLock({list(self.locks)!r})"
class DummyLock:
    """DummyLock provides the lock API without any actual locking."""

    def acquire(self, blocking=True):
        """Pretend to acquire the lock; always succeeds.

        Returns True, as ``threading.Lock.acquire`` does on success, so that
        callers checking the result do not mistake the no-op lock for a
        failed acquisition.
        """
        return True

    def release(self):
        """Do nothing; there is no underlying lock to release."""

    def __enter__(self):
        pass

    def __exit__(self, *args):
        pass

    def locked(self):
        """Always False: a DummyLock is never held."""
        return False
def combine_locks(locks):
    """Merge a sequence of locks into one lock object.

    ``None`` entries are skipped and any ``CombinedLock`` is flattened into
    its constituent locks. The result is a ``DummyLock`` when nothing
    remains, the lock itself when exactly one remains, and a
    ``CombinedLock`` otherwise.
    """
    flattened = []
    for candidate in locks:
        if isinstance(candidate, CombinedLock):
            flattened += candidate.locks
        elif candidate is not None:
            flattened.append(candidate)

    if not flattened:
        return DummyLock()
    if len(flattened) == 1:
        return flattened[0]
    return CombinedLock(flattened)
def ensure_lock(lock):
    """Return ``lock``, substituting a ``DummyLock`` for ``None`` or ``False``.

    The check uses identity, so other falsy values (such as ``0``) are
    returned unchanged.
    """
    wants_dummy = lock is None or lock is False
    return DummyLock() if wants_dummy else lock
|