1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
"""The urandom() function, suitable for cryptographic use.
"""
from __future__ import with_statement
import os, sys
import errno
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rlib.objectmodel import not_rpython
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.rtyper.tool import rffi_platform
if sys.platform == 'win32':
from rpython.rlib import rwin32
eci = ExternalCompilationInfo(
includes = ['windows.h', 'wincrypt.h'],
libraries = ['advapi32'],
)
class CConfig:
_compilation_info_ = eci
PROV_RSA_FULL = rffi_platform.ConstantInteger(
"PROV_RSA_FULL")
CRYPT_VERIFYCONTEXT = rffi_platform.ConstantInteger(
"CRYPT_VERIFYCONTEXT")
globals().update(rffi_platform.configure(CConfig))
HCRYPTPROV = rwin32.ULONG_PTR
CryptAcquireContext = rffi.llexternal(
'CryptAcquireContextA',
[rffi.CArrayPtr(HCRYPTPROV),
rwin32.LPCSTR, rwin32.LPCSTR, rwin32.DWORD, rwin32.DWORD],
rwin32.BOOL,
calling_conv='win',
compilation_info=eci,
save_err=rffi.RFFI_SAVE_LASTERROR)
CryptGenRandom = rffi.llexternal(
'CryptGenRandom',
[HCRYPTPROV, rwin32.DWORD, rffi.CArrayPtr(rwin32.BYTE)],
rwin32.BOOL,
calling_conv='win',
compilation_info=eci,
save_err=rffi.RFFI_SAVE_LASTERROR)
@not_rpython
def init_urandom():
"""
Return an array of one HCRYPTPROV, initialized to NULL.
It is filled automatically the first time urandom() is called.
"""
return lltype.malloc(rffi.CArray(HCRYPTPROV), 1,
immortal=True, zero=True)
def urandom(context, n, signal_checker=None):
# NOTE: no dictionaries here: rsiphash24 calls this to
# initialize the random seed of string hashes
provider = context[0]
if not provider:
# This handle is never explicitly released. The operating
# system will release it when the process terminates.
if not CryptAcquireContext(
context, None, None,
PROV_RSA_FULL, CRYPT_VERIFYCONTEXT):
raise rwin32.lastSavedWindowsError("CryptAcquireContext")
provider = context[0]
# TODO(win64) This is limited to 2**31
with lltype.scoped_alloc(rffi.CArray(rwin32.BYTE), n,
zero=True, # zero seed
) as buf:
if not CryptGenRandom(provider, n, buf):
raise rwin32.lastSavedWindowsError("CryptGenRandom")
return rffi.charpsize2str(rffi.cast(rffi.CCHARP, buf), n)
else: # Posix implementation
@not_rpython
def init_urandom():
return None
SYS_getrandom = None
if sys.platform.startswith('linux'):
eci = ExternalCompilationInfo(includes=['sys/syscall.h'])
class CConfig:
_compilation_info_ = eci
SYS_getrandom = rffi_platform.DefinedConstantInteger(
'SYS_getrandom')
globals().update(rffi_platform.configure(CConfig))
if SYS_getrandom is not None:
from rpython.rlib.rposix import get_saved_errno, handle_posix_error
import errno
eci = eci.merge(ExternalCompilationInfo(includes=['linux/random.h']))
class CConfig:
_compilation_info_ = eci
GRND_NONBLOCK = rffi_platform.DefinedConstantInteger(
'GRND_NONBLOCK')
globals().update(rffi_platform.configure(CConfig))
if GRND_NONBLOCK is None:
GRND_NONBLOCK = 0x0001 # from linux/random.h
# On Linux, use the syscall() function because the GNU libc doesn't
# expose the Linux getrandom() syscall yet.
syscall = rffi.llexternal(
'syscall',
[lltype.Signed, rffi.CCHARP, rffi.LONG, rffi.INT],
lltype.Signed,
compilation_info=eci,
save_err=rffi.RFFI_SAVE_ERRNO)
class Works:
status = True
getrandom_works = Works()
def _getrandom(n, result, signal_checker):
if not getrandom_works.status:
return n
while n > 0:
with rffi.scoped_alloc_buffer(n) as buf:
got = syscall(SYS_getrandom, buf.raw, n, GRND_NONBLOCK)
if got >= 0:
s = buf.str(got)
result.append(s)
n -= len(s)
continue
err = get_saved_errno()
if (err == errno.ENOSYS or err == errno.EPERM or
err == errno.EAGAIN): # see CPython 3.5
getrandom_works.status = False
return n
if err == errno.EINTR:
if signal_checker is not None:
signal_checker()
continue
handle_posix_error("getrandom", got)
raise AssertionError("unreachable")
return n
def urandom(context, n, signal_checker=None):
"Read n bytes from /dev/urandom."
# NOTE: no dictionaries here: rsiphash24 calls this to
# initialize the random seed of string hashes
result = []
if SYS_getrandom is not None:
n = _getrandom(n, result, signal_checker)
if n <= 0:
return ''.join(result)
# XXX should somehow cache the file descriptor. It's a mess.
# CPython has a 99% solution and hopes for the remaining 1%
# not to occur. For now, we just don't cache the file
# descriptor (any more... 6810f401d08e).
fd = os.open("/dev/urandom", os.O_RDONLY, 0777)
try:
while n > 0:
try:
data = os.read(fd, n)
except OSError as e:
if e.errno != errno.EINTR:
raise
data = ''
result.append(data)
n -= len(data)
finally:
os.close(fd)
return ''.join(result)
|