# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import unittest
from datetime import date, datetime, timedelta

import pytest
from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError
)
from azure.core.pipeline.transport import AioHttpTransport
from azure.storage.queue import (
    AccessPolicy,
    AccountSasPermissions,
    generate_account_sas,
    generate_queue_sas,
    QueueSasPermissions,
    ResourceTypes
)
from azure.storage.queue.aio import QueueClient, QueueServiceClient

from devtools_testutils.aio import recorded_by_proxy_async
from devtools_testutils.storage.aio import AsyncStorageRecordedTestCase
from settings.testcase import QueuePreparer

# ------------------------------------------------------------------------------
TEST_QUEUE_PREFIX = 'pyqueueasync'
# ------------------------------------------------------------------------------
# pylint: disable=locally-disabled, multiple-statements, fixme, too-many-lines


class TestAsyncStorageQueue(AsyncStorageRecordedTestCase):
    # --Helpers-----------------------------------------------------------------
    def _get_queue_reference(self, qsc, prefix=TEST_QUEUE_PREFIX):
        queue_name = self.get_resource_name(prefix)
        queue = qsc.get_queue_client(queue_name)
        return queue

    async def _create_queue(self, qsc, prefix=TEST_QUEUE_PREFIX, queue_list=None):
        queue = self._get_queue_reference(qsc, prefix)
        created = await queue.create_queue()
        if queue_list:
            queue_list.append(created)
        return queue

    # --Test cases for queues ----------------------------------------------
    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_create_queue(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        created = await queue_client.create_queue()

        # Asserts
        assert created

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_create_queue_fail_on_exist(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        created = await queue_client.create_queue()
        with pytest.raises(ResourceExistsError):
            await queue_client.create_queue()

        # Asserts
        assert created

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_create_queue_fail_on_exist_different_metadata(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        created = await queue_client.create_queue()
        with pytest.raises(ResourceExistsError):
            await queue_client.create_queue(metadata={"val": "value"})

        # Asserts
        assert created

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_create_queue_with_options(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue(
            metadata={'val1': 'test', 'val2': 'blah'})
        props = await queue_client.get_queue_properties()

        # Asserts
        assert 0 == props.approximate_message_count
        assert 2 == len(props.metadata)
        assert 'test' == props.metadata['val1']
        assert 'blah' == props.metadata['val2']

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_messages_with_max_messages(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        await queue_client.send_message('message5')
        await queue_client.send_message('message6')
        await queue_client.send_message('message7')
        await queue_client.send_message('message8')
        await queue_client.send_message('message9')
        await queue_client.send_message('message10')
        result = []
        async for m in queue_client.receive_messages(max_messages=5):
            result.append(m)

        # Asserts
        assert result is not None
        assert 5 == len(result)

        for message in result:
            assert message is not None
            assert '' != message.id
            assert '' != message.content
            assert '' != message.pop_receipt
            assert 1 == message.dequeue_count
            assert '' != message.inserted_on
            assert '' != message.expires_on
            assert '' != message.next_visible_on

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_messages_with_too_little_messages(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(
            self.account_url(storage_account_name, "queue"),
            storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        await queue_client.send_message('message5')
        result = []
        async for m in queue_client.receive_messages(max_messages=10):
            result.append(m)

        # Asserts
        assert result is not None
        assert 5 == len(result)

        for message in result:
            assert message is not None
            assert '' != message.id
            assert '' != message.content
            assert '' != message.pop_receipt
            assert 1 == message.dequeue_count
            assert '' != message.inserted_on
            assert '' != message.expires_on
            assert '' != message.next_visible_on

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_messages_with_page_bigger_than_max(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(
            self.account_url(storage_account_name, "queue"),
            storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        await queue_client.send_message('message5')

        # Asserts
        with pytest.raises(ValueError):
            queue_client.receive_messages(messages_per_page=5, max_messages=2)

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_messages_with_remainder(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        await queue_client.send_message('message5')
        await queue_client.send_message('message6')
        await queue_client.send_message('message7')
        await queue_client.send_message('message8')
        await queue_client.send_message('message9')
        await queue_client.send_message('message10')
        await queue_client.send_message('message11')
        await queue_client.send_message('message12')

        result = []
        async for m in queue_client.receive_messages(messages_per_page=3, max_messages=10):
            result.append(m)

        remainder = []
        async for m in queue_client.receive_messages():
            remainder.append(m)

        # Asserts
        assert result is not None
        assert 10 == len(result)

        assert remainder is not None
        assert 2 == len(remainder)

        for message in result:
            assert message is not None
            assert '' != message.id
            assert '' != message.content
            assert '' != message.pop_receipt
            assert 1 == message.dequeue_count
            assert '' != message.inserted_on
            assert '' != message.expires_on
            assert '' != message.next_visible_on

        for message in remainder:
            assert message is not None
            assert '' != message.id
            assert '' != message.content
            assert '' != message.pop_receipt
            assert 1 == message.dequeue_count
            assert '' != message.inserted_on
            assert '' != message.expires_on
            assert '' != message.next_visible_on

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_delete_non_existing_queue(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)

        # Asserts
        with pytest.raises(ResourceNotFoundError):
            await queue_client.delete_queue()

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_delete_existing_queue_fail_not_exist(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)

        created = await queue_client.create_queue()
        deleted = await queue_client.delete_queue()

        # Asserts
        assert deleted is None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_list_queues(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        queues = []
        async for q in qsc.list_queues():
            queues.append(q)

        # Asserts
        assert queues is not None
        assert len(queues) >= 1

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_list_queues_with_options(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_list = []
        prefix = 'listqueue'
        for i in range(0, 4):
            await self._create_queue(qsc, prefix + str(i), queue_list)

        # Action
        generator1 =  qsc.list_queues(
            name_starts_with=prefix,
            results_per_page=3).by_page()
        queues1 = []
        async for el in await generator1.__anext__():
            queues1.append(el)

        generator2 = qsc.list_queues(
            name_starts_with=prefix,
            include_metadata=True).by_page(generator1.continuation_token)
        queues2 = []
        async for el in await generator2.__anext__():
            queues2.append(el)

        # Asserts
        assert queues1 is not None
        assert 3 == len(queues1)
        assert queues1[0] is not None
        assert queues1[0].metadata is None
        assert '' != queues1[0].name
        # Asserts
        assert queues2 is not None
        assert len(queue_list) - 3 <= len(queues2)
        assert queues2[0] is not None
        assert '' != queues2[0].name

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_list_queues_with_metadata(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue = await self._create_queue(qsc)
        await queue.set_queue_metadata(metadata={'val1': 'test', 'val2': 'blah'})

        listed_queue = []
        async for q in qsc.list_queues(
            name_starts_with=queue.queue_name,
            results_per_page=1,
            include_metadata=True):
            listed_queue.append(q)
        listed_queue = listed_queue[0]

        # Asserts
        assert listed_queue is not None
        assert queue.queue_name == listed_queue.name
        assert listed_queue.metadata is not None
        assert len(listed_queue.metadata) == 2
        assert listed_queue.metadata['val1'] == 'test'

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_list_queues_account_sas(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        sas_token = self.generate_sas(
            generate_account_sas,
            storage_account_name,
            storage_account_key,
            ResourceTypes(service=True),
            AccountSasPermissions(list=True),
            datetime.utcnow() + timedelta(hours=1)
        )

        # Act
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), credential=sas_token)
        queues = []
        async for q in qsc.list_queues():
            queues.append(q)

        # Assert
        assert queues is not None
        assert len(queues) >= 1

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_metadata(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        metadata = {'hello': 'world', 'number': '43'}
        queue = await self._create_queue(qsc)

        # Act
        await queue.set_queue_metadata(metadata)
        metadata_from_response = await queue.get_queue_properties()
        md = metadata_from_response.metadata
        # Assert
        assert md == metadata

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_queue_metadata_message_count(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        props = await queue_client.get_queue_properties()

        # Asserts
        assert props.approximate_message_count >= 1
        assert 0 == len(props.metadata)

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_queue_exists(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue = await self._create_queue(qsc)

        # Act
        exists = await queue.get_queue_properties()

        # Assert
        assert exists

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_queue_not_exists(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue = qsc.get_queue_client(self.get_resource_name('missing'))
        # Act
        with pytest.raises(ResourceNotFoundError):
            await queue.get_queue_properties()

        # Assert

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_put_message(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action.  No exception means pass. No asserts needed.
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        message = await queue_client.send_message('message4')

        # Asserts
        assert message is not None
        assert '' != message.id
        assert isinstance(message.inserted_on, datetime)
        assert isinstance(message.expires_on, datetime)
        assert '' != message.pop_receipt
        assert 'message4' == message.content

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_put_message_large_time_to_live(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        # There should be no upper bound on a queue message's time to live
        await queue_client.send_message('message1', time_to_live=1024*1024*1024)

        # Act
        messages = await queue_client.peek_messages()

        # Assert
        assert messages[0].expires_on >= (messages[0].inserted_on + timedelta(seconds=1024 * 1024 * 1024 - 3600))

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_put_message_infinite_time_to_live(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1', time_to_live=-1)

        # Act
        messages = await queue_client.peek_messages()

        # Assert
        assert messages[0].expires_on.year == date.max.year

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_messages(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
            if len(messages):
                break
        message = messages[0]
        # Asserts
        assert message is not None
        assert message is not None
        assert '' != message.id
        assert 'message1' == message.content
        assert '' != message.pop_receipt
        assert 1 == message.dequeue_count

        assert isinstance(message.inserted_on, datetime)
        assert isinstance(message.expires_on, datetime)
        assert isinstance(message.next_visible_on, datetime)

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_receive_one_message(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        assert await queue_client.receive_message() is None

        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')

        message1 = await queue_client.receive_message()
        message2 = await queue_client.receive_message()
        peeked_message3 = await queue_client.peek_messages()

        # Asserts
        assert message1 is not None
        assert '' != message1.id
        assert 'message1' == message1.content
        assert '' != message1.pop_receipt
        assert 1 == message1.dequeue_count

        assert message2 is not None
        assert '' != message2.id
        assert 'message2' == message2.content
        assert '' != message2.pop_receipt
        assert 1 == message2.dequeue_count

        assert 'message3' == peeked_message3[0].content
        assert 0 == peeked_message3[0].dequeue_count

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_messages_with_options(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        pager = queue_client.receive_messages(messages_per_page=4, visibility_timeout=20)
        result = []
        async for el in pager:
            result.append(el)

        # Asserts
        assert result is not None
        assert 4 == len(result)

        for message in result:
            assert message is not None
            assert '' != message.id
            assert '' != message.content
            assert '' != message.pop_receipt
            assert 1 == message.dequeue_count
            assert '' != message.inserted_on
            assert '' != message.expires_on
            assert '' != message.next_visible_on

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_peek_messages(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        result = await queue_client.peek_messages()

        # Asserts
        assert result is not None
        assert 1 == len(result)
        message = result[0]
        assert message is not None
        assert '' != message.id
        assert '' != message.content
        assert message.pop_receipt is None
        assert 0 == message.dequeue_count
        assert '' != message.inserted_on
        assert '' != message.expires_on
        assert message.next_visible_on is None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_peek_messages_with_options(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        result = await queue_client.peek_messages(max_messages=4)

        # Asserts
        assert result is not None
        assert 4 == len(result)
        for message in result:
            assert message is not None
            assert '' != message.id
            assert '' != message.content
            assert message.pop_receipt is None
            assert 0 == message.dequeue_count
            assert '' != message.inserted_on
            assert '' != message.expires_on
            assert message.next_visible_on is None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_clear_messages(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        await queue_client.clear_messages()
        result = await queue_client.peek_messages()

        # Asserts
        assert result is not None
        assert 0 == len(result)

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_delete_message(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        await queue_client.send_message('message2')
        await queue_client.send_message('message3')
        await queue_client.send_message('message4')
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
            await queue_client.delete_message(m)
        async for m in queue_client.receive_messages():
            messages.append(m)
        # Asserts
        assert messages is not None
        assert 3 == len(messages)-1

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_update_message(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        list_result1 = messages[0]
        message = await queue_client.update_message(
            list_result1.id,
            pop_receipt=list_result1.pop_receipt,
            visibility_timeout=0)
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        list_result2 = messages[0]

        # Asserts
        # Update response
        assert message is not None
        assert message.pop_receipt is not None
        assert message.next_visible_on is not None
        assert isinstance(message.next_visible_on, datetime)

        # Get response
        assert list_result2 is not None
        message = list_result2
        assert message is not None
        assert list_result1.id == message.id
        assert 'message1' == message.content
        assert 2 == message.dequeue_count
        assert message.pop_receipt is not None
        assert message.inserted_on is not None
        assert message.expires_on is not None
        assert message.next_visible_on is not None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_update_message_content(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')

        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        list_result1 = messages[0]
        message = await queue_client.update_message(
            list_result1.id,
            pop_receipt=list_result1.pop_receipt,
            visibility_timeout=0,
            content='new text')
        assert 'new text' == message.content

        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        list_result2 = messages[0]

        # Asserts
        # Update response
        assert message is not None
        assert message.pop_receipt is not None
        assert message.next_visible_on is not None
        assert isinstance(message.next_visible_on, datetime)

        # Get response
        assert list_result2 is not None
        message = list_result2
        assert message is not None
        assert list_result1.id == message.id
        assert 'new text' == message.content
        assert 2 == message.dequeue_count
        assert message.pop_receipt is not None
        assert message.inserted_on is not None
        assert message.expires_on is not None
        assert message.next_visible_on is not None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_account_sas(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)

        # Arrange
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        token = self.generate_sas(
            generate_account_sas,
            qsc.account_name,
            qsc.credential.account_key,
            ResourceTypes(object=True),
            AccountSasPermissions(read=True),
            datetime.utcnow() + timedelta(hours=1),
            datetime.utcnow() - timedelta(minutes=5)
        )

        # Act
        for credential in [token, AzureSasCredential(token)]:
            service = QueueServiceClient(
                account_url=qsc.url,
                credential=credential,
            )
            new_queue_client = service.get_queue_client(queue_client.queue_name)
            result = await new_queue_client.peek_messages()

            # Assert
            assert result is not None
            assert 1 == len(result)
            message = result[0]
            assert message is not None
            assert '' != message.id
            assert 'message1' == message.content

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_azure_named_key_credential_access(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")


        # Arrange
        named_key = AzureNamedKeyCredential(storage_account_name, storage_account_key)
        qsc = QueueServiceClient(
            self.account_url(storage_account_name, "queue"), named_key)
        queue_client = self._get_queue_reference(qsc)
        await queue_client.create_queue()
        await queue_client.send_message('message1')

        # Act
        result = await queue_client.peek_messages()

        # Assert
        assert result is not None

    @QueuePreparer()
    async def test_account_sas_raises_if_sas_already_in_uri(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        with pytest.raises(ValueError):
            QueueServiceClient(
                self.account_url(storage_account_name, "queue") + "?sig=foo",
                credential=AzureSasCredential("?foo=bar"))

    @pytest.mark.live_test_only
    @QueuePreparer()
    async def test_token_credential(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        token_credential = self.generate_oauth_token()

        # Action 1: make sure token works
        service = QueueServiceClient(
            self.account_url(storage_account_name, "queue"),
            credential=token_credential)
        queues = await service.get_service_properties()
        assert queues is not None

        # Action 2: change token value to make request fail
        fake_credential = self.generate_fake_token()
        service = QueueServiceClient(
            self.account_url(storage_account_name, "queue"),
            credential=fake_credential)
        with pytest.raises(ClientAuthenticationError):
            await service.get_service_properties()

        # Action 3: update token to make it working again
        service = QueueServiceClient(
            self.account_url(storage_account_name, "queue"),
            credential=token_credential)
        queues = await service.get_service_properties()  # Not raise means success
        assert queues is not None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_sas_read(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)

        # Arrange
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        token = self.generate_sas(
            generate_queue_sas,
            queue_client.account_name,
            queue_client.queue_name,
            queue_client.credential.account_key,
            QueueSasPermissions(read=True),
            datetime.utcnow() + timedelta(hours=1),
            datetime.utcnow() - timedelta(minutes=5)
        )

        # Act
        service = QueueClient.from_queue_url(
            queue_url=queue_client.url,
            credential=token,
        )
        result = await service.peek_messages()

        # Assert
        assert result is not None
        assert 1 == len(result)
        message = result[0]
        assert message is not None
        assert '' != message.id
        assert 'message1' == message.content

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_sas_add(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)

        # Arrange
        queue_client = await self._create_queue(qsc)
        token = self.generate_sas(
            generate_queue_sas,
            queue_client.account_name,
            queue_client.queue_name,
            queue_client.credential.account_key,
            QueueSasPermissions(add=True),
            datetime.utcnow() + timedelta(hours=1),
        )

        # Act
        service = QueueClient.from_queue_url(
            queue_url=queue_client.url,
            credential=token,
        )
        result = await service.send_message('addedmessage')
        assert 'addedmessage' == result.content

        # Assert
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        result = messages[0]
        assert 'addedmessage' == result.content

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_sas_update(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)

        # Arrange
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        token = self.generate_sas(
            generate_queue_sas,
            queue_client.account_name,
            queue_client.queue_name,
            queue_client.credential.account_key,
            QueueSasPermissions(update=True),
            datetime.utcnow() + timedelta(hours=1),
        )
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        result = messages[0]

        # Act
        service = QueueClient.from_queue_url(
            queue_url=queue_client.url,
            credential=token,
        )
        await service.update_message(
            result.id,
            pop_receipt=result.pop_receipt,
            visibility_timeout=0,
            content='updatedmessage1',
        )

        # Assert
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        result = messages[0]
        assert 'updatedmessage1' == result.content

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_sas_process(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)

        # Arrange
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        token = self.generate_sas(
            generate_queue_sas,
            queue_client.account_name,
            queue_client.queue_name,
            queue_client.credential.account_key,
            QueueSasPermissions(process=True),
            datetime.utcnow() + timedelta(hours=1),
        )

        # Act
        service = QueueClient.from_queue_url(
            queue_url=queue_client.url,
            credential=token,
        )
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        message = messages[0]

        # Assert
        assert message is not None
        assert '' != message.id
        assert 'message1' == message.content

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_sas_signed_identifier(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")
        variables = kwargs.pop('variables', {})

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)

        # Arrange
        access_policy = AccessPolicy()
        start_time = self.get_datetime_variable(variables, 'start_time', datetime.utcnow() - timedelta(hours=1))
        expiry_time = self.get_datetime_variable(variables, 'expiry_time', datetime.utcnow() + timedelta(hours=1))
        access_policy.start = start_time
        access_policy.expiry = expiry_time
        access_policy.permission = QueueSasPermissions(read=True)

        identifiers = {'testid': access_policy}

        queue_client = await self._create_queue(qsc)
        resp = await queue_client.set_queue_access_policy(identifiers)

        await queue_client.send_message('message1')

        token = self.generate_sas(
            generate_queue_sas,
            queue_client.account_name,
            queue_client.queue_name,
            queue_client.credential.account_key,
            policy_id='testid'
        )

        # Act
        service = QueueClient.from_queue_url(
            queue_url=queue_client.url,
            credential=token,
        )
        result = await service.peek_messages()

        # Assert
        assert result is not None
        assert 1 == len(result)
        message = result[0]
        assert message is not None
        assert '' != message.id
        assert 'message1' == message.content

        return variables

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_queue_acl(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        # Arrange
        queue_client = await self._create_queue(qsc)

        # Act
        acl = await queue_client.get_queue_access_policy()

        # Assert
        assert acl is not None
        assert len(acl) == 0

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_queue_acl_iter(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        # Arrange
        queue_client = await self._create_queue(qsc)

        # Act
        acl = await queue_client.get_queue_access_policy()
        for signed_identifier in acl:
            pass

        # Assert
        assert acl is not None
        assert len(acl) == 0

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_get_queue_acl_with_non_existing_queue(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        # Arrange
        queue_client = self._get_queue_reference(qsc)

        # Act
        with pytest.raises(ResourceNotFoundError):
            await queue_client.get_queue_access_policy()

            # Assert

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_acl(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        # Arrange
        queue_client = await self._create_queue(qsc)

        # Act
        resp = await queue_client.set_queue_access_policy(signed_identifiers=dict())

        # Assert
        assert resp is None
        acl = await queue_client.get_queue_access_policy()
        assert acl is not None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_acl_with_empty_signed_identifiers(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        # Arrange
        queue_client = await self._create_queue(qsc)

        # Act
        await queue_client.set_queue_access_policy(signed_identifiers={})

        # Assert
        acl = await queue_client.get_queue_access_policy()
        assert acl is not None
        assert len(acl) == 0

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_acl_with_empty_signed_identifier(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        # Arrange
        queue_client = await self._create_queue(qsc)

        # Act
        await queue_client.set_queue_access_policy(signed_identifiers={'empty': None})

        # Assert
        acl = await queue_client.get_queue_access_policy()
        assert acl is not None
        assert len(acl) == 1
        assert acl['empty'] is not None
        assert acl['empty'].permission is None
        assert acl['empty'].expiry is None
        assert acl['empty'].start is None

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_acl_with_signed_identifiers(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")
        variables = kwargs.pop('variables', {})

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)

        # Act
        expiry_time = self.get_datetime_variable(variables, 'expiry_time', datetime.utcnow() + timedelta(hours=1))
        start_time = self.get_datetime_variable(variables, 'start_time', datetime.utcnow() - timedelta(minutes=5))
        access_policy = AccessPolicy(permission=QueueSasPermissions(read=True),
                                     expiry=expiry_time,
                                     start=start_time)
        identifiers = {'testid': access_policy}

        resp = await queue_client.set_queue_access_policy(signed_identifiers=identifiers)

        # Assert
        assert resp is None
        acl = await queue_client.get_queue_access_policy()
        assert acl is not None
        assert len(acl) == 1
        assert 'testid' in acl

        return variables

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_acl_too_many_ids(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)

        # Act
        identifiers = dict()
        for i in range(0, 16):
            identifiers[f'id{i}'] = AccessPolicy()

        # Assert
        with pytest.raises(ValueError):
            await queue_client.set_queue_access_policy(identifiers)

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_set_queue_acl_with_non_existing_queue(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Arrange
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = self._get_queue_reference(qsc)

        # Act
        with pytest.raises(ResourceNotFoundError):
            await queue_client.set_queue_access_policy(signed_identifiers=dict())

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_unicode_create_queue_unicode_name(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_name = '啊齄丂狛狜'

        with pytest.raises(HttpResponseError):
            # not supported - queue name must be alphanumeric, lowercase
            client = qsc.get_queue_client(queue_name)
            await client.create_queue()

            # Asserts

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_unicode_get_messages_unicode_data(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1㚈')
        message = None
        async for m in queue_client.receive_messages():
            message = m
        # Asserts
        assert message is not None
        assert '' != message.id
        assert 'message1㚈' == message.content
        assert '' != message.pop_receipt
        assert 1 == message.dequeue_count
        assert isinstance(message.inserted_on, datetime)
        assert isinstance(message.expires_on, datetime)
        assert isinstance(message.next_visible_on, datetime)

    @QueuePreparer()
    @recorded_by_proxy_async
    async def test_unicode_update_message_unicode_data(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        # Action
        qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
        queue_client = await self._create_queue(qsc)
        await queue_client.send_message('message1')
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)

        list_result1 = messages[0]
        list_result1.content = '啊齄丂狛狜'
        await queue_client.update_message(list_result1, visibility_timeout=0)
        messages = []
        async for m in queue_client.receive_messages():
            messages.append(m)
        # Asserts
        message = messages[0]
        assert message is not None
        assert list_result1.id == message.id
        assert '啊齄丂狛狜' == message.content
        assert '' != message.pop_receipt
        assert 2 == message.dequeue_count
        assert isinstance(message.inserted_on, datetime)
        assert isinstance(message.expires_on, datetime)
        assert isinstance(message.next_visible_on, datetime)

    @pytest.mark.live_test_only
    @QueuePreparer()
    async def test_transport_closed_only_once(self, **kwargs):
        storage_account_name = kwargs.pop("storage_account_name")
        storage_account_key = kwargs.pop("storage_account_key")

        transport = AioHttpTransport()
        prefix = TEST_QUEUE_PREFIX
        queue_name = self.get_resource_name(prefix)
        async with QueueServiceClient(
                self.account_url(storage_account_name, "queue"),
                credential=storage_account_key, transport=transport) as qsc:
            await qsc.get_service_properties()
            assert transport.session is not None
            async with qsc.get_queue_client(queue_name) as qc:
                assert transport.session is not None
            await qsc.get_service_properties()
            assert transport.session is not None

# ------------------------------------------------------------------------------
if __name__ == '__main__':
    unittest.main()
