File: tasks.py

package info (click to toggle)
celery-haystack 0.10-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 184 kB
  • sloc: python: 322; makefile: 8
file content (156 lines) | stat: -rw-r--r-- 6,324 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command
from django.apps import apps
# Module-level alias for the app registry's model lookup; it is called
# further down as ``get_model(app_label, model_name)``.
get_model = apps.get_model

from .conf import settings

from haystack import connections, connection_router
# Haystack's ``NotHandled`` under a more descriptive local name; it is caught
# in ``get_indexes`` and reported as a missing ``SearchIndex``.
from haystack.exceptions import NotHandled as IndexNotFoundException

from celery.task import Task  # noqa
from celery.utils.log import get_task_logger

# Logger obtained from Celery's task-logger helper, named after this module.
logger = get_task_logger(__name__)


class CeleryHaystackSignalHandler(Task):
    """
    Celery task that applies one 'update' or 'delete' action for a single
    object identifier to every search index returned by ``get_indexes``.

    The helper methods (``split_identifier``, ``get_model_class``,
    ``get_instance`` and ``get_indexes``) all accept ``**kwargs`` and are
    handed the extra keyword arguments that were passed to ``run``.
    """
    # Values taken from the celery-haystack settings. ``max_retries`` and
    # ``default_retry_delay`` are Celery task options; ``using`` is not read
    # by any method of this class.
    using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS
    max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES
    default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY

    def split_identifier(self, identifier, **kwargs):
        """
        Break down the identifier representing the instance.

        Converts 'notes.note.23' into ('notes.note', '23'): everything
        before the last dot is the object path, everything after it is the
        pk, which is returned as a string.

        Returns (None, None), after logging an error, when the identifier
        does not contain a dot.
        """
        object_path, separator, pk = identifier.rpartition('.')

        if not separator:
            logger.error("Unable to parse object identifer '%s'. "
                         "Moving on...", identifier)
            return (None, None)

        # In case Django ever handles full paths, the object path keeps
        # every dot-separated part except the trailing pk.
        return (object_path, pk)

    def get_model_class(self, object_path, **kwargs):
        """
        Fetch the model's class in a standardized way.

        ``object_path`` is split at its last dot into an app label and a
        model name, e.g. 'notes.note' becomes ('notes', 'note').

        Raises ``ImproperlyConfigured`` when the model cannot be loaded.
        """
        app_name, _, classname = object_path.rpartition('.')
        message = "Could not load model '%s'." % object_path

        try:
            model_class = get_model(app_name, classname)
        except LookupError:
            # Django's app registry signals an unknown app label or model
            # name with LookupError rather than by returning None.
            raise ImproperlyConfigured(message)

        # Kept for lookups that report a missing model by returning None.
        if model_class is None:
            raise ImproperlyConfigured(message)
        return model_class

    def get_instance(self, model_class, pk, **kwargs):
        """
        Fetch the instance in a standardized way.

        Returns the object found through the model's default manager, or
        None (after logging an error) when no object or more than one
        object matches ``pk``.
        """
        instance = None
        try:
            instance = model_class._default_manager.get(pk=pk)
        except model_class.DoesNotExist:
            logger.error("Couldn't load %s.%s.%s. Somehow it went missing?",
                         model_class._meta.app_label.lower(),
                         model_class._meta.object_name.lower(), pk)
        except model_class.MultipleObjectsReturned:
            logger.error("More than one object with pk %s. Oops?", pk)
        return instance

    def get_indexes(self, model_class, **kwargs):
        """
        Fetch the model's registered ``SearchIndex`` in a standardized way.

        This is a generator yielding ``(index, using)`` pairs, one for each
        connection alias the router returns for writing ``model_class``.
        Because it is lazy, the ``ImproperlyConfigured`` raised for a
        missing index only surfaces while the result is being iterated.
        """
        try:
            using_backends = connection_router.for_write(models=[model_class])
            for using in using_backends:
                index_holder = connections[using].get_unified_index()
                yield index_holder.get_index(model_class), using
        except IndexNotFoundException:
            raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." %
                                       model_class)

    def run(self, action, identifier, **kwargs):
        """
        Trigger the actual index handler depending on the
        given action ('update' or 'delete').

        ``identifier`` has the form 'app_label.model_name.pk'. Any extra
        keyword arguments are forwarded to the helper methods.

        Raises ``ValueError`` when the identifier cannot be parsed, when
        the object to update cannot be loaded, or when an index is
        processed with an unrecognized action. A failure inside the index
        backend is logged and handed to ``self.retry``.
        """
        # First get the object path and pk (e.g. ('notes.note', '23'))
        object_path, pk = self.split_identifier(identifier, **kwargs)
        if object_path is None or pk is None:
            msg = "Couldn't handle object with identifier %s" % identifier
            logger.error(msg)
            raise ValueError(msg)

        # Then get the model class for the object path
        model_class = self.get_model_class(object_path, **kwargs)

        # Loaded on first use and then shared by all indexes, so the same
        # row is not queried again for every backend.
        instance = None

        for current_index, using in self.get_indexes(model_class, **kwargs):
            index_class = current_index.__class__
            current_index_name = ".".join([index_class.__module__,
                                           index_class.__name__])

            if action == 'delete':
                # If the object is gone, we'll use just the identifier
                # against the index.
                try:
                    current_index.remove_object(identifier, using=using)
                except Exception as exc:
                    logger.exception(exc)
                    # retry() itself raises; the explicit ``raise`` (the
                    # form shown in the Celery docs) makes it obvious that
                    # processing stops here.
                    raise self.retry(exc=exc)
                else:
                    logger.debug("Deleted '%s' (with %s)",
                                 identifier, current_index_name)
            elif action == 'update':
                # and the instance of the model class with the pk
                if instance is None:
                    instance = self.get_instance(model_class, pk, **kwargs)
                if instance is None:
                    logger.debug("Failed updating '%s' (with %s)",
                                 identifier, current_index_name)
                    raise ValueError("Couldn't load object '%s'" % identifier)

                # Call the appropriate handler of the current index and
                # handle exception if necessary
                try:
                    current_index.update_object(instance, using=using)
                except Exception as exc:
                    logger.exception(exc)
                    raise self.retry(exc=exc)
                else:
                    logger.debug("Updated '%s' (with %s)",
                                 identifier, current_index_name)
            else:
                logger.error("Unrecognized action '%s'. Moving on...", action)
                raise ValueError("Unrecognized action %s" % action)


class CeleryHaystackUpdateIndex(Task):
    """
    Celery task that runs the ``update_index`` management command.
    """
    def run(self, apps=None, **kwargs):
        """
        Call ``update_index`` with options read from the
        ``CELERY_HAYSTACK_*`` settings.

        Entries in ``kwargs`` override the option defaults. ``apps`` is
        passed to the command as positional arguments; when it is None the
        ``CELERY_HAYSTACK_COMMAND_APPS`` setting is used instead.
        """
        # Start from the configured defaults, then let the caller override.
        options = dict(
            batchsize=settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
            age=settings.CELERY_HAYSTACK_COMMAND_AGE,
            remove=settings.CELERY_HAYSTACK_COMMAND_REMOVE,
            using=[settings.CELERY_HAYSTACK_DEFAULT_ALIAS],
            workers=settings.CELERY_HAYSTACK_COMMAND_WORKERS,
            verbosity=settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
        )
        options.update(kwargs)

        app_labels = (settings.CELERY_HAYSTACK_COMMAND_APPS
                      if apps is None else apps)

        # Run the update_index management command
        logger.info("Starting update index")
        call_command('update_index', *app_labels, **options)
        logger.info("Finishing update index")