File: versionlock.py

package info (click to toggle)
dnf-plugins-core 4.3.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 2,300 kB
  • sloc: python: 6,227; sh: 23; makefile: 15; xml: 7
file content (319 lines) | stat: -rw-r--r-- 12,317 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
#
# Copyright (C) 2015  Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#

from __future__ import absolute_import
from __future__ import unicode_literals
from dnfpluginscore import _, logger

import dnf
import dnf.cli
import dnf.exceptions
import fnmatch
import hawkey
import os
import tempfile
import time
import warnings

NOT_READABLE = _('Unable to read version lock configuration: %s')
NO_LOCKLIST = _('Locklist not set')
ADDING_SPEC = _('Adding versionlock on:')
EXCLUDING_SPEC = _('Adding exclude on:')
EXISTING_SPEC = _('Package already locked in equivalent form:')
ALREADY_LOCKED = _('Package {} is already locked')
ALREADY_EXCLUDED = _('Package {} is already excluded')
DELETING_SPEC = _('Deleting versionlock for:')
NOTFOUND_SPEC = _('No package found for:')
NO_VERSIONLOCK = _('Excludes from versionlock plugin were not applied')
APPLY_LOCK = _('Versionlock plugin: number of lock rules from file "{}" applied: {}')
APPLY_EXCLUDE = _('Versionlock plugin: number of exclude rules from file "{}" applied: {}')
NEVRA_ERROR = _('Versionlock plugin: could not parse pattern:')

locklist_fn = None


class VersionLock(dnf.Plugin):

    name = 'versionlock'

    def __init__(self, base, cli):
        super(VersionLock, self).__init__(base, cli)
        self.base = base
        self.cli = cli
        if self.cli is not None:
            self.cli.register_command(VersionLockCommand)

    def config(self):
        global locklist_fn
        cp = self.read_config(self.base.conf)
        locklist_fn = (cp.has_section('main') and cp.has_option('main', 'locklist')
                       and cp.get('main', 'locklist'))

    def locking_enabled(self):
        if self.cli is None:
            enabled = True  # loaded via the api, not called by cli
        else:
            enabled = self.cli.demands.plugin_filtering_enabled
            if enabled is None:
                enabled = self.cli.demands.resolving
        return enabled

    def sack(self):
        if not self.locking_enabled():
            logger.debug(NO_VERSIONLOCK)
            return

        excludes_query = self.base.sack.query().filter(empty=True)
        locked_query = self.base.sack.query().filter(empty=True)
        locked_names = set()
        # counter of applied rules [locked_count, excluded_count]
        count = [0, 0]
        for pat in _read_locklist():
            excl = 0
            if pat and pat[0] == '!':
                pat = pat[1:]
                excl = 1

            possible_nevras = dnf.subject.Subject(pat).get_nevra_possibilities(
                forms=[hawkey.FORM_NEVRA, hawkey.FORM_NEVR, hawkey.FORM_NEV,
                       hawkey.FORM_NA, hawkey.FORM_NAME])
            if possible_nevras:
                count[excl] += 1
            else:
                logger.error("%s %s", NEVRA_ERROR, pat)
                continue
            for nevra in possible_nevras:
                pat_query = nevra.to_query(self.base.sack)
                if excl:
                    excludes_query = excludes_query.union(pat_query)
                else:
                    locked_names.add(nevra.name)
                    locked_query = locked_query.union(pat_query)
                if pat_query:
                    break

        if count[1]:
            logger.debug(APPLY_EXCLUDE.format(locklist_fn, count[1]))
        if count[0]:
            logger.debug(APPLY_LOCK.format(locklist_fn, count[0]))

        if locked_names:
            all_versions = self.base.sack.query().filter(name__glob=list(locked_names))
            other_versions = all_versions.difference(locked_query)
            excludes_query = excludes_query.union(other_versions)
            # exclude also anything that obsoletes the locked versions of packages
            obsoletes_query = self.base.sack.query().filterm(obsoletes=locked_query)
            # leave out obsoleters that are also part of locked versions (otherwise the obsoleter package
            # would not be installable at all)
            excludes_query = excludes_query.union(obsoletes_query.difference(locked_query))

        excludes_query.filterm(reponame__neq=hawkey.SYSTEM_REPO_NAME)
        if excludes_query:
            self.base.sack.add_excludes(excludes_query)

EXC_CMDS = ['exclude', 'add-!', 'add!']
DEL_CMDS = ['delete', 'del']
DEP_EXC_CMDS = ['blacklist']
ALL_CMDS = ['add', 'clear', 'list'] + EXC_CMDS + DEL_CMDS + DEP_EXC_CMDS


class VersionLockCommand(dnf.cli.Command):

    aliases = ("versionlock",)
    summary = _("control package version locks")
    usage = "[add|exclude|list|delete|clear] [<package-nevr-spec>]"

    @staticmethod
    def set_argparser(parser):
        parser.add_argument("--raw", default=False, action='store_true',
                            help=_("Use package specifications as they are, do not "
                                   "try to parse them"))
        parser.add_argument("subcommand", nargs='?',
                            metavar="[add|exclude|list|delete|clear]")
        parser.add_argument("package", nargs='*',
                            metavar="[<package-nevr-spec>]")

    def configure(self):
        self.cli.demands.sack_activation = True
        self.cli.demands.available_repos = True

    def run(self):
        cmd = 'list'
        if self.opts.subcommand:
            if self.opts.subcommand not in ALL_CMDS:
                cmd = 'add'
                self.opts.package.insert(0, self.opts.subcommand)
            elif self.opts.subcommand in EXC_CMDS:
                cmd = 'exclude'
            elif self.opts.subcommand in DEP_EXC_CMDS:
                msg = _("Subcommand '{}' is deprecated. Use 'exclude' subcommand instead.").format(
                    self.opts.subcommand)
                warnings.warn(msg, dnf.exceptions.DeprecationWarning, stacklevel=2)
                cmd = 'exclude'
            elif self.opts.subcommand in DEL_CMDS:
                cmd = 'delete'
            else:
                cmd = self.opts.subcommand

        if cmd == 'add':
            results = _search_locklist(self.opts.package)
            for entry, entry_cmd in results:
                if entry_cmd == '':
                    _write_locklist(self.base, [entry], self.opts.raw, True,
                                    "\n# Added lock on %s\n" % time.ctime(),
                                    ADDING_SPEC, '')
                elif cmd != entry_cmd:
                    raise dnf.exceptions.Error(ALREADY_EXCLUDED.format(entry))
                else:
                    logger.info("%s %s", EXISTING_SPEC, entry)
        elif cmd == 'exclude':
            results = _search_locklist(self.opts.package)
            for entry, entry_cmd in results:
                if entry_cmd == '':
                    _write_locklist(self.base, [entry], self.opts.raw, False,
                                    "\n# Added exclude on %s\n" % time.ctime(),
                                    EXCLUDING_SPEC, '!')
                elif cmd != entry_cmd:
                    raise dnf.exceptions.Error(ALREADY_LOCKED.format(entry))
                else:
                    logger.info("%s %s", EXISTING_SPEC, entry)
        elif cmd == 'list':
            for pat in _read_locklist():
                print(pat)
        elif cmd == 'clear':
            if not locklist_fn:
                raise dnf.exceptions.Error(NO_LOCKLIST)
            with open(locklist_fn, 'w') as f:
                # open in write mode truncates file
                pass
        elif cmd == 'delete':
            if not locklist_fn:
                raise dnf.exceptions.Error(NO_LOCKLIST)
            dirname = os.path.dirname(locklist_fn)
            (out, tmpfilename) = tempfile.mkstemp(dir=dirname, suffix='.tmp')
            locked_specs = _read_locklist()
            count = 0
            with os.fdopen(out, 'w', -1) as out:
                for ent in locked_specs:
                    if _match(ent, self.opts.package):
                        print("%s %s" % (DELETING_SPEC, ent))
                        count += 1
                        continue
                    out.write(ent)
                    out.write('\n')
            if not count:
                os.unlink(tmpfilename)
            else:
                os.chmod(tmpfilename, 0o644)
                os.rename(tmpfilename, locklist_fn)


def _read_locklist():
    locklist = []
    try:
        if not locklist_fn:
            raise dnf.exceptions.Error(NO_LOCKLIST)
        with open(locklist_fn) as llfile:
            for line in llfile.readlines():
                if line.startswith('#') or line.strip() == '':
                    continue
                locklist.append(line.strip())
    except IOError as e:
        raise dnf.exceptions.Error(NOT_READABLE % e)
    return locklist


def _search_locklist(package):
    results = []
    found = action = ''
    locked_specs = _read_locklist()
    for pkg in package:
        match = False
        for ent in locked_specs:
            found = action = ''
            if _match(ent, [pkg]):
                found = ent
                action = 'exclude' if ent.startswith('!') else 'add'
                results.append((found, action))
                match = True
        if not match:
            results.append((pkg, action))
    return results


def _write_locklist(base, args, raw, try_installed, comment, info, prefix):
    specs = set()
    for pat in args:
        if raw:
            specs.add(pat)
            continue
        subj = dnf.subject.Subject(pat)
        pkgs = None
        if try_installed:
            pkgs = subj.get_best_query(dnf.sack._rpmdb_sack(base), with_nevra=True,
                                       with_provides=False, with_filenames=False)
        if not pkgs:
            pkgs = subj.get_best_query(base.sack, with_nevra=True, with_provides=False,
                                       with_filenames=False)
        if not pkgs:
            print("%s %s" % (NOTFOUND_SPEC, pat))

        for pkg in pkgs:
            specs.add(pkgtup2spec(*pkg.pkgtup))

    if specs:
        try:
            if not locklist_fn:
                raise dnf.exceptions.Error(NO_LOCKLIST)
            with open(locklist_fn, 'a') as f:
                f.write(comment)
                for spec in specs:
                    print("%s %s" % (info, spec))
                    f.write("%s%s\n" % (prefix, spec))
        except IOError as e:
            raise dnf.exceptions.Error(NOT_READABLE % e)

def _match(ent, patterns):
    ent = ent.lstrip('!')
    for pat in patterns:
        if ent == pat:
            return True
    try:
        n = hawkey.split_nevra(ent)
    except hawkey.ValueException:
        return False
    for name in (
        '%s' % n.name,
        '%s.%s' % (n.name, n.arch),
        '%s-%s' % (n.name, n.version),
        '%s-%s-%s' % (n.name, n.version, n.release),
        '%s-%s:%s' % (n.name, n.epoch, n.version),
        '%s-%s-%s.%s' % (n.name, n.version, n.release, n.arch),
        '%s-%s:%s-%s' % (n.name, n.epoch, n.version, n.release),
        '%s:%s-%s-%s.%s' % (n.epoch, n.name, n.version, n.release, n.arch),
        '%s-%s:%s-%s.%s' % (n.name, n.epoch, n.version, n.release, n.arch),
    ):
        for pat in patterns:
            if fnmatch.fnmatch(name, pat):
                return True
    return False


def pkgtup2spec(name, arch, epoch, version, release):
    # we ignore arch
    return "%s-%s:%s-%s.*" % (name, epoch or "0", version, release)