File: builtin.py

package info (click to toggle)
python-cheroot 6.5.4%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 504 kB
  • sloc: python: 4,648; makefile: 14; sh: 2
file content (214 lines) | stat: -rw-r--r-- 7,324 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
A library for integrating Python's builtin ``ssl`` library with Cheroot.

The ssl module must be importable for SSL functionality.

To use this module, set ``HTTPServer.ssl_adapter`` to an instance of
``BuiltinSSLAdapter``.
"""

from __future__ import absolute_import, division, print_function
__metaclass__ = type

import sys

try:
    import ssl
except ImportError:
    ssl = None

try:
    from _pyio import DEFAULT_BUFFER_SIZE
except ImportError:
    try:
        from io import DEFAULT_BUFFER_SIZE
    except ImportError:
        DEFAULT_BUFFER_SIZE = -1

import six

from . import Adapter
from .. import errors
from .._compat import IS_ABOVE_OPENSSL10
from ..makefile import StreamReader, StreamWriter

# Pick the exception class that the adapter catches for plain socket
# failures raised while wrapping a connection.
if not six.PY3:
    # The module is needed only to fetch the exception class, so it is
    # removed from the namespace right after.
    import socket
    generic_socket_error = socket.error
    del socket
else:
    generic_socket_error = OSError


# True when the running interpreter is older than Python 3.7.
IS_BELOW_PY37 = sys.version_info[:2] < (3, 7)


def _assert_ssl_exc_contains(exc, *msgs):
    """Check whether SSL exception contains either of messages provided."""
    if len(msgs) < 1:
        raise TypeError(
            '_assert_ssl_exc_contains() requires '
            'at least one message to be passed.'
        )
    err_msg_lower = str(exc).lower()
    return any(m.lower() in err_msg_lower for m in msgs)


class BuiltinSSLAdapter(Adapter):
    """Cheroot SSL adapter built on top of Python's builtin ssl module."""

    certificate = None
    """The filename of the server SSL certificate."""

    private_key = None
    """The filename of the server's private key file."""

    certificate_chain = None
    """The filename of the certificate chain file."""

    context = None
    """The ssl.SSLContext that will be used to wrap sockets."""

    ciphers = None
    """The ciphers list of SSL."""

    # Peer certificate field -> prefix of the WSGI environ keys built from it.
    CERT_KEY_TO_ENV = {
        'subject': 'SSL_CLIENT_S_DN',
        'issuer': 'SSL_CLIENT_I_DN',
    }

    # Certificate attribute name -> short code appended to the environ prefix.
    CERT_KEY_TO_LDAP_CODE = {
        'countryName': 'C',
        'stateOrProvinceName': 'ST',
        'localityName': 'L',
        'organizationName': 'O',
        'organizationalUnitName': 'OU',
        'commonName': 'CN',
        'emailAddress': 'Email',
    }

    def __init__(
            self, certificate, private_key, certificate_chain=None,
            ciphers=None):
        """Initialize the base adapter and build the SSL context.

        The context is created for the ``CLIENT_AUTH`` purpose with
        ``certificate_chain`` as its CA file, then loaded with the
        certificate/private key pair. Ciphers are applied only when
        ``self.ciphers`` is not ``None`` after base initialization.

        Raises ``ImportError`` when the ``ssl`` module is unavailable.
        """
        if ssl is None:
            raise ImportError('You must install the ssl module to use HTTPS.')

        base = super(BuiltinSSLAdapter, self)
        base.__init__(certificate, private_key, certificate_chain, ciphers)

        tls_context = ssl.create_default_context(
            purpose=ssl.Purpose.CLIENT_AUTH,
            cafile=certificate_chain,
        )
        self.context = tls_context
        tls_context.load_cert_chain(certificate, private_key)

        if self.ciphers is None:
            return
        tls_context.set_ciphers(ciphers)

    def bind(self, sock):
        """Hand the socket to the base adapter's ``bind``; return its result."""
        base = super(BuiltinSSLAdapter, self)
        return base.bind(sock)

    def wrap(self, sock):
        """Perform the server-side handshake on ``sock``.

        Returns a ``(wrapped_socket, environ)`` pair on success, where
        ``environ`` comes from :meth:`get_environ`. Returns ``(None, {})``
        for handshake failures that are considered harmless. Raises
        ``errors.NoSSLError`` when the SSL error text contains
        ``'http request'``; every other error is re-raised unchanged.
        """
        nothing_wrapped = None, {}
        try:
            tls_sock = self.context.wrap_socket(
                sock, do_handshake_on_connect=True, server_side=True,
            )
        except ssl.SSLError as ex:
            if ex.errno == ssl.SSL_ERROR_EOF:
                # Most likely the cherrypy engine 'pinging' the socket
                # to check that it is connectable; that 'ping' is not SSL.
                return nothing_wrapped

            if ex.errno == ssl.SSL_ERROR_SSL:
                if _assert_ssl_exc_contains(ex, 'http request'):
                    # The peer talks plain HTTP to an HTTPS server.
                    raise errors.NoSSLError

                # Known errors that PyOpenSSL catches on its own but
                # the built-in ssl module raises.
                tolerated = (
                    'unknown protocol', 'unknown ca', 'unknown_ca',
                    'unknown error',
                    'https proxy request', 'inappropriate fallback',
                    'wrong version number',
                    'no shared cipher', 'certificate unknown',
                    'ccs received early',
                    'certificate verify failed',  # client cert w/o trusted CA
                )
                if _assert_ssl_exc_contains(ex, *tolerated):
                    # Accepted error, nothing to report.
                    return nothing_wrapped
                raise

            if _assert_ssl_exc_contains(ex, 'handshake operation timed out'):
                # Raised by the builtin ssl module after a timeout when
                # the client speaks HTTP to an HTTPS server; the
                # connection can safely be dropped.
                return nothing_wrapped
            raise
        except generic_socket_error as exc:
            # It is unclear why exactly this happens.
            #
            # It's reproducible only under Python<=3.6 with openssl>1.0
            # and the stdlib ``ssl`` wrapper.
            # In CherryPy it's triggered by the Checker plugin, which
            # connects to the app listening on the socket port in TLS mode
            # via plain HTTP during startup (from the same process).
            #
            # Ref: https://github.com/cherrypy/cherrypy/issues/1618
            if exc.args != (0, 'Error'):
                raise
            if not (IS_ABOVE_OPENSSL10 and IS_BELOW_PY37):
                raise
            return nothing_wrapped

        return tls_sock, self.get_environ(tls_sock)

    # TODO: fill this out more with mod ssl env
    def get_environ(self, sock):
        """Build the WSGI environ entries describing the SSL connection.

        Always contains the URL scheme, the ``HTTPS`` flag and the
        protocol and cipher reported by ``sock.cipher()``. When the
        context verifies peers and the peer supplied a certificate, the
        entries produced by :meth:`env_dn_dict` for each field listed in
        ``CERT_KEY_TO_ENV`` are merged in as well.
        """
        negotiated = sock.cipher()
        ssl_environ = {'wsgi.url_scheme': 'https', 'HTTPS': 'on'}
        ssl_environ['SSL_PROTOCOL'] = negotiated[1]
        ssl_environ['SSL_CIPHER'] = negotiated[0]
        # Not provided yet:
        # SSL_VERSION_INTERFACE     string  The mod_ssl program version
        # SSL_VERSION_LIBRARY   string  The OpenSSL program version

        tls_context = self.context
        if not tls_context or tls_context.verify_mode == ssl.CERT_NONE:
            return ssl_environ

        peer_cert = sock.getpeercert()
        if not peer_cert:
            return ssl_environ

        for cert_field, env_prefix in self.CERT_KEY_TO_ENV.items():
            dn_entries = self.env_dn_dict(
                env_prefix, peer_cert.get(cert_field))
            ssl_environ.update(dn_entries)

        return ssl_environ

    def env_dn_dict(self, env_prefix, cert_value):
        """Map a client certificate DN onto WSGI environ variables.

        ``cert_value`` is iterated as a sequence of RDNs, each a sequence
        of ``(attribute_name, value)`` pairs. Attributes found in
        ``CERT_KEY_TO_LDAP_CODE`` become ``<env_prefix>_<code>`` keys;
        all others are skipped. A falsy ``cert_value`` gives an empty dict.

        E.g. SSL_CLIENT_S_DN_CN, SSL_CLIENT_S_DN_C, etc.
        See SSL_CLIENT_S_DN_x509 at
        https://httpd.apache.org/docs/2.4/mod/mod_ssl.html#envvars.
        """
        if not cert_value:
            return {}

        ldap_codes = self.CERT_KEY_TO_LDAP_CODE
        coded_pairs = (
            (ldap_codes.get(attr_name), attr_value)
            for rdn in cert_value
            for attr_name, attr_value in rdn
        )
        return {
            '%s_%s' % (env_prefix, code): attr_value
            for code, attr_value in coded_pairs
            if code
        }

    def makefile(self, sock, mode='r', bufsize=DEFAULT_BUFFER_SIZE):
        """Return a stream object for ``sock``.

        A ``StreamReader`` is built when ``mode`` contains ``'r'``,
        otherwise a ``StreamWriter``.
        """
        if 'r' in mode:
            stream_cls = StreamReader
        else:
            stream_cls = StreamWriter
        return stream_cls(sock, mode, bufsize)