File: sgml.py (from python-scrapy 1.0.3-2)

"""
SGMLParser-based Link extractors
"""
import warnings

from six.moves.urllib.parse import urljoin
from sgmllib import SGMLParser  # Python 2 only; removed in Python 3

from w3lib.url import safe_url_string
from scrapy.selector import Selector
from scrapy.link import Link
from scrapy.linkextractors import FilteringLinkExtractor
from scrapy.utils.misc import arg_to_iter
from scrapy.utils.python import unique as unique_list, str_to_unicode
from scrapy.utils.response import get_base_url
from scrapy.exceptions import ScrapyDeprecationWarning


class BaseSgmlLinkExtractor(SGMLParser):

    def __init__(self, tag="a", attr="href", unique=False, process_value=None):
        warnings.warn(
            "BaseSgmlLinkExtractor is deprecated and will be removed in future releases. "
            "Please use scrapy.linkextractors.LinkExtractor",
            ScrapyDeprecationWarning, stacklevel=2,
        )
        SGMLParser.__init__(self)
        self.scan_tag = tag if callable(tag) else lambda t: t == tag
        self.scan_attr = attr if callable(attr) else lambda a: a == attr
        self.process_value = (lambda v: v) if process_value is None else process_value
        self.current_link = None
        self.unique = unique

    def _extract_links(self, response_text, response_url, response_encoding, base_url=None):
        """ Do the real extraction work """
        self.reset()
        self.feed(response_text)
        self.close()

        ret = []
        if base_url is None:
            base_url = urljoin(response_url, self.base_url) if self.base_url else response_url
        for link in self.links:
            if isinstance(link.url, unicode):  # Python 2 text type
                link.url = link.url.encode(response_encoding)
            link.url = urljoin(base_url, link.url)
            link.url = safe_url_string(link.url, response_encoding)
            link.text = str_to_unicode(link.text, response_encoding, errors='replace').strip()
            ret.append(link)

        return ret

    def _process_links(self, links):
        """ Normalize and filter extracted links

        The subclass should override it if necessary
        """
        links = unique_list(links, key=lambda link: link.url) if self.unique else links
        return links

    def extract_links(self, response):
        # Thin wrapper around _extract_links() so the extractor can also be
        # used directly with raw text.
        links = self._extract_links(response.body, response.url, response.encoding)
        links = self._process_links(links)
        return links

    def reset(self):
        SGMLParser.reset(self)
        self.links = []
        self.base_url = None
        self.current_link = None

    def unknown_starttag(self, tag, attrs):
        if tag == 'base':
            # Remember <base href="..."> so relative URLs resolve against it.
            self.base_url = dict(attrs).get('href')
        if self.scan_tag(tag):
            for attr, value in attrs:
                if self.scan_attr(attr):
                    url = self.process_value(value)
                    if url is not None:
                        link = Link(url=url, nofollow=(dict(attrs).get('rel') == 'nofollow'))
                        self.links.append(link)
                        # Track the open link so handle_data() can collect its text.
                        self.current_link = link

    def unknown_endtag(self, tag):
        if self.scan_tag(tag):
            self.current_link = None

    def handle_data(self, data):
        # Accumulate the text between the link's start and end tags.
        if self.current_link:
            self.current_link.text += data

    def matches(self, url):
        """This extractor matches with any url, since
        it doesn't contain any patterns"""
        return True
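
# A minimal usage sketch for the base extractor (illustrative only; assumes
# Python 2, where sgmllib is available, and instantiation emits the
# deprecation warning above):
#
#     lx = BaseSgmlLinkExtractor(tag='a', attr='href')
#     links = lx._extract_links(
#         '<a href="/page">Next</a>',        # response_text
#         'http://example.com/', 'utf-8')    # response_url, response_encoding
#     # -> [Link(url='http://example.com/page', text=u'Next')]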


class SgmlLinkExtractor(FilteringLinkExtractor):

    def __init__(self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
                 tags=('a', 'area'), attrs=('href',), canonicalize=True, unique=True,
                 process_value=None, deny_extensions=None, restrict_css=()):

        warnings.warn(
            "SgmlLinkExtractor is deprecated and will be removed in future releases. "
            "Please use scrapy.linkextractors.LinkExtractor",
            ScrapyDeprecationWarning, stacklevel=2,
        )

        tags, attrs = set(arg_to_iter(tags)), set(arg_to_iter(attrs))
        tag_func = lambda x: x in tags
        attr_func = lambda x: x in attrs

        with warnings.catch_warnings(record=True):
            lx = BaseSgmlLinkExtractor(tag=tag_func, attr=attr_func,
                unique=unique, process_value=process_value)

        super(SgmlLinkExtractor, self).__init__(lx, allow=allow, deny=deny,
            allow_domains=allow_domains, deny_domains=deny_domains,
            restrict_xpaths=restrict_xpaths, restrict_css=restrict_css,
            canonicalize=canonicalize, deny_extensions=deny_extensions)

        # FIXME: was added to fix a RegexLinkExtractor testcase
        self.base_url = None

    def extract_links(self, response):
        base_url = None
        if self.restrict_xpaths:
            sel = Selector(response)
            base_url = get_base_url(response)
            # Re-serialize only the regions matched by restrict_xpaths and
            # extract links from that fragment alone.
            body = u''.join(f
                            for x in self.restrict_xpaths
                            for f in sel.xpath(x).extract()
                            ).encode(response.encoding, errors='xmlcharrefreplace')
        else:
            body = response.body

        links = self._extract_links(body, response.url, response.encoding, base_url)
        links = self._process_links(links)
        return links
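

# A hedged, self-contained example of driving SgmlLinkExtractor (Python 2
# only, since sgmllib was removed in Python 3). The URL and markup below are
# illustrative, not part of the original module, and instantiation emits a
# ScrapyDeprecationWarning.
if __name__ == '__main__':
    from scrapy.http import HtmlResponse

    response = HtmlResponse(
        url='http://example.com/index.html',  # hypothetical URL
        body='<a href="/a" rel="nofollow">A</a> <a href="/b">B</a>',
        encoding='utf-8',
    )
    lx = SgmlLinkExtractor(allow=(r'/b$',))  # keep only links ending in /b
    for link in lx.extract_links(response):
        print('%s nofollow=%s' % (link.url, link.nofollow))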