File: __init__.py

package info (click to toggle)
python-scrapy 0.8-3
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 2,904 kB
  • ctags: 2,981
  • sloc: python: 15,349; xml: 199; makefile: 68; sql: 64; sh: 34
file content (153 lines) | stat: -rw-r--r-- 5,871 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
XPath selectors 

See documentation in docs/topics/selectors.rst
"""

import libxml2

from scrapy.http import TextResponse
from scrapy.utils.python import flatten, unicode_to_str
from scrapy.utils.misc import extract_regex
from scrapy.utils.trackref import object_ref
from scrapy.utils.decorator import deprecated
from .factories import xmlDoc_from_html, xmlDoc_from_xml
from .document import Libxml2Document

# Public names exported by ``from <this module> import *``
__all__ = [
    'HtmlXPathSelector',
    'XmlXPathSelector',
    'XPathSelector',
    'XPathSelectorList',
]

class XPathSelector(object_ref):
    """Wrap a libxml2 node (or an XPath result value) and query it with XPath

    A selector is built from one of:

    * ``response`` - a response object, parsed through ``_get_libxml2_doc``
    * ``text`` - a string, wrapped in a utf-8 ``TextResponse`` and then parsed
      the same way
    * ``parent`` and ``node`` - used by ``select()`` to wrap each query result
      while sharing the parent's document

    ``expr`` is the XPath expression that produced the selector; it is stored
    for display in ``__str__``/``__repr__``.

    If none of ``parent``, ``response`` or ``text`` is supplied, ``doc`` and
    ``xmlNode`` are left unset.
    """

    # '__weakref__' keeps instances weak-referenceable despite using slots
    __slots__ = ['doc', 'xmlNode', 'expr', '__weakref__']

    def __init__(self, response=None, text=None, node=None, parent=None, expr=None):
        # Compare against None instead of truth-testing: bool(parent) goes
        # through __nonzero__, which serializes the parent node via extract().
        # That is wasted work for every result wrapped by select(), and a
        # parent whose extracted content is empty would be treated as missing,
        # leaving doc and xmlNode unset.
        if parent is not None:
            self.doc = parent.doc
            self.xmlNode = node
        elif response:
            self.doc = Libxml2Document(response, factory=self._get_libxml2_doc)
            self.xmlNode = self.doc.xmlDoc
        elif text:
            response = TextResponse(url='about:blank', body=unicode_to_str(text),
                encoding='utf-8')
            self.doc = Libxml2Document(response, factory=self._get_libxml2_doc)
            self.xmlNode = self.doc.xmlDoc
        self.expr = expr

    def select(self, xpath):
        """Perform the given XPath query on the current XPathSelector and
        return a XPathSelectorList of the result

        The list is empty when the wrapped value cannot be queried (it has no
        ``xpathEval`` attribute). Raises ValueError if libxml2 rejects the
        expression.
        """
        if hasattr(self.xmlNode, 'xpathEval'):
            # The XPath context is shared through the document, so point it at
            # this selector's node before evaluating
            self.doc.xpathContext.setContextNode(self.xmlNode)
            try:
                xpath_result = self.doc.xpathContext.xpathEval(xpath)
            except libxml2.xpathError:
                raise ValueError("Invalid XPath: %s" % xpath)
            if hasattr(xpath_result, '__iter__'):
                # Iterable result: wrap every item in its own selector
                return XPathSelectorList([self.__class__(node=node, parent=self,
                    expr=xpath) for node in xpath_result])
            else:
                # Non-iterable result: wrap the single value
                return XPathSelectorList([self.__class__(node=xpath_result,
                    parent=self, expr=xpath)])
        else:
            return XPathSelectorList([])

    def re(self, regex):
        """Return a list of unicode strings by applying the regex over all
        current XPath selections, and flattening the results"""
        return extract_regex(regex, self.extract(), 'utf-8')

    def extract(self):
        """Return a unicode string of the content referenced by the XPathSelector"""
        if isinstance(self.xmlNode, basestring):
            # String result of an XPath expression: just decode it
            text = unicode(self.xmlNode, 'utf-8', errors='ignore')
        elif hasattr(self.xmlNode, 'serialize'):
            if isinstance(self.xmlNode, libxml2.xmlDoc):
                # Whole document: serialize starting from its root element
                data = self.xmlNode.getRootElement().serialize('utf-8')
                text = unicode(data, 'utf-8', errors='ignore') if data else u''
            elif isinstance(self.xmlNode, libxml2.xmlAttr):
                # serialization doesn't work sometimes for xmlAttr types
                text = unicode(self.xmlNode.content, 'utf-8', errors='ignore')
            else:
                data = self.xmlNode.serialize('utf-8')
                text = unicode(data, 'utf-8', errors='ignore') if data else u''
        else:
            try:
                text = unicode(self.xmlNode, 'utf-8', errors='ignore')
            except TypeError:  # caught when self.xmlNode is a float - see tests
                text = unicode(self.xmlNode)
        return text

    def extract_unquoted(self):
        """Get unescaped contents from the text node (no entities, no CDATA)

        Returns an empty unicode string when ``self::text()`` selects nothing
        for this selector.
        """
        if self.select('self::text()'):
            return unicode(self.xmlNode.getContent(), 'utf-8', errors='ignore')
        else:
            return u''

    def register_namespace(self, prefix, uri):
        """Register namespace so that it can be used in XPath queries"""
        self.doc.xpathContext.xpathRegisterNs(prefix, uri)

    def _get_libxml2_doc(self, response):
        """Return libxml2 document (xmlDoc) from response"""
        return xmlDoc_from_html(response)

    def __nonzero__(self):
        # A selector is true when its extracted content is non-empty. Note
        # that this runs extract(), so truth-testing is not free.
        return bool(self.extract())

    def __str__(self):
        # Show the node's name when it has one, otherwise the type name of
        # the wrapped value
        return "<%s (%s) xpath=%s>" % (type(self).__name__, getattr(self.xmlNode,
            'name', type(self.xmlNode).__name__), self.expr)

    __repr__ = __str__

    @deprecated(use_instead='XPathSelector.select')
    def __call__(self, xpath):
        """Deprecated alias of select()"""
        return self.select(xpath)

    @deprecated(use_instead='XPathSelector.select')
    def x(self, xpath):
        """Deprecated alias of select()"""
        return self.select(xpath)


class XPathSelectorList(list):
    """A list of XPathSelector objects that forwards selector calls to each
    of its items"""

    def __getslice__(self, i, j):
        # Keep slices typed as XPathSelectorList instead of a plain list
        plain_slice = list.__getslice__(self, i, j)
        return XPathSelectorList(plain_slice)

    def select(self, xpath):
        """Run the XPath query on every selector in the list and return all
        the results in a single flattened XPathSelectorList"""
        per_item_results = []
        for selector in self:
            per_item_results.append(selector.select(xpath))
        return XPathSelectorList(flatten(per_item_results))

    def re(self, regex):
        """Apply re() on every selector in the list and return the matches as
        one flattened list of unicode strings"""
        per_item_matches = []
        for selector in self:
            per_item_matches.append(selector.re(regex))
        return flatten(per_item_matches)

    def extract(self):
        """Return a list with the extracted content of every XPathSelector in
        the list; items that are not selectors are returned unchanged"""
        extracted = []
        for item in self:
            if isinstance(item, XPathSelector):
                item = item.extract()
            extracted.append(item)
        return extracted

    def extract_unquoted(self):
        """Return a list with the unquoted content of every XPathSelector in
        the list; items that are not selectors are returned unchanged"""
        unquoted = []
        for item in self:
            if isinstance(item, XPathSelector):
                item = item.extract_unquoted()
            unquoted.append(item)
        return unquoted

    @deprecated(use_instead='XPathSelectorList.select')
    def x(self, xpath):
        """Deprecated alias of select()"""
        return self.select(xpath)


class XmlXPathSelector(XPathSelector):
    """XPathSelector for XML content

    Identical to XPathSelector except for the document factory, which is
    xmlDoc_from_xml instead of xmlDoc_from_html.
    """
    # No per-instance attributes beyond the slots declared by XPathSelector
    __slots__ = ()
    # Wrapped in staticmethod so that self._get_libxml2_doc is the plain
    # xmlDoc_from_xml function (no self bound to it); XPathSelector.__init__
    # passes it as the factory argument of Libxml2Document.
    _get_libxml2_doc = staticmethod(xmlDoc_from_xml)


class HtmlXPathSelector(XPathSelector):
    """XPathSelector for HTML content

    Uses xmlDoc_from_html as the document factory, the same function the
    base class calls from its own _get_libxml2_doc method.
    """
    # No per-instance attributes beyond the slots declared by XPathSelector
    __slots__ = ()
    # Wrapped in staticmethod so that self._get_libxml2_doc is the plain
    # xmlDoc_from_html function rather than a method bound to this instance;
    # XPathSelector.__init__ passes it as the factory argument of
    # Libxml2Document.
    _get_libxml2_doc = staticmethod(xmlDoc_from_html)