File: broker_test.py

package info (click to toggle)
python-mitogen 0.3.25~a2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,220 kB
  • sloc: python: 21,989; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (67 lines) | stat: -rw-r--r-- 1,747 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
try:
    from unittest import mock
except ImportError:
    import mock

import testlib

import mitogen.core


class ShutdownTest(testlib.TestCase):
    """Checks what a Broker does to its poller when it is shut down."""

    klass = mitogen.core.Broker

    def test_poller_closed(self):
        # shutdown() followed by join() must result in exactly one call to
        # the poller's close().
        broker = self.klass()
        # Keep a reference to the genuine close() so it can still be invoked
        # after the mock has absorbed the broker's own call.
        actual_close = broker.poller.close
        close_spy = mock.Mock()
        broker.poller.close = close_spy
        broker.shutdown()
        broker.join()
        try:
            self.assertEqual(1, len(close_spy.mock_calls))
        finally:
            # Invoke the real close() even when the assertion above fails;
            # otherwise a failing run would never close the poller at all.
            actual_close()


class DeferTest(testlib.TestCase):
    """Exercises Broker.defer() on a running broker and on a stopped one."""

    klass = mitogen.core.Broker

    def test_defer(self):
        # A callable handed to defer() gets run: the value it puts on the
        # latch is the value read back here.
        result_latch = mitogen.core.Latch()
        broker = self.klass()

        def deliver():
            return result_latch.put(123)

        try:
            broker.defer(deliver)
            received = result_latch.get()
            self.assertEqual(123, received)
        finally:
            broker.shutdown()
            broker.join()

    def test_defer_after_shutdown(self):
        # After shutdown() and join(), defer() is expected to raise
        # mitogen.core.Error whose first argument is
        # Waker.broker_shutdown_msg.
        result_latch = mitogen.core.Latch()
        stopped = self.klass()
        stopped.shutdown()
        stopped.join()

        def deliver():
            return result_latch.put(123)

        # The return value of assertRaises is used as the caught exception.
        error = self.assertRaises(mitogen.core.Error, stopped.defer, deliver)
        self.assertEqual(
            error.args[0],
            mitogen.core.Waker.broker_shutdown_msg,
        )


class DeferSyncTest(testlib.TestCase):
    """Exercises Broker.defer_sync(): returned values and raised errors."""

    klass = mitogen.core.Broker

    def test_okay(self):
        # defer_sync() hands back what the callable returned; the callable
        # reports the thread it ran on, which should be the broker's thread.
        broker = self.klass()

        def running_thread():
            return mitogen.core.threading__current_thread()

        try:
            seen = broker.defer_sync(running_thread)
            self.assertEqual(seen, broker._thread)
        finally:
            broker.shutdown()
            broker.join()

    def test_exception(self):
        # A ValueError raised inside the callable must surface from
        # defer_sync() in the calling thread.
        broker = self.klass()

        def bad_int():
            return int('dave')

        try:
            self.assertRaises(ValueError, lambda: broker.defer_sync(bad_int))
        finally:
            broker.shutdown()
            broker.join()