File: sync_send.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (168 lines) | stat: -rw-r--r-- 5,975 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import dotenv
import time
import logging
from logging.handlers import RotatingFileHandler
from concurrent.futures import ThreadPoolExecutor

from azure.eventhub import EventHubProducerClient, EventData

# Dedicated logger for this script; INFO-level records go to send_perf_test.log.
logger = logging.getLogger("SEND_PERF_TEST")
logger.setLevel(logging.INFO)
logger.addHandler(RotatingFileHandler("send_perf_test.log"))

# load_dotenv() runs before the os.environ lookups below, so the lookups see
# whatever it adds to the environment.
dotenv.load_dotenv()
# Environment variables holding the connection strings to test against, in run order.
_CONN_STR_ENV_NAMES = (
    "EVENT_HUB_CONN_STR_BASIC_NORTHEU",
    "EVENT_HUB_CONN_STR_STANDARD_NORTHEU",
    "EVENT_HUB_CONN_STR_BASIC_WESTUS2",
    "EVENT_HUB_CONN_STR_STANDARD_WESTUS2",
)
# Indexing os.environ directly means a missing variable raises KeyError at import time.
CONN_STRS = [os.environ[env_name] for env_name in _CONN_STR_ENV_NAMES]
EVENTHUB_NAME = "pyamqp"

# Event body sizes (bytes) to benchmark.
SINGLE_EVENT_SIZE_LIST = [512]
# Worker-thread counts to benchmark in the parallel test.
PARALLEL_THREAD_COUNT_LIST = [1]
# Number of events sent per run in the fixed-amount test.
FIXED_AMOUNT_OF_EVENTS = 100_000
# Seconds each parallel run keeps sending.
RUN_DURATION = 30


def pre_prepare_client(client, data):
    """Warm up ``client`` by creating one batch and sending one event.

    The batch returned by ``create_batch()`` is discarded; a single event whose
    body is ``data`` is then sent on its own.

    :param client: Producer client exposing ``create_batch`` and ``send_batch``.
    :param data: Payload used as the body of the warm-up event.
    """
    # Pre-call to retrieve the sender link settings; the batch itself is not used.
    client.create_batch()
    # Pre-call to set up the sender link.
    warmup_event = EventData(data)
    client.send_batch([warmup_event])


def send_batch_message(conn_str, eventhub_name, num_of_events, single_event_size, run_times=1, description=None):
    """Send a fixed number of events in batches and log the average send rate.

    A single producer client is created and warmed up, then ``run_times`` timed
    runs are performed. Each run packs ``num_of_events`` events of
    ``single_event_size`` bytes into batches, sending a batch whenever adding
    an event raises ``ValueError`` (treated as "batch full").

    :param conn_str: Connection string passed to ``EventHubProducerClient``.
    :param eventhub_name: Event hub name passed to ``EventHubProducerClient``.
    :param num_of_events: Number of events sent in each timed run.
    :param single_event_size: Size in bytes of each event body.
    :param run_times: Number of timed runs to average over; must be >= 1.
    :param description: Label used in the log line; defaults to the function name.
    :returns: Average events per second across the runs, rounded to 2 decimals.
    :raises ValueError: If ``run_times`` is less than 1.
    """
    if run_times < 1:
        # Fail before connecting: with no runs there is nothing to average.
        raise ValueError("run_times must be at least 1, got {}".format(run_times))

    client = EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name)

    data = b"a" * single_event_size
    perf_records = []

    try:
        # Warm-up happens before any timing starts.
        pre_prepare_client(client, data)

        for _run in range(run_times):  # run run_times and calculate the avg performance
            # perf_counter is monotonic, so the interval is immune to wall-clock adjustments.
            start_time = time.perf_counter()
            batch = client.create_batch()
            for _ in range(num_of_events):
                try:
                    batch.add(EventData(data))
                except ValueError:
                    # Batch full: flush it and put the event into a fresh batch.
                    client.send_batch(batch)
                    batch = client.create_batch()
                    batch.add(EventData(data))
            # Flush whatever is left in the last, partially filled batch.
            client.send_batch(batch)

            total_time = time.perf_counter() - start_time
            perf_records.append(num_of_events / total_time)
    finally:
        # Release the connection even when warm-up or a send fails.
        client.close()

    avg_perf = round(sum(perf_records) / len(perf_records), 2)
    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Num of events: {} events, Single message size: {} bytes.".format(
            description or "send_batch_message",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            num_of_events,
            single_event_size,
        )
    )
    return avg_perf


def send_batch_message_worker_thread(client, data, run_flag):
    """Keep sending full batches through ``client`` until ``run_flag[0]`` is falsy.

    Each pass creates a batch and adds events with body ``data`` until a
    ``ValueError`` is raised, which is taken to mean the batch is full; the
    batch is then sent and its length added to the running total. The flag is
    only checked between batches.

    :param client: Producer client exposing ``create_batch`` and ``send_batch``.
    :param data: Payload used as the body of every event.
    :param run_flag: Indexable whose first element is re-read before each batch.
    :returns: Total number of events in the batches that were sent.
    """
    sent_events = 0
    while run_flag[0]:
        pending = client.create_batch()
        batch_full = False
        while not batch_full:
            try:
                pending.add(EventData(body=data))
            except ValueError:
                # No more room: send what was collected and count it.
                batch_full = True
                client.send_batch(pending)
                sent_events += len(pending)
    return sent_events


def send_batch_message_in_parallel(
    conn_str, eventhub_name, single_event_size, parallel_thread_count=4, run_times=1, run_duration=30, description=None
):
    """Send events from several threads for a fixed period and log the average rate.

    For each of ``run_times`` runs, one producer client per thread is created
    and warmed up, the workers send batches for about ``run_duration`` seconds,
    and the total number of events sent is divided by the measured elapsed time.
    Elapsed time is measured rather than assumed because workers finish their
    in-flight batch after the stop signal, so a run lasts longer than
    ``run_duration``.

    :param conn_str: Connection string passed to ``EventHubProducerClient``.
    :param eventhub_name: Event hub name passed to ``EventHubProducerClient``.
    :param single_event_size: Size in bytes of each event body.
    :param parallel_thread_count: Number of worker threads (and clients); must be >= 1.
    :param run_times: Number of runs to average over; must be >= 1.
    :param run_duration: Seconds to let the workers run before signalling stop.
    :param description: Label used in the log line; defaults to the function name.
    :returns: Average events per second across the runs, rounded to 2 decimals.
    :raises ValueError: If ``run_times`` or ``parallel_thread_count`` is less than 1.
    """
    if run_times < 1:
        # Fail before connecting: with no runs there is nothing to average.
        raise ValueError("run_times must be at least 1, got {}".format(run_times))
    if parallel_thread_count < 1:
        raise ValueError("parallel_thread_count must be at least 1, got {}".format(parallel_thread_count))

    data = b"a" * single_event_size
    perf_records = []

    for _ in range(run_times):
        clients = []
        # One-element list so the workers observe the update made by this thread.
        run_flag = [True]
        try:
            for _client_index in range(parallel_thread_count):
                clients.append(EventHubProducerClient(conn_str=conn_str, eventhub_name=eventhub_name))

            # Warm-up happens before any timing starts.
            for client in clients:
                pre_prepare_client(client, data)

            with ThreadPoolExecutor(max_workers=parallel_thread_count) as executor:
                try:
                    start_time = time.perf_counter()
                    futures = [
                        executor.submit(send_batch_message_worker_thread, client, data, run_flag)
                        for client in clients
                    ]
                    time.sleep(run_duration)
                finally:
                    # Always signal stop: leaving the flag set would make the executor
                    # shutdown wait forever on workers that never exit.
                    run_flag[0] = False
                total_events = sum(future.result() for future in futures)
                elapsed = time.perf_counter() - start_time

            perf_records.append(total_events / elapsed)
        finally:
            # Workers have finished (or never started) by now; release every connection.
            for client in clients:
                client.close()

    avg_perf = round(sum(perf_records) / len(perf_records), 2)

    logger.info(
        "Method: {}, The average performance is {} events/s, throughput: {} bytes/s, run times: {}.\n"
        "Configs are: Single message size: {} bytes, Parallel thread count: {} threads, Run duration: {} seconds.".format(
            description or "send_batch_message_in_parallel",
            avg_perf,
            avg_perf * single_event_size,
            run_times,
            single_event_size,
            parallel_thread_count,
            run_duration,
        )
    )
    # Returned for parity with send_batch_message; existing callers may ignore it.
    return avg_perf


def main():
    """Run every configured benchmark against each connection string in ``CONN_STRS``."""
    logger.info("------------------- START OF TEST -------------------")

    for conn_str in CONN_STRS:
        # Arguments shared by both benchmark kinds for this connection string.
        target = dict(conn_str=conn_str, eventhub_name=EVENTHUB_NAME)

        # Single client, fixed number of events, one run per event size.
        for event_size in SINGLE_EVENT_SIZE_LIST:
            print("-------------------  sending fixed amount of message  -------------------")
            send_batch_message(
                num_of_events=FIXED_AMOUNT_OF_EVENTS,
                single_event_size=event_size,
                description="sending fixed amount message",
                **target
            )

        # Several threads, fixed duration, one run per (thread count, event size) pair.
        for thread_count in PARALLEL_THREAD_COUNT_LIST:
            for event_size in SINGLE_EVENT_SIZE_LIST:
                print("-------------------  multiple threads sending messages for a fixed period -------------------")
                send_batch_message_in_parallel(
                    single_event_size=event_size,
                    parallel_thread_count=thread_count,
                    run_duration=RUN_DURATION,
                    description="multiple threads sending messages",
                    **target
                )

    logger.info("------------------- END OF TEST -------------------")


if __name__ == "__main__":
    main()