File: _perf_stress_base.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (248 lines) | stat: -rw-r--r-- 8,228 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import abc
import multiprocessing
import argparse
import pstats
import cProfile
from typing import Optional


# Default ordering applied to profiler stats before printing; used as the
# default ``sort_key`` of ``_PerfTestBase._print_profile_stats``.
PSTATS_PRINT_DEFAULT_SORT_KEY = pstats.SortKey.TIME
# Default ``count`` handed to ``pstats.Stats.print_stats`` to limit how many
# lines of stats are printed.
PSTATS_PRINT_DEFAULT_LINE_COUNT = 36


class _PerfTestABC(abc.ABC):
    """
    Abstract interface for a perf test.

    Declares the progress counters, the setup/cleanup lifecycle hooks, the
    sync/async run entry points and the static helpers that a concrete perf
    test class has to provide. Every member is abstract.
    """

    @property
    @abc.abstractmethod
    def completed_operations(self) -> int:
        """
        Total number of operations completed by run_all().
        Reset after warmup.
        """

    @property
    @abc.abstractmethod
    def last_completion_time(self) -> float:
        """
        Elapsed time between start of warmup/run and last completed operation.
        Reset after warmup.
        """

    @abc.abstractmethod
    async def global_setup(self) -> None:
        """
        Setup called once across all parallel test instances.
        Used to set up state that can be used by all test instances.
        """

    @abc.abstractmethod
    async def global_cleanup(self) -> None:
        """
        Cleanup called once across all parallel test instances.
        Used to clean up state that can be used by all test instances.
        """

    @abc.abstractmethod
    async def setup(self) -> None:
        """
        Setup called once per parallel test instance.
        Used to set up state specific to this test instance.
        """

    @abc.abstractmethod
    async def cleanup(self) -> None:
        """
        Cleanup called once per parallel test instance.
        Used to clean up state specific to this test instance.
        """

    @abc.abstractmethod
    async def post_setup(self) -> None:
        """
        Post-setup called once per parallel test instance.
        Used by base classes to set up state (like test-proxy) after all derived class setup is complete.
        """

    @abc.abstractmethod
    async def pre_cleanup(self) -> None:
        """
        Pre-cleanup called once per parallel test instance.
        Used by base classes to clean up state (like test-proxy) before all derived class cleanup runs.
        """

    @abc.abstractmethod
    async def close(self) -> None:
        """
        Close any open client resources/connections per parallel test instance.
        """

    @abc.abstractmethod
    def run_all_sync(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
        """
        Run all sync tests, including both warmup and duration.

        :param duration: How long to run for. (Units are not stated in this file.)
        :keyword run_profiler: Whether the run should be profiled.
        """

    @abc.abstractmethod
    async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
        """
        Run all async tests, including both warmup and duration.

        :param duration: How long to run for. (Units are not stated in this file.)
        :keyword run_profiler: Whether the run should be profiled.
        """

    @staticmethod
    @abc.abstractmethod
    def add_arguments(parser: argparse.ArgumentParser) -> None:
        """
        Add test class specific command line arguments.

        :param parser: The argument parser to register the extra arguments on.
        """

    @staticmethod
    @abc.abstractmethod
    def get_from_env(variable: str) -> str:
        """
        Get the value of an env var. If empty or not found, a ValueError will be raised.

        :param variable: Name of the environment variable to read.
        """


class _PerfTestBase(_PerfTestABC):
    """Base class for implementing a python perf test.

    Supplies no-op defaults for the lifecycle hooks, stores the progress
    counters exposed through ``completed_operations`` and
    ``last_completion_time``, hands every instance its own parallel index and,
    when ``args.profile`` is truthy, owns a ``cProfile.Profile`` that can be
    dumped or printed with the private helpers at the bottom of the class.
    """

    args: argparse.Namespace
    # Guards the read-and-increment of ``_global_parallel_index`` in ``__init__``.
    _global_parallel_index_lock = multiprocessing.Lock()
    # Next index to hand out; stored on the class so all instances share it.
    _global_parallel_index: int = 0

    def __init__(self, arguments: argparse.Namespace):
        """
        Store the parsed arguments and initialise per-instance state.

        :param arguments: Parsed command line arguments. Must expose a
            ``profile`` attribute; a truthy value creates a profiler.
        """
        self.args = arguments
        self._completed_operations: int = 0
        self._last_completion_time: float = 0.0
        # Claim an index and advance the shared counter while holding the lock,
        # so instances created concurrently in this process cannot be given
        # the same index (which also feeds the default profile file name).
        with _PerfTestBase._global_parallel_index_lock:
            self._parallel_index: int = _PerfTestBase._global_parallel_index
            _PerfTestBase._global_parallel_index += 1
        self._profile: Optional[cProfile.Profile] = None
        if self.args.profile:
            self._profile = cProfile.Profile()

    @property
    def completed_operations(self) -> int:
        """
        Total number of operations completed by run_all().
        Reset after warmup.
        """
        return self._completed_operations

    @property
    def last_completion_time(self) -> float:
        """
        Elapsed time between start of warmup/run and last completed operation.
        Reset after warmup.
        """
        return self._last_completion_time

    async def global_setup(self) -> None:
        """
        Setup called once per process across all threaded test instances.
        Used to set up state that can be used by all test instances.

        The default implementation does nothing.
        """
        return

    async def global_cleanup(self) -> None:
        """
        Cleanup called once per process across all threaded test instances.
        Used to clean up state that can be used by all test instances.

        The default implementation does nothing.
        """
        return

    async def setup(self) -> None:
        """
        Setup called once per parallel test instance.
        Used to set up state specific to this test instance.

        The default implementation does nothing.
        """
        return

    async def cleanup(self) -> None:
        """
        Cleanup called once per parallel test instance.
        Used to clean up state specific to this test instance.

        The default implementation does nothing.
        """
        return

    async def post_setup(self) -> None:
        """
        Post-setup called once per parallel test instance.
        Used by base classes to set up state (like test-proxy) after all derived class setup is complete.

        The default implementation does nothing.
        """
        return

    async def pre_cleanup(self) -> None:
        """
        Pre-cleanup called once per parallel test instance.
        Used by base classes to clean up state (like test-proxy) before all derived class cleanup runs.

        The default implementation does nothing.
        """
        return

    async def close(self) -> None:
        """
        Close any open client resources/connections per parallel test instance.

        The default implementation does nothing.
        """
        return

    def run_all_sync(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
        """
        Run all sync tests, including both warmup and duration.

        :raises NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError(f"run_all_sync must be implemented for {self.__class__.__name__}")

    async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
        """
        Run all async tests, including both warmup and duration.

        :raises NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError(f"run_all_async must be implemented for {self.__class__.__name__}")

    @staticmethod
    def add_arguments(parser: argparse.ArgumentParser) -> None:
        """
        Add test class specific command line arguments.

        The default implementation adds nothing.
        """
        return

    @staticmethod
    def get_from_env(variable: str) -> str:
        """
        Get the value of an env var. If empty or not found, a ValueError will be raised.

        :param variable: Name of the environment variable to read.
        :returns: The non-empty value of the variable.
        :raises ValueError: If the variable is unset or set to an empty string.
        """
        value = os.environ.get(variable)
        if not value:
            raise ValueError(f"Undefined environment variable {variable}")
        return value

    def _save_profile(self, sync: str, output_path: Optional[str] = None) -> None:
        """
        Dump the profiler data to the file path specified. If no path is specified, use the current working directory.

        :param sync: Label placed at the end of the default file name.
        :param output_path: Destination file. When omitted (or empty) the file
            ``cProfile-<class name>-<parallel index>-<sync>.pstats`` in the
            current working directory is used.
        """
        if not self._profile:
            print("No profile generated.")
            return
        # Build the default path with os.path.join so the separator matches the platform.
        profile_name = output_path or os.path.join(
            os.getcwd(),
            f"cProfile-{self.__class__.__name__}-{self._parallel_index}-{sync}.pstats",
        )
        print(f"Dumping profile data to {profile_name}")
        self._profile.dump_stats(profile_name)

    def _print_profile_stats(
        self, *, sort_key: pstats.SortKey = PSTATS_PRINT_DEFAULT_SORT_KEY, count: int = PSTATS_PRINT_DEFAULT_LINE_COUNT
    ) -> None:
        """Print the profile stats to stdout.

        A sort key can be specified to establish how stats should be sorted, and a line count can also be
        specified to limit the number of lines printed. Nothing is printed when no profiler was created.
        """
        if not self._profile:
            return
        # Increase the precision of the pstats output.
        # NOTE: this rebinds the module-level ``pstats.f8`` formatter and does not
        # restore it, so the change persists for the rest of the process.
        pstats.f8 = lambda x: f"{x:8.5f}"

        stats = pstats.Stats(self._profile).sort_stats(sort_key)
        stats.print_stats(count)