File: test_pyamqplib.py

package info (click to toggle)
python-carrot 0.10.5-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 492 kB
  • ctags: 508
  • sloc: python: 2,477; makefile: 75
file content (49 lines) | stat: -rw-r--r-- 1,429 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import os
import sys
import unittest
import pickle
import time
sys.path.insert(0, os.pardir)
sys.path.append(os.getcwd())

from tests.utils import establish_test_connection
from carrot.connection import BrokerConnection
from carrot.messaging import Consumer, Publisher, ConsumerSet
from carrot.backends.pyamqplib import Backend as AMQPLibBackend
from carrot.backends.pyamqplib import Message as AMQPLibMessage
from carrot import serialization
from tests.backend import BackendMessagingCase

# Primary test queue: the queue, its exchange and its routing key all
# share the same name.
TEST_QUEUE = "carrot.unittest"
TEST_EXCHANGE = "carrot.unittest"
TEST_ROUTING_KEY = "carrot.unittest"

# Secondary test queue, distinguished by the ".two" suffix.
TEST_QUEUE_TWO = "carrot.unittest.two"
TEST_EXCHANGE_TWO = "carrot.unittest.two"
TEST_ROUTING_KEY_TWO = "carrot.unittest.two"

# Mapping of queue name -> options (exchange, exchange type, routing key)
# covering both of the test queues defined above.
TEST_CELERY_QUEUE = {
    TEST_QUEUE: {
        "exchange": TEST_EXCHANGE,
        "exchange_type": "direct",
        "routing_key": TEST_ROUTING_KEY,
    },
    TEST_QUEUE_TWO: {
        "exchange": TEST_EXCHANGE_TWO,
        "exchange_type": "direct",
        "routing_key": TEST_ROUTING_KEY_TWO,
    },
}


class TestAMQPlibMessaging(BackendMessagingCase):
    """Messaging test case configured for the primary test queue.

    All test methods come from ``BackendMessagingCase``; this subclass only
    supplies the fixture attributes in ``setUp``.
    """

    def setUp(self):
        """Open a test connection and select the primary queue settings."""
        # The connection is established first, before any attribute that
        # names the queue, exchange or routing key is set.
        self.conn = establish_test_connection()
        self.queue, self.exchange, self.routing_key = (
            TEST_QUEUE,
            TEST_EXCHANGE,
            TEST_ROUTING_KEY,
        )
# Rebind the imported base-case name to None now that the subclass has been
# defined. NOTE(review): presumably this keeps test collectors that scan the
# module namespace from also picking up the base class -- confirm against
# the test runner in use.
BackendMessagingCase = None


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()