1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
import time
from elasticsearch.connection_pool import ConnectionPool, RoundRobinSelector, DummyConnectionPool
from elasticsearch.exceptions import ImproperlyConfigured
from .test_cases import TestCase
class TestConnectionPool(TestCase):
    """Tests for ``ConnectionPool`` / ``DummyConnectionPool`` bookkeeping.

    Plain integers stand in for connection objects throughout: each pool is
    built from ``(connection, options)`` tuples, so ``(x, {})`` registers the
    integer ``x`` as a "connection" with empty options.
    """

    def test_dummy_cp_raises_exception_on_more_connections(self):
        """DummyConnectionPool rejects zero connections and two connections."""
        self.assertRaises(ImproperlyConfigured, DummyConnectionPool, [])
        self.assertRaises(
            ImproperlyConfigured, DummyConnectionPool, [object(), object()]
        )

    def test_raises_exception_when_no_connections_defined(self):
        """ConnectionPool rejects an empty connection list."""
        self.assertRaises(ImproperlyConfigured, ConnectionPool, [])

    def test_default_round_robin(self):
        """100 consecutive picks from a 100-connection pool hit each one once."""
        pool = ConnectionPool([(x, {}) for x in range(100)])

        # A set is compared because the default pool is not created with
        # randomize_hosts=False, so the visiting order is not asserted here.
        connections = {pool.get_connection() for _ in range(100)}

        self.assertEqual(connections, set(range(100)))

    def test_disable_shuffling(self):
        """With ``randomize_hosts=False`` connections come back in input order."""
        pool = ConnectionPool(
            [(x, {}) for x in range(100)], randomize_hosts=False
        )

        connections = [pool.get_connection() for _ in range(100)]

        self.assertEqual(connections, list(range(100)))

    def test_selectors_have_access_to_connection_opts(self):
        """A custom selector can read per-connection options via ``connection_opts``."""

        class MySelector(RoundRobinSelector):
            def select(self, connections):
                # Return the "actual" option of the connection the parent
                # selector picked, instead of the connection itself.
                chosen = super(MySelector, self).select(connections)
                return self.connection_opts[chosen]["actual"]

        pool = ConnectionPool(
            [(x, {"actual": x * x}) for x in range(100)],
            selector_class=MySelector,
            randomize_hosts=False,
        )

        connections = [pool.get_connection() for _ in range(100)]

        self.assertEqual(connections, [x * x for x in range(100)])

    def test_dead_nodes_are_removed_from_active_connections(self):
        """``mark_dead`` moves a connection from ``connections`` to ``dead``."""
        pool = ConnectionPool([(x, {}) for x in range(100)])

        now = time.time()
        pool.mark_dead(42, now=now)

        self.assertEqual(99, len(pool.connections))
        self.assertEqual(1, pool.dead.qsize())
        # The dead queue entry is (resurrection time, connection); a first
        # failure is expected to be retried 60 seconds after ``now``.
        self.assertEqual((now + 60, 42), pool.dead.get())

    def test_connection_is_skipped_when_dead(self):
        """A connection marked dead is not handed out while another is live."""
        pool = ConnectionPool([(x, {}) for x in range(2)])
        pool.mark_dead(0)

        self.assertEqual([1, 1, 1], [pool.get_connection() for _ in range(3)])

    def test_connection_is_forcibly_resurrected_when_no_live_ones_are_availible(self):
        """With every connection dead, the one due back soonest is revived."""
        pool = ConnectionPool([(x, {}) for x in range(2)])
        pool.dead_count[0] = 1
        pool.mark_dead(0)  # failed twice, longer timeout
        pool.mark_dead(1)  # failed the first time, first to be resurrected

        self.assertEqual([], pool.connections)
        self.assertEqual(1, pool.get_connection())
        self.assertEqual([1], pool.connections)

    def test_connection_is_resurrected_after_its_timeout(self):
        """A dead connection whose timeout has elapsed rejoins the live list."""
        pool = ConnectionPool([(x, {}) for x in range(100)])

        # Mark the connection dead 61 seconds in the past so that the
        # 60-second first-failure timeout has already expired.
        now = time.time()
        pool.mark_dead(42, now=now - 61)
        pool.get_connection()

        self.assertEqual(42, pool.connections[-1])
        self.assertEqual(100, len(pool.connections))

    def test_force_resurrect_always_returns_a_connection(self):
        """``get_connection`` still returns something when both lists are empty."""
        pool = ConnectionPool([(0, {})])
        # Empty the live list without registering the connection as dead.
        pool.connections = []

        self.assertEqual(0, pool.get_connection())
        self.assertEqual([], pool.connections)
        self.assertTrue(pool.dead.empty())

    def test_already_failed_connection_has_longer_timeout(self):
        """A third consecutive failure is expected to wait 4 * 60 seconds."""
        pool = ConnectionPool([(x, {}) for x in range(100)])

        now = time.time()
        pool.dead_count[42] = 2
        pool.mark_dead(42, now=now)

        self.assertEqual(3, pool.dead_count[42])
        # NOTE(review): 60s -> 4*60s looks like exponential back-off on the
        # failure count; confirm the exact formula against ConnectionPool.
        self.assertEqual((now + 4 * 60, 42), pool.dead.get())

    def test_timeout_for_failed_connections_is_limitted(self):
        """After 246 failures the expected timeout is still only 32 * 60 seconds."""
        pool = ConnectionPool([(x, {}) for x in range(100)])

        now = time.time()
        pool.dead_count[42] = 245
        pool.mark_dead(42, now=now)

        self.assertEqual(246, pool.dead_count[42])
        self.assertEqual((now + 32 * 60, 42), pool.dead.get())

    def test_dead_count_is_wiped_clean_for_connection_if_marked_live(self):
        """``mark_live`` drops the connection's entry from ``dead_count``."""
        pool = ConnectionPool([(x, {}) for x in range(100)])

        now = time.time()
        pool.dead_count[42] = 2
        pool.mark_dead(42, now=now)
        self.assertEqual(3, pool.dead_count[42])

        pool.mark_live(42)

        self.assertNotIn(42, pool.dead_count)
|