1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
"""
XPath selectors
See documentation in docs/topics/selectors.rst
"""
import libxml2
from scrapy.http import TextResponse
from scrapy.utils.python import flatten, unicode_to_str
from scrapy.utils.misc import extract_regex
from scrapy.utils.trackref import object_ref
from scrapy.utils.decorator import deprecated
from .factories import xmlDoc_from_html, xmlDoc_from_xml
from .document import Libxml2Document
# Public names exported by a star-import of this module.
__all__ = [
    'HtmlXPathSelector',
    'XmlXPathSelector',
    'XPathSelector',
    'XPathSelectorList',
]
class XPathSelector(object_ref):
    """Selector that evaluates XPath expressions on a libxml2 document.

    An instance is either a *root* selector, built from a ``response`` or
    from raw ``text``, or a *child* selector that wraps a single result
    (``node``) of a query run on its ``parent`` and shares the parent's
    document.
    """

    # '__weakref__' keeps instances weak-referenceable even though
    # __slots__ removes the per-instance __dict__.
    __slots__ = ['doc', 'xmlNode', 'expr', '__weakref__']

    def __init__(self, response=None, text=None, node=None, parent=None, expr=None):
        """Build a selector from a parent selector, a response or raw text.

        The sources are tried in this order: ``parent`` (with ``node`` as the
        wrapped result), then ``response``, then ``text``. If none of them is
        usable, ``doc`` and ``xmlNode`` are left unset.

        ``expr`` is the XPath expression that produced this selector; it is
        only used for display in ``__str__``.
        """
        # Identity check on purpose: the truth value of a selector is
        # bool(self.extract()) (see __nonzero__), so ``if parent:`` would
        # serialize the parent node for every child created and would treat
        # a parent with empty content as if no parent had been given.
        if parent is not None:
            self.doc = parent.doc
            self.xmlNode = node
        elif response:
            self.doc = Libxml2Document(response, factory=self._get_libxml2_doc)
            self.xmlNode = self.doc.xmlDoc
        elif text:
            # Wrap the raw text in a synthetic response so that it goes
            # through the same document-building path as a real response.
            response = TextResponse(url='about:blank',
                                    body=unicode_to_str(text),
                                    encoding='utf-8')
            self.doc = Libxml2Document(response, factory=self._get_libxml2_doc)
            self.xmlNode = self.doc.xmlDoc
        self.expr = expr

    def select(self, xpath):
        """Perform the given XPath query on the current XPathSelector and
        return a XPathSelectorList of the result.

        An empty XPathSelectorList is returned when the wrapped value cannot
        be queried (it has no ``xpathEval`` attribute).

        Raises ValueError if libxml2 rejects the expression.
        """
        if not hasattr(self.xmlNode, 'xpathEval'):
            return XPathSelectorList([])

        # Evaluate relative to the wrapped node on the document's context.
        self.doc.xpathContext.setContextNode(self.xmlNode)
        try:
            xpath_result = self.doc.xpathContext.xpathEval(xpath)
        except libxml2.xpathError:
            raise ValueError("Invalid XPath: %s" % xpath)

        if hasattr(xpath_result, '__iter__'):
            # Iterable result: wrap every item in its own selector.
            return XPathSelectorList([
                self.__class__(node=node, parent=self, expr=xpath)
                for node in xpath_result
            ])
        # Single non-iterable result: wrap it as a one-element list.
        return XPathSelectorList([
            self.__class__(node=xpath_result, parent=self, expr=xpath)
        ])

    def re(self, regex):
        """Return a list of unicode strings by applying the regex over all
        current XPath selections, and flattening the results"""
        return extract_regex(regex, self.extract(), 'utf-8')

    def extract(self):
        """Return a unicode string of the content referenced by the XPathSelector"""
        if isinstance(self.xmlNode, basestring):
            text = unicode(self.xmlNode, 'utf-8', errors='ignore')
        elif hasattr(self.xmlNode, 'serialize'):
            if isinstance(self.xmlNode, libxml2.xmlDoc):
                # A whole document is rendered through its root element.
                data = self.xmlNode.getRootElement().serialize('utf-8')
                text = unicode(data, 'utf-8', errors='ignore') if data else u''
            elif isinstance(self.xmlNode, libxml2.xmlAttr):
                # serialization doesn't work sometimes for xmlAttr types
                text = unicode(self.xmlNode.content, 'utf-8', errors='ignore')
            else:
                data = self.xmlNode.serialize('utf-8')
                text = unicode(data, 'utf-8', errors='ignore') if data else u''
        else:
            try:
                text = unicode(self.xmlNode, 'utf-8', errors='ignore')
            except TypeError:  # caught when self.xmlNode is a float - see tests
                text = unicode(self.xmlNode)
        return text

    def extract_unquoted(self):
        """Get unescaped contents from the text node (no entities, no CDATA).

        Returns an empty unicode string when ``self::text()`` selects nothing
        for the wrapped node.
        """
        if self.select('self::text()'):
            return unicode(self.xmlNode.getContent(), 'utf-8', errors='ignore')
        else:
            return u''

    def register_namespace(self, prefix, uri):
        """Register namespace so that it can be used in XPath queries"""
        self.doc.xpathContext.xpathRegisterNs(prefix, uri)

    def _get_libxml2_doc(self, response):
        """Return libxml2 document (xmlDoc) from response"""
        return xmlDoc_from_html(response)

    def __nonzero__(self):
        """A selector is true when its extracted content is non-empty."""
        return bool(self.extract())

    def __str__(self):
        """Return ``<ClassName (node name or value type) xpath=expr>``."""
        return "<%s (%s) xpath=%s>" % (
            type(self).__name__,
            getattr(self.xmlNode, 'name', type(self.xmlNode).__name__),
            self.expr,
        )

    __repr__ = __str__

    @deprecated(use_instead='XPathSelector.select')
    def __call__(self, xpath):
        """Deprecated alias of select()."""
        return self.select(xpath)

    @deprecated(use_instead='XPathSelector.select')
    def x(self, xpath):
        """Deprecated alias of select()."""
        return self.select(xpath)
class XPathSelectorList(list):
    """List of XPathSelector objects.

    Slicing returns another XPathSelectorList, so ``select``, ``re`` and
    ``extract`` remain available on the slice.
    """

    def __getslice__(self, i, j):
        # Python 2 sends simple slices (lst[i:j]) here because the built-in
        # list defines __getslice__; wrap the result to keep the list type.
        return XPathSelectorList(list.__getslice__(self, i, j))

    def __getitem__(self, index):
        """Return the item at ``index``, or an XPathSelectorList for a slice.

        Extended slices (``lst[i:j:k]``) are passed to __getitem__ as slice
        objects instead of going through __getslice__; without this override
        they would come back as a plain list.
        """
        result = list.__getitem__(self, index)
        if isinstance(index, slice):
            return XPathSelectorList(result)
        return result

    def select(self, xpath):
        """Perform the given XPath query on each XPathSelector of the list and
        return a new (flattened) XPathSelectorList of the results"""
        return XPathSelectorList(flatten([x.select(xpath) for x in self]))

    def re(self, regex):
        """Perform the re() method on each XPathSelector of the list, and
        return the result as a flattened list of unicode strings"""
        return flatten([x.re(regex) for x in self])

    def extract(self):
        """Return a list of unicode strings with the content referenced by each
        XPathSelector of the list"""
        # Items that are not selectors are passed through unchanged.
        return [x.extract() if isinstance(x, XPathSelector) else x for x in self]

    def extract_unquoted(self):
        """Return the extract_unquoted() result of each XPathSelector of the
        list; items that are not selectors are passed through unchanged"""
        return [x.extract_unquoted() if isinstance(x, XPathSelector) else x for x in self]

    @deprecated(use_instead='XPathSelectorList.select')
    def x(self, xpath):
        """Deprecated alias of select()."""
        return self.select(xpath)
class XmlXPathSelector(XPathSelector):
    """XPathSelector specialised for XML content."""

    # Document factory handed to Libxml2Document by the base constructor;
    # bound as a staticmethod so the plain function itself is what gets passed.
    _get_libxml2_doc = staticmethod(xmlDoc_from_xml)

    # No per-instance attributes beyond those declared by the base class.
    __slots__ = ()
class HtmlXPathSelector(XPathSelector):
    """XPathSelector specialised for HTML content."""

    # Document factory handed to Libxml2Document by the base constructor;
    # bound as a staticmethod so the plain function itself is what gets passed.
    _get_libxml2_doc = staticmethod(xmlDoc_from_html)

    # No per-instance attributes beyond those declared by the base class.
    __slots__ = ()
|