1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
# NOT_RPYTHON
#
# The interface here may be a little bit on the lightweight side.
from _multibytecodec import MultibyteIncrementalDecoder
from _multibytecodec import MultibyteIncrementalEncoder
class MultibyteStreamReader(MultibyteIncrementalDecoder):
def __new__(cls, stream, errors=None):
self = MultibyteIncrementalDecoder.__new__(cls, errors)
self.stream = stream
return self
def __read(self, read, size):
if size is None or size < 0:
return MultibyteIncrementalDecoder.decode(self, read(), True)
while True:
data = read(size)
final = not data
output = MultibyteIncrementalDecoder.decode(self, data, final)
if output or final:
return output
size = 1 # read 1 more byte and retry
def read(self, size=None):
return self.__read(self.stream.read, size)
def readline(self, size=None):
return self.__read(self.stream.readline, size)
def readlines(self, sizehint=None):
return self.__read(self.stream.read, sizehint).splitlines(True)
class MultibyteStreamWriter(MultibyteIncrementalEncoder):
def __new__(cls, stream, errors=None):
self = MultibyteIncrementalEncoder.__new__(cls, errors)
self.stream = stream
return self
def write(self, data):
self.stream.write(MultibyteIncrementalEncoder.encode(
self, data))
def reset(self):
data = MultibyteIncrementalEncoder.encode(
self, '', final=True)
if len(data) > 0:
self.stream.write(data)
MultibyteIncrementalEncoder.reset(self)
def writelines(self, lines):
for data in lines:
self.write(data)
|