1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command
from django.apps import apps
get_model = apps.get_model
from .conf import settings
from haystack import connections, connection_router
from haystack.exceptions import NotHandled as IndexNotFoundException
from celery.task import Task # noqa
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
class CeleryHaystackSignalHandler(Task):
    """
    Celery task that applies an index action to a single model instance.

    The task receives an ``action`` ('update' or 'delete') and an object
    identifier such as ``'notes.note.23'``. It then applies that action on
    every search index returned by ``get_indexes`` for the model, i.e. for
    every connection alias the Haystack ``connection_router`` selects for
    writing.
    """
    # NOTE(review): ``using`` is not read by the methods below (backend
    # selection goes through ``connection_router``); kept for compatibility.
    using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS
    # Celery retry options, taken from this app's settings.
    max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES
    default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY

    def split_identifier(self, identifier, **kwargs):
        """
        Break down the identifier representing the instance.

        Converts 'notes.note.23' into ('notes.note', '23'): everything before
        the last dot is the object path, the last component is the pk (kept
        as a string). Returns ``(None, None)`` when the identifier contains
        no dot.
        """
        bits = identifier.split('.')
        if len(bits) < 2:
            logger.error("Unable to parse object "
                         "identifier '%s'. Moving on...", identifier)
            return (None, None)
        pk = bits[-1]
        # In case Django ever handles full paths...
        object_path = '.'.join(bits[:-1])
        return (object_path, pk)

    def get_model_class(self, object_path, **kwargs):
        """
        Fetch the model's class in a standardized way.

        ``object_path`` is '<app_label>.<model_name>' (everything before the
        last dot is treated as the app part).

        :raises ImproperlyConfigured: if the model cannot be resolved.
        """
        bits = object_path.split('.')
        app_name = '.'.join(bits[:-1])
        classname = bits[-1]
        try:
            model_class = get_model(app_name, classname)
        except LookupError:
            # django.apps.apps.get_model raises LookupError for unknown
            # apps/models instead of returning None; normalize both cases.
            model_class = None
        if model_class is None:
            raise ImproperlyConfigured("Could not load model '%s'." %
                                       object_path)
        return model_class

    def get_instance(self, model_class, pk, **kwargs):
        """
        Fetch the instance in a standardized way.

        Returns the object with the given ``pk`` from the model's default
        manager, or ``None`` (after logging an error) if it does not exist
        or the lookup is ambiguous.
        """
        instance = None
        try:
            instance = model_class._default_manager.get(pk=pk)
        except model_class.DoesNotExist:
            logger.error("Couldn't load %s.%s.%s. Somehow it went missing?",
                         model_class._meta.app_label.lower(),
                         model_class._meta.object_name.lower(), pk)
        except model_class.MultipleObjectsReturned:
            logger.error("More than one object with pk %s. Oops?", pk)
        return instance

    def get_indexes(self, model_class, **kwargs):
        """
        Fetch the model's registered ``SearchIndex`` in a standardized way.

        Yields ``(index, alias)`` pairs, one per connection alias that the
        Haystack connection router selects for writing this model.

        :raises ImproperlyConfigured: if a selected backend has no
            ``SearchIndex`` registered for ``model_class``.
        """
        try:
            using_backends = connection_router.for_write(models=[model_class])
            for using in using_backends:
                index_holder = connections[using].get_unified_index()
                yield index_holder.get_index(model_class), using
        except IndexNotFoundException:
            raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." %
                                       model_class)

    def run(self, action, identifier, **kwargs):
        """
        Trigger the actual index handler depending on the
        given action ('update' or 'delete').

        :param action: 'update' to (re)index the instance, 'delete' to
            remove it from the index.
        :param identifier: object identifier, e.g. 'notes.note.23'.
        :raises ValueError: if the identifier cannot be parsed, the action
            is not recognized, or (for 'update') the instance cannot be
            loaded.
        :raises ImproperlyConfigured: if the model or its SearchIndex cannot
            be found.

        Backend failures during remove/update are logged and handed to
        ``self.retry``.
        """
        # First get the object path and pk (e.g. ('notes.note', '23'))
        object_path, pk = self.split_identifier(identifier, **kwargs)
        if object_path is None or pk is None:
            msg = "Couldn't handle object with identifier %s" % identifier
            logger.error(msg)
            raise ValueError(msg)

        # Then get the model class for the object path
        model_class = self.get_model_class(object_path, **kwargs)

        # Validate the action before touching any backend, so an unknown
        # action is rejected even when no index is routed for the model.
        if action not in ('update', 'delete'):
            logger.error("Unrecognized action '%s'. Moving on...", action)
            raise ValueError("Unrecognized action %s" % action)

        # The instance is identical for every backend: load it at most once,
        # and only when the first index actually needs it.
        instance = None
        for current_index, using in self.get_indexes(model_class, **kwargs):
            current_index_name = ".".join([current_index.__class__.__module__,
                                           current_index.__class__.__name__])
            if action == 'delete':
                # If the object is gone, we'll use just the identifier
                # against the index.
                try:
                    current_index.remove_object(identifier, using=using)
                except Exception as exc:
                    logger.exception(exc)
                    self.retry(exc=exc)
                else:
                    logger.debug("Deleted '%s' (with %s)",
                                 identifier, current_index_name)
            else:  # action == 'update'
                if instance is None:
                    instance = self.get_instance(model_class, pk, **kwargs)
                if instance is None:
                    logger.debug("Failed updating '%s' (with %s)",
                                 identifier, current_index_name)
                    raise ValueError("Couldn't load object '%s'" % identifier)
                # Call the appropriate handler of the current index and
                # handle exception if necessary
                try:
                    current_index.update_object(instance, using=using)
                except Exception as exc:
                    logger.exception(exc)
                    self.retry(exc=exc)
                else:
                    logger.debug("Updated '%s' (with %s)",
                                 identifier, current_index_name)
class CeleryHaystackUpdateIndex(Task):
    """
    Celery task wrapper around the ``update_index`` management command.

    Command options start from the CELERY_HAYSTACK_COMMAND_* settings (plus
    CELERY_HAYSTACK_DEFAULT_ALIAS for ``using``) and may be overridden by
    keyword arguments passed to the task.
    """

    def run(self, apps=None, **kwargs):
        """
        Invoke ``update_index`` via ``call_command``.

        :param apps: iterable of app labels, unpacked as positional command
            arguments; ``settings.CELERY_HAYSTACK_COMMAND_APPS`` is used
            when this is ``None``.
        :param kwargs: command options that take precedence over the
            settings-derived defaults.
        """
        options = dict(
            batchsize=settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
            age=settings.CELERY_HAYSTACK_COMMAND_AGE,
            remove=settings.CELERY_HAYSTACK_COMMAND_REMOVE,
            using=[settings.CELERY_HAYSTACK_DEFAULT_ALIAS],
            workers=settings.CELERY_HAYSTACK_COMMAND_WORKERS,
            verbosity=settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
        )
        # Caller-supplied options win over the defaults above.
        options.update(kwargs)

        # NOTE: the ``apps`` parameter shadows the module-level Django app
        # registry within this method; the name is kept for API stability.
        app_labels = (settings.CELERY_HAYSTACK_COMMAND_APPS
                      if apps is None else apps)

        logger.info("Starting update index")
        call_command('update_index', *app_labels, **options)
        logger.info("Finishing update index")
|