1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
from __future__ import annotations
import json
import os
import sys
import zipfile
from functools import cached_property
from importlib.abc import SourceLoader
from importlib.util import spec_from_file_location
ABS_HERE = os.path.abspath(os.path.dirname(__file__))
class VersionPlatformSelect:
def __init__(self) -> None:
zipapp = ABS_HERE
self.archive = zipapp
self._zip_file = zipfile.ZipFile(zipapp)
self.modules = self._load("modules.json")
self.distributions = self._load("distributions.json")
self.__cache = {}
def _load(self, of_file):
version = ".".join(str(i) for i in sys.version_info[0:2])
per_version = json.loads(self.get_data(of_file).decode())
all_platforms = per_version[version] if version in per_version else per_version["3.9"]
content = all_platforms.get("==any", {}) # start will all platforms
not_us = f"!={sys.platform}"
for key, value in all_platforms.items(): # now override that with not platform
if key.startswith("!=") and key != not_us:
content.update(value)
content.update(all_platforms.get(f"=={sys.platform}", {})) # and finish it off with our platform
return content
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._zip_file.close()
def find_mod(self, fullname):
if fullname in self.modules:
return self.modules[fullname]
return None
def get_filename(self, fullname):
zip_path = self.find_mod(fullname)
return None if zip_path is None else os.path.join(ABS_HERE, zip_path)
def get_data(self, filename):
if filename.startswith(ABS_HERE):
# keep paths relative from the zipfile
filename = filename[len(ABS_HERE) + 1 :]
filename = filename.lstrip(os.sep)
if sys.platform == "win32":
# paths within the zipfile is always /, fixup on Windows to transform \ to /
filename = "/".join(filename.split(os.sep))
with self._zip_file.open(filename) as file_handler:
return file_handler.read()
def find_distributions(self, context):
dist_class = versioned_distribution_class()
name = context.name
if name in self.distributions:
result = dist_class(file_loader=self.get_data, dist_path=self.distributions[name])
yield result
def __repr__(self) -> str:
return f"{self.__class__.__name__}(path={ABS_HERE})"
def _register_distutils_finder(self): # noqa: C901
if "distlib" not in self.modules:
return
class Resource:
def __init__(self, path: str, name: str, loader: SourceLoader) -> None:
self.path = os.path.join(path, name)
self._name = name
self.loader = loader
@cached_property
def name(self) -> str:
return os.path.basename(self._name)
@property
def bytes(self) -> bytes:
return self.loader.get_data(self._name)
@property
def is_container(self) -> bool:
return len(self.resources) > 1
@cached_property
def resources(self) -> list[str]:
return [
i.filename
for i in (
(j for j in zip_file.filelist if j.filename.startswith(f"{self._name}/"))
if self._name
else zip_file.filelist
)
]
class DistlibFinder:
def __init__(self, path, loader) -> None:
self.path = path
self.loader = loader
def find(self, name):
return Resource(self.path, name, self.loader)
def iterator(self, resource_name):
resource = self.find(resource_name)
if resource is not None:
todo = [resource]
while todo:
resource = todo.pop(0)
yield resource
if resource.is_container:
resource_name = resource.name
for name in resource.resources:
child = self.find(f"{resource_name}/{name}" if resource_name else name)
if child.is_container:
todo.append(child)
else:
yield child
from distlib.resources import register_finder # noqa: PLC0415
zip_file = self._zip_file
register_finder(self, lambda module: DistlibFinder(os.path.dirname(module.__file__), self))
_VER_DISTRIBUTION_CLASS = None
def versioned_distribution_class():
global _VER_DISTRIBUTION_CLASS # noqa: PLW0603
if _VER_DISTRIBUTION_CLASS is None:
from importlib.metadata import Distribution # noqa: PLC0415
class VersionedDistribution(Distribution):
def __init__(self, file_loader, dist_path) -> None:
self.file_loader = file_loader
self.dist_path = dist_path
def read_text(self, filename):
return self.file_loader(self.locate_file(filename)).decode("utf-8")
def locate_file(self, path):
return os.path.join(self.dist_path, path)
_VER_DISTRIBUTION_CLASS = VersionedDistribution
return _VER_DISTRIBUTION_CLASS
class VersionedFindLoad(VersionPlatformSelect, SourceLoader):
def find_spec(self, fullname, path, target=None): # noqa: ARG002
zip_path = self.find_mod(fullname)
if zip_path is not None:
return spec_from_file_location(name=fullname, loader=self)
return None
def module_repr(self, module):
raise NotImplementedError
def run():
with VersionedFindLoad() as finder:
sys.meta_path.insert(0, finder)
finder._register_distutils_finder() # noqa: SLF001
from virtualenv.__main__ import run as run_virtualenv # noqa: PLC0415, PLC2701
run_virtualenv()
if __name__ == "__main__":
run()
|