# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import os
import dotenv
import time
import logging
from logging.handlers import RotatingFileHandler
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
# Perf-test logger: INFO-level records appended to a rotating log file
# next to the script.
logger = logging.getLogger("ASYNC_SEND_PERF_TEST")
logger.setLevel(logging.INFO)
logger.addHandler(RotatingFileHandler("async_send_perf_test.log"))
# Load connection strings from a local .env file into os.environ.
dotenv.load_dotenv()
# One connection string per namespace SKU/region combination under test.
CONN_STRS = [
os.environ["EVENT_HUB_CONN_STR_BASIC_NORTHEU"],
os.environ["EVENT_HUB_CONN_STR_STANDARD_NORTHEU"],
os.environ["EVENT_HUB_CONN_STR_BASIC_WESTUS2"],
os.environ["EVENT_HUB_CONN_STR_STANDARD_WESTUS2"],
]
# Target event hub (entity) name inside each namespace.
EVENTHUB_NAME = "pyamqp"
# Event payload sizes (bytes) to sweep over.
SINGLE_EVENT_SIZE_LIST = [512]
# Concurrent sender coroutine counts to sweep over.
PARALLEL_COROUTINE_COUNT_LIST = [1]
# Events per run for the fixed-amount scenario.
FIXED_AMOUNT_OF_EVENTS = 100_000
# Seconds per run for the fixed-duration scenario.
RUN_DURATION = 30
async def pre_prepare_client(client, data):
    """Warm up *client* so that link-setup cost is excluded from timed runs."""
    # First create_batch round-trips to retrieve the sender link settings
    # (e.g. the maximum allowed batch size).
    await client.create_batch()
    # One real send forces the sender link to be fully established.
    await client.send_batch([EventData(data)])
async def send_batch_message(conn_str, eventhub_name, num_of_events, single_event_size, run_times=1, description=None):
    """Send a fixed number of events in full batches and log the average rate.

    :param conn_str: Event Hubs namespace connection string.
    :param eventhub_name: Target event hub (entity) name.
    :param num_of_events: Number of events to send per run.
    :param single_event_size: Payload size in bytes of each event.
    :param run_times: Number of runs to average over.
    :param description: Optional label used in the log record.
    :return: Average events/second across all runs, rounded to 2 decimals.
    """
    client = EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name)
    data = b"a" * single_event_size
    perf_records = []
    try:
        # Warm up the sender link so connection setup is excluded from timing.
        await pre_prepare_client(client, data)
        for _ in range(run_times):  # run `run_times` times and average the results
            start_time = time.time()
            batch = await client.create_batch()
            for _ in range(num_of_events):
                try:
                    batch.add(EventData(data))
                except ValueError:
                    # Batch is full: flush it and start a new one with the
                    # event that did not fit.
                    await client.send_batch(batch)
                    batch = await client.create_batch()
                    batch.add(EventData(data))
            # Flush the final (possibly partial) batch.
            await client.send_batch(batch)
            total_time = time.time() - start_time
            perf_records.append(num_of_events / total_time)
    finally:
        # Always release the connection, even if a send raised mid-run
        # (the original leaked the client on failure).
        await client.close()
    avg_perf = round(sum(perf_records) / len(perf_records), 2)
    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Num of events: {} events, Single message size: {} bytes.".format(
            description or "send_batch_message",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            num_of_events,
            single_event_size,
        )
    )
    return avg_perf
async def send_batch_message_worker_coroutine(client, data, run_flag):
    """Send full batches of *data* continuously until ``run_flag[0]`` is falsy.

    :param client: A warmed-up ``EventHubProducerClient``.
    :param data: Payload bytes for every event.
    :param run_flag: One-element mutable list acting as a shared stop signal.
    :return: Total number of events handed to ``send_batch``.
    """
    sent_events = 0
    while run_flag[0]:
        batch = await client.create_batch()
        try:
            # Pack the batch until it refuses another event.
            while True:
                batch.add(EventData(body=data))
        except ValueError:
            # Batch is at capacity -- ship it and count its events.
            await client.send_batch(batch)
            sent_events += len(batch)
    return sent_events
async def send_batch_message_in_parallel(
    conn_str,
    eventhub_name,
    single_event_size,
    parallel_coroutine_count=4,
    run_times=1,
    run_duration=30,
    description=None,
):
    """Measure sustained send rate with several producer clients concurrently.

    Spawns ``parallel_coroutine_count`` worker coroutines (one client each)
    that send continuously for ``run_duration`` seconds, then logs the
    averaged throughput over ``run_times`` runs.

    :param conn_str: Event Hubs namespace connection string.
    :param eventhub_name: Target event hub (entity) name.
    :param single_event_size: Payload size in bytes of each event.
    :param parallel_coroutine_count: Number of concurrent sender coroutines.
    :param run_times: Number of runs to average over.
    :param run_duration: Seconds each run lasts.
    :param description: Optional label used in the log record.
    :return: Average events/second, rounded to 2 decimals (added for
        consistency with ``send_batch_message``; previously returned None).
    """
    perf_records = []
    data = b"a" * single_event_size
    for _ in range(run_times):
        clients = [
            EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name)
            for _ in range(parallel_coroutine_count)
        ]
        try:
            for client in clients:
                # Warm up each sender link so setup cost is excluded from timing.
                await pre_prepare_client(client, data)
            run_flag = [True]  # shared mutable flag the workers poll to stop
            futures = [
                asyncio.create_task(send_batch_message_worker_coroutine(client, data, run_flag))
                for client in clients
            ]
            await asyncio.sleep(run_duration)
            run_flag[0] = False
            total_sent = sum(await asyncio.gather(*futures))
            perf_records.append(total_sent / run_duration)
        finally:
            # Always release connections, even if a worker raised
            # (the original leaked every client on failure).
            for client in clients:
                await client.close()
    avg_perf = round(sum(perf_records) / len(perf_records), 2)
    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Single message size: {} bytes, Parallel thread count: {} threads, Run duration: {} seconds.".format(
            description or "send_batch_message_in_parallel",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            single_event_size,
            parallel_coroutine_count,
            run_duration,
        )
    )
    return avg_perf
if __name__ == "__main__":
    logger.info("------------------- START OF TEST -------------------")
    for conn_str in CONN_STRS:
        # Scenario 1: one coroutine sends a fixed number of events per payload size.
        for single_event_size in SINGLE_EVENT_SIZE_LIST:
            print("------------------- sending fixed amount of message -------------------")
            asyncio.run(
                send_batch_message(
                    conn_str=conn_str,
                    eventhub_name=EVENTHUB_NAME,
                    num_of_events=FIXED_AMOUNT_OF_EVENTS,
                    single_event_size=single_event_size,
                    description="sending fixed amount message",
                )
            )
        # Scenario 2: N coroutines send continuously for a fixed duration.
        for parallel_coroutine_count in PARALLEL_COROUTINE_COUNT_LIST:
            for single_event_size in SINGLE_EVENT_SIZE_LIST:
                print(
                    "------------------- multiple coroutines sending messages for a fixed period -------------------"
                )
                asyncio.run(
                    send_batch_message_in_parallel(
                        conn_str=conn_str,
                        eventhub_name=EVENTHUB_NAME,
                        single_event_size=single_event_size,
                        parallel_coroutine_count=parallel_coroutine_count,
                        run_duration=RUN_DURATION,
                        description="multiple coroutine sending messages",
                    )
                )
    logger.info("------------------- END OF TEST -------------------")