File: shell_pool.py

package info (click to toggle)
dpdk 25.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 127,892 kB
  • sloc: ansic: 2,358,479; python: 16,426; sh: 4,474; makefile: 1,713; awk: 70
file content (106 lines) | stat: -rw-r--r-- 4,236 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2025 Arm Limited

"""Module defining the shell pool class.

The shell pool is used by the test run state machine to control
the shells spawned by the test suites. Each layer of execution can
stash the current pool and create a new layer of shells by calling `start_new_pool`.
You can go back to the previous pool by calling `terminate_current_pool`. These layers are
identified in the pool as levels where the higher the number, the deeper the layer of execution.
As an example layers of execution could be: test run, test suite and test case.
Which could appropriately identified with level numbers 0, 1 and 2 respectively.

The shell pool layers are implemented as a stack. Therefore, when creating a new pool, this is
pushed on top of the stack. Similarly, terminating the current pool also means removing the one
at the top of the stack.
"""

from typing import TYPE_CHECKING

from framework.logger import DTSLogger, get_dts_logger

if TYPE_CHECKING:
    from framework.remote_session.interactive_shell import (
        InteractiveShell,
    )


class ShellPool:
    """A pool managing active shells."""

    _logger: DTSLogger
    _pools: list[set["InteractiveShell"]]

    def __init__(self) -> None:
        """Shell pool constructor."""
        self._logger = get_dts_logger("shell_pool")
        self._pools = [set()]

    @property
    def pool_level(self) -> int:
        """The current level of shell pool.

        The higher level, the deeper we are in the execution state.
        """
        return len(self._pools) - 1

    @property
    def _current_pool(self) -> set["InteractiveShell"]:
        """The pool in use for the current scope."""
        return self._pools[-1]

    def register_shell(self, shell: "InteractiveShell") -> None:
        """Register a new shell to the current pool."""
        self._logger.debug(f"Registering shell {shell} to pool level {self.pool_level}.")
        self._current_pool.add(shell)

    def unregister_shell(self, shell: "InteractiveShell") -> None:
        """Unregister a shell from any pool."""
        for level, pool in enumerate(self._pools):
            try:
                pool.remove(shell)
                if pool == self._current_pool:
                    self._logger.debug(
                        f"Unregistering shell {shell} from pool level {self.pool_level}."
                    )
                else:
                    self._logger.debug(
                        f"Unregistering shell {shell} from pool level {level}, "
                        f"but we currently are in level {self.pool_level}. Is this expected?"
                    )
            except KeyError:
                pass

    def start_new_pool(self) -> None:
        """Start a new shell pool."""
        self._logger.debug(f"Starting new shell pool and advancing to level {self.pool_level+1}.")
        self._pools.append(set())

    def terminate_current_pool(self) -> None:
        """Terminate all the shells in the current pool, and restore the previous pool if any.

        If any failure occurs while closing any shell, this is tolerated allowing the termination
        to continue until the current pool is empty and removed. But this function will re-raise the
        last occurred exception back to the caller.
        """
        occurred_exception = None
        current_pool_level = self.pool_level
        self._logger.debug(f"Terminating shell pool level {current_pool_level}.")
        for shell in self._pools.pop():
            self._logger.debug(f"Closing shell {shell} in shell pool level {current_pool_level}.")
            try:
                shell.close()
            except Exception as e:
                self._logger.error(f"An exception has occurred while closing shell {shell}:")
                self._logger.exception(e)
                occurred_exception = e

        if current_pool_level == 0:
            self.start_new_pool()
        else:
            self._logger.debug(f"Restoring shell pool from level {self.pool_level}.")

        # Raise the last occurred exception again to let the system register a failure.
        if occurred_exception is not None:
            raise occurred_exception