File: device_mappings.py

package info (click to toggle)
lava 2024.09-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 29,616 kB
  • sloc: python: 79,650; javascript: 16,875; sh: 1,364; makefile: 336
file content (92 lines) | stat: -rw-r--r-- 2,599 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Copyright (C) 2020 Linaro Limited
#
# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
#
# SPDX-License-Identifier: GPL-2.0-or-later
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from lava_common.constants import DISPATCHER_DOWNLOAD_DIR
from lava_common.yaml import yaml_safe_dump, yaml_safe_load

if TYPE_CHECKING:
    from collections.abc import Iterator

# Base directory for the mapping files handled in this module: each job's
# "usbmap.yaml" lives in a sub-directory of this path named after the job id.
DISPATCHER_DOWNLOAD_PATH = Path(DISPATCHER_DOWNLOAD_DIR)


def get_mapping_path(job_id) -> Path:
    """Return the path of the ``usbmap.yaml`` file belonging to *job_id*.

    The job id is converted with ``str()`` and used as the name of the
    directory below ``DISPATCHER_DOWNLOAD_PATH`` that holds the file. The
    path is only computed; nothing is created on disk.
    """
    job_dir = DISPATCHER_DOWNLOAD_PATH.joinpath(str(job_id))
    return job_dir.joinpath("usbmap.yaml")


def iter_mapping_paths() -> Iterator[Path]:
    """Yield every ``usbmap.yaml`` found one level below the download path.

    The search uses the glob pattern ``*/usbmap.yaml`` relative to
    ``DISPATCHER_DOWNLOAD_PATH``.
    """
    for mapping_file in DISPATCHER_DOWNLOAD_PATH.glob("*/usbmap.yaml"):
        yield mapping_file


def add_device_container_mapping(job_id, device_info, container, container_type="lxc"):
    """Record that a device is mapped to a container for the given job.

    The mapping is stored as one entry in the job's ``usbmap.yaml`` file.
    Any existing entry in that file with an equal ``device_info`` is
    replaced by the new one; other entries are kept.

    Args:
        job_id: Identifier of the job; selects the mapping file and is also
            stored in the entry.
        device_info: Description of the device; must pass
            ``validate_device_info``.
        container: Value stored under the ``"container"`` key.
        container_type: Value stored under the ``"container_type"`` key
            (defaults to ``"lxc"``).

    Raises:
        ValueError: If ``device_info`` is rejected by
            ``validate_device_info``.
    """
    validate_device_info(device_info)
    item = {
        "device_info": device_info,
        "container": container,
        "container_type": container_type,
        "job_id": job_id,
    }
    mapping_path = get_mapping_path(job_id)
    data = load_mapping_data(mapping_path)

    # remove old mappings for the same device_info
    newdata = [old for old in data if old["device_info"] != item["device_info"]]
    newdata.append(item)

    # parents=True: neither the job directory nor the download root is
    # guaranteed to exist when the first mapping is written.
    mapping_path.parent.mkdir(parents=True, exist_ok=True)
    with open(mapping_path, "w") as f:
        f.write(yaml_safe_dump(newdata))


def remove_device_container_mappings(job_id):
    """Delete the mapping file that belongs to *job_id*.

    The file is removed with a plain ``Path.unlink()``, so a missing file
    results in ``FileNotFoundError``.
    """
    mapping_file = get_mapping_path(job_id)
    mapping_file.unlink()


def validate_device_info(device_info):
    """Reject device descriptions that cannot identify a device.

    Args:
        device_info: Mapping describing the device.

    Raises:
        ValueError: If ``device_info`` is empty/falsy, or if none of its
            values is truthy.
    """
    # f-strings with !r are used instead of "%r" % device_info: with the
    # %-operator a tuple operand is unpacked as the argument list, so an
    # empty tuple would raise TypeError instead of the intended ValueError.
    if not device_info:
        raise ValueError(f"Adding mapping for empty device info: {device_info!r}")
    if not any(device_info.values()):
        raise ValueError(
            f"Adding mapping for device info with empty keys: {device_info!r}"
        )


def find_mapping(options):
    """Look up the first stored mapping whose device info matches *options*.

    Every mapping file returned by ``iter_mapping_paths`` is loaded and its
    entries are checked with ``match_mapping`` in order.

    Returns:
        A ``(item, job_id)`` tuple, where ``item`` is the matching entry and
        ``job_id`` is the name of the directory containing the mapping file;
        ``(None, None)`` when nothing matches.
    """
    for mapping_file in iter_mapping_paths():
        for entry in load_mapping_data(mapping_file):
            if not match_mapping(entry["device_info"], options):
                continue
            # The job id is the name of the directory holding the file.
            owner_job = Path(mapping_file).parent.name
            return entry, owner_job
    return None, None


def load_mapping_data(filename):
    """Load the list of mapping entries stored in *filename*.

    Returns:
        The parsed YAML content. An empty list is returned when the file
        does not exist or its content is empty/falsy; a single top-level
        dict is wrapped in a one-element list.
    """
    try:
        with open(filename) as stream:
            loaded = yaml_safe_load(stream)
    except FileNotFoundError:
        return []
    if not loaded:
        return []
    if isinstance(loaded, dict):
        # A lone dict stands for a file holding a single mapping entry.
        return [loaded]
    return loaded


def match_mapping(device_info, options):
    """Tell whether *options* satisfies every non-empty field of *device_info*.

    A field with a falsy value is ignored (it always counts as matching).
    A field with a truthy value matches only when *options* contains an
    attribute of that name whose value equals the field's value.

    Returns:
        ``True`` when ``device_info`` has at least one field and no field
        mismatches; ``False`` otherwise (including for an empty
        ``device_info``).
    """
    fields = list(device_info.items())
    if not fields:
        # Nothing to compare against: an empty description never matches.
        return False
    return all(
        not expected or (name in options and getattr(options, name) == expected)
        for name, expected in fields
    )