File: boto3_stubs.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (44 lines) | stat: -rw-r--r-- 1,806 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
"""Stubs for boto3"""
import six

try:
    # boto using awsrequest
    from botocore.awsrequest import AWSHTTPConnection as HTTPConnection
    from botocore.awsrequest import AWSHTTPSConnection as VerifiedHTTPSConnection

except ImportError:  # pragma: nocover
    # boto using vendored requests
    # urllib3 defines its own HTTPConnection classes, which boto3 goes ahead and assumes
    # you're using.  It includes some polyfills for newer features missing in older pythons.
    try:
        from urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
    except ImportError:  # pragma: nocover
        from requests.packages.urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection

from ..stubs import VCRHTTPConnection, VCRHTTPSConnection


class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection):
    _baseclass = HTTPConnection


class VCRRequestsHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection):
    _baseclass = VerifiedHTTPSConnection

    def __init__(self, *args, **kwargs):
        if six.PY3:
            kwargs.pop("strict", None)  # apparently this is gone in py3

        # need to temporarily reset here because the real connection
        # inherits from the thing that we are mocking out.  Take out
        # the reset if you want to see what I mean :)
        from vcr.patch import force_reset

        with force_reset():
            self.real_connection = self._baseclass(*args, **kwargs)
            # Make sure to set those attributes as it seems `AWSHTTPConnection` does not
            # set them, making the connection to fail !
            self.real_connection.assert_hostname = kwargs.get("assert_hostname", False)
            self.real_connection.cert_reqs = kwargs.get("cert_reqs", "CERT_NONE")

        self._sock = None