File: apt_listchanges.py

package info (click to toggle)
apt-listchanges 4.8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,336 kB
  • sloc: python: 3,477; xml: 693; makefile: 167; sh: 71; perl: 61
file content (440 lines) | stat: -rwxr-xr-x 15,894 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
#!/usr/bin/python3
# vim:set fileencoding=utf-8 et ts=4 sts=4 sw=4:
#
#   apt-listchanges - Show changelog entries between the installed versions
#                     of a set of packages and the versions contained in
#                     corresponding .deb files
#
#   Copyright (C) 2000-2006  Matt Zimmerman  <mdz@debian.org>
#   Copyright (C) 2006       Pierre Habouzit <madcoder@debian.org>
#   Copyright (C) 2016-2019  Robert Luberda  <robert@debian.org>
#
#   This program is free software; you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation; either version 2 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program; if not, write to the Free Software
#   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
#

import os
import os.path
import re
import sys
import signal
import subprocess
import tempfile
import traceback

import apt_pkg

from apt_listchanges import ALCApt
from apt_listchanges import ALCConfig
from apt_listchanges import ALCLog
from apt_listchanges.ALCSeenDb import SeenDb, DbError
from apt_listchanges.ALChacks import _
from apt_listchanges.DebianFiles import Changes, ControlParser, Package
from apt_listchanges import frontends
from apt_listchanges.snapshot import Snapshot


class Filterer:
    '''Decides which changelog entries are discarded and when parsing of a
    changelog can stop.

    Filterer objects need to have the following methods:
    reset(pkg, installed=None) -- Reset the filterer for a new log file in
      the package, optionally specifying the control stanza of the installed
      version of the same package. If installed is specified then the filterer
      does version-number-based filtering.
    stop(entry) -- Return True to discard the specified entry and stop parsing
      the current changelog
    filter(entry) -- Return True to discard the current entry
    Both stop() and filter() may be called with None, in which case they should
    return False.
    stop() is always called before filter() for any particular entry.'''

    # Regular expressions for the names of packages for which we should trust
    # that the version numbers in the changelog are properly ordered and
    # consistent, so we can stop as soon as version_compare says to, even if we
    # haven't yet seen our exact version in the changelog. This is only
    # necessary because some packages mess around with the version number
    # formats in their changelogs so they don't match the package version
    # numbers exactly.
    trust_version_packages = [re.compile(r) for r in (r'^linux-image-',)]

    # A binNMU suffix is "+b" followed by one to four ASCII digits at the very
    # end of the version. Requiring digits keeps suffixes such as "+bzr12",
    # which are part of the real version, from being stripped.
    _binnmu_suffix_re = re.compile(r'\+b[0-9]{1,4}\Z')

    def __init__(self, config, seen_db, show_all=None, since=None,
                 latest=None):
        '''keyword args override the values in config

        config -- object providing the show_all, since and latest defaults
        seen_db -- database object; stop() and filter() call its seen_here(),
          seen_anywhere() and add() methods'''
        self.show_all = config.show_all if show_all is None else show_all
        self.since = config.since if since is None else since
        self.latest = config.latest if latest is None else latest
        self.seen_db = seen_db
        # Counters of entries let through / discarded since the last reset()
        self.accepted = 0
        self.filtered = 0
        # Whether the package's own version has been seen in the changelog
        # (or the package is one whose version numbers we trust)
        self.saw_us = False
        self.pkg = None
        self.installed = None

    def reset(self, pkg, installed=None):
        '''Start over for a new changelog of pkg.

        Clears the counters and remembers pkg and the optional installed
        stanza. saw_us starts out True only for packages whose binary or
        source name matches trust_version_packages.'''
        self.pkg = pkg
        self.installed = installed
        self.accepted = 0
        self.filtered = 0
        self.saw_us = any(r.match(pkg.binary) or r.match(pkg.source)
                          for r in self.trust_version_packages)

    @staticmethod
    def _drop_binnmu_suffix(version):
        '''Return version without a trailing binNMU suffix ("+b1".."+b9999").

        Versions without such a suffix are returned unchanged, including
        those whose last "+" component merely starts with "b" (for example
        "1.0+bzr12").'''
        return Filterer._binnmu_suffix_re.sub('', version, count=1)

    def version_stop(self, entry):
        '''Return True if entry is not newer than the installed version.

        Always False when no installed stanza was given to reset() or when
        entry belongs to neither the binary nor the source package. Until the
        package's own version has been seen in the changelog (saw_us), no
        version comparison is made and False is returned.'''
        if not self.installed:
            return False
        if entry.package == self.pkg.binary:
            which = 'binary'
        elif entry.package == self.pkg.source:
            which = 'source'
        else:
            return False
        if not self.saw_us:
            our_version = self.pkg.version if which == 'binary' \
                else self.pkg.source_version
            if self._drop_binnmu_suffix(our_version) == entry.version:
                self.saw_us = True
            # The entry for our own version (or anything before it was found)
            # never stops parsing.
            return False
        cutoff_version = self.installed.version if which == 'binary' \
            else self.installed.source_version
        # No need to drop binnmu suffix here because x.x is less than x.x+b1
        return apt_pkg.version_compare(entry.version, cutoff_version) <= 0

    def stop(self, entry):
        '''True means to stop, False means not to

        Parsing stops when the "latest" quota of accepted entries is reached,
        when entry is not newer than "since", when the seen database reports
        it via seen_here(), or when version_stop() says so. Every stop is
        counted in self.filtered.'''
        if entry is None:
            return False
        if self.latest and self.accepted >= self.latest:
            self.filtered += 1
            return True
        if self.since and \
           apt_pkg.version_compare(entry.version, self.since) <= 0:
            self.filtered += 1
            return True
        if self.seen_db.seen_here(entry):
            self.filtered += 1
            return True
        if self.version_stop(entry):
            self.filtered += 1
            return True
        return False

    # Note that this function adds entries to the DB as a side effect!
    def filter(self, entry):
        '''True means to filter, False means not to

        With show_all every entry is accepted and the database is not
        touched. Otherwise an entry reported by seen_here() is filtered; any
        other entry is added to the database and filtered only if
        seen_anywhere(entry, exact=False) reports it.'''
        if entry is None:
            return False  # pragma: no cover
        if self.show_all:
            self.accepted += 1
            return False
        if self.seen_db.seen_here(entry):
            self.filtered += 1
            return True
        do_filter = self.seen_db.seen_anywhere(entry, exact=False)
        # It could have been in a different changelog file, but we need to make
        # sure it's recorded in entry's file as well.
        self.seen_db.add(entry)
        if do_filter:
            self.filtered += 1
        else:
            self.accepted += 1
        return do_filter


def doit(config, args=None, force_db: bool = False):
    '''Initialise apt_pkg, set up config from args, switch debug logging on
    or off and run doit_with_snapshot() inside a Snapshot context.

    force_db is only used by unit tests. Once pytest-subprocess is packaged
    and we can use it on our tests, then we'll be able to simplify the tests
    and eliminate the need for force_db.'''
    apt_pkg.init()
    config.setup(args)

    debug_enabled = config.debug
    ALCLog.set_debug(debug_enabled)
    ALCLog.debug(_("Enabled debug output"))

    with Snapshot(config) as snap:
        # Return from inside the context so the snapshot is closed on the
        # way out, whatever the outcome.
        outcome = doit_with_snapshot(config, snap, force_db)
        return outcome


def doit_with_snapshot(config, snapshot, force_db: bool = False) -> None:
    '''Extract, display and optionally mail the news and changelogs for the
    packages, recording inputs and outputs in snapshot along the way.

    config -- configuration object; this function reads (among others) debs,
      save_seen, dump_seen, apt_mode, quiet, frontend, show_all, since and
      latest, and may set quiet to 1
    snapshot -- object whose add_file()/add_data() methods are handed the
      seen database file, the list of debs and the resulting news and changes
    force_db -- when true, open the seen database from config.save_seen even
      if show_all, since or latest is set

    Calls sys.exit() on several paths: after dumping the seen database, when
    apt mode yields no packages, when the frontend is unknown (status 1) and
    when make_frontend() returns None.'''
    debs = config.debs

    # Record the state of the seen database before anything modifies it
    if config.save_seen:
        snapshot.add_file(config.save_seen, 'db-before')

    # Dump the seen database and exit without processing any package
    if config.dump_seen:
        SeenDb(config.save_seen).dump()
        sys.exit(0)

    # In apt mode the list of debs comes from the apt pipeline instead of
    # from the configuration
    if config.apt_mode:  # pragma: no cover
        debs = ALCApt.AptPipeline(config).read()
        if not debs:
            sys.exit(0)

    snapshot.add_data('\n'.join(debs), 'deb-list')

    # Force quiet (loggable) mode if not running interactively
    if not sys.stdout.isatty() and not config.quiet:
        config.quiet = 1

    try:
        frontend = frontends.make_frontend(config, len(debs))
    except frontends.EUnknownFrontend:
        ALCLog.error(_("Unknown frontend: %s") % config.frontend)
        sys.exit(1)

    if frontend is None:
        sys.exit(0)

    if frontend.needs_tty_stdin() and not sys.stdin.isatty():
        try:
            # Give any forked processes (eg. lynx) a normal stdin;
            # See Debian Bug #343423.  (Note: with $APT_HOOK_INFO_FD
            # support introduced in version 3.2, stdin should point to
            # a terminal already, so there should be no need to reopen it).
            with open('/dev/tty', 'rb+', buffering=0) as tty:
                os.close(0)
                os.dup2(tty.fileno(), 0)
        except Exception as ex:
            # Best effort only: carry on with the existing stdin
            ALCLog.warning(_("Cannot reopen /dev/tty for stdin: %s") % str(ex))

    # With show_all/since/latest (and no force_db) the SeenDb is created from
    # None instead of the configured path.
    # NOTE(review): presumably that means a database that is not backed by a
    # file -- confirm in ALCSeenDb.
    if not force_db and (config.show_all or config.since or config.latest):
        seen_db = SeenDb(None)
    else:
        seen_db = SeenDb(config.save_seen)

    filterer = Filterer(config, seen_db)
    pkgs = [Package(deb, filterer) for deb in debs]
    news, changes = process_pkgs(config, frontend, seen_db, pkgs,
                                 snapshot=snapshot)

    snapshot.add_data(news or '', 'news')
    snapshot.add_data(changes or '', 'changes')

    # Assume the user "confirmed" if there wasn't actually anything to display
    # so that any database seeding we did gets saved to the database.
    confirmed = not (news or changes)
    try:
        if news or changes:
            _display(frontend, news,
                     lambda: _('apt-listchanges: News'))
            _display(frontend, changes,
                     lambda: _('apt-listchanges: Changelogs'))

            # confirmed is only set if confirm_or_exit() returns normally
            frontends.confirm_or_exit(config, frontend)
            confirmed = True

            if frontends.can_send_emails(config):
                hostname = subprocess.getoutput('hostname')
                _send_email(
                    config, news,
                    lambda: _("apt-listchanges: news for %s") % hostname)
                _send_email(
                    config, changes,
                    lambda: _("apt-listchanges: changelogs for %s") % hostname)
    finally:
        # Write out seen db
        if confirmed:
            seen_db.apply_changes()
            if config.save_seen:
                snapshot.add_file(config.save_seen, 'db-after')
        elif config.save_seen:
            # Not confirmed: write the database to a temporary file only so
            # that the snapshot can record it as 'db-after'
            with tempfile.NamedTemporaryFile() as f:
                seen_db.save_as(f.name, force=True)
                snapshot.add_file(f.name, 'db-after')


def _display(frontend, changes, title_getter):
    '''Show changes through frontend under the title returned by
    title_getter(); do nothing (and do not call title_getter) when changes
    is empty.'''
    if not changes:
        return
    title = title_getter()
    frontend.set_title(title)
    frontend.display_output(changes)


def _send_email(config, changes, subject_getter):
    '''Mail changes with the subject returned by subject_getter(); do nothing
    (and do not call subject_getter) when changes is empty.'''
    if not changes:
        return
    subject = subject_getter()
    frontends.mail_changes(config, changes, subject)


def _setup_signals():
    '''Install a single handler for SIGHUP, SIGQUIT and SIGTERM that logs the
    signal number and exits with frontends.BREAK_APT_EXIT_CODE.'''
    def _exit_on_signal(signum, frame):  # pylint: disable=unused-argument
        ALCLog.error(_('Received signal %d, exiting') % signum)
        sys.exit(frontends.BREAK_APT_EXIT_CODE)

    for signo in (signal.SIGHUP, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(signo, _exit_on_signal)


def main():
    '''Command-line entry point: install the signal handlers, build the
    configuration and run doit(), turning exceptions into exit codes.'''
    _setup_signals()
    config = ALCConfig.ALCConfig()
    try:
        doit(config)
    except KeyboardInterrupt:
        # Interrupted by the user: exit with the code that breaks apt
        sys.exit(frontends.BREAK_APT_EXIT_CODE)
    except ALCApt.AptPipelineError as err:
        ALCLog.error(str(err))
        sys.exit(frontends.BREAK_APT_EXIT_CODE)
    except DbError as err:
        ALCLog.error(str(err))
        sys.exit(1)
    except Exception:
        # Unexpected failure: print the traceback, go through the tty
        # confirmation and then exit with an error status
        traceback.print_exc()
        confirmer = frontends.ttyconfirm(config)
        frontends.confirm_or_exit(config, confirmer)
        sys.exit(1)


def process_pkgs(config, frontend, seen_db, pkgs, snapshot=None):
    '''Run process_pkg() on every package and return (news, changes), the
    joined news and changelog texts.

    The dpkg status file is read (and added to snapshot, if given) only when
    none of show_all, since and latest is set. frontend is told about
    progress after each package. Binary NMU summaries are appended to the
    changes and, in verbose mode, the collected notes are appended to the
    news (when config.which is "news") or to the changes.'''
    collected_news = Changes()
    collected_logs = Changes()
    notes = []

    status = None
    if not (config.show_all or config.since or config.latest):
        status_path = apt_pkg.config.find_file('Dir::State::status')
        if snapshot:
            snapshot.add_file(status_path, 'dpkg-status')
        status = ControlParser()
        status.readfile(status_path)
        status.makeindex('Package')

    for pkg in pkgs:
        pkg_news, pkg_logs = process_pkg(config, seen_db, status, notes, pkg)
        if pkg_news:
            ALCLog.debug(f'Got news for {pkg.path}')
            collected_news += pkg_news
        if pkg_logs:
            ALCLog.debug(f'Got changelogs for {pkg.path}')
            collected_logs += pkg_logs

        frontend.update_progress()

    frontend.progress_done()

    if config.reverse:
        for collection in (collected_news, collected_logs):
            collection.reverse()

    def news_header(package):
        return _('News for %s') % package

    def changes_header(package):
        return _('Changes for %s') % package

    news = join_changes(collected_news, config.headers, news_header)
    changes = join_changes(collected_logs, config.headers, changes_header)
    binnmus = join_binnmus(collected_logs)

    # Binary NMU summaries go after the regular changelog text, if any
    if binnmus and changes:
        changes += '\n\n' + binnmus
    elif binnmus:
        changes = binnmus

    if config.verbose and notes:
        joined_notes = _("Informational notes") + ":\n\n" + '\n'.join(notes)
        if config.which == "news":
            news += joined_notes
        else:
            changes += joined_notes

    return news, changes


def process_pkg(config, seen_db, status, notes, pkg):
    '''Return (news, changelog) for one package, with None in place of a
    result that has nothing to show.

    Unless show_all, since or latest is set, the package is looked up in
    status: a package that is not installed only gets a note appended to
    notes and yields (None, None), and a package unknown to seen_db has the
    database seeded from its installed changelogs first. If extraction
    returns no changelog, the changelog is fetched via apt unless
    config.no_network is set or only news was requested.'''
    installed = None
    if not (config.show_all or config.since or config.latest):
        candidates = status.find('Package', pkg.binary) or []
        # First stanza for our architecture, or None if there is none
        installed = next(
            (stanza for stanza in candidates if stanza.arch == pkg.arch),
            None)
        if not getattr(installed, 'installed', False):
            notes.append(_("%s: will be newly installed") % pkg.binary)
            return (None, None)
        if not seen_db.has_package(pkg.binary):
            seen_db.add_package(pkg.binary)
            seeder = Filterer(config, seen_db, show_all=False,
                              since=False, latest=10)
            # We don't care about what this returns, it's just seeding the DB
            pkg.extract_changes_via_installed('both', seeder)
            # It would be great if we could check if the command above returns
            # nothing, and if so and no_network is False, fetch the changelog
            # entries for the installed version of the package from apt and
            # use them to seed the database. However, Debian repositories
            # typically only include data for the most recent version of the
            # package in the index, so if we're installing a new version of a
            # package then it's unlikely that the changelog for the installed
            # version is available on the network, so there's no point in
            # wasting the time and network bandwidth trying to fetch it.
    news, changelog = pkg.extract_changes(config.which, installed=installed)
    if not (config.no_network or config.which == "news" or changelog):
        changelog = pkg.extract_changes_via_apt(installed=installed)

    if not (news and news.entries):
        news = None
    if not (changelog and (changelog.entries or changelog.binnmus)):
        changelog = None
    return (news, changelog)


def join_changes(all_changes, show_headers, header_package_getter):
    '''Return the text of all entries in all_changes.

    Without show_headers this is simply all_changes.changes. Otherwise each
    entry is rendered with str() followed by a blank line, and a
    "--- <header> ---" line built with header_package_getter(package) is
    inserted whenever the package differs from that of the previous entry.'''
    if not show_headers:
        return all_changes.changes

    pieces = []
    current = None
    for entry in all_changes.entries:
        if entry.package != current:
            current = entry.package
            pieces.append(f'--- {header_package_getter(current)} ---\n\n')
        pieces.append(str(entry) + '\n\n')

    return ''.join(pieces)


def join_binnmus(all_binnmus):
    '''Return a summary of the binary NMU entries in all_binnmus.binnmus.

    Entries with identical content are grouped, in order of first appearance.
    Each group becomes a "--- Binary NMU of: ..." line listing the entry
    headers (cut after the first ")"), wrapped by hand once a line would
    exceed 75 characters, followed by the shared content.'''
    grouped = {}
    for item in all_binnmus.binnmus:
        key = item.content
        if key in grouped:
            grouped[key].append(item)
        else:
            grouped[key] = [item]

    blocks = []
    for content, members in grouped.items():
        line = '--- ' + _('Binary NMU of')
        width = len(line)
        for position, member in enumerate(members):
            # Keep the header only up to and including the first ')'
            header, paren, _tail = member.header.partition(')')
            header += paren

            # manually wrap the package lines
            separator = ', ' if position else ': '
            line += separator
            width += len(separator)
            if width + len(header) > 75:
                line += '\n '
                width = 1
            line += header
            width += len(header)

        blocks.append(line + '\n\n' + content + '\n\n')

    return ''.join(blocks)


# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()