1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
import os
import sys
import unittest
import pickle
import time
sys.path.insert(0, os.pardir)
sys.path.append(os.getcwd())
from tests.utils import establish_test_connection
from carrot.connection import BrokerConnection
from carrot.messaging import Consumer, Publisher, ConsumerSet
from carrot.backends.pyamqplib import Backend as AMQPLibBackend
from carrot.backends.pyamqplib import Message as AMQPLibMessage
from carrot import serialization
from tests.backend import BackendMessagingCase
# First test binding: the queue, exchange and routing key share one name.
TEST_QUEUE = TEST_EXCHANGE = TEST_ROUTING_KEY = "carrot.unittest"

# Second test binding, again with a single shared name.
TEST_QUEUE_TWO = "carrot.unittest.two"
TEST_EXCHANGE_TWO = TEST_ROUTING_KEY_TWO = TEST_QUEUE_TWO

# Maps each test queue name to its exchange, exchange type and routing key.
TEST_CELERY_QUEUE = {}
TEST_CELERY_QUEUE[TEST_QUEUE] = {
    "exchange": TEST_EXCHANGE,
    "exchange_type": "direct",
    "routing_key": TEST_ROUTING_KEY,
}
TEST_CELERY_QUEUE[TEST_QUEUE_TWO] = {
    "exchange": TEST_EXCHANGE_TWO,
    "exchange_type": "direct",
    "routing_key": TEST_ROUTING_KEY_TWO,
}
class TestAMQPlibMessaging(BackendMessagingCase):
    """Run the inherited ``BackendMessagingCase`` tests.

    The fixture supplies a connection from ``establish_test_connection()``
    together with the module's ``TEST_QUEUE``, ``TEST_EXCHANGE`` and
    ``TEST_ROUTING_KEY`` names.
    """

    def setUp(self):
        """Create the connection, then record the names to test against."""
        # The connection is obtained first, so a failure here leaves the
        # name attributes unset, exactly as before.
        self.conn = establish_test_connection()
        (self.queue,
         self.exchange,
         self.routing_key) = (TEST_QUEUE, TEST_EXCHANGE, TEST_ROUTING_KEY)
# Rebind the imported base-case name to None now that the subclass has been
# defined (presumably so a test loader scanning this module's namespace does
# not also collect the base case -- TODO confirm).
BackendMessagingCase = None

if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()
|