File: test_misc.py

package info (click to toggle)
pytorch-cuda 2.6.0%2Bdfsg-7
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 161,620 kB
  • sloc: python: 1,278,832; cpp: 900,322; ansic: 82,710; asm: 7,754; java: 3,363; sh: 2,811; javascript: 2,443; makefile: 597; ruby: 195; xml: 84; objc: 68
file content (360 lines) | stat: -rw-r--r-- 12,821 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# Owner(s): ["oncall: package/deploy"]

import inspect
import os
import platform
import sys
from io import BytesIO
from pathlib import Path
from textwrap import dedent
from unittest import skipIf

from torch.package import is_from_package, PackageExporter, PackageImporter
from torch.package.package_exporter import PackagingError
from torch.testing._internal.common_utils import (
    IS_FBCODE,
    IS_SANDCASTLE,
    run_tests,
    skipIfTorchDynamo,
)


try:
    from .common import PackageTestCase
except ImportError:
    # Support the case where we run this file directly.
    from common import PackageTestCase


class TestMisc(PackageTestCase):
    """Tests for one-off or random functionality. Try not to add to this!"""

    def test_file_structure(self):
        """
        Tests package's Directory structure representation of a zip file. Ensures
        that the returned Directory prints what is expected and filters
        inputs/outputs correctly.
        """
        buffer = BytesIO()

        export_plain = dedent(
            """\
                \u251c\u2500\u2500 .data
                \u2502   \u251c\u2500\u2500 extern_modules
                \u2502   \u251c\u2500\u2500 python_version
                \u2502   \u251c\u2500\u2500 serialization_id
                \u2502   \u2514\u2500\u2500 version
                \u251c\u2500\u2500 main
                \u2502   \u2514\u2500\u2500 main
                \u251c\u2500\u2500 obj
                \u2502   \u2514\u2500\u2500 obj.pkl
                \u251c\u2500\u2500 package_a
                \u2502   \u251c\u2500\u2500 __init__.py
                \u2502   \u2514\u2500\u2500 subpackage.py
                \u251c\u2500\u2500 byteorder
                \u2514\u2500\u2500 module_a.py
            """
        )
        export_include = dedent(
            """\
                \u251c\u2500\u2500 obj
                \u2502   \u2514\u2500\u2500 obj.pkl
                \u2514\u2500\u2500 package_a
                    \u2514\u2500\u2500 subpackage.py
            """
        )
        import_exclude = dedent(
            """\
                \u251c\u2500\u2500 .data
                \u2502   \u251c\u2500\u2500 extern_modules
                \u2502   \u251c\u2500\u2500 python_version
                \u2502   \u251c\u2500\u2500 serialization_id
                \u2502   \u2514\u2500\u2500 version
                \u251c\u2500\u2500 main
                \u2502   \u2514\u2500\u2500 main
                \u251c\u2500\u2500 obj
                \u2502   \u2514\u2500\u2500 obj.pkl
                \u251c\u2500\u2500 package_a
                \u2502   \u251c\u2500\u2500 __init__.py
                \u2502   \u2514\u2500\u2500 subpackage.py
                \u251c\u2500\u2500 byteorder
                \u2514\u2500\u2500 module_a.py
            """
        )

        with PackageExporter(buffer) as he:
            import module_a
            import package_a
            import package_a.subpackage

            obj = package_a.subpackage.PackageASubpackageObject()
            he.intern("**")
            he.save_module(module_a.__name__)
            he.save_module(package_a.__name__)
            he.save_pickle("obj", "obj.pkl", obj)
            he.save_text("main", "main", "my string")

        buffer.seek(0)
        hi = PackageImporter(buffer)

        file_structure = hi.file_structure()
        # remove first line from testing because WINDOW/iOS/Unix treat the buffer differently
        self.assertEqual(
            dedent("\n".join(str(file_structure).split("\n")[1:])),
            export_plain,
        )
        file_structure = hi.file_structure(include=["**/subpackage.py", "**/*.pkl"])
        self.assertEqual(
            dedent("\n".join(str(file_structure).split("\n")[1:])),
            export_include,
        )

        file_structure = hi.file_structure(exclude="**/*.storage")
        self.assertEqual(
            dedent("\n".join(str(file_structure).split("\n")[1:])),
            import_exclude,
        )

    def test_loaders_that_remap_files_work_ok(self):
        from importlib.abc import MetaPathFinder
        from importlib.machinery import SourceFileLoader
        from importlib.util import spec_from_loader

        class LoaderThatRemapsModuleA(SourceFileLoader):
            def get_filename(self, name):
                result = super().get_filename(name)
                if name == "module_a":
                    return os.path.join(
                        os.path.dirname(result), "module_a_remapped_path.py"
                    )
                else:
                    return result

        class FinderThatRemapsModuleA(MetaPathFinder):
            def find_spec(self, fullname, path, target):
                """Try to find the original spec for module_a using all the
                remaining meta_path finders."""
                if fullname != "module_a":
                    return None
                spec = None
                for finder in sys.meta_path:
                    if finder is self:
                        continue
                    if hasattr(finder, "find_spec"):
                        spec = finder.find_spec(fullname, path, target=target)
                    elif hasattr(finder, "load_module"):
                        spec = spec_from_loader(fullname, finder)
                    if spec is not None:
                        break
                assert spec is not None and isinstance(spec.loader, SourceFileLoader)
                spec.loader = LoaderThatRemapsModuleA(
                    spec.loader.name, spec.loader.path
                )
                return spec

        sys.meta_path.insert(0, FinderThatRemapsModuleA())
        # clear it from sys.modules so that we use the custom finder next time
        # it gets imported
        sys.modules.pop("module_a", None)
        try:
            buffer = BytesIO()
            with PackageExporter(buffer) as he:
                import module_a

                he.intern("**")
                he.save_module(module_a.__name__)

            buffer.seek(0)
            hi = PackageImporter(buffer)
            self.assertTrue("remapped_path" in hi.get_source("module_a"))
        finally:
            # pop it again to ensure it does not mess up other tests
            sys.modules.pop("module_a", None)
            sys.meta_path.pop(0)

    def test_python_version(self):
        """
        Tests that the current python version is stored in the package and is available
        via PackageImporter's python_version() method.
        """
        buffer = BytesIO()

        with PackageExporter(buffer) as he:
            from package_a.test_module import SimpleTest

            he.intern("**")
            obj = SimpleTest()
            he.save_pickle("obj", "obj.pkl", obj)

        buffer.seek(0)
        hi = PackageImporter(buffer)

        self.assertEqual(hi.python_version(), platform.python_version())

    @skipIf(
        IS_FBCODE or IS_SANDCASTLE,
        "Tests that use temporary files are disabled in fbcode",
    )
    def test_load_python_version_from_package(self):
        """Tests loading a package with a python version embdded"""
        importer1 = PackageImporter(
            f"{Path(__file__).parent}/package_e/test_nn_module.pt"
        )
        self.assertEqual(importer1.python_version(), "3.9.7")

    def test_file_structure_has_file(self):
        """
        Test Directory's has_file() method.
        """
        buffer = BytesIO()
        with PackageExporter(buffer) as he:
            import package_a.subpackage

            he.intern("**")
            obj = package_a.subpackage.PackageASubpackageObject()
            he.save_pickle("obj", "obj.pkl", obj)

        buffer.seek(0)

        importer = PackageImporter(buffer)
        file_structure = importer.file_structure()
        self.assertTrue(file_structure.has_file("package_a/subpackage.py"))
        self.assertFalse(file_structure.has_file("package_a/subpackage"))

    def test_exporter_content_lists(self):
        """
        Test content list API for PackageExporter's contained modules.
        """

        with PackageExporter(BytesIO()) as he:
            import package_b

            he.extern("package_b.subpackage_1")
            he.mock("package_b.subpackage_2")
            he.intern("**")
            he.save_pickle("obj", "obj.pkl", package_b.PackageBObject(["a"]))
            self.assertEqual(he.externed_modules(), ["package_b.subpackage_1"])
            self.assertEqual(he.mocked_modules(), ["package_b.subpackage_2"])
            self.assertEqual(
                he.interned_modules(),
                ["package_b", "package_b.subpackage_0.subsubpackage_0"],
            )
            self.assertEqual(he.get_rdeps("package_b.subpackage_2"), ["package_b"])

        with self.assertRaises(PackagingError) as e:
            with PackageExporter(BytesIO()) as he:
                import package_b

                he.deny("package_b")
                he.save_pickle("obj", "obj.pkl", package_b.PackageBObject(["a"]))
                self.assertEqual(he.denied_modules(), ["package_b"])

    def test_is_from_package(self):
        """is_from_package should work for objects and modules"""
        import package_a.subpackage

        buffer = BytesIO()
        obj = package_a.subpackage.PackageASubpackageObject()

        with PackageExporter(buffer) as pe:
            pe.intern("**")
            pe.save_pickle("obj", "obj.pkl", obj)

        buffer.seek(0)
        pi = PackageImporter(buffer)
        mod = pi.import_module("package_a.subpackage")
        loaded_obj = pi.load_pickle("obj", "obj.pkl")

        self.assertFalse(is_from_package(package_a.subpackage))
        self.assertTrue(is_from_package(mod))

        self.assertFalse(is_from_package(obj))
        self.assertTrue(is_from_package(loaded_obj))

    def test_inspect_class(self):
        """Should be able to retrieve source for a packaged class."""
        import package_a.subpackage

        buffer = BytesIO()
        obj = package_a.subpackage.PackageASubpackageObject()

        with PackageExporter(buffer) as pe:
            pe.intern("**")
            pe.save_pickle("obj", "obj.pkl", obj)

        buffer.seek(0)
        pi = PackageImporter(buffer)
        packaged_class = pi.import_module(
            "package_a.subpackage"
        ).PackageASubpackageObject
        regular_class = package_a.subpackage.PackageASubpackageObject

        packaged_src = inspect.getsourcelines(packaged_class)
        regular_src = inspect.getsourcelines(regular_class)
        self.assertEqual(packaged_src, regular_src)

    def test_dunder_package_present(self):
        """
        The attribute '__torch_package__' should be populated on imported modules.
        """
        import package_a.subpackage

        buffer = BytesIO()
        obj = package_a.subpackage.PackageASubpackageObject()

        with PackageExporter(buffer) as pe:
            pe.intern("**")
            pe.save_pickle("obj", "obj.pkl", obj)

        buffer.seek(0)
        pi = PackageImporter(buffer)
        mod = pi.import_module("package_a.subpackage")
        self.assertTrue(hasattr(mod, "__torch_package__"))

    def test_dunder_package_works_from_package(self):
        """
        The attribute '__torch_package__' should be accessible from within
        the module itself, so that packaged code can detect whether it's
        being used in a packaged context or not.
        """
        import package_a.use_dunder_package as mod

        buffer = BytesIO()

        with PackageExporter(buffer) as pe:
            pe.intern("**")
            pe.save_module(mod.__name__)

        buffer.seek(0)
        pi = PackageImporter(buffer)
        imported_mod = pi.import_module(mod.__name__)
        self.assertTrue(imported_mod.is_from_package())
        self.assertFalse(mod.is_from_package())

    @skipIfTorchDynamo("Not a suitable test for TorchDynamo")
    def test_std_lib_sys_hackery_checks(self):
        """
        The standard library performs sys.module assignment hackery which
        causes modules who do this hackery to fail on import. See
        https://github.com/pytorch/pytorch/issues/57490 for more information.
        """
        if sys.version_info < (3, 13):
            import package_a.std_sys_module_hacks as std_sys_module_hacks
        else:
            import package_a.std_sys_module_hacks_3_13 as std_sys_module_hacks

        buffer = BytesIO()
        mod = std_sys_module_hacks.Module()

        with PackageExporter(buffer) as pe:
            pe.intern("**")
            pe.save_pickle("obj", "obj.pkl", mod)

        buffer.seek(0)
        pi = PackageImporter(buffer)
        mod = pi.load_pickle("obj", "obj.pkl")
        mod()


if __name__ == "__main__":
    run_tests()