File: test_action_allocation.py

package info (click to toggle)
elasticsearch-curator 8.0.21-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 2,716 kB
  • sloc: python: 17,838; makefile: 159; sh: 156
file content (67 lines) | stat: -rw-r--r-- 3,015 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Allocation action unit tests"""
# pylint: disable=missing-function-docstring, missing-class-docstring, invalid-name, line-too-long, attribute-defined-outside-init
from unittest import TestCase
from unittest.mock import Mock
from curator import IndexList
from curator.exceptions import MissingArgument
from curator.actions.allocation import Allocation
from . import testvars

class TestActionAllocation(TestCase):
    """Unit tests for the ``Allocation`` action, run against a mocked client."""

    # Payload returned by the mocked ``client.info()`` unless a test overrides it.
    VERSION = {'version': {'number': '5.0.0'}}

    def setUp(self):
        """Give every test a fresh mocked client and index list."""
        self.builder()

    def builder(self):
        """Create ``self.client`` (a ``Mock``) and ``self.ilo`` (an ``IndexList``).

        Retained as a separate method so that any code still calling
        ``builder()`` directly keeps working; ``setUp`` invokes it
        automatically before each test.
        """
        self.client = Mock()
        self.client.info.return_value = self.VERSION
        self.client.cat.indices.return_value = testvars.state_one
        self.client.indices.get_settings.return_value = testvars.settings_one
        self.client.indices.stats.return_value = testvars.stats_one
        self.client.indices.exists_alias.return_value = False
        self.client.indices.put_settings.return_value = None
        self.ilo = IndexList(self.client)

    def test_init_raise(self):
        """A non-``IndexList`` first argument is rejected with ``TypeError``."""
        with self.assertRaises(TypeError):
            Allocation('invalid')

    def test_init(self):
        """The action keeps references to the index list and its client."""
        ao = Allocation(self.ilo, key='key', value='value')
        self.assertEqual(self.ilo, ao.index_list)
        self.assertEqual(self.client, ao.client)

    def test_create_body_no_key(self):
        """Omitting ``key`` raises ``MissingArgument``."""
        with self.assertRaises(MissingArgument):
            Allocation(self.ilo)

    def test_create_body_invalid_allocation_type(self):
        """An unrecognised ``allocation_type`` raises ``ValueError``."""
        with self.assertRaises(ValueError):
            Allocation(
                self.ilo, key='key', value='value', allocation_type='invalid'
            )

    def test_create_body_valid(self):
        """With only ``key``/``value`` given, a ``require`` setting is built."""
        ao = Allocation(self.ilo, key='key', value='value')
        self.assertEqual(
            {'index.routing.allocation.require.key': 'value'}, ao.settings
        )

    def test_do_action_raise_on_put_settings(self):
        """A failing ``put_settings`` call makes ``do_action`` raise."""
        self.client.indices.put_settings.side_effect = testvars.fake_fail
        ao = Allocation(self.ilo, key='key', value='value')
        # NOTE(review): the concrete exception type raised by do_action is not
        # visible from this module, so the broad ``Exception`` check is kept.
        self.assertRaises(Exception, ao.do_action)

    def test_do_dry_run(self):
        """A dry run returns ``None`` and must not push any settings."""
        alo = Allocation(self.ilo, key='key', value='value')
        self.assertIsNone(alo.do_dry_run())
        self.client.indices.put_settings.assert_not_called()

    def test_do_action(self):
        """``do_action`` returns ``None`` and pushes settings to the cluster."""
        alo = Allocation(self.ilo, key='key', value='value')
        self.assertIsNone(alo.do_action())
        self.client.indices.put_settings.assert_called()

    def test_do_action_wait_v50(self):
        """``wait_for_completion`` succeeds when no shards are relocating (5.0.0)."""
        self.client.cluster.health.return_value = {'relocating_shards': 0}
        alo = Allocation(
            self.ilo, key='key', value='value', wait_for_completion=True
        )
        self.assertIsNone(alo.do_action())
        self.client.indices.put_settings.assert_called()

    def test_do_action_wait_v51(self):
        """``wait_for_completion`` succeeds when no shards are relocating (5.1.1)."""
        # Override the default version only after the index list has been built.
        self.client.info.return_value = {'version': {'number': '5.1.1'}}
        self.client.cluster.health.return_value = {'relocating_shards': 0}
        alo = Allocation(
            self.ilo, key='key', value='value', wait_for_completion=True
        )
        self.assertIsNone(alo.do_action())
        self.client.indices.put_settings.assert_called()