1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
# coding: utf-8
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: sample_hooks_async.py
DESCRIPTION:
This sample demonstrates how to create, get, list, update, and delete hooks
under your Metrics Advisor account. EmailNotificationHook is used as an example in this sample.
USAGE:
python sample_hooks_async.py
Set the environment variables with your own values before running the sample:
1) METRICS_ADVISOR_ENDPOINT - the endpoint of your Azure Metrics Advisor service
2) METRICS_ADVISOR_SUBSCRIPTION_KEY - Metrics Advisor service subscription key
3) METRICS_ADVISOR_API_KEY - Metrics Advisor service API key
"""
import os
import asyncio
async def sample_create_hook_async():
# [START create_hook_async]
from azure.ai.metricsadvisor import MetricsAdvisorKeyCredential
from azure.ai.metricsadvisor.aio import MetricsAdvisorAdministrationClient
from azure.ai.metricsadvisor.models import EmailNotificationHook
service_endpoint = os.getenv("METRICS_ADVISOR_ENDPOINT")
subscription_key = os.getenv("METRICS_ADVISOR_SUBSCRIPTION_KEY")
api_key = os.getenv("METRICS_ADVISOR_API_KEY")
client = MetricsAdvisorAdministrationClient(service_endpoint,
MetricsAdvisorKeyCredential(subscription_key, api_key))
async with client:
hook = await client.create_hook(
hook=EmailNotificationHook(
name="email hook",
description="my email hook",
emails_to_alert=["alertme@alertme.com"],
external_link="https://docs.microsoft.com/en-us/azure/cognitive-services/metrics-advisor/how-tos/alerts"
)
)
return hook
# [END create_hook_async]
async def sample_get_hook_async(hook_id):
# [START get_hook_async]
from azure.ai.metricsadvisor import MetricsAdvisorKeyCredential
from azure.ai.metricsadvisor.aio import MetricsAdvisorAdministrationClient
service_endpoint = os.getenv("METRICS_ADVISOR_ENDPOINT")
subscription_key = os.getenv("METRICS_ADVISOR_SUBSCRIPTION_KEY")
api_key = os.getenv("METRICS_ADVISOR_API_KEY")
client = MetricsAdvisorAdministrationClient(service_endpoint,
MetricsAdvisorKeyCredential(subscription_key, api_key))
async with client:
hook = await client.get_hook(hook_id)
print("Hook name: {}".format(hook.name))
print("Description: {}".format(hook.description))
print("Emails to alert: {}".format(hook.emails_to_alert))
print("External link: {}".format(hook.external_link))
print("Admins: {}".format(hook.admins))
# [END get_hook_async]
async def sample_list_hooks_async():
# [START list_hooks_async]
from azure.ai.metricsadvisor import MetricsAdvisorKeyCredential
from azure.ai.metricsadvisor.aio import MetricsAdvisorAdministrationClient
service_endpoint = os.getenv("METRICS_ADVISOR_ENDPOINT")
subscription_key = os.getenv("METRICS_ADVISOR_SUBSCRIPTION_KEY")
api_key = os.getenv("METRICS_ADVISOR_API_KEY")
client = MetricsAdvisorAdministrationClient(service_endpoint,
MetricsAdvisorKeyCredential(subscription_key, api_key))
async with client:
hooks = client.list_hooks()
async for hook in hooks:
print("Hook type: {}".format(hook.hook_type))
print("Hook name: {}".format(hook.name))
print("Description: {}\n".format(hook.description))
# [END list_hooks_async]
async def sample_update_hook_async(hook):
# [START update_hook_async]
from azure.ai.metricsadvisor import MetricsAdvisorKeyCredential
from azure.ai.metricsadvisor.aio import MetricsAdvisorAdministrationClient
service_endpoint = os.getenv("METRICS_ADVISOR_ENDPOINT")
subscription_key = os.getenv("METRICS_ADVISOR_SUBSCRIPTION_KEY")
api_key = os.getenv("METRICS_ADVISOR_API_KEY")
client = MetricsAdvisorAdministrationClient(service_endpoint,
MetricsAdvisorKeyCredential(subscription_key, api_key))
hook.name = "updated hook name"
hook.description = "updated hook description"
async with client:
updated = await client.update_hook(
hook,
emails_to_alert=["newemail@alertme.com"]
)
print("Updated name: {}".format(updated.name))
print("Updated description: {}".format(updated.description))
print("Updated emails: {}".format(updated.emails_to_alert))
# [END update_hook_async]
async def sample_delete_hook_async(hook_id):
# [START delete_hook_async]
from azure.core.exceptions import ResourceNotFoundError
from azure.ai.metricsadvisor import MetricsAdvisorKeyCredential
from azure.ai.metricsadvisor.aio import MetricsAdvisorAdministrationClient
service_endpoint = os.getenv("METRICS_ADVISOR_ENDPOINT")
subscription_key = os.getenv("METRICS_ADVISOR_SUBSCRIPTION_KEY")
api_key = os.getenv("METRICS_ADVISOR_API_KEY")
client = MetricsAdvisorAdministrationClient(service_endpoint,
MetricsAdvisorKeyCredential(subscription_key, api_key))
async with client:
await client.delete_hook(hook_id)
try:
await client.get_hook(hook_id)
except ResourceNotFoundError:
print("Hook successfully deleted.")
# [END delete_hook_async]
async def main():
print("---Creating hook...")
hook = await sample_create_hook_async()
print("Hook successfully created...")
print("\n---Get a hook...")
await sample_get_hook_async(hook.id)
print("\n---List hooks...")
await sample_list_hooks_async()
print("\n---Update a hook...")
await sample_update_hook_async(hook)
print("\n---Delete a hook...")
await sample_delete_hook_async(hook.id)
if __name__ == '__main__':
asyncio.run(main())
|