File: bufferedwriter.pyx

package info (click to toggle)
python-clickhouse-driver 0.2.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,516 kB
  • sloc: python: 10,950; pascal: 42; makefile: 31; sh: 3
file content (147 lines) | stat: -rw-r--r-- 4,325 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from cpython cimport PyMem_Malloc, PyMem_Free, PyBytes_AsString, \
    PyBytes_Check, PyBytes_FromStringAndSize, PyBytes_AS_STRING, \
    PyBytes_GET_SIZE
from libc.string cimport memcpy, memset

from . import errors
from .varint import make_varint


cdef class BufferedWriter(object):
    """Fixed-capacity byte buffer that batches writes to a stream.

    Bytes handed to :meth:`write` are copied into a C buffer of ``bufsize``
    bytes.  When the buffer is full and more data arrives,
    :meth:`write_into_stream` is invoked to drain it.  This base class does
    not know the destination: subclasses override :meth:`write_into_stream`
    to emit the first ``self.position`` bytes of ``self.buffer`` and reset
    ``self.position`` to 0.
    """

    cdef char* buffer
    cdef unsigned long long position, buffer_size

    def __init__(self, unsigned long long bufsize):
        """Allocate the internal buffer.

        :param bufsize: capacity of the buffer in bytes.
        :raises MemoryError: if the buffer cannot be allocated.
        """
        cdef char* new_buffer = <char *> PyMem_Malloc(bufsize)
        if not new_buffer:
            raise MemoryError()

        # ``__init__`` can run more than once on the same object.  Release
        # the previous allocation (NULL on a fresh instance, for which
        # PyMem_Free is a no-op) only after the new one succeeded, so the
        # object never ends up holding a dangling pointer.
        PyMem_Free(self.buffer)
        self.buffer = new_buffer

        self.position = 0
        self.buffer_size = bufsize

        super(BufferedWriter, self).__init__()

    def __dealloc__(self):
        PyMem_Free(self.buffer)

    cpdef write_into_stream(self):
        """Drain the buffered bytes; must be overridden by subclasses."""
        raise NotImplementedError

    cpdef write(self, data):
        """Append ``data`` to the buffer, draining it whenever it is full.

        The buffer is drained lazily: a call that fills it exactly leaves it
        full, and :meth:`write_into_stream` runs at the start of the next
        non-empty write (or on :meth:`flush`).

        :param data: ``bytes`` object to write.
        :raises TypeError: if ``data`` is not a ``bytes`` object.
        :raises ValueError: if the buffer still has no free space after
            :meth:`write_into_stream` (zero capacity, or the hook did not
            reset ``position``).
        """
        cdef unsigned long long size, written = 0
        cdef unsigned long long data_len
        cdef char* c_data

        # PyBytes_GET_SIZE / PyBytes_AS_STRING perform no type checking, so
        # anything but ``bytes`` must be rejected before they are used.
        if not PyBytes_Check(data):
            raise TypeError('bytes object expected')

        data_len = PyBytes_GET_SIZE(data)
        c_data = PyBytes_AS_STRING(data)

        while written < data_len:
            if self.position >= self.buffer_size:
                self.write_into_stream()

                # Without free space the loop below could never make
                # progress and would spin forever.
                if self.position >= self.buffer_size:
                    raise ValueError(
                        'buffer has no free space after write_into_stream()'
                    )

            size = min(data_len - written, self.buffer_size - self.position)
            memcpy(&self.buffer[self.position], &c_data[written], size)

            self.position += size
            written += size

    def flush(self):
        """Drain whatever is currently buffered."""
        self.write_into_stream()

    def write_strings(self, items, encoding=None):
        """Write each item as a varint length prefix followed by its bytes.

        :param items: iterable of ``bytes`` objects, or of objects with an
            ``encode`` method when ``encoding`` is given.
        :param encoding: if not ``None``, non-bytes items are converted with
            ``value.encode(encoding)``.
        :raises ValueError: if an item is not ``bytes`` and no ``encoding``
            was supplied.
        """
        cdef bint do_encode = encoding is not None

        for value in items:
            if not PyBytes_Check(value):
                if do_encode:
                    value = value.encode(encoding)
                else:
                    raise ValueError('bytes object expected')

            self.write(make_varint(PyBytes_GET_SIZE(value)))
            self.write(value)

    def write_fixed_strings_as_bytes(self, items, Py_ssize_t length):
        """Write ``items`` as consecutive zero-padded slots of ``length`` bytes.

        Every item is copied to the start of its own ``length``-byte slot;
        the remainder of the slot stays zero-filled.

        :param items: sized iterable of ``bytes`` objects.
        :param length: slot width in bytes.
        :raises errors.TooLargeStringSize: if an item is longer than
            ``length``.
        :raises MemoryError: if the scratch buffer cannot be allocated.
        """
        cdef Py_ssize_t buf_pos = 0
        cdef Py_ssize_t value_len
        cdef Py_ssize_t items_buf_size = length * len(items)

        cdef char* c_value
        cdef char* items_buf = <char *>PyMem_Malloc(items_buf_size)
        if not items_buf:
            raise MemoryError()

        # Everything after the allocation can raise (oversized value,
        # non-bytes value, failing stream write); the scratch buffer must be
        # released on every path.
        try:
            memset(items_buf, 0, items_buf_size)

            for value in items:
                value_len = len(value)
                if length < value_len:
                    raise errors.TooLargeStringSize()

                c_value = PyBytes_AsString(value)

                memcpy(&items_buf[buf_pos], c_value, value_len)
                buf_pos += length

            self.write(PyBytes_FromStringAndSize(items_buf, items_buf_size))
        finally:
            PyMem_Free(items_buf)

    def write_fixed_strings(self, items, Py_ssize_t length, encoding=None):
        """Write ``items`` as zero-padded ``length``-byte slots, encoding text.

        With ``encoding=None`` this delegates to
        :meth:`write_fixed_strings_as_bytes`.  Otherwise every item that is
        not already ``bytes`` is converted with ``value.encode(encoding)``
        before being copied into its slot; the size check applies to the
        encoded byte length.

        :param items: sized iterable of values.
        :param length: slot width in bytes.
        :param encoding: encoding name passed to ``value.encode``, or
            ``None`` to require ``bytes`` items.
        :raises errors.TooLargeStringSize: if an (encoded) item is longer
            than ``length``.
        :raises MemoryError: if the scratch buffer cannot be allocated.
        """
        cdef Py_ssize_t buf_pos = 0
        cdef Py_ssize_t value_len
        cdef Py_ssize_t items_buf_size
        cdef char* c_value
        cdef char* items_buf

        if encoding is None:
            self.write_fixed_strings_as_bytes(items, length)
            return

        items_buf_size = length * len(items)
        items_buf = <char *>PyMem_Malloc(items_buf_size)
        if not items_buf:
            raise MemoryError()

        # Encoding, the size check and the stream write can all raise; the
        # scratch buffer must be released on every path.
        try:
            memset(items_buf, 0, items_buf_size)

            for value in items:
                if not PyBytes_Check(value):
                    value = value.encode(encoding)

                value_len = len(value)
                if length < value_len:
                    raise errors.TooLargeStringSize()

                c_value = PyBytes_AsString(value)

                memcpy(&items_buf[buf_pos], c_value, value_len)
                buf_pos += length

            self.write(PyBytes_FromStringAndSize(items_buf, items_buf_size))
        finally:
            PyMem_Free(items_buf)


cdef class BufferedSocketWriter(BufferedWriter):
    """Buffered writer that drains into an object exposing ``sendall``."""

    cdef object sock

    def __init__(self, sock, bufsize):
        # Keep the destination before the base class allocates the buffer.
        self.sock = sock
        super(BufferedSocketWriter, self).__init__(bufsize)

    cpdef write_into_stream(self):
        """Send the buffered bytes via ``sock.sendall`` and reset the buffer."""
        sendall = self.sock.sendall
        sendall(PyBytes_FromStringAndSize(self.buffer, self.position))

        # Reached only when the send did not raise; the buffer is empty again.
        self.position = 0


cdef class CompressedBufferedWriter(BufferedWriter):
    """Buffered writer that drains into an object exposing ``write``."""

    cdef object compressor

    def __init__(self, compressor, bufsize):
        # Keep the destination before the base class allocates the buffer.
        self.compressor = compressor
        super(CompressedBufferedWriter, self).__init__(bufsize)

    cpdef write_into_stream(self):
        """Pass the buffered bytes to ``compressor.write`` and reset the buffer."""
        sink = self.compressor.write
        sink(PyBytes_FromStringAndSize(self.buffer, self.position))

        # Reached only when the write did not raise; the buffer is empty again.
        self.position = 0

    def flush(self):
        """Drain whatever is currently buffered into the compressor."""
        self.write_into_stream()