File: pyopenssl.py

package info (click to toggle)
python-cheroot 11.1.2%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,236 kB
  • sloc: python: 6,969; makefile: 10
file content (451 lines) | stat: -rw-r--r-- 14,787 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
"""
A library for integrating :doc:`pyOpenSSL <pyopenssl:index>` with Cheroot.

The :py:mod:`OpenSSL <pyopenssl:OpenSSL>` module must be importable
for SSL/TLS/HTTPS functionality.
You can obtain it from `here <https://github.com/pyca/pyopenssl>`_.

To use this module, set :py:attr:`HTTPServer.ssl_adapter
<cheroot.server.HTTPServer.ssl_adapter>` to an instance of
:py:class:`ssl.Adapter <cheroot.ssl.Adapter>`.
There are two ways to use :abbr:`TLS (Transport Layer Security)`:

Method One
----------

 * :py:attr:`ssl_adapter.context
   <cheroot.ssl.pyopenssl.pyOpenSSLAdapter.context>`: an instance of
   :py:class:`SSL.Context <pyopenssl:OpenSSL.SSL.Context>`.

If this is not None, it is assumed to be an :py:class:`SSL.Context
<pyopenssl:OpenSSL.SSL.Context>` instance, and will be passed to
:py:class:`SSL.Connection <pyopenssl:OpenSSL.SSL.Connection>` on bind().
The developer is responsible for forming a valid :py:class:`Context
<pyopenssl:OpenSSL.SSL.Context>` object. This
approach is to be preferred for more flexibility, e.g. if the cert and
key are streams instead of files, or need decryption, or
:py:data:`SSL.SSLv3_METHOD <pyopenssl:OpenSSL.SSL.SSLv3_METHOD>`
is desired instead of the default :py:data:`SSL.SSLv23_METHOD
<pyopenssl:OpenSSL.SSL.SSLv23_METHOD>`, etc. Consult
the :doc:`pyOpenSSL <pyopenssl:api/ssl>` documentation for
complete options.

Method Two (shortcut)
---------------------

 * :py:attr:`ssl_adapter.certificate
   <cheroot.ssl.pyopenssl.pyOpenSSLAdapter.certificate>`: the file name
   of the server's TLS certificate.
 * :py:attr:`ssl_adapter.private_key
   <cheroot.ssl.pyopenssl.pyOpenSSLAdapter.private_key>`: the file name
   of the server's private key file.

Both are :py:data:`None` by default. If :py:attr:`ssl_adapter.context
<cheroot.ssl.pyopenssl.pyOpenSSLAdapter.context>` is :py:data:`None`,
but ``.private_key`` and ``.certificate`` are both given and valid, they
will be read, and the context will be automatically created from them.

.. spelling::

   pyopenssl
"""

import socket
import sys
import threading
import time
from warnings import warn as _warn


try:
    import OpenSSL.version
    from OpenSSL import SSL, crypto

    try:
        ssl_conn_type = SSL.Connection
    except AttributeError:
        ssl_conn_type = SSL.ConnectionType
except ImportError:
    SSL = None

import contextlib

from .. import (
    errors,
    server as cheroot_server,
)
from ..makefile import StreamReader, StreamWriter
from . import Adapter


class SSLFileobjectMixin:
    """Base mixin for a TLS socket stream.

    The ``recv``/``readline``/``send``/``sendall`` methods delegate to the
    next class in the MRO through :py:meth:`_safe_call`, which retries
    while pyOpenSSL reports "want read"/"want write" and translates the
    other pyOpenSSL errors it traps.
    """

    # Seconds ``_safe_call`` keeps retrying before raising
    # ``socket.timeout``. ``None`` means "retry without a deadline".
    ssl_timeout = 3
    # Seconds slept between two attempts of the same call.
    ssl_retry = 0.01

    # FIXME:
    def _safe_call(self, is_reader, call, *args, **kwargs):  # noqa: C901
        """Wrap the given call with TLS error-trapping.

        :param is_reader: if False EOF errors will be raised. If True,
                          EOF errors will return ``b''`` (to emulate
                          normal sockets).
        :param call: the callable to invoke with ``*args``/``**kwargs``
        :returns: whatever ``call`` returns, or ``b''`` for a trapped EOF

        :raises socket.error: for an ``SSL.SysCallError`` that is not
                              ignored
        :raises errors.NoSSLError: when the peer spoke plain HTTP
        :raises errors.FatalSSLAlert: for any other ``SSL.Error``
        :raises socket.timeout: when retrying exceeded ``ssl_timeout``
        """
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        start = time.monotonic()
        while True:
            try:
                return call(*args, **kwargs)
            except (SSL.WantReadError, SSL.WantWriteError):
                # Sleep and try again. This is dangerous, because it means
                # the rest of the stack has no way of differentiating
                # between a "new handshake" error and "client dropped".
                # Note this isn't an endless loop: there's a timeout below.
                # Ref: https://stackoverflow.com/a/5133568/595220
                time.sleep(self.ssl_retry)
            except SSL.SysCallError as e:
                if is_reader and e.args == (-1, 'Unexpected EOF'):
                    return b''

                errnum = e.args[0]
                if is_reader and errnum in errors.socket_errors_to_ignore:
                    return b''
                raise socket.error(errnum)
            except SSL.Error as e:
                if is_reader and e.args == (-1, 'Unexpected EOF'):
                    return b''

                thirdarg = None
                with contextlib.suppress(IndexError):
                    thirdarg = e.args[0][0][2]

                if thirdarg == 'http request':
                    # The client is talking HTTP to an HTTPS server.
                    raise errors.NoSSLError

                raise errors.FatalSSLAlert(*e.args)

            # ``ssl_timeout`` may be ``None`` (e.g. when it was copied from
            # a blocking socket's ``gettimeout()``); comparing a float to
            # ``None`` would raise ``TypeError``, so treat it as "no
            # deadline" instead.
            timeout = self.ssl_timeout
            if timeout is not None and time.monotonic() - start > timeout:
                raise socket.timeout('timed out')

    def recv(self, size):
        """Receive message of a size from the socket."""
        return self._safe_call(True, super().recv, size)

    def readline(self, size=-1):
        """Receive message of a size from the socket.

        Matches the following interface:
        https://docs.python.org/3/library/io.html#io.IOBase.readline
        """
        return self._safe_call(True, super().readline, size)

    def sendall(self, *args, **kwargs):
        """Send whole message to the socket."""
        return self._safe_call(False, super().sendall, *args, **kwargs)

    def send(self, *args, **kwargs):
        """Send some part of message to the socket."""
        return self._safe_call(False, super().send, *args, **kwargs)


class SSLFileobjectStreamReader(SSLFileobjectMixin, StreamReader):
    """SSL file object attached to a socket object.

    Combines ``StreamReader`` with the TLS error-trapping of
    ``SSLFileobjectMixin``.
    """


class SSLFileobjectStreamWriter(SSLFileobjectMixin, StreamWriter):
    """SSL file object attached to a socket object.

    Combines ``StreamWriter`` with the TLS error-trapping of
    ``SSLFileobjectMixin``.
    """


class SSLConnectionProxyMeta:
    """Metaclass for generating a bunch of proxy methods.

    Every generated method looks up the same-named attribute on the
    instance's ``_ssl_conn`` and calls it while holding the instance's
    ``_lock``. Classes using this metaclass must therefore set both
    ``_ssl_conn`` and ``_lock`` on their instances.
    """

    def __new__(mcl, name, bases, nmspc):
        """Attach a list of proxy methods to a new class.

        :param str name: name of the class being created
        :param tuple bases: base classes of the class being created
        :param dict nmspc: class namespace; proxy methods and properties
                           are written into it, replacing same-named
                           entries
        :returns: the new class, built with plain :py:class:`type`
        """
        proxy_methods = (
            'get_context',
            'pending',
            'send',
            'write',
            'recv',
            'read',
            'renegotiate',
            'bind',
            'listen',
            'connect',
            'accept',
            'setblocking',
            'fileno',
            'close',
            'get_cipher_list',
            'getpeername',
            'getsockname',
            'getsockopt',
            'setsockopt',
            'makefile',
            'get_app_data',
            'set_app_data',
            'state_string',
            'sock_shutdown',
            'get_peer_certificate',
            'want_read',
            'want_write',
            'set_connect_state',
            'set_accept_state',
            'connect_ex',
            'sendall',
            'settimeout',
            'gettimeout',
            'shutdown',
        )
        # Methods whose proxies discard every argument they are given
        # and call the wrapped method with none.
        proxy_methods_no_args = ('shutdown',)

        proxy_props = ('family',)

        def lock_decorator(method):
            """Create a lock-guarded proxy method for a new class."""
            discard_arguments = method in proxy_methods_no_args

            def proxy_wrapper(self, *args, **kwargs):
                # The lock is released even if the wrapped call raises.
                with self._lock:
                    target = getattr(self._ssl_conn, method)
                    if discard_arguments:
                        return target()
                    # Keyword arguments are forwarded too, so callers can
                    # use the wrapped method's full signature.
                    return target(*args, **kwargs)

            proxy_wrapper.__name__ = method
            return proxy_wrapper

        for m in proxy_methods:
            nmspc[m] = lock_decorator(m)

        def make_property(property_):
            """Create a read-only proxy property for a new class."""

            def proxy_prop_wrapper(self):
                return getattr(self._ssl_conn, property_)

            proxy_prop_wrapper.__name__ = property_
            return property(proxy_prop_wrapper)

        for p in proxy_props:
            nmspc[p] = make_property(p)

        # Doesn't work via super() for some reason.
        # (This class does not subclass ``type``, so there is no
        # ``type.__new__`` further up its MRO to delegate to.)
        # Falling back to type() instead:
        return type(name, bases, nmspc)


class SSLConnection(metaclass=SSLConnectionProxyMeta):
    r"""Lock-guarded facade over a pyOpenSSL ``SSL.Connection``.

    :param tuple args: positional arguments handed over unchanged to \
                        :py:class:`SSL.Connection(*args) \
                        <pyopenssl:OpenSSL.SSL.Connection>`
    """

    def __init__(self, *args):
        """Create the guarding lock and the wrapped connection."""
        self._lock = threading.RLock()
        self._ssl_conn = SSL.Connection(*args)


class pyOpenSSLAdapter(Adapter):
    """A wrapper for integrating :doc:`pyOpenSSL <pyopenssl:index>`."""

    certificate = None
    """The file name of the server's TLS certificate."""

    private_key = None
    """The file name of the server's private key file."""

    certificate_chain = None
    """Optional. The file name of CA's intermediate certificate bundle.

    This is needed for cheaper "chained root" TLS certificates,
    and should be left as :py:data:`None` if not required."""

    context = None
    """
    An instance of :py:class:`SSL.Context <pyopenssl:OpenSSL.SSL.Context>`.
    """

    ciphers = None
    """The ciphers list of TLS."""

    private_key_password = None
    """Optional passphrase for password protected private key."""

    def __init__(
        self,
        certificate,
        private_key,
        certificate_chain=None,
        ciphers=None,
        *,
        private_key_password=None,
    ):
        """Initialize OpenSSL Adapter instance.

        The arguments are handed to the base ``Adapter`` initializer
        unchanged.

        :raises ImportError: if pyOpenSSL could not be imported
        """
        if SSL is None:
            raise ImportError('You must install pyOpenSSL to use HTTPS.')

        super().__init__(
            certificate,
            private_key,
            certificate_chain,
            ciphers,
            private_key_password=private_key_password,
        )

        # WSGI environ entries; populated by bind(), or lazily by wrap().
        self._environ = None

    def bind(self, sock):
        """Wrap and return the given socket.

        Creates :py:attr:`context` through :py:meth:`get_context` when it
        is still :py:data:`None`, and (re)computes the cached WSGI environ.
        """
        if self.context is None:
            self.context = self.get_context()
        conn = SSLConnection(self.context, sock)
        self._environ = self.get_environ()
        return conn

    def wrap(self, sock):
        """Wrap and return the given socket, plus WSGI environ entries.

        :returns: a ``(sock, environ)`` tuple where ``environ`` is a fresh
                  copy of the cached WSGI environ entries
        """
        # pyOpenSSL doesn't perform the handshake until the first read/write
        # forcing the handshake to complete tends to result in the connection
        # closing so we can't reliably access protocol/client cert for the env
        if self._environ is None:
            # bind() has not run yet; build the environ on demand instead
            # of failing with AttributeError on ``None.copy()``.
            self._environ = self.get_environ()
        return sock, self._environ.copy()

    def _password_callback(
        self,
        password_max_length,
        _verify_twice,
        password,
        /,
    ):
        """Pass a passphrase to password protected private key.

        :param password_max_length: longest passphrase, in bytes, that is
                                    accepted without a warning
        :param _verify_twice: unused
        :param password: the passphrase as :py:class:`str` or
                         :py:class:`bytes`; any other type yields ``b''``
        :returns: the passphrase as :py:class:`bytes`
        """
        b_password = b''  # returning a falsy value communicates an error
        if isinstance(password, str):
            b_password = password.encode('utf-8')
        elif isinstance(password, bytes):
            b_password = password

        password_length = len(b_password)
        if password_length > password_max_length:
            # Only a warning is issued here; the passphrase is returned
            # at its full length.
            _warn(
                f'User-provided password is {password_length} bytes long and will '
                f'be truncated since it exceeds the maximum of {password_max_length}.',
                UserWarning,
                stacklevel=1,
            )

        return b_password

    def get_context(self):
        """Return an ``SSL.Context`` from self attributes.

        Ref: :py:class:`SSL.Context <pyopenssl:OpenSSL.SSL.Context>`
        """
        # See https://code.activestate.com/recipes/442473/
        c = SSL.Context(SSL.SSLv23_METHOD)
        # NOTE(review): ``self.ciphers`` is not applied to the context
        # here -- confirm whether that is intentional.
        c.set_passwd_cb(self._password_callback, self.private_key_password)
        c.use_privatekey_file(self.private_key)
        if self.certificate_chain:
            # NOTE(review): this registers the bundle as verify locations;
            # presumably ``use_certificate_chain_file`` was meant if the
            # intermediates should be sent to clients -- confirm.
            c.load_verify_locations(self.certificate_chain)
        c.use_certificate_file(self.certificate)
        return c

    def get_environ(self):
        """Return WSGI environ entries to be merged into each request.

        When :py:attr:`certificate` is set, the PEM file is read and the
        server certificate's version, serial number, and issuer/subject
        DNs (whole and per component) are added.
        """
        version_interface = (
            f'{cheroot_server.HTTPServer.version} '
            f'{OpenSSL.version.__title__}/{OpenSSL.version.__version__} '
            f'Python/{sys.version}'
        )
        ssl_environ = {
            'wsgi.url_scheme': 'https',
            'HTTPS': 'on',
            'SSL_VERSION_INTERFACE': version_interface,
            'SSL_VERSION_LIBRARY': SSL.SSLeay_version(
                SSL.SSLEAY_VERSION,
            ).decode(),
        }

        if self.certificate:
            # Server certificate attributes
            with open(self.certificate, 'rb') as cert_file:
                cert = crypto.load_certificate(
                    crypto.FILETYPE_PEM,
                    cert_file.read(),
                )

            ssl_environ.update(
                {
                    'SSL_SERVER_M_VERSION': cert.get_version(),
                    'SSL_SERVER_M_SERIAL': cert.get_serial_number(),
                    # 'SSL_SERVER_V_START':
                    #   Validity of server's certificate (start time),
                    # 'SSL_SERVER_V_END':
                    #   Validity of server's certificate (end time),
                },
            )

            for prefix, dn in [
                ('I', cert.get_issuer()),
                ('S', cert.get_subject()),
            ]:
                # X509Name objects don't seem to have a way to get the
                # complete DN string. Use str() and slice it instead,
                # because str(dn) == "<X509Name object '/C=US/ST=...'>"
                dnstr = str(dn)[18:-2]

                ssl_environ[f'SSL_SERVER_{prefix}_DN'] = dnstr

                # The DN should be of the form: /k1=v1/k2=v2, but we must allow
                # for any value to contain slashes itself (in a URL).
                # Hence the string is consumed from the right: cut at the
                # last '=' to get the value, then at the last '/' to get
                # its key.
                while dnstr:
                    pos = dnstr.rfind('=')
                    dnstr, value = dnstr[:pos], dnstr[pos + 1 :]
                    pos = dnstr.rfind('/')
                    dnstr, key = dnstr[:pos], dnstr[pos + 1 :]
                    if key and value:
                        ssl_environ[f'SSL_SERVER_{prefix}_DN_{key}'] = value

        return ssl_environ

    def makefile(self, sock, mode='r', bufsize=-1):
        """Return socket file object.

        For a pyOpenSSL connection this is a TLS-aware stream reader
        (when ``'r'`` is in ``mode``) or writer whose ``ssl_timeout`` is
        copied from the socket's own timeout.
        """
        cls = (
            SSLFileobjectStreamReader
            if 'r' in mode
            else SSLFileobjectStreamWriter
        )
        if SSL and isinstance(sock, ssl_conn_type):
            wrapped_socket = cls(sock, mode, bufsize)
            wrapped_socket.ssl_timeout = sock.gettimeout()
            return wrapped_socket
        # This is from past:
        # TODO: figure out what it's meant for
        return cheroot_server.CP_fileobject(sock, mode, bufsize)