1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Example to show managing subscription entities under a ServiceBus Namespace, including
- Create a subscription
- Get subscription properties and runtime information
- Update a subscription
- Delete a subscription
- List subscriptions under the given ServiceBus Namespace
"""
import os
import uuid
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.identity import DefaultAzureCredential
FULLY_QUALIFIED_NAMESPACE = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
TOPIC_NAME = os.environ["SERVICEBUS_TOPIC_NAME"]
SUBSCRIPTION_NAME = "sb_mgmt_sub" + str(uuid.uuid4())
def create_subscription(servicebus_mgmt_client):
print("-- Create Subscription")
servicebus_mgmt_client.create_subscription(TOPIC_NAME, SUBSCRIPTION_NAME)
print("Subscription {} is created.".format(SUBSCRIPTION_NAME))
print("")
def delete_subscription(servicebus_mgmt_client):
print("-- Delete Subscription")
servicebus_mgmt_client.delete_subscription(TOPIC_NAME, SUBSCRIPTION_NAME)
print("Subscription {} is deleted.".format(SUBSCRIPTION_NAME))
print("")
def list_subscriptions(servicebus_mgmt_client):
print("-- List Subscriptions")
for subscription_properties in servicebus_mgmt_client.list_subscriptions(TOPIC_NAME):
print("Subscription Name:", subscription_properties.name)
print("")
def get_and_update_subscription(servicebus_mgmt_client):
print("-- Get and Update Subscription")
subscription_properties = servicebus_mgmt_client.get_subscription(TOPIC_NAME, SUBSCRIPTION_NAME)
print("Subscription Name:", subscription_properties.name)
print("Please refer to SubscriptionDescription for complete available settings.")
print("")
# update by updating the properties in the model
subscription_properties.max_delivery_count = 5
servicebus_mgmt_client.update_subscription(TOPIC_NAME, subscription_properties)
# update by passing keyword arguments
subscription_properties = servicebus_mgmt_client.get_subscription(TOPIC_NAME, SUBSCRIPTION_NAME)
servicebus_mgmt_client.update_subscription(TOPIC_NAME, subscription_properties, max_delivery_count=3)
def get_subscription_runtime_properties(servicebus_mgmt_client):
print("-- Get Subscription Runtime Properties")
get_subscription_runtime_properties = servicebus_mgmt_client.get_subscription_runtime_properties(
TOPIC_NAME, SUBSCRIPTION_NAME
)
print("Subscription Name:", get_subscription_runtime_properties.name)
print("Please refer to SubscriptionRuntimeProperties from complete available runtime properties.")
print("")
credential = DefaultAzureCredential()
with ServiceBusAdministrationClient(FULLY_QUALIFIED_NAMESPACE, credential) as servicebus_mgmt_client:
create_subscription(servicebus_mgmt_client)
list_subscriptions(servicebus_mgmt_client)
get_and_update_subscription(servicebus_mgmt_client)
get_subscription_runtime_properties(servicebus_mgmt_client)
delete_subscription(servicebus_mgmt_client)
|