1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
|
# Copyright 2009 Shikhar Bhushan
# Copyright 2011 Leonidas Poulopoulos
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"Methods for creating, parsing, and dealing with XML and ElementTree objects."
import io
import sys
import six
import types
from six import StringIO
from io import BytesIO
from lxml import etree
# In case issues come up with XML generation/parsing
# make sure you have the ElementTree v1.2.7+ lib as
# well as lxml v3.0+
from ncclient import NCClientError
# Module-wide parsers shared by the helpers below. Both are strict
# (recover=False); the second one additionally enables huge_tree.
parser = etree.XMLParser(recover=False)
huge_parser = etree.XMLParser(recover=False, huge_tree=True)


def _get_parser(huge_tree=False):
    """Return the shared parser, or the ``huge_tree`` one when *huge_tree* is true."""
    if huge_tree:
        return huge_parser
    return parser
class XMLError(NCClientError):
    """Error raised by the XML helpers, e.g. when an element fails validation."""
### Namespace-related
# Namespace URIs used when building and matching qualified tag names.
#: Base NETCONF namespace
BASE_NS_1_0 = "urn:ietf:params:xml:ns:netconf:base:1.0"
#: YANG (RFC 6020/RFC 7950) namespace
YANG_NS_1_0 = "urn:ietf:params:xml:ns:yang:1"
#: Namespace for Cisco NX-OS (nxos:1.0)
NXOS_1_0 = "http://www.cisco.com/nxos:1.0"
#: Namespace for Cisco NX-OS interface manager (nxos:1.0:if_manager)
NXOS_IF = "http://www.cisco.com/nxos:1.0:if_manager"
#: Namespace for Tail-f AAA data model (aaa/1.1)
TAILF_AAA_1_1 = "http://tail-f.com/ns/aaa/1.1"
#: Namespace for Tail-f execd data model (execd/1.1)
TAILF_EXECD_1_1 = "http://tail-f.com/ns/execd/1.1"
#: Namespace for Cisco data model (cpi_10 schema)
CISCO_CPI_1_0 = "http://www.cisco.com/cpi_10/schema"
#: Namespace for Flowmon data model
FLOWMON_1_0 = "http://www.liberouter.org/ns/netopeer/flowmon/1.0"
#: Namespace for Juniper (xnm/1.1). Tested with Junos 9.6R4+
JUNIPER_1_1 = "http://xml.juniper.net/xnm/1.1/xnm"
#: Namespace for Huawei data model
HUAWEI_NS = "http://www.huawei.com/netconf/vrp"
#: Namespace for Huawei private base capability (capability/base/1.0)
HW_PRIVATE_NS = "http://www.huawei.com/netconf/capability/base/1.0"
#: Namespace for H3C data model
H3C_DATA_1_0 = "http://www.h3c.com/netconf/data:1.0"
#: Namespace for H3C config model
H3C_CONFIG_1_0 = "http://www.h3c.com/netconf/config:1.0"
#: Namespace for H3C action model
H3C_ACTION_1_0 = "http://www.h3c.com/netconf/action:1.0"
#: Namespace for NETCONF monitoring (ietf-netconf-monitoring)
NETCONF_MONITORING_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring"
#: Namespace for NETCONF notifications
NETCONF_NOTIFICATION_NS = "urn:ietf:params:xml:ns:netconf:notification:1.0"
#: Namespace for NETCONF with-defaults (RFC 6243)
NETCONF_WITH_DEFAULTS_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults"
#: Namespace for Alcatel-Lucent SR OS Base r13 YANG models
ALU_CONFIG = "urn:alcatel-lucent.com:sros:ns:yang:conf-r13"
#: Namespace for Nokia SR OS global operations
SROS_GLOBAL_OPS_NS = "urn:nokia.com:sros:ns:yang:sr:oper-global"
# Use the etree module's own register_namespace when it provides one;
# otherwise fall back to writing into ElementTree's namespace map directly.
register_namespace = getattr(etree, 'register_namespace', None)
if register_namespace is None:
    def register_namespace(prefix, uri):
        """Map *uri* to *prefix* in :mod:`xml.etree.ElementTree`'s namespace map."""
        from xml.etree import ElementTree
        # cElementTree uses ElementTree's _namespace_map, so that's ok
        ElementTree._namespace_map[uri] = prefix
# Register a short prefix for each of the namespaces listed in the mapping
# below by calling register_namespace(prefix, uri) once per entry.
# NOTE: `ns` and `pre` are ordinary module-level names and stay bound to the
# last pair after the loop finishes.
for (ns, pre) in six.iteritems({
    BASE_NS_1_0: 'nc',
    NETCONF_MONITORING_NS: 'ncm',
    NXOS_1_0: 'nxos',
    NXOS_IF: 'if',
    TAILF_AAA_1_1: 'aaa',
    TAILF_EXECD_1_1: 'execd',
    CISCO_CPI_1_0: 'cpi',
    FLOWMON_1_0: 'fm',
    JUNIPER_1_1: 'junos',
}):
    register_namespace(pre, ns)
def qualify(tag, ns=BASE_NS_1_0):
    """Qualify a *tag* name with a *namespace*, in :mod:`~xml.etree.ElementTree` fashion i.e. *{namespace}tagname*.

    *tag* is the local tag name.

    *ns* is the namespace URI; it defaults to the base NETCONF namespace.
    Pass `None` to get *tag* back unqualified.
    """
    if ns is None:
        return tag
    return "{%s}%s" % (ns, tag)
def to_xml(ele, encoding="UTF-8", pretty_print=False):
    """Convert and return the XML for an *ele* (:class:`~xml.etree.ElementTree.Element`) with specified *encoding*.

    *ele* is the element to serialize.

    *encoding* is passed to :func:`etree.tostring` and is also written into
    the XML declaration that is prepended when the serializer did not emit one.

    *pretty_print* is passed through to :func:`etree.tostring`.

    Returns the document as the native string type: a byte string on
    Python 2, text on Python 3.
    """
    xml = etree.tostring(ele, encoding=encoding, pretty_print=pretty_print)
    if sys.version_info[0] >= 3:
        # tostring() produced bytes in *encoding*, so decode with that same
        # codec; always assuming UTF-8 breaks any other requested encoding.
        xml = xml.decode(encoding)
    if xml.startswith('<?xml'):
        return xml
    return '<?xml version="1.0" encoding="%s"?>%s' % (encoding, xml)
def to_ele(x, huge_tree=False):
    """Return the :class:`~xml.etree.ElementTree.Element` for the XML document *x*.

    An *x* that is already an element is returned unchanged; anything else is
    parsed with the module's shared parser.

    *huge_tree*: parse XML with very deep trees and very long text content
    """
    if etree.iselement(x):
        return x
    # On Python 3 the text is handed to the parser as UTF-8 bytes.
    document = x if sys.version < '3' else x.encode('UTF-8')
    return etree.fromstring(document, parser=_get_parser(huge_tree))
def parse_root(raw):
    """Parse only as far as the root element of the *raw* XML document.

    Returns a tuple of the root's qualified tag name and its attribute mapping.
    """
    stream = StringIO(raw) if sys.version < '3' else BytesIO(raw.encode('UTF-8'))
    # The first 'start' event belongs to the root element, so return at once.
    for _event, root in etree.iterparse(stream, events=('start',)):
        return (root.tag, root.attrib)
def validated_element(x, tags=None, attrs=None):
    """Check the root element of an XML document or Element against the given criteria.

    *tags*, if given, is a single allowed tag name or a sequence of allowed alternatives.

    *attrs*, if given, is a sequence of required attributes; each entry may itself be a
    sequence of alternatives, of which at least one must be present.

    Returns the element. Raises :exc:`XMLError` if the requirements are not met.
    """
    root = to_ele(x)
    if tags:
        allowed = [tags] if isinstance(tags, (str, bytes)) else tags
        if root.tag not in allowed:
            raise XMLError("Element [%s] does not meet requirement" % root.tag)
    for req in attrs or ():
        # A bare string is a single required attribute, not a sequence of names.
        alternatives = [req] if isinstance(req, (str, bytes)) else req
        if not any(alt in root.attrib for alt in alternatives):
            raise XMLError("Element [%s] does not have required attributes" % root.tag)
    return root
# Prefix/URI pairs that are always available to NCElement.xpath().
XPATH_NAMESPACES = {
    're':'http://exslt.org/regular-expressions'
}


class NCElement(object):
    """Wrap an RPC reply and expose XPath/ElementPath queries on a transformed copy.

    The document that is queried is produced once, at construction time, from
    *result* by *transform_reply*.
    """

    def __init__(self, result, transform_reply, huge_tree=False):
        """Build the queryable document from *result*.

        Args:
            result: The reply object. When *transform_reply* is a function its
                ``_root`` attribute is used; otherwise ``str(result)`` is
                parsed as XML.
            transform_reply: Either a plain function that is called with
                ``result._root`` and returns the document, or an XSLT
                stylesheet that is read through ``io.BytesIO``.
            huge_tree: Passed as ``huge_tree`` to the parsers this object
                creates.
        """
        self.__result = result
        self.__transform_reply = transform_reply
        self.__huge_tree = huge_tree
        if isinstance(transform_reply, types.FunctionType):
            self.__doc = self.__transform_reply(result._root)
        else:
            self.__doc = self.remove_namespaces(self.__result)

    def xpath(self, expression, namespaces=None):
        """Perform XPath navigation on an object

        Args:
            expression: A string representing a compliant XPath
                expression.
            namespaces: An optional dict of caller supplied prefix/xmlns
                pairs used in addition to the static XPath namespaces
                for this call only.

        Returns:
            A list of 'lxml.etree._Element' should a match on the
            expression be successful.  Otherwise, an empty list will
            be returned to the caller.
        """
        self.__expression = expression
        # Merge into a copy: updating XPATH_NAMESPACES itself would leak the
        # caller's prefixes into every later call on every instance.
        self.__namespaces = dict(XPATH_NAMESPACES)
        if namespaces:
            self.__namespaces.update(namespaces)
        return self.__doc.xpath(self.__expression, namespaces=self.__namespaces)

    def find(self, expression):
        """return result for a call to lxml ElementPath find()"""
        self.__expression = expression
        return self.__doc.find(self.__expression)

    def findtext(self, expression):
        """return result for a call to lxml ElementPath findtext()"""
        self.__expression = expression
        return self.__doc.findtext(self.__expression)

    def findall(self, expression):
        """return result for a call to lxml ElementPath findall()"""
        self.__expression = expression
        return self.__doc.findall(self.__expression)

    def __str__(self):
        """syntactic sugar for str() - alias to tostring"""
        if sys.version_info[0] < 3:
            return self.tostring
        # tostring yields bytes on Python 3; str() must return text.
        return self.tostring.decode('UTF-8')

    @property
    def tostring(self):
        """return a pretty-printed string output for rpc reply"""
        # Re-parse with blank text removed so pretty_print can re-indent.
        pretty_parser = etree.XMLParser(remove_blank_text=True, huge_tree=self.__huge_tree)
        outputtree = etree.XML(etree.tostring(self.__doc), pretty_parser)
        return etree.tostring(outputtree, pretty_print=True)

    @property
    def data_xml(self):
        """return an unmodified output for rpc reply"""
        return to_xml(self.__doc)

    def remove_namespaces(self, rpc_reply):
        """Apply the XSLT held in *transform_reply* to *rpc_reply* and return the parsed result.

        ``str(rpc_reply)`` is parsed, run through the stylesheet, and the
        serialized output is parsed again into the element that is returned.
        """
        self.__xslt = self.__transform_reply
        self.__parser = etree.XMLParser(remove_blank_text=True, huge_tree=self.__huge_tree)
        self.__xslt_doc = etree.parse(io.BytesIO(self.__xslt), self.__parser)
        self.__transform = etree.XSLT(self.__xslt_doc)
        reply_tree = etree.parse(StringIO(str(rpc_reply)), parser=self.__parser)
        transformed = str(self.__transform(reply_tree))
        self.__root = etree.fromstring(transformed, parser=self.__parser)
        return self.__root
def parent_ns(node):
    """Return the namespace URI mapped to *node*'s prefix, or `None` if it has no prefix."""
    prefix = node.prefix
    return node.nsmap[prefix] if prefix else None
def yang_action(name, attrs):
    """Build a YANG ``action`` element with one child under it.

    Args:
        name: Tag name of the child element created under the
            action element.
        attrs: A dict of attributes to apply to that child element
            (e.g. namespaces).

    Returns:
        A tuple of two 'lxml.etree._Element' values: the top-level
        action element, then the child created from *name* and *attrs*.
    """
    action = new_ele('action', attrs={'xmlns': YANG_NS_1_0})
    child = sub_ele(action, name, attrs)
    return (action, child)
def replace_namespace(root, old_ns, new_ns):
    """
    Substitute old_ns with new_ns for all the xml elements including and below root

    Both element tags and attribute names are rewritten in place. Nodes whose
    tag is not a string (comments, processing instructions, entities) are
    left untouched.

    :param root: top element (root for this change)
    :param old_ns: old namespace
    :param new_ns: new namespace
    :return: None
    """
    for elem in root.iter():
        # Comments, processing instructions and entities carry a factory
        # function as their tag instead of a name, so they have no namespace
        # and cannot be turned into a QName.
        if not isinstance(elem.tag, six.string_types):
            continue

        # handle tag
        qtag = etree.QName(elem)
        if qtag.namespace == old_ns:
            elem.tag = etree.QName(new_ns, qtag.localname)

        # handle attributes; iterate over a snapshot because entries are
        # popped and re-added while looping
        attribs_dict = elem.attrib
        for attr in list(attribs_dict.keys()):
            qattr = etree.QName(attr)
            if qattr.namespace == old_ns:
                attribs_dict[etree.QName(new_ns, qattr.localname)] = attribs_dict.pop(attr)
# NOTE: *attrs* defaults to None rather than a shared ``{}``. A single default
# dict would be handed to every call, and an Element factory that merges
# **extra into the passed mapping in place would then leak attributes from
# one call into the next.

def new_ele_nsmap(tag, nsmap, attrs=None, **extra):
    """Create an element named *tag* in the default qualify() namespace, with the given *nsmap*.

    *attrs* is an optional attribute dict; *extra* keyword arguments are passed
    to :func:`etree.Element` unchanged.
    """
    return etree.Element(qualify(tag), {} if attrs is None else attrs, nsmap, **extra)


def new_ele(tag, attrs=None, **extra):
    """Create an element named *tag* in the default qualify() namespace.

    *attrs* is an optional attribute dict; *extra* keyword arguments are passed
    to :func:`etree.Element` unchanged.
    """
    return etree.Element(qualify(tag), {} if attrs is None else attrs, **extra)


def new_ele_ns(tag, ns, attrs=None, **extra):
    """Create an element named *tag* qualified with namespace *ns* (`None` for no namespace).

    *attrs* is an optional attribute dict; *extra* keyword arguments are passed
    to :func:`etree.Element` unchanged.
    """
    return etree.Element(qualify(tag, ns), {} if attrs is None else attrs, **extra)


def sub_ele(parent, tag, attrs=None, **extra):
    """Create a child of *parent* named *tag*, qualified with the namespace parent_ns() reports for *parent*.

    *attrs* is an optional attribute dict; *extra* keyword arguments are passed
    to :func:`etree.SubElement` unchanged.
    """
    return etree.SubElement(parent, qualify(tag, parent_ns(parent)),
                            {} if attrs is None else attrs, **extra)


def sub_ele_ns(parent, tag, ns, attrs=None, **extra):
    """Create a child of *parent* named *tag*, qualified with namespace *ns* (`None` for no namespace).

    *attrs* is an optional attribute dict; *extra* keyword arguments are passed
    to :func:`etree.SubElement` unchanged.
    """
    return etree.SubElement(parent, qualify(tag, ns),
                            {} if attrs is None else attrs, **extra)
|