"""
SGMLParser-based Link extractors
"""
from six.moves.urllib.parse import urljoin
import warnings
from sgmllib import SGMLParser

from w3lib.url import safe_url_string
from scrapy.selector import Selector
from scrapy.link import Link
from scrapy.linkextractors import FilteringLinkExtractor
from scrapy.utils.misc import arg_to_iter
from scrapy.utils.python import unique as unique_list, str_to_unicode
from scrapy.utils.response import get_base_url
from scrapy.exceptions import ScrapyDeprecationWarning


class BaseSgmlLinkExtractor(SGMLParser):

    def __init__(self, tag="a", attr="href", unique=False, process_value=None):
        warnings.warn(
            "BaseSgmlLinkExtractor is deprecated and will be removed in future releases. "
            "Please use scrapy.linkextractors.LinkExtractor",
            ScrapyDeprecationWarning, stacklevel=2,
        )
        SGMLParser.__init__(self)
        # tag and attr may each be a fixed name or a predicate callable
        self.scan_tag = tag if callable(tag) else lambda t: t == tag
        self.scan_attr = attr if callable(attr) else lambda a: a == attr
        self.process_value = (lambda v: v) if process_value is None else process_value
        self.current_link = None
        self.unique = unique
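
    # A hedged illustration of the tag/attr duality accepted above (example
    # values only, not defaults used elsewhere in Scrapy):
    #
    #   BaseSgmlLinkExtractor(tag='a', attr='href')              # exact names
    #   BaseSgmlLinkExtractor(tag=lambda t: t in ('a', 'area'),  # predicates
    #                         attr=lambda a: a == 'href')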

    def _extract_links(self, response_text, response_url, response_encoding, base_url=None):
        """Do the real extraction work"""
        self.reset()
        self.feed(response_text)
        self.close()

        ret = []
        if base_url is None:
            base_url = urljoin(response_url, self.base_url) if self.base_url else response_url
        for link in self.links:
            # `unicode` is the Python 2 builtin; this module is Python 2 only,
            # as sgmllib was removed in Python 3
            if isinstance(link.url, unicode):
                link.url = link.url.encode(response_encoding)
            link.url = urljoin(base_url, link.url)
            link.url = safe_url_string(link.url, response_encoding)
            link.text = str_to_unicode(link.text, response_encoding, errors='replace').strip()
            ret.append(link)
        return ret
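
    # Sketch of the resolution above (illustrative values; runnable only on
    # Python 2, where sgmllib exists):
    #
    #   lx = BaseSgmlLinkExtractor()
    #   links = lx._extract_links(
    #       '<base href="http://files.example.com/"><a href="doc.html">Doc</a>',
    #       'http://www.example.com/index.html', 'utf-8')
    #   # links[0].url == 'http://files.example.com/doc.html'
    #   # the <base> href recorded by unknown_starttag() wins over the
    #   # response URL when joining relative links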

    def _process_links(self, links):
        """Normalize and filter extracted links

        The subclass should override it if necessary
        """
        links = unique_list(links, key=lambda link: link.url) if self.unique else links
        return links

    def extract_links(self, response):
        # wrapper around _extract_links(), which can also be fed raw text directly
        links = self._extract_links(response.body, response.url, response.encoding)
        links = self._process_links(links)
        return links

    def reset(self):
        SGMLParser.reset(self)
        self.links = []
        self.base_url = None
        self.current_link = None

    def unknown_starttag(self, tag, attrs):
        if tag == 'base':
            self.base_url = dict(attrs).get('href')
        if self.scan_tag(tag):
            for attr, value in attrs:
                if self.scan_attr(attr):
                    url = self.process_value(value)
                    if url is not None:
                        link = Link(url=url, nofollow=(dict(attrs).get('rel') == 'nofollow'))
                        self.links.append(link)
                        self.current_link = link

    def unknown_endtag(self, tag):
        if self.scan_tag(tag):
            self.current_link = None

    def handle_data(self, data):
        if self.current_link:
            self.current_link.text = self.current_link.text + data

    def matches(self, url):
        """This extractor matches any url, since
        it doesn't contain any patterns"""
        return True
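
# Usage sketch for the base extractor (the `response` object is assumed to be
# a scrapy HtmlResponse; a hedged example, not part of this module):
#
#   lx = BaseSgmlLinkExtractor(tag=lambda t: t in ('a', 'area'), unique=True)
#   urls = [link.url for link in lx.extract_links(response)]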


class SgmlLinkExtractor(FilteringLinkExtractor):

    def __init__(self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
                 tags=('a', 'area'), attrs=('href',), canonicalize=True, unique=True,
                 process_value=None, deny_extensions=None, restrict_css=()):
        warnings.warn(
            "SgmlLinkExtractor is deprecated and will be removed in future releases. "
            "Please use scrapy.linkextractors.LinkExtractor",
            ScrapyDeprecationWarning, stacklevel=2,
        )
        tags, attrs = set(arg_to_iter(tags)), set(arg_to_iter(attrs))
        tag_func = lambda x: x in tags
        attr_func = lambda x: x in attrs
        # swallow the deprecation warning raised by instantiating the (also
        # deprecated) base extractor internally
        with warnings.catch_warnings(record=True):
            lx = BaseSgmlLinkExtractor(tag=tag_func, attr=attr_func,
                                       unique=unique, process_value=process_value)
        super(SgmlLinkExtractor, self).__init__(lx, allow=allow, deny=deny,
            allow_domains=allow_domains, deny_domains=deny_domains,
            restrict_xpaths=restrict_xpaths, restrict_css=restrict_css,
            canonicalize=canonicalize, deny_extensions=deny_extensions)

        # FIXME: was added to fix a RegexLinkExtractor testcase
        self.base_url = None
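
    # Hedged construction example (the patterns are illustrative):
    #
    #   lx = SgmlLinkExtractor(allow=(r'item\.php\?id=\d+',),
    #                          deny=(r'logout',),
    #                          restrict_xpaths=('//div[@class="nav"]',))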

    def extract_links(self, response):
        base_url = None
        if self.restrict_xpaths:
            sel = Selector(response)
            base_url = get_base_url(response)
            # re-serialize only the regions matched by restrict_xpaths and
            # extract links from those alone
            body = u''.join(f
                            for x in self.restrict_xpaths
                            for f in sel.xpath(x).extract()
                            ).encode(response.encoding, errors='xmlcharrefreplace')
        else:
            body = response.body
        links = self._extract_links(body, response.url, response.encoding, base_url)
        links = self._process_links(links)
        return links
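
# End-to-end usage sketch (hedged; the spider below is illustrative -- the
# CrawlSpider/Rule combination was the typical consumer of this extractor):
#
#   from scrapy.spiders import CrawlSpider, Rule
#
#   class MySpider(CrawlSpider):
#       name = 'example'
#       start_urls = ['http://www.example.com/']
#       rules = (
#           Rule(SgmlLinkExtractor(allow=(r'/category/',)),
#                callback='parse_item'),
#       )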