File: FasterStringIO.py

package info (click to toggle)
zope-pts 1.2-rc1-2
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 356 kB
  • ctags: 311
  • sloc: python: 1,756; makefile: 89; sh: 62
file content (150 lines) | stat: -rw-r--r-- 4,328 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""File-like objects that read from or write to a string buffer.

This implements (nearly) all stdio methods.

f = FasterStringIO()      # ready for writing
f = FasterStringIO(buf)   # ready for reading
f.close()           # explicitly release resources held
flag = f.isatty()   # always false
buf = f.read()      # returns the whole file (like getvalue())
buf = f.readline()  # returns one line from the list buffer
list = f.readlines()# the whole list buffer
f.write(buf)        # write at current position
f.writelines(list)  # extends the list buffer with another list
f.getvalue()        # return whole file's contents as a string

Notes:

FasterStringIO is a very basic but fast implementation with lots of limitations.
I ripped out all functionality that isn't needed by TAL and replaced the rest
with a very basic implementation.

Limitations:

 * read() always returns all data (it takes no size argument)
 * no seek/truncate/tell methods are available
 * write is unicode aware
 * readlines doesn't guarantee lines (it returns the written chunks)
 * no len value available

You need the global request patch from PlacelessTranslationService!

Based on the StringIO module from python 2.3
Adapted for the PlacelessTranslationService / SpeedPack by Christian Heimes

Thanks to Andreas Jung for his idea to use list.append().
"""
# Compatibility shim: if the name ``True`` is not defined (NameError), bind
# True and False at module level to the integers 1 and 0.  Presumably this
# targets very old interpreters that lack the boolean builtins.
try:
    True
except NameError:
    True=1
    False=0

from types import UnicodeType, StringType
from TAL.TALInterpreter import _write_ValueError

class FasterStringIO:
    """class FasterStringIO([buffer])

    unicode aware and restricted version of StringIO for Zope's TAL

    All data is kept as a list of chunks in ``self.buf``; every write()
    appends one chunk and read()/getvalue() join them.  ``self.linepos`` is
    the index of the next chunk handed out by readline(), and ``self.closed``
    is a 0/1 flag set by close().

    seek(), tell() and truncate() are not supported and always raise
    RuntimeError.
    """
    def __init__(self, buf = ''):
        """Create the buffer, using ``buf`` as the first chunk."""
        self.buf = [buf]
        self.linepos = 0
        self.closed = 0

    def __iter__(self):
        """Return self; chunks are produced by next()."""
        return self

    def next(self):
        """Return the next non-empty chunk.

        Raises StopIteration when the object is closed or no chunk is left.
        """
        if self.closed:
            raise StopIteration
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def close(self):
        """Mark the object closed and drop the chunk list.

        Calling close() more than once is harmless.
        """
        if not self.closed:
            self.closed = 1
            # Shadow the write method on the instance with the helper
            # imported from TAL (presumably it raises ValueError -- confirm
            # in TAL.TALInterpreter).
            self.write = _write_ValueError
            del self.buf

    def isatty(self):
        """Return False; raise ValueError if the object is closed."""
        if self.closed:
            raise ValueError("I/O operation on closed file")
        return False

    def seek(self, pos, mode = 0):
        """Unsupported: always raises RuntimeError."""
        raise RuntimeError("FasterStringIO doesn't support seeking")

    def tell(self):
        """Unsupported: always raises RuntimeError."""
        # Raise unconditionally, like seek() and truncate(); previously the
        # error was only raised on a closed object and None was returned
        # otherwise.
        raise RuntimeError("FasterStringIO doesn't support tell")

    def read(self):
        """Return all chunks joined into one string.

        Raises ValueError if the object is closed.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        return ''.join(self.buf)

    def readline(self, length=None):
        """Return the next non-empty chunk, or '' when none is left.

        Chunks are returned exactly as they were written, so a chunk is not
        necessarily a single line.  ``length`` is accepted for signature
        compatibility and ignored.

        Raises ValueError if the object is closed.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        chunks = self.buf
        pos = self.linepos
        end = len(chunks)
        while pos < end:
            chunk = chunks[pos]
            pos += 1
            # Skip empty chunks (e.g. the default initial ''): next() treats
            # an empty result as the end of the data.
            if chunk:
                self.linepos = pos
                return chunk
        self.linepos = pos
        return ''

    def readlines(self):
        """Return the internal list of chunks (not a copy).

        Raises ValueError if the object is closed.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        return self.buf

    def truncate(self, size=None):
        """Unsupported: always raises RuntimeError."""
        raise RuntimeError("FasterStringIO doesn't support truncating")

    def write(self, s):
        """Append ``s`` as a new chunk; empty values are ignored.

        A unicode ``s`` is first passed through
        ``request.RESPONSE._encode_unicode`` when a request providing it can
        be obtained; otherwise it is stored unchanged.

        Raises ValueError if the object is closed.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        if not s:
            return

        if isinstance(s, UnicodeType):
            # XXX: import the get_request method
            # this will fail the first time we need it if the patch wasn't
            # applied before
            try:
                from Globals import get_request
            except ImportError:
                from PatchStringIO import applyRequestPatch
                applyRequestPatch()
                # No request is available for this call; s stays unencoded.
                request = None
            else:
                request = get_request()

            try:
                response = request.RESPONSE
                s = response._encode_unicode(s)
            except AttributeError:
                # not an HTTPResponse (or no request at all)
                pass

        self.buf.append(s)

    def writelines(self, list):
        """Extend the chunk list with the items of ``list``.

        The items are stored as given; unlike write() no unicode encoding
        and no filtering of empty items takes place.

        Raises ValueError if the object is closed.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        self.buf.extend(list)

    def flush(self):
        """Do nothing; raise RuntimeError if the object is closed."""
        # NOTE: the other methods raise ValueError on a closed object; the
        # exception type is kept here so existing callers are not broken.
        if self.closed:
            raise RuntimeError("I/O operation on closed file")

    def getvalue(self):
        """Return the whole contents as one string (same as read())."""
        return self.read()

# Export the class this module actually defines; the previous value named
# "StringIO", which does not exist here and made ``from ... import *`` fail.
__all__ = ["FasterStringIO"]