File: memcached.py

package info (click to toggle)
python-elasticsearch 1.4.0-2~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 636 kB
  • sloc: python: 3,209; makefile: 155
file content (83 lines) | stat: -rw-r--r-- 2,878 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import time
try:
    import simplejson as json
except ImportError:
    import json

from ..exceptions import TransportError, ConnectionError, ImproperlyConfigured
from ..compat import urlencode
from .pooling import PoolingConnection

class MemcachedConnection(PoolingConnection):
    """
    Client using the `pylibmc` python library to communicate with elasticsearch
    using the memcached protocol. Requires plugin in the cluster.

    See https://github.com/elasticsearch/elasticsearch-transport-memcached for more details.
    """
    transport_schema = 'memcached'

    method_map = {
        'PUT': 'set',
        'POST': 'set',
        'DELETE': 'delete',
        'HEAD': 'get',
        'GET': 'get',
    }

    def __init__(self, host='localhost', port=11211, **kwargs):
        try:
            import pylibmc
        except ImportError:
            raise ImproperlyConfigured("You need to install pylibmc to use the MemcachedConnection class.")
        super(MemcachedConnection, self).__init__(host=host, port=port, **kwargs)
        self._make_connection = lambda: pylibmc.Client(['%s:%s' % (host, port)], behaviors={"tcp_nodelay": True})

    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        mc = self._get_connection()
        url = self.url_prefix + url
        if params:
            url = '%s?%s' % (url, urlencode(params or {}))
        full_url = self.host + url

        mc_method = self.method_map.get(method, 'get')

        start = time.time()
        try:
            status = 200
            if mc_method == 'set':
                # no response from set commands
                response = ''
                if not json.dumps(mc.set(url, body)):
                    status = 500
            else:
                response = mc.get(url)

            duration = time.time() - start
            if response:
                response = response.decode('utf-8')
        except Exception as e:
            self.log_request_fail(method, full_url, body, time.time() - start, exception=e)
            raise ConnectionError('N/A', str(e), e)
        finally:
            self._release_connection(mc)

        # try not to load the json every time
        if response and response[0] == '{' and ('"status"' in response or '"error"' in response):
            data = json.loads(response)
            if 'status' in data and isinstance(data['status'], int):
                status = data['status']
            elif 'error' in data:
                raise TransportError('N/A', data['error'])

        if not (200 <= status < 300) and status not in ignore:
            self.log_request_fail(method, url, body, duration, status)
            self._raise_error(status, response)

        self.log_request_success(method, full_url, url, body, status,
            response, duration)

        return status, {}, response