File: event_hubs_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (96 lines) | stat: -rw-r--r-- 3,417 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os
from datetime import datetime
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData

# Consumer group the receiving client is created with.
CONSUMER_GROUP = "$Default"
# Handed to the consumer's receive() call as starting_position; per the
# azure-eventhub documentation "-1" denotes the beginning of the partition.
STARTING_POSITION = "-1"
# Handed to the consumer client as its idle_timeout setting.
RECEIVE_TIMEOUT = 30
# Events published by the test; the number of entries is also the number of
# events the consumer has to see before the test is considered successful.
TEST_EVENTS = [
    EventData(b"Test Event %d in Python" % number) for number in (1, 2, 3)
]

class EventHubAsync:
    """Async smoke test: publish a batch of events and read them back.

    The test lists the partitions of an Event Hub, sends ``TEST_EVENTS`` to
    the first partition and then consumes from that same partition until at
    least ``len(TEST_EVENTS)`` events have been received.
    """

    def __init__(self):
        """Create the consumer and producer clients.

        Raises:
            KeyError: if ``EVENT_HUBS_CONNECTION_STRING`` is not set.
        """
        # This test requires a previously created Event Hub. No hub name is
        # passed to the clients, so the connection string itself has to
        # identify the hub (i.e. it must carry an ``EntityPath``).
        connection_string = os.environ["EVENT_HUBS_CONNECTION_STRING"]
        self.consumer_client = EventHubConsumerClient.from_connection_string(
            connection_string, CONSUMER_GROUP, idle_timeout=RECEIVE_TIMEOUT
        )
        self.producer_client = EventHubProducerClient.from_connection_string(
            connection_string
        )

        # Number of events seen by on_event so far.
        self.received_event_count = 0

    async def get_partition_ids(self):
        """Return the partition ids reported by the consumer client."""
        print("Getting partitions id...")
        partition_ids = await self.consumer_client.get_partition_ids()
        print("\tdone")
        return partition_ids

    async def send_and_receive_events(self, partition_id):
        """Send ``TEST_EVENTS`` to ``partition_id`` and receive them back.

        Args:
            partition_id: id of the partition used for both sending and
                receiving.

        Raises:
            Exception: if fewer than ``len(TEST_EVENTS)`` events were
                received once the consumer stopped.
        """
        print("Sending events...")

        try:
            batch = await self.producer_client.create_batch(
                partition_id=partition_id
            )
            for event in TEST_EVENTS:
                batch.add(event)
            await self.producer_client.send_batch(batch)
        finally:
            # Always release the producer, even when building or sending the
            # batch fails, so the connection is not leaked.
            await self.producer_client.close()
        print("\tdone")

        print("Receiving events...")
        await self.consumer_client.receive(
            # on_event will close the consumer_client which resumes execution
            on_event=self.on_event,
            on_error=self.on_error,
            # Read from the partition the events were just sent to.
            partition_id=partition_id,
            starting_position=STARTING_POSITION,
        )

        print("\tdone")

        if self.received_event_count < len(TEST_EVENTS):
            raise Exception(
                "Error, expecting {0} events, but {1} were received.".format(
                    len(TEST_EVENTS), self.received_event_count
                )
            )

    async def on_event(self, context, event):
        """Count and print a received event; stop once enough have arrived.

        Args:
            context: partition context supplied by the consumer (unused).
            event: the received event; its body is printed as text.
        """
        self.received_event_count += 1
        print(event.body_as_str())
        if self.received_event_count >= len(TEST_EVENTS):
            # Close the client which allows execution to continue
            await self.close_client()

    async def on_error(self, context, error):
        """Close the consumer and raise an exception describing ``error``.

        Args:
            context: partition context supplied by the consumer (unused).
            error: the error reported by the consumer.

        Raises:
            Exception: always, carrying the reported error in its message.
        """
        await self.close_client()
        raise Exception("Received Error: {0}".format(error))

    async def close_client(self):
        """Close the consumer client."""
        await self.consumer_client.close()

    async def run(self):
        """Print the test banner and execute the send/receive round trip."""
        print("")
        print("------------------------")
        print("Event Hubs")
        print("------------------------")
        print("1) Get partition ID")
        print("2) Send Events")
        print("3) Consume Events")
        print("")

        partition_ids = await self.get_partition_ids()
        # In this sample the same partition id is used for the producer and
        # the consumer. It is the first one, but it could be any (it is not
        # relevant as long as it is the same in both producer and consumer).
        await self.send_and_receive_events(partition_ids[0])