File: cymemory.pyx

package info (click to toggle)
python-thriftpy 0.3.9%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster
  • size: 560 kB
  • sloc: python: 3,287; ansic: 30; makefile: 7
file content (87 lines) | stat: -rw-r--r-- 2,007 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from libc.string cimport memcpy
from libc.stdlib cimport malloc, free
from thriftpy.transport.cybase cimport (
    TCyBuffer,
    CyTransportBase,
    DEFAULT_BUFFER,
)


cdef class TCyMemoryBuffer(CyTransportBase):
    cdef TCyBuffer buf

    def __init__(self, value=b'', int buf_size=DEFAULT_BUFFER):
        self.trans = None
        self.buf = TCyBuffer(buf_size)

        if value:
            self.setvalue(value)

    cdef c_read(self, int sz, char* out):
        if self.buf.data_size < sz:
            sz = self.buf.data_size

        if sz <= 0:
            out[0] = '\0'
        else:
            memcpy(out, self.buf.buf + self.buf.cur, sz)
            self.buf.cur += sz
            self.buf.data_size -= sz

        return sz

    cdef c_write(self, const char* data, int sz):
        cdef int r = self.buf.write(sz, data)
        if r == -1:
            raise MemoryError("Write to memory error")

    cdef _getvalue(self):
        cdef char *out
        cdef int size = self.buf.data_size

        if size <= 0:
            return b''

        out = <char*>malloc(size)
        try:
            memcpy(out, self.buf.buf + self.buf.cur, size)
            return out[:size]
        finally:
            free(out)

    cdef _setvalue(self, int sz, const char *value):
        self.buf.clean()
        self.buf.write(sz, value)

    def read(self, sz):
        return self.get_string(sz)

    def write(self, data):
        if isinstance(data, unicode):
            data = (<unicode>data).encode('utf-8')

        cdef int sz = len(data)
        return self.c_write(data, sz)

    def is_open(self):
        return True

    def open(self):
        pass

    def close(self):
        pass

    def flush(self):
        pass

    def clean(self):
        self.buf.clean()

    def getvalue(self):
        return self._getvalue()

    def setvalue(self, value):
        if isinstance(value, unicode):
            value = (<unicode>value).encode('utf-8')
        self._setvalue(len(value), value)