1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
"""test_action_delete_snapshots"""
from unittest import TestCase
from unittest.mock import Mock
from curator.actions import DeleteSnapshots
from curator.exceptions import FailedExecution
from curator import SnapshotList
# Get test variables and constants from a single source
from . import testvars as testvars
class TestActionDeleteSnapshots(TestCase):
def test_init_raise(self):
self.assertRaises(TypeError, DeleteSnapshots, 'invalid')
def test_init(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
slo = SnapshotList(client, repository=testvars.repo_name)
do = DeleteSnapshots(slo)
self.assertEqual(slo, do.snapshot_list)
self.assertEqual(client, do.client)
def test_do_dry_run(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.delete.return_value = None
slo = SnapshotList(client, repository=testvars.repo_name)
do = DeleteSnapshots(slo)
self.assertIsNone(do.do_dry_run())
def test_do_action(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.list.return_value = testvars.no_snap_tasks
client.snapshot.delete.return_value = None
slo = SnapshotList(client, repository=testvars.repo_name)
do = DeleteSnapshots(slo)
self.assertIsNone(do.do_action())
def test_do_action_raises_exception(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.delete.return_value = None
client.tasks.list.return_value = testvars.no_snap_tasks
client.snapshot.delete.side_effect = testvars.fake_fail
slo = SnapshotList(client, repository=testvars.repo_name)
do = DeleteSnapshots(slo)
self.assertRaises(FailedExecution, do.do_action)
### This check is not necessary after ES 7.16 as it is possible to have
### up to 1000 concurrent snapshots
###
### https://www.elastic.co/guide/en/elasticsearch/reference/8.6/snapshot-settings.html
### snapshot.max_concurrent_operations
### (Dynamic, integer) Maximum number of concurrent snapshot operations. Defaults to 1000.
###
### This limit applies in total to all ongoing snapshot creation, cloning, and deletion
### operations. Elasticsearch will reject any operations that would exceed this limit.
# def test_not_safe_to_snap_raises_exception(self):
# client = Mock()
# client.snapshot.get.return_value = testvars.inprogress
# client.snapshot.get_repository.return_value = testvars.test_repo
# client.tasks.list.return_value = testvars.no_snap_tasks
# slo = SnapshotList(client, repository=testvars.repo_name)
# do = DeleteSnapshots(slo, retry_interval=0, retry_count=1)
# self.assertRaises(curator.FailedExecution, do.do_action)
|