File: test_service_stats.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (98 lines) | stat: -rw-r--r-- 3,312 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import unittest

from azure.storage.blob import BlockBlobService
from azure.storage.common.retry import (
    LinearRetry
)
from azure.storage.queue import QueueService
from tests.testcase import (
    StorageTestCase,
    record,
)

# Canned XML service-stats payload whose geo-replication status is "unavailable"
# and whose LastSyncTime element is empty. The single trailing space after the
# closing tag is part of the value.
SERVICE_UNAVAILABLE_RESP_BODY = (
    '<?xml version="1.0" encoding="utf-8"?>'
    '<StorageServiceStats>'
    '<GeoReplication>'
    '<Status>unavailable</Status>'
    '<LastSyncTime></LastSyncTime>'
    '</GeoReplication>'
    '</StorageServiceStats> '
)


# --Test Class -----------------------------------------------------------------
class ServiceStatsTest(StorageTestCase):
    """Service-stats tests for the blob and queue services.

    Each service is exercised twice: once with the response left as returned,
    and once with the response body replaced by
    ``SERVICE_UNAVAILABLE_RESP_BODY`` through the service's
    ``response_callback`` hook.
    """

    # --Helpers-----------------------------------------------------------------
    def _assert_stats_default(self, stats):
        """Assert that ``stats`` and its ``geo_replication`` are present.

        When ``self.settings.IS_EMULATED`` is falsy, additionally assert that
        the geo-replication status is ``'live'`` and that a last sync time is
        set.
        """
        self.assertIsNotNone(stats)
        geo = stats.geo_replication
        self.assertIsNotNone(geo)

        # Status and sync time are only checked for non-emulated settings.
        if self.settings.IS_EMULATED:
            return

        self.assertEqual(geo.status, 'live')
        self.assertIsNotNone(geo.last_sync_time)

    def _assert_stats_unavailable(self, stats):
        """Assert that ``stats`` reports an unavailable geo-replication.

        The status must equal ``'unavailable'`` and the last sync time must be
        ``None``.
        """
        self.assertIsNotNone(stats)
        geo = stats.geo_replication
        self.assertIsNotNone(geo)

        self.assertEqual(geo.status, 'unavailable')
        self.assertIsNone(geo.last_sync_time)

    @staticmethod
    def override_response_body_with_unavailable_status(response):
        """Replace ``response.body`` with the canned "unavailable" payload."""
        response.body = SERVICE_UNAVAILABLE_RESP_BODY

    # --Test cases per service ---------------------------------------

    @record
    def test_blob_service_stats(self):
        # Arrange: blob service built from the test settings
        blob_service = self._create_storage_service(BlockBlobService, self.settings)

        # Act + Assert: unmodified response must satisfy the default checks
        self._assert_stats_default(blob_service.get_blob_service_stats())

    @record
    def test_blob_service_stats_when_unavailable(self):
        # Arrange: swap every response body for the "unavailable" payload and
        # install a linear retry policy (backoff=1)
        blob_service = self._create_storage_service(BlockBlobService, self.settings)
        blob_service.response_callback = self.override_response_body_with_unavailable_status
        blob_service.retry = LinearRetry(backoff=1).retry

        # Act + Assert: parsed stats must report the unavailable state
        self._assert_stats_unavailable(blob_service.get_blob_service_stats())

    @record
    def test_queue_service_stats(self):
        # Arrange: queue service built from the test settings
        queue_service = self._create_storage_service(QueueService, self.settings)

        # Act + Assert: unmodified response must satisfy the default checks
        self._assert_stats_default(queue_service.get_queue_service_stats())

    @record
    def test_queue_service_stats_when_unavailable(self):
        # Arrange: swap every response body for the "unavailable" payload and
        # install a linear retry policy (backoff=1)
        queue_service = self._create_storage_service(QueueService, self.settings)
        queue_service.response_callback = self.override_response_body_with_unavailable_status
        queue_service.retry = LinearRetry(backoff=1).retry

        # Act + Assert: parsed stats must report the unavailable state
        self._assert_stats_unavailable(queue_service.get_queue_service_stats())


# ------------------------------------------------------------------------------
# Run the tests in this module via unittest when the file is executed directly.
if __name__ == '__main__':
    unittest.main()