1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: exception_policy_crud_ops_async.py
DESCRIPTION:
These samples demonstrate how to create, get, update, list and delete an Exception Policy used in ACS JobRouter.
You need a valid connection string to an Azure Communication Service to execute the sample.
USAGE:
python exception_policy_crud_ops_async.py
Set the environment variables with your own values before running the sample:
1) AZURE_COMMUNICATION_SERVICE_CONNECTION_STRING - Communication Service connection string
"""
import os
import asyncio
class ExceptionPolicySamplesAsync:
    """Async samples covering the life cycle of a JobRouter exception policy.

    Each method builds its own ``JobRouterAdministrationClient`` from
    ``connection_string`` and uses it inside an ``async with`` block. The
    regions between ``[START ...]`` and ``[END ...]`` comments are kept
    self-contained (they carry their own imports).
    """

    # Read when the class body executes; a missing environment variable raises KeyError at that point.
    connection_string = os.environ["AZURE_COMMUNICATION_SERVICE_CONNECTION_STRING"]

    # Id of the exception policy that the samples create, fetch, update and delete.
    _ep_policy_id = "sample_ep_policy"

    # Ids of the classification policies created by setup() and referenced by the reclassify actions below.
    _cp_policy_ids = [
        "escalation-on-q-over-flow",
        "escalation-on-wait-time-exceeded",
    ]

    async def setup(self):
        """Upsert the classification policies listed in ``_cp_policy_ids``."""
        connection_string = self.connection_string

        from azure.communication.jobrouter.aio import JobRouterAdministrationClient
        from azure.communication.jobrouter.models import (
            ClassificationPolicy,
            StaticRouterRule,
            StaticQueueSelectorAttachment,
            RouterQueueSelector,
            StaticWorkerSelectorAttachment,
            RouterWorkerSelector,
            LabelOperator,
        )

        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)

        async with router_admin_client:
            for _id in self._cp_policy_ids:
                # The returned policy is not needed here, only the side effect of creating it.
                await router_admin_client.upsert_classification_policy(
                    _id,
                    ClassificationPolicy(
                        prioritization_rule=StaticRouterRule(value=100),
                        queue_selector_attachments=[
                            StaticQueueSelectorAttachment(
                                queue_selector=RouterQueueSelector(
                                    key="Escalate", label_operator=LabelOperator.EQUAL, value=True
                                )
                            ),
                        ],
                        worker_selector_attachments=[
                            StaticWorkerSelectorAttachment(
                                worker_selector=RouterWorkerSelector(
                                    key="Escalate", label_operator=LabelOperator.EQUAL, value=True
                                )
                            ),
                        ],
                    ),
                )

    async def create_exception_policy(self):
        """Upsert an exception policy with a queue-length rule and a wait-time rule, then print its id."""
        connection_string = self.connection_string
        policy_id = self._ep_policy_id
        # [START create_exception_policy_async]
        from azure.communication.jobrouter.aio import JobRouterAdministrationClient
        from azure.communication.jobrouter.models import (
            WaitTimeExceptionTrigger,
            QueueLengthExceptionTrigger,
            ReclassifyExceptionAction,
            ExceptionRule,
            ExceptionPolicy,
        )

        # set `connection_string` to an existing ACS endpoint
        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)
        print("JobRouterAdministrationClient created successfully!")

        # we are going to create 2 rules:
        # 1. EscalateJobOnQueueOverFlowTrigger: triggers when queue has more than 10 jobs already en-queued,
        #    then reclassifies job adding additional labels on the job.
        # 2. EscalateJobOnWaitTimeExceededTrigger: triggers when job has waited in the queue for more than 10 minutes,
        #    then reclassifies job adding additional labels on the job

        # define exception trigger for queue over flow
        queue_length_exception_trigger: QueueLengthExceptionTrigger = QueueLengthExceptionTrigger(threshold=10)

        # define the exception action to execute when the trigger condition is satisfied
        escalate_job_on_queue_over_flow: ReclassifyExceptionAction = ReclassifyExceptionAction(
            classification_policy_id="escalation-on-q-over-flow",
            labels_to_upsert={"EscalateJob": True, "EscalationReasonCode": "QueueOverFlow"},
        )

        # define second exception trigger for wait time
        wait_time_exception_trigger: WaitTimeExceptionTrigger = WaitTimeExceptionTrigger(threshold_seconds=10 * 60)

        # define the exception action to execute when the trigger condition is satisfied
        escalate_job_on_wait_time_exceeded: ReclassifyExceptionAction = ReclassifyExceptionAction(
            classification_policy_id="escalation-on-wait-time-exceeded",
            labels_to_upsert={"EscalateJob": True, "EscalationReasonCode": "WaitTimeExceeded"},
        )

        # define exception rules
        exception_rule = [
            ExceptionRule(
                id="EscalateJobOnQueueOverFlowTrigger",
                trigger=queue_length_exception_trigger,
                actions=[escalate_job_on_queue_over_flow],
            ),
            ExceptionRule(
                id="EscalateJobOnWaitTimeExceededTrigger",
                trigger=wait_time_exception_trigger,
                actions=[escalate_job_on_wait_time_exceeded],
            ),
        ]

        # create the exception policy
        # set a unique value to `policy_id`
        async with router_admin_client:
            exception_policy = await router_admin_client.upsert_exception_policy(
                policy_id,
                ExceptionPolicy(name="TriggerJobCancellationWhenQueueLenIs10", exception_rules=exception_rule),
            )

            print(f"Exception policy has been successfully created with id: {exception_policy.id}")

        # [END create_exception_policy_async]

    async def update_exception_policy(self):
        """Upsert the sample exception policy with a new set of rules and print the result."""
        connection_string = self.connection_string
        policy_id = self._ep_policy_id
        # [START update_exception_policy_async]
        from azure.communication.jobrouter.aio import JobRouterAdministrationClient
        from azure.communication.jobrouter.models import (
            WaitTimeExceptionTrigger,
            ReclassifyExceptionAction,
            ExceptionPolicy,
            ExceptionRule,
            QueueLengthExceptionTrigger,
        )

        # set `connection_string` to an existing ACS endpoint
        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)
        print("JobRouterAdministrationClient created successfully!")

        # we are going to
        # 1. Add an exception rule: EscalateJobOnWaitTimeExceededTrigger2Min: triggers when job has waited in the
        #    queue for more than 2 minutes, then reclassifies job adding additional labels on the job
        # 2. Modify an existing rule: EscalateJobOnQueueOverFlowTrigger: change 'threshold' to 100
        # 3. Delete an exception rule: EscalateJobOnWaitTimeExceededTrigger to be deleted
        #    (it is simply left out of the `exception_rules` list sent below)

        # let's define the new rule to be added
        # define exception trigger
        escalate_job_on_wait_time_exceed2: WaitTimeExceptionTrigger = WaitTimeExceptionTrigger(threshold_seconds=2 * 60)

        # define exception action
        escalate_job_on_wait_time_exceeded2: ReclassifyExceptionAction = ReclassifyExceptionAction(
            classification_policy_id="escalation-on-wait-time-exceeded",
            labels_to_upsert={"EscalateJob": True, "EscalationReasonCode": "WaitTimeExceeded2Min"},
        )

        async with router_admin_client:
            updated_exception_policy: ExceptionPolicy = await router_admin_client.upsert_exception_policy(
                policy_id,
                exception_rules=[
                    # adding new rule
                    ExceptionRule(
                        id="EscalateJobOnWaitTimeExceededTrigger2Min",
                        trigger=escalate_job_on_wait_time_exceed2,
                        actions=[escalate_job_on_wait_time_exceeded2],
                    ),
                    # modifying existing rule
                    ExceptionRule(
                        id="EscalateJobOnQueueOverFlowTrigger",
                        trigger=QueueLengthExceptionTrigger(threshold=100),
                        actions=[
                            ReclassifyExceptionAction(
                                classification_policy_id="escalation-on-q-over-flow",
                                labels_to_upsert={"EscalateJob": True, "EscalationReasonCode": "QueueOverFlow"},
                            )
                        ],
                    ),
                ],
            )

            print(f"Exception policy updated with rules: {updated_exception_policy.exception_rules}")
            print("Exception policy has been successfully updated")

        # [END update_exception_policy_async]

    async def get_exception_policy(self):
        """Fetch the sample exception policy by id and print its id."""
        connection_string = self.connection_string
        policy_id = self._ep_policy_id
        # [START get_exception_policy_async]
        from azure.communication.jobrouter.aio import JobRouterAdministrationClient

        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)

        async with router_admin_client:
            exception_policy = await router_admin_client.get_exception_policy(policy_id)

            print(f"Successfully fetched exception policy with id: {exception_policy.id}")

        # [END get_exception_policy_async]

    async def list_exception_policies(self):
        """Iterate over all exception policies and print the id of each one."""
        connection_string = self.connection_string
        # [START list_exception_policies_async]
        from azure.communication.jobrouter.aio import JobRouterAdministrationClient

        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)

        async with router_admin_client:
            exception_policy_iterator = router_admin_client.list_exception_policies()

            async for ep in exception_policy_iterator:
                print(f"Retrieved exception policy with id: {ep.id}")

            print("Successfully completed fetching exception policies")

        # [END list_exception_policies_async]

    async def list_exception_policies_batched(self):
        """Iterate over exception policies page by page (10 requested per page) and print each id."""
        connection_string = self.connection_string
        # [START list_exception_policies_batched_async]
        from azure.communication.jobrouter.aio import JobRouterAdministrationClient

        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)

        async with router_admin_client:
            exception_policy_iterator = router_admin_client.list_exception_policies(results_per_page=10)

            async for policy_page in exception_policy_iterator.by_page():
                # Materialise the page so its size can be reported before the items are printed.
                policies_in_page = [i async for i in policy_page]
                print(f"Retrieved {len(policies_in_page)} policies in current page")

                for ep in policies_in_page:
                    print(f"Retrieved exception policy with id: {ep.id}")

            print("Successfully completed fetching exception policies")

        # [END list_exception_policies_batched_async]

    async def clean_up(self):
        """Delete the sample exception policy and the classification policies created by ``setup``."""
        connection_string = self.connection_string
        policy_id = self._ep_policy_id

        # [START delete_exception_policy_async]
        from azure.communication.jobrouter.aio import JobRouterAdministrationClient

        router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str=connection_string)

        async with router_admin_client:
            await router_admin_client.delete_exception_policy(policy_id)

            # [END delete_exception_policy_async]

            # Delete the classification policies while the client is still open,
            # rather than issuing requests on a client that `async with` has closed.
            for _id in self._cp_policy_ids:
                await router_admin_client.delete_classification_policy(_id)
async def main():
    """Run every exception-policy sample in order, always cleaning up afterwards.

    ``setup()`` runs first. Once it has succeeded, the remaining steps run
    inside ``try``/``finally`` so that ``clean_up()`` is still awaited when
    one of them raises; the original exception then propagates as usual.
    """
    sample = ExceptionPolicySamplesAsync()
    await sample.setup()
    try:
        await sample.create_exception_policy()
        await sample.get_exception_policy()
        await sample.update_exception_policy()
        await sample.list_exception_policies()
        await sample.list_exception_policies_batched()
    finally:
        # Without this, a failure in any step above would leave the policies
        # created by this run behind on the service.
        await sample.clean_up()
if __name__ == "__main__":
    # asyncio.run() creates a new event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated, and it left the loop unclosed.
    asyncio.run(main())
|