1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
# Copyright (c) 2018-2019 Robin Jarry
# Copyright (c) 2021 RACOM s.r.o.
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
import enum
from typing import Iterable, Optional
import warnings
import weakref

from _libyang import ffi, lib
# -------------------------------------------------------------------------------------
@dataclass(frozen=True)
class LibyangErrorItem:
    """
    Immutable record describing a single error entry.

    Instances are frozen: fields cannot be reassigned after construction.
    Every field is optional and may be ``None``.
    """

    # Error message text, if any.
    msg: Optional[str]
    # Data path associated with the error, if any.
    data_path: Optional[str]
    # Schema path associated with the error, if any.
    schema_path: Optional[str]
    # Line number associated with the error, if any.
    line: Optional[int]
# -------------------------------------------------------------------------------------
class LibyangError(Exception):
    """
    Exception carrying a message plus an optional collection of error items.

    :arg message:
        Text returned by ``str()`` on the exception; also stored as
        ``self.message``.
    :arg args:
        Extra positional arguments forwarded to :class:`Exception`.
    :arg errors:
        Optional iterable of :class:`LibyangErrorItem`. It is stored as a
        tuple in ``self.errors``; a missing or empty value yields ``()``.
    """

    def __init__(
        self, message: str, *args, errors: Optional[Iterable[LibyangErrorItem]] = None
    ):
        super().__init__(message, *args)
        self.message = message
        # Freeze whatever iterable was given so the attribute is always a tuple.
        self.errors = tuple(errors) if errors else ()

    def __str__(self):
        # Only the message is shown, even when extra *args were supplied.
        return self.message
# -------------------------------------------------------------------------------------
def deprecated(old: str, new: str, removed_in: str) -> None:
    """
    Emit a :class:`DeprecationWarning` announcing that *old* was replaced.

    :arg old:
        Name of the deprecated feature.
    :arg new:
        Name of its replacement.
    :arg removed_in:
        Version in which *old* will be removed.
    """
    text = "%s has been replaced by %s, it will be removed in version %s" % (
        old,
        new,
        removed_in,
    )
    # stacklevel=3 attributes the warning to the caller of the function that
    # invoked deprecated(), rather than to this helper or its direct caller.
    warnings.warn(text, category=DeprecationWarning, stacklevel=3)
# -------------------------------------------------------------------------------------
def str2c(s: Optional[str], encode: bool = True):
    """
    Convert a Python string into a newly allocated C ``char []`` buffer.

    :arg s:
        Value to convert. ``None`` maps to ``ffi.NULL``.
    :arg encode:
        When true and *s* has an ``encode`` method, *s* is encoded as UTF-8
        first. Otherwise *s* is handed to ``ffi.new`` unchanged.
    :returns:
        ``ffi.NULL`` or the cdata object created by ``ffi.new("char []", ...)``.
    """
    if s is None:
        return ffi.NULL
    raw = s.encode("utf-8") if encode and hasattr(s, "encode") else s
    return ffi.new("char []", raw)
# -------------------------------------------------------------------------------------
def c2str(c, decode: bool = True):
    """
    Convert a C ``char *`` into a Python value.

    :arg c:
        cdata pointer (C type: "char *"). ``ffi.NULL`` maps to ``None``.
    :arg decode:
        When true and the value returned by ``ffi.string`` has a ``decode``
        method, it is decoded as UTF-8. Otherwise it is returned unchanged.
    :returns:
        ``None``, the decoded string, or the raw result of ``ffi.string``.
    """
    if c == ffi.NULL:
        return None
    value = ffi.string(c)
    if not decode or not hasattr(value, "decode"):
        return value
    return value.decode("utf-8")
# -------------------------------------------------------------------------------------
# Maps every "char **" returned by p_str2c() to the "char []" buffer it points
# to. cffi does not make a pointer keep the object it was initialised with
# alive, so without this the buffer would be freed when p_str2c() returns.
# Weak keys let an entry vanish as soon as the "char **" itself is collected.
_P_STR2C_KEEPALIVE = weakref.WeakKeyDictionary()


def p_str2c(s: Optional[str], encode: bool = True):
    """
    Convert a Python string into a newly allocated C ``char **``.

    The returned pointer refers to a ``char []`` buffer built by
    :func:`str2c`. The buffer stays valid for as long as the returned cdata
    object is alive.

    :arg s:
        Value to convert. ``None`` yields a ``char **`` whose target is NULL.
    :arg encode:
        Forwarded to :func:`str2c`.
    :returns:
        The cdata object created by ``ffi.new("char **", ...)``.
    """
    s_p = str2c(s, encode)
    s_pp = ffi.new("char **", s_p)
    if s is not None:
        # Tie the buffer's lifetime to the pointer that references it.
        _P_STR2C_KEEPALIVE[s_pp] = s_p
    return s_pp
# -------------------------------------------------------------------------------------
def ly_array_count(cdata):
    """
    Return the number of elements in a counted C array.

    The count is read from the ``uint64_t`` stored immediately before the
    address that *cdata* points to.

    :arg cdata:
        Pointer to the first array element, or ``ffi.NULL``.
    :returns:
        ``0`` for ``ffi.NULL``, otherwise the stored count.
    """
    if cdata == ffi.NULL:
        return 0
    counter = ffi.cast("uint64_t *", cdata)
    return counter[-1]
# -------------------------------------------------------------------------------------
def ly_array_iter(cdata):
    """
    Lazily yield each element of a counted C array.

    :arg cdata:
        Pointer to the first array element, or ``ffi.NULL`` (yields nothing).
        The element count comes from :func:`ly_array_count`.
    """
    total = ly_array_count(cdata)
    yield from (cdata[pos] for pos in range(total))
# -------------------------------------------------------------------------------------
def ly_list_iter(cdata):
    """
    Lazily yield every node of a NULL-terminated linked list.

    :arg cdata:
        Pointer to the first node, or ``ffi.NULL`` (yields nothing). Each node
        must expose a ``next`` member pointing to the following node.
    """
    node = cdata
    while True:
        if node == ffi.NULL:
            return
        yield node
        node = node.next
# -------------------------------------------------------------------------------------
class IOType(enum.Enum):
    """
    Kind of endpoint handled by :func:`init_output` and :func:`data_load`.
    """

    # Object exposing fileno(); its file descriptor is used.
    FD = enum.auto()
    # File object passed through as-is.
    FILE = enum.auto()
    # File system path given as a string.
    FILEPATH = enum.auto()
    # In-memory buffer.
    MEMORY = enum.auto()
# -------------------------------------------------------------------------------------
class DataType(enum.Enum):
    """
    Data type identifiers; each member wraps the matching ``LYD_TYPE_*``
    constant exported by the C library.
    """

    # YANG flavours
    DATA_YANG = lib.LYD_TYPE_DATA_YANG
    RPC_YANG = lib.LYD_TYPE_RPC_YANG
    NOTIF_YANG = lib.LYD_TYPE_NOTIF_YANG
    REPLY_YANG = lib.LYD_TYPE_REPLY_YANG

    # NETCONF flavours
    RPC_NETCONF = lib.LYD_TYPE_RPC_NETCONF
    NOTIF_NETCONF = lib.LYD_TYPE_NOTIF_NETCONF
    REPLY_NETCONF = lib.LYD_TYPE_REPLY_NETCONF

    # RESTCONF flavours
    RPC_RESTCONF = lib.LYD_TYPE_RPC_RESTCONF
    NOTIF_RESTCONF = lib.LYD_TYPE_NOTIF_RESTCONF
    REPLY_RESTCONF = lib.LYD_TYPE_REPLY_RESTCONF
# -------------------------------------------------------------------------------------
def init_output(out_type, out_target, out_data):
    """
    Create an output handler of the requested kind.

    :arg out_type:
        One of the :class:`IOType` members.
    :arg out_target:
        Destination; its meaning depends on *out_type*: an object with
        ``fileno()`` for ``FD``, a file for ``FILE``, a path string for
        ``FILEPATH``. It is not used for ``MEMORY``.
    :arg out_data:
        Passed as the last argument to the ``lib.ly_out_new_*`` call
        (presumably the ``struct ly_out **`` result slot; confirm with caller).
    :returns:
        ``(ret, output)``: *ret* is the value returned by the C call, and
        *output* is the freshly allocated ``char **`` for ``MEMORY`` or
        ``None`` for all other kinds.
    :raises ValueError:
        If *out_type* is not a supported kind.
    """
    if out_type == IOType.FD:
        return lib.ly_out_new_fd(out_target.fileno(), out_data), None
    if out_type == IOType.FILE:
        return lib.ly_out_new_file(out_target, out_data), None
    if out_type == IOType.FILEPATH:
        c_path = str2c(out_target)
        return lib.ly_out_new_filepath(c_path, out_data), None
    if out_type == IOType.MEMORY:
        # The caller receives this pointer back so it can read the result.
        mem = ffi.new("char **")
        return lib.ly_out_new_memory(mem, 0, out_data), mem
    raise ValueError("invalid output")
# -------------------------------------------------------------------------------------
def data_load(in_type, in_data, data, data_keepalive, encode=True):
    """
    Create an input handler of the requested kind.

    :arg in_type:
        One of the :class:`IOType` members.
    :arg in_data:
        Source; its meaning depends on *in_type*: an object with ``fileno()``
        for ``FD``, a file for ``FILE``, a path for ``FILEPATH``, the payload
        itself for ``MEMORY``.
    :arg data:
        Passed as the last argument to the ``lib.ly_in_new_*`` call
        (presumably the ``struct ly_in **`` result slot; confirm with caller).
    :arg data_keepalive:
        List that receives the C buffer created for ``MEMORY`` input, so the
        caller controls how long that buffer stays alive.
    :arg encode:
        Forwarded to :func:`str2c` for ``MEMORY`` input only.
    :returns:
        The value returned by the C call.
    :raises ValueError:
        If *in_type* is not a supported kind.
    """
    if in_type == IOType.FD:
        ret = lib.ly_in_new_fd(in_data.fileno(), data)
    elif in_type == IOType.FILE:
        ret = lib.ly_in_new_file(in_data, data)
    elif in_type == IOType.FILEPATH:
        # The length handed to C must count bytes of the encoded path, not
        # characters of the Python string: they differ for non-ASCII paths
        # and a character count would truncate the path.
        path = in_data.encode("utf-8") if isinstance(in_data, str) else in_data
        ret = lib.ly_in_new_filepath(str2c(path), len(path), data)
    elif in_type == IOType.MEMORY:
        c_str = str2c(in_data, encode=encode)
        # Keep the buffer referenced beyond this call.
        data_keepalive.append(c_str)
        ret = lib.ly_in_new_memory(c_str, data)
    else:
        raise ValueError("invalid input")
    return ret
|