File: test_volume.py

package info (click to toggle)
python-openstacksdk 4.4.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,352 kB
  • sloc: python: 122,960; sh: 153; makefile: 23
file content (166 lines) | stat: -rw-r--r-- 6,840 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
test_volume
----------------------------------

Functional tests for block storage methods.
"""

from fixtures import TimeoutException
from testtools import content

from openstack import exceptions
from openstack.tests.functional import base
from openstack import utils


class TestVolume(base.BaseFunctionalTest):
    # Creating and deleting volumes is slow
    TIMEOUT_SCALING_FACTOR = 1.5

    def setUp(self):
        super().setUp()
        self.skipTest('Volume functional tests temporarily disabled')
        if not self.user_cloud.has_service('volume'):
            self.skipTest('volume service not supported by cloud')

    def test_volumes(self):
        '''Test volume and snapshot functionality'''
        volume_name = self.getUniqueString()
        snapshot_name = self.getUniqueString()
        self.addDetail('volume', content.text_content(volume_name))
        self.addCleanup(self.cleanup, volume_name, snapshot_name=snapshot_name)
        volume = self.user_cloud.create_volume(
            display_name=volume_name, size=1
        )
        snapshot = self.user_cloud.create_volume_snapshot(
            volume['id'], display_name=snapshot_name
        )

        ret_volume = self.user_cloud.get_volume_by_id(volume['id'])
        self.assertEqual(volume['id'], ret_volume['id'])

        volume_ids = [v['id'] for v in self.user_cloud.list_volumes()]
        self.assertIn(volume['id'], volume_ids)

        snapshot_list = self.user_cloud.list_volume_snapshots()
        snapshot_ids = [s['id'] for s in snapshot_list]
        self.assertIn(snapshot['id'], snapshot_ids)

        ret_snapshot = self.user_cloud.get_volume_snapshot_by_id(
            snapshot['id']
        )
        self.assertEqual(snapshot['id'], ret_snapshot['id'])

        self.user_cloud.delete_volume_snapshot(snapshot_name, wait=True)
        self.user_cloud.delete_volume(volume_name, wait=True)

    def test_volume_to_image(self):
        '''Test volume export to image functionality'''
        volume_name = self.getUniqueString()
        image_name = self.getUniqueString()
        self.addDetail('volume', content.text_content(volume_name))
        self.addCleanup(self.cleanup, volume_name, image_name=image_name)
        volume = self.user_cloud.create_volume(
            display_name=volume_name, size=1
        )
        image = self.user_cloud.create_image(
            image_name, volume=volume, wait=True
        )

        volume_ids = [v['id'] for v in self.user_cloud.list_volumes()]
        self.assertIn(volume['id'], volume_ids)

        image_list = self.user_cloud.list_images()
        image_ids = [s['id'] for s in image_list]
        self.assertIn(image['id'], image_ids)

        self.user_cloud.delete_image(image_name, wait=True)
        self.user_cloud.delete_volume(volume_name, wait=True)

    def cleanup(self, volume, snapshot_name=None, image_name=None):
        # Need to delete snapshots before volumes
        if snapshot_name:
            snapshot = self.user_cloud.get_volume_snapshot(snapshot_name)
            if snapshot:
                self.user_cloud.delete_volume_snapshot(
                    snapshot_name, wait=True
                )
        if image_name:
            image = self.user_cloud.get_image(image_name)
            if image:
                self.user_cloud.delete_image(image_name, wait=True)
        if not isinstance(volume, list):
            self.user_cloud.delete_volume(volume, wait=True)
        else:
            # We have more than one volume to clean up - submit all of the
            # deletes without wait, then poll until none of them are found
            # in the volume list anymore
            for v in volume:
                self.user_cloud.delete_volume(v, wait=False)
            try:
                for count in utils.iterate_timeout(
                    180, "Timeout waiting for volume cleanup"
                ):
                    found = False
                    for existing in self.user_cloud.list_volumes():
                        for v in volume:
                            if v['id'] == existing['id']:
                                found = True
                                break
                        if found:
                            break
                    if not found:
                        break
            except (exceptions.ResourceTimeout, TimeoutException):
                # NOTE(slaweq): ups, some volumes are still not removed
                # so we should try to force delete it once again and move
                # forward
                for existing in self.user_cloud.list_volumes():
                    for v in volume:
                        if v['id'] == existing['id']:
                            self.operator_cloud.delete_volume(
                                v, wait=False, force=True
                            )

    def test_list_volumes_pagination(self):
        '''Test pagination for list volumes functionality'''

        volumes = []
        # the number of created volumes needs to be higher than
        # CONF.osapi_max_limit but not higher than volume quotas for
        # the test user in the tenant(default quotas is set to 10)
        num_volumes = 8
        for i in range(num_volumes):
            name = self.getUniqueString()
            v = self.user_cloud.create_volume(display_name=name, size=1)
            volumes.append(v)
        self.addCleanup(self.cleanup, volumes)
        result = []
        for v in self.user_cloud.list_volumes():
            if v['name'] and v['name'].startswith(self.id()):
                result.append(v['id'])
        self.assertEqual(sorted([v['id'] for v in volumes]), sorted(result))

    def test_update_volume(self):
        name, desc = self.getUniqueString('name'), self.getUniqueString('desc')
        self.addCleanup(self.cleanup, name)
        volume = self.user_cloud.create_volume(1, name=name, description=desc)
        self.assertEqual(volume.name, name)
        self.assertEqual(volume.description, desc)
        new_name = self.getUniqueString('name')
        volume = self.user_cloud.update_volume(volume.id, name=new_name)
        self.assertNotEqual(volume.name, name)
        self.assertEqual(volume.name, new_name)
        self.assertEqual(volume.description, desc)