File: error.py

package info (click to toggle)
pypy3 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 111,848 kB
  • sloc: python: 1,291,746; ansic: 74,281; asm: 5,187; cpp: 3,017; sh: 2,533; makefile: 544; xml: 243; lisp: 45; csh: 21; awk: 4
file content (202 lines) | stat: -rw-r--r-- 6,360 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import sys
import os
import traceback
from _pypy_openssl import ffi
from _pypy_openssl import lib

from _cffi_ssl._stdssl.utility import _string_from_asn1, _str_to_ffi_buffer, _str_from_buf
from _cffi_ssl._stdssl.errorcodes import _error_codes, _lib_codes

SSL_ERROR_NONE = 0
SSL_ERROR_SSL = 1
SSL_ERROR_WANT_READ = 2
SSL_ERROR_WANT_WRITE = 3
SSL_ERROR_WANT_X509_LOOKUP = 4
SSL_ERROR_SYSCALL = 5
SSL_ERROR_ZERO_RETURN = 6
SSL_ERROR_WANT_CONNECT = 7
# start of non ssl.h errorcodes
SSL_ERROR_EOF = 8 # special case of SSL_ERROR_SYSCALL
SSL_ERROR_NO_SOCKET = 9 # socket has been GC'd
SSL_ERROR_INVALID_ERROR_CODE = 10

class SSLError(OSError):
    """ An error occurred in the SSL implementation. """
    def __str__(self):
        if self.strerror and isinstance(self.strerror, str):
            return self.strerror
        return str(self.args)

class SSLZeroReturnError(SSLError):
    """ SSL/TLS session closed cleanly. """

class SSLWantReadError(SSLError):
    """ Non-blocking SSL socket needs to read more data
        before the requested operation can be completed.
    """

class SSLWantWriteError(SSLError):
    """Non-blocking SSL socket needs to write more data
       before the requested operation can be completed.
    """

class SSLSyscallError(SSLError):
    """ System error when attempting SSL operation. """

class SSLEOFError(SSLError):
    """ SSL/TLS connection terminated abruptly. """

def ssl_error(errstr, errcode=0):
    if errstr is None:
        errcode = lib.ERR_peek_last_error()
    try:
        return fill_sslerror(SSLError, errcode, errstr, errcode)
    finally:
        lib.ERR_clear_error()

ERR_CODES_TO_NAMES = {}
ERR_NAMES_TO_CODES = {}
LIB_CODES_TO_NAMES = {}

for mnemo, library, reason in _error_codes:
    key = (library, reason)
    assert mnemo is not None and key is not None
    ERR_CODES_TO_NAMES[key] = mnemo
    ERR_NAMES_TO_CODES[mnemo] = key


for mnemo, number in _lib_codes:
    LIB_CODES_TO_NAMES[number] = mnemo


# the PySSL_SetError equivalent
def pyssl_error(obj, ret):
    errcode = lib.ERR_peek_last_error()

    errstr = ""
    errval = 0
    errtype = SSLError
    e = lib.ERR_peek_last_error()

    if obj.ssl != ffi.NULL:
        err = lib.SSL_get_error(obj.ssl, ret)

        if err == SSL_ERROR_ZERO_RETURN:
            errtype = SSLZeroReturnError
            errstr = "TLS/SSL connection has been closed"
            errval = SSL_ERROR_ZERO_RETURN
        elif err == SSL_ERROR_WANT_READ:
            errtype = SSLWantReadError
            errstr = "The operation did not complete (read)"
            errval = SSL_ERROR_WANT_READ
        elif err == SSL_ERROR_WANT_WRITE:
            errtype = SSLWantWriteError
            errstr = "The operation did not complete (write)"
            errval = SSL_ERROR_WANT_WRITE
        elif err == SSL_ERROR_WANT_X509_LOOKUP:
            errstr = "The operation did not complete (X509 lookup)"
            errval = SSL_ERROR_WANT_X509_LOOKUP
        elif err == SSL_ERROR_WANT_CONNECT:
            errstr = "The operation did not complete (connect)"
            errval = SSL_ERROR_WANT_CONNECT
        elif err == SSL_ERROR_SYSCALL:
            if e == 0:
                if ret == 0 or obj.socket is None:
                    errtype = SSLEOFError
                    errstr = "EOF occurred in violation of protocol"
                    errval = SSL_ERROR_EOF
                elif ret == -1 and obj.socket is not None:
                    # the underlying BIO reported an I/0 error
                    lib.ERR_clear_error()
                    # s = obj.get_socket_or_None()
                    # XXX: Windows?
                    errno = ffi.errno
                    return OSError(errno, os.strerror(errno))
                else:
                    errtype = SSLSyscallError
                    errstr = "Some I/O error occurred"
                    errval = SSL_ERROR_SYSCALL
            else:
                errstr = _str_from_buf(lib.ERR_error_string(e, ffi.NULL))
                errval = SSL_ERROR_SYSCALL
        elif err == SSL_ERROR_SSL:
            errval = SSL_ERROR_SSL
            if errcode != 0:
                errstr = _str_from_buf(lib.ERR_error_string(errcode, ffi.NULL))
            else:
                errstr = "A failure in the SSL library occurred"
        else:
            errstr = "Invalid error code"
            errval = SSL_ERROR_INVALID_ERROR_CODE
    return fill_sslerror(errtype, errval, errstr, e)


def fill_sslerror(errtype, ssl_errno, errstr, errcode):
    reason_str = None
    lib_str = None
    if errcode != 0:
        err_lib = lib.ERR_GET_LIB(errcode)
        err_reason = lib.ERR_GET_REASON(errcode)
        reason_str = ERR_CODES_TO_NAMES.get((err_lib, err_reason), None)
        lib_str = LIB_CODES_TO_NAMES.get(err_lib, None)
        if errstr is None:
            errstr = _str_from_buf(lib.ERR_reason_error_string(errcode))
    msg = errstr
    if not errstr:
        msg = "unknown error"
    if reason_str and lib_str:
        msg = "[%s: %s] %s" % (lib_str, reason_str, errstr)
    elif lib_str:
        msg = "[%s] %s" % (lib_str, errstr)

    err_value = errtype(ssl_errno, msg)
    err_value.reason = reason_str if reason_str else None
    err_value.library = lib_str if lib_str else None
    return err_value

def pyerr_write_unraisable(exc, obj):
    f = sys.stderr

    if obj:
        f.write("Exception ignored in: ")
        f.write(repr(obj))
        f.write("\n")

    t, v, tb = sys.exc_info()
    traceback.print_tb(tb, file=f)

    assert isinstance(v, Exception)
    f.write(t.__module__ + "." + t.__name__)
    f.write(": ")
    f.write(str(v))
    f.write("\n")

SSL_AD_NAMES = [
    "ACCESS_DENIED",
    "BAD_CERTIFICATE",
    "BAD_CERTIFICATE_HASH_VALUE",
    "BAD_CERTIFICATE_STATUS_RESPONSE",
    "BAD_RECORD_MAC",
    "CERTIFICATE_EXPIRED",
    "CERTIFICATE_REVOKED",
    "CERTIFICATE_UNKNOWN",
    "CERTIFICATE_UNOBTAINABLE",
    "CLOSE_NOTIFY",
    "DECODE_ERROR",
    "DECOMPRESSION_FAILURE",
    "DECRYPT_ERROR",
    "HANDSHAKE_FAILURE",
    "ILLEGAL_PARAMETER",
    "INSUFFICIENT_SECURITY",
    "INTERNAL_ERROR",
    "NO_RENEGOTIATION",
    "PROTOCOL_VERSION",
    "RECORD_OVERFLOW",
    "UNEXPECTED_MESSAGE",
    "UNKNOWN_CA",
    "UNKNOWN_PSK_IDENTITY",
    "UNRECOGNIZED_NAME",
    "UNSUPPORTED_CERTIFICATE",
    "UNSUPPORTED_EXTENSION",
    "USER_CANCELLED",
]