File: test_with_statement.py

package info (click to toggle)
python-carrot 0.10.5-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 492 kB
  • ctags: 508
  • sloc: python: 2,477; makefile: 75
file content (35 lines) | stat: -rw-r--r-- 1,134 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from __future__ import with_statement
import os
import sys
import unittest
sys.path.insert(0, os.pardir)
sys.path.append(os.getcwd())

from tests.utils import test_connection_args
from carrot.connection import BrokerConnection
from carrot.messaging import Consumer, Publisher


class TestTransactioned(unittest.TestCase):

    def test_with_statement(self):

        with BrokerConnection(**test_connection_args()) as conn:
            self.assertFalse(conn._closed)
            with Publisher(connection=conn, exchange="F", routing_key="G") \
                    as publisher:
                        self.assertFalse(publisher._closed)
        self.assertTrue(conn._closed)
        self.assertTrue(publisher._closed)

        with BrokerConnection(**test_connection_args()) as conn:
            self.assertFalse(conn._closed)
            with Consumer(connection=conn, queue="E", exchange="F",
                    routing_key="G") as consumer:
                        self.assertFalse(consumer._closed)
        self.assertTrue(conn._closed)
        self.assertTrue(consumer._closed)


if __name__ == '__main__':
    unittest.main()