"""The urandom() function, suitable for cryptographic use.
"""
from __future__ import with_statement
import os, sys
import errno
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rlib.objectmodel import not_rpython
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.rtyper.tool import rffi_platform
if sys.platform == 'win32':
# rwin32 supplies the BYTE/BOOL types and the error helper used by the
# Windows implementation of urandom().
from rpython.rlib import rwin32

# Headers and library handed to the C toolchain for the BCryptGenRandom
# declaration below.
eci = ExternalCompilationInfo(includes=['windows.h', 'bcrypt.h'],
                              libraries=['Bcrypt'])


class CConfig:
    _compilation_info_ = eci

# Run the platform configuration step and publish whatever it returns as
# module-level names.
globals().update(rffi_platform.configure(CConfig))

# Low-level binding.  urandom() calls it as
# (None, output buffer, byte count, flags).
BCryptGenRandom = rffi.llexternal(
    'BCryptGenRandom',
    [rffi.VOIDP, rffi.CArrayPtr(rwin32.BYTE), rffi.ULONG, rffi.ULONG],
    rwin32.BOOL,
    save_err=rffi.RFFI_SAVE_LASTERROR,
    compilation_info=eci,
    calling_conv='win')
def urandom(n, signal_checker=None):
    """Return a string of n random bytes obtained from BCryptGenRandom.

    signal_checker is accepted but never called by this implementation.
    Raises the error built by rwin32.lastSavedWindowsError() when
    BCryptGenRandom returns a non-zero value.
    """
    # NOTE: no dictionaries here: rsiphash24 calls this to
    # initialize the random seed of string hashes
    BCRYPT_USE_SYSTEM_PREFERRED_RNG = 0x00000002
    # zero=True: the buffer is zero-filled before the call (zero seed).
    with lltype.scoped_alloc(rffi.CArray(rwin32.BYTE), n, zero=True) as raw:
        status = BCryptGenRandom(None, raw, n,
                                 BCRYPT_USE_SYSTEM_PREFERRED_RNG)
        if status == 0:
            # Copy the n raw bytes out before the buffer is released.
            return rffi.charpsize2str(rffi.cast(rffi.CCHARP, raw), n)
        # NOTE(review): the error is built from the saved GetLastError
        # value, not from `status` -- confirm BCryptGenRandom sets it.
        raise rwin32.lastSavedWindowsError("BCryptGenRandom")
else: # Posix implementation
# Default for platforms where the probe below is not run.
SYS_getrandom = None
if sys.platform.startswith('linux'):
    eci = ExternalCompilationInfo(includes=['sys/syscall.h'])

    class CConfig:
        _compilation_info_ = eci
        SYS_getrandom = rffi_platform.DefinedConstantInteger('SYS_getrandom')

    # Publish the configuration results as module-level names; the later
    # "is not None" check suggests SYS_getrandom may still come back None.
    globals().update(rffi_platform.configure(CConfig))
if SYS_getrandom is not None:
from rpython.rlib.rposix import get_saved_errno, handle_posix_error
import errno

# Add the header that is probed for GRND_NONBLOCK to the compilation info.
eci = eci.merge(ExternalCompilationInfo(includes=['linux/random.h']))


class CConfig:
    _compilation_info_ = eci
    GRND_NONBLOCK = rffi_platform.DefinedConstantInteger('GRND_NONBLOCK')

globals().update(rffi_platform.configure(CConfig))
if GRND_NONBLOCK is None:
    # The probe found no definition: use the value from linux/random.h.
    GRND_NONBLOCK = 0x0001

# On Linux, use the syscall() function because the GNU libc doesn't
# expose the Linux getrandom() syscall yet.
syscall = rffi.llexternal(
    'syscall',
    [lltype.Signed, rffi.CCHARP, rffi.LONG, rffi.INT],
    lltype.Signed,
    save_err=rffi.RFFI_SAVE_ERRNO,
    compilation_info=eci)
class Works:
    # Set to False the first time getrandom() turns out to be unusable,
    # so that later calls skip the syscall entirely.
    status = True

getrandom_works = Works()


def _getrandom(n, result, signal_checker):
    """Collect up to n random bytes through the getrandom() syscall.

    Every chunk obtained is appended to the list `result`.  Returns the
    number of bytes still missing: a value <= 0 once everything has been
    read, or the outstanding count when getrandom() fails with ENOSYS,
    EPERM or EAGAIN (it is then disabled for all later calls).
    `signal_checker`, if not None, is called after an EINTR before the
    syscall is retried.  Any other failure is handed to
    handle_posix_error(), which is expected to raise.
    """
    if not getrandom_works.status:
        return n
    remaining = n
    while remaining > 0:
        with rffi.scoped_alloc_buffer(remaining) as scratch:
            nread = syscall(SYS_getrandom, scratch.raw, remaining,
                            GRND_NONBLOCK)
            if nread >= 0:
                chunk = scratch.str(nread)
                result.append(chunk)
                remaining -= len(chunk)
                continue
        saved = get_saved_errno()
        if saved == errno.EINTR:
            # Interrupted: let the caller process signals, then retry.
            if signal_checker is not None:
                signal_checker()
            continue
        if (saved == errno.ENOSYS or saved == errno.EPERM or
                saved == errno.EAGAIN):     # see CPython 3.5
            getrandom_works.status = False
            return remaining
        handle_posix_error("getrandom", nread)
        raise AssertionError("unreachable")
    return remaining
def urandom(n, signal_checker=None):
    """Return a string of n random bytes.

    When the getrandom() syscall number was found at configuration time,
    _getrandom() is tried first; whatever it could not supply is then
    read from /dev/urandom.

    signal_checker, if not None, is called with no arguments each time a
    call is interrupted (EINTR), before that call is retried.  Any other
    OSError from opening or reading /dev/urandom propagates to the caller.
    """
    # NOTE: no dictionaries here: rsiphash24 calls this to
    # initialize the random seed of string hashes
    result = []
    if SYS_getrandom is not None:
        n = _getrandom(n, result, signal_checker)
    if n <= 0:
        return ''.join(result)

    # XXX should somehow cache the file descriptor.  It's a mess.
    # CPython has a 99% solution and hopes for the remaining 1%
    # not to occur.  For now, we just don't cache the file
    # descriptor (any more... 6810f401d08e).
    # 0o777 is the same value as the old-style literal 0777.
    fd = os.open("/dev/urandom", os.O_RDONLY, 0o777)
    try:
        # NOTE(review): an empty read leaves n unchanged, so the loop only
        # ends once n bytes have actually been delivered.
        while n > 0:
            try:
                data = os.read(fd, n)
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise
                # Interrupted by a signal: give the caller's hook a chance
                # to run, exactly as the getrandom() path does, then retry.
                if signal_checker is not None:
                    signal_checker()
                data = ''
            result.append(data)
            n -= len(data)
    finally:
        os.close(fd)
    return ''.join(result)
|