File: sample_consume_process_events.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (129 lines) | stat: -rw-r--r-- 4,383 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: sample_consume_process_events.py
DESCRIPTION:
    This sample demonstrates sending and receiving CloudEvents, then releasing,
    acknowledging, rejecting, and renewing the locks on the received events.
USAGE:
    python sample_consume_process_events.py
    Set the environment variables with your own values before running the sample:
    1) EVENTGRID_KEY - The access key of your eventgrid account.
    2) EVENTGRID_ENDPOINT - The namespace endpoint. Typically it exists in the format
    "https://<YOUR-NAMESPACE-NAME>.<REGION-NAME>.eventgrid.azure.net".
    3) EVENTGRID_TOPIC_NAME - The namespace topic name.
    4) EVENTGRID_EVENT_SUBSCRIPTION_NAME - The event subscription name.
"""
import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.models import *
from azure.core.messaging import CloudEvent
from azure.core.exceptions import HttpResponseError
from azure.eventgrid import EventGridConsumerClient, EventGridPublisherClient

# Connection settings come from the environment. Subscripting os.environ
# raises KeyError for a missing variable, before any client is built.
EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]


# Create the publisher client for the namespace topic. It gets its own
# key credential built from the shared access key.
publisher_credential = AzureKeyCredential(EVENTGRID_KEY)
publisher = EventGridPublisherClient(
    EVENTGRID_ENDPOINT,
    publisher_credential,
    namespace_topic=TOPIC_NAME,
)

# Create the consumer client for the same topic, bound to one event
# subscription. It also gets its own credential object.
consumer_credential = AzureKeyCredential(EVENTGRID_KEY)
client = EventGridConsumerClient(
    EVENTGRID_ENDPOINT,
    consumer_credential,
    namespace_topic=TOPIC_NAME,
    subscription=EVENT_SUBSCRIPTION_NAME,
)


# Build one CloudEvent per settlement action. Every event shares the same
# source and type; only the ``data`` payload differs, and the consumer side
# of this sample routes each received event by that payload.
_shared_event_fields = {"source": "https://example.com", "type": "example"}

cloud_event_reject = CloudEvent(data="reject", **_shared_event_fields)
cloud_event_release = CloudEvent(data="release", **_shared_event_fields)
cloud_event_ack = CloudEvent(data="acknowledge", **_shared_event_fields)
cloud_event_renew = CloudEvent(data="renew", **_shared_event_fields)

# Send Cloud Events: all four go out in a single call to the publisher.
outgoing_events = [
    cloud_event_reject,
    cloud_event_release,
    cloud_event_ack,
    cloud_event_renew,
]
publisher.send(outgoing_events)

# Receive published Cloud Events, requesting max_events=10 with
# max_wait_time=10. An HttpResponseError raised by the call is not handled
# here; it propagates to the caller unchanged.
receive_results = client.receive(max_events=10, max_wait_time=10)

# Walk the received events and file each lock token under the action named by
# the event payload: release, acknowledge, renew, or reject.

release_events = []
acknowledge_events = []
reject_events = []
renew_events = []

for received in receive_results:
    payload = received.event.data
    lock_token = received.broker_properties.lock_token

    # Choose the destination list first, then append in one place.
    if payload == "release":
        destination = release_events
    elif payload == "acknowledge":
        destination = acknowledge_events
    elif payload == "renew":
        destination = renew_events
    else:
        # Any payload not matched above (including "reject") is rejected.
        destination = reject_events

    destination.append(lock_token)

# Release/Acknowledge/Reject/Renew events

# Each entry pairs a batch of lock tokens with the consumer-client operation
# applied to it. Batches are processed in the order listed.
settlement_plan = (
    (release_events, client.release),
    (acknowledge_events, client.acknowledge),
    (reject_events, client.reject),
    (renew_events, client.renew_locks),
)

for lock_tokens, settle in settlement_plan:
    # Nothing collected for this action: skip the service call entirely.
    if not lock_tokens:
        continue

    # An HttpResponseError raised by the operation is not handled here and
    # propagates to the caller unchanged.
    outcome = settle(lock_tokens=lock_tokens)

    for succeeded_lock_token in outcome.succeeded_lock_tokens:
        print(f"Succeeded Lock Token:{succeeded_lock_token}")