1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
import datetime
import logging
from abc import ABCMeta, abstractmethod
from decimal import Decimal
from celery.result import EagerResult, allow_join_result
from celery.backends.base import DisabledBackend
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# State name that ProgressRecorder.set_progress passes to update_state and
# that Progress.get_info matches to report an in-progress task.
PROGRESS_STATE = 'PROGRESS'
class AbstractProgressRecorder(metaclass=ABCMeta):
    """Interface for objects that report how far a piece of work has progressed.

    The metaclass is declared with the Python 3 ``metaclass=`` keyword. A
    ``__metaclass__`` class attribute is ignored by Python 3, which would
    leave ``@abstractmethod`` unenforced and allow incomplete subclasses to
    be instantiated.
    """

    @abstractmethod
    def set_progress(self, current, total, description=""):
        """Record that ``current`` out of ``total`` units of work are done.

        current:
            amount of work completed so far
        total:
            total amount of work expected
        description:
            optional free-text note about the current step
        """
class ConsoleProgressRecorder(AbstractProgressRecorder):
    """Progress recorder that prints every update to standard output."""

    def set_progress(self, current, total, description=""):
        """Print one line reporting ``current`` of ``total`` and ``description``."""
        line = 'processed {} items of {}. {}'.format(current, total, description)
        print(line)
class ProgressRecorder(AbstractProgressRecorder):
    """Progress recorder that publishes progress through a task's state.

    ``task`` only needs an ``update_state(state=..., meta=...)`` method
    (presumably a bound Celery task -- confirm against callers).
    """

    def __init__(self, task):
        self.task = task

    def set_progress(self, current, total, description=""):
        """Push the progress onto the task and return ``(state, meta)``.

        ``percent`` is ``current / total * 100`` rounded to two decimals and
        converted to float; it stays the integer 0 when ``total`` is not
        greater than zero.
        """
        if total > 0:
            # Decimal arithmetic keeps the two-decimal rounding exact.
            ratio = Decimal(current) / Decimal(total)
            percent = float(round(ratio * Decimal(100), 2))
        else:
            percent = 0

        meta = {
            'pending': False,
            'current': current,
            'total': total,
            'percent': percent,
            'description': description
        }
        self.task.update_state(state=PROGRESS_STATE, meta=meta)
        return PROGRESS_STATE, meta
class Progress(object):
    """Turn a task result object into a plain progress dictionary."""

    def __init__(self, result):
        """
        result:
            an AsyncResult or an object that mimics it to a degree
        """
        self.result = result

    def get_info(self):
        """Return a dict describing the task's state and progress.

        The dict always contains 'state', 'complete', 'success' and
        'progress'. 'result' is added for every state except PROGRESS,
        PENDING and STARTED. An unrecognised state is logged as an error and
        reported as complete but unsuccessful.
        """
        # Read the state once: it can change while the task is running, and
        # re-reading it in every branch could let a live task slip through
        # all the checks and be reported as an unknown (failed) state.
        state = self.result.state
        response = {'state': state}
        if state in ['SUCCESS', 'FAILURE']:
            success = self.result.successful()
            with allow_join_result():
                response.update({
                    'complete': True,
                    'success': success,
                    'progress': _get_completed_progress(),
                    # Celery's AsyncResult.get() takes a timeout as its first
                    # positional argument, so the task id must not be passed.
                    'result': self.result.get() if success else str(self.result.info),
                })
        elif state in ['RETRY', 'REVOKED']:
            if state == 'RETRY':
                retry = self.result.info
                # retry.when is either an absolute datetime or a number of
                # seconds from now; both are rendered as a string.
                when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str(
                    datetime.datetime.now() + datetime.timedelta(seconds=retry.when))
                result = {'when': when, 'message': retry.message or str(retry.exc)}
            else:
                result = 'Task ' + str(self.result.info)
            response.update({
                'complete': True,
                'success': False,
                'progress': _get_completed_progress(),
                'result': result,
            })
        elif state == 'IGNORED':
            response.update({
                'complete': True,
                'success': None,
                'progress': _get_completed_progress(),
                'result': str(self.result.info)
            })
        elif state == PROGRESS_STATE:
            response.update({
                'complete': False,
                'success': None,
                'progress': self.result.info,
            })
        elif state in ['PENDING', 'STARTED']:
            response.update({
                'complete': False,
                'success': None,
                'progress': _get_unknown_progress(state),
            })
        else:
            logger.error('Task %s has unknown state %s with metadata %s', self.result.id, state, self.result.info)
            response.update({
                'complete': True,
                'success': False,
                'progress': _get_unknown_progress(state),
                'result': 'Unknown state {}'.format(state),
            })
        return response
class KnownResult(EagerResult):
    """Like EagerResult but supports non-ready states."""

    def __init__(self, id, ret_value, state, traceback=None):
        """
        id:
            identifier, forwarded unchanged to EagerResult.__init__
        ret_value:
            result, exception, or progress metadata
        state:
            state name, forwarded unchanged to EagerResult.__init__
        traceback:
            optional traceback, forwarded unchanged to EagerResult.__init__
        """
        # set backend to get state groups (like READY_STATES in ready())
        # NOTE: the DisabledBackend class itself is assigned, not an instance,
        # and this happens before the base initialiser runs.
        self.backend = DisabledBackend
        super().__init__(id, ret_value, state, traceback)

    def ready(self):
        # Skips EagerResult's own ready(): super(EagerResult, self) resolves to
        # the class after EagerResult in the MRO (presumably AsyncResult --
        # confirm against the installed Celery version).
        return super(EagerResult, self).ready()

    def __del__(self):
        # throws an exception if not overridden
        pass
def _get_completed_progress():
    """Return the progress dict used for a task that has finished (100 of 100)."""
    finished = {'pending': False, 'current': 100, 'total': 100, 'percent': 100}
    return finished
def _get_unknown_progress(state):
    """Return a zeroed progress dict for a task whose progress is not known.

    'pending' is True only when ``state`` equals 'PENDING'.
    """
    is_pending = state == 'PENDING'
    return {'pending': is_pending, 'current': 0, 'total': 100, 'percent': 0}
|