1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
from ctypes import (
c_bool,
c_uint8,
c_void_p,
c_size_t,
byref,
py_object,
cast,
POINTER,
CFUNCTYPE,
)
from .lib import _lib
class Input:
    """Python-side owner of a native input handle created through ``_lib``.

    Instances are normally built with one of the ``from_*`` constructors.
    The wrapped handle is passed to ``_lib.rnp_input_destroy`` when the
    Python object is finalized.
    """

    # Bytes a stream handed back beyond the capacity of the native buffer,
    # keyed by the callback context address. READER delivers them on the next
    # call and CLOSER discards them. The dict is shared on purpose: the
    # callbacks are class-level and only receive the context address.
    _pending = {}

    def __init__(self, obj, io=None):
        """Wrap an existing handle.

        Args:
            obj: The handle filled in by one of the ``_lib.rnp_input_from_*``
                calls.
            io: Optional ``py_object`` whose address was given to the library
                as callback context; stored so it stays referenced for as
                long as this instance exists.
        """
        self._obj = obj
        self._io = io

    def __del__(self):
        """Hand the wrapped handle to ``_lib.rnp_input_destroy``."""
        _lib.rnp_input_destroy(self._obj)

    def obj(self):
        """Return the wrapped handle."""
        return self._obj

    @staticmethod
    def from_path(path):
        """Create an input for the file at ``path``.

        Args:
            path: File name as ``str``; it is passed to the library encoded
                as UTF-8.

        Returns:
            A new ``Input`` wrapping the handle.
        """
        # NOTE(review): the value returned by the library call is not
        # inspected here — presumably `_lib` performs its own error
        # checking; confirm in `.lib`.
        obj = c_void_p()
        _lib.rnp_input_from_path(byref(obj), path.encode("utf-8"))
        return Input(obj)

    @staticmethod
    def from_bytes(data):
        """Create an input over an in-memory bytes-like object.

        Args:
            data: Bytes-like object; it is copied into a ctypes array before
                being handed to the library.

        Returns:
            A new ``Input`` wrapping the handle.
        """
        obj = c_void_p()
        buf = (c_uint8 * len(data)).from_buffer_copy(data)
        # NOTE(review): the trailing True presumably asks the library to take
        # its own copy of `buf` (which is not kept alive after this call) —
        # confirm against the RNP API.
        _lib.rnp_input_from_memory(byref(obj), buf, len(data), True)
        return Input(obj)

    @staticmethod
    def from_io(io):
        """Create an input that pulls its data from a file-like object.

        Args:
            io: Object providing ``read(size)`` (returning bytes-like data or
                ``str``) and ``close()``.

        Returns:
            A new ``Input`` that also keeps the callback context referenced.
        """
        obj = c_void_p()
        # The library receives the address of this py_object as opaque
        # context, so the py_object is stored on the returned Input.
        ctx = py_object(io)
        _lib.rnp_input_from_callback(
            byref(obj), Input.READER, Input.CLOSER, byref(ctx)
        )
        return Input(obj, ctx)

    @staticmethod
    @CFUNCTYPE(c_bool, c_void_p, c_void_p, c_size_t, POINTER(c_size_t))
    def READER(app_ctx, buf, buf_len, read):
        """Read callback: copy up to ``buf_len`` bytes into ``buf``.

        Args:
            app_ctx: Address of the ``py_object`` holding the stream.
            buf: Address of the destination buffer.
            buf_len: Capacity of the destination buffer in bytes.
            read: Pointer that receives the number of bytes written.

        Returns:
            True on success (zero bytes written when the stream returned no
            data), False if any exception was raised.
        """
        try:
            data = Input._pending.get(app_ctx)
            if not data:
                stream = cast(app_ctx, POINTER(py_object)).contents.value
                data = stream.read(buf_len)
                if isinstance(data, str):
                    # A text stream counts `buf_len` in characters, so the
                    # encoded form may be longer than the native buffer.
                    data = data.encode("utf-8")
            chunk = data[:buf_len]
            target = (c_uint8 * buf_len).from_address(buf)
            target[: len(chunk)] = chunk
            read.contents.value = len(chunk)
            # Update the carry-over only after the copy succeeded, so a
            # failure above never drops bytes already taken from the stream.
            if len(data) > buf_len:
                Input._pending[app_ctx] = bytes(data[buf_len:])
            else:
                Input._pending.pop(app_ctx, None)
            return True
        except Exception as e:
            print(e)
            return False

    @staticmethod
    @CFUNCTYPE(None, c_void_p)
    def CLOSER(app_ctx):
        """Close callback: drop carried-over bytes and close the stream.

        Args:
            app_ctx: Address of the ``py_object`` holding the stream.
        """
        try:
            # Forget undelivered bytes first so a later context that happens
            # to reuse this address never sees them.
            Input._pending.pop(app_ctx, None)
            stream = cast(app_ctx, POINTER(py_object)).contents.value
            stream.close()
        except Exception as e:
            print(e)
|