File: send_buffered_mode_async.py

#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show sending events in buffered mode to an Event Hub asynchronously.
"""

import time
import asyncio
import os

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_HOSTNAME"]
EVENTHUB_NAME = os.environ["EVENT_HUB_NAME"]


async def on_success(events, pid):
    # called when a batch of buffered events has been successfully sent
    # to the partition identified by pid
    print(events, pid)


async def on_error(events, pid, error):
    # called when sending a batch of buffered events to the partition
    # identified by pid fails with the given error
    print(events, pid, error)
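

# Hedged variant of the callbacks above, not part of the original sample: the same
# callback signatures can be used to keep simple per-partition counters instead of
# printing, which is often more useful when sending many events.
SENT_COUNTS = {}
FAILED_COUNTS = {}


async def on_success_counting(events, pid):
    # accumulate the number of confirmed events for each partition id
    SENT_COUNTS[pid] = SENT_COUNTS.get(pid, 0) + len(events)


async def on_error_counting(events, pid, error):
    # accumulate the number of failed events for each partition id
    FAILED_COUNTS[pid] = FAILED_COUNTS.get(pid, 0) + len(events)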


async def run():

    producer = EventHubProducerClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENTHUB_NAME,
        credential=DefaultAzureCredential(),
        buffered_mode=True,
        on_success=on_success,
        on_error=on_error,
    )
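
    # Buffered mode also exposes tuning parameters on the client, such as
    # max_wait_time (how long events may sit in the buffer before being sent) and
    # max_buffer_length (the per-partition buffer capacity). The keyword names are
    # assumed from the azure-eventhub buffered-mode API and are shown here only as
    # a commented-out sketch; verify them against the installed library version.
    #
    # producer = EventHubProducerClient(
    #     fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
    #     eventhub_name=EVENTHUB_NAME,
    #     credential=DefaultAzureCredential(),
    #     buffered_mode=True,
    #     on_success=on_success,
    #     on_error=on_error,
    #     max_wait_time=5,          # flush partially filled buffers after 5 seconds
    #     max_buffer_length=1500,   # maximum number of events buffered per partition
    # )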

    # exiting the context manager will automatically call flush
    async with producer:
        # single events will be batched automatically
        for i in range(10):
            # send_event returning means the event has been enqueued to the buffer,
            # not that it has been sent; sending happens in the background
            await producer.send_event(EventData("Single data {}".format(i)))

        batch = await producer.create_batch()
        for i in range(10):
            batch.add(EventData("Single data in batch {}".format(i)))
        # alternatively, you can enqueue an EventDataBatch object to the buffer
        await producer.send_batch(batch)
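
        # Hedged addition, not part of the original sample: create_batch also accepts
        # a partition_key, which routes every event carrying that key to the same
        # partition; the key value below is purely illustrative.
        keyed_batch = await producer.create_batch(partition_key="sample-key")
        keyed_batch.add(EventData("Keyed data"))
        await producer.send_batch(keyed_batch)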

        # calling flush sends out the events in the buffer immediately
        await producer.flush()


start_time = time.time()
asyncio.run(run())
print("Send messages in {} seconds.".format(time.time() - start_time))