1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
import time
try:
import simplejson as json
except ImportError:
import json
from ..exceptions import TransportError, ConnectionError, ImproperlyConfigured
from ..compat import urlencode
from .pooling import PoolingConnection
class MemcachedConnection(PoolingConnection):
    """
    Client using the `pylibmc` python library to communicate with elasticsearch
    using the memcached protocol. Requires plugin in the cluster.

    See https://github.com/elasticsearch/elasticsearch-transport-memcached for more details.

    HTTP verbs are translated to memcached commands through ``method_map``;
    the request URL (prefix + path + query string) is used as the memcached
    key and the request body as the value.
    """
    transport_schema = 'memcached'

    # HTTP verb -> memcached command. Verbs that are not listed fall back to
    # 'get' (see ``perform_request``).
    method_map = {
        'PUT': 'set',
        'POST': 'set',
        'DELETE': 'delete',
        'HEAD': 'get',
        'GET': 'get',
    }

    def __init__(self, host='localhost', port=11211, **kwargs):
        """
        Set up the connection factory for ``host:port``.

        :arg host: hostname of the node, defaults to ``'localhost'``
        :arg port: memcached port of the node, defaults to ``11211``
        :arg kwargs: passed through unchanged to the parent constructor
        :raises ImproperlyConfigured: if `pylibmc` cannot be imported
        """
        # Imported lazily so that the module can be loaded without pylibmc;
        # the dependency is only required once this class is instantiated.
        try:
            import pylibmc
        except ImportError:
            raise ImproperlyConfigured("You need to install pylibmc to use the MemcachedConnection class.")
        super(MemcachedConnection, self).__init__(host=host, port=port, **kwargs)
        # Factory producing a fresh pylibmc client for this node.
        # NOTE(review): presumably invoked by PoolingConnection whenever the
        # pool needs a new connection -- confirm against pooling.py.
        self._make_connection = lambda: pylibmc.Client(['%s:%s' % (host, port)], behaviors={"tcp_nodelay": True})

    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        """
        Execute a single request over the memcached protocol.

        :arg method: HTTP verb; translated via ``method_map`` (unknown verbs
            are sent as ``get``)
        :arg url: path of the request; ``self.url_prefix`` is prepended
        :arg params: optional mapping that is url-encoded into the query string
        :arg body: value stored for ``set`` commands
        :arg timeout: accepted for interface compatibility; not used here
        :arg ignore: collection of status codes that must not raise
        :returns: tuple ``(status, headers, response)`` where ``headers`` is
            always an empty dict
        :raises ConnectionError: if the memcached client raises while talking
            to the node
        :raises TransportError: if the response carries an ``"error"`` key
            without an integer ``"status"``
        """
        mc = self._get_connection()
        url = self.url_prefix + url
        if params:
            url = '%s?%s' % (url, urlencode(params))
        full_url = self.host + url

        mc_method = self.method_map.get(method, 'get')

        start = time.time()
        try:
            status = 200
            if mc_method == 'set':
                # no response from set commands
                response = ''
                # Test the client's boolean result directly. Passing it
                # through json.dumps yields the non-empty string 'false',
                # which would hide every failed write.
                if not mc.set(url, body):
                    status = 500
            elif mc_method == 'delete':
                # no response body from delete commands either
                response = ''
                # A falsy result means the key was not deleted; report it as
                # "not found" rather than pretending the delete succeeded.
                if not mc.delete(url):
                    status = 404
            else:
                response = mc.get(url)

            duration = time.time() - start
            # Only raw bytes need decoding; text (or a miss, i.e. None) is
            # passed through untouched.
            if response and isinstance(response, bytes):
                response = response.decode('utf-8')
        except Exception as e:
            self.log_request_fail(method, full_url, body, time.time() - start, exception=e)
            raise ConnectionError('N/A', str(e), e)
        finally:
            # Always hand the client back, whether or not the call succeeded.
            self._release_connection(mc)

        # try not to load the json every time
        if response and response.startswith('{') and ('"status"' in response or '"error"' in response):
            data = json.loads(response)
            if 'status' in data and isinstance(data['status'], int):
                # The memcached protocol carries no status line, so the
                # status embedded in the JSON body takes precedence.
                status = data['status']
            elif 'error' in data:
                raise TransportError('N/A', data['error'])

        if not (200 <= status < 300) and status not in ignore:
            # Log the full URL, consistent with the other log calls.
            self.log_request_fail(method, full_url, body, duration, status)
            self._raise_error(status, response)

        self.log_request_success(method, full_url, url, body, status,
                                 response, duration)

        return status, {}, response
|