1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
|
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 PANTHEON.tech s.r.o.
# Copyright(c) 2025 Arm Limited
"""CPU core representation and filtering.
This module provides a unified representation of logical CPU cores along
with filtering capabilities.
When simultaneous multithreading (SMT, e.g. Hyper-Threading) is enabled on a server,
each physical CPU core is split into multiple logical CPU cores with different IDs.
:class:`LogicalCoreCountFilter` filters by the number of logical cores. It's possible to specify
the socket from which to filter the number of logical cores. It's also possible to not use all
logical CPU cores from each physical core (e.g. only the first logical core of each physical core).
:class:`LogicalCoreListFilter` filters by logical core IDs. This mostly checks that
the logical cores are actually present on the server.
"""
import dataclasses
from abc import ABC, abstractmethod
from collections.abc import Iterable, ValuesView
from dataclasses import dataclass
from enum import auto, unique
from framework.utils import StrEnum, expand_range
@unique
class Architecture(StrEnum):
r"""The supported architectures of :class:`~framework.testbed_model.node.Node`\s."""
#:
i686 = auto()
#:
x86_64 = auto()
#:
x86_32 = auto()
#:
aarch64 = auto()
#:
ppc64le = auto()
@dataclass(slots=True, frozen=True)
class LogicalCore:
"""Representation of a logical CPU core.
A physical core is represented in OS by multiple logical cores (lcores)
if CPU multithreading is enabled. When multithreading is disabled, their IDs are the same.
Attributes:
lcore: The logical core ID of a CPU core. It's the same as `core` with
disabled multithreading.
core: The physical core ID of a CPU core.
socket: The physical socket ID where the CPU resides.
node: The NUMA node ID where the CPU resides.
"""
lcore: int
core: int
socket: int
node: int
def __int__(self) -> int:
"""The CPU is best represented by the logical core, as that's what we configure in EAL."""
return self.lcore
class LogicalCoreList:
    r"""A unified way to store :class:`LogicalCore`\s.

    Create a unified format used across the framework and allow the user to use
    either a :class:`str` representation (using ``str(instance)`` or directly in f-strings)
    or a :class:`list` representation (by accessing the `lcore_list` property,
    which stores logical core IDs).

    The stored IDs are sorted in ascending order and contain no duplicates.
    """

    _lcore_list: list[int]
    _lcore_str: str

    def __init__(self, lcore_list: list[int] | list[str] | list[LogicalCore] | str) -> None:
        """Process `lcore_list`, remove duplicates, then sort.

        There are four supported logical core list formats::

            lcore_list = [LogicalCore1, LogicalCore2]  # a list of LogicalCores
            lcore_list = [0, 1, 2, 3]        # a list of int indices
            lcore_list = ["0", "1", "2-3"]   # a list of str indices; ranges are supported
            lcore_list = "0,1,2-3"           # a comma delimited str of indices; ranges are supported

        Args:
            lcore_list: Various ways to represent multiple logical cores.
                Empty `lcore_list` is allowed. IDs given more than once are stored once.
        """
        if isinstance(lcore_list, str):
            lcore_list = lcore_list.split(",")

        # A set drops repeated IDs (e.g. "1,1-2"); a core can only be selected once.
        lcore_ids: set[int] = set()
        for lcore in lcore_list:
            if isinstance(lcore, str):
                # string items are expanded by expand_range, which handles "a-b" ranges
                lcore_ids.update(expand_range(lcore))
            else:
                # ints and LogicalCores both convert through int()
                lcore_ids.add(int(lcore))

        # the input lcores may not be sorted
        self._lcore_list = sorted(lcore_ids)
        self._lcore_str = ",".join(self._get_consecutive_lcores_range(self._lcore_list))

    @property
    def lcore_list(self) -> list[int]:
        """The logical core IDs."""
        return self._lcore_list

    def _get_consecutive_lcores_range(self, lcore_ids_list: list[int]) -> list[str]:
        """Collapse runs of consecutive IDs into ``first-last`` strings.

        The list is walked once from left to right. A run continues while each ID
        is exactly one greater than the previous one; a repeated ID is skipped.

        Args:
            lcore_ids_list: The logical core IDs to format, expected in ascending order.

        Returns:
            One string per run: ``"first-last"`` for runs of two or more IDs,
            ``"id"`` for single IDs. Empty input yields an empty list.
        """
        formatted_core_list: list[str] = []
        if not lcore_ids_list:
            return formatted_core_list

        def format_run(first: int, last: int) -> str:
            return f"{first}-{last}" if last != first else f"{first}"

        run_start = run_end = lcore_ids_list[0]
        for lcore_id in lcore_ids_list[1:]:
            if lcore_id == run_end:
                # repeated ID; it is already part of the current run
                continue
            if lcore_id - run_end == 1:
                run_end = lcore_id
                continue
            # gap found: close the current run and start a new one
            formatted_core_list.append(format_run(run_start, run_end))
            run_start = run_end = lcore_id
        formatted_core_list.append(format_run(run_start, run_end))
        return formatted_core_list

    def __str__(self) -> str:
        """The consecutive ranges of logical core IDs."""
        return self._lcore_str
@dataclasses.dataclass(slots=True, frozen=True)
class LogicalCoreCount(object):
"""Define the number of logical cores per physical cores per sockets."""
#: Use this many logical cores per each physical core.
lcores_per_core: int = 1
#: Use this many physical cores per each socket.
cores_per_socket: int = 2
#: Use this many sockets.
socket_count: int = 1
#: Use exactly these sockets. This takes precedence over `socket_count`,
#: so when `sockets` is not :data:`None`, `socket_count` is ignored.
sockets: list[int] | None = None
class LogicalCoreFilter(ABC):
    """Base class of all logical core filters.

    The constructor stores the filter specifier and a sorted copy of the cores.
    The filtering itself is left to subclasses, which must implement :meth:`filter`.
    """

    _filter_specifier: LogicalCoreCount | LogicalCoreList
    _lcores_to_filter: list[LogicalCore]

    def __init__(
        self,
        lcore_list: list[LogicalCore],
        filter_specifier: LogicalCoreCount | LogicalCoreList,
        ascending: bool = True,
    ) -> None:
        """Store the filter specifier and a sorted copy of `lcore_list`.

        The cores are copied before sorting, so the caller's list is left intact.
        They are ordered by their physical core ID.

        Args:
            lcore_list: The logical CPU cores to filter.
            filter_specifier: Filter cores from `lcore_list` according to this filter.
            ascending: Sort cores in ascending order (lowest to highest IDs).
                If :data:`False`, sort in descending order.
        """
        self._filter_specifier = filter_specifier

        # Ordering by physical core keeps the logical cores of one physical core
        # next to each other, which is needed when hyperthreading is enabled.
        ordered_lcores = list(lcore_list)
        ordered_lcores.sort(key=lambda lcore: lcore.core, reverse=not ascending)
        self._lcores_to_filter = ordered_lcores

    @abstractmethod
    def filter(self) -> list[LogicalCore]:
        r"""Filter the cores.

        Apply `self._filter_specifier` to `self._lcores_to_filter` and return
        the matching :class:`LogicalCore`\s.

        `self._lcores_to_filter` is a sorted copy of the original list, so it may be modified.

        Returns:
            The filtered cores.
        """
class LogicalCoreCountFilter(LogicalCoreFilter):
    """Filter cores by specified counts.

    Filter the input list of LogicalCores according to specified rules:

        * The input `filter_specifier` is :class:`LogicalCoreCount`,
        * Logical core 0 is never returned by this filter,
        * Use cores from the specified number of sockets or from the specified socket ids,
        * If `sockets` is specified, it takes precedence over `socket_count`,
        * From each of those sockets, use only `cores_per_socket` of cores,
        * And for each core, use `lcores_per_core` of logical cores. Hyperthreading
          must be enabled for this to take effect.
    """

    _filter_specifier: LogicalCoreCount

    def filter(self) -> list[LogicalCore]:
        """Filter the cores according to :class:`LogicalCoreCount`.

        Logical core 0 is removed first. Then the allowed sockets are determined and
        the cores are grouped per socket. Finally the allowed physical cores are taken
        from each socket, with the desired number of logical cores per physical core.
        Together these make up the final filtered list.

        Returns:
            The filtered cores.

        Raises:
            ValueError: If the requested sockets, cores per socket or logical cores
                per core can't be satisfied.
        """
        # Exclude lcore 0 by its ID. Testing `0 in <list of LogicalCore>` can never
        # match, because a LogicalCore does not compare equal to an int. Dropping
        # "the first element" would also hit the wrong core in descending order.
        # Selecting by ID keeps repeated calls of filter() stable.
        # NOTE(review): presumably lcore 0 is kept free for the OS - confirm.
        self._lcores_to_filter = [
            lcore for lcore in self._lcores_to_filter if lcore.lcore != 0
        ]

        filtered_lcores: list[LogicalCore] = []
        for socket_lcores in self._filter_sockets(self._lcores_to_filter):
            filtered_lcores.extend(self._filter_cores_from_socket(socket_lcores))
        return filtered_lcores

    def _filter_sockets(
        self, lcores_to_filter: Iterable[LogicalCore]
    ) -> ValuesView[list[LogicalCore]]:
        """Group the cores of each allowed socket.

        The sockets may be specified in two ways, either a number or a specific list of sockets.
        In case of a specific list, only the cores from those sockets are returned.
        In case of a number, all cores are walked and the first n sockets that appear
        are the allowed ones.

        Args:
            lcores_to_filter: The cores to filter. These must be sorted by the physical core.

        Returns:
            A view of lists of logical CPU cores. Each list contains cores from one socket.

        Raises:
            ValueError: If the number of the requested sockets by the filter can't be satisfied.
                This includes an explicitly requested socket that has no cores.
        """
        allowed_sockets: set[int]
        explicit_sockets = self._filter_specifier.sockets
        if explicit_sockets:
            # explicit sockets take precedence; the allowed set is fixed up front
            socket_count = len(explicit_sockets)
            allowed_sockets = set(explicit_sockets)
        else:
            socket_count = self._filter_specifier.socket_count
            allowed_sockets = set()

        # separate lcores into sockets; this makes further processing easier
        lcores_per_socket: dict[int, list[LogicalCore]] = {}
        for lcore in lcores_to_filter:
            if not explicit_sockets and len(allowed_sockets) < socket_count:
                # still collecting sockets; adding a known socket to a set is a no-op
                allowed_sockets.add(lcore.socket)
            if lcore.socket in allowed_sockets:
                lcores_per_socket.setdefault(lcore.socket, []).append(lcore)

        # Count the sockets that actually provided cores. With explicit sockets the
        # allowed set is pre-filled, so its size says nothing about what was found.
        found_socket_count = len(lcores_per_socket)
        if found_socket_count < socket_count:
            raise ValueError(
                f"The actual number of sockets from which to use cores "
                f"({found_socket_count}) is lower than required ({socket_count})."
            )

        return lcores_per_socket.values()

    def _filter_cores_from_socket(
        self, lcores_to_filter: Iterable[LogicalCore]
    ) -> list[LogicalCore]:
        """Filter a list of cores from the given socket.

        Go through the cores and note how many logical cores per physical core have been filtered.

        Args:
            lcores_to_filter: The logical cores of one socket, sorted by the physical core.

        Returns:
            The filtered logical CPU cores.

        Raises:
            ValueError: If the number of the requested cores per socket by the filter
                can't be satisfied, or if the last selected physical core has fewer
                logical cores than requested.
        """
        wanted_cores = self._filter_specifier.cores_per_socket
        wanted_lcores = self._filter_specifier.lcores_per_core

        # A plain dict is enough: it preserves insertion order since Python 3.7.
        lcore_count_per_core_map: dict[int, int] = {}
        filtered_lcores: list[LogicalCore] = []
        for lcore in lcores_to_filter:
            taken = lcore_count_per_core_map.get(lcore.core)
            if taken is None:
                if len(lcore_count_per_core_map) >= wanted_cores:
                    # we have enough cores
                    break
                # first logical core of a physical core we still need
                lcore_count_per_core_map[lcore.core] = 1
                filtered_lcores.append(lcore)
            elif taken < wanted_lcores:
                # another logical core of an already selected physical core
                lcore_count_per_core_map[lcore.core] = taken + 1
                filtered_lcores.append(lcore)
            # otherwise we have enough lcores of this core; skip it

        cores_per_socket = len(lcore_count_per_core_map)
        if cores_per_socket < wanted_cores:
            raise ValueError(
                f"The actual number of cores per socket ({cores_per_socket}) "
                f"is lower than required ({wanted_cores})."
            )

        # Nothing is selected when zero cores per socket were requested. In that
        # case there is no last core to inspect.
        if filtered_lcores:
            lcores_per_core = lcore_count_per_core_map[filtered_lcores[-1].core]
            if lcores_per_core < wanted_lcores:
                raise ValueError(
                    f"The actual number of logical cores per core ({lcores_per_core}) "
                    f"is lower than required ({wanted_lcores})."
                )

        return filtered_lcores
class LogicalCoreListFilter(LogicalCoreFilter):
    """Filter the logical CPU cores by logical CPU core IDs.

    This is a simple filter that looks at logical CPU IDs and only keeps those that match.
    The input filter is :class:`LogicalCoreList`. An empty LogicalCoreList keeps every
    logical core except logical core 0, which is only returned when explicitly requested.
    """

    _filter_specifier: LogicalCoreList

    def filter(self) -> list[LogicalCore]:
        """Filter based on logical CPU core ID.

        Returns:
            The filtered logical CPU cores.

        Raises:
            ValueError: If not every requested logical core was found.
        """
        requested_ids = self._filter_specifier.lcore_list

        if 0 not in requested_ids:
            # Remove lcore 0 by its ID. Dropping "the first element" removes the
            # wrong core when the list is sorted in descending order or when lcore 0
            # is not in the list at all. Selecting by ID also keeps repeated calls
            # of filter() from dropping one more core each time.
            self._lcores_to_filter = [
                lcore for lcore in self._lcores_to_filter if lcore.lcore != 0
            ]

        if not requested_ids:
            # an empty specifier doesn't filter anything else
            return self._lcores_to_filter

        # a set makes each membership test constant time
        requested_id_set = set(requested_ids)
        filtered_lcores = [
            lcore for lcore in self._lcores_to_filter if lcore.lcore in requested_id_set
        ]

        if len(filtered_lcores) != len(requested_ids):
            raise ValueError(
                f"Not all logical cores from {self._filter_specifier.lcore_list} "
                f"were found among {self._lcores_to_filter}"
            )

        return filtered_lcores
def lcore_filter(
    core_list: list[LogicalCore],
    filter_specifier: LogicalCoreCount | LogicalCoreList,
    ascending: bool = True,
) -> LogicalCoreFilter:
    """Factory for providing the filter that corresponds to `filter_specifier`.

    Args:
        core_list: The logical CPU cores to filter.
        filter_specifier: The filter to use. A :class:`LogicalCoreList` selects
            :class:`LogicalCoreListFilter`, a :class:`LogicalCoreCount` selects
            :class:`LogicalCoreCountFilter`.
        ascending: Sort cores in ascending order (lowest to highest IDs). If :data:`False`,
            sort in descending order.

    Returns:
        The filter that corresponds to `filter_specifier`.

    Raises:
        ValueError: If the supplied `filter_specifier` is invalid.
    """
    if isinstance(filter_specifier, LogicalCoreList):
        return LogicalCoreListFilter(core_list, filter_specifier, ascending)
    if isinstance(filter_specifier, LogicalCoreCount):
        return LogicalCoreCountFilter(core_list, filter_specifier, ascending)
    # `!r` shows the repr of the rejected value; the old message had a stray "r" prefix
    raise ValueError(f"Unsupported filter {filter_specifier!r}")
|