File: algorithm.py

package info (click to toggle)
python-aioredlock 0.7.3-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 184 kB
  • sloc: python: 608; makefile: 2
file content (271 lines) | stat: -rw-r--r-- 9,716 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import asyncio
import contextlib
import logging
import random
import uuid

import attr

from aioredlock.errors import LockError
from aioredlock.lock import Lock
from aioredlock.redis import Redis
from aioredlock.utility import clean_password


@attr.s
class Aioredlock:
    """
    Asynchronous lock manager working on top of the redis instances
    described by ``redis_connections``.

    Locks acquired without an explicit ``lock_timeout`` are kept alive by a
    background watchdog task until they are released with :meth:`unlock`
    or :meth:`destroy`.
    """

    # Connection descriptions handed over to ``Redis``. A factory is used so
    # that every instance gets its own list instead of sharing one mutable
    # default object.
    redis_connections = attr.ib(
        default=attr.Factory(lambda: [{"host": "localhost", "port": 6379}]),
        repr=clean_password,
    )
    # Maximum number of attempts made by ``_set_lock``.
    retry_count = attr.ib(default=3, converter=int)
    # Bounds of the random delay slept between two attempts.
    retry_delay_min = attr.ib(default=0.1, converter=float)
    retry_delay_max = attr.ib(default=0.3, converter=float)
    # Lease time used when ``lock`` is called without ``lock_timeout``;
    # also drives the watchdog period (0.6 * internal_lock_timeout).
    internal_lock_timeout = attr.ib(default=10.0, converter=float)

    def __attrs_post_init__(self):
        """
        Create the redis wrapper and the bookkeeping dictionaries.
        """
        self.redis = Redis(self.redis_connections)
        # resource -> watchdog task (future running ``_auto_extend``)
        self._watchdogs = {}
        # resource -> Lock created by ``lock``
        self._locks = {}

    @retry_count.validator
    def _validate_retry_count(self, attribute, value):
        """
        Validate if retry_count is greater or equal 1

        :raises: ValueError if value is less than 1
        """
        if value < 1:
            raise ValueError("Retry count must be greater or equal 1.")

    @internal_lock_timeout.validator
    def _validate_internal_lock_timeout(self, attribute, value):
        """
        Validate if internal_lock_timeout is greater than 0

        :raises: ValueError if value is not positive
        """
        if value <= 0:
            raise ValueError("Internal lock_timeout must be greater than 0 seconds.")

    @retry_delay_min.validator
    @retry_delay_max.validator
    def _validate_retry_delay(self, attribute, value):
        """
        Validate if retry_delay_min and retry_delay_max is greater than 0

        :raises: ValueError if value is not positive
        """
        if value <= 0:
            raise ValueError("Retry delay must be greater than 0 seconds.")

    @property
    def log(self):
        """
        Logger named after this module.
        """
        return logging.getLogger(__name__)

    async def _set_lock(self, resource, lock_identifier, lease_time):
        """
        Tries up to ``retry_count`` times to set the lock in redis.

        Between two attempts a random delay taken from
        [retry_delay_min, retry_delay_max] is slept. An attempt counts as
        failed if ``redis.set_lock`` raises LockError or if the time it
        reports plus the drift allowance uses up the whole lease.

        On any failure (including cancellation) a background task is
        scheduled which unsets the lock, then the error is re-raised.

        :param resource: The string identifier of the resource to lock
        :param lock_identifier: identifier for the instance of the lock
        :param lease_time: requested lifetime of the lock
        :raises: LockError if all attempts failed
        """

        # Placeholder raised only if the loop body never runs.
        error = RuntimeError('Retry count less then one')

        # Proportional drift time to the length of the lock
        # See https://redis.io/topics/distlock#is-the-algorithm-asynchronous for more info
        drift = lease_time * 0.01 + 0.002

        try:
            # global try/except to catch CancelledError
            for n in range(self.retry_count):
                self.log.debug('Acquiring lock "%s" try %d/%d',
                               resource, n + 1, self.retry_count)
                if n != 0:
                    # no delay before the very first attempt
                    delay = random.uniform(self.retry_delay_min,
                                           self.retry_delay_max)
                    await asyncio.sleep(delay)
                try:
                    elapsed_time = await self.redis.set_lock(resource, lock_identifier, lease_time)
                except LockError as exc:
                    error = exc
                    continue

                if lease_time - elapsed_time - drift <= 0:
                    # acquiring took so long that nothing is left of the lease
                    error = LockError('Lock timeout')
                    self.log.debug('Timeout in acquiring the lock "%s"',
                                   resource)
                    continue

                error = None
                break
            else:
                # break never reached
                raise error

        except (Exception, asyncio.CancelledError):
            # cleanup in case of fault or cancellation will run in background
            async def cleanup():
                self.log.debug('Cleaning up lock "%s"', resource)
                with contextlib.suppress(LockError):
                    await self.redis.unset_lock(resource, lock_identifier)

            asyncio.ensure_future(cleanup())

            raise

    async def _auto_extend(self, lock):
        """
        Watchdog: waits 0.6*internal_lock_timeout, tries to extend the lock
        and schedules the next run of itself.

        Errors raised by :meth:`extend` are logged at debug level and are
        not propagated. The watchdog stops rescheduling itself as soon as
        the lock is no longer valid.

        :param lock: :class:`aioredlock.Lock`
        """

        await asyncio.sleep(0.6 * self.internal_lock_timeout)
        try:
            await self.extend(lock)
        except Exception:
            self.log.debug('Error in extending the lock "%s"',
                           lock.resource)

        if not lock.valid:
            # extend() refuses invalid locks, so another round could only
            # fail again; let the watchdog die instead of looping forever.
            return

        self._watchdogs[lock.resource] = asyncio.ensure_future(self._auto_extend(lock))

    async def lock(self, resource, lock_timeout=None, lock_identifier=None):
        """
        Tries to acquire the lock.
        If the lock is correctly acquired, the valid property of
        the returned lock is True.
        In case of fault the LockError exception will be raised

        If lock_timeout is None the lock is acquired for
        internal_lock_timeout and a watchdog is started which keeps
        extending it.

        :param resource str: The string identifier of the resource to lock
        :param lock_timeout int: Lock's lifetime
        :param lock_identifier str: identifier for the instance of the lock,
            a random uuid4 string is used if not given
        :return: :class:`aioredlock.Lock`
        :raises: ValueError if lock_timeout is given and is not positive
        :raises: LockError in case of fault
        """
        lock_identifier = lock_identifier or str(uuid.uuid4())

        if lock_timeout is not None and lock_timeout <= 0:
            raise ValueError("Lock timeout must be greater than 0 seconds.")

        lease_time = lock_timeout or self.internal_lock_timeout

        await self._set_lock(resource, lock_identifier, lease_time)

        lock = Lock(self, resource, lock_identifier, lock_timeout, valid=True)
        if lock_timeout is None:
            self._watchdogs[lock.resource] = asyncio.ensure_future(self._auto_extend(lock))
        self._locks[resource] = lock

        return lock

    async def extend(self, lock, lock_timeout=None):
        """
        Tries to reset the lock's lifetime to lock_timeout
        In case of fault the LockError exception will be raised

        The new lifetime is the first available of: the lock_timeout
        argument, the lock's own lock_timeout, internal_lock_timeout.
        If extending fails the lock is released before the error is
        re-raised.

        :param lock: :class:`aioredlock.Lock`
        :param lock_timeout: extend lock's life time to lock_timeout
        :raises: RuntimeError if lock is not valid
        :raises: ValueError if lock_timeout is given and is not positive
        :raises: LockError in case of fault
        """
        self.log.debug('Extending lock "%s"', lock.resource)

        if not lock.valid:
            raise RuntimeError('Lock is not valid')
        if lock_timeout is not None and lock_timeout <= 0:
            raise ValueError("Lock timeout must be greater than 0 seconds.")

        new_lease_time = lock_timeout or lock.lock_timeout or self.internal_lock_timeout

        try:
            await self._set_lock(lock.resource, lock.id, new_lease_time)
        except Exception:
            # a failed unlock must not hide the original error
            with contextlib.suppress(LockError):
                await self.unlock(lock)
            raise

    async def unlock(self, lock):
        """
        Marks the lock as not valid, stops its watchdog (if any) and
        releases the lock in redis.
        In case of fault the LockError exception will be raised
        :param lock: :class:`aioredlock.Lock`
        :raises: LockError in case of fault
        """
        self.log.debug('Releasing lock "%s"', lock.resource)

        lock.valid = False

        # Pop first (with a default) so that concurrent unlock calls for the
        # same resource can not fail with KeyError.
        watchdog = self._watchdogs.pop(lock.resource, None)

        # asyncio.current_task() exists since Python 3.7, older versions
        # only provide asyncio.Task.current_task().
        current_task = getattr(asyncio, 'current_task', None) or asyncio.Task.current_task

        # When unlock is reached from the watchdog itself (failed automatic
        # extension) the watchdog is the running task: cancelling and
        # awaiting it here would abort this very call before the lock is
        # released, so it is left alone in that case.
        if watchdog is not None and watchdog is not current_task():
            watchdog.cancel()

            done, _ = await asyncio.wait([watchdog])
            for fut in done:
                try:
                    await fut
                except asyncio.CancelledError:
                    pass
                except Exception:
                    self.log.exception('Can not unlock "%s"', lock.resource)

        await self.redis.unset_lock(lock.resource, lock.id)
        # raises LockError if can not unlock
        self._locks.pop(lock.resource, None)

    async def is_locked(self, resource_or_lock):
        """
        Checks if the resource or the lock is locked by any redlock instance.

        :param resource_or_lock: resource name or aioredlock.Lock instance
        :returns: True if locked else False
        :raises: TypeError if the argument is neither a Lock nor a string
        """

        if isinstance(resource_or_lock, Lock):
            resource = resource_or_lock.resource
        elif isinstance(resource_or_lock, str):
            resource = resource_or_lock
        else:
            # interpolate the type into the message instead of passing it
            # as a second exception argument
            raise TypeError(
                'Argument should be either aioredlock.Lock instance or string, '
                '%s is given.' % type(resource_or_lock)
            )

        return await self.redis.is_locked(resource)

    async def destroy(self):
        """
        cancel all _watchdogs, unlock all locks and Clear all the redis connections

        Errors while unlocking a single lock are logged and do not stop
        the teardown.
        """
        self.log.debug('Destroying %s', repr(self))

        # iterate over a copy, unlock() removes entries from self._locks
        for resource, lock in self._locks.copy().items():
            if lock.valid:
                try:
                    await self.unlock(lock)
                except Exception:
                    self.log.exception('Can not unlock "%s"', resource)

        self._locks.clear()
        self._watchdogs.clear()

        await self.redis.clear_connections()

    async def get_active_locks(self):
        """
        Return all stored locks that are valid.

        .. note::
            This function is only really useful in learning if there are no
            active locks. It is possible that by the time a lock is
            returned from this function that it is no longer active.
        """
        ret = []
        # Iterate over a snapshot: other coroutines may add or remove locks
        # while this one is suspended in ``await lock.is_locked()``.
        for lock in list(self._locks.values()):
            if lock.valid is True and await lock.is_locked():
                ret.append(lock)
        return ret

    async def get_lock(self, resource, lock_identifier):
        """
        recreate a aioredlock.Lock from the given params and the ttl from redis.
        so checks if the lock is valid somehow too...

        The returned lock is not stored in this manager and no watchdog is
        started for it.

        :param resource: The string identifier of the resource to lock
        :param lock_identifier: The identifier of the lock
        :return: a new `aioredlock.Lock`.
        """
        ttl = await self.redis.get_lock_ttl(resource, lock_identifier)
        lock = Lock(self, resource, lock_identifier, ttl, valid=True)
        return lock