File: cloudinit.py

package info (click to toggle)
virt-manager 1%3A5.0.0-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 20,200 kB
  • sloc: python: 44,538; xml: 28,397; makefile: 17; sh: 6
file content (143 lines) | stat: -rw-r--r-- 4,574 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Copyright 2019 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.

import os
import random
import re
import string
import tempfile

from ..logger import log


class CloudInitData():
    """
    Holder for the cloud-init options consumed by this module, plus
    helpers for resolving the root password and ssh keys.

    All options default to None (unset) and are assigned by the caller.
    """
    # When truthy, the generated user-data disables cloud-init in the guest
    disable = None
    # When truthy, a random root password is generated on demand
    root_password_generate = None
    # Path to a file whose first line is the root password
    root_password_file = None
    # Cache for the generated password so repeated calls return the same one
    generated_root_password = None
    # Path to a file whose first line is the ssh key for root
    root_ssh_key = None
    # Path to a file whose first line is the ssh key for the default user
    clouduser_ssh_key = None
    # Paths to files used verbatim as user-data / meta-data / network-config
    user_data = None
    meta_data = None
    network_config = None

    def _generate_password(self):
        """
        Return the generated root password, creating it on first use.

        The password is 16 characters drawn from ASCII letters and digits
        and is cached in ``generated_root_password``.
        """
        if not self.generated_root_password:
            # This is a login credential, so draw from the OS entropy
            # source instead of the default generator, whose output is
            # predictable.
            rng = random.SystemRandom()
            alphabet = string.ascii_letters + string.digits
            self.generated_root_password = "".join(
                    rng.choice(alphabet) for dummy in range(16))
        return self.generated_root_password

    def _get_password(self, pwdfile):
        """
        Return the first line of ``pwdfile`` with trailing newline and
        carriage return characters removed.
        """
        with open(pwdfile, "r") as fobj:
            return fobj.readline().rstrip("\n\r")

    def get_password_if_generated(self):
        """
        Return the generated root password, or None when password
        generation was not requested.
        """
        if self.root_password_generate:
            return self._generate_password()
        return None

    def get_root_password(self):
        """
        Return the root password: the first line of ``root_password_file``
        when set, else the generated password, else None.
        """
        if self.root_password_file:
            return self._get_password(self.root_password_file)
        return self.get_password_if_generated()

    def get_root_ssh_key(self):
        """
        Return the first line of the ``root_ssh_key`` file, or None when
        no file is set.
        """
        if self.root_ssh_key:
            return self._get_password(self.root_ssh_key)
        return None

    def get_clouduser_ssh_key(self):
        """
        Return the first line of the ``clouduser_ssh_key`` file, or None
        when no file is set.
        """
        if self.clouduser_ssh_key:
            return self._get_password(self.clouduser_ssh_key)
        return None


def _create_metadata_content(cloudinit_data):
    """
    Return the text for the cloud-init 'meta-data' file.

    :param cloudinit_data: object whose ``meta_data`` attribute is either
        falsy or the path of a file holding the meta-data content
    :returns: the content of that file, or "" when no path is set
    """
    if not cloudinit_data.meta_data:
        return ""

    log.debug("Using meta-data content from path=%s",
            cloudinit_data.meta_data)
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(cloudinit_data.meta_data) as fobj:
        return fobj.read()


def _create_userdata_content(cloudinit_data):
    """
    Return the text for the cloud-init 'user-data' file.

    When ``cloudinit_data.user_data`` is set, the content of that file is
    returned unmodified. Otherwise a ``#cloud-config`` document is built
    from the root password, ssh key and disable options.

    :param cloudinit_data: CloudInitData-like object providing the options
    :returns: the user-data content as a string
    """
    if cloudinit_data.user_data:
        log.debug("Using user-data content from path=%s",
                cloudinit_data.user_data)
        # Context manager so the handle does not stay open after reading
        with open(cloudinit_data.user_data) as fobj:
            return fobj.read()

    lines = ["#cloud-config"]

    if cloudinit_data.root_password_generate or cloudinit_data.root_password_file:
        rootpass = cloudinit_data.get_root_password()
        lines.append("chpasswd:")
        lines.append("  list: |")
        lines.append("    root:%s" % rootpass)

    # 'expire' belongs to the chpasswd section emitted just above
    if cloudinit_data.root_password_generate:
        lines.append("  expire: True")
    elif cloudinit_data.root_password_file:
        lines.append("  expire: False")

    if cloudinit_data.root_ssh_key:
        rootkey = cloudinit_data.get_root_ssh_key()
        lines.append("users:")
        lines.append("  - default")
        lines.append("  - name: root")
        lines.append("    ssh_authorized_keys:")
        lines.append("      - %s" % rootkey)

    if cloudinit_data.clouduser_ssh_key:
        userkey = cloudinit_data.get_clouduser_ssh_key()
        lines.append("ssh_authorized_keys:")
        lines.append("  - %s" % userkey)

    if cloudinit_data.disable:
        lines.append("runcmd:")
        lines.append('- echo "Disabled by virt-install" > '
                     "/etc/cloud/cloud-init.disabled")

    content = "".join(line + "\n" for line in lines)

    # Never print or log the root password: mask it in the copy used
    # for output, while the real content is what gets returned.
    clean_content = re.sub(r"root:(.*)", 'root:[SCRUBBLED]', content)
    if "VIRTINST_TEST_SUITE_PRINT_CLOUDINIT" in os.environ:
        print(clean_content)

    log.debug("Generated cloud-init userdata: \n%s", clean_content)
    return content


def _create_network_config_content(cloudinit_data):
    """
    Return the text for the cloud-init 'network-config' file.

    :param cloudinit_data: object whose ``network_config`` attribute is
        either falsy or the path of a file holding the network config
    :returns: the content of that file, or "" when no path is set
    """
    if not cloudinit_data.network_config:
        return ""

    log.debug("Using network-config content from path=%s",
              cloudinit_data.network_config)
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(cloudinit_data.network_config) as fobj:
        return fobj.read()


def create_files(scratchdir, cloudinit_data):
    """
    Write the cloud-init files into ``scratchdir``.

    'meta-data' and 'user-data' are always written; 'network-config' is
    written only when there is content for it.

    :param scratchdir: directory in which the temporary files are created
    :param cloudinit_data: CloudInitData-like object providing the options
    :returns: list of (path of the written file, cloud-init file name)
    :raises Exception: any error hit while creating or writing a file is
        re-raised after the files created so far have been removed
    """
    metadata = _create_metadata_content(cloudinit_data)
    userdata = _create_userdata_content(cloudinit_data)

    data = [(metadata, "meta-data"), (userdata, "user-data")]
    network_config = _create_network_config_content(cloudinit_data)
    if network_config:
        data.append((network_config, 'network-config'))

    filepairs = []
    try:
        for content, destfile in data:
            # Write through the handle NamedTemporaryFile gives us, and
            # close it via the context manager; delete=False keeps the
            # file on disk for the caller.
            with tempfile.NamedTemporaryFile(
                    mode="w", prefix="virtinst-",
                    suffix=("-%s" % destfile),
                    dir=scratchdir, delete=False) as fileobj:
                filepairs.append((fileobj.name, destfile))
                fileobj.write(content)
    except Exception:  # pragma: no cover
        for filename, dummy in filepairs:
            try:
                os.unlink(filename)
            except OSError:
                log.debug("Error removing cloud-init file %s",
                          filename, exc_info=True)
        # Propagate the failure: returning the paths of files we just
        # deleted would only move the error to the caller.
        raise

    return filepairs