File: httpurl.py

package: linkchecker 10.6.0-2
# Copyright (C) 2000-2014 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Handle http links.
"""

import requests

# The validity of SSL certs is ignored to be able
# to check the URL and recurse into it.
# The warning about invalid SSL certs is given to the
# user instead.
import warnings

warnings.simplefilter(
    # pylint: disable=no-member
    'ignore', requests.packages.urllib3.exceptions.InsecureRequestWarning
)
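# A rough equivalent, assuming the installed requests release still exposes
# the urllib3 alias under requests.packages (the class filtered above is the
# same one the top-level urllib3 package provides):
#
#   import urllib3
#   warnings.simplefilter('ignore', urllib3.exceptions.InsecureRequestWarning)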

from io import BytesIO
import re

from .. import (
    log,
    LOG_CHECK,
    mimeutil,
    url as urlutil,
    LinkCheckerError,
    httputil,
)
from . import internpaturl

from .const import WARN_HTTP_EMPTY_CONTENT, WARN_HTTP_RATE_LIMITED, WARN_HTTP_REDIRECTED
from requests.sessions import REDIRECT_STATI

HTTP_SCHEMAS = ('http://', 'https://')

# match for robots meta element content attribute
nofollow_re = re.compile(r"\bnofollow\b", re.IGNORECASE)
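# Illustrative markup (assumed example): nofollow_re matches the "nofollow"
# token in a robots meta element such as
#   <meta name="robots" content="noindex, nofollow">
# but not values like "nofollowups", thanks to the \b word boundaries.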


class HttpUrl(internpaturl.InternPatternUrl):
    """
    Url link with http scheme.
    """

    def reset(self):
        """
        Initialize HTTP specific variables.
        """
        super().reset()
        # initialize check data
        # server headers
        self.headers = {}
        self.auth = None
        self.ssl_cipher = None
        self.ssl_cert = None

    def allows_robots(self, url):
        """
        Fetch and parse the robots.txt of the given url. Check whether
        LinkChecker is allowed to fetch the requested resource content.

        @param url: the url to be requested
        @type url: string
        @return: True if access is granted, otherwise False
        @rtype: bool
        """
        return not self.aggregate.config[
            'robotstxt'
        ] or self.aggregate.robots_txt.allows_url(
            self, timeout=self.aggregate.config["timeout"]
        )

    def content_allows_robots(self):
        """
        Return False if the content of this URL forbids robots from
        following its links for recursion.
        """
        if not self.is_html():
            return True

        soup = self.get_soup()
        return not soup.find("meta", attrs={"name": "robots", "content": nofollow_re})
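        # BeautifulSoup accepts a compiled regular expression as an attribute
        # filter, so the find() call above matches any robots meta element
        # whose content attribute contains "nofollow". A minimal sketch,
        # assuming bs4 is installed:
        #
        #   from bs4 import BeautifulSoup
        #   soup = BeautifulSoup(
        #       '<meta name="robots" content="noindex, nofollow">', "html.parser")
        #   assert soup.find("meta", attrs={"name": "robots", "content": nofollow_re})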

    def add_size_info(self):
        """Get size of URL content from HTTP header."""
        if (
            self.headers
            and "Content-Length" in self.headers
            and "Transfer-Encoding" not in self.headers
        ):
            # Note that content-encoding causes size differences since
            # the content data is always decoded.
            try:
                self.size = int(self.headers["Content-Length"])
            except (ValueError, OverflowError):
                pass
        else:
            self.size = -1
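        # Illustrative header combinations (assumed values):
        #   {"Content-Length": "1024"}              -> size = 1024
        #   {"Content-Length": "1024",
        #    "Transfer-Encoding": "chunked"}        -> size = -1 (length not trusted)
        #   {"Content-Length": "512",
        #    "Content-Encoding": "gzip"}            -> size = 512, though the
        #                                              decoded body may be larger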

    def check_connection(self):
        """
        Check a URL with HTTP protocol.
        Here is an excerpt from RFC 1945 with common response codes:
        The first digit of the Status-Code defines the class of response. The
        last two digits do not have any categorization role. There are 5
        values for the first digit:

          - 1xx: Informational - Not used, but reserved for future use
          - 2xx: Success - The action was successfully received,
            understood, and accepted
          - 3xx: Redirection - Further action must be taken in order to
            complete the request
          - 4xx: Client Error - The request contains bad syntax or cannot
            be fulfilled
          - 5xx: Server Error - The server failed to fulfill an apparently
            valid request
        """
        self.session = self.aggregate.get_request_session()
        self.construct_auth()
        # check robots.txt
        if not self.allows_robots(self.url):
            self.add_info(_("Access denied by robots.txt, checked only syntax."))
            self.set_result(_("syntax OK"))
            self.do_check_content = False
            return
        # check the http connection
        request = self.build_request()
        self.send_request(request)
        self._add_response_info()
        self.follow_redirections(request)
        self.check_response()
        if self.allows_simple_recursion():
            self.parse_header_links()

    def build_request(self):
        """Build a prepared request object."""
        clientheaders = {}
        if self.parent_url and self.parent_url.lower().startswith(HTTP_SCHEMAS):
            clientheaders["Referer"] = self.parent_url
        kwargs = dict(method='GET', url=self.url, headers=clientheaders)
        if self.auth:
            kwargs['auth'] = self.auth
        log.debug(LOG_CHECK, "Prepare request with %s", kwargs)
        request = requests.Request(**kwargs)
        return self.session.prepare_request(request)

    def send_request(self, request):
        """Send request and store response in self.url_connection."""
        # throttle the number of requests to each host
        self.aggregate.wait_for_host(self.urlparts[1])
        kwargs = self.get_request_kwargs()
        kwargs["allow_redirects"] = False
        self._send_request(request, **kwargs)

    def _send_request(self, request, **kwargs):
        """Send GET request."""
        log.debug(LOG_CHECK, "Send request %s with %s", request, kwargs)
        log.debug(LOG_CHECK, "Request headers %s", request.headers)
        self.url_connection = self.session.send(request, **kwargs)
        self.headers = self.url_connection.headers
        log.debug(LOG_CHECK, "Response headers %s", self.headers)
        self.set_encoding(self.url_connection.encoding)
        log.debug(LOG_CHECK, "Response encoding %s", self.content_encoding)
        if "LinkChecker" in self.headers:
            self.aggregate.set_maxrated_for_host(self.urlparts[1])
        self._add_ssl_info()

    def _add_response_info(self):
        """Set info from established HTTP(S) connection."""
        self.set_content_type()
        self.add_size_info()

    def _get_ssl_sock(self):
        """Get raw SSL socket."""
        assert self.scheme == "https", self
        raw_connection = self.url_connection.raw._connection
        if not raw_connection:
            # this happens with newer requests versions:
            # https://github.com/linkchecker/linkchecker/issues/76
            return None
        if raw_connection.sock is None:
            # sometimes the socket is not yet connected
            # see https://github.com/psf/requests/issues/1966
            raw_connection.connect()
        return raw_connection.sock

    def _add_ssl_info(self):
        """Add SSL cipher info."""
        if self.scheme == 'https':
            sock = self._get_ssl_sock()
            if not sock:
                log.debug(LOG_CHECK, "cannot extract SSL certificate from connection")
                self.ssl_cert = None
            elif hasattr(sock, 'cipher'):
                self.ssl_cert = sock.getpeercert()
            else:
                # using pyopenssl
                cert = sock.connection.get_peer_certificate()
                self.ssl_cert = httputil.x509_to_dict(cert)
            log.debug(LOG_CHECK, "Got SSL certificate %s", self.ssl_cert)
        else:
            self.ssl_cert = None

    def construct_auth(self):
        """Construct HTTP Basic authentication credentials if there
        is user/password information available. Does not overwrite if
        credentials have already been constructed."""
        if self.auth:
            return
        _user, _password = self.get_user_password()
        if _user is not None and _password is not None:
            self.auth = (_user, _password)

    def set_content_type(self):
        """Set MIME type from HTTP response headers."""
        self.content_type = httputil.get_content_type(self.headers)
        log.debug(LOG_CHECK, "MIME type: %s", self.content_type)

    def set_encoding(self, encoding):
        """Set content encoding"""
        if encoding == "ISO-8859-1":
            # Although RFC 2616 (HTTP/1.1) says that text data in a character
            # set other than ISO-8859-1 (or a subset of it) must be labelled
            # with a charset, that is not always the case, and Requests then
            # applies the default ISO-8859-1 itself.
            # We fall back to it in UrlBase.get_content() if Beautiful Soup
            # doesn't return an encoding.
            self.content_encoding = None
        else:
            self.content_encoding = encoding
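        # Background (requests behaviour at the time of writing):
        # requests.utils.get_encoding_from_headers() falls back to ISO-8859-1
        # for text/* responses without an explicit charset, e.g.
        #   get_encoding_from_headers({"content-type": "text/html"})  # -> 'ISO-8859-1'
        # so that value is treated above as "no encoding declared".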

    def is_redirect(self):
        """Check if current response is a redirect."""
        return (
            'location' in self.headers
            and self.url_connection.status_code in REDIRECT_STATI
        )
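        # REDIRECT_STATI is an internal constant of requests; in current
        # releases it covers 301, 302, 303, 307 and 308 (an assumption about
        # the installed version, since it is not a documented public API).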

    def get_request_kwargs(self):
        """Construct keyword parameters for Session.send() and
        Session.resolve_redirects()."""
        kwargs = dict(
            stream=True,
            timeout=self.aggregate.config["timeout"],
            verify=self.aggregate.config["sslverify"],
        )
        return kwargs

    def get_redirects(self, request):
        """Return iterator of redirects for given request."""
        kwargs = self.get_request_kwargs()
        return self.session.resolve_redirects(self.url_connection, request, **kwargs)

    def follow_redirections(self, request):
        """Follow all redirections of http response."""
        log.debug(LOG_CHECK, "follow all redirections")
        if self.is_redirect():
            # run connection plugins for old connection
            self.aggregate.plugin_manager.run_connection_plugins(self)
        response = None
        for response in self.get_redirects(request):
            newurl = response.url
            log.debug(LOG_CHECK, "Redirected to %r", newurl)
            self.aliases.append(newurl)
            # XXX on redirect errors this is not printed
            self.add_warning(
                _("Redirected to `%(url)s' status: %(code)d %(reason)s.")
                % {'url': newurl, 'code': self.url_connection.status_code,
                   'reason': self.url_connection.reason},
                tag=WARN_HTTP_REDIRECTED)
            # Reset extern and recalculate
            self.extern = None
            self.set_extern(newurl)
            self.urlparts = self.build_url_parts(newurl)
            self.url_connection = response
            self.headers = response.headers
            self.url = urlutil.urlunsplit(self.urlparts)
            self.scheme = self.urlparts[0].lower()
            self._add_ssl_info()
            self._add_response_info()
            if self.is_redirect():
                # run connection plugins for old connection
                self.aggregate.plugin_manager.run_connection_plugins(self)
        if response:
            log.debug(LOG_CHECK, "Redirected response headers %s", response.headers)
            self.set_encoding(response.encoding)
            log.debug(
                LOG_CHECK, "Redirected response encoding %s", self.content_encoding)

    def check_response(self):
        """Check final result and log it."""
        if (
            self.url_connection.status_code >= 400
            and self.url_connection.status_code != 429
        ):
            self.set_result(
                "%d %s" % (self.url_connection.status_code, self.url_connection.reason),
                valid=False,
            )
        else:
            if self.url_connection.status_code == 204:
                # no content
                self.add_warning(
                    self.url_connection.reason, tag=WARN_HTTP_EMPTY_CONTENT
                )

            if self.url_connection.status_code == 429:
                self.add_warning(
                    "Rate limited (Retry-After: %s)"
                    % self.headers.get("Retry-After"),
                    tag=WARN_HTTP_RATE_LIMITED,
                )

            if self.url_connection.status_code >= 200:
                self.set_result(
                    "%r %s"
                    % (self.url_connection.status_code, self.url_connection.reason)
                )
            else:
                self.set_result(_("OK"))
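        # Outcome summary for common status codes, derived from the branches
        # above (hypothetical responses):
        #   200 OK                -> result "200 OK", URL valid
        #   204 No Content        -> empty-content warning, result "204 No Content"
        #   404 Not Found         -> result "404 Not Found", URL invalid
        #   429 Too Many Requests -> rate-limit warning, but the URL stays valid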

    def get_content(self):
        return super().get_content(self.content_encoding)

    def read_content(self):
        """Return data and data size for this URL.
        Can be overridden in subclasses."""
        maxbytes = self.aggregate.config["maxfilesizedownload"]
        buf = BytesIO()
        for data in self.url_connection.iter_content(chunk_size=self.ReadChunkBytes):
            if buf.tell() + len(data) > maxbytes:
                raise LinkCheckerError(_("File size too large"))
            buf.write(data)
        return buf.getvalue()
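        # Sketch of the streaming behaviour (hypothetical numbers): with
        # maxfilesizedownload = 5 MiB and ReadChunkBytes = 8 KiB, the body is
        # read in 8 KiB chunks and the download aborts with LinkCheckerError
        # as soon as the buffered data would exceed 5 MiB, rather than loading
        # the whole response into memory first.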

    def parse_header_links(self):
        """Parse URLs in HTTP headers Link:."""
        for linktype, linkinfo in self.url_connection.links.items():
            url = linkinfo["url"]
            name = f"Link: header {linktype}"
            self.add_url(url, name=name)
        if 'Refresh' in self.headers:
            from ..htmlutil.linkparse import refresh_re

            value = self.headers['Refresh'].strip()
            mo = refresh_re.match(value)
            if mo:
                url = mo.group("url")
                name = "Refresh: header"
                self.add_url(url, name=name)
        if 'Content-Location' in self.headers:
            url = self.headers['Content-Location'].strip()
            name = "Content-Location: header"
            self.add_url(url, name=name)
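        # Example of what requests exposes in url_connection.links (assumed
        # response header):
        #   Link: <https://example.com/page2>; rel="next"
        # is parsed into
        #   {"next": {"url": "https://example.com/page2", "rel": "next"}}
        # so the loop above queues https://example.com/page2 under the name
        # 'Link: header next'.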

    def is_parseable(self):
        """
        Check if content is parseable for recursion.

        @return: True if content is parseable
        @rtype: bool
        """
        if not self.valid:
            return False
        # some content types must be validated with the page content
        if self.content_type in ("application/xml", "text/xml"):
            rtype = mimeutil.guess_mimetype_read(self.get_content)
            if rtype is not None:
                # XXX side effect
                self.content_type = rtype
                log.debug(LOG_CHECK, "Read MIME type: %s", self.content_type)
        return self.is_content_type_parseable()

    def get_robots_txt_url(self):
        """
        Get the corresponding robots.txt URL for this URL.

        @return: robots.txt URL
        @rtype: string
        """
        return "%s://%s/robots.txt" % tuple(self.urlparts[0:2])