File: test_connection_pool.py

package info (click to toggle)
python-elasticsearch 1.4.0-2~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 636 kB
  • sloc: python: 3,209; makefile: 155
file content (112 lines) | stat: -rw-r--r-- 4,405 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import time

from elasticsearch.connection_pool import ConnectionPool, RoundRobinSelector, DummyConnectionPool
from elasticsearch.exceptions import ImproperlyConfigured

from .test_cases import TestCase

class TestConnectionPool(TestCase):
    def test_dummy_cp_raises_exception_on_more_connections(self):
        self.assertRaises(ImproperlyConfigured, DummyConnectionPool, [])
        self.assertRaises(ImproperlyConfigured, DummyConnectionPool, [object(), object()])

    def test_raises_exception_when_no_connections_defined(self):
        self.assertRaises(ImproperlyConfigured, ConnectionPool, [])

    def test_default_round_robin(self):
        pool = ConnectionPool([(x, {}) for x in range(100)])

        connections = set()
        for _ in range(100):
            connections.add(pool.get_connection())
        self.assertEquals(connections, set(range(100)))

    def test_disable_shuffling(self):
        pool = ConnectionPool([(x, {}) for x in range(100)], randomize_hosts=False)

        connections = []
        for _ in range(100):
            connections.append(pool.get_connection())
        self.assertEquals(connections, list(range(100)))

    def test_selectors_have_access_to_connection_opts(self):
        class MySelector(RoundRobinSelector):
            def select(self, connections):
                return self.connection_opts[super(MySelector, self).select(connections)]["actual"]
        pool = ConnectionPool([(x, {"actual": x*x}) for x in range(100)], selector_class=MySelector, randomize_hosts=False)

        connections = []
        for _ in range(100):
            connections.append(pool.get_connection())
        self.assertEquals(connections, [x*x for x in range(100)])

    def test_dead_nodes_are_removed_from_active_connections(self):
        pool = ConnectionPool([(x, {}) for x in range(100)])

        now = time.time()
        pool.mark_dead(42, now=now)
        self.assertEquals(99, len(pool.connections))
        self.assertEquals(1, pool.dead.qsize())
        self.assertEquals((now + 60, 42), pool.dead.get())

    def test_connection_is_skipped_when_dead(self):
        pool = ConnectionPool([(x, {}) for x in range(2)])
        pool.mark_dead(0)

        self.assertEquals([1, 1, 1], [pool.get_connection(), pool.get_connection(), pool.get_connection(), ])

    def test_connection_is_forcibly_resurrected_when_no_live_ones_are_availible(self):
        pool = ConnectionPool([(x, {}) for x in range(2)])
        pool.dead_count[0] = 1
        pool.mark_dead(0) # failed twice, longer timeout
        pool.mark_dead(1) # failed the first time, first to be resurrected

        self.assertEquals([], pool.connections)
        self.assertEquals(1, pool.get_connection())
        self.assertEquals([1,], pool.connections)

    def test_connection_is_resurrected_after_its_timeout(self):
        pool = ConnectionPool([(x, {}) for x in range(100)])

        now = time.time()
        pool.mark_dead(42, now=now-61)
        pool.get_connection()
        self.assertEquals(42, pool.connections[-1])
        self.assertEquals(100, len(pool.connections))

    def test_force_resurrect_always_returns_a_connection(self):
        pool = ConnectionPool([(0, {})])

        pool.connections = []
        self.assertEquals(0, pool.get_connection())
        self.assertEquals([], pool.connections)
        self.assertTrue(pool.dead.empty())

    def test_already_failed_connection_has_longer_timeout(self):
        pool = ConnectionPool([(x, {}) for x in range(100)])
        now = time.time()
        pool.dead_count[42] = 2
        pool.mark_dead(42, now=now)

        self.assertEquals(3, pool.dead_count[42])
        self.assertEquals((now + 4*60, 42), pool.dead.get())

    def test_timeout_for_failed_connections_is_limitted(self):
        pool = ConnectionPool([(x, {}) for x in range(100)])
        now = time.time()
        pool.dead_count[42] = 245
        pool.mark_dead(42, now=now)

        self.assertEquals(246, pool.dead_count[42])
        self.assertEquals((now + 32*60, 42), pool.dead.get())

    def test_dead_count_is_wiped_clean_for_connection_if_marked_live(self):
        pool = ConnectionPool([(x, {}) for x in range(100)])
        now = time.time()
        pool.dead_count[42] = 2
        pool.mark_dead(42, now=now)

        self.assertEquals(3, pool.dead_count[42])
        pool.mark_live(42)
        self.assertNotIn(42, pool.dead_count)