1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
import os
from pypy.interpreter.error import OperationError, oefmt
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.typedef import (
TypeDef, interp_attrproperty, generic_new_descr)
from pypy.module._io.interp_fileio import W_FileIO
from pypy.module._io.interp_textio import W_TextIOWrapper
class Cache:
    """Per-space holder for the ``io.UnsupportedOperation`` exception class."""

    def __init__(self, space):
        # The new exception class is built from a wrapped tuple containing
        # the space's ValueError and IOError types.
        w_parents = space.newtuple([space.w_ValueError, space.w_IOError])
        self.w_unsupportedoperation = space.new_exception_class(
            "io.UnsupportedOperation", w_parents)
# Interp-level implementation of open().
#
# Validates ``w_file`` and the ``mode`` string, then stacks up to three
# layers: a raw W_FileIO, a buffered reader/writer/random object, and a
# W_TextIOWrapper.  The outermost layer that was requested is returned.
# If an OperationError is raised once the raw file exists, the most
# recently built layer is closed before the error propagates.
#
# NOTE: documented with comments rather than a docstring so that the
# function's __doc__ is left exactly as it was.
@unwrap_spec(mode='text', buffering=int,
             encoding="text_or_none", errors="text_or_none",
             newline="text_or_none", closefd=int)
def open(space, w_file, mode="r", buffering=-1, encoding=None, errors=None,
         newline=None, closefd=True, w_opener=None):
    from pypy.module._io.interp_bufferedio import (
        W_BufferedRandom, W_BufferedWriter, W_BufferedReader)

    # Only wrapped unicode, bytes and int objects are accepted as the file.
    if (not space.isinstance_w(w_file, space.w_unicode) and
            not space.isinstance_w(w_file, space.w_bytes) and
            not space.isinstance_w(w_file, space.w_int)):
        raise oefmt(space.w_TypeError, "invalid file: %R", w_file)

    for_read = False
    for_write = False
    for_create = False
    for_append = False
    plus = False
    as_text = False
    as_binary = False
    legacy_u = False

    # One pass over the mode string: a repeated flag and an unknown flag
    # are both reported with the same "invalid mode" error.
    seen = {}
    for ch in mode:
        if ch in seen:
            raise oefmt(space.w_ValueError, "invalid mode: %s", mode)
        seen[ch] = None
        if ch == "r":
            for_read = True
        elif ch == "w":
            for_write = True
        elif ch == "x":
            for_create = True
        elif ch == "a":
            for_append = True
        elif ch == "+":
            plus = True
        elif ch == "t":
            as_text = True
        elif ch == "b":
            as_binary = True
        elif ch == "U":
            # 'U' also switches reading on.
            legacy_u = True
            for_read = True
        else:
            raise oefmt(space.w_ValueError, "invalid mode: %s", mode)

    # Mode string handed to the raw file: flags in the fixed order r w x a +.
    pieces = []
    if for_read:
        pieces.append("r")
    if for_write:
        pieces.append("w")
    if for_create:
        pieces.append("x")
    if for_append:
        pieces.append("a")
    if plus:
        pieces.append("+")
    rawmode = "".join(pieces)

    if legacy_u:
        if for_write or for_append:
            raise oefmt(space.w_ValueError,
                        "can't use U and writing mode at once")
        space.warn(space.newtext("'U' mode is deprecated ('r' has the same "
                                 "effect in Python 3.x)"),
                   space.w_DeprecationWarning)

    if as_text and as_binary:
        raise oefmt(space.w_ValueError,
                    "can't have text and binary mode at once")

    primary_modes = (int(for_read) + int(for_write) +
                     int(for_create) + int(for_append))
    if primary_modes > 1:
        raise oefmt(space.w_ValueError,
                    "must have exactly one of read/write/create/append mode")

    if as_binary:
        if encoding is not None:
            raise oefmt(space.w_ValueError,
                        "binary mode doesn't take an encoding argument")
        if newline is not None:
            raise oefmt(space.w_ValueError,
                        "binary mode doesn't take a newline argument")

    # w_result always names the outermost layer built so far, so the
    # error handler below knows what to close.
    w_result = None
    try:
        w_raw = space.call_function(
            space.gettypefor(W_FileIO), w_file, space.newtext(rawmode),
            space.newbool(bool(closefd)), w_opener)
        w_result = w_raw

        isatty = space.is_true(space.call_method(w_raw, "isatty"))

        # buffering == 1 requests line buffering; a negative value gets it
        # only when the raw file reports itself as a tty.
        if buffering == 1:
            line_buffering = True
        else:
            line_buffering = buffering < 0 and isatty
        if line_buffering:
            buffering = -1

        # A negative size means "use the raw file's _blksize attribute".
        if buffering < 0:
            w_blksize = space.getattr(w_raw, space.newtext("_blksize"))
            buffering = space.c_int_w(w_blksize)
        if buffering < 0:
            raise oefmt(space.w_ValueError, "invalid buffering size")

        # Unbuffered: hand back the raw file, which is only legal in
        # binary mode.
        if buffering == 0:
            if as_binary:
                return w_raw
            raise oefmt(space.w_ValueError,
                        "can't have unbuffered text I/O")

        if plus:
            buffer_cls = W_BufferedRandom
        elif for_write or for_create or for_append:
            buffer_cls = W_BufferedWriter
        elif for_read:
            buffer_cls = W_BufferedReader
        else:
            raise oefmt(space.w_ValueError, "unknown mode: '%s'", mode)

        w_buffer = space.call_function(
            space.gettypefor(buffer_cls), w_raw, space.newint(buffering))
        w_result = w_buffer

        # Binary mode stops at the buffered layer.
        if as_binary:
            return w_buffer

        w_wrapper = space.call_function(
            space.gettypefor(W_TextIOWrapper),
            w_buffer,
            space.newtext_or_none(encoding),
            space.newtext_or_none(errors),
            space.newtext_or_none(newline),
            space.newbool(line_buffering))
        w_result = w_wrapper
        space.setattr(w_wrapper, space.newtext("mode"), space.newtext(mode))
        return w_wrapper
    except OperationError as operr:
        # Close whatever layer was built last; a failure while closing is
        # chained onto the original error.
        if w_result:
            try:
                space.call_method(w_result, "close")
            except OperationError as close_err:
                operr.chain_exceptions(space, close_err)
        raise
|