1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import threading
import time
from ._repeated_timer import AtomicCounter
from ._perf_stress_base import _PerfTestBase
class EventPerfTest(_PerfTestBase):
def __init__(self, arguments):
super().__init__(arguments)
if self.args.sync:
self._condition = threading.Condition()
else:
self._condition = asyncio.Condition()
self._start_time = time.time()
self._error = None
self._processing = None
self._completed_operations = AtomicCounter()
@property
def completed_operations(self) -> int:
"""
Total number of operations completed by run_all().
Reset after warmup.
"""
return self._completed_operations.value()
@property
def last_completion_time(self) -> float:
"""
Elapsed time between start of warmup/run and last completed operation.
Reset after warmup.
"""
return self._last_completion_time - self._start_time
def event_raised_sync(self):
self._completed_operations.increment()
self._last_completion_time = time.time()
def error_raised_sync(self, error):
with self._condition:
self._error = error
self._condition.notify_all()
async def event_raised_async(self):
self._completed_operations.increment()
self._last_completion_time = time.time()
async def error_raised_async(self, error):
async with self._condition:
self._error = error
self._condition.notify_all()
async def setup(self) -> None:
"""
Setup called once per parallel test instance.
Used to setup state specific to this test instance.
"""
if self.args.sync:
self._processing = threading.Thread(target=self.start_events_sync)
self._processing.daemon = True
self._processing.start()
else:
self._processing = asyncio.ensure_future(self.start_events_async())
async def cleanup(self) -> None:
"""
Cleanup called once per parallel test instance.
Used to cleanup state specific to this test instance.
"""
if self.args.sync:
self.stop_events_sync()
self._processing.join()
else:
await self.stop_events_async()
await self._processing
try:
raise self._error
except TypeError:
pass
def run_all_sync(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
"""
Run all sync tests, including both warmup and duration.
"""
with self._condition:
self._completed_operations.reset()
self._last_completion_time = 0.0
self._start_time = time.time()
if run_profiler:
self._profile.enable()
self._condition.wait(timeout=duration)
if run_profiler:
self._profile.disable()
self._save_profile("sync", output_path=self.args.profile_path)
self._print_profile_stats()
async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
"""
Run all async tests, including both warmup and duration.
"""
async with self._condition:
self._completed_operations.reset()
self._last_completion_time = 0.0
self._start_time = time.time()
if run_profiler:
self._profile.enable()
try:
await asyncio.wait_for(self._condition.wait(), timeout=duration)
except asyncio.TimeoutError:
pass
finally:
if run_profiler:
self._profile.disable()
self._save_profile("async", output_path=self.args.profile_path)
self._print_profile_stats()
def start_events_sync(self) -> None:
"""
Start the process for receiving events.
"""
raise NotImplementedError("start_events_sync must be implemented for {}".format(self.__class__.__name__))
def stop_events_sync(self) -> None:
"""
Stop the process for receiving events.
"""
raise NotImplementedError("stop_events_sync must be implemented for {}".format(self.__class__.__name__))
async def start_events_async(self) -> None:
"""
Start the process for receiving events.
"""
raise NotImplementedError("start_events_async must be implemented for {}".format(self.__class__.__name__))
async def stop_events_async(self) -> None:
"""
Stop the process for receiving events.
"""
raise NotImplementedError("stop_events_async must be implemented for {}".format(self.__class__.__name__))
|