File: test__fileobject.py

package info (click to toggle)
python-gevent 1.0.1-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 9,948 kB
  • ctags: 12,954
  • sloc: python: 39,061; ansic: 26,289; sh: 13,582; makefile: 833; awk: 18
file content (95 lines) | stat: -rw-r--r-- 2,466 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import os
import greentest
import gevent
from gevent.fileobject import FileObject, FileObjectThread


class Test(greentest.TestCase):
    """Tests for FileObject wrapping pipe descriptors.

    Covers what happens to the wrapped descriptor when the FileObject is
    deleted, and universal-newline translation on read.
    """

    def _test_del(self, **kwargs):
        """Check that deleting a FileObject closes the descriptor it wraps.

        Extra keyword arguments are forwarded to the FileObject constructor,
        so callers can exercise both the default behaviour and an explicit
        ``close=True``.
        """
        r, w = os.pipe()
        # Forward kwargs; otherwise test_del_close would merely repeat test_del.
        s = FileObject(w, 'wb', **kwargs)
        s.write('x')
        s.flush()
        # Dropping the only reference is expected to close ``w``.
        del s
        try:
            os.close(w)
        except OSError:
            pass  # expected, because FileObject already closed it
        else:
            raise AssertionError('os.close(%r) must not succeed' % w)
        # The data written before deletion must still be readable.
        self.assertEqual(FileObject(r).read(), 'x')

    def test_del(self):
        """Deleting a FileObject built with default arguments closes the fd."""
        self._test_del()

    def test_del_close(self):
        """Deleting a FileObject built with ``close=True`` closes the fd."""
        self._test_del(close=True)

    # Only defined when FileObject is not the thread-based implementation.
    # NOTE(review): presumably FileObjectThread does not leave the fd open
    # on deletion with close=False -- confirm.
    if FileObject is not FileObjectThread:

        def test_del_noclose(self):
            """With ``close=False`` the fd must stay open after deletion."""
            r, w = os.pipe()
            s = FileObject(w, 'wb', close=False)
            s.write('x')
            s.flush()
            del s
            # Must succeed: the FileObject was told not to close the fd.
            os.close(w)
            self.assertEqual(FileObject(r).read(), 'x')

    def test_newlines(self):
        """Reading in 'rU' mode translates every newline style to '\\n'."""
        r, w = os.pipe()
        # Each item is written and flushed separately by writer(); some
        # items end in '\r' or start with '\n', so line endings straddle
        # write boundaries.
        lines = ['line1\n', 'line2\r', 'line3\r\n', 'line4\r\nline5', '\nline6']
        g = gevent.spawn(writer, FileObject(w, 'wb'), lines)
        try:
            result = FileObject(r, 'rU').read()
            self.assertEqual('line1\nline2\nline3\nline4\nline5\nline6', result)
        finally:
            g.kill()


def writer(fobj, line):
    """Write every item of *line* to *fobj*, flushing after each one.

    *line* is any iterable of chunks (a string yields single characters,
    a list yields its items).  *fobj* is always closed when the function
    exits, even if a write or flush raises, so that a reader waiting on
    the other end gets EOF instead of blocking forever.
    """
    try:
        for chunk in line:
            fobj.write(chunk)
            fobj.flush()
    finally:
        fobj.close()


try:
    from gevent.fileobject import SocketAdapter
except ImportError:
    # SocketAdapter is not importable here, so there is nothing to test.
    pass
else:

    class TestSocketAdapter(greentest.TestCase):
        """Tests for what SocketAdapter does to its fd when deleted."""

        def _test_del(self, **kwargs):
            """Check that deleting a SocketAdapter closes the fd it wraps.

            Extra keyword arguments are forwarded to the SocketAdapter
            constructor, so callers can exercise both the default
            behaviour and an explicit ``close=True``.
            """
            r, w = os.pipe()
            # Forward kwargs; otherwise test_del_close would merely repeat
            # test_del.
            s = SocketAdapter(w, **kwargs)
            s.sendall('x')
            # Dropping the only reference is expected to close ``w``.
            del s
            try:
                os.close(w)
            except OSError:
                pass  # expected, because SocketAdapter already closed it
            else:
                raise AssertionError('os.close(%r) must not succeed' % w)
            # The data sent before deletion must still be readable.
            self.assertEqual(FileObject(r).read(), 'x')

        def test_del(self):
            """Deleting a SocketAdapter built with defaults closes the fd."""
            self._test_del()

        def test_del_close(self):
            """Deleting a SocketAdapter built with ``close=True`` closes the fd."""
            self._test_del(close=True)

        def test_del_noclose(self):
            """With ``close=False`` the fd must stay open after deletion."""
            r, w = os.pipe()
            s = SocketAdapter(w, close=False)
            s.sendall('x')
            del s
            # Must succeed: the adapter was told not to close the fd.
            os.close(w)
            self.assertEqual(FileObject(r).read(), 'x')


# Run the tests through greentest's runner when executed as a script.
if __name__ == '__main__':
    greentest.main()