File: util.py

package info (click to toggle)
libyang-python 4.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 640 kB
  • sloc: python: 8,012; ansic: 1,231; sh: 159; makefile: 15
file content (147 lines) | stat: -rw-r--r-- 4,685 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Copyright (c) 2018-2019 Robin Jarry
# Copyright (c) 2021 RACOM s.r.o.
# SPDX-License-Identifier: MIT

from dataclasses import dataclass
import enum
from typing import Iterable, Optional
import warnings
import weakref

from _libyang import ffi, lib


# -------------------------------------------------------------------------------------
@dataclass(frozen=True)
class LibyangErrorItem:
    """
    Read-only record holding the details of a single error.

    Instances are frozen: attributes cannot be reassigned after construction.
    Every field may be None.
    """

    # Error message text (presumably as reported by libyang -- confirm with caller).
    msg: Optional[str]
    # Path associated with the error in the data tree (inferred from the name).
    data_path: Optional[str]
    # Path associated with the error in the schema tree (inferred from the name).
    schema_path: Optional[str]
    # Line number associated with the error (inferred from the name).
    line: Optional[int]


# -------------------------------------------------------------------------------------
class LibyangError(Exception):
    """
    Exception carrying a message and an optional collection of error details.

    :arg message:
        Text stored in ``self.message`` and returned by ``str()``.
    :arg args:
        Extra positional arguments forwarded to ``Exception`` after *message*.
    :arg errors:
        Optional iterable of LibyangErrorItem. It is stored as a tuple in
        ``self.errors``; a falsy value (None, empty collection) yields ``()``.
    """

    def __init__(
        self, message: str, *args, errors: Optional[Iterable[LibyangErrorItem]] = None
    ):
        super().__init__(message, *args)
        self.message = message
        # Freeze whatever iterable was given so the attribute is always a tuple.
        self.errors = tuple(errors) if errors else ()

    def __str__(self):
        # Only the message is shown, extra positional args are not rendered.
        return self.message


# -------------------------------------------------------------------------------------
def deprecated(old: str, new: str, removed_in: str) -> None:
    """
    Emit a DeprecationWarning telling that *old* has been replaced by *new*.

    :arg old:
        Name of the deprecated item.
    :arg new:
        Name of its replacement.
    :arg removed_in:
        Version in which the deprecated item will be removed.
    """
    # stacklevel=3 attributes the warning two frames above this function, i.e.
    # to the code that called the function which invoked deprecated().
    warnings.warn(
        f"{old!s} has been replaced by {new!s}, "
        f"it will be removed in version {removed_in!s}",
        DeprecationWarning,
        stacklevel=3,
    )


# -------------------------------------------------------------------------------------
def str2c(s: Optional[str], encode: bool = True):
    """
    Convert a Python string into a newly allocated C ``char []`` buffer.

    :arg s:
        The value to convert. None is converted to ``ffi.NULL``.
    :arg encode:
        When true and *s* has an ``encode`` method, *s* is encoded to UTF-8
        first. Otherwise *s* is handed to ``ffi.new`` unchanged.
    :returns:
        ``ffi.NULL`` or the cdata object created by ``ffi.new("char []", ...)``.
    """
    if s is None:
        return ffi.NULL
    # Objects without an encode() method (e.g. bytes) are passed through as is.
    needs_encoding = encode and hasattr(s, "encode")
    payload = s.encode("utf-8") if needs_encoding else s
    return ffi.new("char []", payload)


# -------------------------------------------------------------------------------------
def c2str(c, decode: bool = True):
    """
    Convert a C string (C type: "char *") into a Python object.

    :arg c:
        The cdata pointer to read. ``ffi.NULL`` is converted to None.
    :arg decode:
        When true and the value returned by ``ffi.string`` has a ``decode``
        method, it is decoded from UTF-8. Otherwise it is returned unchanged.
    :returns:
        None, the decoded string, or the raw result of ``ffi.string``.
    """
    if c == ffi.NULL:
        return None
    raw = ffi.string(c)
    if not decode or not hasattr(raw, "decode"):
        return raw
    return raw.decode("utf-8")


# -------------------------------------------------------------------------------------
# Maps every "char **" returned by p_str2c() to the "char []" buffer it points
# at. cffi does not keep an initializer alive on behalf of the pointer that
# stores its address, so without this the buffer would be released as soon as
# p_str2c() returns. Entries vanish automatically when the "char **" is collected.
_P_STR2C_KEEPALIVE = weakref.WeakKeyDictionary()


def p_str2c(s: Optional[str], encode: bool = True):
    """
    Convert a Python string into a newly allocated C ``char **``.

    The pointed-to string buffer stays valid for as long as the returned cdata
    object is referenced.

    :arg s:
        The value to convert. None yields a ``char **`` pointing at NULL.
    :arg encode:
        Forwarded to str2c(): encode *s* to UTF-8 when it supports it.
    :returns:
        The cdata object created by ``ffi.new("char **", ...)``.
    """
    s_p = str2c(s, encode)
    s_pp = ffi.new("char **", s_p)
    if s is not None:
        # Tie the lifetime of the string buffer to the pointer that refers to it.
        _P_STR2C_KEEPALIVE[s_pp] = s_p
    return s_pp


# -------------------------------------------------------------------------------------
def ly_array_count(cdata):
    """
    Return the number of elements of a libyang sized array.

    :arg cdata:
        The array pointer. ``ffi.NULL`` is treated as an empty array.
    :returns:
        0 for ``ffi.NULL``, otherwise the value read from the ``uint64_t`` slot
        located immediately before the address *cdata* points to.
    """
    if cdata == ffi.NULL:
        return 0
    counter = ffi.cast("uint64_t *", cdata)
    # Index -1: the count lives just in front of the first element.
    return counter[-1]


# -------------------------------------------------------------------------------------
def ly_array_iter(cdata):
    """
    Iterate over the elements of a libyang sized array.

    :arg cdata:
        The array pointer, its length is obtained with ly_array_count().
    :returns:
        A generator yielding ``cdata[0]`` up to ``cdata[count - 1]``.
    """
    total = ly_array_count(cdata)
    index = 0
    while index < total:
        yield cdata[index]
        index += 1


# -------------------------------------------------------------------------------------
def ly_list_iter(cdata):
    """
    Iterate over a linked list whose nodes are chained through ``next``.

    :arg cdata:
        The first node of the list (or ``ffi.NULL`` for an empty list).
    :returns:
        A generator yielding each node until a ``ffi.NULL`` link is reached.
    """
    node = cdata
    while True:
        if node == ffi.NULL:
            return
        yield node
        node = node.next


# -------------------------------------------------------------------------------------
class IOType(enum.Enum):
    """
    Kind of input/output target, used by init_output() and data_load() to pick
    the matching libyang constructor.
    """

    # Object exposing fileno(); its file descriptor is passed to libyang.
    FD = enum.auto()
    # File object passed to libyang as is.
    FILE = enum.auto()
    # Path of a file, converted to a C string before being passed to libyang.
    FILEPATH = enum.auto()
    # In-memory buffer.
    MEMORY = enum.auto()


# -------------------------------------------------------------------------------------
class DataType(enum.Enum):
    """
    Python-side names for the ``LYD_TYPE_*`` constants exported by the C library.

    Each member's value is the corresponding ``lib.LYD_TYPE_*`` constant.
    Judging by the names, members are grouped by flavour (YANG, NETCONF,
    RESTCONF) and by kind (data, RPC, notification, reply).
    """

    # YANG flavour
    DATA_YANG = lib.LYD_TYPE_DATA_YANG
    RPC_YANG = lib.LYD_TYPE_RPC_YANG
    NOTIF_YANG = lib.LYD_TYPE_NOTIF_YANG
    REPLY_YANG = lib.LYD_TYPE_REPLY_YANG
    # NETCONF flavour
    RPC_NETCONF = lib.LYD_TYPE_RPC_NETCONF
    NOTIF_NETCONF = lib.LYD_TYPE_NOTIF_NETCONF
    REPLY_NETCONF = lib.LYD_TYPE_REPLY_NETCONF
    # RESTCONF flavour
    RPC_RESTCONF = lib.LYD_TYPE_RPC_RESTCONF
    NOTIF_RESTCONF = lib.LYD_TYPE_NOTIF_RESTCONF
    REPLY_RESTCONF = lib.LYD_TYPE_REPLY_RESTCONF


# -------------------------------------------------------------------------------------
def init_output(out_type, out_target, out_data):
    """
    Create a libyang output handler for the requested kind of target.

    :arg out_type:
        The IOType member selecting which ``ly_out_new_*`` function is called.
    :arg out_target:
        The target: an object with ``fileno()`` for IOType.FD, the value handed
        to ``ly_out_new_file`` for IOType.FILE, a path string for
        IOType.FILEPATH. Unused for IOType.MEMORY.
    :arg out_data:
        Passed as last argument to the ``ly_out_new_*`` function (presumably
        the ``struct ly_out **`` receiving the handler -- confirm with callers).
    :returns:
        A ``(ret, output)`` tuple: the value returned by the libyang call and,
        for IOType.MEMORY only, the ``char **`` given to libyang (None for
        every other type).
    :raises ValueError:
        If *out_type* is not one of the four supported IOType members.
    """
    if out_type == IOType.FD:
        return lib.ly_out_new_fd(out_target.fileno(), out_data), None

    if out_type == IOType.FILE:
        return lib.ly_out_new_file(out_target, out_data), None

    if out_type == IOType.FILEPATH:
        c_path = str2c(out_target)
        return lib.ly_out_new_filepath(c_path, out_data), None

    if out_type == IOType.MEMORY:
        # The caller gets the "char **" back to read the produced buffer.
        buffer_p = ffi.new("char **")
        return lib.ly_out_new_memory(buffer_p, 0, out_data), buffer_p

    raise ValueError("invalid output")


# -------------------------------------------------------------------------------------
def data_load(in_type, in_data, data, data_keepalive, encode=True):
    """
    Create a libyang input handler for the requested kind of source.

    :arg in_type:
        The IOType member selecting which ``ly_in_new_*`` function is called.
    :arg in_data:
        The source: an object with ``fileno()`` for IOType.FD, the value handed
        to ``ly_in_new_file`` for IOType.FILE, a path for IOType.FILEPATH or
        the data itself for IOType.MEMORY.
    :arg data:
        Passed as last argument to the ``ly_in_new_*`` function (presumably the
        ``struct ly_in **`` receiving the handler -- confirm with callers).
    :arg data_keepalive:
        List to which the C buffer created for IOType.MEMORY is appended, so
        that the caller holds a reference to it.
    :arg encode:
        For IOType.MEMORY only: forwarded to str2c() to control whether
        *in_data* is encoded to UTF-8.
    :returns:
        The value returned by the libyang call.
    :raises ValueError:
        If *in_type* is not one of the four supported IOType members.
    """
    if in_type == IOType.FD:
        ret = lib.ly_in_new_fd(in_data.fileno(), data)
    elif in_type == IOType.FILE:
        ret = lib.ly_in_new_file(in_data, data)
    elif in_type == IOType.FILEPATH:
        # Encode once up front: the length given to libyang must be the number
        # of bytes of the C string, which differs from the number of characters
        # of the Python string as soon as the path is not pure ASCII.
        path = in_data.encode("utf-8") if hasattr(in_data, "encode") else in_data
        ret = lib.ly_in_new_filepath(str2c(path, encode=False), len(path), data)
    elif in_type == IOType.MEMORY:
        c_str = str2c(in_data, encode=encode)
        # Hand the buffer to the caller so it holds a reference to it.
        data_keepalive.append(c_str)
        ret = lib.ly_in_new_memory(c_str, data)
    else:
        raise ValueError("invalid input")
    return ret