File: compat.py

package info (click to toggle)
chargebee-python 1.6.3-4
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 480 kB
  • sloc: python: 980; sh: 5; makefile: 3
file content (129 lines) | stat: -rw-r--r-- 4,025 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

from chargebee.main import Environment

import re
import sys
import socket

try:
    import simplejson as json
except ImportError:
    import json

py_major_v = sys.version_info[0]
py_minor_v = sys.version_info[1]

if py_major_v < 3:
    from urllib import urlencode
    from urlparse import urlparse
    from urllib2 import urlopen as _urlopen, Request
elif py_major_v >= 3:
    from urllib.parse import urlencode, urlparse
    from urllib.request import urlopen as _urlopen, Request



try:
    SSLError = None
    ssl = None
    
    if Environment.chargebee_domain is None:
        HTTPSConnection = object
    else:
        HTTPConnection = object
            
    if py_major_v < 3:
        from httplib import HTTPConnection, HTTPSConnection, HTTPException
    else:
        from http.client import HTTPConnection, HTTPSConnection, HTTPException

    import ssl
    SSLError = ssl.SSLError

except (ImportError, AttributeError):  # Platform-specific: No SSL
    pass


class VerifiedHTTPSConnection(HTTPSConnection):
    """Based on httplib.HTTPSConnection but wraps socket with SSL certification"""

    def set_cert(self, ca_certs, cert_reqs='CERT_REQUIRED'):
        ssl_req_scheme = {
            'CERT_NONE': ssl.CERT_NONE,
            'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
            'CERT_REQUIRED': ssl.CERT_REQUIRED
        }

        self.cert_reqs = ssl_req_scheme.get(cert_reqs) or ssl.CERT_NONE
        self.ca_certs = ca_certs

    def connect(self):
        # Add certificate verification
        sock = socket.create_connection((self.host, self.port), self.timeout)

        # Wrap socket using verification with the root certs in
        # trusted_root_certs
        self.sock = ssl.wrap_socket(sock, cert_reqs=self.cert_reqs, ca_certs=self.ca_certs)

        if self.ca_certs:
            match_hostname(self.sock.getpeercert(), self.host)


class CertificateError(ValueError):
    pass


def _dnsname_to_pat(dn):
    pats = []
    for frag in dn.split(r'.'):
        if frag == '*':
            # When '*' is a fragment by itself, it matches a non-empty dotless
            # fragment.
            pats.append('[^.]+')
        else:
            # Otherwise, '*' matches any dotless fragment.
            frag = re.escape(frag)
            pats.append(frag.replace(r'\*', '[^.]*'))
    return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)


def match_hostname(cert, hostname):
    """Verify that *cert* (in decoded format as returned by
    SSLSocket.getpeercert()) matches the *hostname*.  RFC 2818 rules
    are mostly followed, but IP addresses are not accepted for *hostname*.

    CertificateError is raised on failure. On success, the function
    returns nothing.
    """

    if not cert:
        raise ValueError("empty or no certificate")
    dnsnames = []
    san = cert.get('subjectAltName', ())
    for key, value in san:
        if key == 'DNS':
            if _dnsname_to_pat(value).match(hostname):
                return
            dnsnames.append(value)
    if not dnsnames:
        # The subject is only checked when there is no dNSName entry
        # in subjectAltName
        for sub in cert.get('subject', ()):
            for key, value in sub:
                # XXX according to RFC 2818, the most specific Common Name
                # must be used.
                if key == 'commonName':
                    if _dnsname_to_pat(value).match(hostname):
                        return
                    dnsnames.append(value)
    if len(dnsnames) > 1:
        raise CertificateError("hostname %r "
                               "doesn't match either of %s"
                               % (hostname, ', '.join(map(repr, dnsnames))))
    elif len(dnsnames) == 1:
        raise CertificateError("hostname %r "
                               "doesn't match %r"
                               % (hostname, dnsnames[0]))
    else:
        raise CertificateError("no appropriate commonName or "
                               "subjectAltName fields were found")