File: test_app_stream.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (104 lines) | stat: -rw-r--r-- 3,455 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
class AppTestStreams:
    """Tests for the `_multibytecodec` stream reader and writer types.

    `setup_class` builds three small subclasses, each binding a concrete
    codec (HZ or Shift_JISX0213) to `MultibyteStreamReader` or
    `MultibyteStreamWriter`.  The test methods then drive those classes
    against minimal in-memory stream stubs.
    """

    spaceconfig = dict(usemodules=['_multibytecodec'])

    def setup_class(cls):
        # Each snippet is evaluated through space.appexec and the class it
        # returns is stored under a `w_` name; the tests read it back
        # without the prefix (e.g. self.HzStreamReader) -- presumably the
        # test framework performs that mapping.
        space = cls.space

        hz_reader_source = """():
            import _codecs_cn
            from _multibytecodec import MultibyteStreamReader

            class HzStreamReader(MultibyteStreamReader):
                codec = _codecs_cn.getcodec('hz')

            return HzStreamReader
        """
        cls.w_HzStreamReader = space.appexec([], hz_reader_source)

        hz_writer_source = """():
            import _codecs_cn
            from _multibytecodec import MultibyteStreamWriter

            class HzStreamWriter(MultibyteStreamWriter):
                codec = _codecs_cn.getcodec('hz')

            return HzStreamWriter
        """
        cls.w_HzStreamWriter = space.appexec([], hz_writer_source)

        sjis_writer_source = """():
            import _codecs_jp
            from _multibytecodec import MultibyteStreamWriter

            class ShiftJisx0213StreamWriter(MultibyteStreamWriter):
                codec = _codecs_jp.getcodec('shift_jisx0213')

            return ShiftJisx0213StreamWriter
        """
        cls.w_ShiftJisx0213StreamWriter = space.appexec(
            [], sjis_writer_source)

    def test_reader(self):
        # Reading one character at a time must yield every decoded
        # character in order and then '' once the input is exhausted.
        class SlicedInput:
            # Stream stub that serves the payload in `size`-byte slices.
            def __init__(self, payload):
                self.payload = payload
                self.offset = 0

            def read(self, size):
                start = self.offset
                stop = start + size
                self.offset = stop
                return self.payload[start:stop]

        reader = self.HzStreamReader(SlicedInput(b"!~{abcd~}xyz~{efgh"))
        decoded_text = u'!\u5f95\u6c85xyz\u5f50\u73b7'
        for wanted in decoded_text:
            got = reader.read(1)
            assert got == wanted
        leftover = reader.read(1)
        assert leftover == ''

    def test_reader_replace(self):
        # A truncated multibyte sequence decodes to U+FFFD when the
        # "replace" handler is in effect, whether it is given to the
        # constructor or assigned to `errors` afterwards.
        class WholeInput:
            # Stream stub whose read() returns the entire payload.
            def __init__(self, payload):
                self.payload = payload

            def read(self):
                return self.payload

        # Handler supplied as the second constructor argument.
        reader = self.HzStreamReader(WholeInput(b"!~{a"), "replace")
        text = reader.read()
        assert text == u'!\ufffd'

        # Handler assigned through the `errors` attribute.
        reader = self.HzStreamReader(WholeInput(b"!~{a"))
        reader.errors = "replace"
        assert reader.errors == "replace"
        text = reader.read()
        assert text == u'!\ufffd'

    def test_writer(self):
        # Writing character by character and then calling reset() must
        # produce exactly the expected sequence of write() payloads.
        class RecordingSink:
            # Stream stub that remembers every chunk passed to write().
            def __init__(self):
                self.written = []

            def write(self, chunk):
                self.written.append(chunk)

        writer = self.HzStreamWriter(RecordingSink())
        for char in u'!\u5f95\u6c85xyz\u5f50\u73b7':
            writer.write(char)
        writer.reset()
        expected_chunks = [b'!', b'~{ab', b'cd', b'~}x', b'y', b'z',
                           b'~{ef', b'gh', b'~}']
        assert writer.stream.written == expected_chunks

    def test_no_flush(self):
        # Per the expected output, the second character results in an
        # empty write and the third in a single two-byte chunk --
        # presumably the writer holds the second character back until
        # it has seen what follows.
        class RecordingSink:
            # Stream stub that remembers every chunk passed to write().
            def __init__(self):
                self.written = []

            def write(self, chunk):
                self.written.append(chunk)

        writer = self.ShiftJisx0213StreamWriter(RecordingSink())
        for char in (u'\u30ce', u'\u304b', u'\u309a'):
            writer.write(char)
        assert writer.stream.written == [b'\x83m', b'', b'\x82\xf5']

    def test_writer_seek_no_empty_write(self):
        # issue #2293: codecs.py sometimes calls reset() on a
        # StreamWriter whose underlying file was not opened for writing
        # at all.  reset() must therefore not emit a "write('')".
        class UnwritableFile:
            # Stream stub that rejects every write attempt.
            def write(self, chunk):
                raise IOError("can't write!")

        writer = self.ShiftJisx0213StreamWriter(UnwritableFile())
        writer.reset()
        # Getting here without an exception is the whole assertion.