1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
|
import sys
import os
import traceback
from _pypy_openssl import ffi
from _pypy_openssl import lib
from _cffi_ssl._stdssl.utility import _string_from_asn1, _str_to_ffi_buffer, _str_from_buf
from _cffi_ssl._stdssl.errorcodes import _error_codes, _lib_codes
# Result codes compared against the return value of lib.SSL_get_error() in
# pyssl_error(); values 0-7 are the ones taken from ssl.h.
SSL_ERROR_NONE = 0
SSL_ERROR_SSL = 1
SSL_ERROR_WANT_READ = 2
SSL_ERROR_WANT_WRITE = 3
SSL_ERROR_WANT_X509_LOOKUP = 4
SSL_ERROR_SYSCALL = 5
SSL_ERROR_ZERO_RETURN = 6
SSL_ERROR_WANT_CONNECT = 7
# start of non ssl.h errorcodes: the values below are local additions that
# are only ever used as the errno of the exceptions built in this module
SSL_ERROR_EOF = 8 # special case of SSL_ERROR_SYSCALL
SSL_ERROR_NO_SOCKET = 9 # socket has been GC'd
# reported by pyssl_error() when SSL_get_error() returns an unhandled code
SSL_ERROR_INVALID_ERROR_CODE = 10
class SSLError(OSError):
    """Base class for errors raised by the SSL implementation."""

    def __str__(self):
        # Prefer the human-readable message when OSError parsed one out of
        # the constructor arguments; otherwise show the raw argument tuple.
        text = self.strerror
        if isinstance(text, str) and text:
            return text
        return str(self.args)
class SSLZeroReturnError(SSLError):
    """The SSL/TLS session was closed cleanly."""
class SSLWantReadError(SSLError):
    """A non-blocking SSL socket must read more data before the
    requested operation can be completed.
    """
class SSLWantWriteError(SSLError):
    """A non-blocking SSL socket must write more data before the
    requested operation can be completed.
    """
class SSLSyscallError(SSLError):
    """A system error occurred while attempting an SSL operation."""
class SSLEOFError(SSLError):
    """The SSL/TLS connection was terminated abruptly."""
def ssl_error(errstr, errcode=0):
    """Build (and return, not raise) an SSLError for ``errstr``.

    When ``errstr`` is None the code is taken from
    ``lib.ERR_peek_last_error()`` instead of ``errcode``. The chosen code is
    passed to ``fill_sslerror`` both as the exception's errno and as the
    code to decode.

    ``lib.ERR_clear_error()`` is always called before returning, even when
    building the exception fails.
    """
    code = errcode if errstr is not None else lib.ERR_peek_last_error()
    try:
        error = fill_sslerror(SSLError, code, errstr, code)
    finally:
        lib.ERR_clear_error()
    return error
# Lookup tables derived from the imported error-code listings:
#   ERR_CODES_TO_NAMES: (library, reason) -> mnemonic
#   ERR_NAMES_TO_CODES: mnemonic -> (library, reason)
#   LIB_CODES_TO_NAMES: library number -> library mnemonic
ERR_CODES_TO_NAMES, ERR_NAMES_TO_CODES, LIB_CODES_TO_NAMES = {}, {}, {}

for mnemo, library, reason in _error_codes:
    key = library, reason
    assert not (mnemo is None or key is None)
    ERR_NAMES_TO_CODES[mnemo] = key
    ERR_CODES_TO_NAMES[key] = mnemo

for mnemo, number in _lib_codes:
    LIB_CODES_TO_NAMES[number] = mnemo
# the PySSL_SetError equivalent
def pyssl_error(obj, ret):
    """Build (and return, not raise) the exception for a failed SSL call.

    ``obj`` must expose ``ssl`` (compared against ``ffi.NULL``) and
    ``socket``; ``ret`` is the value handed to ``lib.SSL_get_error`` together
    with ``obj.ssl``.

    The result normally comes from ``fill_sslerror``. The one exception is
    the SSL_ERROR_SYSCALL case with no queued OpenSSL error, ``ret == -1``
    and a live socket: there the OpenSSL error queue is cleared and a plain
    ``OSError`` built from ``ffi.errno`` is returned.
    """
    # The last queued error is peeked twice; the first value is only used
    # for the SSL_ERROR_SSL message, the second everywhere else.
    peeked_code = lib.ERR_peek_last_error()
    queued = lib.ERR_peek_last_error()

    if obj.ssl == ffi.NULL:
        # No SSL object to interrogate: generic error with an empty message.
        return fill_sslerror(SSLError, 0, "", queued)

    kind = lib.SSL_get_error(obj.ssl, ret)

    # Outcomes that need nothing beyond a fixed exception type and message;
    # the reported errno is the SSL_get_error() code itself.
    simple_outcomes = {
        SSL_ERROR_ZERO_RETURN: (
            SSLZeroReturnError, "TLS/SSL connection has been closed"),
        SSL_ERROR_WANT_READ: (
            SSLWantReadError, "The operation did not complete (read)"),
        SSL_ERROR_WANT_WRITE: (
            SSLWantWriteError, "The operation did not complete (write)"),
        SSL_ERROR_WANT_X509_LOOKUP: (
            SSLError, "The operation did not complete (X509 lookup)"),
        SSL_ERROR_WANT_CONNECT: (
            SSLError, "The operation did not complete (connect)"),
    }
    if kind in simple_outcomes:
        exc_type, text = simple_outcomes[kind]
        return fill_sslerror(exc_type, kind, text, queued)

    if kind == SSL_ERROR_SYSCALL:
        if queued != 0:
            text = _str_from_buf(lib.ERR_error_string(queued, ffi.NULL))
            return fill_sslerror(SSLError, SSL_ERROR_SYSCALL, text, queued)
        if ret == 0 or obj.socket is None:
            return fill_sslerror(
                SSLEOFError, SSL_ERROR_EOF,
                "EOF occurred in violation of protocol", queued)
        if ret == -1 and obj.socket is not None:
            # the underlying BIO reported an I/O error
            lib.ERR_clear_error()
            # XXX: Windows?
            errno = ffi.errno
            return OSError(errno, os.strerror(errno))
        return fill_sslerror(
            SSLSyscallError, SSL_ERROR_SYSCALL,
            "Some I/O error occurred", queued)

    if kind == SSL_ERROR_SSL:
        if peeked_code != 0:
            text = _str_from_buf(lib.ERR_error_string(peeked_code, ffi.NULL))
        else:
            text = "A failure in the SSL library occurred"
        return fill_sslerror(SSLError, SSL_ERROR_SSL, text, queued)

    return fill_sslerror(
        SSLError, SSL_ERROR_INVALID_ERROR_CODE, "Invalid error code", queued)
def fill_sslerror(errtype, ssl_errno, errstr, errcode):
    """Instantiate ``errtype`` with a formatted message and return it.

    Parameters:
        errtype: exception class, called as ``errtype(ssl_errno, msg)``.
        ssl_errno: first constructor argument of the exception.
        errstr: message text; None or empty means "no text supplied".
        errcode: packed OpenSSL error code, or 0 when there is none.

    When ``errcode`` is non-zero it is split with ``lib.ERR_GET_LIB`` and
    ``lib.ERR_GET_REASON`` and looked up in ``LIB_CODES_TO_NAMES`` and
    ``ERR_CODES_TO_NAMES``. If ``errstr`` is None in that case, the text
    comes from ``lib.ERR_reason_error_string``.

    The message is ``"[LIB: REASON] text"``, ``"[LIB] text"`` or just
    ``text``, depending on which names were found. The returned exception
    also carries ``reason`` and ``library`` attributes (None when unknown).
    """
    reason_str = None
    lib_str = None
    if errcode != 0:
        err_lib = lib.ERR_GET_LIB(errcode)
        err_reason = lib.ERR_GET_REASON(errcode)
        reason_str = ERR_CODES_TO_NAMES.get((err_lib, err_reason))
        lib_str = LIB_CODES_TO_NAMES.get(err_lib)
        if errstr is None:
            errstr = _str_from_buf(lib.ERR_reason_error_string(errcode))

    # Apply the fallback to the text itself, not just to the plain message,
    # so the bracketed forms below never embed an empty string or "None".
    if not errstr:
        errstr = "unknown error"

    if reason_str and lib_str:
        msg = "[%s: %s] %s" % (lib_str, reason_str, errstr)
    elif lib_str:
        msg = "[%s] %s" % (lib_str, errstr)
    else:
        msg = errstr

    err_value = errtype(ssl_errno, msg)
    err_value.reason = reason_str if reason_str else None
    err_value.library = lib_str if lib_str else None
    return err_value
def pyerr_write_unraisable(exc, obj):
    """Report the exception currently being handled on ``sys.stderr``.

    The exception is taken from ``sys.exc_info()``; the ``exc`` argument is
    not consulted. If ``obj`` is truthy, an "Exception ignored in:" header
    with ``repr(obj)`` is written first. The traceback entries follow, then
    a final ``module.TypeName: message`` line.
    """
    stream = sys.stderr

    if obj:
        stream.write("Exception ignored in: ")
        stream.write(repr(obj))
        stream.write("\n")

    exc_type, exc_value, exc_tb = sys.exc_info()
    traceback.print_tb(exc_tb, file=stream)

    assert isinstance(exc_value, Exception)
    qualified_name = ".".join((exc_type.__module__, exc_type.__name__))
    stream.write(qualified_name)
    stream.write(": ")
    stream.write(str(exc_value))
    stream.write("\n")
# Symbolic names of the TLS alert descriptions, listed alphabetically.
# NOTE(review): nothing in the visible part of this file consumes the list;
# presumably other code combines each name with a prefix -- confirm.
SSL_AD_NAMES = [
    "ACCESS_DENIED",
    "BAD_CERTIFICATE",
    "BAD_CERTIFICATE_HASH_VALUE",
    "BAD_CERTIFICATE_STATUS_RESPONSE",
    "BAD_RECORD_MAC",
    "CERTIFICATE_EXPIRED",
    "CERTIFICATE_REVOKED",
    "CERTIFICATE_UNKNOWN",
    "CERTIFICATE_UNOBTAINABLE",
    "CLOSE_NOTIFY",
    "DECODE_ERROR",
    "DECOMPRESSION_FAILURE",
    "DECRYPT_ERROR",
    "HANDSHAKE_FAILURE",
    "ILLEGAL_PARAMETER",
    "INSUFFICIENT_SECURITY",
    "INTERNAL_ERROR",
    "NO_RENEGOTIATION",
    "PROTOCOL_VERSION",
    "RECORD_OVERFLOW",
    "UNEXPECTED_MESSAGE",
    "UNKNOWN_CA",
    "UNKNOWN_PSK_IDENTITY",
    "UNRECOGNIZED_NAME",
    "UNSUPPORTED_CERTIFICATE",
    "UNSUPPORTED_EXTENSION",
    "USER_CANCELLED",
]
|