1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
import os
import sys
from pypy.interpreter.error import OperationError, oefmt
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.typedef import (
TypeDef, interp_attrproperty, generic_new_descr)
from pypy.module._io.interp_fileio import W_FileIO
from pypy.module._io.interp_textio import W_TextIOWrapper
from pypy.module.posix import interp_posix
from rpython.rlib import jit
_WIN32 = sys.platform == 'win32'
class Cache:
    """Holder for the ``io.UnsupportedOperation`` exception class of a space."""

    def __init__(self, space):
        # The exception class is created from a 2-tuple holding the space's
        # ValueError and IOError (presumably used as its base classes).
        w_parents = space.newtuple2(space.w_ValueError, space.w_IOError)
        w_exc_class = space.new_exception_class(
            "io.UnsupportedOperation", w_parents)
        self.w_unsupportedoperation = w_exc_class
# Argument-unwrapping front end for _open().
#
# unwrap_spec declares the expected kind of each argument (text, int, or
# text-or-None); the unwrapped values are then forwarded, positionally and
# unchanged, to _open(), whose result is returned as is.
@unwrap_spec(
    mode="text",
    buffering=int,
    encoding="text_or_none",
    errors="text_or_none",
    newline="text_or_none",
    closefd=int,
)
def open(space, w_file, mode="r", buffering=-1, encoding=None, errors=None,
         newline=None, closefd=True, w_opener=None):
    return _open(
        space, w_file, mode, buffering, encoding, errors, newline, closefd,
        w_opener)
# JIT hint: the predicate takes the same positional arguments as _open and
# returns jit.isconstant(mode), i.e. presumably "mode is a constant for the
# JIT".
# NOTE(review): the predicate spells its seventh parameter `newlines` while
# _open uses `newline`; presumably only the positions have to line up --
# confirm.
@jit.look_inside_iff(lambda space, w_file, mode, buffering, encoding, errors,
                     newlines, closefd, w_opener: jit.isconstant(mode))
def _open(space, w_file, mode, buffering, encoding, errors, newline, closefd,
          w_opener):
    """Build the stack of I/O objects for `w_file` as requested by `mode`.

    Steps, in order:

    1. `w_file` is passed through interp_posix.fspath() unless it is already
       a unicode, bytes or int object.
    2. `mode` is validated: no flag may occur twice and only the characters
       r, w, x, a, +, t and b are accepted.  Text and binary are mutually
       exclusive, at most one of create/read/write/append may be given, and
       binary mode accepts neither `encoding` nor `newline`.  Binary mode
       with buffering == 1 only emits a RuntimeWarning.
    3. A raw file object is created (W_FileIO, or W_WinConsoleIO on win32
       when the console-type probe returns something other than '\\0').
    4. Depending on `buffering`, the raw object is returned directly or
       wrapped in W_BufferedRandom / W_BufferedWriter / W_BufferedReader.
    5. Unless binary mode was requested, the buffered object is wrapped in a
       W_TextIOWrapper whose "mode" attribute is set to `mode`.

    Arguments:
        space     -- the object space.
        w_file    -- wrapped file designator (see step 1).
        mode      -- mode string.
        buffering -- 0: unbuffered (binary only); 1: line buffered;
                     negative: use the raw object's `_blksize` (and line
                     buffering if the raw object is a tty); otherwise the
                     buffer size handed to the buffered object.
        encoding, errors, newline -- strings or None, used for the text
                     wrapper.
        closefd   -- converted with bool() and given to the raw file object.
        w_opener  -- forwarded unchanged to the raw file object.

    Returns the outermost object that was built (raw, buffered or text).

    Raises ValueError (built with oefmt) for the invalid combinations listed
    above.  If an OperationError occurs after the raw object exists, the
    outermost object built so far is closed before the error is re-raised;
    an OperationError raised by that close() is attached with
    chain_exceptions().
    """
    # Imported inside the function rather than at module level.
    # NOTE(review): presumably to avoid an import cycle -- confirm.
    from pypy.module._io.interp_bufferedio import (W_BufferedRandom,
        W_BufferedWriter, W_BufferedReader)

    # unicode, bytes and int objects are used as they are; everything else
    # goes through fspath() first.
    if not (space.isinstance_w(w_file, space.w_unicode) or
            space.isinstance_w(w_file, space.w_bytes) or
            space.isinstance_w(w_file, space.w_int)):
        w_file = interp_posix.fspath(space, w_file)

    reading = writing = creating = appending = updating = text = binary = False

    # Reject a mode in which some character already occurred at an earlier
    # position.
    for i in range(1, len(mode)):
        flag = mode[i]
        if mode.find(flag, 0, i) != -1:
            raise oefmt(space.w_ValueError, "invalid mode: %s", mode)

    # Decode the individual flags; any other character is an error.
    for flag in mode:
        if flag == "r":
            reading = True
        elif flag == "w":
            writing = True
        elif flag == "x":
            creating = True
        elif flag == "a":
            appending = True
        elif flag == "+":
            updating = True
        elif flag == "t":
            text = True
        elif flag == "b":
            binary = True
        else:
            raise oefmt(space.w_ValueError, "invalid mode: %s", mode)

    # Consistency checks between the flags and the other arguments.
    if text and binary:
        raise oefmt(space.w_ValueError,
                    "can't have text and binary mode at once")
    # Only "more than one" is rejected here; the "none at all" case is left
    # to the raw file object (see the comment in the rawmode chain below).
    if creating + reading + writing + appending > 1:
        raise oefmt(space.w_ValueError,
                    "must have exactly one of create/read/write/append mode")
    if binary and encoding is not None:
        raise oefmt(space.w_ValueError,
                    "binary mode doesn't take an encoding argument")
    if binary and newline is not None:
        raise oefmt(space.w_ValueError,
                    "binary mode doesn't take a newline argument")
    # Not an error: only a warning is issued and processing continues.
    if binary and buffering == 1:
        space.warn(
            space.newtext(
                "line buffering (buffering=1) isn't supported in "
                "binary mode, the default buffer size will be used"
            ), space.w_RuntimeWarning
        )

    # Mode string for the raw file object: the primary flag, followed by "+"
    # when updating.  The t/b flags are not part of it.
    rawmode = ""
    if reading:
        rawmode = "r"
        if updating:
            rawmode = "r+"
    elif writing:
        rawmode = "w"
        if updating:
            rawmode = "w+"
    elif creating:
        rawmode = "x"
        if updating:
            rawmode = "x+"
    elif appending:
        rawmode = "a"
        if updating:
            rawmode = "a+"
    else:
        # error, will be raised from interp_fileio
        if updating:
            rawmode = "+"

    # w_result always refers to the outermost object built so far, so that
    # the except clause at the bottom knows what has to be closed.
    w_result = None
    try:
        rawclass = W_FileIO
        if _WIN32:
            from pypy.module._io.interp_win32consoleio import W_WinConsoleIO, _pyio_get_console_type
            # Any console type other than '\0' selects the console class
            # and forces the encoding to utf-8.
            typ = _pyio_get_console_type(space, w_file)
            if typ != '\0':
                rawclass = W_WinConsoleIO
                encoding = "utf-8"

            # On win32 the raw object is created by calling the type object
            # through the space.
            w_raw = space.call_function(
                space.gettypefor(rawclass), w_file, space.newtext(rawmode),
                space.newbool(bool(closefd)), w_opener)
        else:
            # Elsewhere W_FileIO is instantiated and initialised directly.
            w_raw = W_FileIO(space)
            w_raw.descr_init(space, w_file, rawmode, bool(closefd), w_opener)
        w_result = w_raw

        isatty = space.is_true(space.call_method(w_raw, "isatty"))
        # Line buffering is used when asked for explicitly (buffering == 1)
        # or when buffering is negative and the raw object is a tty.
        line_buffering = buffering == 1 or (buffering < 0 and isatty)
        if line_buffering:
            # Makes the next test pick the buffer size from the raw object.
            buffering = -1

        if buffering < 0:
            buffering = space.c_int_w(space.getattr(
                w_raw, space.newtext("_blksize")))

        # Still negative: the value read from _blksize was itself negative.
        if buffering < 0:
            raise oefmt(space.w_ValueError, "invalid buffering size")

        # buffering == 0: hand back the raw object; only valid in binary mode.
        if buffering == 0:
            if not binary:
                raise oefmt(space.w_ValueError,
                            "can't have unbuffered text I/O")
            return w_result

        # Choose the buffered class from the mode flags.
        if updating:
            w_buffer = W_BufferedRandom(space)
            w_buffer.descr_init(space, w_raw, buffering)
        elif writing or creating or appending:
            w_buffer = W_BufferedWriter(space)
            w_buffer.descr_init(space, w_raw, buffering)
        elif reading:
            w_buffer = W_BufferedReader(space)
            w_buffer.descr_init(space, w_raw, buffering)
        else:
            raise oefmt(space.w_ValueError, "unknown mode: '%s'", mode)
        w_result = w_buffer

        # Binary mode stops at the buffered object.
        if binary:
            return w_result

        # Text mode: add the text layer on top of the buffered object.
        # `encoding` is passed as is, `errors` and `newline` are wrapped.
        w_wrapper = W_TextIOWrapper(space)
        w_wrapper.descr_init(space, w_buffer, encoding,
                             space.newtext_or_none(errors),
                             space.newtext_or_none(newline),
                             line_buffering)
        w_result = w_wrapper
        # Record the mode string as given by the caller on the wrapper.
        space.setattr(w_wrapper, space.newtext("mode"), space.newtext(mode))
        return w_result
    except OperationError as e:
        # Close whatever was built so far; nothing to close if the raw
        # object was never created.
        if w_result:
            try:
                space.call_method(w_result, "close")
            except OperationError as e2:
                e.chain_exceptions(space, e2)
        # NOTE(review): this bare raise follows a nested try/except; confirm
        # that it re-raises `e` and not `e2` (plain Python 2 re-raises the
        # most recently handled exception).
        raise
def open_code(space, w_file):
    # Open `w_file` through open() in "rb" mode, leaving every other
    # argument at open()'s default value.
    binary_read_mode = "rb"
    return open(space, w_file, binary_read_mode)
|