1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
|
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
# depthcharge-tools depthchargectl list subcommand
# Copyright (C) 2020-2022 Alper Nebi Yasak <alpernebiyasak@gmail.com>
# See COPYRIGHT and LICENSE files for full copyright information.
import argparse
import logging
import subprocess
from depthcharge_tools import __version__
from depthcharge_tools.utils.argparse import (
Command,
Argument,
Group,
CommandExit,
)
from depthcharge_tools.utils.collections import (
TypedList
)
from depthcharge_tools.utils.os import (
Disk,
CrosPartition,
)
from depthcharge_tools.depthchargectl import depthchargectl
class CrosPartitions(TypedList(CrosPartition)):
    """List of ChromeOS kernel partitions that renders as a text table.

    Calling ``str()`` on an instance yields one line per partition,
    optionally preceded by a heading line, with every column padded to
    the widest value it contains.
    """

    # Output column name -> key in a partition's ``flags`` mapping.
    # Each flag is reachable through a short and a long column name.
    _FLAG_KEYS = {
        "A": "attribute",
        "S": "successful",
        "P": "priority",
        "T": "tries",
        "ATTRIBUTE": "attribute",
        "SUCCESSFUL": "successful",
        "PRIORITY": "priority",
        "TRIES": "tries",
    }

    def __init__(self, partitions=None, columns=None, headings=True):
        """Store the partitions and decide which columns to print.

        Args:
            partitions: Initial partitions, handed to the base class
                unchanged. May be omitted.
            columns: Column names to print, in order. When None, a
                default set is picked: disk path and partition number
                if any stored partition has no path of its own,
                otherwise the partition path.
            headings: Whether ``str()`` emits the column names as the
                first row.
        """
        super().__init__(partitions)

        if columns is None:
            # Look at the stored items rather than the ``partitions``
            # argument: the argument defaults to None, and iterating
            # None here used to raise TypeError.
            if any(part.path is None for part in self):
                columns = ["S", "P", "T", "DISKPATH", "PARTNO"]
            else:
                columns = ["S", "P", "T", "PATH"]

        self._headings = headings
        self._columns = columns

    def _row(self, part):
        """Return the cells for ``part`` as strings, one per column.

        Values that are unavailable for this partition (or columns that
        were not computed) come out as empty strings.
        """
        values = {}

        # Only touch part.flags when a flag column was asked for.
        if not set(self._columns).isdisjoint(self._FLAG_KEYS):
            flags = part.flags
            values.update({
                column: flags[key]
                for column, key in self._FLAG_KEYS.items()
            })

        # Likewise, only read the size when it will be printed.
        if "SIZE" in self._columns:
            values["SIZE"] = part.size

        if part.path is not None:
            values["PATH"] = part.path

        if part.disk is not None and part.disk.path is not None:
            values["DISK"] = part.disk.path
            values["DISKPATH"] = part.disk.path

        if part.partno is not None:
            values["PARTNO"] = part.partno

        return [str(values.get(c, "")) for c in self._columns]

    def __str__(self):
        """Format the partitions as an aligned, space-separated table."""
        rows = []

        if self._headings:
            rows.append(self._columns)

        # Partitions without a path of their own sort by their disk's.
        parts = sorted(self, key=lambda p: p.path or p.disk.path)
        rows.extend(self._row(part) for part in parts)

        # Using tab characters makes things misalign when the data
        # widths vary, so find max width for each column from its data,
        # and format everything to those widths (at least 4 wide).
        widths = [max(4, *map(len, col)) for col in zip(*rows)]
        fmt = " ".join("{{:{w}}}".format(w=w) for w in widths)
        return "\n".join(fmt.format(*row) for row in rows)
@depthchargectl.subcommand("list")
class depthchargectl_list(
    depthchargectl,
    prog="depthchargectl list",
    usage="%(prog)s [options] [DISK ...]",
    add_help=False,
):
    """List ChromeOS kernel partitions."""

    # NOTE(review): the docstrings in this class look like they are
    # picked up by the Command/Group/Argument helpers as user-visible
    # help text, so they are left exactly as written -- confirm before
    # rewording any of them.

    _logger = depthchargectl._logger.getChild("list")
    config_section = "depthchargectl/list"

    @depthchargectl.board.copy()
    def board(self, codename=""):
        """Assume we're running on the specified board"""
        # Listing partitions works without knowing the board, so a
        # failure in the parent lookup is only warned about.
        try:
            detected = super().board
        except Exception as err:
            self.logger.warning(err)
            detected = None
        return detected

    @Group
    def positionals(self):
        """Positional arguments"""

    @positionals.add
    @Argument(metavar="DISK")
    def disks(self, *disks):
        """Disks to check for ChromeOS kernel partitions."""
        if self.all_disks:
            self.logger.info("Searching all disks.")
            found = self.diskinfo.roots()

        elif disks:
            requested = ", ".join(map(str, disks))
            self.logger.info(
                "Searching real disks for {}."
                .format(requested)
            )

            # Arguments that diskinfo cannot evaluate are tried as
            # Disk objects instead; ones Disk rejects are skipped with
            # a warning.
            images = []
            for candidate in disks:
                if self.diskinfo.evaluate(candidate) is not None:
                    continue
                try:
                    image = Disk(candidate)
                except ValueError as err:
                    self.logger.warning(
                        err,
                        exc_info=self.logger.isEnabledFor(logging.DEBUG),
                    )
                else:
                    images.append(image)

            found = list(self.diskinfo.roots(*disks)) + images

        else:
            self.logger.info("Searching bootable disks.")

            # Prefer the fstab-only lookup, then fall back to the
            # configured mountpoint attribute.
            root = self.diskinfo.by_mountpoint("/", fstab_only=True)
            if not root:
                root = self.diskinfo.by_mountpoint(self.root_mountpoint)

            boot = self.diskinfo.by_mountpoint("/boot", fstab_only=True)
            if not boot:
                boot = self.diskinfo.by_mountpoint(self.boot_mountpoint)

            found = self.diskinfo.roots(root, boot)

        if not found:
            raise ValueError("Could not find any matching disks.")

        self.logger.info(
            "Using disks: {}."
            .format(", ".join(map(str, found)))
        )
        return found

    @Group
    def options(self):
        """Options"""
        # A bare count has no columns to format.
        conflicting = self.count and self.output
        if conflicting:
            raise ValueError(
                "Count and Output arguments are mutually exclusive."
            )

    @options.add
    @Argument("-n", "--noheadings", headings=False)
    def headings(self, headings=True):
        """Don't print column headings."""
        return headings

    @options.add
    @Argument("-a", "--all-disks", all_disks=True)
    def all_disks(self, all_disks=False):
        """List partitions on all disks."""
        return all_disks

    # Every column name accepted by -o/--output.
    valid_columns = {
        "A", "S", "P", "T",
        "ATTRIBUTE", "SUCCESSFUL", "PRIORITY", "TRIES",
        "PATH",
        "DISK", "DISKPATH",
        "PARTNO",
        "SIZE",
    }

    @options.add
    @Argument("-o", "--output", nargs=1, append=True)
    def output(self, *columns):
        """Comma separated list of columns to output."""
        if not columns:
            self.logger.info("Using default output format.")
            return None

        # Collapse the received values into one comma separated spec.
        if len(columns) == 1 and isinstance(columns[0], str):
            spec = columns[0]
        else:
            spec = ",".join(columns)
        self.logger.info("Using output format '{}'.".format(spec))

        names = spec.split(',')

        # Unknown names, each reported once, in order of first use.
        invalid_columns = [
            name for name in dict.fromkeys(names)
            if name not in self.valid_columns
        ]
        if invalid_columns:
            raise ValueError(
                "Unsupported output columns '{}'."
                .format(invalid_columns)
            )

        return names

    @options.add
    @Argument("-c", "--count", count=True)
    def count(self, count=False):
        """Print only the count of partitions."""
        return count

    def __call__(self):
        partitions = []
        failed_disks = []

        for disk in self.disks:
            try:
                partitions.extend(disk.cros_partitions())
            except subprocess.CalledProcessError as err:
                # Keep going with the other disks; the failure is
                # reported once at the end.
                failed_disks.append(disk)
                self.logger.debug(
                    "Couldn't get partitions for disk '{}'."
                    .format(disk)
                )
                self.logger.debug(
                    err,
                    exc_info=self.logger.isEnabledFor(logging.DEBUG),
                )

        output = (
            len(partitions)
            if self.count
            else CrosPartitions(
                partitions,
                headings=self.headings,
                columns=self.output,
            )
        )

        if not failed_disks:
            return output

        # Whatever was collected is still handed back, wrapped in a
        # CommandExit carrying returncode 1.
        return CommandExit(
            message=(
                "Couldn't get partitions for disks {}."
                .format(", ".join(str(d) for d in failed_disks))
            ),
            output=output,
            returncode=1,
        )

    global_options = depthchargectl.global_options
    config_options = depthchargectl.config_options
|