File: test_logging.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (132 lines) | stat: -rw-r--r-- 5,096 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import sys
from datetime import datetime, timedelta

from azure.storage.blob import (
    BlockBlobService,
    ContainerPermissions,
    BlobPermissions,
)
from azure.storage.common._constants import (
    _AUTHORIZATION_HEADER_NAME,
)
from azure.storage.common.sharedaccesssignature import (
    _QueryStringConstants,
)
from tests.testcase import (
    StorageTestCase,
    TestMode,
    record,
    LogCaptured,
)

if sys.version_info >= (3,):
    from urllib.parse import parse_qs
else:
    from urlparse import parse_qs


class StorageLoggingTest(StorageTestCase):

    def setUp(self):
        super(StorageLoggingTest, self).setUp()

        self.bs = self._create_storage_service(BlockBlobService, self.settings)
        self.container_name = self.get_resource_name('utcontainer')

        # create source blob to be copied from
        self.source_blob_name = self.get_resource_name('srcblob')
        self.source_blob_data = self.get_random_bytes(4 * 1024)

        if not self.is_playback():
            self.bs.create_container(self.container_name)
            self.bs.create_blob_from_bytes(self.container_name, self.source_blob_name, self.source_blob_data)

        # generate a SAS so that it is accessible with a URL
        sas_token = self.bs.generate_blob_shared_access_signature(
            self.container_name,
            self.source_blob_name,
            permission=BlobPermissions.READ,
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
        self.source_blob_url = self.bs.make_blob_url(self.container_name, self.source_blob_name, sas_token=sas_token)

    def tearDown(self):
        if not self.is_playback():
            self.bs.delete_container(self.container_name)

        return super(StorageLoggingTest, self).tearDown()

    @record
    def test_authorization_is_scrubbed_off(self):
        # Act
        with LogCaptured(self) as log_captured:
            self.bs.exists(self.container_name)
            log_as_str = log_captured.getvalue()

            # Assert
            # make sure authorization header is logged, but its value is not
            # the keyword SharedKey is present in the authorization header's value
            self.assertTrue(_AUTHORIZATION_HEADER_NAME in log_as_str)
            self.assertFalse('SharedKey' in log_as_str)

    @record
    def test_sas_signature_is_scrubbed_off(self):
        # SAS URL is calculated from storage key, so this test runs live only
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        token = self.bs.generate_container_shared_access_signature(
            self.container_name,
            permission=ContainerPermissions.READ,
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
        # parse out the signed signature
        token_components = parse_qs(token)
        signed_signature = token_components[_QueryStringConstants.SIGNED_SIGNATURE][0]

        bs_with_sas = BlockBlobService(account_name=self.settings.STORAGE_ACCOUNT_NAME, sas_token=token,
                                       protocol=self.settings.PROTOCOL)

        # Act
        with LogCaptured(self) as log_captured:
            bs_with_sas.get_blob_account_information(self.container_name)
            log_as_str = log_captured.getvalue()

            # Assert
            # make sure the query parameter 'sig' is logged, but its value is not
            self.assertTrue(_QueryStringConstants.SIGNED_SIGNATURE in log_as_str)
            self.assertFalse(signed_signature in log_as_str)

    @record
    def test_copy_source_sas_is_scrubbed_off(self):
        # SAS URL is calculated from storage key, so this test runs live only
        if TestMode.need_recording_file(self.test_mode):
            return

        # Arrange
        dest_blob_name = self.get_resource_name('destblob')

        # parse out the signed signature
        token_components = parse_qs(self.source_blob_url)
        signed_signature = token_components[_QueryStringConstants.SIGNED_SIGNATURE][0]

        # Act
        with LogCaptured(self) as log_captured:
            self.bs.copy_blob(self.container_name, dest_blob_name, self.source_blob_url, requires_sync=True)
            log_as_str = log_captured.getvalue()

            # Assert
            # make sure the query parameter 'sig' is logged, but its value is not
            self.assertTrue(_QueryStringConstants.SIGNED_SIGNATURE in log_as_str)
            self.assertFalse(signed_signature in log_as_str)

            # make sure authorization header is logged, but its value is not
            # the keyword SharedKey is present in the authorization header's value
            self.assertTrue(_AUTHORIZATION_HEADER_NAME in log_as_str)
            self.assertFalse('SharedKey' in log_as_str)