1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
# Copyright (C) 2020 Linaro Limited
#
# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
#
# SPDX-License-Identifier: GPL-2.0-or-later
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from lava_common.constants import DISPATCHER_DOWNLOAD_DIR
from lava_common.yaml import yaml_safe_dump, yaml_safe_load
if TYPE_CHECKING:
from collections.abc import Iterator
# pathlib view of DISPATCHER_DOWNLOAD_DIR; the mapping helpers below build
# and glob per-job "usbmap.yaml" paths underneath it.
DISPATCHER_DOWNLOAD_PATH = Path(DISPATCHER_DOWNLOAD_DIR)
def get_mapping_path(job_id) -> Path:
    """Return the path of the ``usbmap.yaml`` file belonging to *job_id*.

    The file lives in a directory named after ``str(job_id)`` directly
    below ``DISPATCHER_DOWNLOAD_PATH``. The path is only computed; nothing
    is created or checked on disk.
    """
    job_directory = str(job_id)
    return DISPATCHER_DOWNLOAD_PATH.joinpath(job_directory, "usbmap.yaml")
def iter_mapping_paths() -> Iterator[Path]:
    """Yield every ``usbmap.yaml`` found one directory level below
    ``DISPATCHER_DOWNLOAD_PATH``.
    """
    for mapping_file in DISPATCHER_DOWNLOAD_PATH.glob("*/usbmap.yaml"):
        yield mapping_file
def add_device_container_mapping(job_id, device_info, container, container_type="lxc"):
    """Store a device-to-container mapping in the mapping file of *job_id*.

    The new entry is a dict with the keys ``device_info``, ``container``,
    ``container_type`` and ``job_id``. Entries already present in the job's
    ``usbmap.yaml`` whose ``device_info`` equals the new one are replaced;
    all other entries are kept.

    Raises:
        ValueError: propagated from ``validate_device_info`` when
            *device_info* is empty or contains only empty values.
    """
    validate_device_info(device_info)
    item = {
        "device_info": device_info,
        "container": container,
        "container_type": container_type,
        "job_id": job_id,
    }
    mapping_path = get_mapping_path(job_id)
    data = load_mapping_data(mapping_path)

    # remove old mappings for the same device_info
    newdata = [old for old in data if old["device_info"] != item["device_info"]]
    newdata.append(item)

    # parents=True so the write also succeeds when the download root itself
    # has not been created yet (plain exist_ok=True raises FileNotFoundError
    # if any intermediate directory is missing).
    mapping_path.parent.mkdir(parents=True, exist_ok=True)
    with open(mapping_path, "w") as f:
        f.write(yaml_safe_dump(newdata))
def remove_device_container_mappings(job_id):
    """Delete the mapping file of *job_id*.

    Raises:
        FileNotFoundError: if the job has no mapping file.
    """
    mapping_file = get_mapping_path(job_id)
    mapping_file.unlink()
def validate_device_info(device_info):
    """Reject device descriptions that cannot identify any device.

    Raises:
        ValueError: if *device_info* is falsy (e.g. ``None`` or ``{}``), or
            if none of its values is truthy.
    """
    if not device_info:
        message = "Adding mapping for empty device info: %r" % device_info
        raise ValueError(message)

    populated = [value for value in device_info.values() if value]
    if not populated:
        message = "Adding mapping for device info with empty keys: %r" % device_info
        raise ValueError(message)
def find_mapping(options):
    """Look up the first stored mapping whose device info matches *options*.

    Every mapping file yielded by ``iter_mapping_paths`` is scanned in
    order, and entries are compared with ``match_mapping``.

    Returns:
        ``(entry, job_id)`` for the first match, where ``job_id`` is the
        name of the directory containing the mapping file (as a string),
        or ``(None, None)`` when nothing matches.
    """
    for mapping_file in iter_mapping_paths():
        for entry in load_mapping_data(mapping_file):
            if not match_mapping(entry["device_info"], options):
                continue
            return entry, str(Path(mapping_file).parent.name)
    return None, None
def load_mapping_data(filename):
    """Load the mapping entries stored in the YAML file *filename*.

    Returns:
        The parsed content. A missing file or falsy (e.g. empty) content
        gives ``[]``; a single top-level dict is wrapped in a one-element
        list so callers can always iterate over entries.
    """
    try:
        with open(filename) as stream:
            loaded = yaml_safe_load(stream)
    except FileNotFoundError:
        return []

    if not loaded:
        return []
    if isinstance(loaded, dict):
        # Single-mapping files are normalised to the list form.
        return [loaded]
    return loaded
def match_mapping(device_info, options):
    """Tell whether *options* satisfies the stored *device_info*.

    Keys with a falsy value in *device_info* are not compared. Every key
    with a truthy value must be present in *options* and the attribute of
    that name must equal the stored value.

    Returns:
        ``False`` for an empty *device_info* or on the first mismatch,
        ``True`` otherwise.
    """
    for key, expected in device_info.items():
        if expected and not (key in options and getattr(options, key) == expected):
            return False
    # An empty device_info never matches anything.
    return bool(device_info)
|