File: azure_lib.py

package info (click to toggle)
aptly 1.6.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 49,928 kB
  • sloc: python: 10,398; sh: 252; makefile: 184
file content (115 lines) | stat: -rw-r--r-- 3,478 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from lib import BaseTest
import uuid
import os

# Build the module-wide Azure client once, at import time. ``blob_client``
# ends up as None whenever the SDK cannot be imported or the credentials are
# missing from the environment; AzureTest.fixture_available() tests for that.
try:
    from azure.storage.blob import BlobServiceClient

    azure_storage_account = os.getenv('AZURE_STORAGE_ACCOUNT')
    azure_storage_access_key = os.getenv('AZURE_STORAGE_ACCESS_KEY')
    # An explicit AZURE_STORAGE_ENDPOINT wins; otherwise the URL is derived
    # from the account name.
    azure_storage_endpoint = os.getenv(
        'AZURE_STORAGE_ENDPOINT',
        f'https://{azure_storage_account}.blob.core.windows.net',
    )
    if azure_storage_account is None or azure_storage_access_key is None:
        print('Azure tests disabled: Azure creds not found in the environment')
        blob_client = None
    else:
        blob_client = BlobServiceClient(
            account_url=azure_storage_endpoint,
            credential=azure_storage_access_key,
        )
except ImportError as import_error:
    print("Azure tests disabled: can't import azure.storage.blob", import_error)
    blob_client = None


class AzureTest(BaseTest):
    """
    BaseTest + support for Azure Blob Storage

    ``prepare`` creates a uniquely named container and points the
    ``AzurePublishEndpoints`` entry ``test1`` (and, when ``use_azure_pool``
    is set, ``packagePoolStorage``) at it; ``shutdown`` deletes the container
    again. The ``check_*`` helpers and ``read_file`` inspect blobs in that
    container and ignore a leading ``public/`` in the path they are given.
    """

    # When True, prepare() additionally configures 'packagePoolStorage' to use
    # the test container.
    use_azure_pool = False

    def __init__(self) -> None:
        super().__init__()
        self.container_name = None
        self.container = None
        # Blob names in the container; filled lazily by check_path().
        self.container_contents = None

    def fixture_available(self):
        """
        Return True only if the base fixture is available and the module-level
        ``blob_client`` was created.
        """
        return super().fixture_available() and blob_client is not None

    def prepare(self):
        """
        Create a fresh container and set ``configOverride`` to reference it,
        then run the base class ``prepare``.
        """
        self.container_name = 'aptly-sys-test-' + str(uuid.uuid1())
        self.container = blob_client.create_container(
            self.container_name, public_access='blob'
        )

        self.azure_endpoint = {
            'accountName': azure_storage_account,
            'accountKey': azure_storage_access_key,
            'container': self.container_name,
            'endpoint': azure_storage_endpoint,
        }

        self.configOverride = {
            'AzurePublishEndpoints': {
                'test1': self.azure_endpoint,
            },
        }
        if self.use_azure_pool:
            self.configOverride['packagePoolStorage'] = {
                'type': 'azure',
                **self.azure_endpoint,
            }

        # NOTE(review): configOverride is assigned before delegating,
        # presumably because BaseTest.prepare() consumes it - confirm.
        super().prepare()

    def shutdown(self):
        """
        Delete the test container (if one was created) and run the base
        class ``shutdown``.

        The base ``shutdown`` runs even when deleting the container raises;
        the exception from the delete still propagates afterwards.
        """
        try:
            # container_name is assigned before create_container() is called,
            # so only self.container tells us the container really exists.
            if self.container is not None:
                blob_client.delete_container(self.container_name)
        finally:
            super().shutdown()

    def check_path(self, path):
        """
        Return True if ``path`` names a blob in the container, or is a
        prefix ("directory") of at least one blob name.

        A leading ``public/`` is stripped from ``path`` first. The blob
        listing is fetched on the first call and reused by later calls on
        the same instance.
        """
        if self.container_contents is None:
            self.container_contents = [
                p.name for p in self.container.list_blobs() if p.name is not None
            ]

        # removeprefix() is already a no-op when the prefix is absent.
        path = path.removeprefix('public/')

        if path in self.container_contents:
            return True

        # Not an exact blob name: treat it as a directory-style prefix.
        if not path.endswith('/'):
            path += '/'

        return any(item.startswith(path) for item in self.container_contents)

    def check_exists(self, path):
        """
        Raise ``Exception`` unless ``check_path(path)`` is true.
        """
        if not self.check_path(path):
            raise Exception("path %s doesn't exist" % (path,))

    def check_exists_azure_only(self, path):
        """
        Require ``path`` to exist in the container while the base class
        ``check_not_exists`` check also passes for it.
        """
        self.check_exists(path)
        # Call the base implementation explicitly to bypass the override
        # defined on this class.
        BaseTest.check_not_exists(self, path)

    def check_not_exists(self, path):
        """
        Raise ``Exception`` if ``check_path(path)`` is true.
        """
        if self.check_path(path):
            raise Exception('path %s exists' % (path,))

    def read_file(self, path, mode=''):
        """
        Download the blob at ``path`` and return its content decoded as UTF-8.

        A leading ``public/`` is stripped from ``path``. ``mode`` must be
        empty; any other value trips the assertion below.
        """
        assert not mode

        path = path.removeprefix('public/')

        blob = self.container.download_blob(path)
        return blob.readall().decode('utf-8')