File: context.py

package info (click to toggle)
dpdk 25.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 127,892 kB
  • sloc: ansic: 2,358,479; python: 16,426; sh: 4,474; makefile: 1,713; awk: 70
file content (131 lines) | stat: -rw-r--r-- 4,640 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2025 Arm Limited

"""Runtime contexts."""

import functools
from collections.abc import Callable
from dataclasses import MISSING, dataclass, field, fields
from typing import TYPE_CHECKING, Any, Optional, ParamSpec, Union

from framework.exception import InternalError
from framework.remote_session.shell_pool import ShellPool
from framework.settings import SETTINGS
from framework.testbed_model.cpu import LogicalCoreCount, LogicalCoreList
from framework.testbed_model.node import Node
from framework.testbed_model.topology import Topology

if TYPE_CHECKING:
    from framework.remote_session.dpdk import DPDKBuildEnvironment, DPDKRuntimeEnvironment
    from framework.test_suite import TestCase, TestSuite
    from framework.testbed_model.capability import TestProtocol
    from framework.testbed_model.traffic_generator.traffic_generator import TrafficGenerator

P = ParamSpec("P")


@dataclass
class LocalContext:
    """Updatable context local to test suites and cases.

    Attributes:
        current_test_suite: The currently running test suite, if any.
        current_test_case: The currently running test case, if any.
        lcore_filter_specifier: A number of lcores/cores/sockets to use or a list of lcore ids to
            use. The default will select one lcore for each of two cores on one socket, in ascending
            order of core ids.
        ascending_cores: Sort cores in ascending order (lowest to highest IDs). If :data:`False`,
            sort in descending order.
        append_prefix_timestamp: If :data:`True`, will append a timestamp to DPDK file prefix.
        timeout: The timeout used for the SSH channel that is dedicated to this interactive
            shell. This timeout is for collecting output, so if reading from the buffer
            and no output is gathered within the timeout, an exception is thrown.
    """

    current_test_suite: Union["TestSuite", None] = None
    current_test_case: Union[type["TestCase"], None] = None
    lcore_filter_specifier: LogicalCoreCount | LogicalCoreList = field(
        default_factory=LogicalCoreCount
    )
    ascending_cores: bool = True
    append_prefix_timestamp: bool = True
    timeout: float = SETTINGS.timeout

    def reset(self) -> None:
        """Reset the local context to the default values."""
        for _field in fields(LocalContext):
            default = (
                _field.default_factory()
                if _field.default_factory is not MISSING
                else _field.default
            )

            assert (
                default is not MISSING
            ), "{LocalContext.__name__} must have defaults on all fields!"

            setattr(self, _field.name, default)


@dataclass(frozen=True)
class Context:
    """Runtime context."""

    sut_node: Node
    tg_node: Node
    topology: Topology
    dpdk_build: "DPDKBuildEnvironment"
    dpdk: "DPDKRuntimeEnvironment"
    func_tg: Optional["TrafficGenerator"]
    perf_tg: Optional["TrafficGenerator"]
    local: LocalContext = field(default_factory=LocalContext)
    shell_pool: ShellPool = field(default_factory=ShellPool)


__current_ctx: Context | None = None


def get_ctx() -> Context:
    """Retrieve the current runtime context.

    Raises:
        InternalError: If there is no context.
    """
    if __current_ctx:
        return __current_ctx

    raise InternalError("Attempted to retrieve context that has not been initialized yet.")


def init_ctx(ctx: Context) -> None:
    """Initialize context."""
    global __current_ctx
    __current_ctx = ctx


def filter_cores(
    specifier: LogicalCoreCount | LogicalCoreList, ascending_cores: bool | None = None
) -> Callable[[type["TestProtocol"]], Callable]:
    """Decorates functions that require a temporary update to the lcore specifier."""

    def decorator(func: type["TestProtocol"]) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
            local_ctx = get_ctx().local

            old_specifier = local_ctx.lcore_filter_specifier
            local_ctx.lcore_filter_specifier = specifier
            if ascending_cores is not None:
                old_ascending_cores = local_ctx.ascending_cores
                local_ctx.ascending_cores = ascending_cores

            try:
                return func(*args, **kwargs)
            finally:
                local_ctx.lcore_filter_specifier = old_specifier
                if ascending_cores is not None:
                    local_ctx.ascending_cores = old_ascending_cores

        return wrapper

    return decorator