1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import unittest
import pytest
from azure.storage.queue import QueueServiceClient
from devtools_testutils import recorded_by_proxy
from devtools_testutils.storage import StorageRecordedTestCase
from settings.testcase import QueuePreparer
# --Test Class -----------------------------------------------------------------
class TestQueueServiceStats(StorageRecordedTestCase):
# --Helpers-----------------------------------------------------------------
def _assert_stats_default(self, stats):
assert stats is not None
assert stats["geo_replication"] is not None
assert stats["geo_replication"]["status"] == "live"
assert stats["geo_replication"]["last_sync_time"] is not None
def _assert_stats_unavailable(self, stats):
assert stats is not None
assert stats["geo_replication"] is not None
assert stats["geo_replication"]["status"] == "unavailable"
assert stats["geo_replication"]["last_sync_time"] is None
# --Test cases per service ---------------------------------------
@pytest.mark.playback_test_only
@QueuePreparer()
@recorded_by_proxy
def test_queue_service_stats(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")
# Arrange
qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
# Act
stats = qsc.get_service_stats()
# Assert
self._assert_stats_default(stats)
@pytest.mark.playback_test_only
@QueuePreparer()
@recorded_by_proxy
def test_queue_service_stats_when_unavailable(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")
# Arrange
qsc = QueueServiceClient(self.account_url(storage_account_name, "queue"), storage_account_key)
# Act
stats = qsc.get_service_stats()
# Assert
self._assert_stats_unavailable(stats)
# ------------------------------------------------------------------------------
if __name__ == "__main__":
unittest.main()
|