File: __init__.py

package info (click to toggle)
python-thriftpy 0.3.9%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster
  • size: 560 kB
  • sloc: python: 3,287; ansic: 30; makefile: 7
file content (57 lines) | stat: -rw-r--r-- 1,259 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# -*- coding: utf-8 -*-

from __future__ import absolute_import

from io import BytesIO

from thriftpy._compat import CYTHON
from .. import TTransportBase


class TMemoryBuffer(TTransportBase):
    """Wraps a BytesIO object as a TTransport."""

    def __init__(self, value=None):
        """value -- a value as the initial value in the BytesIO object.

        If value is set, the transport can be read first.
        """
        self._buffer = BytesIO(value) if value is not None else BytesIO()
        self._pos = 0

    def is_open(self):
        return not self._buffer.closed

    def open(self):
        pass

    def close(self):
        self._buffer.close()

    def read(self, sz):
        return self._read(sz)

    def _read(self, sz):
        orig_pos = self._buffer.tell()
        self._buffer.seek(self._pos)
        res = self._buffer.read(sz)
        self._buffer.seek(orig_pos)
        self._pos += len(res)
        return res

    def write(self, buf):
        self._buffer.write(buf)

    def flush(self):
        pass

    def getvalue(self):
        return self._buffer.getvalue()

    def setvalue(self, value):
        self._buffer = BytesIO(value)
        self._pos = 0


if CYTHON:
    from .cymemory import TCyMemoryBuffer  # noqa