1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
"""OS client for supervisor."""
from .client import _SupervisorComponentClient
from .models.os import (
DataDisk,
DataDiskList,
GreenInfo,
GreenOptions,
MigrateDataOptions,
OSInfo,
OSUpdate,
SetBootSlotOptions,
SwapInfo,
SwapOptions,
YellowInfo,
YellowOptions,
)
class OSClient(_SupervisorComponentClient):
"""Handles OS access in supervisor."""
async def info(self) -> OSInfo:
"""Get OS info."""
result = await self._client.get("os/info")
return OSInfo.from_dict(result.data)
async def update(self, options: OSUpdate | None = None) -> None:
"""Update OS."""
await self._client.post(
"os/update", json=options.to_dict() if options else None, timeout=None
)
async def swap_info(self) -> SwapInfo:
"""Get swap settings."""
result = await self._client.get("os/config/swap")
return SwapInfo.from_dict(result.data)
async def set_swap_options(self, options: SwapOptions) -> None:
"""Set swap settings."""
await self._client.post("os/config/swap", json=options.to_dict())
async def config_sync(self) -> None:
"""Trigger config reload on OS."""
await self._client.post("os/config/sync")
async def migrate_data(self, options: MigrateDataOptions) -> None:
"""Migrate data to new data disk and reboot."""
await self._client.post("os/datadisk/move", json=options.to_dict())
async def list_data_disks(self) -> list[DataDisk]:
"""Get all data disks."""
result = await self._client.get("os/datadisk/list")
return DataDiskList.from_dict(result.data).disks
async def wipe_data(self) -> None:
"""Trigger data disk wipe on host and reboot."""
await self._client.post("os/datadisk/wipe")
async def set_boot_slot(self, options: SetBootSlotOptions) -> None:
"""Change active boot slot on host and reboot."""
await self._client.post("os/boot-slot", json=options.to_dict())
async def green_info(self) -> GreenInfo:
"""Get info for green board (if in use)."""
result = await self._client.get("os/boards/green")
return GreenInfo.from_dict(result.data)
async def set_green_options(self, options: GreenOptions) -> None:
"""Set options for green board (if in use)."""
await self._client.post("os/boards/green", json=options.to_dict())
async def yellow_info(self) -> YellowInfo:
"""Get info for yellow board (if in use)."""
result = await self._client.get("os/boards/yellow")
return YellowInfo.from_dict(result.data)
async def set_yellow_options(self, options: YellowOptions) -> None:
"""Set options for yellow board (if in use)."""
await self._client.post("os/boards/yellow", json=options.to_dict())
|