1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
from cpython cimport PyMem_Malloc, PyMem_Free, PyBytes_AsString, \
PyBytes_Check, PyBytes_FromStringAndSize, PyBytes_AS_STRING, \
PyBytes_GET_SIZE
from libc.string cimport memcpy, memset
from . import errors
from .varint import make_varint
cdef class BufferedWriter(object):
    """Collect bytes in a fixed-size C buffer and hand them to a stream.

    ``buffer`` holds ``buffer_size`` bytes, of which the first ``position``
    are pending.  Subclasses override ``write_into_stream`` to emit the
    pending bytes and reset ``position``; the base implementation raises
    ``NotImplementedError``.
    """
    cdef char* buffer
    cdef unsigned long long position, buffer_size

    def __init__(self, unsigned long long bufsize):
        # C attributes start out as NULL, so this is a no-op on the first
        # call and prevents a leak if __init__ is ever invoked again.
        PyMem_Free(self.buffer)
        self.buffer = <char *> PyMem_Malloc(bufsize)
        if not self.buffer:
            raise MemoryError()

        self.position = 0
        self.buffer_size = bufsize

        super(BufferedWriter, self).__init__()

    def __dealloc__(self):
        # PyMem_Free accepts NULL, so a failed allocation is safe here.
        PyMem_Free(self.buffer)

    cpdef write_into_stream(self):
        """Emit the pending bytes; must be provided by subclasses."""
        raise NotImplementedError

    cpdef write(self, data):
        """Append the bytes object ``data`` to the buffer.

        Whenever the buffer is full and more bytes remain to be copied,
        ``write_into_stream`` is called first to make room.

        :raises TypeError: if ``data`` is not a bytes object.
        :raises ValueError: if there is data to write but the buffer was
            created with size zero.
        """
        cdef unsigned long long size, written = 0
        cdef unsigned long long data_len
        cdef char* c_data

        # PyBytes_GET_SIZE / PyBytes_AS_STRING do no type checking, so
        # anything that is not bytes must be rejected up front.
        if not PyBytes_Check(data):
            raise TypeError('bytes object expected')

        data_len = PyBytes_GET_SIZE(data)
        c_data = PyBytes_AS_STRING(data)

        # A zero-sized buffer can never accept a byte; without this guard
        # the loop below would spin forever.
        if data_len and not self.buffer_size:
            raise ValueError('cannot write into a zero-sized buffer')

        while written < data_len:
            # Make room before copying; a full buffer left over after the
            # final chunk is kept until the next write or flush.
            if self.position == self.buffer_size:
                self.write_into_stream()

            size = min(data_len - written, self.buffer_size - self.position)
            memcpy(&self.buffer[self.position], &c_data[written], size)

            self.position += size
            written += size

    def flush(self):
        """Emit whatever is currently pending via ``write_into_stream``."""
        self.write_into_stream()

    def write_strings(self, items, encoding=None):
        """Write each item as ``make_varint(len)`` followed by its bytes.

        Items that are not bytes are encoded with ``encoding`` when it is
        given.

        :raises ValueError: if an item is not bytes and ``encoding`` is None.
        """
        cdef bint do_encode = encoding is not None

        for value in items:
            if not PyBytes_Check(value):
                if do_encode:
                    value = value.encode(encoding)
                else:
                    raise ValueError('bytes object expected')

            self.write(make_varint(PyBytes_GET_SIZE(value)))
            self.write(value)

    def write_fixed_strings_as_bytes(self, items, Py_ssize_t length):
        """Write bytes items as consecutive zero-padded ``length``-byte slots.

        :raises errors.TooLargeStringSize: if an item is longer than
            ``length``.
        :raises MemoryError: if the temporary buffer cannot be allocated.
        """
        cdef Py_ssize_t buf_pos = 0
        cdef Py_ssize_t value_len
        cdef Py_ssize_t items_buf_size = length * len(items)
        cdef char* c_value
        cdef char* items_buf = <char *>PyMem_Malloc(items_buf_size)

        if not items_buf:
            raise MemoryError()

        # try/finally guarantees the scratch buffer is released even when an
        # item is rejected or the underlying write fails.
        try:
            # Zero-fill so that short items are padded with NUL bytes.
            memset(items_buf, 0, items_buf_size)

            for value in items:
                value_len = len(value)
                if length < value_len:
                    raise errors.TooLargeStringSize()

                c_value = PyBytes_AsString(value)
                memcpy(&items_buf[buf_pos], c_value, value_len)
                buf_pos += length

            self.write(PyBytes_FromStringAndSize(items_buf, items_buf_size))
        finally:
            PyMem_Free(items_buf)

    def write_fixed_strings(self, items, Py_ssize_t length, encoding=None):
        """Write items as consecutive zero-padded ``length``-byte slots.

        With ``encoding`` set, items that are not bytes are encoded first;
        with ``encoding`` None the call is delegated to
        ``write_fixed_strings_as_bytes``.

        :raises errors.TooLargeStringSize: if an (encoded) item is longer
            than ``length``.
        :raises MemoryError: if the temporary buffer cannot be allocated.
        """
        cdef Py_ssize_t buf_pos = 0
        cdef Py_ssize_t value_len
        cdef Py_ssize_t items_buf_size
        cdef char* c_value
        cdef char* items_buf

        if encoding is None:
            self.write_fixed_strings_as_bytes(items, length)
            return

        items_buf_size = length * len(items)
        items_buf = <char *>PyMem_Malloc(items_buf_size)

        if not items_buf:
            raise MemoryError()

        # try/finally guarantees the scratch buffer is released even when
        # encoding fails, an item is rejected or the underlying write fails.
        try:
            # Zero-fill so that short items are padded with NUL bytes.
            memset(items_buf, 0, items_buf_size)

            for value in items:
                if not PyBytes_Check(value):
                    value = value.encode(encoding)

                value_len = len(value)
                if length < value_len:
                    raise errors.TooLargeStringSize()

                c_value = PyBytes_AsString(value)
                memcpy(&items_buf[buf_pos], c_value, value_len)
                buf_pos += length

            self.write(PyBytes_FromStringAndSize(items_buf, items_buf_size))
        finally:
            PyMem_Free(items_buf)
cdef class BufferedSocketWriter(BufferedWriter):
    """Buffered writer that passes its pending bytes to ``sock.sendall``."""
    cdef object sock

    def __init__(self, sock, bufsize):
        # Store the socket first, then let the base class initialise itself
        # with the requested buffer size.
        self.sock = sock
        super(BufferedSocketWriter, self).__init__(bufsize)

    cpdef write_into_stream(self):
        """Send the first ``position`` buffered bytes and reset ``position``."""
        pending = PyBytes_FromStringAndSize(self.buffer, self.position)
        self.sock.sendall(pending)
        # Reached only when sendall did not raise.
        self.position = 0
cdef class CompressedBufferedWriter(BufferedWriter):
    """Buffered writer that passes its pending bytes to ``compressor.write``."""
    cdef object compressor

    def __init__(self, compressor, bufsize):
        # Store the compressor first, then let the base class initialise
        # itself with the requested buffer size.
        self.compressor = compressor
        super(CompressedBufferedWriter, self).__init__(bufsize)

    cpdef write_into_stream(self):
        """Hand the first ``position`` buffered bytes to the compressor and
        reset ``position``."""
        chunk = PyBytes_FromStringAndSize(self.buffer, self.position)
        self.compressor.write(chunk)
        # Reached only when compressor.write did not raise.
        self.position = 0

    def flush(self):
        """Push whatever is pending in the buffer to the compressor."""
        self.write_into_stream()
|