File: input.py

package info (click to toggle)
py-rnp 0.1.0%2Bgit20221014.01b7129-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 424 kB
  • sloc: python: 3,062; sh: 8; makefile: 4
file content (71 lines) | stat: -rw-r--r-- 1,782 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from ctypes import (
    c_bool,
    c_uint8,
    c_void_p,
    c_size_t,
    byref,
    py_object,
    cast,
    POINTER,
    CFUNCTYPE,
)

from .lib import _lib


class Input:
    """Wrapper around a native RNP input handle.

    Instances are normally obtained from the ``from_path``, ``from_bytes`` or
    ``from_io`` factories. The wrapped handle is handed to
    ``rnp_input_destroy`` when the wrapper is finalized.
    """

    # Bytes a stream produced beyond what the native buffer could accept in a
    # single READER call, keyed by the callback context address. Without this
    # carry-over, a text stream whose UTF-8 encoding is longer than the
    # requested size (``read(n)`` counts characters, not bytes) would make the
    # read fail and lose the data already consumed from the stream.
    _pending = {}

    def __init__(self, obj, io=None):
        """Store the native handle.

        Args:
            obj: the native input handle (a ``c_void_p`` when built by the
                factories below).
            io: optional ``py_object`` used as callback context; it is kept
                here so it stays alive as long as this wrapper does.
        """
        self._obj = obj
        self._io = io

    def __del__(self):
        """Release the native input handle."""
        _lib.rnp_input_destroy(self._obj)

    def obj(self):
        """Return the native input handle."""
        return self._obj

    @staticmethod
    def from_path(path):
        """Create an input that reads from the file at *path* (a ``str``)."""
        obj = c_void_p()
        _lib.rnp_input_from_path(byref(obj), path.encode("utf-8"))
        return Input(obj)

    @staticmethod
    def from_bytes(data):
        """Create an input over an in-memory bytes-like object."""
        obj = c_void_p()
        buf = (c_uint8 * len(data)).from_buffer_copy(data)
        # The final ``True`` is presumably the library's "copy the buffer"
        # flag, which would make it safe for ``buf`` to be released once this
        # call returns -- confirm against the RNP FFI documentation.
        _lib.rnp_input_from_memory(byref(obj), buf, len(data), True)
        return Input(obj)

    @staticmethod
    def from_io(io):
        """Create an input that pulls its data from a file-like object.

        *io* must provide ``read(size)`` returning ``bytes`` or ``str`` (text
        is encoded as UTF-8) and ``close()``.
        """
        obj = c_void_p()
        io = py_object(io)
        ctx = byref(io)
        # A context address may be reused after an earlier py_object was
        # freed; make sure no stale carry-over is attributed to this stream.
        Input._pending.pop(cast(ctx, c_void_p).value, None)
        _lib.rnp_input_from_callback(byref(obj), Input.READER, Input.CLOSER, ctx)
        # Hand the py_object to the wrapper: the native side only holds a raw
        # pointer to it, so it must outlive the handle.
        return Input(obj, io)

    @staticmethod
    @CFUNCTYPE(c_bool, c_void_p, c_void_p, c_size_t, POINTER(c_size_t))
    def READER(app_ctx, buf, buf_len, read):
        """Read callback: copy up to *buf_len* bytes into the native buffer.

        Args:
            app_ctx: address of the ``py_object`` holding the stream.
            buf: address of the destination buffer.
            buf_len: capacity of the destination buffer in bytes.
            read: pointer that receives the number of bytes copied.

        Returns:
            True on success, False if anything raised (the exception is
            printed, since it cannot propagate through the C caller).
        """
        try:
            # Serve bytes held back by a previous call before asking the
            # stream for more.
            data = Input._pending.pop(app_ctx, None)
            if not data:
                stream = cast(app_ctx, POINTER(py_object)).contents.value
                data = stream.read(buf_len)
                if isinstance(data, str):
                    data = data.encode("utf-8")

            # Never copy more than the buffer holds; keep the surplus for the
            # next call instead of failing the read.
            chunk = data[:buf_len]
            surplus = data[buf_len:]
            if surplus:
                Input._pending[app_ctx] = surplus

            dest = (c_uint8 * buf_len).from_address(buf)
            dest[: len(chunk)] = chunk
            read.contents.value = len(chunk)
            return True
        except Exception as e:
            print(e)
            return False

    @staticmethod
    @CFUNCTYPE(None, c_void_p)
    def CLOSER(app_ctx):
        """Close callback: close the stream referenced by *app_ctx*.

        Errors are printed rather than raised because they cannot propagate
        through the C caller.
        """
        try:
            # Undelivered carry-over is meaningless once the stream is closed.
            Input._pending.pop(app_ctx, None)
            stream = cast(app_ctx, POINTER(py_object)).contents.value
            stream.close()
        except Exception as e:
            print(e)