File: consul.py

package info (click to toggle)
python-tooz 7.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 932 kB
  • sloc: python: 5,586; sh: 158; makefile: 27
file content (507 lines) | stat: -rw-r--r-- 20,467 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
#
# Copyright © 2015 Yahoo! Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import functools

import consul
from oslo_utils import encodeutils
import requests

import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils


@contextlib.contextmanager
def _failure_translator():

    """Translates common consul exceptions into tooz exceptions."""
    try:
        yield
    except (consul.Timeout, requests.exceptions.RequestException) as e:
        utils.raise_with_cause(coordination.ToozConnectionError,
                               str(e), cause=e)
    except (consul.ConsulException, ValueError) as e:
        # ValueError = Typically json decoding failed for some reason.
        utils.raise_with_cause(tooz.ToozError, str(e), cause=e)


def _translate_failures(func):
    """Decorator running ``func`` inside ``_failure_translator()``.

    :param func: callable whose consul-related failures should be translated
    :returns: a wrapper (carrying ``func``'s metadata) with the same call
              signature and return value as ``func``
    """

    @functools.wraps(func)
    def translated(*call_args, **call_kwargs):
        with _failure_translator():
            outcome = func(*call_args, **call_kwargs)
        return outcome

    return translated


class ConsulLock(locking.Lock):
    """Exclusive lock stored as a Consul KV key tied to a Consul session.

    The key is written with ``acquire=<session id>`` to take the lock and
    with ``release=<session id>`` to give it back.
    """

    def __init__(self, name, node, address, session_id, client, token=None):
        """Remember the key, session and client used by this lock.

        :param name: full KV key used for the lock
        :param node: consul node name (stored, not otherwise used here)
        :param address: consul node address (stored, not otherwise used here)
        :param session_id: id of the consul session that will own the key
        :param client: consul client used for the KV calls
        :param token: optional ACL token passed to every KV call
        """
        super().__init__(name)
        self._name = name
        self._node = node
        self._address = address
        self._acl_token = token
        self._session_id = session_id
        self._client = client
        # Set to True once this object successfully acquired the key.
        self.acquired = False

    def acquire(self, blocking=True, shared=False, timeout=None):
        """Try to take the lock for this lock's session.

        :param blocking: ``False`` to give up after one attempt; any other
                         value is handed to ``_retry.retry`` as
                         ``stop_max_delay`` and the attempt is retried
        :param shared: not supported; a truthy value raises
        :param timeout: not supported; a truthy value raises
        :returns: ``True`` when the key was acquired, ``False`` when a
                  non-blocking attempt did not get it
        :raises tooz.NotImplemented: if ``shared`` or ``timeout`` is truthy
        """
        if shared:
            raise tooz.NotImplemented
        if timeout:
            raise tooz.NotImplemented

        @_retry.retry(stop_max_delay=blocking)
        @_translate_failures
        def _acquire():
            # Check if we are the owner and if we are simulate
            # blocking (because consul will not block a second
            # acquisition attempt by the same owner).
            _index, value = self._client.kv.get(key=self._name,
                                                token=self._acl_token)
            if value and value.get('Session') == self._session_id:
                if blocking is False:
                    return False
                else:
                    raise _retry.TryAgain
            else:
                # The value can be anything.
                gotten = self._client.kv.put(key=self._name,
                                             value="I got it!",
                                             acquire=self._session_id,
                                             token=self._acl_token)
                if gotten:
                    self.acquired = True
                    return True
                if blocking is False:
                    return False
                else:
                    raise _retry.TryAgain

        return _acquire()

    def release(self):
        """Release the lock if this object acquired it and still owns it.

        Failures from the consul client are translated into tooz
        exceptions, the same way ``acquire`` does.

        :returns: ``True`` when the key was released, ``False`` when this
                  object never acquired it, the key no longer exists, it is
                  owned by another session, or the release write failed
        """
        if not self.acquired:
            return False
        with _failure_translator():
            # Get the lock to verify the session ID's are same
            _index, contents = self._client.kv.get(key=self._name,
                                                   token=self._acl_token)
            if not contents:
                return False
            owner = contents.get('Session')
            if owner == self._session_id:
                removed = self._client.kv.put(key=self._name,
                                              value=self._session_id,
                                              release=self._session_id,
                                              token=self._acl_token)
                if removed:
                    self.acquired = False
                    return True
        return False


class ConsulDriver(coordination.CoordinationDriverCachedRunWatchers,
                   coordination.CoordinationDriverWithExecutor):
    """This driver uses `python-consul`_ client against `consul`_ servers.

    The ConsulDriver implements the coordination driver API(s) so that Consul
    can be used as an option for Distributed Locking and Group Membership. The
    data is stored in Consul's key-value store.

    The Consul driver connection URI should look like::

      consul://HOST[:PORT][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]]

    If not specified, PORT defaults to 8500.
    Available options are:

    ==================  =======
    Name                Default
    ==================  =======
    ttl                 15
    namespace           tooz
    acl_token           None
    ==================  =======

    For details on the available options, refer to
    http://python-consul2.readthedocs.org/en/latest/.

    The following Key/Value paths are utilized in Consul to implement the
    coordination APIs:

    +-------------------------------------------+--------------+--------------+
    | Key                                       | Value        | Description  |
    +===========================================+==============+==============+
    | <namespace>/groups/<group_id>             | None         | Group of     |
    |                                           |              | members.     |
    +-------------------------------------------+--------------+--------------+
    | <namespace>/groups/<group_id>/<member_id> | Member       | Member in a  |
    |                                           | capabilities | group.       |
    |                                           | encoded as   |              |
    |                                           | msgpack      |              |
    +-------------------------------------------+--------------+--------------+
    | <namespace>/group_locks/<group_id>        | Consul       | Lock for     |
    |                                           | session ID   | group        |
    |                                           |              | membership   |
    +-------------------------------------------+--------------+--------------+
    | <namespace>/locks/<name>                  | Consul       | Each key is  |
    |                                           | session ID   | a distributed|
    |                                           |              | lock.        |
    +-------------------------------------------+--------------+--------------+

    NOTE: A group in tooz is NOT the same as a Consul service, so tooz groups
    are implemented using Consul Key/Values and not with native services.
    Groups in tooz do not have host:port details or health checks that report
    to consul to verify that the service is still alive and listening on that
    host:port. If you need to use native Consul services, configure the Consul
    agent directly (not via tooz).

    .. _python-consul: http://python-consul2.readthedocs.org/
    .. _consul: https://consul.io/
    """

    #: Default namespace when none is provided
    TOOZ_NAMESPACE = "tooz"

    #: Default TTL
    DEFAULT_TTL = 15

    #: Default consul port if not provided.
    DEFAULT_PORT = 8500

    #: Consul ACL Token if not provided
    ACL_TOKEN = None

    CHARACTERISTICS = (
        # client liveness is based on more than just timeouts
        coordination.Characteristics.NON_TIMEOUT_BASED,
        # multiple service instances (group members) could register
        # with different ports per thread/proc
        # but the agent is always on localhost: so not DISTRIBUTED_ACROSS_HOSTS
        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
        # https://www.consul.io/docs/internals/consensus#consistency-modes
        # The consul consistency mode determines the history characteristics.
        # Writes *always* go through a single leader process into raft log.
        # Reads *may* use all servers depending on the request's mode:
        #   - 'consistent' = LINEARIZABLE
        #   - 'default' = SEQUENTIAL
        #   - 'stale' = CAUSAL
        coordination.Characteristics.SEQUENTIAL,  # 'default' consistency mode
        # https://www.consul.io/docs/internals/consensus
        # raft log of servers is snapshotted + compacted
        coordination.Characteristics.SERIALIZABLE,
    )
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    def __init__(self, member_id, parsed_url, options):
        """Record connection settings; the client is created in ``_start``.

        :param member_id: id of this member; its decoded form is used as
                          the consul session name
        :param parsed_url: parsed connection URL; ``hostname`` and ``port``
                           are read from it
        :param options: URL options; ``ttl``, ``namespace`` and
                        ``acl_token`` are read after collapsing
        """
        super().__init__(member_id, parsed_url, options)
        options = utils.collapse(options)
        self._host = parsed_url.hostname
        self._port = parsed_url.port or self.DEFAULT_PORT
        self._session_id = None
        self._session_name = encodeutils.safe_decode(member_id)
        self._ttl = int(options.get('ttl', self.DEFAULT_TTL))
        namespace = options.get('namespace', self.TOOZ_NAMESPACE)
        self._namespace = encodeutils.safe_decode(namespace)
        self._acl_token = options.get('acl_token', self.ACL_TOKEN)
        # the empty group name adds a trailing / needed for lookups
        self._groups_prefix = self._paths_join(self._namespace, "groups", "")
        self._client = None

    def _start(self):
        """Create a client, register a node and create a session."""
        # Create a consul client
        if self._client is None:
            self._client = consul.Consul(host=self._host, port=self._port,
                                         token=self._acl_token)

        local_agent = self._client.agent.self()
        self._node = local_agent['Member']['Name']
        self._address = local_agent['Member']['Addr']
        # implicitly uses the agent's datacenter (set in consul agent config)

        # Register a Node
        self._client.catalog.register(node=self._node,
                                      address=self._address,
                                      token=self._acl_token)

        # Create a session
        self._session_id = self._client.session.create(
            name=self._session_name,
            node=self._node,
            ttl=self._ttl,
            token=self._acl_token)

    def _stop(self):
        """Destroy the consul session (if any) and drop the client."""
        if self._client is not None:
            if self._session_id is not None:
                self._client.session.destroy(self._session_id,
                                             token=self._acl_token)
                self._session_id = None
            self._client = None

    def heartbeat(self):
        """Renew the consul session.

        :returns: number of seconds to wait before the next renewal
                  (half the TTL, but at least 1)
        """
        # THIS IS A REQUIREMENT FOR CONSUL TO WORK PROPERLY.
        # Consul maintains a "session" token that is used as the basis
        # for all operations in the service. The session must be refreshed
        # periodically or else it assumes that the agent maintaining the
        # session has died or is unreachable. When a session expires all locks
        # are released and any services that were registered with that session
        # are marked as no longer active.
        self._client.session.renew(self._session_id, token=self._acl_token)
        # renew the session every half-TTL or 1 second, whatever is larger
        sleep_sec = max(self._ttl / 2, 1)
        return sleep_sec

    def _get_lock(self, real_name):
        """Build a :class:`ConsulLock` for the already-expanded KV key."""
        return ConsulLock(real_name, self._node, self._address,
                          session_id=self._session_id,
                          client=self._client, token=self._acl_token)

    def get_lock(self, name):
        """Return a lock stored at ``<namespace>/locks/<name>``."""
        real_name = self._path_lock(name)
        return self._get_lock(real_name)

    def _get_group_lock(self, group_id):
        """Return the lock stored at ``<namespace>/group_locks/<group_id>``."""
        real_name = self._path_group_lock(group_id)
        return self._get_lock(real_name)

    @staticmethod
    def _paths_join(*args):
        """Decode every piece to text and join the pieces with ``/``."""
        pieces = []
        for arg in args:
            pieces.append(encodeutils.safe_decode(arg))
        return "/".join(pieces)

    def _path_lock(self, name):
        """Return the KV key of the lock called ``name``."""
        return self._paths_join(self._namespace, "locks", name)

    def _path_group_lock(self, name):
        """Return the KV key of the membership lock of group ``name``."""
        return self._paths_join(self._namespace, "group_locks", name)

    def _path_group(self, group_id):
        """Return the KV key of a group, including a trailing ``/``."""
        # add an extra '/' at the end so that searches with this path
        # will go down and find their children
        return self._paths_join(self._namespace, "groups", group_id) + "/"

    def _path_member(self, group_id, member_id):
        """Return the KV key of ``member_id`` inside ``group_id``."""
        return self._paths_join(
            self._namespace, "groups", group_id, member_id
        )

    def _group_path_to_id(self, base_path, group_path):
        """Translates a path into a group (or member) name.

        ``base_path`` is removed from the front of ``group_path`` when it is
        a prefix of it, then any leading/trailing ``/`` is stripped. The
        result is passed through ``utils.to_binary``.

        Example:
           group_id = self._group_path_to_id("tooz/groups/",
                                             "tooz/groups/helloworld/")
           # group_id is the binary form of "helloworld"
        """
        if group_path.startswith(base_path):
            group_id = group_path[len(base_path):]
        else:
            group_id = group_path
        # if a group has members (sub-keys) it will contain a trailing /
        # we need to strip this to get just the name
        # if a group has no members there is no trailing / (for some reason)
        group_id = group_id.strip("/")
        return utils.to_binary(group_id)

    def _get_group_members(self, group_path):
        """Fetch a group key and everything stored below it.

        :param group_path: KV key of the group (as built by ``_path_group``)
        :returns: ``(group, members)`` where ``group`` is the entry whose key
                  equals ``group_path`` (``None`` when absent) and
                  ``members`` is the list of all other entries returned
        """
        index, data = self._client.kv.get(group_path, recurse=True)
        group = None
        members = []
        for kv in (data or []):
            if kv["Key"] == group_path:
                group = kv
            else:
                members.append(kv)
        return (group, members)

    def get_groups(self):
        """List the ids of the groups stored under the groups prefix.

        :returns: a ``ConsulFutureResult`` whose result is a list of
                  group ids
        """

        @_translate_failures
        def _get_groups():
            groups = []
            index, data = self._client.kv.get(self._groups_prefix, keys=True,
                                              separator="/")
            for key in (data or []):
                # skip the prefix key itself, it is not a group
                if key != self._groups_prefix:
                    group_id = self._group_path_to_id(self._groups_prefix, key)
                    groups.append(group_id)
            return groups

        return ConsulFutureResult(self._executor.submit(_get_groups))

    def create_group(self, group_id):
        """Create the group key if it does not exist yet.

        :param group_id: id of the group to create
        :returns: a ``ConsulFutureResult``; getting its result raises
                  ``coordination.GroupAlreadyExist`` when the key exists
        """

        @_translate_failures
        def _create_group():
            group_path = self._path_group(group_id)
            # create with Check-And-Set index 0 will only succeed if the key
            # doesn't exist
            result = self._client.kv.put(group_path, "", cas=0)
            if not result:
                raise coordination.GroupAlreadyExist(group_id)
            return result

        return ConsulFutureResult(self._executor.submit(_create_group))

    def _destroy_group(self, group_id):
        """Should only be used in tests..."""
        with self._get_group_lock(group_id) as lock:
            group_path = self._path_group(group_id)
            self._client.kv.delete(group_path, recurse=True)
            self._client.kv.delete(lock._name)

    def delete_group(self, group_id):
        """Delete an existing, empty group.

        :param group_id: id of the group to delete
        :returns: a ``ConsulFutureResult``; getting its result raises
                  ``coordination.GroupNotCreated`` when the group key is
                  missing and ``coordination.GroupNotEmpty`` when it still
                  has members
        """

        @_translate_failures
        def _delete_group():
            # create a lock for the group so that other operations on this
            # group do not conflict while the group is being deleted
            with self._get_group_lock(group_id) as lock:
                group_path = self._path_group(group_id)
                group, members = self._get_group_members(group_path)
                if not group:
                    raise coordination.GroupNotCreated(group_id)

                if members:
                    raise coordination.GroupNotEmpty(group_id)

                # delete the group recursively
                result = self._client.kv.delete(group_path, recurse=True)

                # delete the lock for the group
                self._client.kv.delete(lock._name)
                return result

        return ConsulFutureResult(self._executor.submit(_delete_group))

    def join_group(self, group_id, capabilities=b""):
        """Add this member to a group.

        :param group_id: id of the group to join
        :param capabilities: capabilities stored (serialized with
                             ``utils.dumps``) as the member key's value
        :returns: a ``ConsulFutureResult``; getting its result raises
                  ``coordination.GroupNotCreated`` when the group key is
                  missing and ``coordination.MemberAlreadyExist`` when the
                  member key already exists
        """

        @_translate_failures
        def _join_group():
            # lock the group so that it doesn't get deleted while we join
            with self._get_group_lock(group_id):
                group_path = self._path_group(group_id)
                member_path = self._path_member(group_id, self._member_id)
                group, members = self._get_group_members(group_path)
                if not group:
                    raise coordination.GroupNotCreated(group_id)

                for m in members:
                    if m["Key"] == member_path:
                        raise coordination.MemberAlreadyExist(group_id,
                                                              self._member_id)

                # create with Check-And-Set index 0 will only succeed if the
                # key doesn't exist
                created = self._client.kv.put(member_path,
                                              utils.dumps(capabilities),
                                              cas=0)
                if not created:
                    # the key appeared between the listing above and the
                    # write; do not record a membership we did not create
                    raise coordination.MemberAlreadyExist(group_id,
                                                          self._member_id)
                self._joined_groups.add(group_id)

        return ConsulFutureResult(self._executor.submit(_join_group))

    def leave_group(self, group_id):
        """Remove this member from a group.

        :param group_id: id of the group to leave
        :returns: a ``ConsulFutureResult``; getting its result raises
                  ``coordination.GroupNotCreated`` when the group key is
                  missing and ``coordination.MemberNotJoined`` when this
                  member has no key in the group
        """

        @_translate_failures
        def _leave_group():
            # NOTE: We do NOT have to lock the group here because deletes in
            #       Consul are atomic and succeed even if the key doesn't exist
            #       This means that there is no race condition between checking
            #       if the member exists and then deleting it.
            group_path = self._path_group(group_id)
            member_path = self._path_member(group_id, self._member_id)
            group, members = self._get_group_members(group_path)
            # same contract as join_group/get_members/delete_group: a missing
            # group is reported as such, not as a missing membership
            if not group:
                raise coordination.GroupNotCreated(group_id)

            member = None
            for m in members:
                if m["Key"] == member_path:
                    member = m
                    break
            else:
                raise coordination.MemberNotJoined(group_id, self._member_id)

            # delete the member key with Check-And-Set semantics based on index
            # we read above
            # NOTE(review): the result of this CAS delete is not checked --
            # confirm whether a failed CAS needs handling here.
            self._client.kv.delete(member_path, cas=member["ModifyIndex"])
            self._joined_groups.discard(group_id)

        return ConsulFutureResult(self._executor.submit(_leave_group))

    def get_members(self, group_id):
        """List the members of a group.

        :param group_id: id of the group to inspect
        :returns: a ``ConsulFutureResult`` whose result is a set of member
                  ids; getting it raises ``coordination.GroupNotCreated``
                  when the group key is missing
        """

        @_translate_failures
        def _get_members():
            group_path = self._path_group(group_id)
            group, members = self._get_group_members(group_path)
            if not group:
                raise coordination.GroupNotCreated(group_id)

            result = set()
            for m in members:
                member_id = self._group_path_to_id(group_path, m["Key"])
                result.add(member_id)
            return result

        return ConsulFutureResult(self._executor.submit(_get_members))

    def get_member_capabilities(self, group_id, member_id):
        """Read the capabilities stored for a member of a group.

        :param group_id: id of the group the member belongs to
        :param member_id: id of the member to look up
        :returns: a ``ConsulFutureResult`` whose result is the member key's
                  value decoded with ``utils.loads``; getting it raises
                  ``coordination.MemberNotJoined`` when the key has no data
        """

        @_translate_failures
        def _get_member_capabilities():
            member_path = self._path_member(group_id, member_id)
            index, data = self._client.kv.get(member_path)
            if not data:
                raise coordination.MemberNotJoined(group_id, member_id)
            return utils.loads(data["Value"])

        return ConsulFutureResult(
            self._executor.submit(_get_member_capabilities))

    def update_capabilities(self, group_id, capabilities):
        """Overwrite the capabilities stored for this member in a group.

        :param group_id: id of the group this member belongs to
        :param capabilities: new capabilities, serialized with
                             ``utils.dumps``
        :returns: a ``ConsulFutureResult``; getting its result raises
                  ``coordination.MemberNotJoined`` when this member has no
                  key in the group
        """

        @_translate_failures
        def _update_capabilities():
            member_path = self._path_member(group_id, self._member_id)
            index, data = self._client.kv.get(member_path)
            if not data:
                raise coordination.MemberNotJoined(group_id, self._member_id)
            # no need to Check-And-Set here, latest write wins
            self._client.kv.put(member_path, utils.dumps(capabilities))

        return ConsulFutureResult(self._executor.submit(_update_capabilities))

    @staticmethod
    def watch_elected_as_leader(group_id, callback):
        """Not supported by this driver; always raises."""
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_elected_as_leader(group_id, callback):
        """Not supported by this driver; always raises."""
        raise tooz.NotImplemented


# Result wrapper used by the driver methods above for their executor futures:
# ``coordination.CoordinatorResult`` pre-bound with this module's failure
# translator. Presumably the translator is applied when the future's outcome
# is retrieved -- confirm against tooz.coordination.
ConsulFutureResult = functools.partial(coordination.CoordinatorResult,
                                       failure_translator=_failure_translator)