File: unattended.py

package info (click to toggle)
virt-manager 1%3A5.0.0-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 20,200 kB
  • sloc: python: 44,538; xml: 28,397; makefile: 17; sh: 6
file content (424 lines) | stat: -rw-r--r-- 14,771 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
#
# Common code for unattended installations
#
# Copyright 2019 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.

import getpass
import locale
import os
import pwd
import re
import tempfile

from gi.repository import Libosinfo

from . import urlfetcher
from .. import progress
from ..logger import log


def _is_user_login_safe(login):
    return login != "root"


def _login_from_hostuser():
    hostuser = getpass.getuser()
    realname = pwd.getpwnam(hostuser).pw_gecos
    if not _is_user_login_safe(hostuser):
        return None, None  # pragma: no cover
    return hostuser, realname  # pragma: no cover


def _make_installconfig(script, osobj, unattended_data, arch, hostname, url):
    """
    Build a Libosinfo.InstallConfig instance
    """
    def get_timezone():
        TZ_FILE = "/etc/localtime"
        linkpath = os.path.realpath(TZ_FILE)
        tokens = linkpath.split("zoneinfo/")
        if len(tokens) > 1:
            return tokens[1]

    def get_language():
        return locale.getlocale()[0]

    config = Libosinfo.InstallConfig()

    # Set user login and name
    # In case it's specified via command-line, use the specified one as login
    # and realname. Otherwise, fallback fto the one from the system
    login = unattended_data.user_login
    realname = unattended_data.user_login
    if not login:
        login, realname = _login_from_hostuser()

    if login:
        login = login.lower()
        if not _is_user_login_safe(login):
            raise RuntimeError(
                _("%(osname)s cannot use '%(loginname)s' as user-login.") %
                {"osname": osobj.name, "loginname": login})

        config.set_user_login(login)
        config.set_user_realname(realname)

    # Set user-password.
    # In case it's required and not passed, just raise a RuntimeError.
    if (script.requires_user_password() and
        not unattended_data.get_user_password()):
        raise RuntimeError(
            _("%s requires the user-password to be set.") %
            osobj.name)
    config.set_user_password(unattended_data.get_user_password() or "")

    # Set the admin-password:
    # In case it's required and not passed, just raise a RuntimeError.
    if (script.requires_admin_password() and
        not unattended_data.get_admin_password()):
        raise RuntimeError(
            _("%s requires the admin-password to be set.") %
            osobj.name)
    config.set_admin_password(unattended_data.get_admin_password() or "")

    # Set the target disk.
    # virtiodisk is the preferred way, in case it's supported, otherwise
    # just fallback to scsi.
    #
    # Note: this is linux specific and will require some changes whenever
    # support for Windows will be added.
    tgt = "/dev/vda" if osobj.supports_virtiodisk() else "/dev/sda"
    if osobj.is_windows():
        tgt = "C"
    config.set_target_disk(tgt)

    # Set hardware architecture and hostname
    config.set_hardware_arch(arch)

    # Some installations will bail if the Computer's name contains one of the
    # following characters: "[{|}~[\\]^':; <=>?@!\"#$%`()+/.,*&]".
    # In order to take a safer path, let's ensure that we never set those,
    # replacing them by "-".
    hostname = re.sub("[{|}~[\\]^':; <=>?@!\"#$%`()+/.,*&]", "-", hostname)
    config.set_hostname(hostname)

    # Try to guess the timezone from '/etc/localtime', in case it's not
    # possible 'America/New_York' will be used.
    timezone = get_timezone()
    if timezone:
        config.set_l10n_timezone(timezone)

    # Try to guess to language and keyboard layout from the system's
    # language.
    #
    # This method has flaws as it's quite common to have language and
    # keyboard layout not matching. Otherwise, there's no easy way to guess
    # the keyboard layout without relying on a set of APIs of an specific
    # Desktop Environment.
    language = get_language()
    if language:
        config.set_l10n_language(language)
        config.set_l10n_keyboard(language)

    if url:
        config.set_installation_url(url)  # pylint: disable=no-member

    if unattended_data.reg_login:
        config.set_reg_login(unattended_data.reg_login)

    if unattended_data.product_key:
        config.set_reg_product_key(unattended_data.product_key)

    log.debug("InstallScriptConfig created with the following params:")
    log.debug("username: %s", config.get_user_login())
    log.debug("realname: %s", config.get_user_realname())
    log.debug("target disk: %s", config.get_target_disk())
    log.debug("hardware arch: %s", config.get_hardware_arch())
    log.debug("hostname: %s", config.get_hostname())
    log.debug("timezone: %s", config.get_l10n_timezone())
    log.debug("language: %s", config.get_l10n_language())
    log.debug("keyboard: %s", config.get_l10n_keyboard())
    if hasattr(config, "get_installation_url"):
        log.debug("url: %s",
                config.get_installation_url())  # pylint: disable=no-member
    log.debug("reg-login %s", config.get_reg_login())
    log.debug("product-key: %s", config.get_reg_product_key())

    return config


class OSInstallScript:
    """
    Wrapper for Libosinfo.InstallScript interactions
    """
    @staticmethod
    def have_new_libosinfo():
        from ..osdict import OSDB

        win7 = OSDB.lookup_os("win7")
        for script in win7.get_install_script_list():
            if (Libosinfo.InstallScriptInjectionMethod.CDROM &
                script.get_injection_methods()):
                return True
        return False  # pragma: no cover

    @staticmethod
    def have_libosinfo_installation_url():
        return hasattr(Libosinfo.InstallConfig, "set_installation_url")

    def __init__(self, script, osobj, osinfomediaobj, osinfotreeobj):
        self._script = script
        self._osobj = osobj
        self._osinfomediaobj = osinfomediaobj
        self._osinfotreeobj = osinfotreeobj
        self._config = None

        if not OSInstallScript.have_new_libosinfo():  # pragma: no cover
            raise RuntimeError(_("libosinfo or osinfo-db is too old to "
                "support unattended installs."))

    def get_expected_filename(self):
        return self._script.get_expected_filename()

    def set_preferred_injection_method(self, namestr):
        # If we ever make this user configurable, this will need to be smarter
        names = {
            "cdrom": Libosinfo.InstallScriptInjectionMethod.CDROM,
            "initrd": Libosinfo.InstallScriptInjectionMethod.INITRD,
        }

        log.debug("Using '%s' injection method", namestr)
        injection_method = names[namestr]
        supported_injection_methods = self._script.get_injection_methods()
        if (injection_method & supported_injection_methods == 0):
            raise RuntimeError(
                _("OS '%(osname)s' does not support required "
                  "injection method '%(methodname)s'") %
                {"osname": self._osobj.name, "methodname": namestr})

        self._script.set_preferred_injection_method(injection_method)

    def set_installation_source(self, namestr):
        # If we ever make this user configurable, this will need to be smarter
        names = {
            "media": Libosinfo.InstallScriptInstallationSource.MEDIA,
            "network": Libosinfo.InstallScriptInstallationSource.NETWORK,
        }

        log.debug("Using '%s' installation source", namestr)
        self._script.set_installation_source(names[namestr])

    def _requires_param(self, config_param):
        param = self._script.get_config_param(config_param)
        return bool(param and not param.is_optional())

    def requires_user_password(self):
        return self._requires_param(
                Libosinfo.INSTALL_CONFIG_PROP_USER_PASSWORD)
    def requires_admin_password(self):
        return self._requires_param(
                Libosinfo.INSTALL_CONFIG_PROP_ADMIN_PASSWORD)

    def set_config(self, config):
        self._config = config

    def generate(self):
        if self._osinfomediaobj:
            return self._script.generate_for_media(
                    self._osinfomediaobj, self._config)
        if hasattr(self._script, "generate_for_tree") and self._osinfotreeobj:
            # osinfo_install_script_generate_for_tree() is part of
            # libosinfo 1.6.0
            return self._script.generate_for_tree(
                    self._osinfotreeobj, self._config)

        return self._script.generate(self._osobj.get_handle(), self._config)

    def generate_cmdline(self):
        if self._osinfomediaobj:
            return self._script.generate_command_line_for_media(
                    self._osinfomediaobj, self._config)
        if (hasattr(self._script, "generate_command_line_for_tree") and
                self._osinfotreeobj):
            # osinfo_install_script_generate_command_line_for_tree() is part of
            # libosinfo 1.6.0
            return self._script.generate_command_line_for_tree(
                    self._osinfotreeobj, self._config)
        return self._script.generate_command_line(
                self._osobj.get_handle(), self._config)

    def _generate_debug(self):
        original_user_password = self._config.get_user_password()
        original_admin_password = self._config.get_admin_password()

        self._config.set_user_password("[SCRUBBLED]")
        self._config.set_admin_password("[SCRUBBLED]")

        debug_content = self.generate()

        self._config.set_user_password(original_user_password)
        self._config.set_admin_password(original_admin_password)

        return debug_content

    def write(self):
        fileobj = tempfile.NamedTemporaryFile(
            prefix="virtinst-unattended-script", delete=False)
        scriptpath = fileobj.name

        content = self.generate()
        open(scriptpath, "w").write(content)

        debug_content = self._generate_debug()

        log.debug("Generated unattended script: %s", scriptpath)
        log.debug("Generated script contents:\n%s", debug_content)

        return scriptpath


class UnattendedData():
    profile = None
    admin_password_file = None
    user_login = None
    user_password_file = None
    product_key = None
    reg_login = None

    def _get_password(self, pwdfile):
        with open(pwdfile, "r") as fobj:
            return fobj.readline().rstrip("\n\r")

    def get_user_password(self):
        if self.user_password_file:
            return self._get_password(self.user_password_file)

    def get_admin_password(self):
        if self.admin_password_file:
            return self._get_password(self.admin_password_file)


def _make_scriptmap(script_list):
    """
    Generate a mapping of profile name -> [list, of, rawscripts]
    """
    script_map = {}
    for script in script_list:
        profile = script.get_profile()
        if profile not in script_map:
            script_map[profile] = []
        script_map[profile].append(script)
    return script_map


def _find_default_profile(profile_names):
    profile_prefs = ["desktop"]
    found = None
    for p in profile_prefs:
        if p in profile_names:
            found = p
            break
    return found or profile_names[0]


def _lookup_rawscripts(osinfo, profile, os_media):
    script_list = []

    if os_media:
        if not os_media.supports_installer_script():
            # This is a specific annotation for media like livecds that
            # don't support unattended installs
            raise RuntimeError(
                _("OS '%s' media does not support unattended "
                  "installation") % (osinfo.name))

        # In case we're dealing with a media installation, let's try to get
        # the installer scripts from the media, in case any is set.
        script_list = os_media.get_install_script_list()

    if not script_list:
        script_list = osinfo.get_install_script_list()
    if not script_list:
        raise RuntimeError(
            _("OS '%s' does not support unattended installation.") %
            osinfo.name)

    script_map = _make_scriptmap(script_list)
    profile_names = list(sorted(script_map.keys()))
    if profile:
        rawscripts = script_map.get(profile, [])
        if not rawscripts:
            raise RuntimeError(
                _("OS '%(osname)s' does not support unattended "
                  "installation for the '%(profilename)s' profile. "
                  "Available profiles: %(profiles)s") %
                {"osname": osinfo.name, "profilename": profile,
                 "profiles": ", ".join(profile_names)})
    else:
        profile = _find_default_profile(profile_names)
        log.warning(_("Using unattended profile '%s'"), profile)
        rawscripts = script_map[profile]

    # Some OSes (as Windows) have more than one installer script,
    # depending on the OS version and profile chosen, to be used to
    # perform the unattended installation.
    ids = []
    for rawscript in rawscripts:
        ids.append(rawscript.get_id())

    log.debug("Install scripts found for profile '%s': %s",
            profile, ", ".join(ids))
    return rawscripts


def prepare_install_scripts(guest, unattended_data,
        url, os_media, os_tree, injection_method):
    def _get_installation_source(os_media):
        if not os_media:
            return "network"
        return "media"

    scripts = []
    rawscripts = _lookup_rawscripts(guest.osinfo,
            unattended_data.profile, os_media)

    osinfomediaobj = os_media.get_osinfo_media() if os_media else None
    osinfotreeobj = os_tree.get_osinfo_tree() if os_tree else None

    for rawscript in rawscripts:
        script = OSInstallScript(
                rawscript, guest.osinfo, osinfomediaobj, osinfotreeobj)

        script.set_preferred_injection_method(injection_method)

        installationsource = _get_installation_source(os_media)
        script.set_installation_source(installationsource)

        config = _make_installconfig(script, guest.osinfo, unattended_data,
                guest.os.arch, guest.name, url)
        script.set_config(config)
        scripts.append(script)
    return scripts


def download_drivers(locations, scratchdir, meter):
    meter = progress.ensure_meter(meter)
    fetcher = urlfetcher.DirectFetcher(None, scratchdir, meter)
    fetcher.meter = meter

    drivers = []

    try:
        for location in locations:
            filename = location.rsplit('/', 1)[1]
            driver = fetcher.acquireFile(location)
            drivers.append((driver, filename))
    except Exception:  # pragma: no cover
        for driverpair in drivers:
            os.unlink(driverpair[0])
        raise

    return drivers