1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
import functools
import logging
from pip._vendor import six
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
from pip._vendor.resolvelib import Resolver as RLResolver
from pip._internal.exceptions import InstallationError
from pip._internal.req.req_set import RequirementSet
from pip._internal.resolution.base import BaseResolver
from pip._internal.resolution.resolvelib.provider import PipProvider
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from .factory import Factory
if MYPY_CHECK_RUNNING:
from typing import Dict, List, Optional, Tuple
from pip._vendor.resolvelib.resolvers import Result
from pip._internal.cache import WheelCache
from pip._internal.index.package_finder import PackageFinder
from pip._internal.operations.prepare import RequirementPreparer
from pip._internal.req.req_install import InstallRequirement
from pip._internal.resolution.base import InstallRequirementProvider
logger = logging.getLogger(__name__)
class Resolver(BaseResolver):
def __init__(
self,
preparer, # type: RequirementPreparer
finder, # type: PackageFinder
wheel_cache, # type: Optional[WheelCache]
make_install_req, # type: InstallRequirementProvider
use_user_site, # type: bool
ignore_dependencies, # type: bool
ignore_installed, # type: bool
ignore_requires_python, # type: bool
force_reinstall, # type: bool
upgrade_strategy, # type: str
py_version_info=None, # type: Optional[Tuple[int, ...]]
):
super(Resolver, self).__init__()
self.factory = Factory(
finder=finder,
preparer=preparer,
make_install_req=make_install_req,
force_reinstall=force_reinstall,
ignore_installed=ignore_installed,
ignore_requires_python=ignore_requires_python,
py_version_info=py_version_info,
)
self.ignore_dependencies = ignore_dependencies
self._result = None # type: Optional[Result]
def resolve(self, root_reqs, check_supported_wheels):
# type: (List[InstallRequirement], bool) -> RequirementSet
# FIXME: Implement constraints.
if any(r.constraint for r in root_reqs):
raise InstallationError("Constraints are not yet supported.")
provider = PipProvider(
factory=self.factory,
ignore_dependencies=self.ignore_dependencies,
)
reporter = BaseReporter()
resolver = RLResolver(provider, reporter)
requirements = [
self.factory.make_requirement_from_install_req(r)
for r in root_reqs
]
try:
self._result = resolver.resolve(requirements)
except ResolutionImpossible as e:
error = self.factory.get_installation_error(e)
if not error:
# TODO: This needs fixing, we need to look at the
# factory.get_installation_error infrastructure, as that
# doesn't really allow for the logger.critical calls I'm
# using here.
for req, parent in e.causes:
logger.critical(
"Could not find a version that satisfies " +
"the requirement " +
str(req) +
("" if parent is None else " (from {})".format(
parent.name
))
)
raise InstallationError(
"No matching distribution found for " +
", ".join([r.name for r, _ in e.causes])
)
raise
six.raise_from(error, e)
req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
for candidate in self._result.mapping.values():
ireq = provider.get_install_requirement(candidate)
if ireq is None:
continue
ireq.should_reinstall = self.factory.should_reinstall(candidate)
req_set.add_named_requirement(ireq)
return req_set
def get_installation_order(self, req_set):
# type: (RequirementSet) -> List[InstallRequirement]
"""Create a list that orders given requirements for installation.
The returned list should contain all requirements in ``req_set``,
so the caller can loop through it and have a requirement installed
before the requiring thing.
The current implementation walks the resolved dependency graph, and
make sure every node has a greater "weight" than all its parents.
"""
assert self._result is not None, "must call resolve() first"
weights = {} # type: Dict[Optional[str], int]
graph = self._result.graph
key_count = len(self._result.mapping) + 1 # Packages plus sentinal.
while len(weights) < key_count:
progressed = False
for key in graph:
if key in weights:
continue
parents = list(graph.iter_parents(key))
if not all(p in weights for p in parents):
continue
if parents:
weight = max(weights[p] for p in parents) + 1
else:
weight = 0
weights[key] = weight
progressed = True
# FIXME: This check will fail if there are unbreakable cycles.
# Implement something to forcifully break them up to continue.
if not progressed:
raise InstallationError(
"Could not determine installation order due to cicular "
"dependency."
)
sorted_items = sorted(
req_set.requirements.items(),
key=functools.partial(_req_set_item_sorter, weights=weights),
reverse=True,
)
return [ireq for _, ireq in sorted_items]
def _req_set_item_sorter(
item, # type: Tuple[str, InstallRequirement]
weights, # type: Dict[Optional[str], int]
):
# type: (...) -> Tuple[int, str]
"""Key function used to sort install requirements for installation.
Based on the "weight" mapping calculated in ``get_installation_order()``.
The canonical package name is returned as the second member as a tie-
breaker to ensure the result is predictable, which is useful in tests.
"""
name = canonicalize_name(item[0])
return weights[name], name
|