1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
|
#!/usr/bin/python3
# vim:set fileencoding=utf-8 et ts=4 sts=4 sw=4:
#
# apt-listchanges - Show changelog entries between the installed versions
# of a set of packages and the versions contained in
# corresponding .deb files
#
# Copyright (C) 2000-2006 Matt Zimmerman <mdz@debian.org>
# Copyright (C) 2006 Pierre Habouzit <madcoder@debian.org>
# Copyright (C) 2016-2019 Robert Luberda <robert@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
#
import os
import os.path
import re
import sys
import signal
import subprocess
import tempfile
import traceback
import apt_pkg
from apt_listchanges import ALCApt
from apt_listchanges import ALCConfig
from apt_listchanges import ALCLog
from apt_listchanges.ALCSeenDb import SeenDb, DbError
from apt_listchanges.ALChacks import _
from apt_listchanges.DebianFiles import Changes, ControlParser, Package
from apt_listchanges import frontends
from apt_listchanges.snapshot import Snapshot
class Filterer:
'''Filterer objects need to have the following methods:
reset(pkg, installed=None) -- Reset the filterer for a new log file in the
the package, optionally specifying the control stanza of the installed
version of the same package. If installed is specified then the filterer
does version-number-based filtering.
stop(entry) -- Return True to discard the specified entry and stop parsing
the current changelog
filter(entry) -- Return True to discard the current entry
Both stop() and filter() may be called with None, in which case they should
return False.
stop() is always called before filter() for any particular entry.'''
# Regular expressions for the names of packages for which we should trust
# that the version numbers in the changelog are properly ordered and
# consistent, so we can stop as soon as version_compare says to, even if we
# haven't yet seen our exact version in the changelog. This is only
# necessary because some packages mess around with the version number
# formats in their changelogs so they don't match the package version
# numbers exactly.
trust_version_packages = [re.compile(r) for r in (r'^linux-image-',)]
def __init__(self, config, seen_db, show_all=None, since=None,
latest=None):
'''keyword args override the values in config'''
self.show_all = config.show_all if show_all is None else show_all
self.since = config.since if since is None else since
self.latest = config.latest if latest is None else latest
self.seen_db = seen_db
self.accepted = 0
self.filtered = 0
self.saw_us = False
self.pkg = None
self.installed = None
def reset(self, pkg, installed=None):
self.pkg = pkg
self.installed = installed
self.accepted = 0
self.filtered = 0
self.saw_us = any(r.match(pkg.binary) or r.match(pkg.source)
for r in self.trust_version_packages)
@staticmethod
def _drop_binnmu_suffix(version):
pos = version.rfind('+')
if pos != -1 and len(version) in range(pos+3, pos+7) and \
version[pos+1] == 'b':
return version[:pos]
return version
def version_stop(self, entry):
if not self.installed:
return False
if entry.package == self.pkg.binary:
which = 'binary'
elif entry.package == self.pkg.source:
which = 'source'
else:
return False
if not self.saw_us:
our_version = self.pkg.version if which == 'binary' \
else self.pkg.source_version
if self._drop_binnmu_suffix(our_version) == entry.version:
self.saw_us = True
return False
cutoff_version = self.installed.version if which == 'binary' \
else self.installed.source_version
# No need to drop binnmu suffix here because x.x is less than x.x+b1
return apt_pkg.version_compare(entry.version, cutoff_version) <= 0
def stop(self, entry):
'''True means to stop, False means not to'''
if entry is None:
return False
if self.latest and self.accepted >= self.latest:
self.filtered += 1
return True
if self.since and \
apt_pkg.version_compare(entry.version, self.since) <= 0:
self.filtered += 1
return True
if self.seen_db.seen_here(entry):
self.filtered += 1
return True
if self.version_stop(entry):
self.filtered += 1
return True
return False
# Note that this function adds entries to the DB as a side effect!
def filter(self, entry):
'''True means to filter, False means not to'''
if entry is None:
return False # pragma: no cover
if self.show_all:
self.accepted += 1
return False
if self.seen_db.seen_here(entry):
self.filtered += 1
return True
do_filter = self.seen_db.seen_anywhere(entry, exact=False)
# It could have been in a different changelog file, but we need to make
# sure it's recorded in entry's file as well.
self.seen_db.add(entry)
if do_filter:
self.filtered += 1
else:
self.accepted += 1
return do_filter
def doit(config, args=None, force_db: bool = False):
'''force_db is only used by unit tests. Once pytest-subprocess is packaged
and we can use it on our tests, then we'll be able to simplify the tests
and eliminate the need for force_db.'''
apt_pkg.init()
config.setup(args)
ALCLog.set_debug(config.debug)
ALCLog.debug(_("Enabled debug output"))
with Snapshot(config) as snapshot:
return doit_with_snapshot(config, snapshot, force_db)
def doit_with_snapshot(config, snapshot, force_db: bool = False) -> None:
debs = config.debs
if config.save_seen:
snapshot.add_file(config.save_seen, 'db-before')
if config.dump_seen:
SeenDb(config.save_seen).dump()
sys.exit(0)
if config.apt_mode: # pragma: no cover
debs = ALCApt.AptPipeline(config).read()
if not debs:
sys.exit(0)
snapshot.add_data('\n'.join(debs), 'deb-list')
# Force quiet (loggable) mode if not running interactively
if not sys.stdout.isatty() and not config.quiet:
config.quiet = 1
try:
frontend = frontends.make_frontend(config, len(debs))
except frontends.EUnknownFrontend:
ALCLog.error(_("Unknown frontend: %s") % config.frontend)
sys.exit(1)
if frontend is None:
sys.exit(0)
if frontend.needs_tty_stdin() and not sys.stdin.isatty():
try:
# Give any forked processes (eg. lynx) a normal stdin;
# See Debian Bug #343423. (Note: with $APT_HOOK_INFO_FD
# support introduced in version 3.2, stdin should point to
# a terminal already, so there should be no need to reopen it).
with open('/dev/tty', 'rb+', buffering=0) as tty:
os.close(0)
os.dup2(tty.fileno(), 0)
except Exception as ex:
ALCLog.warning(_("Cannot reopen /dev/tty for stdin: %s") % str(ex))
if not force_db and (config.show_all or config.since or config.latest):
seen_db = SeenDb(None)
else:
seen_db = SeenDb(config.save_seen)
filterer = Filterer(config, seen_db)
pkgs = [Package(deb, filterer) for deb in debs]
news, changes = process_pkgs(config, frontend, seen_db, pkgs,
snapshot=snapshot)
snapshot.add_data(news or '', 'news')
snapshot.add_data(changes or '', 'changes')
# Assume the user "confirmed" if there wasn't actually anything to display
# so that any database seeding we did gets saved to the database.
confirmed = not (news or changes)
try:
if news or changes:
_display(frontend, news,
lambda: _('apt-listchanges: News'))
_display(frontend, changes,
lambda: _('apt-listchanges: Changelogs'))
frontends.confirm_or_exit(config, frontend)
confirmed = True
if frontends.can_send_emails(config):
hostname = subprocess.getoutput('hostname')
_send_email(
config, news,
lambda: _("apt-listchanges: news for %s") % hostname)
_send_email(
config, changes,
lambda: _("apt-listchanges: changelogs for %s") % hostname)
finally:
# Write out seen db
if confirmed:
seen_db.apply_changes()
if config.save_seen:
snapshot.add_file(config.save_seen, 'db-after')
elif config.save_seen:
with tempfile.NamedTemporaryFile() as f:
seen_db.save_as(f.name, force=True)
snapshot.add_file(f.name, 'db-after')
def _display(frontend, changes, title_getter):
if changes:
frontend.set_title(title_getter())
frontend.display_output(changes)
def _send_email(config, changes, subject_getter):
if changes:
frontends.mail_changes(config, changes, subject_getter())
def _setup_signals():
def signal_handler(signum, frame): # pylint: disable=unused-argument
ALCLog.error(_('Received signal %d, exiting') % signum)
sys.exit(frontends.BREAK_APT_EXIT_CODE)
for s in [signal.SIGHUP, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(s, signal_handler)
def main():
_setup_signals()
config = ALCConfig.ALCConfig()
try:
doit(config)
except KeyboardInterrupt:
sys.exit(frontends.BREAK_APT_EXIT_CODE)
except ALCApt.AptPipelineError as ex:
ALCLog.error(str(ex))
sys.exit(frontends.BREAK_APT_EXIT_CODE)
except DbError as ex:
ALCLog.error(str(ex))
sys.exit(1)
except Exception:
traceback.print_exc()
frontends.confirm_or_exit(
config, frontends.ttyconfirm(config))
sys.exit(1)
def process_pkgs(config, frontend, seen_db, pkgs, snapshot=None):
all_news = Changes()
all_changelogs = Changes()
notes = []
status = None
if not (config.show_all or config.since or config.latest):
dpkg_status = apt_pkg.config.find_file('Dir::State::status')
if snapshot:
snapshot.add_file(dpkg_status, 'dpkg-status')
status = ControlParser()
status.readfile(dpkg_status)
status.makeindex('Package')
for pkg in pkgs:
(news, changelogs) = process_pkg(
config, seen_db, status, notes, pkg)
if news:
ALCLog.debug(f'Got news for {pkg.path}')
all_news += news
if changelogs:
ALCLog.debug(f'Got changelogs for {pkg.path}')
all_changelogs += changelogs
frontend.update_progress()
frontend.progress_done()
if config.reverse:
all_news.reverse()
all_changelogs.reverse()
news = join_changes(all_news, config.headers,
lambda package: _('News for %s') % package)
changes = join_changes(all_changelogs, config.headers,
lambda package: _('Changes for %s') % package)
binnmus = join_binnmus(all_changelogs)
if binnmus:
if changes:
changes += '\n\n' + binnmus
else:
changes = binnmus
if config.verbose and notes:
joined_notes = _("Informational notes") + ":\n\n" + '\n'.join(notes)
if config.which == "news":
news += joined_notes
else:
changes += joined_notes
return news, changes
def process_pkg(config, seen_db, status, notes, pkg):
status_entry = None
if not (config.show_all or config.since or config.latest):
status_entries = status.find('Package', pkg.binary) or []
try:
status_entry = next(
s for s in status_entries if s.arch == pkg.arch)
except StopIteration:
status_entry = None
if not getattr(status_entry, 'installed', False):
notes.append(_("%s: will be newly installed") % pkg.binary)
return (None, None)
if not seen_db.has_package(pkg.binary):
seen_db.add_package(pkg.binary)
seed_filterer = Filterer(config, seen_db, show_all=False,
since=False, latest=10)
# We don't care about what this returns, it's just seeding the DB
pkg.extract_changes_via_installed('both', seed_filterer)
# It would be great if we could check if the command above returns
# nothing, and if so and no_network is False, fetch the changelog
# entries for the installed version of the package from apt and
# use them to seed the database. However, Debian repositories
# typically only include data for the most recent version of the
# package in the index, so if we're installing a new version of a
# package then it's unlikely that the changelog for the installed
# version is available on the network, so there's no point in
# wasting the time and network bandwidth trying to fetch it.
(news, changelog) = pkg.extract_changes(config.which,
installed=status_entry)
if not config.no_network and config.which != "news" and not changelog:
changelog = pkg.extract_changes_via_apt(installed=status_entry)
return (news if news and news.entries else None,
changelog if changelog and
(changelog.entries or changelog.binnmus) else None)
def join_changes(all_changes, show_headers, header_package_getter):
if not show_headers:
return all_changes.changes
changes = ''
package = None
for entry in all_changes.entries:
if entry.package != package:
package = entry.package
changes += f'--- {header_package_getter(package)} ---\n\n'
changes += str(entry) + '\n\n'
return changes
def join_binnmus(all_binnmus):
by_content = {}
for entry in all_binnmus.binnmus:
by_content.setdefault(entry.content, []).append(entry)
binnmus = ''
for content, entries in by_content.items():
pkgs = '--- ' + _('Binary NMU of')
sep = ': '
lastlen = len(pkgs)
for entry in entries:
hdr = entry.header
idx = hdr.find(')')
if idx >= 0:
hdr = hdr[:idx+1]
# manually wrap the package lines
pkgs += sep
lastlen += len(sep)
sep = ', '
if lastlen + len(hdr) > 75:
pkgs += '\n '
lastlen = 1
pkgs += hdr
lastlen += len(hdr)
binnmus += pkgs + '\n\n' + content + '\n\n'
return binnmus
if __name__ == '__main__':
main()
|