File: sgml.py

package info (click to toggle)
python-scrapy 1.5.1-1%2Bdeb10u1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 4,404 kB
  • sloc: python: 25,793; xml: 199; makefile: 95; sh: 33
file content (151 lines) | stat: -rw-r--r-- 5,926 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
SGMLParser-based Link extractors
"""
import six
from six.moves.urllib.parse import urljoin
import warnings
from sgmllib import SGMLParser

from w3lib.url import safe_url_string, canonicalize_url
from w3lib.html import strip_html5_whitespace

from scrapy.link import Link
from scrapy.linkextractors import FilteringLinkExtractor
from scrapy.utils.misc import arg_to_iter, rel_has_nofollow
from scrapy.utils.python import unique as unique_list, to_unicode
from scrapy.utils.response import get_base_url
from scrapy.exceptions import ScrapyDeprecationWarning


class BaseSgmlLinkExtractor(SGMLParser):
    """Deprecated ``SGMLParser``-based link extractor without URL filtering.

    The markup is fed through ``SGMLParser``; every attribute accepted by
    ``scan_attr`` on a tag accepted by ``scan_tag`` yields one ``Link``.
    Character data seen while such a tag is open is appended to the text of
    the most recently created link.

    :param tag: tag name to scan, or a callable that takes a tag name and
        returns a truthy value for tags that should be scanned.
    :param attr: attribute name to read, or a callable that takes an
        attribute name and returns a truthy value for attributes to read.
    :param unique: when true, ``_process_links`` removes duplicate links.
    :param process_value: optional callable applied to each attribute value;
        a ``None`` result discards the value. Defaults to the identity.
    :param strip: when true, HTML5 whitespace is stripped from attribute
        values before ``process_value`` is applied.
    :param canonicalized: when true, duplicate detection compares link URLs
        as they are; otherwise it compares their canonicalized form (with
        fragments kept).
    """

    def __init__(self, tag="a", attr="href", unique=False, process_value=None,
                 strip=True, canonicalized=False):
        warnings.warn(
            "BaseSgmlLinkExtractor is deprecated and will be removed in future releases. "
            "Please use scrapy.linkextractors.LinkExtractor",
            ScrapyDeprecationWarning, stacklevel=2,
        )
        SGMLParser.__init__(self)
        self.scan_tag = tag if callable(tag) else lambda t: t == tag
        self.scan_attr = attr if callable(attr) else lambda a: a == attr
        self.process_value = (lambda v: v) if process_value is None else process_value
        self.current_link = None
        self.unique = unique
        self.strip = strip
        # Key used by _process_links() to decide whether two links are equal.
        if canonicalized:
            self.link_key = lambda link: link.url
        else:
            self.link_key = lambda link: canonicalize_url(link.url,
                                                          keep_fragments=True)

    def _extract_links(self, response_text, response_url, response_encoding, base_url=None):
        """Parse ``response_text`` and return the list of links found.

        :param response_text: markup to feed to the parser.
        :param response_url: URL the markup was fetched from; used to
            resolve relative links when no other base is available.
        :param response_encoding: encoding used to encode unicode URLs and
            to decode the link text.
        :param base_url: base to resolve links against. When ``None`` it is
            the ``<base>`` href found while parsing (joined with
            ``response_url``), or ``response_url`` if there was none.
        :returns: list of ``Link`` objects with absolute, safe URLs and
            stripped unicode text. Links whose URL cannot be joined with
            the base (``ValueError``) are left out.
        """
        self.reset()
        self.feed(response_text)
        self.close()

        ret = []
        if base_url is None:
            base_url = urljoin(response_url, self.base_url) if self.base_url else response_url
        for link in self.links:
            if isinstance(link.url, six.text_type):
                link.url = link.url.encode(response_encoding)
            try:
                link.url = urljoin(base_url, link.url)
            except ValueError:
                # Unjoinable URL: skip this link instead of failing the page.
                continue
            link.url = safe_url_string(link.url, response_encoding)
            link.text = to_unicode(link.text, response_encoding, errors='replace').strip()
            ret.append(link)

        return ret

    def _process_links(self, links):
        """ Normalize and filter extracted links

        The subclass should override it if necessary
        """
        return unique_list(links, key=self.link_key) if self.unique else links

    def extract_links(self, response):
        """Return the processed links extracted from ``response.body``."""
        # wrapper needed to allow to work directly with text
        links = self._extract_links(response.body, response.url, response.encoding)
        links = self._process_links(links)
        return links

    def reset(self):
        """Reset the parser and clear all state collected for a document."""
        SGMLParser.reset(self)
        self.links = []
        self.base_url = None
        self.current_link = None

    def unknown_starttag(self, tag, attrs):
        """Record the document base URL and collect links from scanned tags.

        :param tag: name of the tag being opened.
        :param attrs: list of ``(name, value)`` attribute pairs.
        """
        # Only the first <base> carrying a non-empty href defines the base
        # URL. A later <base> (for example one that only sets a target) must
        # neither override it nor reset it to None.
        if tag == 'base' and not self.base_url:
            href = dict(attrs).get('href')
            if href:
                self.base_url = href
        if not self.scan_tag(tag):
            return
        # Looked up once per tag instead of once per matching attribute.
        rel = dict(attrs).get('rel')
        for attr, value in attrs:
            if not self.scan_attr(attr):
                continue
            if self.strip and value is not None:
                value = strip_html5_whitespace(value)
            url = self.process_value(value)
            if url is not None:
                link = Link(url=url, nofollow=rel_has_nofollow(rel))
                self.links.append(link)
                # Following character data is appended to this link's text.
                self.current_link = link

    def unknown_endtag(self, tag):
        """Stop collecting link text once a scanned tag is closed."""
        if self.scan_tag(tag):
            self.current_link = None

    def handle_data(self, data):
        """Append character data to the text of the currently open link."""
        if self.current_link:
            self.current_link.text = self.current_link.text + data

    def matches(self, url):
        """This extractor matches with any url, since
        it doesn't contain any patterns"""
        return True


class SgmlLinkExtractor(FilteringLinkExtractor):
    """Deprecated link extractor built on ``BaseSgmlLinkExtractor``.

    A ``BaseSgmlLinkExtractor`` restricted to the given ``tags`` and
    ``attrs`` does the parsing; the filtering options (``allow``, ``deny``,
    ``allow_domains``, ``deny_domains``, ``restrict_xpaths``,
    ``restrict_css``, ``canonicalize``, ``deny_extensions``) are handed to
    ``FilteringLinkExtractor`` unchanged.
    """

    def __init__(self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
                 tags=('a', 'area'), attrs=('href',), canonicalize=False, unique=True,
                 process_value=None, deny_extensions=None, restrict_css=(),
                 strip=True):
        warnings.warn(
            "SgmlLinkExtractor is deprecated and will be removed in future releases. "
            "Please use scrapy.linkextractors.LinkExtractor",
            ScrapyDeprecationWarning, stacklevel=2,
        )

        wanted_tags = set(arg_to_iter(tags))
        wanted_attrs = set(arg_to_iter(attrs))

        def is_wanted_tag(name):
            return name in wanted_tags

        def is_wanted_attr(name):
            return name in wanted_attrs

        # The inner extractor would emit its own deprecation warning; the
        # caller has already received the one above, so silence it here.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', ScrapyDeprecationWarning)
            parser = BaseSgmlLinkExtractor(
                tag=is_wanted_tag,
                attr=is_wanted_attr,
                unique=unique,
                process_value=process_value,
                strip=strip,
                canonicalized=canonicalize,
            )

        super(SgmlLinkExtractor, self).__init__(
            parser,
            allow=allow,
            deny=deny,
            allow_domains=allow_domains,
            deny_domains=deny_domains,
            restrict_xpaths=restrict_xpaths,
            restrict_css=restrict_css,
            canonicalize=canonicalize,
            deny_extensions=deny_extensions,
        )

    def extract_links(self, response):
        """Extract and process the links of ``response``.

        When ``self.restrict_xpaths`` is set, only the markup selected by
        those XPath expressions is parsed, and links are resolved against
        ``get_base_url(response)``. Otherwise the whole body is parsed and
        the base URL is left for the parser to determine.
        """
        if self.restrict_xpaths:
            base_url = get_base_url(response)
            fragments = [
                fragment
                for xpath in self.restrict_xpaths
                for fragment in response.xpath(xpath).extract()
            ]
            # Characters the response encoding cannot represent are written
            # as XML character references rather than raising.
            body = u''.join(fragments).encode(response.encoding, errors='xmlcharrefreplace')
        else:
            base_url = None
            body = response.body

        extracted = self._extract_links(body, response.url, response.encoding, base_url)
        return self._process_links(extracted)