# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import os
import dotenv
import time
import logging
from logging.handlers import RotatingFileHandler
from concurrent.futures import ThreadPoolExecutor
from azure.eventhub import EventHubProducerClient, EventData
# Log results to a rotating file handler; with default settings (maxBytes=0)
# it never actually rolls over — NOTE(review): consider setting maxBytes/backupCount.
logger = logging.getLogger("SEND_PERF_TEST")
logger.setLevel(logging.INFO)
logger.addHandler(RotatingFileHandler("send_perf_test.log"))
# Load connection strings from a local .env file into the process environment.
dotenv.load_dotenv()
# Namespaces under test (basic/standard tiers in two regions).
# A KeyError here means the corresponding environment variable is missing.
CONN_STRS = [
    os.environ["EVENT_HUB_CONN_STR_BASIC_NORTHEU"],
    os.environ["EVENT_HUB_CONN_STR_STANDARD_NORTHEU"],
    os.environ["EVENT_HUB_CONN_STR_BASIC_WESTUS2"],
    os.environ["EVENT_HUB_CONN_STR_STANDARD_WESTUS2"],
]
EVENTHUB_NAME = "pyamqp"  # event hub (entity) name inside each namespace
SINGLE_EVENT_SIZE_LIST = [512]  # per-event payload sizes (bytes) to sweep
PARALLEL_THREAD_COUNT_LIST = [1]  # thread counts for the parallel scenario
FIXED_AMOUNT_OF_EVENTS = 100_000  # events per run in the fixed-amount scenario
RUN_DURATION = 30  # seconds per run in the fixed-duration scenario
def pre_prepare_client(client, data):
    """Warm up a producer client so setup cost is not billed to timed runs.

    Creating a batch pulls down the sender link settings, and sending one
    event forces the sender link to be fully established.
    """
    warmup_event = EventData(data)
    client.create_batch()  # precall to retrieve sender link settings
    client.send_batch([warmup_event])  # precall to set up the sender link
def send_batch_message(conn_str, eventhub_name, num_of_events, single_event_size, run_times=1, description=None):
    """Send a fixed number of events and log the average send rate.

    :param conn_str: Event Hubs namespace connection string.
    :param eventhub_name: Target event hub (entity) name.
    :param num_of_events: Number of events to send in each run.
    :param single_event_size: Payload size of each event, in bytes.
    :param run_times: Number of runs to average over.
    :param description: Label to use in the log record.
    :return: Average events/second across runs, rounded to 2 decimals.
    """
    client = EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name)
    data = b"a" * single_event_size
    perf_records = []
    try:
        pre_prepare_client(client, data)
        for _ in range(run_times):  # run run_times and calculate the avg performance
            start_time = time.time()
            batch = client.create_batch()
            for _ in range(num_of_events):
                try:
                    batch.add(EventData(data))
                except ValueError:
                    # Batch full: flush it, then retry the event in a fresh batch.
                    client.send_batch(batch)
                    batch = client.create_batch()
                    batch.add(EventData(data))
            client.send_batch(batch)  # flush the final, partially filled batch
            end_time = time.time()
            total_time = end_time - start_time
            speed = num_of_events / total_time
            perf_records.append(speed)
    finally:
        # Always release the connection, even if a send fails mid-run
        # (the original leaked the client on any exception).
        client.close()
    avg_perf = round(sum(perf_records) / len(perf_records), 2)
    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Num of events: {} events, Single message size: {} bytes.".format(
            description or "send_batch_message",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            num_of_events,
            single_event_size,
        )
    )
    return avg_perf
def send_batch_message_worker_thread(client, data, run_flag):
    """Send full batches in a loop until the shared ``run_flag`` is cleared.

    :param client: A warmed-up ``EventHubProducerClient``.
    :param data: Payload bytes used for every event.
    :param run_flag: Single-element list acting as a cross-thread stop signal.
    :return: Total number of events sent by this worker.
    """
    sent_count = 0
    while run_flag[0]:
        batch = client.create_batch()
        try:
            # Pack events until the batch rejects one as over-size.
            while True:
                batch.add(EventData(body=data))
        except ValueError:
            client.send_batch(batch)
            sent_count += len(batch)
    return sent_count
def send_batch_message_in_parallel(
    conn_str, eventhub_name, single_event_size, parallel_thread_count=4, run_times=1, run_duration=30, description=None
):
    """Run several sender threads for a fixed duration and log the aggregate rate.

    Each thread owns its own producer client and sends full batches until
    ``run_duration`` seconds elapse; throughput is the total events sent by
    all threads divided by the duration, averaged over ``run_times`` runs.

    :param conn_str: Event Hubs namespace connection string.
    :param eventhub_name: Target event hub (entity) name.
    :param single_event_size: Payload size of each event, in bytes.
    :param parallel_thread_count: Number of concurrent sender threads/clients.
    :param run_times: Number of runs to average over.
    :param run_duration: Seconds each run lasts.
    :param description: Label to use in the log record.
    :return: Average events/second across runs, rounded to 2 decimals.
    """
    perf_records = []
    data = b"a" * single_event_size  # invariant across runs; build once
    for _ in range(run_times):
        futures = []
        clients = [
            EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name) for _ in range(parallel_thread_count)
        ]
        try:
            for client in clients:
                pre_prepare_client(client, data)
            with ThreadPoolExecutor(max_workers=parallel_thread_count) as executor:
                run_flag = [True]  # shared stop signal read by every worker
                for i in range(parallel_thread_count):
                    futures.append(executor.submit(send_batch_message_worker_thread, clients[i], data, run_flag))
                time.sleep(run_duration)
                run_flag[0] = False  # workers finish their current batch, then exit
            perf_records.append(sum(future.result() for future in futures) / run_duration)
        finally:
            # Always release the connections, even if warm-up or a worker raised
            # (the original leaked every client on any exception).
            for client in clients:
                client.close()
    avg_perf = round(sum(perf_records) / len(perf_records), 2)
    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Single message size: {} bytes, Parallel thread count: {} threads, Run duration: {} seconds.".format(
            description or "send_batch_message_in_parallel",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            single_event_size,
            parallel_thread_count,
            run_duration,
        )
    )
    # Return the average like send_batch_message does (original computed it
    # but never returned it).
    return avg_perf
if __name__ == "__main__":
    logger.info("------------------- START OF TEST -------------------")
    # Sweep every namespace under test.
    for conn_str in CONN_STRS:
        # Scenario 1: single thread sends a fixed number of events.
        for single_event_size in SINGLE_EVENT_SIZE_LIST:
            print("------------------- sending fixed amount of message -------------------")
            send_batch_message(
                conn_str=conn_str,
                eventhub_name=EVENTHUB_NAME,
                num_of_events=FIXED_AMOUNT_OF_EVENTS,
                single_event_size=single_event_size,
                description="sending fixed amount message",
            )
        # Scenario 2: multiple threads send continuously for a fixed duration.
        for parallel_thread_count in PARALLEL_THREAD_COUNT_LIST:
            for single_event_size in SINGLE_EVENT_SIZE_LIST:
                print("------------------- multiple threads sending messages for a fixed period -------------------")
                send_batch_message_in_parallel(
                    conn_str=conn_str,
                    eventhub_name=EVENTHUB_NAME,
                    single_event_size=single_event_size,
                    parallel_thread_count=parallel_thread_count,
                    run_duration=RUN_DURATION,
                    description="multiple threads sending messages",
                )
    logger.info("------------------- END OF TEST -------------------")