File: _event_perf_test.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (147 lines) | stat: -rw-r--r-- 5,230 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import threading
import time

from ._repeated_timer import AtomicCounter
from ._perf_stress_base import _PerfTestBase


class EventPerfTest(_PerfTestBase):
    """
    Base class for perf tests whose operations are reported through callbacks
    rather than being driven by a run loop.

    ``setup`` launches ``start_events_sync`` (on a daemon thread) or
    ``start_events_async`` (as an asyncio future). The subclass implementation
    is expected to call ``event_raised_*`` for every completed operation and
    ``error_raised_*`` when it fails. ``run_all_*`` simply waits on a condition
    for ``duration`` seconds, or until an error is reported, and ``cleanup``
    stops the event source and re-raises any reported error.
    """

    def __init__(self, arguments):
        super().__init__(arguments)
        # The condition type must match the execution mode: run_all_sync uses it
        # with ``with``, run_all_async with ``async with``.
        if self.args.sync:
            self._condition = threading.Condition()
        else:
            self._condition = asyncio.Condition()
        self._start_time = time.time()
        # Same value the run_all_* methods reset to; set here so that
        # ``last_completion_time`` does not depend on the base class (not visible
        # from this module) having created the attribute.
        self._last_completion_time = 0.0
        self._error = None
        self._processing = None
        self._completed_operations = AtomicCounter()

    @property
    def completed_operations(self) -> int:
        """
        Total number of operations recorded through event_raised_sync/async.
        Reset at the start of every run_all_sync/run_all_async call.
        """
        return self._completed_operations.value()

    @property
    def last_completion_time(self) -> float:
        """
        Elapsed time between the start of the current warmup/run and the last completed operation.
        Reset at the start of every run_all_sync/run_all_async call.

        Note: the stored completion timestamp is reset to 0.0, so until an event
        has been recorded after a reset this returns the negated start time.
        """
        return self._last_completion_time - self._start_time

    def event_raised_sync(self) -> None:
        """
        Record one completed operation and the time at which it completed.
        """
        self._completed_operations.increment()
        self._last_completion_time = time.time()

    def error_raised_sync(self, error) -> None:
        """
        Store ``error`` and wake up run_all_sync so the run ends early.
        The stored error is re-raised by cleanup().
        """
        with self._condition:
            self._error = error
            self._condition.notify_all()

    async def event_raised_async(self) -> None:
        """
        Record one completed operation and the time at which it completed.
        """
        self._completed_operations.increment()
        self._last_completion_time = time.time()

    async def error_raised_async(self, error) -> None:
        """
        Store ``error`` and wake up run_all_async so the run ends early.
        The stored error is re-raised by cleanup().
        """
        async with self._condition:
            self._error = error
            self._condition.notify_all()

    async def setup(self) -> None:
        """
        Setup called once per parallel test instance.
        Used to setup state specific to this test instance.

        Starts the event source: a daemon thread running start_events_sync, or
        a scheduled future running start_events_async.
        """
        if self.args.sync:
            self._processing = threading.Thread(target=self.start_events_sync)
            self._processing.daemon = True
            self._processing.start()
        else:
            self._processing = asyncio.ensure_future(self.start_events_async())

    async def cleanup(self) -> None:
        """
        Cleanup called once per parallel test instance.
        Used to cleanup state specific to this test instance.

        Stops the event source, waits for it to finish, then re-raises the
        error stored by error_raised_sync/async, if any.
        """
        if self.args.sync:
            self.stop_events_sync()
            self._processing.join()
        else:
            await self.stop_events_async()
            await self._processing

        error = self._error
        if error is None:
            return
        # Only raise values that ``raise`` accepts (exception instances or
        # exception classes). Anything else is ignored, as it always has been,
        # but a genuine TypeError reported by the test is no longer swallowed.
        is_exception_class = isinstance(error, type) and issubclass(error, BaseException)
        if isinstance(error, BaseException) or is_exception_class:
            raise error

    def run_all_sync(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
        """
        Run all sync tests, including both warmup and duration.

        Resets the counters, then blocks for up to ``duration`` seconds; the
        wait ends early if error_raised_sync notifies the condition.
        """
        with self._condition:
            self._completed_operations.reset()
            self._last_completion_time = 0.0
            self._start_time = time.time()
            if run_profiler:
                self._profile.enable()
            try:
                self._condition.wait(timeout=duration)
            finally:
                # Mirror run_all_async: always stop and save the profile, even
                # when the wait is interrupted.
                if run_profiler:
                    self._profile.disable()
                    self._save_profile("sync", output_path=self.args.profile_path)
                    self._print_profile_stats()

    async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
        """
        Run all async tests, including both warmup and duration.

        Resets the counters, then waits for up to ``duration`` seconds; the
        wait ends early if error_raised_async notifies the condition.
        """
        async with self._condition:
            self._completed_operations.reset()
            self._last_completion_time = 0.0
            self._start_time = time.time()
            if run_profiler:
                self._profile.enable()
            try:
                await asyncio.wait_for(self._condition.wait(), timeout=duration)
            except asyncio.TimeoutError:
                # Reaching the timeout is the normal way for a run to end.
                pass
            finally:
                if run_profiler:
                    self._profile.disable()
                    self._save_profile("async", output_path=self.args.profile_path)
                    self._print_profile_stats()

    def start_events_sync(self) -> None:
        """
        Start the process for receiving events.
        Must be overridden by subclasses; run on a daemon thread by setup().
        """
        raise NotImplementedError("start_events_sync must be implemented for {}".format(self.__class__.__name__))

    def stop_events_sync(self) -> None:
        """
        Stop the process for receiving events.
        Must be overridden by subclasses; called by cleanup() before joining the thread.
        """
        raise NotImplementedError("stop_events_sync must be implemented for {}".format(self.__class__.__name__))

    async def start_events_async(self) -> None:
        """
        Start the process for receiving events.
        Must be overridden by subclasses; scheduled as a future by setup().
        """
        raise NotImplementedError("start_events_async must be implemented for {}".format(self.__class__.__name__))

    async def stop_events_async(self) -> None:
        """
        Stop the process for receiving events.
        Must be overridden by subclasses; awaited by cleanup() before awaiting the future.
        """
        raise NotImplementedError("stop_events_async must be implemented for {}".format(self.__class__.__name__))