1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: sample_consume_process_events.py
DESCRIPTION:
These samples demonstrate sending, receiving, releasing, and acknowledging CloudEvents.
USAGE:
python sample_consume_process_events.py
Set the environment variables with your own values before running the sample:
1) EVENTGRID_KEY - The access key of your eventgrid account.
2) EVENTGRID_ENDPOINT - The namespace endpoint. Typically it exists in the format
"https://<YOUR-NAMESPACE-NAME>.<REGION-NAME>.eventgrid.azure.net".
3) EVENTGRID_TOPIC_NAME - The namespace topic name.
4) EVENTGRID_EVENT_SUBSCRIPTION_NAME - The event subscription name.
"""
import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.models import *
from azure.core.messaging import CloudEvent
from azure.core.exceptions import HttpResponseError
from azure.eventgrid import EventGridConsumerClient, EventGridPublisherClient
EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]
# Create a client
publisher = EventGridPublisherClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY), namespace_topic=TOPIC_NAME)
client = EventGridConsumerClient(
EVENTGRID_ENDPOINT,
AzureKeyCredential(EVENTGRID_KEY),
namespace_topic=TOPIC_NAME,
subscription=EVENT_SUBSCRIPTION_NAME,
)
cloud_event_reject = CloudEvent(data="reject", source="https://example.com", type="example")
cloud_event_release = CloudEvent(data="release", source="https://example.com", type="example")
cloud_event_ack = CloudEvent(data="acknowledge", source="https://example.com", type="example")
cloud_event_renew = CloudEvent(data="renew", source="https://example.com", type="example")
# Send Cloud Events
publisher.send(
[
cloud_event_reject,
cloud_event_release,
cloud_event_ack,
cloud_event_renew,
]
)
# Receive Published Cloud Events
try:
receive_results = client.receive(
max_events=10,
max_wait_time=10,
)
except HttpResponseError:
raise
# Iterate through the results and collect the lock tokens for events we want to release/acknowledge/reject/renew:
release_events = []
acknowledge_events = []
reject_events = []
renew_events = []
for detail in receive_results:
data = detail.event.data
broker_properties = detail.broker_properties
if data == "release":
release_events.append(broker_properties.lock_token)
elif data == "acknowledge":
acknowledge_events.append(broker_properties.lock_token)
elif data == "renew":
renew_events.append(broker_properties.lock_token)
else:
reject_events.append(broker_properties.lock_token)
# Release/Acknowledge/Reject/Renew events
if len(release_events) > 0:
try:
release_result = client.release(
lock_tokens=release_events,
)
except HttpResponseError:
raise
for succeeded_lock_token in release_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")
if len(acknowledge_events) > 0:
try:
ack_result = client.acknowledge(
lock_tokens=acknowledge_events,
)
except HttpResponseError:
raise
for succeeded_lock_token in ack_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")
if len(reject_events) > 0:
try:
reject_result = client.reject(
lock_tokens=reject_events,
)
except HttpResponseError:
raise
for succeeded_lock_token in reject_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")
if len(renew_events) > 0:
try:
renew_result = client.renew_locks(
lock_tokens=renew_events,
)
except HttpResponseError:
raise
for succeeded_lock_token in renew_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")
|