"""The function that waits
...and its helpers
"""
import logging
import warnings
from time import localtime, sleep, strftime
from datetime import datetime
from elasticsearch9.exceptions import GeneralAvailabilityWarning
from curator.exceptions import (
ActionTimeout,
ConfigurationError,
CuratorException,
FailedReindex,
MissingArgument,
)
from curator.helpers.utils import chunk_index_list
def health_check(client, **kwargs):
    """
    This function calls `client.cluster.`
    :py:meth:`~.elasticsearch.client.ClusterClient.health` and, based on the params
    provided, will return ``True`` or ``False`` depending on whether that
    particular keyword appears in the output, and has the expected value.
    If multiple keys are provided, all must match for a ``True`` response.

    :param client: A client connection object
    :param kwargs: Keyword/value pairs, all of which must appear in the cluster
        health output with matching values for a ``True`` response.

    :type client: :py:class:`~.elasticsearch.Elasticsearch`

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    logger.debug('KWARGS= "%s"', kwargs)
    klist = list(kwargs.keys())
    if not klist:
        raise MissingArgument('Must provide at least one keyword argument')
    hc_data = client.cluster.health()
    response = True
    for k in klist:
        # First, verify that all kwargs are in the health output.
        # Bug fix: the original message used an un-formatted '{0}' placeholder,
        # so the offending key name was never shown. Use an f-string.
        if k not in hc_data:
            raise ConfigurationError(f'Key "{k}" not in cluster health output')
        # Bug fix: the original messages interpolated kwargs[k] (the expected
        # *value*) where the text says "key"; log the key name instead.
        if hc_data[k] != kwargs[k]:
            msg = (
                f'NO MATCH: Value for key "{k}", '
                f'health check data: {hc_data[k]}'
            )
            logger.debug(msg)
            response = False
        else:
            msg = f'MATCH: Value for key "{k}", health check data: {hc_data[k]}'
            logger.debug(msg)
    if response:
        logger.info('Health Check for all provided keys passed.')
    return response
def relocate_check(client, index):
    """
    Check whether every shard copy (primary and replica) of ``index`` is in the
    ``STARTED`` state, per `client.cluster.`
    :py:meth:`~.elasticsearch.client.ClusterClient.state`. Returns ``True``
    only when all shard copies report ``STARTED``; any other state yields
    ``False``.

    :param client: A client connection object
    :param index: The index name

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type index: str

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    cluster_state = client.cluster.state(index=index)
    routing_shards = cluster_state['routing_table']['indices'][index]['shards']
    # Every copy of every shard must report STARTED for the check to pass.
    all_started = True
    for shard_copies in routing_shards.values():
        for shard_copy in shard_copies:
            if shard_copy['state'] != "STARTED":
                all_started = False
    if all_started:
        logger.info('Relocate Check for index: "%s" has passed.', index)
    return all_started
def restore_check(client, index_list):
    """
    This function calls `client.indices.`
    :py:meth:`~.elasticsearch.client.IndicesClient.recovery`
    with the list of indices to check for complete recovery. It will return ``True``
    if recovery of those indices is complete, and ``False`` otherwise. It is
    designed to fail fast: if a single shard is encountered that is still recovering
    (not in ``DONE`` stage), it will immediately return ``False``, rather than
    complete iterating over the rest of the response.

    :param client: A client connection object
    :param index_list: The list of indices to verify having been restored.

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type index_list: list

    :raises CuratorException: if the recovery API call fails

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    response = {}
    # Query recovery data in chunks so a very long index list does not exceed
    # request-size limits.
    for chunk in chunk_index_list(index_list):
        try:
            chunk_response = client.indices.recovery(index=chunk, human=True)
        except Exception as err:
            msg = (
                f'Unable to obtain recovery information for specified indices. '
                f'Error: {err}'
            )
            raise CuratorException(msg) from err
        # An empty response means recovery data is not yet available; report
        # "not done" so the caller polls again.
        if chunk_response == {}:
            logger.info('_recovery returned an empty response. Trying again.')
            return False
        response.update(chunk_response)
    logger.info('Provided indices: %s', index_list)
    logger.info('Found indices: %s', list(response.keys()))
    # Idiom fix: iterate the shard dicts directly rather than indexing with
    # range(len(...)). Fail fast on the first shard not yet in DONE stage.
    for index, index_data in response.items():
        for shard in index_data['shards']:
            stage = shard['stage']
            if stage != 'DONE':
                logger.info('Index "%s" is still in stage "%s"', index, stage)
                return False
    # If we've gotten here, all of the indices have recovered
    return True
def snapshot_check(client, snapshot=None, repository=None):
    """
    Query `client.snapshot.`
    :py:meth:`~.elasticsearch.client.SnapshotClient.get` for the named snapshot
    and report whether it has finished. A snapshot still ``IN_PROGRESS`` yields
    ``False``; any terminal state yields ``True``, logged at a level matching
    the outcome: ``SUCCESS`` at ``INFO``, ``PARTIAL`` and unknown states at
    ``WARNING``, and ``FAILED`` at ``ERROR``.

    :param client: A client connection object
    :param snapshot: The snapshot name
    :param repository: The repository name

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type snapshot: str
    :type repository: str

    :raises CuratorException: if the snapshot data cannot be retrieved

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    logger.debug('SNAPSHOT: %s', snapshot)
    logger.debug('REPOSITORY: %s', repository)
    try:
        result = client.snapshot.get(repository=repository, snapshot=snapshot)
        logger.debug('RESULT: %s', result)
    except Exception as err:
        raise CuratorException(
            f'Unable to obtain information for snapshot "{snapshot}" in repository '
            f'"{repository}". Error: {err}'
        ) from err
    state = result['snapshots'][0]['state']
    logger.debug('Snapshot state = %s', state)
    # Still running: the only case that reports "not finished".
    if state == 'IN_PROGRESS':
        logger.info('Snapshot %s still in progress.', snapshot)
        return False
    # Finished: log according to how it finished.
    if state == 'SUCCESS':
        logger.info('Snapshot %s successfully completed.', snapshot)
    elif state == 'PARTIAL':
        logger.warning('Snapshot %s completed with state PARTIAL.', snapshot)
    elif state == 'FAILED':
        logger.error('Snapshot %s completed with state FAILED.', snapshot)
    else:
        logger.warning('Snapshot %s completed with state: %s', snapshot, state)
    return True
def task_check(client, task_id=None):
    """
    Look up ``task_id`` via `client.tasks.`
    :py:meth:`~.elasticsearch.client.TasksClient.get` and report completion.
    Returns ``True`` when the task data contains ``'completed': True``;
    otherwise logs the task's running time and returns ``False``. Reindex
    tasks receive extra scrutiny: any failures in the task response raise
    :py:exc:`~.curator.exceptions.FailedReindex`.

    :param client: A client connection object
    :param task_id: The task id

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type task_id: str

    :raises CuratorException: if the task data cannot be retrieved
    :raises FailedReindex: if a reindex task reported failures

    :rtype: bool
    """
    logger = logging.getLogger(__name__)
    try:
        warnings.filterwarnings("ignore", category=GeneralAvailabilityWarning)
        task_data = client.tasks.get(task_id=task_id)
    except Exception as err:
        msg = (
            f'Unable to obtain task information for task_id "{task_id}". '
            f'Exception {err}'
        )
        raise CuratorException(msg) from err
    task_info = task_data['task']
    if task_info['action'] == 'indices:data/write/reindex':
        logger.debug('It\'s a REINDEX TASK')
        logger.debug('TASK_DATA: %s', task_data)
        logger.debug('TASK_DATA keys: %s', list(task_data.keys()))
        if 'response' in task_data:
            reindex_response = task_data['response']
            if reindex_response['failures']:
                msg = f'Failures found in reindex response: {reindex_response["failures"]}'
                raise FailedReindex(msg)
    # running_time_in_nanos converted to seconds.
    seconds_running = 0.000000001 * task_info['running_time_in_nanos']
    logger.debug('Running time: %s seconds', seconds_running)
    descr = task_info['description']
    if not task_data['completed']:
        # Still running: log status and report "not done".
        logger.debug('Full Task Data: %s', task_data)
        msg = (
            f'Task "{descr}" with task_id "{task_id}" has been running for '
            f'{seconds_running} seconds'
        )
        logger.info(msg)
        return False
    # Completed: derive the wall-clock completion time from start + duration.
    completion_millis = (seconds_running * 1000) + task_info['start_time_in_millis']
    time_string = strftime('%Y-%m-%dT%H:%M:%SZ', localtime(completion_millis / 1000))
    logger.info('Task "%s" completed at %s.', descr, time_string)
    return True
# pylint: disable=too-many-locals, too-many-arguments
def wait_for_it(
    client,
    action,
    task_id=None,
    snapshot=None,
    repository=None,
    index=None,
    index_list=None,
    wait_interval=9,
    max_wait=-1,
    **kwargs,
):
    """
    Single entry point for all ``wait_for_completion`` type behaviors. Polls
    the checker function mapped to ``action`` every ``wait_interval`` seconds
    until it reports completion, or until ``max_wait`` seconds have elapsed
    (``-1`` means wait forever).

    :param client: A client connection object
    :param action: The action name that will identify how to wait
    :param task_id: If the action provided a task_id, this is where it must be declared.
    :param snapshot: The name of the snapshot.
    :param repository: The Elasticsearch snapshot repository to use
    :param index: The index name (used by the ``relocate`` action)
    :param index_list: The list of indices (used by the ``restore`` action)
    :param wait_interval: Seconds to wait between completion checks.
    :param max_wait: Maximum number of seconds to ``wait_for_completion``
    :param kwargs: Any additional keyword arguments to pass to the function

    :type client: :py:class:`~.elasticsearch.Elasticsearch`
    :type action: str
    :type task_id: str
    :type snapshot: str
    :type repository: str
    :type index: str
    :type index_list: list
    :type wait_interval: int
    :type max_wait: int
    :type kwargs: dict

    :raises ConfigurationError: if ``action`` is not a known wait action
    :raises MissingArgument: if a required argument for ``action`` is absent
    :raises CuratorException: if ``task_id`` cannot be found
    :raises ActionTimeout: if ``max_wait`` elapses before completion

    :rtype: None
    """
    logger = logging.getLogger(__name__)
    # Map each action name to (checker function, keyword arguments).
    dispatch = {
        'allocation': (health_check, {'relocating_shards': 0}),
        'replicas': (health_check, {'status': 'green'}),
        'cluster_routing': (health_check, {'relocating_shards': 0}),
        'snapshot': (snapshot_check, {'snapshot': snapshot, 'repository': repository}),
        'restore': (restore_check, {'index_list': index_list}),
        'reindex': (task_check, {'task_id': task_id}),
        'shrink': (health_check, {'status': 'green'}),
        'relocate': (relocate_check, {'index': index}),
    }
    wait_actions = list(dispatch.keys())
    if action not in wait_actions:
        raise ConfigurationError(f'"action" must be one of {wait_actions}')
    # Per-action argument validation.
    if action == 'reindex' and task_id is None:
        raise MissingArgument(f'A task_id must accompany "action" {action}')
    if action == 'snapshot' and ((snapshot is None) or (repository is None)):
        raise MissingArgument(
            f'A snapshot and repository must accompany "action" {action}. snapshot: '
            f'{snapshot}, repository: {repository}'
        )
    if action == 'restore' and index_list is None:
        raise MissingArgument(f'An index_list must accompany "action" {action}')
    if action == 'reindex':
        try:
            warnings.filterwarnings("ignore", category=GeneralAvailabilityWarning)
            _ = client.tasks.get(task_id=task_id)
        except Exception as err:
            # This exception should only exist in API usage. It should never
            # occur in regular Curator usage.
            raise CuratorException(
                f'Unable to find task_id {task_id}. Exception: {err}'
            ) from err
    # Now with this mapped, we can perform the wait as indicated.
    checker, checker_args = dispatch[action]
    started_at = datetime.now()
    success = False
    while True:
        elapsed = int((datetime.now() - started_at).total_seconds())
        logger.debug('Elapsed time: %s seconds', elapsed)
        # Unpacking an empty kwargs dict is a no-op, so one call handles both
        # the with-kwargs and without-kwargs cases.
        response = checker(client, **checker_args, **kwargs)
        logger.debug('Response: %s', response)
        if response:
            # Success (the action finished; it may or may not have succeeded).
            logger.debug(
                'Action "%s" finished executing (may or may not have been successful)',
                action,
            )
            success = True
            break
        if (max_wait != -1) and (elapsed >= max_wait):
            # Not success, and reached maximum wait (if defined).
            msg = (
                f'Unable to complete action "{action}" within max_wait '
                f'({max_wait}) seconds.'
            )
            logger.error(msg)
            break
        # Not success, so we wait.
        msg = (
            f'Action "{action}" not yet complete, {elapsed} total seconds elapsed. '
            f'Waiting {wait_interval} seconds before checking again.'
        )
        logger.debug(msg)
        sleep(wait_interval)
    logger.debug('Result: %s', success)
    if not success:
        raise ActionTimeout(
            (
                f'Action "{action}" failed to complete in the max_wait period of '
                f'{max_wait} seconds'
            )
        )