1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
|
# Copyright (C) 2024-2025, Raspberry Pi Ltd.
#
# Author: Paul Oberosler <paul.oberosler@raspberrypi.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import logging
from typing import Union
from cloudinit import subp
from cloudinit.cloud import Cloud
from cloudinit.config import Config
from cloudinit.config.schema import MetaSchema
from cloudinit.settings import PER_INSTANCE
LOG = logging.getLogger(__name__)
RPI_BASE_KEY = "rpi"
RPI_INTERFACES_KEY = "interfaces"
ENABLE_RPI_CONNECT_KEY = "enable_rpi_connect"
SUPPORTED_INTERFACES = {
"spi": "do_spi",
"i2c": "do_i2c",
"serial": "do_serial",
"onewire": "do_onewire",
}
RASPI_CONFIG_SERIAL_CONS_FN = "do_serial_cons"
RASPI_CONFIG_SERIAL_HW_FN = "do_serial_hw"
meta: MetaSchema = {
"id": "cc_raspberry_pi",
"distros": ["raspberry-pi-os"],
"frequency": PER_INSTANCE,
"activate_by_schema_keys": [RPI_BASE_KEY],
}
def configure_rpi_connect(enable: bool) -> None:
LOG.debug("Configuring rpi-connect: %s", enable)
num = 0 if enable else 1
try:
subp.subp(["/usr/bin/raspi-config", "do_rpi_connect", str(num)])
except subp.ProcessExecutionError as e:
LOG.error("Failed to configure rpi-connect: %s", e)
def is_pifive() -> bool:
try:
subp.subp(["/usr/bin/raspi-config", "nonint", "is_pifive"])
return True
except subp.ProcessExecutionError:
return False
def configure_serial_interface(
cfg: Union[dict, bool], instCfg: Config, cloud: Cloud
) -> None:
def get_bool_field(cfg_dict: dict, name: str, default=False):
val = cfg_dict.get(name, default)
if not isinstance(val, bool):
LOG.warning(
"Invalid value for %s.serial.%s: %s",
RPI_INTERFACES_KEY,
name,
val,
)
return default
return val
enable_console = False
enable_hw = False
if isinstance(cfg, dict):
enable_console = get_bool_field(cfg, "console")
enable_hw = get_bool_field(cfg, "hardware")
elif isinstance(cfg, bool):
# default to enabling console as if < pi5
# this will also enable the hardware
enable_console = cfg
if not is_pifive() and enable_console:
# only pi5 has 2 usable UARTs
# on other models, enabling the console
# will also block the other UART
enable_hw = True
try:
subp.subp(
[
"/usr/bin/raspi-config",
"nonint",
RASPI_CONFIG_SERIAL_CONS_FN,
str(0 if enable_console else 1),
]
)
try:
subp.subp(
[
"/usr/bin/raspi-config",
"nonint",
RASPI_CONFIG_SERIAL_HW_FN,
str(0 if enable_hw else 1),
]
)
except subp.ProcessExecutionError as e:
LOG.error("Failed to configure serial hardware: %s", e)
# Reboot to apply changes
cmd = cloud.distro.shutdown_command(
mode="reboot",
delay="now",
message="Rebooting to apply serial console changes",
)
subp.subp(cmd)
except subp.ProcessExecutionError as e:
LOG.error("Failed to configure serial console: %s", e)
def configure_interface(iface: str, enable: bool) -> None:
assert (
iface in SUPPORTED_INTERFACES.keys() and iface != "serial"
), f"Unsupported interface: {iface}"
try:
subp.subp(
[
"/usr/bin/raspi-config",
"nonint",
SUPPORTED_INTERFACES[iface],
str(0 if enable else 1),
]
)
except subp.ProcessExecutionError as e:
LOG.error("Failed to configure %s: %s", iface, e)
def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
if RPI_BASE_KEY not in cfg:
return
elif not isinstance(cfg[RPI_BASE_KEY], dict):
LOG.warning(
"Invalid value for %s: %s",
RPI_BASE_KEY,
cfg[RPI_BASE_KEY],
)
return
elif not cfg[RPI_BASE_KEY]:
LOG.debug("Empty value for %s. Skipping...", RPI_BASE_KEY)
return
for key in cfg[RPI_BASE_KEY]:
if key == ENABLE_RPI_CONNECT_KEY:
enable = cfg[RPI_BASE_KEY][key]
if isinstance(enable, bool):
configure_rpi_connect(enable)
else:
LOG.warning(
"Invalid value for %s: %s", ENABLE_RPI_CONNECT_KEY, enable
)
continue
elif key == RPI_INTERFACES_KEY:
if not isinstance(cfg[RPI_BASE_KEY][key], dict):
LOG.warning(
"Invalid value for %s: %s",
RPI_BASE_KEY,
cfg[RPI_BASE_KEY][key],
)
return
elif not cfg[RPI_BASE_KEY][key]:
LOG.debug("Empty value for %s. Skipping...", key)
return
subkeys = list(cfg[RPI_BASE_KEY][key].keys())
# Move " serial" to the end if it exists
if "serial" in subkeys:
subkeys.append(subkeys.pop(subkeys.index("serial")))
# check for supported ARM interfaces
for subkey in subkeys:
if subkey not in SUPPORTED_INTERFACES.keys():
LOG.warning(
"Invalid key for %s: %s", RPI_INTERFACES_KEY, subkey
)
continue
enable = cfg[RPI_BASE_KEY][key][subkey]
if subkey == "serial":
if not isinstance(enable, (dict, bool)):
LOG.warning(
"Invalid value for %s.%s: %s",
RPI_INTERFACES_KEY,
subkey,
enable,
)
else:
configure_serial_interface(enable, cfg, cloud)
continue
if isinstance(enable, bool):
configure_interface(subkey, enable)
else:
LOG.warning(
"Invalid value for %s.%s: %s",
RPI_INTERFACES_KEY,
subkey,
enable,
)
else:
LOG.warning("Unsupported key: %s", key)
continue
|