File: test_chat_client_e2e_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (134 lines) | stat: -rw-r--r-- 5,253 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import pytest
import asyncio
import os
from datetime import datetime
from msrest.serialization import TZ_UTC

from azure.communication.administration import CommunicationIdentityClient
from azure.communication.chat.aio import (
    ChatClient,
    CommunicationUserCredential
)
from azure.communication.chat import (
    ChatThreadMember
)
from azure.communication.administration._shared.utils import parse_connection_str
from azure_devtools.scenario_tests import RecordingProcessor
from helper import URIIdentityReplacer
from chat_e2e_helper import ChatURIReplacer
from _shared.asynctestcase import AsyncCommunicationTestCase
from _shared.testcase import BodyReplacerProcessor, ResponseReplacerProcessor


class ChatClientTestAsync(AsyncCommunicationTestCase):
    """End-to-end tests for the asynchronous ``ChatClient``.

    ``setUp`` creates a fresh communication user, issues it a chat-scoped
    token and builds the ``ChatClient`` under test. Every test creates its
    own chat thread through ``_create_thread`` and removes it again in a
    ``finally`` clause, so a failing assertion does not leave the thread
    behind on the service.
    """

    def setUp(self):
        """Register the recording processors and build the client under test."""
        super().setUp()

        # NOTE(review): these processors presumably scrub identifiers, tokens
        # and resource names from recorded traffic -- confirm against the
        # helper modules they are imported from.
        self.recording_processors.extend([
            BodyReplacerProcessor(keys=["id", "token", "createdBy", "members", "multipleStatus", "value"]),
            URIIdentityReplacer(),
            ResponseReplacerProcessor(keys=[self._resource_name]),
            ChatURIReplacer()])

        endpoint, _ = parse_connection_str(self.connection_str)
        self.endpoint = endpoint

        self.identity_client = CommunicationIdentityClient.from_connection_string(self.connection_str)

        # create user
        self.user = self.identity_client.create_user()
        token_response = self.identity_client.issue_token(self.user, scopes=["chat"])
        self.token = token_response.token

        # create ChatClient
        self.chat_client = ChatClient(self.endpoint, CommunicationUserCredential(self.token))

    def tearDown(self):
        """Run the base tearDown, then delete the user created in ``setUp``."""
        super().tearDown()

        # delete created users
        if not self.is_playback():
            self.identity_client.delete_user(self.user)

    async def _create_thread(self):
        """Create a chat thread whose only member is the test user.

        The id of the new thread is stored in ``self.thread_id``.
        """
        topic = "test topic"
        # Timezone-aware "now" in UTC; same value as the former
        # ``utcnow().replace(tzinfo=TZ_UTC)`` without the deprecated call.
        share_history_time = datetime.now(tz=TZ_UTC)
        members = [ChatThreadMember(
            user=self.user,
            display_name='name',
            share_history_time=share_history_time
        )]
        chat_thread_client = await self.chat_client.create_chat_thread(topic, members)
        self.thread_id = chat_thread_client.thread_id

    async def _delete_created_thread(self):
        """Delete the thread stored in ``self.thread_id`` unless in playback.

        Does nothing when ``self.thread_id`` is ``None`` so that cleanup after
        a failed "id is not None" assertion does not raise a second error.
        """
        if self.is_playback() or self.thread_id is None:
            return
        await self.chat_client.delete_chat_thread(self.thread_id)

    @pytest.mark.live_test_only
    @AsyncCommunicationTestCase.await_prepared_test
    async def test_create_chat_thread_async(self):
        """Creating a thread yields a thread id."""
        async with self.chat_client:
            await self._create_thread()
            try:
                assert self.thread_id is not None
            finally:
                # delete created chat thread even if the assertion failed
                await self._delete_created_thread()

    @pytest.mark.live_test_only
    @AsyncCommunicationTestCase.await_prepared_test
    async def test_get_chat_thread(self):
        """Fetching a thread by id returns the thread with that id."""
        async with self.chat_client:
            await self._create_thread()
            try:
                get_thread_result = await self.chat_client.get_chat_thread(self.thread_id)
                assert get_thread_result.id == self.thread_id
            finally:
                # delete created chat thread even if the assertion failed
                await self._delete_created_thread()

    @pytest.mark.live_test_only
    @AsyncCommunicationTestCase.await_prepared_test
    async def test_list_chat_threads(self):
        """Listing threads for the fresh user yields exactly one thread."""
        async with self.chat_client:
            await self._create_thread()
            try:
                # NOTE(review): presumably gives the service time to make the
                # new thread visible in listings -- confirm.
                if self.is_live:
                    await asyncio.sleep(2)

                chat_thread_infos = self.chat_client.list_chat_threads(results_per_page=1)

                items = []
                async for item in chat_thread_infos:
                    items.append(item)
                assert len(items) == 1
            finally:
                # delete created chat thread even if the assertion failed
                await self._delete_created_thread()

    @pytest.mark.live_test_only
    @AsyncCommunicationTestCase.await_prepared_test
    async def test_get_thread_client(self):
        """The thread client obtained for an id reports that same id."""
        async with self.chat_client:
            await self._create_thread()
            try:
                chat_thread_client = self.chat_client.get_chat_thread_client(self.thread_id)
                assert chat_thread_client.thread_id == self.thread_id
            finally:
                # delete created chat thread even if the assertion failed
                await self._delete_created_thread()

    @pytest.mark.live_test_only
    @AsyncCommunicationTestCase.await_prepared_test
    async def test_delete_chat_thread(self):
        """Deleting an existing thread completes without raising."""
        async with self.chat_client:
            await self._create_thread()
            # The delete call is the operation under test and also removes the
            # thread, so no further cleanup of the thread is required here.
            await self.chat_client.delete_chat_thread(self.thread_id)