1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
import os
import sys
import unittest
import uuid
sys.path.insert(0, os.pardir)
sys.path.append(os.getcwd())
from carrot.backends.queue import Message as PyQueueMessage
from carrot.backends.queue import Backend as PyQueueBackend
from carrot.connection import BrokerConnection
from carrot.messaging import Messaging, Consumer, Publisher
def create_backend():
    """Return a ``PyQueueBackend`` attached to a default ``BrokerConnection``."""
    connection = BrokerConnection()
    return PyQueueBackend(connection=connection)
class TestPyQueueMessage(unittest.TestCase):
    """Checks for ``PyQueueMessage`` objects created against the queue backend."""

    def test_message(self):
        """Build messages by hand, check their attributes, then settle them."""
        b = create_backend()
        self.assertTrue(b)

        message_body = "George Constanza"
        delivery_tag = str(uuid.uuid4())

        # Three messages with the same body and tag: one for each of the
        # ack / reject / requeue calls made at the end of the test.
        m1, m2, m3 = [
            PyQueueMessage(backend=b,
                           body=message_body,
                           delivery_tag=delivery_tag)
            for _ in range(3)
        ]

        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(m1.body, message_body)
        self.assertEqual(m1.delivery_tag, delivery_tag)

        # Nothing is asserted about these calls; the test only requires
        # that each one completes without raising.
        m1.ack()
        m2.reject()
        m3.requeue()
class TestPyQueueBackend(unittest.TestCase):
    """Publish and consume checks against the queue backend."""

    def test_backend(self):
        """Publish one message through the backend and read it back via ``get``."""
        b = create_backend()
        message_body = "Vandelay Industries"
        b.publish(b.prepare_message(message_body, "direct",
                                    content_type='text/plain',
                                    content_encoding="ascii"),
                  exchange="test",
                  routing_key="test")

        m_in_q = b.get()
        self.assertTrue(isinstance(m_in_q, PyQueueMessage))
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(m_in_q.body, message_body)

    def test_consumer_interface(self):
        """Send several messages with a Publisher and collect them via a Consumer callback."""
        to_send = ['No', 'soup', 'for', 'you!']
        messages = []

        def cb(message_data, message):
            # Record each payload handed to the callback, in arrival order.
            messages.append(message_data)

        conn = BrokerConnection(backend_cls='memory')
        consumer = Consumer(connection=conn, queue="test",
                            exchange="test", routing_key="test")
        consumer.register_callback(cb)
        publisher = Publisher(connection=conn, exchange="test",
                              routing_key="test")

        for word in to_send:
            publisher.send(word)

        # Advance the consume iterator once per message sent. The builtin
        # next() works on both Python 2.6+ and Python 3, unlike it.next().
        it = consumer.iterconsume()
        for _ in to_send:
            next(it)

        self.assertEqual(messages, to_send)
class TMessaging(Messaging):
    """``Messaging`` subclass whose exchange, routing key and queue are all "test"."""

    exchange = routing_key = queue = "test"
class TestMessaging(unittest.TestCase):
    """Round-trip check for ``TMessaging`` over the queue backend."""

    def test_messaging(self):
        """Send one dict, fetch and decode it, and check ``fetch`` returns None otherwise."""
        m = TMessaging(connection=BrokerConnection(backend_cls=PyQueueBackend))
        self.assertTrue(m)

        # Nothing has been sent yet, so fetch() is expected to return None.
        # assertEqual is used rather than the deprecated assertEquals alias.
        self.assertEqual(m.fetch(), None)

        mdata = {"name": "Cosmo Kramer"}
        m.send(mdata)

        next_msg = m.fetch()
        next_msg_data = next_msg.decode()
        self.assertEqual(next_msg_data, mdata)

        # The single message has been fetched; a further fetch() should
        # return None again.
        self.assertEqual(m.fetch(), None)
# Run this module's test cases when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|