1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
import os
import sys
import unittest
sys.path.insert(0, os.pardir)
sys.path.append(os.getcwd())
from tests.utils import establish_test_connection
from carrot.connection import BrokerConnection
from carrot.backends.pyamqplib import Message
# Queue name, exchange name and routing key used by the consumer and
# publisher factory helpers in this module.
# NOTE(review): the README_ prefix presumably refers to the example in the
# project README -- confirm.
README_QUEUE = "feed"
README_EXCHANGE = "feed"
README_ROUTING_KEY = "feed"
class TimeoutError(Exception):
    """Signals that an operation did not complete in time."""
    # NOTE: on Python 3 this name shadows the built-in ``TimeoutError``
    # for code in this module.
def receive_a_message(consumer, timeout=None):
    """Poll ``consumer`` until it yields a message and return that message.

    ``consumer.fetch()`` is called repeatedly; the first truthy result is
    returned.  At least one fetch is always attempted, even when
    ``timeout`` is zero.

    :param consumer: object exposing a ``fetch()`` method that returns a
        message, or a falsy value when nothing is available.
    :param timeout: maximum number of seconds to keep polling.  ``None``
        (the default) polls indefinitely, which is the historical
        behaviour of this helper.
    :returns: the first truthy value returned by ``consumer.fetch()``.
    :raises TimeoutError: if ``timeout`` seconds elapse without a message.

    """
    import time  # local import keeps this helper self-contained

    deadline = None if timeout is None else time.time() + timeout
    while True:
        message = consumer.fetch()
        if message:
            return message
        # Only give up after a failed fetch, so a message that is already
        # waiting is never missed because of a tiny timeout.
        if deadline is not None and time.time() >= deadline:
            raise TimeoutError(
                "No message received within %s seconds" % (timeout, ))
def emulate_wait(consumer):
    """Deliver the next available message to ``consumer``'s callbacks.

    Obtains one message via :func:`receive_a_message` and passes it to
    ``consumer._receive_callback``.

    :param consumer: the consumer to fetch from and dispatch to.

    """
    consumer._receive_callback(receive_a_message(consumer))
class CallbacksTestable(object):
    """Callback holder that records what it saw, for later assertions."""

    # Value of the "import_feed" key from the last message handled by
    # import_feed() (None when the key was absent).
    last_feed = None
    # "ACK" or "REJECT", depending on what import_feed() did last.
    last_status = None
    # Body and delivery tag of the last message seen by dump_message().
    last_body = None
    last_delivery_tag = None

    def import_feed(self, message_data, message):
        """Acknowledge messages carrying a feed URL; reject all others.

        :param message_data: mapping looked up for the ``"import_feed"`` key.
        :param message: object providing ``ack()`` and ``reject()``.

        """
        url = message_data.get("import_feed")
        self.last_feed = url
        if url:
            self.last_status = "ACK"
            message.ack()
            return
        self.last_status = "REJECT"
        message.reject()

    def dump_message(self, message_data, message):
        """Remember the body and delivery tag of ``message``.

        ``message_data`` is accepted but not used.

        """
        self.last_body, self.last_delivery_tag = (
            message.body, message.delivery_tag)
def create_README_consumer(amqpconn):
    """Build a consumer wired to a fresh :class:`CallbacksTestable`.

    The consumer is created on ``amqpconn`` with the module's README queue,
    exchange and routing key, and both recording callbacks are registered
    on it (``import_feed`` first, then ``dump_message``).

    :param amqpconn: connection passed to the consumer.
    :returns: a ``(consumer, tcallbacks)`` tuple.

    """
    from carrot.messaging import Consumer

    tcallbacks = CallbacksTestable()
    consumer = Consumer(
        connection=amqpconn,
        queue=README_QUEUE,
        exchange=README_EXCHANGE,
        routing_key=README_ROUTING_KEY,
    )
    for callback in (tcallbacks.import_feed, tcallbacks.dump_message):
        consumer.register_callback(callback)
    return consumer, tcallbacks
def create_README_publisher(amqpconn):
    """Return a publisher bound to the README exchange and routing key.

    :param amqpconn: connection passed to the publisher.

    """
    from carrot.messaging import Publisher

    return Publisher(
        connection=amqpconn,
        exchange=README_EXCHANGE,
        routing_key=README_ROUTING_KEY,
    )
class TestExamples(unittest.TestCase):
    """Tests for the README consumer/publisher helpers of this module.

    Every test runs against the connection returned by
    ``establish_test_connection()``.

    """

    def setUp(self):
        """Open a connection and a README consumer with an emptied queue."""
        self.conn = establish_test_connection()
        self.consumer, self.tcallbacks = create_README_consumer(self.conn)
        # Drop anything left on the queue so each test starts clean.
        self.consumer.discard_all()

    def tearDown(self):
        """Release the consumer and connection opened in :meth:`setUp`."""
        self.consumer.close()
        self.conn.close()

    def test_connection(self):
        """The connection is truthy and can hand out a channel."""
        self.assertTrue(self.conn)
        self.assertTrue(self.conn.connection.channel())

    def test_README_consumer(self):
        """The consumer is configured with the README settings."""
        consumer = self.consumer
        self.assertTrue(consumer.connection)
        self.assertTrue(isinstance(consumer.connection, BrokerConnection))
        self.assertEqual(consumer.queue, README_QUEUE)
        self.assertEqual(consumer.exchange, README_EXCHANGE)
        self.assertEqual(consumer.routing_key, README_ROUTING_KEY)
        # Was assertTrue(len(...), 2), which treats 2 as the failure
        # message and never checks the count.
        self.assertEqual(len(consumer.callbacks), 2)

    def test_README_publisher(self):
        """The publisher is configured with the README settings."""
        publisher = create_README_publisher(self.conn)
        self.assertTrue(publisher.connection)
        self.assertTrue(isinstance(publisher.connection, BrokerConnection))
        self.assertEqual(publisher.exchange, README_EXCHANGE)
        self.assertEqual(publisher.routing_key, README_ROUTING_KEY)

    def test_README_together(self):
        """Published messages reach the callbacks and are ACKed/REJECTed."""
        consumer = self.consumer
        tcallbacks = self.tcallbacks

        # A message carrying a feed URL must be acknowledged.
        publisher = create_README_publisher(self.conn)
        feed_url = "http://cnn.com/rss/edition.rss"
        body = {"import_feed": feed_url}
        publisher.send(body)
        publisher.close()
        emulate_wait(consumer)
        self.assertEqual(tcallbacks.last_feed, feed_url)
        self.assertTrue(tcallbacks.last_delivery_tag)
        self.assertEqual(tcallbacks.last_status, "ACK")

        # A message without the "import_feed" key must be rejected.
        publisher = create_README_publisher(self.conn)
        body = {"foo": "FOO"}
        publisher.send(body)
        publisher.close()
        emulate_wait(consumer)
        self.assertFalse(tcallbacks.last_feed)
        self.assertTrue(tcallbacks.last_delivery_tag)
        self.assertEqual(tcallbacks.last_status, "REJECT")

    def test_subclassing(self):
        """Consumer/Publisher subclasses pick up class-level settings."""
        from carrot.messaging import Consumer, Publisher
        feed_url = "http://cnn.com/rss/edition.rss"
        # The nested ``receive`` has its own ``self``; keep a handle on the
        # test case so it can still run assertions.
        testself = self

        class TConsumer(Consumer):
            queue = README_QUEUE
            exchange = README_EXCHANGE
            routing_key = README_ROUTING_KEY

            def receive(self, message_data, message):
                testself.assertTrue(isinstance(message, Message))
                testself.assertTrue("import_feed" in message_data)
                testself.assertEqual(message_data.get("import_feed"),
                                     feed_url)

        class TPublisher(Publisher):
            exchange = README_EXCHANGE
            routing_key = README_ROUTING_KEY

        consumer = TConsumer(connection=self.conn)
        try:
            publisher = TPublisher(connection=self.conn)
            consumer.discard_all()
            publisher.send({"import_feed": feed_url})
            publisher.close()
            emulate_wait(consumer)
        finally:
            # Close the consumer even when an assertion above fails.
            consumer.close()
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|