File: async_send.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (181 lines) | stat: -rw-r--r-- 6,276 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import os
import dotenv
import time
import logging
from logging.handlers import RotatingFileHandler

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

# Dedicated logger for this perf test; every result line goes to a local log file.
logger = logging.getLogger("ASYNC_SEND_PERF_TEST")
logger.addHandler(RotatingFileHandler("async_send_perf_test.log"))
logger.setLevel(logging.INFO)

# Pull connection strings from a local .env file (if any) into the environment.
dotenv.load_dotenv()

# One connection string per namespace under test. A missing variable raises
# KeyError at import time, in the order listed here.
CONN_STRS = [
    os.environ[env_var_name]
    for env_var_name in (
        "EVENT_HUB_CONN_STR_BASIC_NORTHEU",
        "EVENT_HUB_CONN_STR_STANDARD_NORTHEU",
        "EVENT_HUB_CONN_STR_BASIC_WESTUS2",
        "EVENT_HUB_CONN_STR_STANDARD_WESTUS2",
    )
]
# Event hub name used with every connection string above.
EVENTHUB_NAME = "pyamqp"

# Test matrix: event body sizes (bytes) and number of concurrent sender coroutines.
SINGLE_EVENT_SIZE_LIST = [512]
PARALLEL_COROUTINE_COUNT_LIST = [1]
# Number of events sent per run by the fixed-amount scenario.
FIXED_AMOUNT_OF_EVENTS = 100_000
# Length, in seconds, of each run of the fixed-duration scenario.
RUN_DURATION = 30


async def pre_prepare_client(client, data):
    """Warm up *client* so later timed sends do not pay the setup cost.

    Issues one throw-away ``create_batch`` call and sends a single event whose
    body is *data*.
    """
    # Pre-call so the sender link settings are retrieved ahead of the timed section.
    await client.create_batch()

    # Pre-call that sends one event, which sets up the sender link itself.
    warmup_events = [EventData(data)]
    await client.send_batch(warmup_events)


async def send_batch_message(conn_str, eventhub_name, num_of_events, single_event_size, run_times=1, description=None):
    """Send a fixed number of events in batches and log the average send rate.

    A producer client is created and warmed up with ``pre_prepare_client``.
    Each run then sends ``num_of_events`` events whose body is
    ``single_event_size`` bytes of ``b"a"``, starting a new batch whenever
    ``batch.add`` raises ``ValueError``.

    :param conn_str: Event Hubs connection string passed to the client.
    :param eventhub_name: Name of the event hub to send to.
    :param num_of_events: Number of events sent in each run.
    :param single_event_size: Size of each event body, in bytes.
    :param run_times: Number of timed runs to average over (must be >= 1).
    :param description: Label used in the log line; defaults to the function name.
    :return: Average events per second across all runs, rounded to 2 decimals.
    :raises ValueError: If ``run_times`` is smaller than 1.
    """
    # Reject this before connecting: with no runs there is nothing to average,
    # and the average below would otherwise fail with a ZeroDivisionError.
    if run_times < 1:
        raise ValueError("run_times must be at least 1")

    client = EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name)

    data = b"a" * single_event_size
    perf_records = []

    try:
        await pre_prepare_client(client, data)

        for _run in range(run_times):  # run run_times and calculate the avg performance
            # perf_counter is monotonic, so system clock adjustments cannot skew the interval.
            start_time = time.perf_counter()
            batch = await client.create_batch()
            for _event in range(num_of_events):
                try:
                    batch.add(EventData(data))
                except ValueError:
                    # Batch full: flush it and put the event into a fresh batch.
                    await client.send_batch(batch)
                    batch = await client.create_batch()
                    batch.add(EventData(data))
            # Flush whatever is left in the last, partially filled batch.
            await client.send_batch(batch)

            total_time = time.perf_counter() - start_time
            perf_records.append(num_of_events / total_time)
    finally:
        # Always release the connection, even when warm-up or a send fails.
        await client.close()

    avg_perf = round(sum(perf_records) / len(perf_records), 2)
    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Num of events: {} events, Single message size: {} bytes.".format(
            description or "send_batch_message",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            num_of_events,
            single_event_size,
        )
    )
    return avg_perf


async def send_batch_message_worker_coroutine(client, data, run_flag):
    """Keep sending full batches through *client* until the run flag is cleared.

    :param client: Producer client used to create and send batches.
    :param data: Body used for every event.
    :param run_flag: One-element list; the loop continues while ``run_flag[0]`` is truthy.
    :return: Total number of events contained in the batches that were sent.
    """
    sent_events = 0
    while run_flag[0]:
        current_batch = await client.create_batch()

        # Add events until a ValueError signals that no more fit in this batch.
        batch_is_full = False
        while not batch_is_full:
            try:
                current_batch.add(EventData(body=data))
            except ValueError:
                batch_is_full = True

        await client.send_batch(current_batch)
        sent_events += len(current_batch)
    return sent_events


async def send_batch_message_in_parallel(
    conn_str,
    eventhub_name,
    single_event_size,
    parallel_coroutine_count=4,
    run_times=1,
    run_duration=30,
    description=None,
):
    """Send events from several coroutines for a fixed period and log the average rate.

    Each run creates ``parallel_coroutine_count`` producer clients, warms each
    one up with ``pre_prepare_client``, and starts one
    ``send_batch_message_worker_coroutine`` task per client. After
    ``run_duration`` seconds the shared run flag is cleared and the workers
    are awaited.

    :param conn_str: Event Hubs connection string passed to every client.
    :param eventhub_name: Name of the event hub to send to.
    :param single_event_size: Size of each event body, in bytes.
    :param parallel_coroutine_count: Number of clients / worker tasks per run.
    :param run_times: Number of timed runs to average over (must be >= 1).
    :param run_duration: Seconds the workers are left running before being told to stop.
    :param description: Label used in the log line; defaults to the function name.
    :return: Average events per second across all runs, rounded to 2 decimals.
    :raises ValueError: If ``run_times`` is smaller than 1.
    """
    # Reject this up front: with no runs the average below would otherwise
    # fail with a ZeroDivisionError.
    if run_times < 1:
        raise ValueError("run_times must be at least 1")

    data = b"a" * single_event_size
    perf_records = []

    for _run in range(run_times):
        clients = [
            EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name)
            for _ in range(parallel_coroutine_count)
        ]
        run_flag = [True]  # shared mutable flag; workers stop once it holds False
        tasks = []

        try:
            for client in clients:
                await pre_prepare_client(client, data)

            start_time = time.perf_counter()
            tasks = [
                asyncio.create_task(send_batch_message_worker_coroutine(client, data, run_flag))
                for client in clients
            ]

            await asyncio.sleep(run_duration)
            run_flag[0] = False
            sent_counts = await asyncio.gather(*tasks)

            # Workers finish their in-flight batch after the flag flips, and those
            # events are counted. Dividing by the measured time rather than the
            # nominal run_duration keeps the rate from being overstated.
            elapsed = time.perf_counter() - start_time
            perf_records.append(sum(sent_counts) / elapsed)
        finally:
            # On any failure: stop the workers, let them drain, then release
            # every connection so nothing is left open.
            run_flag[0] = False
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)
            for client in clients:
                await client.close()

    avg_perf = round(sum(perf_records) / len(perf_records), 2)

    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Single message size: {} bytes, Parallel thread count: {} threads, Run duration: {} seconds.".format(
            description or "send_batch_message_in_parallel",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            single_event_size,
            parallel_coroutine_count,
            run_duration,
        )
    )
    # Returned for parity with send_batch_message; existing callers may ignore it.
    return avg_perf


if __name__ == "__main__":
    # Script entry point: for every configured connection string, run the
    # fixed-amount scenario and then the fixed-duration scenario.
    fixed_amount_banner = "-------------------  sending fixed amount of message  -------------------"
    fixed_period_banner = (
        "-------------------  multiple coroutines sending messages for a fixed period -------------------"
    )

    logger.info("------------------- START OF TEST -------------------")

    for conn_str in CONN_STRS:
        # Scenario 1: send a fixed number of events, once per event size.
        for single_event_size in SINGLE_EVENT_SIZE_LIST:
            print(fixed_amount_banner)
            fixed_amount_run = send_batch_message(
                conn_str=conn_str,
                eventhub_name=EVENTHUB_NAME,
                num_of_events=FIXED_AMOUNT_OF_EVENTS,
                single_event_size=single_event_size,
                description="sending fixed amount message",
            )
            asyncio.run(fixed_amount_run)

        # Scenario 2: send for a fixed period, for each coroutine count and event size.
        for parallel_coroutine_count in PARALLEL_COROUTINE_COUNT_LIST:
            for single_event_size in SINGLE_EVENT_SIZE_LIST:
                print(fixed_period_banner)
                fixed_period_run = send_batch_message_in_parallel(
                    conn_str=conn_str,
                    eventhub_name=EVENTHUB_NAME,
                    single_event_size=single_event_size,
                    parallel_coroutine_count=parallel_coroutine_count,
                    run_duration=RUN_DURATION,
                    description="multiple coroutine sending messages",
                )
                asyncio.run(fixed_period_run)

    logger.info("------------------- END OF TEST -------------------")