File: lxmlhtml.py

Package: python-scrapy 0.24.2-1

"""
Link extractor based on lxml.html
"""

import re
from urlparse import urlparse, urljoin

import lxml.etree as etree

from scrapy.selector import Selector
from scrapy.link import Link
from scrapy.utils.misc import arg_to_iter
from scrapy.utils.python import unique as unique_list, str_to_unicode
from scrapy.linkextractor import FilteringLinkExtractor
from scrapy.utils.response import get_base_url


# from lxml/src/lxml/html/__init__.py
XHTML_NAMESPACE = "http://www.w3.org/1999/xhtml"

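# XPath that returns the concatenated text content of an element; used below as the link text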
_collect_string_content = etree.XPath("string()")

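# strip the XHTML namespace from a tag name,
# e.g. '{http://www.w3.org/1999/xhtml}a' -> 'a'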
def _nons(tag):
    if isinstance(tag, basestring):
        if tag[0] == '{' and tag[1:len(XHTML_NAMESPACE)+1] == XHTML_NAMESPACE:
            return tag.split('}')[-1]
    return tag


class LxmlParserLinkExtractor(object):
    def __init__(self, tag="a", attr="href", process=None, unique=False):
        self.scan_tag = tag if callable(tag) else lambda t: t == tag
        self.scan_attr = attr if callable(attr) else lambda a: a == attr
        self.process_attr = process if callable(process) else lambda v: v
        self.unique = unique

    def _iter_links(self, document):
        for el in document.iter(etree.Element):
            # match against the namespace-stripped tag so XHTML documents work too
            tag = _nons(el.tag)
            if not self.scan_tag(tag):
                continue
            attribs = el.attrib
            for attrib in attribs:
                yield (el, attrib, attribs[attrib])

    def _extract_links(self, selector, response_url, response_encoding, base_url):
        links = []
        # hacky way to get the underlying lxml parsed document
        for el, attr, attr_val in self._iter_links(selector._root):
            if self.scan_tag(_nons(el.tag)) and self.scan_attr(attr):
                # pseudo _root.make_links_absolute(base_url)
                attr_val = urljoin(base_url, attr_val)
                url = self.process_attr(attr_val)
                if url is None:
                    continue
                if isinstance(url, unicode):
                    url = url.encode(response_encoding)
                # to fix relative links after process_value
                url = urljoin(response_url, url)
                link = Link(url, _collect_string_content(el) or u'',
                    nofollow=(el.get('rel') == 'nofollow'))
                links.append(link)

        return unique_list(links, key=lambda link: link.url) \
                if self.unique else links

    def extract_links(self, response):
        html = Selector(response)
        base_url = get_base_url(response)
        return self._extract_links(html, response.url, response.encoding, base_url)

    def _process_links(self, links):
        """ Normalize and filter extracted links

        The subclass should override it if neccessary
        """
        links = unique_list(links, key=lambda link: link.url) if self.unique else links
        return links


class LxmlLinkExtractor(FilteringLinkExtractor):

    def __init__(self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
                 tags=('a', 'area'), attrs=('href',), canonicalize=True, unique=True, process_value=None,
                 deny_extensions=None):
        tags, attrs = set(arg_to_iter(tags)), set(arg_to_iter(attrs))
        tag_func = lambda x: x in tags
        attr_func = lambda x: x in attrs
        lx = LxmlParserLinkExtractor(tag=tag_func, attr=attr_func,
            unique=unique, process=process_value)

        super(LxmlLinkExtractor, self).__init__(lx, allow, deny,
            allow_domains, deny_domains, restrict_xpaths, canonicalize,
            deny_extensions)

    def extract_links(self, response):
        html = Selector(response)
        base_url = get_base_url(response)
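        # when restrict_xpaths is given, only look for links inside the
        # matching subtrees; otherwise scan the whole document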
        if self.restrict_xpaths:
            docs = [subdoc
                    for x in self.restrict_xpaths
                    for subdoc in html.xpath(x)]
        else:
            docs = [html]
        all_links = []
        for doc in docs:
            links = self._extract_links(doc, response.url, response.encoding, base_url)
            all_links.extend(self._process_links(links))
        return unique_list(all_links)
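
A minimal usage sketch follows (not part of the packaged file). It assumes the Scrapy 0.24 module layout (scrapy.contrib.linkextractors.lxmlhtml); the spider name, start URL, allow pattern and restrict_xpaths expression are hypothetical placeholders.

# Usage sketch only; spider name, URLs and patterns are hypothetical.
from scrapy.http import Request
from scrapy.spider import Spider
from scrapy.contrib.linkextractors.lxmlhtml import LxmlLinkExtractor


class ExampleSpider(Spider):
    name = 'example'
    start_urls = ['http://example.com/']

    # keep only item-page links found inside the main content area
    link_extractor = LxmlLinkExtractor(
        allow=(r'/items/',),                        # hypothetical URL pattern
        restrict_xpaths=('//div[@id="content"]',),  # hypothetical XPath
    )

    def parse(self, response):
        # extract_links() returns scrapy.link.Link objects whose urls have
        # already been joined against the page's base URL
        for link in self.link_extractor.extract_links(response):
            if not link.nofollow:
                yield Request(link.url, callback=self.parse)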