File: mitogen_protocol_test.py

package info (click to toggle)
python-mitogen 0.3.25~a2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,220 kB
  • sloc: python: 21,989; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (31 lines) | stat: -rw-r--r-- 754 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
try:
    from unittest import mock
except ImportError:
    import mock

import mitogen.core

import testlib


class ReceiveOneTest(testlib.TestCase):
    klass = mitogen.core.MitogenProtocol

    def test_corruption(self):
        broker = mock.Mock()
        router = mock.Mock()
        stream = mock.Mock()

        protocol = self.klass(router, 1)
        protocol.stream = stream

        junk = mitogen.core.b('x') * mitogen.core.Message.HEADER_LEN

        capture = testlib.LogCapturer()
        capture.start()
        protocol.on_receive(broker, junk)
        capture.stop()

        self.assertEqual(1, stream.on_disconnect.call_count)
        expect = self.klass.corrupt_msg % (stream.name, junk)
        self.assertIn(expect, capture.raw())