File: base.py

package info (click to toggle)
python-pyfunceble 4.2.29.dev-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,108 kB
  • sloc: python: 27,413; sh: 142; makefile: 27
file content (464 lines) | stat: -rw-r--r-- 15,567 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
"""
The tool to check the availability or syntax of domain, IP or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides the base of all extra handlers.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/#/special-thanks

Contributors:
    https://pyfunceble.github.io/#/contributors

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://docs.pyfunceble.com

Project homepage:
    https://pyfunceble.github.io/

License:
::


    Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024 Nissar Chababy

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        https://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

import functools
import socket
from typing import Callable, Dict, List, Optional, Union

import requests

import PyFunceble.factory
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus
from PyFunceble.helpers.regex import RegexHelper
from PyFunceble.query.dns.query_tool import DNSQueryTool


class ExtraRuleHandlerBase:
    """
    Provides the base of all extra rules handler.

    :param status:
        The previously gathered status.
    :type status:
        :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`
    """

    _status: Optional[AvailabilityCheckerStatus] = None
    req: Optional[requests.Response] = None
    dns_query_tool: Optional[DNSQueryTool] = None
    regex_helper: Optional[RegexHelper] = None

    def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None:
        """
        Prepares the handler and the helpers it works with.

        :param status:
            The previously gathered status. When omitted, no status is
            assigned at construction time.
        """

        # Going through the property validates the type of the given status.
        if status is not None:
            self.status = status

        # Be sure that all settings are loaded properly before the helpers
        # are created.
        PyFunceble.factory.Requester.guess_all_settings()

        self.dns_query_tool = DNSQueryTool()
        self.regex_helper = RegexHelper()

    def ensure_status_is_given(
        func: Callable[..., "ExtraRuleHandlerBase"]
    ):  # pylint: disable=no-self-argument
        """
        Decorator which refuses to run the decorated method as long as no
        status is available.

        :param func:
            The method to protect.

        :raise TypeError:
            When the decorated method is called while :code:`self.status` is
            falsy (e.g. still :py:class:`None`).
        """

        @functools.wraps(func)
        def guarded(self, *args, **kwargs):  # pragma: no cover ## Safety!
            if self.status:
                return func(self, *args, **kwargs)  # pylint: disable=not-callable

            raise TypeError(
                f"<self.status> should be {AvailabilityCheckerStatus}, "
                f"{type(self.status)} given."
            )

        return guarded

    def setup_status_before(
        func: Callable[..., "ExtraRuleHandlerBase"]
    ):  # pylint: disable=no-self-argument
        """
        Decorator which saves the current status and status source into the
        :code:`*_before_extra_rules` attributes of the status, right before
        the decorated method runs.

        :param func:
            The method to decorate.
        """

        @functools.wraps(func)
        def snapshotting(self, *args, **kwargs):  # pragma: no cover ## Safety!
            current = self.status

            # Keep a copy of what was known before any extra rule is applied.
            current.status_before_extra_rules = current.status
            current.status_source_before_extra_rules = current.status_source

            return func(self, *args, **kwargs)  # pylint: disable=not-callable

        return snapshotting

    def setup_status_after(
        func: Callable[..., "ExtraRuleHandlerBase"]
    ):  # pylint: disable=no-self-argument
        """
        Decorator which, once the decorated method returned, either promotes
        the status defined by the extra rules to the effective status or
        clears every :code:`*_extra_rules` attribute of the status.

        :param func:
            The method to decorate.

        :return:
            The decorated method. Its return value is forwarded untouched.
        """

        @functools.wraps(func)
        def finalizing(self, *args, **kwargs):  # pragma: no cover ## Safety!
            outcome = func(self, *args, **kwargs)  # pylint: disable=not-callable
            current = self.status

            if not current.status_after_extra_rules:
                # No rule produced a status: forget the snapshot as well.
                current.status_before_extra_rules = None
                current.status_source_before_extra_rules = None
                current.status_after_extra_rules = None
                current.status_source_after_extra_rules = None

                return outcome

            current.status = current.status_after_extra_rules
            current.status_source = current.status_source_after_extra_rules

            # NOTE(review): this module only imports PyFunceble.factory;
            # PyFunceble.facility is assumed to be loaded elsewhere - confirm.
            PyFunceble.facility.Logger.info(
                "Could define the status of %r from our own set of rules.",
                current.idna_subject,
            )

            return outcome

        return finalizing

    @property
    def req_url(self) -> Optional[str]:
        """
        Provides the URL to request for the current subject.

        A subject that already starts with :code:`http:` or :code:`https:`
        is returned as is. Otherwise, a plain HTTP URL targeting the port 80
        is built from it.
        """

        subject = self.status.idna_subject

        if subject.startswith(("http:", "https:")):
            return subject

        return f"http://{subject}:80"

    @property
    def req_url_https(self) -> Optional[str]:
        """
        Provides the URL to request for the current subject, preferring HTTPS.

        A subject that already starts with :code:`http:` or :code:`https:`
        is returned as is. Otherwise, an HTTPS URL targeting the port 443
        is built from it.
        """

        subject = self.status.idna_subject

        if subject.startswith(("http:", "https:")):
            return subject

        return f"https://{subject}:443"

    @property
    def status(self) -> Optional[AvailabilityCheckerStatus]:
        """
        Provides the status this handler is working with, as stored in the
        :code:`_status` attribute (:py:class:`None` until one is set).
        """

        return self._status

    @status.setter
    def status(self, value: AvailabilityCheckerStatus) -> None:
        """
        Stores the status to work with, after checking its type.

        :param value:
            The status to work with.

        :raise TypeError:
            When the given :code:`value` is not a
            :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`.
        """

        if isinstance(value, AvailabilityCheckerStatus):
            self._status = value
            return

        raise TypeError(
            f"<value> should be {AvailabilityCheckerStatus}, {type(value)} given."
        )

    def set_status(self, value: AvailabilityCheckerStatus) -> "ExtraRuleHandlerBase":
        """
        Chainable counterpart of the :code:`status` property assignment.

        :param value:
            The status to work with.

        :return:
            The handler itself, so that calls can be chained.
        """

        # The assignment is delegated to the property setter.
        self.status = value
        return self

    def do_request(self, *, allow_redirects: bool = True) -> "ExtraRuleHandlerBase":
        """
        Do a request against :code:`req_url` and store its response into the
        :code:`req` attribute.

        :param bool allow_redirects:
            Whether we should follow the redirection - or not.

        :return:
            The handler itself (not the response), so that calls can be
            chained. The response is available through :code:`self.req`.

        .. note::
            Exceptions raised by the requester are not handled here; they
            propagate to the caller.
        """

        self.req = PyFunceble.factory.Requester.get(
            self.req_url, allow_redirects=allow_redirects
        )

        return self

    def do_on_body_match(
        self,
        url: str,
        matches: List[str],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = False,
    ) -> "ExtraRuleHandlerBase":
        """
        Query the given :code:`url` and call the given :code:`method` when
        the body of the response satisfies the given :code:`matches`.

        :param url:
            The URL to query.
        :param matches:
            A list of strings to look for in the response body.
        :param method:
            The callable to run - without any argument - on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string (containment) match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
        :param allow_redirects:
            Whether we should allow redirect.

        :return:
            The handler itself. Request related failures are silently ignored.
        """

        aggregate = all if strict else any

        try:
            response = PyFunceble.factory.Requester.get(
                url, allow_redirects=allow_redirects
            )

            # Both branches stay lazy, so the aggregation can stop at the
            # first decisive entry.
            if match_mode == "regex":
                outcomes = (
                    self.regex_helper.set_regex(pattern).match(
                        response.text, return_match=False
                    )
                    for pattern in matches
                )
            else:
                outcomes = (needle in response.text for needle in matches)

            if aggregate(outcomes):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            # Best effort: an unreachable URL simply means "no match".
            pass

        return self

    def do_on_header_match(
        self,
        url: str,
        matches: Dict[str, List[str]],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = True,
    ) -> "ExtraRuleHandlerBase":
        """
        Make a request to the given :code:`url` and run the given :code:`method`,
        if one of the chosen header matches any of the given matches.

        :param url:
            The URL to query.
        :param matches:
            A dict representing the match.

            .. example::

                {
                    "Location": ["foo", "bar"] // try to match foo or bar
                }
        :param method:
            The callable to run - without any argument - on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
            The same rule is applied to the values of a single header and to
            the headers among themselves.
        :param allow_redirects:
            Whether we should allow redirect.

        :return:
            The handler itself. Request related failures are silently ignored.
        """

        matcher = any if not strict else all

        if match_mode == "regex":

            def value_matches(expected: str, value: str):
                return self.regex_helper.set_regex(expected).match(
                    value, return_match=False
                )

        else:

            def value_matches(expected: str, value: str):
                return expected in value

        def header_matches(
            _req: requests.Response, header: str, loc_matches: List[str]
        ):
            # A header that is absent from the response never matches.
            if header not in _req.headers:
                return False

            return matcher(
                value_matches(x, _req.headers[header]) for x in loc_matches
            )

        try:
            req = PyFunceble.factory.Requester.get(url, allow_redirects=allow_redirects)

            # Iterate over the (header, expected values) pairs. Looping over
            # the dict itself would only yield the header names and break the
            # unpacking.
            if matcher(
                header_matches(req, header, loc_matches)
                for header, loc_matches in matches.items()
            ):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            # Best effort: an unreachable URL simply means "no match".
            pass

        return self

    def do_dns_lookup(self, *, subject: str, query_type: str) -> List[str]:
        """
        Queries the DNS for the given subject and hands back what the query
        tool answered.

        :param subject:
            The subject to query.
        :param query_type:
            The query type.

        :return:
            The result of the :code:`query()` call of the DNS query tool.
        """

        # Each setter of the query tool is chained on the result of the
        # previous one, exactly like a fluent call chain.
        lookup = self.dns_query_tool.set_query_record_type(query_type)
        lookup = lookup.set_subject(subject)

        return lookup.query()

    def start(self) -> "ExtraRuleHandlerBase":
        """
        Entry point of the handler. The base class provides no rule.

        :raise NotImplementedError:
            Always, as long as the method is not overridden.
        """

        raise NotImplementedError()

    def switch_to_down(self) -> "ExtraRuleHandlerBase":
        """
        Marks the status as inactive (down) on behalf of the extra rules,
        with :code:`SPECIAL` as status source.

        :return:
            The handler itself, so that calls can be chained.
        """

        down_status = PyFunceble.storage.STATUS.down

        self.status.status_after_extra_rules = down_status
        self.status.status_source_after_extra_rules = "SPECIAL"

        return self

    def switch_to_down_if_status_code(
        self, status_code: Union[int, List[int]]
    ) -> "ExtraRuleHandlerBase":
        """
        Marks the status as inactive when the HTTP status code stored in the
        status equals the given code, or one of the given codes.

        :param status_code:
            A single status code, or a list, tuple or set of them.

        :return:
            The handler itself, so that calls can be chained.
        """

        # Normalize a lone value into a one-element list.
        candidates = (
            status_code
            if isinstance(status_code, (list, tuple, set))
            else [status_code]
        )

        if any(self.status.http_status_code == code for code in candidates):
            self.switch_to_down()

        return self

    def switch_down_if_dns_match(
        self, query_type: str, matches: list
    ) -> "ExtraRuleHandlerBase":
        """
        Marks the status as inactive when a record returned by the DNS query
        of type :code:`query_type` - made against the network location of the
        status - contains any of the given :code:`matches`.

        :param query_type:
            A DNS query type.
        :param matches:
            A list of string (not regex) to match.

        :return:
            The handler itself, so that calls can be chained.
        """

        lookup = self.dns_query_tool.set_query_record_type(query_type)
        records = lookup.set_subject(self.status.netloc).query()

        for record in records:
            # Containment check, evaluated record by record.
            if any(match in record for match in matches):
                self.switch_to_down()

        return self

    def switch_to_up(self) -> "ExtraRuleHandlerBase":
        """
        Switches the status to active.

        The status defined by the extra rules is set to up, with
        :code:`SPECIAL` as status source.

        :return:
            The handler itself, so that calls can be chained.
        """

        self.status.status_after_extra_rules = PyFunceble.storage.STATUS.up
        self.status.status_source_after_extra_rules = "SPECIAL"

        # Honor the declared return type, like the other switch_* methods do.
        return self