1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
# -*- coding: utf8 -*-
"""
Tests for connection parameters.
"""
import socket
from mock import patch
try:
import unittest2 as unittest
except ImportError:
import unittest
import pika
from pika.adapters import asyncore_connection
from pika.adapters import base_connection
from pika.adapters import blocking_connection
from pika.adapters import select_connection
try:
from pika.adapters import tornado_connection
except ImportError:
tornado_connection = None
try:
from pika.adapters import twisted_connection
except ImportError:
twisted_connection = None
try:
from pika.adapters import libev_connection
except ImportError:
libev_connection = None
from pika import exceptions
def mock_timeout(*args, **kwargs):
raise socket.timeout
class ConnectionTests(unittest.TestCase):
def test_parameters(self):
params = pika.ConnectionParameters(socket_timeout=0.5,
retry_delay=0.1,
connection_attempts=3)
self.assertEqual(params.socket_timeout, 0.5)
self.assertEqual(params.retry_delay, 0.1)
self.assertEqual(params.connection_attempts, 3)
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
base_connection.BaseConnection(params)
settimeout.assert_called_with(2.0)
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_asyncore_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
asyncore_connection.AsyncoreConnection(params)
settimeout.assert_called_with(2.0)
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_blocking_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
blocking_connection.BlockingConnection(params)
settimeout.assert_called_with(2.0)
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_select_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
select_connection.SelectConnection(params)
settimeout.assert_called_with(2.0)
@unittest.skipUnless(tornado_connection is not None,
'tornado is not installed')
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_tornado_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
tornado_connection.TornadoConnection(params)
settimeout.assert_called_with(2.0)
@unittest.skipUnless(twisted_connection is not None,
'twisted is not installed')
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_twisted_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
twisted_connection.TwistedConnection(params)
settimeout.assert_called_with(2.0)
@unittest.skipUnless(libev_connection is not None, 'pyev is not installed')
@patch.object(socket.socket, 'settimeout')
@patch.object(socket.socket, 'connect')
def test_libev_connection_timeout(self, connect, settimeout):
connect.side_effect = mock_timeout
with self.assertRaises(exceptions.AMQPConnectionError):
params = pika.ConnectionParameters(socket_timeout=2.0)
libev_connection.LibevConnection(params)
settimeout.assert_called_with(2.0)
|