File: __main__zipapp.py

package info (click to toggle)
python-virtualenv 20.33.1%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,124 kB
  • sloc: python: 10,907; sh: 160; ansic: 61; csh: 47; makefile: 8
file content (178 lines) | stat: -rw-r--r-- 6,305 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from __future__ import annotations

import json
import os
import sys
import zipfile
from functools import cached_property
from importlib.abc import SourceLoader
from importlib.util import spec_from_file_location

ABS_HERE = os.path.abspath(os.path.dirname(__file__))


class VersionPlatformSelect:
    def __init__(self) -> None:
        zipapp = ABS_HERE
        self.archive = zipapp
        self._zip_file = zipfile.ZipFile(zipapp)
        self.modules = self._load("modules.json")
        self.distributions = self._load("distributions.json")
        self.__cache = {}

    def _load(self, of_file):
        version = ".".join(str(i) for i in sys.version_info[0:2])
        per_version = json.loads(self.get_data(of_file).decode())
        all_platforms = per_version[version] if version in per_version else per_version["3.9"]
        content = all_platforms.get("==any", {})  # start will all platforms
        not_us = f"!={sys.platform}"
        for key, value in all_platforms.items():  # now override that with not platform
            if key.startswith("!=") and key != not_us:
                content.update(value)
        content.update(all_platforms.get(f"=={sys.platform}", {}))  # and finish it off with our platform
        return content

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._zip_file.close()

    def find_mod(self, fullname):
        if fullname in self.modules:
            return self.modules[fullname]
        return None

    def get_filename(self, fullname):
        zip_path = self.find_mod(fullname)
        return None if zip_path is None else os.path.join(ABS_HERE, zip_path)

    def get_data(self, filename):
        if filename.startswith(ABS_HERE):
            # keep paths relative from the zipfile
            filename = filename[len(ABS_HERE) + 1 :]
            filename = filename.lstrip(os.sep)
        if sys.platform == "win32":
            # paths within the zipfile is always /, fixup on Windows to transform \ to /
            filename = "/".join(filename.split(os.sep))
        with self._zip_file.open(filename) as file_handler:
            return file_handler.read()

    def find_distributions(self, context):
        dist_class = versioned_distribution_class()
        name = context.name
        if name in self.distributions:
            result = dist_class(file_loader=self.get_data, dist_path=self.distributions[name])
            yield result

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(path={ABS_HERE})"

    def _register_distutils_finder(self):  # noqa: C901
        if "distlib" not in self.modules:
            return

        class Resource:
            def __init__(self, path: str, name: str, loader: SourceLoader) -> None:
                self.path = os.path.join(path, name)
                self._name = name
                self.loader = loader

            @cached_property
            def name(self) -> str:
                return os.path.basename(self._name)

            @property
            def bytes(self) -> bytes:
                return self.loader.get_data(self._name)

            @property
            def is_container(self) -> bool:
                return len(self.resources) > 1

            @cached_property
            def resources(self) -> list[str]:
                return [
                    i.filename
                    for i in (
                        (j for j in zip_file.filelist if j.filename.startswith(f"{self._name}/"))
                        if self._name
                        else zip_file.filelist
                    )
                ]

        class DistlibFinder:
            def __init__(self, path, loader) -> None:
                self.path = path
                self.loader = loader

            def find(self, name):
                return Resource(self.path, name, self.loader)

            def iterator(self, resource_name):
                resource = self.find(resource_name)
                if resource is not None:
                    todo = [resource]
                    while todo:
                        resource = todo.pop(0)
                        yield resource
                        if resource.is_container:
                            resource_name = resource.name
                            for name in resource.resources:
                                child = self.find(f"{resource_name}/{name}" if resource_name else name)
                                if child.is_container:
                                    todo.append(child)
                                else:
                                    yield child

        from distlib.resources import register_finder  # noqa: PLC0415

        zip_file = self._zip_file
        register_finder(self, lambda module: DistlibFinder(os.path.dirname(module.__file__), self))


_VER_DISTRIBUTION_CLASS = None


def versioned_distribution_class():
    global _VER_DISTRIBUTION_CLASS  # noqa: PLW0603
    if _VER_DISTRIBUTION_CLASS is None:
        from importlib.metadata import Distribution  # noqa: PLC0415

        class VersionedDistribution(Distribution):
            def __init__(self, file_loader, dist_path) -> None:
                self.file_loader = file_loader
                self.dist_path = dist_path

            def read_text(self, filename):
                return self.file_loader(self.locate_file(filename)).decode("utf-8")

            def locate_file(self, path):
                return os.path.join(self.dist_path, path)

        _VER_DISTRIBUTION_CLASS = VersionedDistribution
    return _VER_DISTRIBUTION_CLASS


class VersionedFindLoad(VersionPlatformSelect, SourceLoader):
    def find_spec(self, fullname, path, target=None):  # noqa: ARG002
        zip_path = self.find_mod(fullname)
        if zip_path is not None:
            return spec_from_file_location(name=fullname, loader=self)
        return None

    def module_repr(self, module):
        raise NotImplementedError


def run():
    with VersionedFindLoad() as finder:
        sys.meta_path.insert(0, finder)
        finder._register_distutils_finder()  # noqa: SLF001
        from virtualenv.__main__ import run as run_virtualenv  # noqa: PLC0415, PLC2701

        run_virtualenv()


if __name__ == "__main__":
    run()