1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
from __future__ import absolute_import

import logging
import time
from socket import error as SocketError
from socket import timeout as SocketTimeout

try:
    from .esthrift import Rest
    from .esthrift.ttypes import Method, RestRequest
    from thrift.transport import TTransport, TSocket, TSSLSocket
    from thrift.protocol import TBinaryProtocol
    from thrift.Thrift import TException
    THRIFT_AVAILABLE = True
except ImportError:
    THRIFT_AVAILABLE = False

from ..exceptions import ConnectionError, ImproperlyConfigured, ConnectionTimeout
from .pooling import PoolingConnection
logger = logging.getLogger('elasticsearch')
class ThriftConnection(PoolingConnection):
    """
    Connection using the `thrift` protocol to communicate with elasticsearch.

    See https://github.com/elasticsearch/elasticsearch-transport-thrift for additional info.
    """
    transport_schema = 'thrift'

    def __init__(self, host='localhost', port=9500, framed_transport=False, use_ssl=False, **kwargs):
        """
        :arg host: hostname of the node to connect to
        :arg port: thrift port of the node
        :arg framed_transport: use `TTransport.TFramedTransport` instead of
            `TTransport.TBufferedTransport`
        :arg use_ssl: use `TSSLSocket.TSSLSocket` instead of `TSocket.TSocket`
        :arg kwargs: forwarded unchanged to the parent class constructor

        :raises ImproperlyConfigured: when the thrift modules could not be
            imported
        """
        if not THRIFT_AVAILABLE:
            raise ImproperlyConfigured("Thrift is not available.")

        super(ThriftConnection, self).__init__(host=host, port=port, **kwargs)
        self._framed_transport = framed_transport
        self._tsocket_class = TSSLSocket.TSSLSocket if use_ssl else TSocket.TSocket
        self._tsocket_args = (host, port)

    def _make_connection(self):
        """
        Create, open and return a new thrift `Rest.Client`.

        The transport is attached to the client as `client.transport` so that
        `perform_request` can close it when the connection fails.
        """
        socket = self._tsocket_class(*self._tsocket_args)
        # NOTE(review): assumes self.timeout (set outside this class) is in
        # seconds; thrift's setTimeout takes milliseconds.
        socket.setTimeout(self.timeout * 1000.0)

        if self._framed_transport:
            transport = TTransport.TFramedTransport(socket)
        else:
            transport = TTransport.TBufferedTransport(socket)

        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
        client = Rest.Client(protocol)
        client.transport = transport
        transport.open()
        return client

    def _close_failed_connection(self, tclient):
        """
        Best-effort close of a connection that failed mid-request.

        A failure while closing is logged and swallowed so it never masks the
        error that made the connection unusable in the first place.
        """
        if tclient is None:
            # The failure happened before a connection could be obtained.
            return
        try:
            tclient.transport.close()
        except Exception as close_error:
            logger.warning(
                'Exception %s occured when closing a failed thrift connection.',
                close_error, exc_info=True
            )

    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        """
        Execute one REST request over thrift.

        :arg method: HTTP method name; upper-cased and looked up in
            `Method._NAMES_TO_VALUES`
        :arg url: sent as the `uri` of the `RestRequest`
        :arg params: sent as the `parameters` of the `RestRequest`
        :arg body: sent as the `body` of the `RestRequest`
        :arg timeout: accepted but not used by this method; the socket
            timeout is set once, when the connection is created
        :arg ignore: status codes outside 2xx that should not raise

        :returns: tuple of `(status, headers, body)` where the header names
            are lower-cased and an empty body is returned as `''`
        :raises ConnectionTimeout: when the socket times out
        :raises ConnectionError: on any other thrift or socket level failure
        """
        request = RestRequest(method=Method._NAMES_TO_VALUES[method.upper()], uri=url,
                              parameters=params, body=body)

        start = time.time()
        tclient = None
        try:
            tclient = self._get_connection()
            response = tclient.execute(request)
            duration = time.time() - start
        except SocketTimeout as e:
            # Must come before the socket.error handler below: socket.timeout
            # is a subclass of socket.error.
            self.log_request_fail(method, url, body, time.time() - start, exception=e)
            # A reply may still arrive on this socket, so it cannot be reused.
            self._close_failed_connection(tclient)
            raise ConnectionTimeout('TIMEOUT', str(e), e)
        except (TException, SocketError) as e:
            self.log_request_fail(method, url, body, time.time() - start, exception=e)
            # The helper uses its own exception name, keeping `e` bound to the
            # original failure for the raise below.
            self._close_failed_connection(tclient)
            raise ConnectionError('N/A', str(e), e)

        # Only connections that completed a request go back into the pool.
        self._release_connection(tclient)

        if not (200 <= response.status < 300) and response.status not in ignore:
            self.log_request_fail(method, url, body, duration, response.status)
            self._raise_error(response.status, response.body)

        self.log_request_success(method, url, url, body, response.status,
                                 response.body, duration)

        headers = {}
        if response.headers:
            headers = dict((k.lower(), v) for k, v in response.headers.items())
        return response.status, headers, response.body or ''
|