File: waiters.py

"""The function that waits

...and its helpers
"""

import logging
import warnings
from time import gmtime, sleep, strftime
from datetime import datetime
from elasticsearch9.exceptions import GeneralAvailabilityWarning
from curator.exceptions import (
    ActionTimeout,
    ConfigurationError,
    CuratorException,
    FailedReindex,
    MissingArgument,
)
from curator.helpers.utils import chunk_index_list


def health_check(client, **kwargs):
    """
    This function calls `client.cluster.`
    :py:meth:`~.elasticsearch.client.ClusterClient.health` and, based on the
    keyword arguments provided, will return ``True`` or ``False`` depending on
    whether each keyword appears in the output with its expected value.

    If multiple keys are provided, all must match for a ``True`` response.

    :param client: A client connection object
    :param kwargs: Keyword/value pairs to match against the cluster health output

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type kwargs: dict

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    logger.debug('KWARGS= "%s"', kwargs)
    klist = list(kwargs.keys())
    if not klist:
        raise MissingArgument('Must provide at least one keyword argument')
    hc_data = client.cluster.health()
    response = True

    for k in klist:
        # First, verify that the kwarg is present in the health check output
        if k not in hc_data:
            raise ConfigurationError(f'Key "{k}" not in cluster health output')
        if hc_data[k] != kwargs[k]:
            msg = (
                f'NO MATCH: Value for key "{k}": expected "{kwargs[k]}", '
                f'health check data: {hc_data[k]}'
            )
            logger.debug(msg)
            response = False
        else:
            msg = (
                f'MATCH: Value for key "{k}": "{kwargs[k]}", '
                f'health check data: {hc_data[k]}'
            )
            logger.debug(msg)
    if response:
        logger.info('Health Check for all provided keys passed.')
    return response
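
# Example (illustrative sketch): checking two health keys at once. Both must
# match the cluster health output for a True result. The client variable
# ``es`` is hypothetical and assumed to be already connected.
#
#   if health_check(es, status='green', relocating_shards=0):
#       print('cluster is green and no shards are relocating')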


def relocate_check(client, index):
    """
    This function calls `client.cluster.`
    :py:meth:`~.elasticsearch.client.ClusterClient.state`
    with a given index to check if all of the shards for that index are in the
    ``STARTED`` state. It will return ``True`` if all primary and replica shards
    are in the ``STARTED`` state, and it will return ``False`` if any shard is
    in a different state.

    :param client: A client connection object
    :param index: The index name

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type index: str

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    shard_state_data = client.cluster.state(index=index)['routing_table']['indices'][
        index
    ]['shards']
    finished_state = all(
        all(shard['state'] == "STARTED" for shard in shards)
        for shards in shard_state_data.values()
    )
    if finished_state:
        logger.info('Relocate Check for index: "%s" has passed.', index)
    return finished_state
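
# Example (illustrative sketch): poll until every shard of a hypothetical
# index has finished relocating. Assumes ``es`` is a connected client.
#
#   while not relocate_check(es, 'my-index'):
#       sleep(9)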


def restore_check(client, index_list):
    """
    This function calls `client.indices.`
    :py:meth:`~.elasticsearch.client.IndicesClient.recovery`
    with the list of indices to check for complete recovery.  It will return ``True``
    if recovery of those indices is complete, and ``False`` otherwise.  It is
    designed to fail fast: if a single shard is encountered that is still recovering
    (not in the ``DONE`` stage), it will immediately return ``False`` rather than
    continue iterating over the rest of the response.

    :param client: A client connection object
    :param index_list: The list of indices to verify having been restored.

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type index_list: list

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    response = {}

    for chunk in chunk_index_list(index_list):
        try:
            chunk_response = client.indices.recovery(index=chunk, human=True)
        except Exception as err:
            msg = (
                f'Unable to obtain recovery information for specified indices. '
                f'Error: {err}'
            )
            raise CuratorException(msg) from err
        if chunk_response == {}:
            logger.info('_recovery returned an empty response. Trying again.')
            return False
        response.update(chunk_response)
    logger.info('Provided indices: %s', index_list)
    logger.info('Found indices: %s', list(response.keys()))
    for index, data in response.items():
        for shard in data['shards']:
            stage = shard['stage']
            if stage != 'DONE':
                logger.info('Index "%s" is still in stage "%s"', index, stage)
                return False

    # If we've gotten here, all of the indices have recovered
    return True
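
# Example (illustrative sketch): verify that two hypothetical restored
# indices have fully recovered. Assumes ``es`` is a connected client.
#
#   if restore_check(es, ['index-2025.01.01', 'index-2025.01.02']):
#       print('restore complete')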


def snapshot_check(client, snapshot=None, repository=None):
    """
    This function calls `client.snapshot.`
    :py:meth:`~.elasticsearch.client.SnapshotClient.get` and tests to see whether
    the snapshot is complete, and if so, with what status.  It will log the
    result according to its severity: if the snapshot is still ``IN_PROGRESS``,
    it will return ``False``. ``SUCCESS`` is logged as an ``INFO`` level
    message, ``PARTIAL`` nets a ``WARNING``, ``FAILED`` is an ``ERROR``, and
    all other states get a ``WARNING`` level message.

    :param client: A client connection object
    :param snapshot: The snapshot name
    :param repository: The repository name

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type snapshot: str
    :type repository: str

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    logger.debug('SNAPSHOT: %s', snapshot)
    logger.debug('REPOSITORY: %s', repository)
    try:
        result = client.snapshot.get(repository=repository, snapshot=snapshot)
        logger.debug('RESULT: %s', result)
    except Exception as err:
        raise CuratorException(
            f'Unable to obtain information for snapshot "{snapshot}" in repository '
            f'"{repository}". Error: {err}'
        ) from err
    state = result['snapshots'][0]['state']
    logger.debug('Snapshot state = %s', state)
    retval = True
    if state == 'IN_PROGRESS':
        logger.info('Snapshot %s still in progress.', snapshot)
        retval = False
    elif state == 'SUCCESS':
        logger.info('Snapshot %s successfully completed.', snapshot)
    elif state == 'PARTIAL':
        logger.warning('Snapshot %s completed with state PARTIAL.', snapshot)
    elif state == 'FAILED':
        logger.error('Snapshot %s completed with state FAILED.', snapshot)
    else:
        logger.warning('Snapshot %s completed with state: %s', snapshot, state)
    return retval
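
# Example (illustrative sketch): a single check of a hypothetical snapshot.
# Note that only IN_PROGRESS yields False; PARTIAL and FAILED still count as
# "complete" here, with the severity conveyed via the log level.
#
#   done = snapshot_check(es, snapshot='snap-2025.01.01', repository='my_repo')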


def task_check(client, task_id=None):
    """
    This function calls `client.tasks.`
    :py:meth:`~.elasticsearch.client.TasksClient.get` with the provided
    ``task_id``.  If the task data contains ``'completed': True``, then it will
    return ``True``. If the task is not completed, it will log some information
    about the task and return ``False``.

    :param client: A client connection object
    :param task_id: The task id

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type task_id: str

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    try:
        warnings.filterwarnings("ignore", category=GeneralAvailabilityWarning)
        task_data = client.tasks.get(task_id=task_id)
    except Exception as err:
        msg = (
            f'Unable to obtain task information for task_id "{task_id}". '
            f'Exception {err}'
        )
        raise CuratorException(msg) from err
    task = task_data['task']
    completed = task_data['completed']
    if task['action'] == 'indices:data/write/reindex':
        logger.debug('It\'s a REINDEX TASK')
        logger.debug('TASK_DATA: %s', task_data)
        logger.debug('TASK_DATA keys: %s', list(task_data.keys()))
        if 'response' in task_data:
            response = task_data['response']
            if response['failures']:
                msg = f'Failures found in reindex response: {response["failures"]}'
                raise FailedReindex(msg)
    running_time = task['running_time_in_nanos'] / 1_000_000_000  # ns -> s
    logger.debug('Running time: %s seconds', running_time)
    descr = task['description']

    if completed:
        completion_time = (running_time * 1000) + task['start_time_in_millis']
        # Use gmtime so the trailing 'Z' (UTC) designator is accurate
        time_string = strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(completion_time / 1000))
        logger.info('Task "%s" completed at %s.', descr, time_string)
        retval = True
    else:
        # Log the task status here.
        logger.debug('Full Task Data: %s', task_data)
        msg = (
            f'Task "{descr}" with task_id "{task_id}" has been running for '
            f'{running_time} seconds'
        )
        logger.info(msg)
        retval = False
    return retval
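
# Example (illustrative sketch): poll a reindex task until it completes. The
# task_id is hypothetical; a real one comes from a reindex request submitted
# with wait_for_completion=False. Assumes ``es`` is a connected client.
#
#   while not task_check(es, task_id='oTUltX4IQMOUUVei:12345'):
#       sleep(9)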


# pylint: disable=too-many-locals, too-many-arguments
def wait_for_it(
    client,
    action,
    task_id=None,
    snapshot=None,
    repository=None,
    index=None,
    index_list=None,
    wait_interval=9,
    max_wait=-1,
    **kwargs,
):
    """
    This function becomes one place to do all ``wait_for_completion`` type behaviors

    :param client: A client connection object
    :param action: The action name that will identify how to wait
    :param task_id: If the action provided a task_id, this is where it must be declared.
    :param snapshot: The name of the snapshot.
    :param repository: The Elasticsearch snapshot repository to use
    :param index: The index name (used by the ``relocate`` action)
    :param index_list: The list of indices to check (used by the ``restore`` action)
    :param wait_interval: Seconds to wait between completion checks.
    :param max_wait: Maximum number of seconds to ``wait_for_completion``
        (``-1`` means no limit)
    :param kwargs: Any additional keyword arguments to pass to the function

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type action: str
    :type task_id: str
    :type snapshot: str
    :type repository: str
    :type index: str
    :type index_list: list
    :type wait_interval: int
    :type max_wait: int
    :type kwargs: dict
    :rtype: None
    """
    logger = logging.getLogger(__name__)
    action_map = {
        'allocation': {'function': health_check, 'args': {'relocating_shards': 0}},
        'replicas': {'function': health_check, 'args': {'status': 'green'}},
        'cluster_routing': {'function': health_check, 'args': {'relocating_shards': 0}},
        'snapshot': {
            'function': snapshot_check,
            'args': {'snapshot': snapshot, 'repository': repository},
        },
        'restore': {
            'function': restore_check,
            'args': {'index_list': index_list},
        },
        'reindex': {'function': task_check, 'args': {'task_id': task_id}},
        'shrink': {'function': health_check, 'args': {'status': 'green'}},
        'relocate': {'function': relocate_check, 'args': {'index': index}},
    }
    wait_actions = list(action_map.keys())

    if action not in wait_actions:
        raise ConfigurationError(f'"action" must be one of {wait_actions}')
    if action == 'reindex' and task_id is None:
        raise MissingArgument(f'A task_id must accompany "action" {action}')
    if action == 'snapshot' and ((snapshot is None) or (repository is None)):
        raise MissingArgument(
            f'A snapshot and repository must accompany "action" {action}. snapshot: '
            f'{snapshot}, repository: {repository}'
        )
    if action == 'restore' and index_list is None:
        raise MissingArgument(f'An index_list must accompany "action" {action}')
    if action == 'reindex':
        try:
            warnings.filterwarnings("ignore", category=GeneralAvailabilityWarning)
            _ = client.tasks.get(task_id=task_id)
        except Exception as err:
            # This exception should only exist in API usage. It should never
            # occur in regular Curator usage.
            raise CuratorException(
                f'Unable to find task_id {task_id}. Exception: {err}'
            ) from err

    # Now with this mapped, we can perform the wait as indicated.
    start_time = datetime.now()
    result = False
    while True:
        elapsed = int((datetime.now() - start_time).total_seconds())
        logger.debug('Elapsed time: %s seconds', elapsed)
        if kwargs:
            response = action_map[action]['function'](
                client, **action_map[action]['args'], **kwargs
            )
        else:
            response = action_map[action]['function'](
                client, **action_map[action]['args']
            )
        logger.debug('Response: %s', response)
        # Success
        if response:
            logger.debug(
                'Action "%s" finished executing (may or may not have been successful)',
                action,
            )
            result = True
            break
        # Not success, and reached maximum wait (if defined)
        if (max_wait != -1) and (elapsed >= max_wait):
            msg = (
                f'Unable to complete action "{action}" within max_wait '
                f'({max_wait}) seconds.'
            )
            logger.error(msg)
            break
        # Not success, so we wait.
        msg = (
            f'Action "{action}" not yet complete, {elapsed} total seconds elapsed. '
            f'Waiting {wait_interval} seconds before checking again.'
        )
        logger.debug(msg)
        sleep(wait_interval)

    logger.debug('Result: %s', result)
    if not result:
        raise ActionTimeout(
            (
                f'Action "{action}" failed to complete in the max_wait period of '
                f'{max_wait} seconds'
            )
        )
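
# Example (illustrative sketch): the checks above are normally driven through
# wait_for_it. This waits up to 600 seconds for a hypothetical snapshot,
# polling every 10 seconds; ActionTimeout is raised if it never completes.
# Assumes ``es`` is a connected client.
#
#   try:
#       wait_for_it(
#           es,
#           'snapshot',
#           snapshot='snap-2025.01.01',
#           repository='my_repo',
#           wait_interval=10,
#           max_wait=600,
#       )
#   except ActionTimeout:
#       logging.error('Snapshot did not complete in time')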