File: pipe_test.py

package info (click to toggle)
python-mitogen 0.3.26-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,456 kB
  • sloc: python: 22,134; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (35 lines) | stat: -rw-r--r-- 1,206 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os

import mitogen.core

import testlib

class PipeTest(testlib.TestCase):
    """Check the blocking mode of the descriptors made by mitogen.core.pipe().

    Every test releases its pipe ends in a ``finally`` clause, so a failing
    assertion cannot leave descriptors open for the rest of the test run.
    """

    def test_pipe_blocking_unspecified(self):
        "Test that unspecified blocking arg (None) behaves same as os.pipe()"
        os_rfd, os_wfd = os.pipe()
        try:
            mi_rfp, mi_wfp = mitogen.core.pipe()
            try:
                # Compare each end against the matching end of a plain
                # os.pipe(), rather than hard-coding an expected mode.
                self.assertEqual(mitogen.core.get_blocking(os_rfd),
                                 mitogen.core.get_blocking(mi_rfp.fileno()))
                self.assertEqual(mitogen.core.get_blocking(os_wfd),
                                 mitogen.core.get_blocking(mi_wfp.fileno()))
            finally:
                mi_rfp.close()
                mi_wfp.close()
        finally:
            # Raw descriptors from os.pipe() have no owning object, so they
            # must be closed explicitly.
            os.close(os_rfd)
            os.close(os_wfd)

    def test_pipe_blocking_true(self):
        "Test that blocking=True yields blocking read and write ends"
        mi_rfp, mi_wfp = mitogen.core.pipe(blocking=True)
        try:
            self.assertTrue(mitogen.core.get_blocking(mi_rfp.fileno()))
            self.assertTrue(mitogen.core.get_blocking(mi_wfp.fileno()))
        finally:
            mi_rfp.close()
            mi_wfp.close()

    def test_pipe_blocking_false(self):
        "Test that blocking=False yields non-blocking read and write ends"
        mi_rfp, mi_wfp = mitogen.core.pipe(blocking=False)
        try:
            self.assertFalse(mitogen.core.get_blocking(mi_rfp.fileno()))
            self.assertFalse(mitogen.core.get_blocking(mi_wfp.fileno()))
        finally:
            mi_rfp.close()
            mi_wfp.close()