1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
import logging
import asyncio
import struct
import qpack
from siridb.connector import SiriDBProtocol
from siridb.connector.lib.connection import SiriDBAsyncConnection
class Package:
    """One wire package: a fixed-size header followed by an optional payload.

    The header is parsed on construction; the payload is decoded later with
    :meth:`extract_data_from`, once the whole package has been buffered.

    Attributes (see ``__slots__``):
        length: total package size in bytes, header included.
        pid: 16-bit value read from the header (presumably the package id).
        tipe: 8-bit value read from the header (presumably the package type).
        checkbit: 8-bit value read from the header; it is not validated here.
        data: decoded payload, or ``None`` when nothing was decoded.
    """

    __slots__ = ('pid', 'length', 'tipe', 'checkbit', 'data')

    # Little-endian header: uint32 payload length, uint16 pid, uint8 tipe,
    # uint8 checkbit (8 bytes in total).
    struct_datapackage = struct.Struct('<IHBB')

    def __init__(self, barray):
        """Parse the header found at the start of ``barray``.

        ``barray`` must hold at least ``struct_datapackage.size`` bytes,
        otherwise ``struct.error`` is raised. The buffer is not modified.
        """
        header = type(self).struct_datapackage
        self.length, self.pid, self.tipe, self.checkbit = \
            header.unpack_from(barray, offset=0)
        # The wire value counts payload bytes only; from here on ``length``
        # is the size of the complete package, header included.
        self.length += header.size
        self.data = None

    def extract_data_from(self, barray):
        """Decode the payload from ``barray`` and consume the package.

        ``barray`` is a mutable buffer (``bytearray``) that starts with this
        package. The first ``self.length`` bytes are always removed from it,
        even when decoding fails, so the caller can carry on with whatever
        follows. Any exception raised by ``qpack.unpackb`` propagates.

        A header-only package (payload length 0) has nothing to decode:
        ``data`` is left as ``None`` and qpack is not called.
        """
        header_size = type(self).struct_datapackage.size
        try:
            if self.length > header_size:
                self.data = qpack.unpackb(
                    barray[header_size:self.length],
                    decode='utf-8')
            else:
                # Do not hand an empty buffer to the decoder; an empty
                # payload simply means "no data".
                self.data = None
        finally:
            del barray[:self.length]
class SiriDBServerProtocol(asyncio.Protocol):
    """asyncio protocol that splits an incoming byte stream into packages.

    Incoming bytes are buffered until a complete package (header plus
    payload) is available. Each package is then decoded and its ``data`` is
    passed to the ``on_package_received`` callback.
    """

    def __init__(self, on_package_received):
        """Store the callback and start with an empty receive buffer.

        ``on_package_received`` is called with one argument: the decoded
        payload of each successfully decoded package.
        """
        self._buffered_data = bytearray()
        # Header of the package currently being received, or None when the
        # next bytes in the buffer are expected to start a new package.
        self._data_package = None
        self._on_package_received = on_package_received

    def data_received(self, data):
        """Buffer ``data`` and dispatch every complete package it finishes.

        Overrides ``asyncio.Protocol.data_received``. Returns ``None``;
        incomplete packages stay buffered until more data arrives.
        """
        self._buffered_data.extend(data)
        while self._buffered_data:
            size = len(self._buffered_data)
            if self._data_package is None:
                if size < Package.struct_datapackage.size:
                    # Not even a full header yet; wait for more data.
                    return
                self._data_package = Package(self._buffered_data)
            if size < self._data_package.length:
                # Header known, payload still incomplete.
                return

            # The package is complete. Reset the parser state *before*
            # decoding and dispatching, so that a callback which raises (or
            # feeds more data re-entrantly) cannot leave behind a header
            # whose bytes have already been removed from the buffer.
            package = self._data_package
            self._data_package = None
            try:
                package.extract_data_from(self._buffered_data)
            except KeyError as e:
                # The package bytes were still consumed, so continue with
                # whatever follows in the buffer.
                logging.error('Unsupported package received: %s', e)
            except Exception as e:
                logging.exception(e)
                # empty the byte-array to recover from this error
                self._buffered_data.clear()
            else:
                self._on_package_received(package.data)
|