File: _router_test_case_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (131 lines) | stat: -rw-r--r-- 5,672 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
from abc import abstractmethod
from retry import retry
import warnings
from _shared.utils import get_http_logging_policy
from azure.communication.jobrouter.aio import (
    JobRouterClient,
    JobRouterAdministrationClient,
)
from azure.communication.jobrouter.models import (
    RouterJobStatus,
    CancelJobOptions,
    CompleteJobOptions,
    CloseJobOptions,
)
from devtools_testutils import AzureRecordedTestCase


class AsyncRouterRecordedTestCase(AzureRecordedTestCase):
    """Shared base class for the async Job Router recorded tests.

    Offers client factories, a job clean-up routine that walks a job into a
    deletable state before deleting it, and two polling helpers for
    operations whose effect only becomes visible after some delay.

    NOTE(review): the methods below read ``self.connection_string`` and
    ``self.is_live``; neither is defined in this class, so they are presumably
    provided by the base class or by the concrete test class -- confirm.
    """

    @abstractmethod
    async def clean_up(self):
        """Release the resources created by a test; implemented by subclasses.

        NOTE(review): ``@abstractmethod`` is only enforced by Python when the
        class hierarchy uses ``abc.ABCMeta``; whether the base class does is
        not visible from this file.
        """

    def create_client(self) -> JobRouterClient:
        """Build an async ``JobRouterClient`` from ``self.connection_string``."""
        return JobRouterClient.from_connection_string(
            conn_str=self.connection_string, http_logging_policy=get_http_logging_policy()
        )

    def create_admin_client(self) -> JobRouterAdministrationClient:
        """Build an async ``JobRouterAdministrationClient`` from ``self.connection_string``."""
        return JobRouterAdministrationClient.from_connection_string(
            conn_str=self.connection_string, http_logging_policy=get_http_logging_policy()
        )

    async def clean_up_job(self, job_id, **kwargs):
        """Delete the job ``job_id``, first moving it into a deletable state.

        The clean-up is attempted up to three times, three seconds apart.
        The retry is implemented here as an explicit loop because the
        previously used synchronous ``@retry`` decorator could not retry a
        coroutine function: calling the function merely creates the coroutine
        object, so no exception ever reached the decorator.

        :param job_id: Identifier of the job to remove.
        :keyword bool suppress_errors: When true, a failed attempt is reported
            through a warning and ``print`` and then swallowed (no retry, no
            exception). Defaults to ``False``.
        :raises Exception: The error of the last attempt when every attempt
            failed and ``suppress_errors`` is false.
        """
        suppress_errors = kwargs.pop("suppress_errors", False)
        tries = 3
        delay = 3

        for attempt in range(1, tries + 1):
            # The client is closed when an attempt ends, so each attempt gets
            # a fresh one. It is created outside the ``try`` so that a broken
            # client configuration is never hidden by ``suppress_errors``.
            router_client: JobRouterClient = self.create_client()
            try:
                await self._clean_up_job_once(router_client, job_id)
                return
            except Exception as e:
                msg = f"Deletion of job failed: {job_id}"
                warnings.warn(UserWarning(msg))
                print(e)
                if suppress_errors:
                    return
                if attempt == tries:
                    # Bare ``raise`` keeps the original traceback intact.
                    raise
                # Same convention as the polling helpers: only wait when live.
                if self.is_live:
                    await asyncio.sleep(delay)

    async def _clean_up_job_once(self, router_client, job_id):
        """Make one attempt to bring ``job_id`` into a deletable state and delete it.

        Jobs whose status is not one of the handled ``RouterJobStatus`` values
        are left untouched.
        """
        # Statuses from which the job is cancelled before it is deleted.
        cancel_then_delete = (
            RouterJobStatus.PENDING_CLASSIFICATION,
            RouterJobStatus.QUEUED,
            RouterJobStatus.CREATED,
            RouterJobStatus.WAITING_FOR_ACTIVATION,
        )
        # Statuses from which the job is deleted without any prior call.
        delete_only = (
            RouterJobStatus.CLOSED,
            RouterJobStatus.CANCELLED,
            RouterJobStatus.CLASSIFICATION_FAILED,
        )

        async with router_client:
            router_job = await router_client.get_job(job_id)
            status = router_job.status

            if status in cancel_then_delete:
                await router_client.cancel_job(
                    job_id, CancelJobOptions(disposition_code="JobCancelledAsPartOfTestCleanUp")
                )
            elif status == RouterJobStatus.ASSIGNED:
                # complete and close every assignment before deleting
                for assignment_id in router_job.assignments:
                    await router_client.complete_job(job_id, assignment_id)
                    await router_client.close_job(job_id, assignment_id)
            elif status == RouterJobStatus.COMPLETED:
                # close every assignment before deleting
                for assignment_id in router_job.assignments:
                    await router_client.close_job(job_id, assignment_id)
            elif status not in delete_only:
                # Unhandled status: nothing to do.
                return

            await router_client.delete_job(job_id)

    async def _poll_until_no_exception(self, fn, expected_exception, *args, **kwargs):
        """Await ``fn(*args, **kwargs)`` repeatedly until it stops raising ``expected_exception``.

        Polling helper for live tests, because some operations take an
        unpredictable amount of time to complete.

        :param fn: Coroutine function to call.
        :param expected_exception: Exception type (or tuple of types) that
            triggers another attempt; any other exception propagates at once.
        :keyword int max_retries: Maximum number of attempts. Defaults to 20.
        :keyword retry_delay: Seconds to wait between attempts; the wait is
            skipped unless ``self.is_live``. Defaults to 3.
        :return: The result of the first call that does not raise
            ``expected_exception``. ``None`` when ``max_retries`` is less than
            one, because ``fn`` is then never called.
        :raises expected_exception: When the last attempt still fails.
        """
        max_retries = kwargs.pop("max_retries", 20)
        retry_delay = kwargs.pop("retry_delay", 3)

        for i in range(max_retries):
            try:
                return await fn(*args, **kwargs)
            except expected_exception:
                if i == max_retries - 1:
                    raise
                if self.is_live:
                    await asyncio.sleep(retry_delay)

    async def _poll_until_exception(self, fn, expected_exception, max_retries=20, retry_delay=3):
        """Await ``fn()`` repeatedly until it raises ``expected_exception``.

        Polling helper for live tests, because some operations take an
        unpredictable amount of time to complete.

        :param fn: Coroutine function called without arguments.
        :param expected_exception: Exception type (or tuple of types) whose
            occurrence ends the polling successfully.
        :param max_retries: Maximum number of attempts.
        :param retry_delay: Seconds to wait between attempts; the wait is
            skipped unless ``self.is_live``.
        :raises AssertionError: When no attempt raised ``expected_exception``.
        """
        for attempt in range(max_retries):
            try:
                await fn()
            except expected_exception:
                return
            # No point in waiting after the final attempt: nothing follows it
            # but the failure below.
            if self.is_live and attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)

        raise AssertionError(f"expected exception {expected_exception} was not raised")