File: __init__.py

package info (click to toggle)
pypy3 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 111,848 kB
  • sloc: python: 1,291,746; ansic: 74,281; asm: 5,187; cpp: 3,017; sh: 2,533; makefile: 544; xml: 243; lisp: 45; csh: 21; awk: 4
file content (176 lines) | stat: -rw-r--r-- 5,895 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import sys
from threading import Lock
from _pypy_openssl import ffi, lib
from _cffi_ssl._stdssl.utility import (_str_to_ffi_buffer, _bytes_with_len,
        _str_from_buf)

try: from __pypy__ import builtinify
except ImportError: builtinify = lambda f: f


def new(name, string=b''):
    h = Hash(name)
    h.update(string)
    return h

class Hash(object):

    def __init__(self, name, copy_from=None):
        self.ctx = ffi.NULL
        self.name = name
        digest_type = self.digest_type_by_name()
        self.digest_size = lib.EVP_MD_size(digest_type)

        # Allocate a lock for each HASH object.
        # An optimization would be to not release the GIL on small requests,
        # and use a custom lock only when needed.
        self.lock = Lock()

        ctx = lib.Cryptography_EVP_MD_CTX_new()
        if ctx == ffi.NULL:
            raise MemoryError
        ctx = ffi.gc(ctx, lib.Cryptography_EVP_MD_CTX_free)

        try:
            if copy_from is not None:
                # cpython uses EVP_MD_CTX_copy(...)
                if not lib.EVP_MD_CTX_copy_ex(ctx, copy_from):
                    raise ValueError
            else:
                # cpython uses EVP_DigestInit
                lib.EVP_DigestInit_ex(ctx, digest_type, ffi.NULL)
            self.ctx = ctx
        except:
            # no need to gc ctx! 
            raise

    def digest_type_by_name(self):
        c_name = _str_to_ffi_buffer(self.name)
        digest_type = lib.EVP_get_digestbyname(c_name)
        if not digest_type:
            raise ValueError("unknown hash function")
        # TODO
        return digest_type

    def __repr__(self):
        return "<%s HASH object at 0x%s>" % (self.name, id(self))

    def update(self, string):
        buf = ffi.from_buffer(string)
        with self.lock:
            # XXX try to not release the GIL for small requests
            lib.EVP_DigestUpdate(self.ctx, buf, len(buf))

    def copy(self):
        """Return a copy of the hash object."""
        with self.lock:
            return Hash(self.name, copy_from=self.ctx)

    def digest(self):
        """Return the digest value as a string of binary data."""
        return self._digest()

    def hexdigest(self):
        """Return the digest value as a string of hexadecimal digits."""
        digest = self._digest()
        hexdigits = '0123456789abcdef'
        result = []
        for c in digest:
            result.append(hexdigits[(c >> 4) & 0xf])
            result.append(hexdigits[ c       & 0xf])
        return ''.join(result)

    @property
    def block_size(self):
        return lib.EVP_MD_CTX_block_size(self.ctx)

    def _digest(self):
        ctx = lib.Cryptography_EVP_MD_CTX_new()
        if ctx == ffi.NULL:
            raise MemoryError
        try:
            with self.lock:
                if not lib.EVP_MD_CTX_copy_ex(ctx, self.ctx):
                    raise ValueError
            digest_size = self.digest_size
            buf = ffi.new("unsigned char[]", digest_size)
            lib.EVP_DigestFinal_ex(ctx, buf, ffi.NULL)
            return _bytes_with_len(buf, digest_size)
        finally:
            lib.Cryptography_EVP_MD_CTX_free(ctx)

algorithms = ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512')

class NameFetcher:
    def __init__(self):
        self.meth_names = []
        self.error = None


def _fetch_names():
    name_fetcher = NameFetcher()
    handle = ffi.new_handle(name_fetcher)
    lib.OBJ_NAME_do_all(lib.OBJ_NAME_TYPE_MD_METH, hash_name_mapper_callback, handle)
    if name_fetcher.error:
        raise name_fetcher.error
    meth_names = name_fetcher.meth_names
    name_fetcher.meth_names = None
    return frozenset(meth_names)

@ffi.callback("void(OBJ_NAME*, void*)")
def hash_name_mapper_callback(obj_name, userdata):
    if not obj_name:
        return
    name_fetcher = ffi.from_handle(userdata)
    # Ignore aliased names, they pollute the list and OpenSSL appears
    # to have a its own definition of alias as the resulting list
    # still contains duplicate and alternate names for several
    # algorithms.
    if obj_name.alias != 0:
        return
    name = _str_from_buf(obj_name.name)
    name_fetcher.meth_names.append(name)

openssl_md_meth_names = _fetch_names()
del _fetch_names

# shortcut functions
def make_new_hash(name, funcname):
    @builtinify
    def new_hash(string=b''):
        return new(name, string)
    new_hash.__name__ = funcname
    return new_hash

for _name in algorithms:
    _newname = 'openssl_%s' % (_name,)
    globals()[_newname] = make_new_hash(_name, _newname)

if hasattr(lib, 'PKCS5_PBKDF2_HMAC'):
    @builtinify
    def pbkdf2_hmac(hash_name, password, salt, iterations, dklen=None):
        if not isinstance(hash_name, str):
            raise TypeError("expected 'str' for name, but got %s" % type(hash_name))
        c_name = _str_to_ffi_buffer(hash_name)
        digest = lib.EVP_get_digestbyname(c_name)
        if digest == ffi.NULL:
            raise ValueError("unsupported hash type")
        if dklen is None:
            dklen = lib.EVP_MD_size(digest)
        if dklen < 1:
            raise ValueError("key length must be greater than 0.")
        if dklen >= sys.maxsize:
            raise OverflowError("key length is too great.")
        if iterations < 1:
            raise ValueError("iteration value must be greater than 0.")
        if iterations >= sys.maxsize:
            raise OverflowError("iteration value is too great.")
        buf = ffi.new("unsigned char[]", dklen)
        c_password = ffi.from_buffer(bytes(password))
        c_salt = ffi.from_buffer(bytes(salt))
        r = lib.PKCS5_PBKDF2_HMAC(c_password, len(c_password),
                ffi.cast("unsigned char*",c_salt), len(c_salt),
                iterations, digest, dklen, buf)
        if r == 0:
            raise ValueError
        return _bytes_with_len(buf, dklen)