File: resolver.py

package info (click to toggle)
electrum 4.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 35,248 kB
  • sloc: python: 222,785; sh: 165; java: 73; javascript: 10; makefile: 9
file content (174 lines) | stat: -rw-r--r-- 6,673 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import functools
import logging

from pip._vendor import six
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
from pip._vendor.resolvelib import Resolver as RLResolver

from pip._internal.exceptions import InstallationError
from pip._internal.req.req_set import RequirementSet
from pip._internal.resolution.base import BaseResolver
from pip._internal.resolution.resolvelib.provider import PipProvider
from pip._internal.utils.typing import MYPY_CHECK_RUNNING

from .factory import Factory

if MYPY_CHECK_RUNNING:
    from typing import Dict, List, Optional, Tuple

    from pip._vendor.resolvelib.resolvers import Result

    from pip._internal.cache import WheelCache
    from pip._internal.index.package_finder import PackageFinder
    from pip._internal.operations.prepare import RequirementPreparer
    from pip._internal.req.req_install import InstallRequirement
    from pip._internal.resolution.base import InstallRequirementProvider


logger = logging.getLogger(__name__)


class Resolver(BaseResolver):
    def __init__(
        self,
        preparer,  # type: RequirementPreparer
        finder,  # type: PackageFinder
        wheel_cache,  # type: Optional[WheelCache]
        make_install_req,  # type: InstallRequirementProvider
        use_user_site,  # type: bool
        ignore_dependencies,  # type: bool
        ignore_installed,  # type: bool
        ignore_requires_python,  # type: bool
        force_reinstall,  # type: bool
        upgrade_strategy,  # type: str
        py_version_info=None,  # type: Optional[Tuple[int, ...]]
    ):
        super(Resolver, self).__init__()
        self.factory = Factory(
            finder=finder,
            preparer=preparer,
            make_install_req=make_install_req,
            force_reinstall=force_reinstall,
            ignore_installed=ignore_installed,
            ignore_requires_python=ignore_requires_python,
            py_version_info=py_version_info,
        )
        self.ignore_dependencies = ignore_dependencies
        self._result = None  # type: Optional[Result]

    def resolve(self, root_reqs, check_supported_wheels):
        # type: (List[InstallRequirement], bool) -> RequirementSet

        # FIXME: Implement constraints.
        if any(r.constraint for r in root_reqs):
            raise InstallationError("Constraints are not yet supported.")

        provider = PipProvider(
            factory=self.factory,
            ignore_dependencies=self.ignore_dependencies,
        )
        reporter = BaseReporter()
        resolver = RLResolver(provider, reporter)

        requirements = [
            self.factory.make_requirement_from_install_req(r)
            for r in root_reqs
        ]

        try:
            self._result = resolver.resolve(requirements)

        except ResolutionImpossible as e:
            error = self.factory.get_installation_error(e)
            if not error:
                # TODO: This needs fixing, we need to look at the
                # factory.get_installation_error infrastructure, as that
                # doesn't really allow for the logger.critical calls I'm
                # using here.
                for req, parent in e.causes:
                    logger.critical(
                        "Could not find a version that satisfies " +
                        "the requirement " +
                        str(req) +
                        ("" if parent is None else " (from {})".format(
                            parent.name
                        ))
                    )
                raise InstallationError(
                    "No matching distribution found for " +
                    ", ".join([r.name for r, _ in e.causes])
                )
                raise
            six.raise_from(error, e)

        req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
        for candidate in self._result.mapping.values():
            ireq = provider.get_install_requirement(candidate)
            if ireq is None:
                continue
            ireq.should_reinstall = self.factory.should_reinstall(candidate)
            req_set.add_named_requirement(ireq)

        return req_set

    def get_installation_order(self, req_set):
        # type: (RequirementSet) -> List[InstallRequirement]
        """Create a list that orders given requirements for installation.

        The returned list should contain all requirements in ``req_set``,
        so the caller can loop through it and have a requirement installed
        before the requiring thing.

        The current implementation walks the resolved dependency graph, and
        make sure every node has a greater "weight" than all its parents.
        """
        assert self._result is not None, "must call resolve() first"
        weights = {}  # type: Dict[Optional[str], int]

        graph = self._result.graph
        key_count = len(self._result.mapping) + 1  # Packages plus sentinal.
        while len(weights) < key_count:
            progressed = False
            for key in graph:
                if key in weights:
                    continue
                parents = list(graph.iter_parents(key))
                if not all(p in weights for p in parents):
                    continue
                if parents:
                    weight = max(weights[p] for p in parents) + 1
                else:
                    weight = 0
                weights[key] = weight
                progressed = True

            # FIXME: This check will fail if there are unbreakable cycles.
            # Implement something to forcifully break them up to continue.
            if not progressed:
                raise InstallationError(
                    "Could not determine installation order due to cicular "
                    "dependency."
                )

        sorted_items = sorted(
            req_set.requirements.items(),
            key=functools.partial(_req_set_item_sorter, weights=weights),
            reverse=True,
        )
        return [ireq for _, ireq in sorted_items]


def _req_set_item_sorter(
    item,     # type: Tuple[str, InstallRequirement]
    weights,  # type: Dict[Optional[str], int]
):
    # type: (...) -> Tuple[int, str]
    """Key function used to sort install requirements for installation.

    Based on the "weight" mapping calculated in ``get_installation_order()``.
    The canonical package name is returned as the second member as a tie-
    breaker to ensure the result is predictable, which is useful in tests.
    """
    name = canonicalize_name(item[0])
    return weights[name], name