File: test_base.py

package info (click to toggle)
python-pika 0.9.13-2~bpo70%2B1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy-backports
  • size: 976 kB
  • sloc: python: 8,470; makefile: 131
file content (107 lines) | stat: -rw-r--r-- 3,502 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import sys
try:
    import unittest2 as unittest
except ImportError:
    import unittest
import logging

import pika
import yaml

# Broker connection settings, read once at import time from the ``broker.conf``
# YAML file that lives in the same directory as this module.
_CONFIG_PATH = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                            'broker.conf')
with open(_CONFIG_PATH) as _config_file:
    # NOTE(review): this used to be ``yaml.load(open(...))``.  ``safe_load``
    # only builds plain YAML types (which is all a settings file needs) and
    # does not require an explicit Loader argument; the ``with`` block also
    # makes sure the file handle is closed.
    CONFIG = yaml.safe_load(_config_file)

# Level applied to the 'pika' logger by TestCase._pre_setup.  This has to be
# the numeric constant ``logging.CRITICAL``; ``logging.critical`` (lower case)
# is the module-level logging *function* and is not a valid level.
LOGGING_LEVEL = logging.CRITICAL

# Not referenced in the visible part of this file; presumably the number of
# seconds a test may run before it is considered timed out -- TODO confirm.
TIMEOUT = 4


class TestCase(unittest.TestCase):

    def __call__(self, result=None):
        self._result = result
        test_method = getattr(self, self._testMethodName)
        skipped = (getattr(self.__class__, '__unittest_skip__', False) or
                   getattr(test_method, '__unittest_skip__', False))
        if not skipped:
            try:
                self._pre_setup()
            except (KeyboardInterrupt, SystemExit):
                raise
            except Exception:
                result.addError(self, sys.exc_info())
                return
        super(TestCase, self).__call__(result)

        if not skipped:
            try:
                self._post_teardown()
            except (KeyboardInterrupt, SystemExit):
                raise
            except Exception:
                result.addError(self, sys.exc_info())
                return

    def _get_parameters(self):
        self.credentials = pika.PlainCredentials(CONFIG['username'],
                                                 CONFIG['password'])
        return pika.ConnectionParameters(host=CONFIG['host'],
                                         port=CONFIG['port'],
                                         virtual_host=CONFIG['virtual_host'],
                                         credentials=self.credentials)

    def _pre_setup(self):
        self._timed_out = False
        logging.getLogger('pika').setLevel(LOGGING_LEVEL)
        self.parameters = self._get_parameters()
        self._timeout_id = None
        self.connection = None
        self.config = CONFIG

    def _post_teardown(self):
        del self.credentials
        del self.parameters
        del self.connection
        del self.config
        del self._timeout_id
        del self._timed_out

    def start(self):
        pass

    def stop(self):
        """close the connection and stop the ioloop"""
        self.connection.remove_timeout(self._timeout_id)
        self.connection.add_timeout(4, self._on_close_timeout)
        self.connection.add_on_close_callback(self._on_closed)
        self.connection.close()

    def _on_closed(self, connection, reply_code, reply_text):
        """called when the connection has finished closing"""
        self.connection.ioloop.stop()
        if self._timed_out:
            raise AssertionError('Timed out. Did you call `stop`?')

    def _on_close_timeout(self):
        """called when stuck waiting for connection to close"""
        # force the ioloop to stop
        self.connection.ioloop.stop()
        raise AssertionError('Timed out waiting for connection to close')

    def _on_timeout(self):
        self._timed_out = True
        self.stop()


class SelectConnectionTestCase(TestCase):
    """Test case variant that drives a ``pika.SelectConnection``."""

    def start(self, on_connected, on_error, on_closed):
        """Open a SelectConnection to the broker and run its ioloop.

        The three callbacks are passed to ``pika.SelectConnection``
        positionally, directly after ``self.parameters`` and followed by a
        literal ``False`` (presumably the "stop ioloop on close" flag --
        TODO confirm against the installed pika version).  The new
        connection is stored on ``self.connection`` before its ioloop is
        started.
        """
        connection_args = (self.parameters,
                           on_connected,
                           on_error,
                           on_closed,
                           False)
        self.connection = pika.SelectConnection(*connection_args)
        ioloop = self.connection.ioloop
        ioloop.start()