1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import aiohttp
import time
from typing import Optional, Any, Dict, List
from urllib.parse import urljoin
from ._perf_stress_base import _PerfTestBase
from ._policies import PerfTestProxyPolicy
class BatchPerfTest(_PerfTestBase):
def __init__(self, arguments):
super().__init__(arguments)
self._session: Optional[aiohttp.ClientSession] = None
self._test_proxy: Optional[List[str]] = None
self._test_proxy_policy: Optional[PerfTestProxyPolicy] = None
self._client_kwargs: Dict[str, Any] = {}
self._recording_id: Optional[str] = None
if self.args.insecure:
# Disable SSL verification for SDK Client
self._client_kwargs["connection_verify"] = False
# Disable SSL verification for test proxy session
self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False))
# Suppress warnings
import warnings
from urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", InsecureRequestWarning)
else:
self._session = aiohttp.ClientSession()
if self.args.test_proxies:
# Add policy to redirect requests to the test proxy
self._test_proxy = self.args.test_proxies[self._parallel_index % len(self.args.test_proxies)]
self._test_proxy_policy = PerfTestProxyPolicy(self._test_proxy)
self._client_kwargs["per_retry_policies"] = [self._test_proxy_policy]
async def post_setup(self) -> None:
"""
Post-setup called once per parallel test instance.
Used by base classes to setup state (like test-proxy) after all derived class setup is complete.
"""
if self._test_proxy_policy:
# Make one call to run() before starting recording, to avoid capturing
# one-time setup like authorization requests.
if self.args.sync:
self.run_batch_sync()
else:
await self.run_batch_async()
await self._start_recording()
self._test_proxy_policy.recording_id = self._recording_id
self._test_proxy_policy.mode = "record"
# Record one call to run()
if self.args.sync:
self.run_batch_sync()
else:
await self.run_batch_async()
await self._stop_recording()
await self._start_playback()
self._test_proxy_policy.recording_id = self._recording_id
self._test_proxy_policy.mode = "playback"
async def pre_cleanup(self) -> None:
"""
Pre-cleanup called once per parallel test instance.
Used by base classes to cleanup state (like test-proxy) before all derived class cleanup runs.
"""
# cSpell:ignore inmemory
# Only stop playback if it was successfully started
if self._test_proxy_policy and self._test_proxy_policy.mode == "playback":
headers = {"x-recording-id": self._recording_id, "x-purge-inmemory-recording": "true"}
url = urljoin(self._test_proxy, "/playback/stop")
async with self._session.post(url, headers=headers) as resp:
assert resp.status == 200
# Stop redirecting requests to test proxy
self._test_proxy_policy.recording_id = None
self._test_proxy_policy.mode = None
async def close(self) -> None:
"""
Close any open client resources/connections per parallel test instance.
"""
await self._session.close()
async def _start_recording(self) -> None:
url = urljoin(self._test_proxy, "/record/start")
async with self._session.post(url) as resp:
assert resp.status == 200
self._recording_id = resp.headers["x-recording-id"]
async def _stop_recording(self) -> None:
headers = {"x-recording-id": self._recording_id}
url = urljoin(self._test_proxy, "/record/stop")
async with self._session.post(url, headers=headers) as resp:
assert resp.status == 200
async def _start_playback(self) -> None:
headers = {"x-recording-id": self._recording_id}
url = urljoin(self._test_proxy, "/playback/start")
async with self._session.post(url, headers=headers) as resp:
assert resp.status == 200
self._recording_id = resp.headers["x-recording-id"]
def run_batch_sync(self) -> int:
"""
Run cumulative operation(s) - i.e. an operation that results in more than a single logical result.
:returns: The number of completed results.
:rtype: int
"""
raise NotImplementedError("run_batch_sync must be implemented for {}".format(self.__class__.__name__))
async def run_batch_async(self) -> int:
"""
Run cumulative operation(s) - i.e. an operation that results in more than a single logical result.
:returns: The number of completed results.
:rtype: int
"""
raise NotImplementedError("run_batch_async must be implemented for {}".format(self.__class__.__name__))
def run_all_sync(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
"""
Run all sync tests, including both warmup and duration.
"""
self._completed_operations = 0
self._last_completion_time = 0.0
starttime = time.time()
if run_profiler:
try:
self._profile.enable()
self._completed_operations += self.run_batch_sync()
finally:
self._profile.disable()
self._last_completion_time = time.time() - starttime
self._save_profile("sync", output_path=self.args.profile_path)
self._print_profile_stats()
else:
while self._last_completion_time < duration:
self._completed_operations += self.run_batch_sync()
self._last_completion_time = time.time() - starttime
async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
"""
Run all async tests, including both warmup and duration.
"""
self._completed_operations = 0
self._last_completion_time = 0.0
starttime = time.time()
if run_profiler:
try:
self._profile.enable()
self._completed_operations += await self.run_batch_async()
finally:
self._profile.disable()
self._last_completion_time = time.time() - starttime
self._save_profile("async", output_path=self.args.profile_path)
self._print_profile_stats()
else:
while self._last_completion_time < duration:
self._completed_operations += await self.run_batch_async()
self._last_completion_time = time.time() - starttime
|