File: _perf_stress_proc.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (155 lines) | stat: -rw-r--r-- 7,133 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import time
import queue
from typing import List
import multiprocessing
import threading
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import importlib

from ._perf_stress_base import _PerfTestABC, _PerfTestBase


def run_process(index, args, module, test_name, num_tests, test_stages, results, status):
    """Entry point executed in each child process.

    Resolves the test class by name from the given module, then drives the
    complete test lifecycle (setup, run, cleanup) inside a fresh event loop.
    """
    # Import lazily so the test module is only loaded inside the child process.
    loaded = importlib.import_module(module)
    cls = getattr(loaded, test_name)
    return asyncio.run(_start_tests(index, cls, num_tests, args, test_stages, results, status))


def _synchronize(stages, ignore_error=False):
    """Synchronize all processes by waiting on the barrier.

    Optionally we can also ignore a broken barrier during the cleanup stages
    so that if some processes have failed, the others can still complete cleanup.
    """
    try:
        stages.wait()
    except threading.BrokenBarrierError:
        if not ignore_error:
            raise


async def _start_tests(index, test_class, num_tests, args, test_stages, results, status):
    """Create test instances, then run setup, the tests, and cleanup.

    Each lifecycle stage is gated on a cross-process barrier from
    ``test_stages`` so all child processes advance in lockstep. If any
    process fails, it aborts every barrier so the others stop waiting;
    cleanup is still attempted on a best-effort basis.

    Fix vs. original: corrected the "Failed to cleanup up tests" error
    message typo ("cleanup up" -> "clean up").
    """
    # Create all parallel tests with a global unique index value.
    # ``tests`` is initialized up front so the ``finally`` clauses are safe
    # even if instance creation itself raises.
    tests = []

    try:
        # Hold the lock while setting the process-wide starting index so the
        # per-instance indices assigned during construction are contiguous.
        with _PerfTestBase._global_parallel_index_lock:
            _PerfTestBase._global_parallel_index = index
            tests = [test_class(args) for _ in range(num_tests)]

        # Run the global setup once per process.
        await tests[0].global_setup()

        # Waiting till all processes are ready to start "Setup". This allows each child
        # process to setup any global resources before the rest of setup is run.
        _synchronize(test_stages["Setup"])
        await asyncio.gather(*[test.setup() for test in tests])

        # Waiting till all processes are ready to start "Post Setup"
        _synchronize(test_stages["Post Setup"])
        await asyncio.gather(*[test.post_setup() for test in tests])

        if args.warmup:
            # Waiting till all processes are ready to start "Warmup"
            _synchronize(test_stages["Warmup"])
            await _run_tests(args.warmup, args, tests, results, status, with_profiler=False)

        # Waiting till all processes are ready to start "Tests"
        _synchronize(test_stages["Tests"])
        await _run_tests(args.duration, args, tests, results, status, with_profiler=args.profile)

        # Waiting till all processes have finished tests, ready to start "Pre Cleanup"
        _synchronize(test_stages["Pre Cleanup"])
    except threading.BrokenBarrierError:
        # A separate process has failed, so all of them are shutting down.
        print("Another test process has aborted - shutting down.")
    except Exception as e:
        # This process failed: abort every barrier so sibling processes
        # waiting on any stage are released instead of hanging forever.
        print(f"Test processes failed - aborting. Error: {e}")
        for barrier in test_stages.values():
            barrier.abort()
    finally:
        try:
            # We'll attempt to clean up the tests using the barrier.
            # This may fail if the tests are already in an unrecoverable error state.
            # If one process has failed, we'll still attempt to clean up without the barrier.
            await asyncio.gather(*[test.pre_cleanup() for test in tests])
            if not args.no_cleanup:
                # Waiting till all processes are ready to start "Cleanup"
                # If any process has failed earlier, the barrier will be broken - so wait
                # if we can but otherwise attempt to clean up anyway.
                _synchronize(test_stages["Cleanup"], ignore_error=True)
                await asyncio.gather(*[test.cleanup() for test in tests])

            # Waiting till all processes have completed the cleanup stages.
            _synchronize(test_stages["Finished"], ignore_error=True)
            if not args.no_cleanup:
                # Run global cleanup once per process.
                await tests[0].global_cleanup()
        except Exception as e:
            # Tests were unable to clean up, maybe due to earlier failure state.
            print(f"Failed to clean up tests: {e}")
        finally:
            # Always call close on the tests, even if cleanup failed.
            try:
                await asyncio.gather(*[test.close() for test in tests])
            except Exception as e:
                print(f"Failed to close tests: {e}")


async def _run_tests(duration: int, args, tests, results, status, *, with_profiler: bool = False) -> None:
    """Run every test instance for ``duration`` seconds and report results.

    Synchronous tests fan out across a thread pool; asynchronous tests run
    as concurrent tasks on the current event loop. A background thread
    streams progress to the parent process for the lifetime of the run.
    """
    # Kick off a status-monitoring thread for the duration of the run.
    stop_flag = threading.Event()
    reporter = threading.Thread(target=_report_status, args=(status, tests, stop_flag), daemon=True)
    reporter.start()

    try:
        if args.sync:
            # One pool worker per parallel test instance.
            with ThreadPoolExecutor(max_workers=args.parallel) as pool:
                runs = [pool.submit(t.run_all_sync, duration, run_profiler=with_profiler) for t in tests]
                wait(runs, return_when=ALL_COMPLETED)
        else:
            runs = [asyncio.create_task(t.run_all_async(duration, run_profiler=with_profiler)) for t in tests]
            await asyncio.wait(runs, return_when=asyncio.ALL_COMPLETED)

        # If any of the parallel test runs raised an exception, let it be propagated, after all tasks have
        # completed.
        # TODO: This will only raise the first Exception encountered. Once we migrate the perf pipelines
        # to 3.11 we could refactor to use ExceptionGroups so all exceptions will be captured.
        for run in runs:
            run.result()

        # Add final test results to the results queue to be accumulated by the parent process.
        for t in tests:
            results.put((t._parallel_index, t.completed_operations, t.last_completion_time))
    finally:
        # Always tear down the status-reporting thread, even on failure.
        stop_flag.set()
        reporter.join()


def _report_status(status: multiprocessing.JoinableQueue, tests: List[_PerfTestABC], stop: threading.Event):
    """Continuously report progress of the running tests to the parent.

    Each pass puts one status tuple per test onto a joinable queue, then
    blocks on ``join`` until the parent has consumed them. That handshake
    implicitly synchronizes reporting across all child processes and lets
    the parent dictate the polling frequency.
    """
    # Delay the start a tiny bit to let the tests reset their status after warmup.
    time.sleep(1)
    while not stop.is_set():
        snapshot = [(t._parallel_index, t.completed_operations, t.last_completion_time) for t in tests]
        for entry in snapshot:
            status.put(entry)
        # Wait for the parent to drain this batch before producing the next one.
        status.join()