File: test_xpickle.py

package info (click to toggle)
python3.13 3.13.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 123,756 kB
  • sloc: python: 710,464; ansic: 655,626; xml: 31,250; sh: 5,844; cpp: 4,327; makefile: 1,983; objc: 787; lisp: 502; javascript: 213; asm: 75; csh: 12
file content (281 lines) | stat: -rw-r--r-- 10,136 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# This test covers backwards compatibility with previous versions of Python
# by bouncing pickled objects through Python versions by running xpickle_worker.py.
import io
import os
import pickle
import struct
import subprocess
import sys
import unittest


from test import support
from test import pickletester

try:
    import _pickle
    has_c_implementation = True
except ModuleNotFoundError:
    has_c_implementation = False

support.requires('xpickle')

is_windows = sys.platform.startswith('win')

# Map python version to a tuple containing the name of a corresponding valid
# Python binary to execute and its arguments.
py_executable_map = {}

protocols_map = {
    3: (3, 0),
    4: (3, 4),
    5: (3, 8),
}

def highest_proto_for_py_version(py_version):
    """Finds the highest supported pickle protocol for a given Python version.
    Args:
        py_version: a 2-tuple of the major, minor version. Eg. Python 3.7 would
                    be (3, 7)
    Returns:
        int for the highest supported pickle protocol
    """
    proto = 2
    for p, v in protocols_map.items():
        if py_version < v:
            break
        proto = p
    return proto

def have_python_version(py_version):
    """Check whether a Python binary exists for the given py_version and has
    support. This respects your PATH.
    For Windows, it will first try to use the py launcher specified in PEP 397.
    Otherwise (and for all other platforms), it will attempt to check for
    python<py_version[0]>.<py_version[1]>.

    Eg. given a *py_version* of (3, 7), the function will attempt to try
    'py -3.7' (for Windows) first, then 'python3.7', and return
    ['py', '-3.7'] (on Windows) or ['python3.7'] on other platforms.

    Args:
        py_version: a 2-tuple of the major, minor version. Eg. python 3.7 would
                    be (3, 7)
    Returns:
        List/Tuple containing the Python binary name and its required arguments,
        or None if no valid binary names found.
    """
    python_str = ".".join(map(str, py_version))
    targets = [('py', f'-{python_str}'), (f'python{python_str}',)]
    if py_version not in py_executable_map:
        for target in targets[0 if is_windows else 1:]:
            try:
                worker = subprocess.Popen([*target, '-c', 'pass'],
                                          stdout=subprocess.DEVNULL,
                                          stderr=subprocess.DEVNULL,
                                          shell=is_windows)
                worker.communicate()
                if worker.returncode == 0:
                    py_executable_map[py_version] = target
                break
            except FileNotFoundError:
                pass

    return py_executable_map.get(py_version, None)


def read_exact(f, n):
    buf = b''
    while len(buf) < n:
        chunk = f.read(n - len(buf))
        if not chunk:
            raise EOFError
        buf += chunk
    return buf


class AbstractCompatTests(pickletester.AbstractPickleTests):
    py_version = None
    worker = None

    @classmethod
    def setUpClass(cls):
        assert cls.py_version is not None, 'Needs a python version tuple'
        if not have_python_version(cls.py_version):
            py_version_str = ".".join(map(str, cls.py_version))
            raise unittest.SkipTest(f'Python {py_version_str} not available')
        cls.addClassCleanup(cls.finish_worker)
        # Override the default pickle protocol to match what xpickle worker
        # will be running.
        highest_protocol = highest_proto_for_py_version(cls.py_version)
        cls.enterClassContext(support.swap_attr(pickletester, 'protocols',
                                                range(highest_protocol + 1)))
        cls.enterClassContext(support.swap_attr(pickle, 'HIGHEST_PROTOCOL',
                                                highest_protocol))

    @classmethod
    def start_worker(cls, python):
        target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
        worker = subprocess.Popen([*python, target],
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  # For windows bpo-17023.
                                  shell=is_windows)
        cls.worker = worker
        return worker

    @classmethod
    def finish_worker(cls):
        worker = cls.worker
        if worker is None:
            return
        cls.worker = None
        worker.stdin.close()
        worker.stdout.close()
        worker.stderr.close()
        worker.terminate()
        worker.wait()

    @classmethod
    def send_to_worker(cls, python, data):
        """Bounce a pickled object through another version of Python.
        This will send data to a child process where it will
        be unpickled, then repickled and sent back to the parent process.
        Args:
            python: list containing the python binary to start and its arguments
            data: bytes object to send to the child process
        Returns:
            The pickled data received from the child process.
        """
        worker = cls.worker
        if worker is None:
            worker = cls.start_worker(python)

        try:
            worker.stdin.write(struct.pack('!i', len(data)) + data)
            worker.stdin.flush()

            size, = struct.unpack('!i', read_exact(worker.stdout, 4))
            if size > 0:
                return read_exact(worker.stdout, size)
            # if the worker fails, it will write the exception to stdout
            if size < 0:
                stdout = read_exact(worker.stdout, -size)
                try:
                    exception = pickle.loads(stdout)
                except (pickle.UnpicklingError, EOFError):
                    pass
                else:
                    if isinstance(exception, Exception):
                        # To allow for tests which test for errors.
                        raise exception
            _, stderr = worker.communicate()
            raise RuntimeError(stderr)
        except:
            cls.finish_worker()
            raise

    def dumps(self, arg, proto=0, **kwargs):
        # Skip tests that require buffer_callback arguments since
        # there isn't a reliable way to marshal/pickle the callback and ensure
        # it works in a different Python version.
        if 'buffer_callback' in kwargs:
            self.skipTest('Test does not support "buffer_callback" argument.')
        f = io.BytesIO()
        p = self.pickler(f, proto, **kwargs)
        p.dump(arg)
        data = struct.pack('!i', proto) + f.getvalue()
        python = py_executable_map[self.py_version]
        return self.send_to_worker(python, data)

    def loads(self, buf, **kwds):
        f = io.BytesIO(buf)
        u = self.unpickler(f, **kwds)
        return u.load()

    # A scaled-down version of test_bytes from pickletester, to reduce
    # the number of calls to self.dumps() and hence reduce the number of
    # child python processes forked. This allows the test to complete
    # much faster (the one from pickletester takes 3-4 minutes when running
    # under text_xpickle).
    def test_bytes(self):
        if self.py_version < (3, 0):
            self.skipTest('not supported in Python < 3.0')
        for proto in pickletester.protocols:
            for s in b'', b'xyz', b'xyz'*100:
                p = self.dumps(s, proto)
                self.assert_is_copy(s, self.loads(p))
            s = bytes(range(256))
            p = self.dumps(s, proto)
            self.assert_is_copy(s, self.loads(p))
            s = bytes([i for i in range(256) for _ in range(2)])
            p = self.dumps(s, proto)
            self.assert_is_copy(s, self.loads(p))

    # These tests are disabled because they require some special setup
    # on the worker that's hard to keep in sync.
    test_global_ext1 = None
    test_global_ext2 = None
    test_global_ext4 = None

    # These tests fail because they require classes from pickletester
    # which cannot be properly imported by the xpickle worker.
    test_recursive_nested_names = None
    test_recursive_nested_names2 = None

    # Attribute lookup problems are expected, disable the test
    test_dynamic_class = None
    test_evil_class_mutating_dict = None

    # Expected exception is raised during unpickling in a subprocess.
    test_pickle_setstate_None = None

    # Other Python version may not have NumPy.
    test_buffers_numpy = None

    # Skip tests that require buffer_callback arguments since
    # there isn't a reliable way to marshal/pickle the callback and ensure
    # it works in a different Python version.
    test_in_band_buffers = None
    test_buffers_error = None
    test_oob_buffers = None
    test_oob_buffers_writable_to_readonly = None

class PyPicklePythonCompat(AbstractCompatTests):
    pickler = pickle._Pickler
    unpickler = pickle._Unpickler

if has_c_implementation:
    class CPicklePythonCompat(AbstractCompatTests):
        pickler = _pickle.Pickler
        unpickler = _pickle.Unpickler


def make_test(py_version, base):
    class_dict = {'py_version': py_version}
    name = base.__name__.replace('Python', 'Python%d%d' % py_version)
    return type(name, (base, unittest.TestCase), class_dict)

def load_tests(loader, tests, pattern):
    def add_tests(py_version):
        test_class = make_test(py_version, PyPicklePythonCompat)
        tests.addTest(loader.loadTestsFromTestCase(test_class))
        if has_c_implementation:
            test_class = make_test(py_version, CPicklePythonCompat)
            tests.addTest(loader.loadTestsFromTestCase(test_class))

    value = support.get_resource_value('xpickle')
    if value is None:
        major = sys.version_info.major
        assert major == 3
        add_tests((2, 7))
        for minor in range(2, sys.version_info.minor):
            add_tests((major, minor))
    else:
        add_tests(tuple(map(int, value.split('.'))))
    return tests


if __name__ == '__main__':
    unittest.main()