File: rurandom.py

package info (click to toggle)
pypy3 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 111,848 kB
  • sloc: python: 1,291,746; ansic: 74,281; asm: 5,187; cpp: 3,017; sh: 2,533; makefile: 544; xml: 243; lisp: 45; csh: 21; awk: 4
file content (172 lines) | stat: -rw-r--r-- 6,202 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""The urandom() function, suitable for cryptographic use.
"""

from __future__ import with_statement
import os, sys
import errno

from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rlib.objectmodel import not_rpython
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.rtyper.tool import rffi_platform


if sys.platform == 'win32':
    from rpython.rlib import rwin32

    # Headers and import library used to compile and link the Win32
    # externals declared in this branch.
    eci = ExternalCompilationInfo(
        includes = ['windows.h', 'wincrypt.h'],
        libraries = ['advapi32'],
        )

    # Integer constants whose values are looked up by
    # rffi_platform.configure() using the compilation info above.
    class CConfig:
        _compilation_info_ = eci
        PROV_RSA_FULL = rffi_platform.ConstantInteger(
            "PROV_RSA_FULL")
        CRYPT_VERIFYCONTEXT = rffi_platform.ConstantInteger(
            "CRYPT_VERIFYCONTEXT")

    # Publish the configured values as module-level names; urandom()
    # passes PROV_RSA_FULL and CRYPT_VERIFYCONTEXT to CryptAcquireContext().
    globals().update(rffi_platform.configure(CConfig))

    # Type of the provider handle that CryptAcquireContext() stores into
    # the one-element array it receives, and that CryptGenRandom() takes.
    HCRYPTPROV = rwin32.ULONG_PTR

    # Binding for CryptAcquireContextA: takes a pointer to an HCRYPTPROV
    # slot, two C strings and two DWORDs, and returns a BOOL.  The Windows
    # last-error value is saved so that urandom() can report a failure
    # through rwin32.lastSavedWindowsError().
    CryptAcquireContext = rffi.llexternal(
        'CryptAcquireContextA',
        [rffi.CArrayPtr(HCRYPTPROV),
         rwin32.LPCSTR, rwin32.LPCSTR, rwin32.DWORD, rwin32.DWORD],
        rwin32.BOOL,
        calling_conv='win',
        compilation_info=eci,
        save_err=rffi.RFFI_SAVE_LASTERROR)

    # Binding for CryptGenRandom: takes a provider handle, a byte count
    # and a byte buffer, and returns a BOOL.  The last-error value is
    # saved in the same way as above.
    CryptGenRandom = rffi.llexternal(
        'CryptGenRandom',
        [HCRYPTPROV, rwin32.DWORD, rffi.CArrayPtr(rwin32.BYTE)],
        rwin32.BOOL,
        calling_conv='win',
        compilation_info=eci,
        save_err=rffi.RFFI_SAVE_LASTERROR)

    @not_rpython
    def init_urandom():
        """
        Build the context object that urandom() expects on Windows.

        The context is a one-element array of HCRYPTPROV, allocated as
        immortal and zero-filled, so its single slot starts out as NULL.
        urandom() stores the provider handle there on its first call.
        """
        context_type = rffi.CArray(HCRYPTPROV)
        context = lltype.malloc(context_type, 1, immortal=True, zero=True)
        return context

    def urandom(context, n, signal_checker=None):
        """Return a string of n random bytes obtained from CryptGenRandom().

        'context' is the one-element HCRYPTPROV array returned by
        init_urandom(); its slot is filled in on the first call and
        reused afterwards.  'signal_checker' is accepted but not used
        by this implementation.

        Raises the error built by rwin32.lastSavedWindowsError() if
        CryptAcquireContext() or CryptGenRandom() fails.
        """
        # NOTE: no dictionaries here: rsiphash24 calls this to
        # initialize the random seed of string hashes
        handle = context[0]
        if not handle:
            # The handle acquired here is never explicitly released; the
            # operating system releases it when the process terminates.
            acquired = CryptAcquireContext(
                context, None, None,
                PROV_RSA_FULL, CRYPT_VERIFYCONTEXT)
            if not acquired:
                raise rwin32.lastSavedWindowsError("CryptAcquireContext")
            handle = context[0]
        # TODO(win64) This is limited to 2**31
        byte_array = rffi.CArray(rwin32.BYTE)
        # zero=True: the buffer starts zeroed (zero seed)
        with lltype.scoped_alloc(byte_array, n, zero=True) as buf:
            filled = CryptGenRandom(handle, n, buf)
            if not filled:
                raise rwin32.lastSavedWindowsError("CryptGenRandom")

            as_chars = rffi.cast(rffi.CCHARP, buf)
            return rffi.charpsize2str(as_chars, n)

else:  # Posix implementation
    @not_rpython
    def init_urandom():
        """
        Return the context for the Posix urandom(), which is always None:
        the Posix urandom() accepts a 'context' argument but never uses it.
        """
        return None

    # Number of the getrandom() syscall.  It stays None on non-Linux
    # platforms; the code below treats None as "getrandom() not available".
    SYS_getrandom = None

    if sys.platform.startswith('linux'):
        # Look up SYS_getrandom in sys/syscall.h; configure() returns the
        # result and globals().update() overwrites the default set above.
        eci = ExternalCompilationInfo(includes=['sys/syscall.h'])
        class CConfig:
            _compilation_info_ = eci
            SYS_getrandom = rffi_platform.DefinedConstantInteger(
                'SYS_getrandom')
        globals().update(rffi_platform.configure(CConfig))

    if SYS_getrandom is not None:
        from rpython.rlib.rposix import get_saved_errno, handle_posix_error
        import errno

        # Add linux/random.h to the compilation info so that GRND_NONBLOCK
        # can be looked up as well.
        eci = eci.merge(ExternalCompilationInfo(includes=['linux/random.h']))
        class CConfig:
            _compilation_info_ = eci
            GRND_NONBLOCK = rffi_platform.DefinedConstantInteger(
                'GRND_NONBLOCK')
        globals().update(rffi_platform.configure(CConfig))
        # If configuration produced no value, fall back to a hard-coded one.
        if GRND_NONBLOCK is None:
            GRND_NONBLOCK = 0x0001      # from linux/random.h

        # On Linux, use the syscall() function because the GNU libc doesn't
        # expose the Linux getrandom() syscall yet.
        # _getrandom() calls this with: the syscall number, the destination
        # buffer, the number of bytes wanted, and the flags.  errno is saved
        # after the call so that it can be read with get_saved_errno().
        syscall = rffi.llexternal(
            'syscall',
            [lltype.Signed, rffi.CCHARP, rffi.LONG, rffi.INT],
            lltype.Signed,
            compilation_info=eci,
            save_err=rffi.RFFI_SAVE_ERRNO)

        # Holder for a single mutable flag.  'status' starts out True;
        # _getrandom() sets it to False on the module-level instance below
        # once getrandom() fails with ENOSYS, EPERM or EAGAIN, and from
        # then on returns without calling the syscall.
        class Works:
            status = True
        getrandom_works = Works()

        def _getrandom(n, result, signal_checker):
            """Fetch up to n random bytes with the getrandom() syscall.

            Every chunk obtained is appended to the list 'result'.  The
            return value is the number of bytes still missing: 0 when the
            whole request was served, or a positive count when getrandom()
            is unusable (ENOSYS, EPERM or EAGAIN).  In the latter case
            getrandom_works.status is set to False, and later calls return
            n immediately without trying the syscall.

            On EINTR, 'signal_checker' is called (if it is not None) and
            the syscall is retried.  Any other error is raised through
            handle_posix_error().
            """
            missing = n
            if not getrandom_works.status:
                return missing
            while missing > 0:
                with rffi.scoped_alloc_buffer(missing) as buf:
                    got = syscall(SYS_getrandom, buf.raw, missing,
                                  GRND_NONBLOCK)
                    if got >= 0:
                        chunk = buf.str(got)
                        result.append(chunk)
                        missing -= len(chunk)
                        continue
                # Only reached when the syscall failed (got < 0).
                saved = get_saved_errno()
                if saved == errno.EINTR:
                    if signal_checker is not None:
                        signal_checker()
                    continue
                unusable = (saved == errno.ENOSYS or
                            saved == errno.EPERM or
                            saved == errno.EAGAIN)      # see CPython 3.5
                if unusable:
                    getrandom_works.status = False
                    return missing
                handle_posix_error("getrandom", got)
                raise AssertionError("unreachable")
            return missing

    def urandom(context, n, signal_checker=None):
        """Return a string of n random bytes.

        If the getrandom() syscall was found at configuration time
        (SYS_getrandom is not None) it is tried first through
        _getrandom(); whatever it did not deliver is then read from
        /dev/urandom.

        'context' is not used by this implementation.  'signal_checker',
        if not None, is called with no arguments whenever a system call
        is interrupted (EINTR), before the call is retried.

        Raises OSError if /dev/urandom cannot be opened or read, or if it
        reports end-of-file before n bytes were obtained.
        """
        # NOTE: no dictionaries here: rsiphash24 calls this to
        # initialize the random seed of string hashes
        result = []
        if SYS_getrandom is not None:
            n = _getrandom(n, result, signal_checker)
        if n <= 0:
            return ''.join(result)

        # XXX should somehow cache the file descriptor.  It's a mess.
        # CPython has a 99% solution and hopes for the remaining 1%
        # not to occur.  For now, we just don't cache the file
        # descriptor (any more... 6810f401d08e).
        # (0o777 is the same value as the old-style literal 0777.)
        fd = os.open("/dev/urandom", os.O_RDONLY, 0o777)
        try:
            while n > 0:
                try:
                    data = os.read(fd, n)
                except OSError as e:
                    if e.errno != errno.EINTR:
                        raise
                    # Interrupted by a signal: let the caller process it,
                    # exactly as _getrandom() does, then retry the read.
                    if signal_checker is not None:
                        signal_checker()
                    continue
                if not data:
                    # A zero-length read means end-of-file.  Retrying
                    # would spin forever, so report the failure instead.
                    raise OSError(errno.EIO,
                                  "unexpected end of file on /dev/urandom")
                result.append(data)
                n -= len(data)
        finally:
            os.close(fd)
        return ''.join(result)