File: _remove.py

package info (click to toggle)
depthcharge-tools 0.6.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 732 kB
  • sloc: python: 6,280; sh: 650; makefile: 12
file content (262 lines) | stat: -rw-r--r-- 8,534 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later

# depthcharge-tools depthchargectl remove subcommand
# Copyright (C) 2020-2022 Alper Nebi Yasak <alpernebiyasak@gmail.com>
# See COPYRIGHT and LICENSE files for full copyright information.

import argparse
import logging
import subprocess

from pathlib import Path

from depthcharge_tools import (
    __version__,
)
from depthcharge_tools.utils.argparse import (
    Command,
    Argument,
    Group,
    CommandExit,
)

from depthcharge_tools.depthchargectl import depthchargectl


class BootedPartitionError(CommandExit):
    def __init__(self, partition):
        self.partition = partition
        super().__init__(
            "Refusing to disable currently booted partition '{}'."
            .format(partition)
        )


@depthchargectl.subcommand("remove")
class depthchargectl_remove(
    depthchargectl,
    prog="depthchargectl remove",
    usage="%(prog)s [options] (KERNEL_VERSION | IMAGE)",
    add_help=False,
):
    """Remove images and disable partitions containing them."""

    _logger = depthchargectl._logger.getChild("remove")
    config_section = "depthchargectl/remove"

    @depthchargectl.board.copy()
    def board(self, codename=""):
        """Assume we're running on the specified board"""
        # We can disable partitions without knowing the board.
        try:
            return super().board
        except Exception as err:
            self.logger.warning(err)
            return None

    @Group
    def positionals(self):
        """Positional arguments"""

        if self.image is not None and self.kernel_version is not None:
            raise ValueError(
                "Image and kernel_version arguments are mutually exclusive."
            )

        if self.image is not None:
            image = self.image
        else:
            image = self.kernel_version

        if isinstance(image, str):
            # This can be run after the kernel is uninstalled, where the
            # version would no longer be valid, so don't check for that.
            # Instead just check if we have it as an image.
            img = (self.images_dir / "{}.img".format(image)).resolve()
            if img.parent == self.images_dir and img.is_file():
                self.logger.info(
                    "Disabling partitions for kernel version '{}'."
                    .format(image)
                )
                self.image = img
                self.kernel_version = image

            else:
                self.image = Path(image).resolve()
                self.kernel_version = None
                self.logger.info(
                    "Disabling partitions for depthcharge image '{}'."
                    .format(image)
                )

        if not self.image.is_file():
            raise TypeError(
                "Image to remove '{}' is not a file."
                .format(self.image)
            )

    @positionals.add
    @Argument(dest=argparse.SUPPRESS, nargs=0)
    def kernel_version(self, kernel_version):
        """Installed kernel version to disable."""
        return kernel_version

    @positionals.add
    @Argument
    def image(self, image):
        """Depthcharge image to disable."""
        return image

    @Group
    def options(self):
        """Options"""

    @options.add
    @Argument("-f", "--force", force=True)
    def force(self, force=False):
        """Allow disabling the currently booted partition."""
        return force

    def __call__(self):
        image = self.image

        # When called with --vblockonly vbutil_kernel creates a file of
        # size 64KiB == 0x10000.
        image_vblock = image.read_bytes()[:0x10000]

        partitions = depthchargectl.list(
            root=self.root,
            root_mountpoint=self.root_mountpoint,
            boot_mountpoint=self.boot_mountpoint,
            config=self.config,
            board=self.board,
            tmpdir=self.tmpdir / "list",
            images_dir=self.images_dir,
            vboot_keyblock=self.vboot_keyblock,
            vboot_public_key=self.vboot_public_key,
            vboot_private_key=self.vboot_private_key,
            kernel_cmdline=self.kernel_cmdline,
            ignore_initramfs=self.ignore_initramfs,
            verbosity=self.verbosity,
        )

        self.logger.info(
            "Searching for Chrome OS Kernel partitions containing '{}'."
            .format(image)
        )
        badparts = []
        error_disks = []

        for part in partitions:
            self.logger.info("Checking partition '{}'.".format(part))

            # It's OK to check only the vblock header, as that
            # contains signatures on the content and those will be
            # different if the content is different.
            with part.path.open("rb") as p:
                if p.read(0x10000) == image_vblock:
                    try:
                        if part.attribute:
                            badparts.append(part)
                    except subprocess.CalledProcessError as err:
                        self.logger.warning(
                            "Couldn't get attribute for partition '{}'."
                            .format(part)
                        )
                        self.logger.debug(
                            err,
                            exc_info=self.logger.isEnabledFor(
                                logging.DEBUG,
                            ),
                        )

        current = self.diskinfo.by_kern_guid()
        if current in badparts:
            if self.force:
                self.logger.warning(
                    "Deactivating the currently booted partition '{}'. "
                    "This might make your system unbootable."
                    .format(current)
                )
            else:
                raise BootedPartitionError(current)

        done_parts = []
        error_parts = []
        for part in badparts:
            self.logger.info("Deactivating '{}'.".format(part))
            try:
                depthchargectl.bless(
                    partition=part,
                    bad=True,
                    root=self.root,
                    root_mountpoint=self.root_mountpoint,
                    boot_mountpoint=self.boot_mountpoint,
                    config=self.config,
                    board=self.board,
                    tmpdir=self.tmpdir / "bless",
                    images_dir=self.images_dir,
                    vboot_keyblock=self.vboot_keyblock,
                    vboot_public_key=self.vboot_public_key,
                    vboot_private_key=self.vboot_private_key,
                    kernel_cmdline=self.kernel_cmdline,
                    ignore_initramfs=self.ignore_initramfs,
                    verbosity=self.verbosity,
                )
            except Exception as err:
                error_parts.append(part)
                self.logger.debug(
                    err,
                    exc_info=self.logger.isEnabledFor(logging.DEBUG),
                )
                continue

            done_parts.append(part)
            self.logger.warning("Deactivated '{}'.".format(part))

        if image.parent == self.images_dir and not error_disks and not error_parts:
            self.logger.info(
                "Image '{}' is in images dir, deleting."
                .format(image)
            )
            image.unlink()
            self.logger.warning("Deleted image '{}'.".format(image))

        else:
            self.logger.info(
                "Not deleting image file '{}'."
                .format(image)
            )

        output = badparts or None

        error_msg = []
        if error_disks:
            error_msg.append(
                "Couldn't disable partitions for disks {}."
                .format(", ".join(str(d) for d in error_disks))
            )

        if error_parts:
            error_msg.append(
                "Couldn't disable partitions {}."
                .format(", ".join(str(d) for d in error_parts))
            )

        if error_msg:
            return CommandExit(
                message="\n".join(error_msg),
                output=done_parts,
                returncode=1,
            )

        if not output:
            self.logger.warning(
                "No active partitions contain the given image."
            )

        return output

    global_options = depthchargectl.global_options
    config_options = depthchargectl.config_options