File: test_multi_hardware_clean_steps.py

package info (click to toggle)
ironic-python-agent 11.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 4,508 kB
  • sloc: python: 36,248; sh: 60; makefile: 29
file content (116 lines) | stat: -rw-r--r-- 4,665 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import mock

from stevedore import extension

from ironic_python_agent.extensions import clean
from ironic_python_agent import hardware
from ironic_python_agent.tests.unit import base


def _build_clean_step(name, priority, reboot=False, abort=False):
    return {'step': name, 'priority': priority,
            'reboot_requested': reboot, 'abortable': abort}


class AFakeMainlineHardwareManager(hardware.HardwareManager):
    def evaluate_hardware_support(self):
        return hardware.HardwareSupport.MAINLINE

    def get_clean_steps(self, node, ports):
        return [_build_clean_step('duped_ml', 20)]


class AFakeGenericHardwareManager(hardware.HardwareManager):
    def evaluate_hardware_support(self):
        return hardware.HardwareSupport.GENERIC

    def get_clean_steps(self, node, ports):
        return [_build_clean_step('duped_ml', 20),
                _build_clean_step('duped_gn', 30),
                _build_clean_step('ZHigherPrio', 1)]


class ZFakeGenericHardwareManager(hardware.HardwareManager):
    def evaluate_hardware_support(self):
        return hardware.HardwareSupport.GENERIC

    def get_clean_steps(self, node, ports):
        return [_build_clean_step('duped_ml', 20),
                _build_clean_step('duped_gn', 30),
                _build_clean_step('ZHigherPrio', 100)]


@mock.patch.object(hardware.HardwareManager, 'wait_for_disks',
                   lambda _self: None)
class TestMultipleHardwareManagerCleanSteps(base.IronicAgentTest):
    def setUp(self):
        super(TestMultipleHardwareManagerCleanSteps, self).setUp()

        self.agent_extension = clean.CleanExtension()

        fake_ep = mock.Mock()
        fake_ep.module_name = 'fake'
        fake_ep.attrs = ['fake attrs']
        self.ag_hwm = extension.Extension(
            'fake_ageneric', fake_ep, None, AFakeGenericHardwareManager())
        self.zg_hwm = extension.Extension(
            'fake_zgeneric', fake_ep, None, ZFakeGenericHardwareManager())
        self.ml_hwm = extension.Extension(
            'fake_amainline', fake_ep, None, AFakeMainlineHardwareManager())
        self.fake_ext_mgr = extension.ExtensionManager.make_test_instance(
            [self.ag_hwm, self.zg_hwm, self.ml_hwm])

        self.extension_mgr_patcher = mock.patch('stevedore.ExtensionManager',
                                                autospec=True)
        self.addCleanup(self.extension_mgr_patcher.stop)
        self.mocked_extension_mgr = self.extension_mgr_patcher.start()
        self.mocked_extension_mgr.return_value = self.fake_ext_mgr
        hardware._global_managers = None

    def test_clean_step_ordering(self):
        as_results = self.agent_extension.get_clean_steps(node={'uuid': '1'},
                                                          ports=[])
        results = as_results.join().command_result
        expected_steps = {
            'clean_steps': {
                'AFakeGenericHardwareManager': [
                    {'step': 'duped_gn',
                     'reboot_requested': False,
                     'abortable': False,
                     'priority': 30}],
                'ZFakeGenericHardwareManager': [
                    {'step': 'ZHigherPrio',
                     'reboot_requested': False,
                     'abortable': False,
                     'priority': 100}],
                'AFakeMainlineHardwareManager': [
                    {'step': 'duped_ml',
                     'reboot_requested': False,
                     'abortable': False,
                     'priority': 20}]},
            'hardware_manager_version': {
                'AFakeGenericHardwareManager': '1.0',
                'AFakeMainlineHardwareManager': '1.0',
                'ZFakeGenericHardwareManager': '1.0'}}

        for manager, steps in results['clean_steps'].items():
            steps.sort(key=lambda x: (x['priority'], x['step']))

        for manager, steps in expected_steps['clean_steps'].items():
            steps.sort(key=lambda x: (x['priority'], x['step']))

        self.assertEqual(expected_steps, results)