File: sync_receive.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (196 lines) | stat: -rw-r--r-- 6,584 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import dotenv
import logging
import threading
from logging.handlers import RotatingFileHandler
import time

from azure.eventhub import EventHubConsumerClient
from azure.eventhub import parse_connection_string

# Benchmark results are written through a dedicated logger into receive_perf_test.log.
# NOTE: RotatingFileHandler is created without maxBytes, so the file is never rolled over.
_result_handler = RotatingFileHandler("receive_perf_test.log")
logger = logging.getLogger("RECEIVE_PERF_TEST")
logger.setLevel(logging.INFO)
logger.addHandler(_result_handler)

# Pull variables from a local .env file (if any) into the environment before they are read.
dotenv.load_dotenv()

# Environment variables holding one connection string per namespace under test.
# A missing variable raises KeyError while the module is being loaded.
_CONN_STR_ENV_VARS = (
    "EVENT_HUB_CONN_STR_BASIC_NORTHEU",
    "EVENT_HUB_CONN_STR_STANDARD_NORTHEU",
    "EVENT_HUB_CONN_STR_BASIC_WESTUS2",
    "EVENT_HUB_CONN_STR_STANDARD_WESTUS2",
)
CONN_STRS = [os.environ[env_var] for env_var in _CONN_STR_ENV_VARS]

# (event hub name, size in bytes of a single event stored in that hub)
EH_NAME_EVENT_SIZE_PAIR = [("pyamqp_512", 512)]

PREFETCH_LIST = [300, 3000]  # prefetch values to benchmark
PARTITION_ID = "0"  # partition to receive from
RUN_DURATION = 30  # seconds, for the fixed-time benchmark
FIXED_AMOUNT = 100_000  # events, for the fixed-amount benchmark


def receive_fixed_time_interval(
    conn_str,
    eventhub_name,
    single_event_size,
    prefetch=300,
    batch_receiving=False,
    description=None,
    run_duration=30,
    partition_id="0",
):
    """Receive from one partition for ``run_duration`` seconds and log the average rate.

    One daemon thread runs ``receive`` (or ``receive_batch``) starting from the beginning
    of the partition, while a second daemon thread samples the running event counter once
    per second. After ``run_duration`` seconds the client is closed, the first samples are
    discarded as warm-up, and the mean of the remaining samples is written to the logger.

    :param str conn_str: Connection string used to build the consumer client; its fully
     qualified namespace is included in the log message.
    :param str eventhub_name: Name of the event hub to receive from.
    :param int single_event_size: Size of one event in bytes; only used to derive the
     bytes/s figure in the log message.
    :param int prefetch: Prefetch value passed to the receive call. In batch mode it is
     also used as ``max_batch_size``.
    :param bool batch_receiving: Use ``receive_batch`` instead of ``receive``.
    :param description: Label for the log message; defaults to the function name.
    :param run_duration: How long to receive, in seconds.
    :param str partition_id: Partition to receive from.
    :raises ValueError: If ``run_duration`` is negative.
    """
    # Reject a bad duration up front; otherwise time.sleep() would raise only after the
    # client and both threads had already been started.
    if run_duration < 0:
        raise ValueError("run_duration must be non-negative, got {!r}".format(run_duration))

    check_interval = 1  # seconds between two throughput samples
    warmup_samples = 10  # leading samples dropped to let the receiving program become stable

    consumer_client = EventHubConsumerClient(conn_str, consumer_group="$Default", eventhub_name=eventhub_name)

    received_count = [0]  # one-element list so the callbacks can update it in place
    all_perf_records = []
    stop_event = threading.Event()

    def on_event(partition_context, event):
        received_count[0] += 1

    def on_event_batch(partition_context, events):
        received_count[0] += len(events)

    def monitor():
        # Record the number of events received during each interval (events/s).
        previous = 0
        while not stop_event.is_set():
            snap = received_count[0]
            all_perf_records.append((snap - previous) / check_interval)
            previous = snap
            # Behaves like sleep(check_interval) but wakes up at once when stopped.
            stop_event.wait(check_interval)

    target = consumer_client.receive_batch if batch_receiving else consumer_client.receive
    kwargs = {
        "partition_id": partition_id,
        "starting_position": "-1",  # "-1" is from the beginning of the partition.
        "prefetch": prefetch,
    }
    if batch_receiving:
        kwargs["max_batch_size"] = prefetch
        kwargs["on_event_batch"] = on_event_batch
    else:
        kwargs["on_event"] = on_event

    thread = threading.Thread(target=target, kwargs=kwargs)
    monitor_thread = threading.Thread(target=monitor)
    thread.daemon = True
    monitor_thread.daemon = True

    thread.start()
    monitor_thread.start()
    try:
        time.sleep(run_duration)
    finally:
        # Stop sampling before closing so that no zero-throughput sample taken during
        # shutdown ends up in the average, and always release the client.
        stop_event.set()
        consumer_client.close()
    # Make sure the monitor is no longer appending before the records are read.
    monitor_thread.join()

    valid_perf_records = all_perf_records[warmup_samples:]
    if not valid_perf_records:
        # The run was too short to outlast the warm-up window; use every sample
        # instead of dividing by zero.
        logger.warning(
            "Only %d samples were collected (warm-up is %d); averaging over all of them.",
            len(all_perf_records),
            warmup_samples,
        )
        valid_perf_records = all_perf_records
    avg_perf = sum(valid_perf_records) / len(valid_perf_records) if valid_perf_records else 0.0

    logger.info(
        "EH Namespace: {}.\nMethod: {}, The average performance is {} events/s, throughput: {} bytes/s.\n"
        "Configs are: Single message size: {} bytes, Run duration: {} seconds.\n"
        "Prefetch: {}.".format(
            parse_connection_string(conn_str).fully_qualified_namespace,
            description or "receive_fixed_time_interval",
            avg_perf,
            avg_perf * single_event_size,
            single_event_size,
            run_duration,
            prefetch,
        )
    )


def receive_fixed_amount(
    conn_str,
    eventhub_name,
    single_event_size,
    prefetch=300,
    batch_receiving=False,
    description=None,
    partition_id="0",
    run_times=1,
    fixed_amount=100_000,
):
    """Time how long it takes to receive ``fixed_amount`` events and log the average rate.

    A single consumer client is created and used for ``run_times`` runs. Each run receives
    from the beginning of the partition until the callback has counted at least
    ``fixed_amount`` events, at which point the callback closes the client and the blocking
    receive call returns. The per-run rates are averaged and written to the logger.

    :param str conn_str: Connection string used to build the consumer client; its fully
     qualified namespace is included in the log message.
    :param str eventhub_name: Name of the event hub to receive from.
    :param int single_event_size: Size of one event in bytes; only used to derive the
     bytes/s figure in the log message.
    :param int prefetch: Prefetch value passed to the client and the receive call. In
     batch mode it is also used as ``max_batch_size``.
    :param bool batch_receiving: Use ``receive_batch`` instead of ``receive``.
    :param description: Label for the log message; defaults to the function name.
    :param str partition_id: Partition to receive from.
    :param int run_times: Number of timed runs to average over; must be at least 1.
    :param int fixed_amount: Number of events to receive per run; must be at least 1.
    :raises ValueError: If ``run_times`` or ``fixed_amount`` is smaller than 1.
    """
    # Validate before any connection is set up: run_times < 1 would end in a division by
    # zero, and fixed_amount < 1 could never be matched by the single-event callback.
    if run_times < 1:
        raise ValueError("run_times must be at least 1, got {!r}".format(run_times))
    if fixed_amount < 1:
        raise ValueError("fixed_amount must be at least 1, got {!r}".format(fixed_amount))

    consumer_client = EventHubConsumerClient(
        conn_str, consumer_group="$Default", eventhub_name=eventhub_name, prefetch=prefetch
    )
    perf_records = []
    received_count = [0]  # one-element list so the callbacks can update it in place

    def on_event(partition_context, event):
        received_count[0] += 1
        if received_count[0] == fixed_amount:
            consumer_client.close()

    def on_event_batch(partition_context, events):
        received_count[0] += len(events)
        if received_count[0] >= fixed_amount:
            consumer_client.close()

    for _ in range(run_times):
        received_count[0] = 0
        # perf_counter is monotonic, so the interval is immune to wall-clock adjustments.
        start_time = time.perf_counter()
        with consumer_client:
            if batch_receiving:
                consumer_client.receive_batch(
                    on_event_batch=on_event_batch,
                    partition_id=partition_id,
                    starting_position="-1",  # "-1" is from the beginning of the partition.
                    max_batch_size=prefetch,
                    prefetch=prefetch,
                )
            else:
                consumer_client.receive(
                    on_event=on_event, partition_id=partition_id, starting_position="-1", prefetch=prefetch
                )
        total_time = time.perf_counter() - start_time
        # Divide the number of events actually delivered, not the target: a batch can
        # push the counter past fixed_amount, which would otherwise under-report the rate.
        perf_records.append(received_count[0] / total_time)
    avg_perf = sum(perf_records) / len(perf_records)

    logger.info(
        "EH Namespace: {}.\nMethod: {}, The average performance is {} events/s, throughput: {} bytes/s.\n"
        "Configs are: Single message size: {} bytes, Total events to receive: {}.\n"
        "Prefetch:{}.".format(
            parse_connection_string(conn_str).fully_qualified_namespace,
            description or "receive_fixed_amount",
            avg_perf,
            avg_perf * single_event_size,
            single_event_size,
            fixed_amount,
            prefetch,
        )
    )


if __name__ == "__main__":
    # Run both benchmarks for every combination of namespace, event hub, prefetch value
    # and receive mode. The module-level PARTITION_ID, RUN_DURATION and FIXED_AMOUNT are
    # passed explicitly so that editing those constants actually changes the run.
    for conn_str in CONN_STRS:
        for eh_name, single_event_size in EH_NAME_EVENT_SIZE_PAIR:
            for prefetch in PREFETCH_LIST:
                for batch_receiving in [True, False]:
                    print("------------------- receiving fixed amount -------------------")
                    receive_fixed_amount(
                        conn_str=conn_str,
                        eventhub_name=eh_name,
                        single_event_size=single_event_size,
                        prefetch=prefetch,
                        batch_receiving=batch_receiving,
                        partition_id=PARTITION_ID,
                        fixed_amount=FIXED_AMOUNT,
                    )
                    print("------------------- receiving fixed interval -------------------")
                    receive_fixed_time_interval(
                        conn_str=conn_str,
                        eventhub_name=eh_name,
                        single_event_size=single_event_size,
                        prefetch=prefetch,
                        batch_receiving=batch_receiving,
                        run_duration=RUN_DURATION,
                        partition_id=PARTITION_ID,
                    )