File: test_examples.py

package info (click to toggle)
python-carrot 0.10.5-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 492 kB
  • ctags: 508
  • sloc: python: 2,477; makefile: 75
file content (156 lines) | stat: -rw-r--r-- 4,934 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import os
import sys
import unittest
sys.path.insert(0, os.pardir)
sys.path.append(os.getcwd())

from tests.utils import establish_test_connection
from carrot.connection import BrokerConnection
from carrot.backends.pyamqplib import Message

README_QUEUE = "feed"
README_EXCHANGE = "feed"
README_ROUTING_KEY = "feed"


class TimeoutError(Exception):
    """The operation timed out."""


def receive_a_message(consumer):
    while True:
        message = consumer.fetch()
        if message:
            return message


def emulate_wait(consumer):
    message = receive_a_message(consumer)
    consumer._receive_callback(message)


class CallbacksTestable(object):
    last_feed = None
    last_status = None
    last_body = None
    last_delivery_tag = None

    def import_feed(self, message_data, message):
        feed_url = message_data.get("import_feed")
        self.last_feed = feed_url
        if not feed_url:
            self.last_status = "REJECT"
            message.reject()
        else:
            self.last_status = "ACK"
            message.ack()

    def dump_message(self, message_data, message):
        self.last_body = message.body
        self.last_delivery_tag = message.delivery_tag


def create_README_consumer(amqpconn):
    from carrot.messaging import Consumer
    consumer = Consumer(connection=amqpconn,
                        queue=README_QUEUE, exchange=README_EXCHANGE,
                        routing_key=README_ROUTING_KEY)
    tcallbacks = CallbacksTestable()
    consumer.register_callback(tcallbacks.import_feed)
    consumer.register_callback(tcallbacks.dump_message)
    return consumer, tcallbacks


def create_README_publisher(amqpconn):
    from carrot.messaging import Publisher
    publisher = Publisher(connection=amqpconn, exchange=README_EXCHANGE,
                          routing_key=README_ROUTING_KEY)
    return publisher


class TestExamples(unittest.TestCase):

    def setUp(self):
        self.conn = establish_test_connection()
        self.consumer, self.tcallbacks = create_README_consumer(self.conn)
        self.consumer.discard_all()

    def test_connection(self):
        self.assertTrue(self.conn)
        self.assertTrue(self.conn.connection.channel())

    def test_README_consumer(self):
        consumer = self.consumer
        tcallbacks = self.tcallbacks
        self.assertTrue(consumer.connection)
        self.assertTrue(isinstance(consumer.connection, BrokerConnection))
        self.assertEquals(consumer.queue, README_QUEUE)
        self.assertEquals(consumer.exchange, README_EXCHANGE)
        self.assertEquals(consumer.routing_key, README_ROUTING_KEY)
        self.assertTrue(len(consumer.callbacks), 2)

    def test_README_publisher(self):
        publisher = create_README_publisher(self.conn)
        self.assertTrue(publisher.connection)
        self.assertTrue(isinstance(publisher.connection, BrokerConnection))
        self.assertEquals(publisher.exchange, README_EXCHANGE)
        self.assertEquals(publisher.routing_key, README_ROUTING_KEY)

    def test_README_together(self):
        consumer = self.consumer
        tcallbacks = self.tcallbacks

        publisher = create_README_publisher(self.conn)
        feed_url = "http://cnn.com/rss/edition.rss"
        body = {"import_feed": feed_url}
        publisher.send(body)
        publisher.close()
        emulate_wait(consumer)

        self.assertEquals(tcallbacks.last_feed, feed_url)
        self.assertTrue(tcallbacks.last_delivery_tag)
        self.assertEquals(tcallbacks.last_status, "ACK")

        publisher = create_README_publisher(self.conn)
        body = {"foo": "FOO"}
        publisher.send(body)
        publisher.close()
        emulate_wait(consumer)

        self.assertFalse(tcallbacks.last_feed)
        self.assertTrue(tcallbacks.last_delivery_tag)
        self.assertEquals(tcallbacks.last_status, "REJECT")

    def test_subclassing(self):
        from carrot.messaging import Consumer, Publisher
        feed_url = "http://cnn.com/rss/edition.rss"
        testself = self

        class TConsumer(Consumer):
            queue = README_QUEUE
            exchange = README_EXCHANGE
            routing_key = README_ROUTING_KEY

            def receive(self, message_data, message):
                testself.assertTrue(isinstance(message, Message))
                testself.assertTrue("import_feed" in message_data)
                testself.assertEquals(message_data.get("import_feed"),
                        feed_url)

        class TPublisher(Publisher):
            exchange = README_EXCHANGE
            routing_key = README_ROUTING_KEY

        consumer = TConsumer(connection=self.conn)
        publisher = TPublisher(connection=self.conn)

        consumer.discard_all()
        publisher.send({"import_feed": feed_url})
        publisher.close()
        emulate_wait(consumer)

        consumer.close()


if __name__ == '__main__':
    unittest.main()