File: mitogen_protocol_test.py

package info (click to toggle)
python-mitogen 0.3.0~rc1-4
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 3,240 kB
  • sloc: python: 19,899; sh: 91; perl: 19; ansic: 18; makefile: 13
file content (34 lines) | stat: -rw-r--r-- 768 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

import unittest2
import mock

import mitogen.core

import testlib


class ReceiveOneTest(testlib.TestCase):
    """Exercise ``MitogenProtocol.on_receive()`` against a mocked stream."""

    # Class under test; referenced via ``self.klass`` in the test body.
    klass = mitogen.core.MitogenProtocol

    def test_corruption(self):
        """Junk input of header length disconnects the stream and is logged.

        Passes ``Message.HEADER_LEN`` bytes of ``b'x'`` to ``on_receive()``
        and verifies that the stream's ``on_disconnect()`` was called exactly
        once and that the protocol's ``corrupt_msg``, formatted with the
        stream name and the junk bytes, appears in the captured log output.
        """
        broker = mock.Mock()
        router = mock.Mock()
        stream = mock.Mock()

        protocol = self.klass(router, 1)
        protocol.stream = stream

        # Exactly one header's worth of bytes that is not a valid header.
        junk = mitogen.core.b('x') * mitogen.core.Message.HEADER_LEN

        capture = testlib.LogCapturer()
        capture.start()
        try:
            protocol.on_receive(broker, junk)
        finally:
            # Stop capturing even if on_receive() raises, so the capturer is
            # not left running after this test.
            capture.stop()

        self.assertEqual(1, stream.on_disconnect.call_count)
        expect = self.klass.corrupt_msg % (stream.name, junk)
        # assertIn reports both operands on failure, unlike assertTrue(a in b).
        self.assertIn(expect, capture.raw())


# Run this module's tests via unittest2 when the file is executed directly
# rather than imported.
if __name__ == '__main__':
    unittest2.main()