1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import pytest
from devtools_testutils import AzureRecordedTestCase, recorded_by_proxy
from azure.data.tables import TableServiceClient
from _shared.testcase import TableTestCase
from preparers import cosmos_decorator
# Canned service-stats XML reporting geo-replication status "unavailable" with
# an empty LastSyncTime. The trailing space after the closing tag is part of
# the value and is kept deliberately.
SERVICE_UNAVAILABLE_RESP_BODY = (
    '<?xml version="1.0" encoding="utf-8"?>'
    "<StorageServiceStats>"
    "<GeoReplication>"
    "<Status>unavailable</Status>"
    "<LastSyncTime></LastSyncTime>"
    "</GeoReplication>"
    "</StorageServiceStats> "
)
# Canned service-stats XML reporting geo-replication status "live" with a
# fixed LastSyncTime. The trailing space after the closing tag is part of the
# value and is kept deliberately.
# NOTE(review): 19 Jan 2021 was a Tuesday, not a Wednesday. The timestamp is
# left untouched because it is runtime data; confirm whether it matters.
SERVICE_LIVE_RESP_BODY = (
    '<?xml version="1.0" encoding="utf-8"?>'
    "<StorageServiceStats>"
    "<GeoReplication>"
    "<Status>live</Status>"
    "<LastSyncTime>Wed, 19 Jan 2021 22:28:43 GMT</LastSyncTime>"
    "</GeoReplication>"
    "</StorageServiceStats> "
)
# --Test Class -----------------------------------------------------------------
class TestTableServiceStatsCosmos(AzureRecordedTestCase, TableTestCase):
    """Service-stats tests for ``TableServiceClient`` against a Cosmos account URL.

    Both tests are currently skipped. Each one passes a ``raw_response_hook``
    to ``get_service_stats`` that replaces the response's ``text`` accessor
    with a stub returning a canned XML payload, then hands the result to an
    assertion helper (``_assert_stats_*``, presumably provided by
    ``TableTestCase`` -- not visible in this file).
    """

    @staticmethod
    def override_response_body_with_unavailable_status(response):
        """Stub ``response.http_response.text`` to return the "unavailable" payload.

        :param response: Object handed to ``raw_response_hook``; only its
            ``http_response`` attribute is modified.
        """
        # Mirror an optional ``encoding`` parameter so the stub works whether
        # it is called as ``text()``, ``text(enc)`` or ``text(encoding=enc)``.
        response.http_response.text = lambda encoding=None: SERVICE_UNAVAILABLE_RESP_BODY

    @staticmethod
    def override_response_body_with_live_status(response):
        """Stub ``response.http_response.text`` to return the "live" payload.

        :param response: Object handed to ``raw_response_hook``; only its
            ``http_response`` attribute is modified.
        """
        # Same optional-``encoding`` signature as the "unavailable" stub.
        response.http_response.text = lambda encoding=None: SERVICE_LIVE_RESP_BODY

    # TODO: Should both of these tests be removed from the cosmos sync/async suites?
    # --Test cases per service ---------------------------------------
    @pytest.mark.skip("JSON is invalid for cosmos")
    @cosmos_decorator
    @recorded_by_proxy
    def test_table_service_stats_f(self, tables_cosmos_account_name, tables_primary_cosmos_account_key):
        """Fetch service stats with the "live" payload injected and check them."""
        tsc = TableServiceClient(
            self.account_url(tables_cosmos_account_name, "cosmos"), credential=tables_primary_cosmos_account_key
        )
        stats = tsc.get_service_stats(raw_response_hook=self.override_response_body_with_live_status)
        self._assert_stats_default(stats)

    @pytest.mark.skip("JSON is invalid for cosmos")
    @cosmos_decorator
    @recorded_by_proxy
    def test_table_service_stats_when_unavailable(self, tables_cosmos_account_name, tables_primary_cosmos_account_key):
        """Fetch service stats with the "unavailable" payload injected and check them."""
        tsc = TableServiceClient(
            self.account_url(tables_cosmos_account_name, "cosmos"), credential=tables_primary_cosmos_account_key
        )
        stats = tsc.get_service_stats(raw_response_hook=self.override_response_body_with_unavailable_status)
        self._assert_stats_unavailable(stats)
|