1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
|
# -*- coding: utf-8 -*-
import json
import os
import re
from contextlib import closing
from distutils.version import LooseVersion

import pip
from django.core.management.base import BaseCommand, CommandError
from pip.req import parse_requirements

from django_extensions.management.color import color_style
from django_extensions.management.utils import signalcommand

try:
    from urllib.parse import urlparse
    from urllib.error import HTTPError
    from urllib.request import Request, urlopen
    from xmlrpc.client import ServerProxy
except ImportError:
    # Python 2
    from urlparse import urlparse
    from urllib2 import HTTPError, Request, urlopen
    from xmlrpclib import ServerProxy

try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False


class Command(BaseCommand):
    """
    Report pip requirements that are out of date.

    Requirements are read from the files given with ``--requirement`` or,
    failing that, from conventionally named files in the current directory.
    Requirements without a url are compared against PyPI, requirements
    pointing at github.com are compared against the GitHub API, and whatever
    is left afterwards is listed as unsupported.
    """

    help = "Scan pip requirement files for out-of-date packages."

    def add_arguments(self, parser):
        """Register the command line options of this command on ``parser``."""
        super(Command, self).add_arguments(parser)
        parser.add_argument(
            "-t", "--github-api-token", action="store",
            dest="github_api_token", help="A github api authentication token."
        )
        parser.add_argument(
            "-r", "--requirement", action="append", dest="requirements",
            default=[], metavar="FILENAME",
            help="Check all the packages listed in the given requirements "
                 "file. This option can be used multiple times."
        )
        parser.add_argument(
            "-n", "--newer", action="store_true", dest="show_newer",
            help="Also show when newer version then available is installed."
        )

    @signalcommand
    def handle(self, *args, **options):
        """
        Collect the requirements and run the pypi, github and fallback checks.

        Raises ``CommandError`` when no requirements file can be found or
        when the installed pip does not provide ``PipSession``.
        """
        self.style = color_style()
        self.options = options
        req_files = self._find_requirement_files(options["requirements"])

        try:
            from pip.download import PipSession
        except ImportError:
            raise CommandError("Pip version 6 or higher is required")

        # url attribute changed to link in pip version 6.1.0 and above.
        # The pip version cannot change while parsing, so decide only once.
        if LooseVersion(pip.__version__) > LooseVersion('6.0.8'):
            url_attr = "link"
        else:
            url_attr = "url"

        # Maps requirement name -> {"pip_req": ..., "url": ..., ["dist": ...]}.
        # Every check removes the entries it has reported on.
        self.reqs = {}
        with PipSession() as session:
            for filename in req_files:
                for req in parse_requirements(filename, session=session):
                    self.reqs[req.name] = {
                        "pip_req": req,
                        "url": getattr(req, url_attr),
                    }

        # Command line flag first, then the environment; without a token
        # only 50 requests per hour are possible.
        self.github_api_token = (
            options["github_api_token"] or os.environ.get("GITHUB_API_TOKEN") or None
        )

        self.check_pypi()
        if HAS_REQUESTS:
            self.check_github()
        else:
            print(self.style.ERROR("Cannot check github urls. The requests library is not installed. ( pip install requests )"))
        self.check_other()

    @staticmethod
    def _find_requirement_files(explicit_files):
        """
        Return the list of requirement files to scan.

        ``explicit_files`` (the ``--requirement`` values) is used as is when
        it is not empty. Otherwise the current directory is probed for
        ``requirements.txt``, the ``*.txt`` files of a ``requirements``
        directory, ``requirements-dev.txt`` and ``requirements-prod.txt``,
        in that order. Raises ``CommandError`` when none of them exists.
        """
        if explicit_files:
            return explicit_files
        if os.path.exists("requirements.txt"):
            return ["requirements.txt"]
        # isdir() instead of exists(): a plain file called "requirements"
        # cannot be listed and should not prevent the remaining candidates
        # from being tried.
        if os.path.isdir("requirements"):
            return [
                "requirements/{0}".format(f) for f in os.listdir("requirements")
                if os.path.isfile(os.path.join("requirements", f)) and f.lower().endswith(".txt")
            ]
        for candidate in ("requirements-dev.txt", "requirements-prod.txt"):
            if os.path.exists(candidate):
                return [candidate]
        raise CommandError("Requirements file(s) not found")

    @staticmethod
    def _normalize_name(name):
        """Return ``name`` lower-cased with runs of ``-``, ``_`` and ``.`` folded to ``-``."""
        return re.sub(r"[-_.]+", "-", name).lower()

    @staticmethod
    def _split_repo_and_ref(repo):
        """
        Split the last url path segment into ``(repo_name, ref)``.

        ``repo`` looks like ``name``, ``name.git``, ``name@ref`` or
        ``name.git@ref``. ``ref`` is ``None`` when no (or an empty) ref is
        given. Only a trailing ``.git`` is removed, so repository names that
        merely contain ``.git`` (``user.github.io``) are left intact.
        """
        repo_name, _, ref = repo.partition("@")
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-len(".git")]
        return repo_name, ref or None

    def _urlopen_as_json(self, url, headers=None):
        """Shortcut that fetches ``url`` and returns its contents parsed as json."""
        # Request() iterates over the headers, so None has to become a dict.
        req = Request(url, headers=headers or {})
        # closing() works on both Python 2 and 3 response objects.
        with closing(urlopen(req)) as response:
            payload = response.read()
        # json.loads() does not accept bytes on every supported Python 3.
        if isinstance(payload, bytes):
            payload = payload.decode("utf-8")
        return json.loads(payload)

    def check_pypi(self):
        """
        If the requirement is frozen to pypi, check for a new version.

        Every requirement without a url is reported (unless it is up to
        date) and removed from ``self.reqs``.
        """
        # Requirement files and installed metadata frequently disagree on
        # case and separators ("django" vs "Django"), so fall back to a
        # normalized comparison when there is no exact match.
        by_normalized_name = {}
        for req_name in self.reqs:
            if req_name:
                by_normalized_name.setdefault(self._normalize_name(req_name), req_name)

        for dist in pip.get_installed_distributions():
            req_name = dist.project_name
            if req_name not in self.reqs:
                req_name = by_normalized_name.get(self._normalize_name(req_name))
            if req_name is not None:
                self.reqs[req_name]["dist"] = dist

        pypi = ServerProxy("https://pypi.python.org/pypi")
        # Iterate over a copy: entries are deleted while looping.
        for name, req in list(self.reqs.items()):
            if req["url"]:
                continue  # skipping github packages.

            if "dist" in req:
                dist = req["dist"]
                dist_version = LooseVersion(dist.version)
                available = pypi.package_releases(req["pip_req"].name)
                # The first entry is taken as the version to compare against.
                available_version = LooseVersion(available[0]) if available else None

                if available_version is None:
                    msg = self.style.WARN("release is not on pypi (check capitalization and/or --extra-index-url)")
                elif self.options['show_newer'] and dist_version > available_version:
                    msg = self.style.INFO("{0} available (newer installed)".format(available_version))
                elif available_version > dist_version:
                    msg = self.style.INFO("{0} available".format(available_version))
                else:
                    # Up to date packages are dropped without any output.
                    del self.reqs[name]
                    continue
                pkg_info = self.style.BOLD("{dist.project_name} {dist.version}".format(dist=dist))
            else:
                msg = "not installed"
                pkg_info = name

            print("{pkg_info:40} {msg}".format(pkg_info=pkg_info, msg=msg))
            del self.reqs[name]

    def _github_api_usable(self, headers):
        """
        Probe the GitHub API once with ``headers``.

        Returns ``True`` when requests can be made. Otherwise the reason
        (connection problem, bad credentials, exhausted rate limit) is
        printed and ``False`` is returned.
        """
        try:
            test_auth = requests.get("https://api.github.com/django/", headers=headers).json()
        except (HTTPError, requests.exceptions.RequestException, ValueError) as e:
            # requests reports connection problems with its own exception
            # hierarchy and undecodable bodies with a ValueError.
            print("\n%s\n" % str(e))
            return False

        message = test_auth.get("message") if isinstance(test_auth, dict) else None
        message = message or ""
        if message == "Bad credentials":
            print(self.style.ERROR("\nGithub API: Bad credentials. Aborting!\n"))
            return False
        # Compare case-insensitively so the capitalization used by the API
        # does not matter.
        if message.lower().startswith("api rate limit exceeded"):
            print(self.style.ERROR("\nGithub API: Rate Limit Exceeded. Aborting!\n"))
            return False
        return True

    def check_github(self):
        """
        If the requirement is frozen to a github url, check for new commits.

        API Tokens
        ----------
        For more than 50 github api calls per hour, pipchecker requires
        authentication with the github api by setting the environment
        variable ``GITHUB_API_TOKEN`` or setting the command flag
        ``--github-api-token='mytoken'``.

        To create a github api token for use at the command line::
             curl -u 'rizumu' -d '{"scopes":["repo"], "note":"pipchecker"}' https://api.github.com/authorizations

        For more info on github api tokens:
            https://help.github.com/articles/creating-an-oauth-token-for-command-line-use
            http://developer.github.com/v3/oauth/#oauth-authorizations-api

        Requirement Format
        ------------------
        Pipchecker gets the sha of frozen repo and checks if it is
        found at the head of any branches. If it is not found then
        the requirement is considered to be out of date.

        Therefore, freezing at the commit hash will provide the expected
        results, but if freezing at a branch or tag name, pipchecker will
        not be able to determine with certainty if the repo is out of date.

        Freeze at the commit hash (sha)::
            git+git://github.com/django/django.git@393c268e725f5b229ecb554f3fac02cfc250d2df#egg=Django

        Freeze with a branch name::
            git+git://github.com/django/django.git@master#egg=Django

        Freeze with a tag::
            git+git://github.com/django/django.git@1.5b2#egg=Django

        Do not freeze::
            git+git://github.com/django/django.git#egg=Django

        """
        # The headers are the same for every request, build them once.
        headers = {
            "content-type": "application/json",
        }
        if self.github_api_token:
            headers["Authorization"] = "token {0}".format(self.github_api_token)

        api_checked = False
        # Iterate over a copy: entries are deleted while looping.
        for name, req in list(self.reqs.items()):
            req_url = req["url"]
            if not req_url:
                continue
            req_url = str(req_url)
            # Anything that does not live on github.com is left for check_other().
            if "github.com/" not in req_url:
                continue
            if req_url.endswith((".tar.gz", ".tar.bz2", ".zip")):
                continue

            try:
                user, repo = urlparse(req_url).path.split("#")[0].strip("/").rstrip("/").split("/")
            except (ValueError, IndexError) as e:
                print(self.style.ERROR("\nFailed to parse %r: %s\n" % (req_url, e)))
                continue

            # Probe the API only once, and only when there is at least one
            # github requirement, to save rate-limited calls.
            if not api_checked:
                if not self._github_api_usable(headers):
                    return
                api_checked = True

            repo_name, frozen_commit_sha = self._split_repo_and_ref(repo)

            if frozen_commit_sha is None:
                msg = self.style.ERROR("repo is not frozen")
            else:
                branch_url = "https://api.github.com/repos/{0}/{1}/branches".format(user, repo_name)
                branch_data = requests.get(branch_url, headers=headers).json()

                frozen_commit_url = "https://api.github.com/repos/{0}/{1}/commits/{2}".format(
                    user, repo_name, frozen_commit_sha
                )
                frozen_commit_data = requests.get(frozen_commit_url, headers=headers).json()

                if "message" in frozen_commit_data and frozen_commit_data["message"] == "Not Found":
                    msg = self.style.ERROR("{0} not found in {1}. Repo may be private.".format(frozen_commit_sha[:10], name))
                elif not isinstance(branch_data, list) or "sha" not in frozen_commit_data:
                    # An error payload instead of the expected data, e.g.
                    # because the rate limit ran out part way through.
                    failed = branch_data if isinstance(branch_data, dict) else frozen_commit_data
                    details = failed.get("message") or "unexpected response"
                    msg = self.style.ERROR("Github API: {0}".format(details))
                elif any(branch["commit"]["sha"] == frozen_commit_sha for branch in branch_data):
                    msg = self.style.BOLD("up to date")
                else:
                    msg = self.style.INFO("{0} is not the head of any branch".format(frozen_commit_data["sha"][:10]))

            if "dist" in req:
                pkg_info = "{dist.project_name} {dist.version}".format(dist=req["dist"])
            elif frozen_commit_sha is None:
                pkg_info = name
            else:
                pkg_info = "{0} {1}".format(name, frozen_commit_sha[:10])

            print("{pkg_info:40} {msg}".format(pkg_info=pkg_info, msg=msg))
            del self.reqs[name]

    def check_other(self):
        """
        If the requirement is frozen somewhere other than pypi or github, skip.

        If you have a private pypi or use --extra-index-url, consider contributing
        support here.
        """
        if not self.reqs:
            return

        print(self.style.ERROR("\nOnly pypi and github based requirements are supported:"))
        for req in self.reqs.values():
            if "dist" in req:
                pkg_info = "{dist.project_name} {dist.version}".format(dist=req["dist"])
            elif "url" in req:
                pkg_info = "{url}".format(url=req["url"])
            else:
                pkg_info = "unknown package"
            print(self.style.BOLD("{pkg_info:40} is not a pypi or github requirement".format(pkg_info=pkg_info)))
|