1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
|
import sys
from email.message import EmailMessage
from email.parser import BytesParser
from io import BytesIO
from operator import attrgetter
from platform import python_version
from urllib.parse import urlparse
from zipfile import ZipFile
import html5lib
import requests
from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from packaging.utils import canonicalize_name
from packaging.version import InvalidVersion, Version
from resolvelib import BaseReporter, Resolver
from .extras_provider import ExtrasProvider
# Version of the running interpreter, compared against each file's
# ``data-requires-python`` specifier in get_project_from_pypi().
#
# Built from the numeric (major, minor, micro) triple instead of
# ``platform.python_version()``: that string keeps pre-release and
# development suffixes (e.g. "3.13.0rc1", "3.14.0a0+"), which either fail to
# parse as a Version or are treated as pre-releases and therefore rejected by
# ordinary specifiers such as ">=3.8".
PYTHON_VERSION = Version(".".join(str(part) for part in sys.version_info[:3]))
class Candidate:
    """One release file of a project, identified by name, version and extras.

    Metadata and dependencies are looked up lazily and cached on the
    instance, since obtaining them goes through ``get_metadata_for_wheel``.
    """

    def __init__(self, name, version, url=None, extras=None):
        """Record the candidate's identity.

        Args:
            name: Project name; stored in canonicalized form.
            version: Version object of this release file.
            url: Location of the wheel the metadata is read from.
            extras: Iterable of extra names requested for this candidate,
                or None/empty when no extras are requested.
        """
        self.name = canonicalize_name(name)
        self.version = version
        self.url = url
        self.extras = extras

        # Lazily populated caches; None means "not loaded yet".
        self._metadata = None
        self._dependencies = None

    def __repr__(self):
        if not self.extras:
            return f"<{self.name}=={self.version}>"
        return f"<{self.name}[{','.join(self.extras)}]=={self.version}>"

    @property
    def metadata(self):
        """Metadata message for this candidate, fetched on first access."""
        if self._metadata is None:
            self._metadata = get_metadata_for_wheel(self.url)
        return self._metadata

    @property
    def requires_python(self):
        """Value of the ``Requires-Python`` header, or None when absent."""
        return self.metadata.get("Requires-Python")

    def _get_dependencies(self):
        """Yield each applicable ``Requires-Dist`` entry as a Requirement.

        An entry without a marker always applies.  An entry with a marker
        applies when the marker evaluates true for at least one requested
        extra; with no extras requested it is evaluated against the empty
        extra ``""``.  Each entry is yielded at most once, even when several
        extras satisfy its marker.
        """
        declared = self.metadata.get_all("Requires-Dist", [])
        extras = self.extras if self.extras else [""]

        for line in declared:
            req = Requirement(line)
            # any() stops at the first matching extra, so a requirement whose
            # marker matches several extras is not reported repeatedly.
            if req.marker is None or any(
                req.marker.evaluate({"extra": extra}) for extra in extras
            ):
                yield req

    @property
    def dependencies(self):
        """List of Requirement objects for this candidate (cached)."""
        if self._dependencies is None:
            self._dependencies = list(self._get_dependencies())
        return self._dependencies
def get_project_from_pypi(project, extras):
    """Yield a Candidate for every usable wheel listed for ``project``.

    The project's page on the PyPI simple index is downloaded and each
    anchor on it is inspected.  A link is skipped when:

    * its ``data-requires-python`` specifier excludes PYTHON_VERSION,
    * it does not point at a ``.whl`` file,
    * its filename does not have at least ``name-version`` fields, or
    * the version field is not a valid version.

    Args:
        project: Project name used to build the index URL.
        extras: Extras passed unchanged to every created Candidate.

    Yields:
        Candidate objects, in the order the links appear on the page.
    """
    # Local import: the module header only brings in SpecifierSet.
    from packaging.specifiers import InvalidSpecifier

    index_url = f"https://pypi.org/simple/{project}"
    # A timeout keeps a stalled connection from hanging the resolver forever.
    data = requests.get(index_url, timeout=30).content
    doc = html5lib.parse(data, namespaceHTMLElements=False)

    for link in doc.findall(".//a"):
        url = link.attrib["href"]
        py_req = link.attrib.get("data-requires-python")

        # Skip items that need a different Python version
        if py_req:
            try:
                spec = SpecifierSet(py_req)
            except InvalidSpecifier:
                # A malformed specifier cannot exclude anything; ignore it
                # instead of aborting the whole listing.
                spec = None
            if spec is not None and PYTHON_VERSION not in spec:
                continue

        path = urlparse(url).path
        filename = path.rpartition("/")[-1]

        # We only handle wheels
        if not filename.endswith(".whl"):
            continue

        # TODO: Handle compatibility tags?

        # Very primitive wheel filename parsing
        fields = filename[:-4].split("-")
        if len(fields) < 2:
            # Not even "name-version": cannot be a usable wheel filename.
            continue
        name, version = fields[:2]

        try:
            version = Version(version)
        except InvalidVersion:
            # Ignore files with invalid versions
            continue

        yield Candidate(name, version, url=url, extras=extras)
def get_metadata_for_wheel(url):
    """Download the wheel at ``url`` and return its parsed METADATA headers.

    Only the wheel's own ``<name>-<version>.dist-info/METADATA`` entry, which
    sits directly under the archive root, is considered.  ``*.dist-info``
    directories nested deeper (for example those of vendored packages) are
    ignored so that another project's metadata is never returned by mistake.

    Args:
        url: Location of the wheel file.

    Returns:
        The message parsed from the METADATA file (headers only), or an
        empty EmailMessage when the archive has no such file.
    """
    # A timeout keeps a stalled download from blocking indefinitely.
    data = requests.get(url, timeout=30).content
    with ZipFile(BytesIO(data)) as archive:
        for member in archive.namelist():
            # Exactly one "/" means "<dir>/METADATA" at the archive root.
            if member.count("/") == 1 and member.endswith(".dist-info/METADATA"):
                # Close the member file once it has been parsed.
                with archive.open(member) as fp:
                    return BytesParser().parse(fp, headersonly=True)

    # If we didn't find the metadata, return an empty message
    return EmailMessage()
class PyPIProvider(ExtrasProvider):
    """Provider whose candidates are the wheels PyPI lists for a project."""

    def identify(self, requirement_or_candidate):
        """Return the canonicalized name of a requirement or candidate."""
        raw_name = requirement_or_candidate.name
        return canonicalize_name(raw_name)

    def get_extras_for(self, requirement_or_candidate):
        """Return the object's extras as a sorted tuple."""
        # A set of extras is not hashable, so hand back a sorted tuple.
        ordered = sorted(requirement_or_candidate.extras)
        return tuple(ordered)

    def get_base_requirement(self, candidate):
        """Return a requirement pinning exactly the candidate's version."""
        pinned = f"{candidate.name}=={candidate.version}"
        return Requirement(pinned)

    def get_preference(self, identifier, resolutions, candidates, information):
        """Return how many candidates are listed for ``identifier``."""
        count = 0
        for _ in candidates[identifier]:
            count += 1
        return count

    def find_matches(self, identifier, requirements, incompatibilities):
        """Return matching candidates for ``identifier``, newest first.

        A candidate matches when its version is not among the incompatible
        ones and lies inside the specifier of every requirement.
        """
        wanted = list(requirements[identifier])
        assert all(
            not r.extras for r in wanted
        ), "extras not supported in this example"

        excluded = {c.version for c in incompatibilities[identifier]}

        # Extras are handed to the search so they are attached to each
        # candidate at creation time; candidates are treated as immutable
        # once created.
        matches = []
        for candidate in get_project_from_pypi(identifier, set()):
            if candidate.version in excluded:
                continue
            if not all(candidate.version in r.specifier for r in wanted):
                continue
            matches.append(candidate)

        matches.sort(key=attrgetter("version"), reverse=True)
        return matches

    def is_satisfied_by(self, requirement, candidate):
        """Tell whether ``candidate`` has the required name and version."""
        return (
            canonicalize_name(requirement.name) == candidate.name
            and candidate.version in requirement.specifier
        )

    def get_dependencies(self, candidate):
        """Return the candidate's dependency list."""
        return candidate.dependencies
def display_resolution(result):
    """Write the resolution outcome to stdout in two sections.

    The first section lists every entry of ``result.mapping`` with the
    pinned candidate's name and version; the second lists every node of
    ``result.graph`` followed by its children.
    """
    print("\n--- Pinned Candidates ---")
    pinned = result.mapping
    for name, candidate in pinned.items():
        print(f"{name}: {candidate.name} {candidate.version}")

    print("\n--- Dependency Graph ---")
    graph = result.graph
    for name in graph:
        children = graph.iter_children(name)
        targets = ", ".join(children)
        print(f"{name} -> {targets}")
def main():
    """Resolve the PyPI projects named on the command line.

    Every command-line argument is parsed as a requirement.  The pinned
    candidates and the dependency graph are printed to stdout; when no
    argument is given, a usage line is printed instead.
    """
    argv = sys.argv
    if len(argv) == 1:
        # Nothing to resolve: show how the script is meant to be called.
        print("Usage:", argv[0], "<PyPI project name(s)>")
        return

    # The requirements to resolve, as typed by the user and as parsed.
    reqs = argv[1:]
    requirements = [Requirement(text) for text in reqs]

    # Build the (reusable) resolver from a provider and a reporter.
    resolver = Resolver(PyPIProvider(), BaseReporter())

    # Run the resolution and show the outcome.
    print("Resolving", ", ".join(reqs))
    display_resolution(resolver.resolve(requirements))
# Script entry point: run the resolver only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Interrupted by the user: emit a bare newline and exit without a
        # traceback.
        print()
|