File: interp_io.py

package info (click to toggle)
pypy3 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 111,848 kB
  • sloc: python: 1,291,746; ansic: 74,281; asm: 5,187; cpp: 3,017; sh: 2,533; makefile: 544; xml: 243; lisp: 45; csh: 21; awk: 4
file content (146 lines) | stat: -rw-r--r-- 5,121 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os

from pypy.interpreter.error import OperationError, oefmt
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.typedef import (
    TypeDef, interp_attrproperty, generic_new_descr)
from pypy.module._io.interp_fileio import W_FileIO
from pypy.module._io.interp_textio import W_TextIOWrapper


class Cache:
    """Holds the io.UnsupportedOperation exception class built for a space."""

    def __init__(self, space):
        # The exception class is created with both ValueError and IOError
        # as its bases.
        w_bases = space.newtuple([space.w_ValueError, space.w_IOError])
        self.w_unsupportedoperation = space.new_exception_class(
            "io.UnsupportedOperation", w_bases)

@unwrap_spec(mode='text', buffering=int,
             encoding="text_or_none", errors="text_or_none",
             newline="text_or_none", closefd=int)
def open(space, w_file, mode="r", buffering=-1, encoding=None, errors=None,
         newline=None, closefd=True, w_opener=None):
    """Open w_file and return a raw, buffered or text stream object.

    The result is built in layers: a W_FileIO is created first, then
    (unless buffering == 0) it is wrapped in a W_BufferedRandom,
    W_BufferedWriter or W_BufferedReader, and finally (unless the mode
    contains 'b') that buffer is wrapped in a W_TextIOWrapper.

    w_file must be a unicode, bytes or int object, otherwise TypeError is
    raised.  mode is a string made of the flags 'r', 'w', 'x', 'a', '+',
    't', 'b' and 'U'; each flag may appear at most once.

    ValueError is raised for an invalid mode string, for conflicting
    flags, for encoding/errors/newline arguments in binary mode, for a
    negative buffer size, and for unbuffered text I/O.

    If an OperationError occurs after the raw file was created, the
    outermost object created so far is closed before the error is
    re-raised.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid an import cycle -- TODO confirm).
    from pypy.module._io.interp_bufferedio import (W_BufferedRandom,
        W_BufferedWriter, W_BufferedReader)

    if not (space.isinstance_w(w_file, space.w_unicode) or
            space.isinstance_w(w_file, space.w_bytes) or
            space.isinstance_w(w_file, space.w_int)):
        raise oefmt(space.w_TypeError, "invalid file: %R", w_file)

    reading = writing = creating = appending = False
    updating = text = binary = universal = False

    # Reject mode strings containing the same flag twice: the dict keeps
    # one entry per distinct character.
    uniq_mode = {}
    for flag in mode:
        uniq_mode[flag] = None
    if len(uniq_mode) != len(mode):
        raise oefmt(space.w_ValueError, "invalid mode: %s", mode)

    for flag in mode:
        if flag == "r":
            reading = True
        elif flag == "w":
            writing = True
        elif flag == "x":
            creating = True
        elif flag == "a":
            appending = True
        elif flag == "+":
            updating = True
        elif flag == "t":
            text = True
        elif flag == "b":
            binary = True
        elif flag == "U":
            # 'U' implies reading.
            universal = True
            reading = True
        else:
            raise oefmt(space.w_ValueError, "invalid mode: %s", mode)

    # Mode string handed to W_FileIO: only the access flags, in a fixed
    # order, without 't', 'b' or 'U'.
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    if creating:
        rawmode += "x"
    if appending:
        rawmode += "a"
    if updating:
        rawmode += "+"

    if universal:
        if writing or appending:
            raise oefmt(space.w_ValueError,
                        "can't use U and writing mode at once")
        space.warn(space.newtext("'U' mode is deprecated ('r' has the same "
                              "effect in Python 3.x)"),
                   space.w_DeprecationWarning)
    if text and binary:
        raise oefmt(space.w_ValueError,
                    "can't have text and binary mode at once")
    if reading + writing + creating + appending > 1:
        raise oefmt(space.w_ValueError,
                    "must have exactly one of read/write/create/append mode")
    if binary and encoding is not None:
        raise oefmt(space.w_ValueError,
                    "binary mode doesn't take an encoding argument")
    # 'errors' is only consumed by the text layer, so it is rejected in
    # binary mode just like 'encoding' and 'newline' instead of being
    # silently ignored.
    if binary and errors is not None:
        raise oefmt(space.w_ValueError,
                    "binary mode doesn't take an errors argument")
    if binary and newline is not None:
        raise oefmt(space.w_ValueError,
                    "binary mode doesn't take a newline argument")

    # w_result always refers to the outermost object created so far, so
    # that the error handler below knows what to close.
    w_result = None
    try:
        w_raw = space.call_function(
            space.gettypefor(W_FileIO), w_file, space.newtext(rawmode),
            space.newbool(bool(closefd)), w_opener)
        w_result = w_raw

        isatty = space.is_true(space.call_method(w_raw, "isatty"))
        line_buffering = buffering == 1 or (buffering < 0 and isatty)
        if line_buffering:
            # Line buffering is handled by the text layer; fall back to
            # the default size for the underlying buffer.
            buffering = -1

        if buffering < 0:
            # Default buffer size: taken from the raw file's _blksize.
            buffering = space.c_int_w(space.getattr(
                w_raw, space.newtext("_blksize")))

        if buffering < 0:
            raise oefmt(space.w_ValueError, "invalid buffering size")

        if buffering == 0:
            if not binary:
                raise oefmt(space.w_ValueError,
                            "can't have unbuffered text I/O")
            return w_result

        if updating:
            buffer_cls = W_BufferedRandom
        elif writing or creating or appending:
            buffer_cls = W_BufferedWriter
        elif reading:
            buffer_cls = W_BufferedReader
        else:
            raise oefmt(space.w_ValueError, "unknown mode: '%s'", mode)
        w_buffer = space.call_function(
            space.gettypefor(buffer_cls), w_raw, space.newint(buffering)
        )
        w_result = w_buffer
        if binary:
            return w_result

        w_wrapper = space.call_function(space.gettypefor(W_TextIOWrapper),
            w_buffer,
            space.newtext_or_none(encoding),
            space.newtext_or_none(errors),
            space.newtext_or_none(newline),
            space.newbool(line_buffering)
        )
        w_result = w_wrapper
        # The text wrapper gets the original, unmodified mode string.
        space.setattr(w_wrapper, space.newtext("mode"), space.newtext(mode))
        return w_result
    except OperationError as e:
        if w_result is not None:
            try:
                space.call_method(w_result, "close")
            except OperationError as e2:
                # Chain a failure of close() onto the original error
                # rather than letting it replace that error.
                e.chain_exceptions(space, e2)
        raise