1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
import logging
import gevent
try:
from gevent.lock import BoundedSemaphore
except ImportError:
from gevent.coros import BoundedSemaphore # before gevent-1.0
from gevent import socket
from collections import deque
from contextlib import contextmanager
from functools import wraps
from test import TestRunGreenlet
# Names exported by ``from <module> import *``.
__all__ = ["ConnectionPool", "retry"]
# Exception classes treated as "the connection is broken" when the caller
# does not pass its own ``exc_classes`` to ConnectionPool or retry.
DEFAULT_EXC_CLASSES = (socket.error,)
class ConnectionPool(object):
    """
    Generic TCP connection pool, with the following features:
        * Configurable pool size
        * Auto-reconnection when a broken socket is detected
        * Optional periodic keepalive

    Subclasses must implement ``_new_connection`` and, if ``keepalive`` is
    used, ``_keepalive``.

    :param size: number of connections the pool keeps.
    :param exc_classes: iterable of exception classes that mean "this
        connection is broken"; it is converted to a tuple.
    :param keepalive: when truthy, one pooled connection is probed every
        ``keepalive / size`` (passed to ``gevent.sleep``); when falsy, no
        keepalive greenlet is started.
    """
    # Frequency at which the pool is populated at startup
    SPAWN_FREQUENCY = 0.1

    def __init__(self, size, exc_classes=DEFAULT_EXC_CLASSES, keepalive=None):
        self.size = size
        self.conn = deque()
        self.lock = BoundedSemaphore(size)
        self.keepalive = keepalive
        # Exceptions list must be in tuple form to be caught properly
        self.exc_classes = tuple(exc_classes)

        # Take every permit up front: the pool starts empty, and each
        # permit is handed back by _addOne together with a connection, so
        # a successful acquire() always means a connection is available.
        # (``range`` rather than ``xrange`` so this runs on Python 2 and 3.)
        for _ in range(size):
            self.lock.acquire()
        # Stagger the initial connections instead of opening them at once.
        for i in range(size):
            greenlet = TestRunGreenlet(self._addOne)
            greenlet.start_later(self.SPAWN_FREQUENCY * i)
        if self.keepalive:
            greenlet = TestRunGreenlet(self._keepalive_periodic)
            greenlet.start_later()

    def _new_connection(self):
        """
        Establish a new connection (to be implemented in subclasses).

        A falsy return value, or one of ``self.exc_classes`` being raised,
        is treated by ``_addOne`` as a failed attempt and retried.
        """
        raise NotImplementedError

    def _keepalive(self, c):
        """
        Implement actual application-level keepalive (to be
        reimplemented in subclasses).

        :param c: a connection taken from the pool.
        :raise: socket.error if the connection has been closed or is broken.
        """
        raise NotImplementedError()

    def _keepalive_periodic(self):
        """
        Loop forever, probing one pooled connection per iteration.

        Connections are taken from the left of the deque and returned on
        the right, so successive iterations cycle through the pool.
        """
        delay = float(self.keepalive) / self.size
        while True:
            try:
                with self.get() as c:
                    self._keepalive(c)
            except self.exc_classes:
                # Nothing to do, the pool will generate a new connection later
                pass
            gevent.sleep(delay)

    def _addOne(self):
        """
        Create one connection, add it to the pool and release one permit.

        Failed attempts are retried forever with a sleep that doubles each
        time until it has reached at least 400.
        """
        stime = 0.1
        while True:
            try:
                c = self._new_connection()
            except self.exc_classes:
                # A connect error must not kill this greenlet: the permit
                # for this slot would never be released and the pool would
                # shrink permanently. Treat it like a falsy result.
                c = None
            if c:
                break
            gevent.sleep(stime)
            if stime < 400:
                stime *= 2
        self.conn.append(c)
        self.lock.release()

    @contextmanager
    def get(self):
        """
        Get a connection from the pool, to make and receive traffic.

        If the connection fails for any reason (one of ``exc_classes``,
        socket.error by default), it is dropped and a new one is scheduled;
        the exception is re-raised. Please use @retry as a way to
        automatically retry whatever operation you were performing.

        On any other exception the connection is returned to the pool and
        the exception is re-raised.
        """
        self.lock.acquire()
        # Fetched before the ``try`` so the handlers below can never run
        # with ``c`` unbound.
        c = self.conn.popleft()
        try:
            yield c
        except self.exc_classes:
            # The current connection has failed, drop it and create a new one.
            # The permit is deliberately NOT released here: _addOne releases
            # it once the replacement connection is in the pool.
            greenlet = TestRunGreenlet(self._addOne)
            greenlet.start_later(1)
            raise
        except:  # noqa: E722
            # Unrelated failure: the connection is still good, put it back.
            self.conn.append(c)
            self.lock.release()
            raise
        else:
            # NOTE: cannot use finally because MUST NOT reuse the connection
            # if it failed (socket.error)
            self.conn.append(c)
            self.lock.release()
def retry(f, exc_classes=DEFAULT_EXC_CLASSES, logger=None,
          retry_log_level=logging.INFO,
          retry_log_message="Connection broken in '{f}' (error: '{e}'); "
                            "retrying with new connection.",
          max_failures=None, interval=0,
          max_failure_log_level=logging.ERROR,
          max_failure_log_message="Max retries reached for '{f}'. Aborting."):
    """
    Decorator to automatically reexecute a function if the connection is
    broken for any reason.

    :param f: the function to wrap.
    :param exc_classes: iterable of exception classes that trigger a retry;
        any other exception propagates immediately.
    :param logger: optional logger; when None nothing is logged.
    :param retry_log_level: level used for the per-failure message.
    :param retry_log_message: ``str.format`` template for each failure;
        receives ``f`` (function name) and ``e`` (the exception).
    :param max_failures: number of retries allowed; once the failure count
        exceeds it the last exception is re-raised. None retries forever.
    :param interval: value passed to ``gevent.sleep`` after every failure.
    :param max_failure_log_level: level used for the give-up message.
    :param max_failure_log_message: ``str.format`` template logged when
        giving up; receives ``f`` and ``e``.
    :return: the wrapped function.
    """
    exc_classes = tuple(exc_classes)
    # ``func_name`` only exists on Python 2 functions; ``__name__`` works on
    # both Python 2 and 3. Fall back to repr() for callables without a name.
    f_name = getattr(f, "__name__", repr(f))

    @wraps(f)
    def deco(*args, **kwargs):
        failures = 0
        while True:
            try:
                return f(*args, **kwargs)
            except exc_classes as e:
                if logger is not None:
                    logger.log(retry_log_level,
                               retry_log_message.format(f=f_name, e=e))
                gevent.sleep(interval)
                failures += 1
                if max_failures is not None and failures > max_failures:
                    if logger is not None:
                        logger.log(max_failure_log_level,
                                   max_failure_log_message.format(
                                       f=f_name, e=e))
                    raise
    return deco
|