# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
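"""Runtime environment for a PEX: finds, caches, and activates its bundled distributions."""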
from __future__ import absolute_import, print_function

import itertools
import os
import site
import sys
import uuid

from pkg_resources import (
    DistributionNotFound,
    Environment,
    Requirement,
    WorkingSet,
    find_distributions
)

from .common import die, open_zip, rename_if_empty, safe_mkdir, safe_rmtree
from .interpreter import PythonInterpreter
from .package import distribution_compatible
from .pex_builder import PEXBuilder
from .pex_info import PexInfo
from .platforms import Platform
from .tracer import TRACER
from .util import CacheHelper, DistributionHelper


class PEXEnvironment(Environment):
  @classmethod
  def force_local(cls, pex, pex_info):
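    """Explode a zip-unsafe PEX into the zip_unsafe cache and return the exploded directory.

    If the code has already been exploded for this PEX's code hash, the cached directory is
    reused. When no code hash is recorded, the original pex path is returned unchanged.
    """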
    if pex_info.code_hash is None:
      # Do not support force_local if code_hash is not set. (It should always be set.)
      return pex
    explode_dir = os.path.join(pex_info.zip_unsafe_cache, pex_info.code_hash)
    TRACER.log('PEX is not zip safe, exploding to %s' % explode_dir)
    if not os.path.exists(explode_dir):
      explode_tmp = explode_dir + '.' + uuid.uuid4().hex
      with TRACER.timed('Unzipping %s' % pex):
        try:
          safe_mkdir(explode_tmp)
          with open_zip(pex) as pex_zip:
            pex_files = (x for x in pex_zip.namelist()
                         if not x.startswith(PEXBuilder.BOOTSTRAP_DIR) and
                         not x.startswith(PexInfo.INTERNAL_CACHE))
            pex_zip.extractall(explode_tmp, pex_files)
        except:  # noqa: T803
          safe_rmtree(explode_tmp)
          raise
      TRACER.log('Renaming %s to %s' % (explode_tmp, explode_dir))
      rename_if_empty(explode_tmp, explode_dir)
    return explode_dir

  @classmethod
  def update_module_paths(cls, new_code_path):
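    """Prepend new_code_path to sys.path and to the __path__ of already-imported packages."""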
    # Force subsequent imports to come from the .pex directory rather than the .pex file.
    TRACER.log('Adding to the head of sys.path: %s' % new_code_path)
    sys.path.insert(0, new_code_path)
    for name, module in sys.modules.items():
      if hasattr(module, '__path__'):
        module_dir = os.path.join(new_code_path, *name.split("."))
        TRACER.log('Adding to the head of %s.__path__: %s' % (module.__name__, module_dir))
        try:
          module.__path__.insert(0, module_dir)
        except AttributeError:
          # TODO: This is a temporary bandaid for an unhandled AttributeError which results
          # in a startup crash. See https://github.com/pantsbuild/pex/issues/598 for more info.
          TRACER.log(
            'Failed to insert %s: %s.__path__ of type %s does not support insertion!' % (
              module_dir,
              module.__name__,
              type(module.__path__)
            )
          )

  @classmethod
  def write_zipped_internal_cache(cls, pex, pex_info):
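    """Write zip-unsafe distributions from the PEX zip out to the install cache.

    Returns a 3-tuple of distribution lists: those found already cached on disk, those newly
    cached by this call, and those that are zip-safe and can be used from within the zip.
    """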
    prefix_length = len(pex_info.internal_cache) + 1
    existing_cached_distributions = []
    newly_cached_distributions = []
    zip_safe_distributions = []
    with open_zip(pex) as zf:
      # Distribution names are the first element after ".deps/" and before the next "/"
      distribution_names = set(filter(None, (filename[prefix_length:].split('/')[0]
          for filename in zf.namelist() if filename.startswith(pex_info.internal_cache))))
      # Create Distribution objects from these, and possibly write to disk if necessary.
      for distribution_name in distribution_names:
        internal_dist_path = '/'.join([pex_info.internal_cache, distribution_name])
        # First check if this is already cached
        dist_digest = pex_info.distributions.get(distribution_name) or CacheHelper.zip_hash(
            zf, internal_dist_path)
        cached_location = os.path.join(pex_info.install_cache, '%s.%s' % (
            distribution_name, dist_digest))
        if os.path.exists(cached_location):
          dist = DistributionHelper.distribution_from_path(cached_location)
          if dist is not None:
            existing_cached_distributions.append(dist)
            continue
        else:
          dist = DistributionHelper.distribution_from_path(os.path.join(pex, internal_dist_path))
          if dist is not None:
            if DistributionHelper.zipsafe(dist) and not pex_info.always_write_cache:
              zip_safe_distributions.append(dist)
              continue

        with TRACER.timed('Caching %s' % dist):
          newly_cached_distributions.append(
            CacheHelper.cache_distribution(zf, internal_dist_path, cached_location))

    return existing_cached_distributions, newly_cached_distributions, zip_safe_distributions

  @classmethod
  def load_internal_cache(cls, pex, pex_info):
"""Possibly cache out the internal cache."""
    internal_cache = os.path.join(pex, pex_info.internal_cache)
    with TRACER.timed('Searching dependency cache: %s' % internal_cache, V=2):
      if os.path.isdir(pex):
        for dist in find_distributions(internal_cache):
          yield dist
      else:
        for dist in itertools.chain(*cls.write_zipped_internal_cache(pex, pex_info)):
          yield dist

  def __init__(self, pex, pex_info, interpreter=None, **kw):
    self._internal_cache = os.path.join(pex, pex_info.internal_cache)
    self._pex = pex
    self._pex_info = pex_info
    self._activated = False
    self._working_set = None
    self._interpreter = interpreter or PythonInterpreter.get()
    self._inherit_path = pex_info.inherit_path
    self._supported_tags = []

    platform = Platform.current()
    platform_name = platform.platform
    super(PEXEnvironment, self).__init__(
      search_path=[] if pex_info.inherit_path == 'false' else sys.path,
      # NB: Our pkg_resources.Environment base-class wants the platform name string and not the
      # pex.platform.Platform object.
      platform=platform_name,
      **kw
    )
    self._target_interpreter_env = self._interpreter.identity.pkg_resources_env(platform_name)
    self._supported_tags.extend(platform.supported_tags(self._interpreter))
    TRACER.log(
      'E: tags for %r x %r -> %s' % (self.platform, self._interpreter, self._supported_tags),
      V=9
    )

  def update_candidate_distributions(self, distribution_iter):
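    """Add each compatible distribution from distribution_iter to this environment."""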
    for dist in distribution_iter:
      if self.can_add(dist):
        with TRACER.timed('Adding %s' % dist, V=2):
          self.add(dist)

  def can_add(self, dist):
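    """Return True if dist is compatible with this environment's supported tags."""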
    return distribution_compatible(dist, self._supported_tags)

  def activate(self):
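    """Activate this PEX's dependencies, returning the resulting pkg_resources WorkingSet.

    Activation only happens once; subsequent calls return the cached working set.
    """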
    if not self._activated:
      with TRACER.timed('Activating PEX virtual environment from %s' % self._pex):
        self._working_set = self._activate()
      self._activated = True
    return self._working_set

  def _resolve(self, working_set, reqs):
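    """Resolve reqs against this environment, returning the set of resolved distributions.

    Requirements de-selected by their environment markers are skipped. Unresolvable
    requirements cause a fatal error unless the PEX was built with ignore_errors.
    """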
    reqs = reqs[:]
    unresolved_reqs = set()
    resolveds = set()

    environment = self._target_interpreter_env.copy()
    environment['extra'] = list(set(itertools.chain(*(req.extras for req in reqs))))

    # Resolve them one at a time so that we can figure out which ones we need to elide should
    # there be an interpreter incompatibility.
    for req in reqs:
      if req.marker and not req.marker.evaluate(environment=environment):
        TRACER.log('Skipping activation of `%s` due to environment marker de-selection' % req)
        continue
      with TRACER.timed('Resolving %s' % req, V=2):
        try:
          resolveds.update(working_set.resolve([req], env=self))
        except DistributionNotFound as e:
          TRACER.log('Failed to resolve a requirement: %s' % e)
          unresolved_reqs.add(e.args[0].project_name)
          # Older versions of pkg_resources just call `DistributionNotFound(req)` instead of the
          # modern `DistributionNotFound(req, requirers)` and so we may not have the 2nd requirers
          # slot at all.
          if len(e.args) >= 2 and e.args[1]:
            unresolved_reqs.update(e.args[1])

    unresolved_reqs = set([req.lower() for req in unresolved_reqs])

    if unresolved_reqs:
      TRACER.log('Unresolved requirements:')
      for req in unresolved_reqs:
        TRACER.log(' - %s' % req)
      TRACER.log('Distributions contained within this pex:')
      if not self._pex_info.distributions:
        TRACER.log(' None')
      else:
        for dist in self._pex_info.distributions:
          TRACER.log(' - %s' % dist)
      if not self._pex_info.ignore_errors:
        die(
          'Failed to execute PEX file, missing %s compatible dependencies for:\n%s' % (
            Platform.current(),
            '\n'.join(str(r) for r in unresolved_reqs)
          )
        )

    return resolveds

  def _activate(self):
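    """Load the internal cache, then resolve and activate this PEX's requirements."""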
    self.update_candidate_distributions(self.load_internal_cache(self._pex, self._pex_info))

    if not self._pex_info.zip_safe and os.path.isfile(self._pex):
      self.update_module_paths(self.force_local(self._pex, self._pex_info))

    all_reqs = [Requirement.parse(req) for req in self._pex_info.requirements]

    working_set = WorkingSet([])
    resolved = self._resolve(working_set, all_reqs)

    for dist in resolved:
      with TRACER.timed('Activating %s' % dist, V=2):
        working_set.add(dist)

        if os.path.isdir(dist.location):
          with TRACER.timed('Adding sitedir', V=2):
            if dist.location not in sys.path and self._inherit_path == "fallback":
              # Prepend location to sys.path.
              #
              # This ensures that bundled versions of libraries will be used before system-installed
              # versions, in case something is installed in both, helping to favor hermeticity in
              # the case of non-hermetic PEX files (i.e. those with inherit_path=True).
              #
              # If the path is not already in sys.path, site.addsitedir will append (not prepend)
              # the path to sys.path. But if the path is already in sys.path, site.addsitedir will
              # leave sys.path unmodified, but will do everything else it would do. This is not part
              # of its advertised contract (which is very vague), but has been verified to be the
              # case by inspecting its source for both cpython 2.7 and cpython 3.7.
              sys.path.insert(0, dist.location)
            site.addsitedir(dist.location)

        dist.activate()

    return working_set