File: simple_backend.py

package info (click to toggle)
django-haystack 3.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,504 kB
  • sloc: python: 23,475; xml: 1,708; sh: 74; makefile: 71
file content (133 lines) | stat: -rw-r--r-- 3,928 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
A very basic, ORM-based backend for simple search during tests.
"""

from functools import reduce
from warnings import warn

from django.db.models import Q

from haystack import connections
from haystack.backends import (
    BaseEngine,
    BaseSearchBackend,
    BaseSearchQuery,
    SearchNode,
    log_query,
)
from haystack.inputs import PythonData
from haystack.models import SearchResult
from haystack.utils import get_model_ct_tuple


class SimpleSearchBackend(BaseSearchBackend):
    """
    Search backend that queries the Django ORM directly instead of an index.

    The indexing operations (``update``, ``remove`` and ``clear``) are not
    implemented; each one only emits a warning.
    """

    def update(self, indexer, iterable, commit=True):
        """Emit a warning; updating is not implemented in this backend."""
        warn("update is not implemented in this backend")

    def remove(self, obj, commit=True):
        """Emit a warning; removing is not implemented in this backend."""
        warn("remove is not implemented in this backend")

    def clear(self, models=None, commit=True):
        """Emit a warning; clearing is not implemented in this backend."""
        warn("clear is not implemented in this backend")

    @log_query
    def search(self, query_string, **kwargs):
        """
        Run a case-insensitive substring search over the models' text fields.

        ``query_string`` is either ``"*"`` (every object of every model
        matches) or whitespace-separated terms that are matched with
        ``icontains`` against each ``TextField``, ``CharField`` and
        ``SlugField`` of the model. An empty query string, or one made up of
        whitespace only, matches nothing.

        Recognised keyword arguments:

        * ``result_class`` -- class used to wrap each match; defaults to
          ``SearchResult``.
        * ``models`` -- models to search; defaults to the indexed models of
          this backend's connection.

        Returns a dict with the keys ``"results"`` (list of result objects)
        and ``"hits"`` (total number of matches across all models).
        """
        hits = 0
        results = []
        result_class = SearchResult
        models = (
            connections[self.connection_alias].get_unified_index().get_indexed_models()
        )

        if kwargs.get("result_class"):
            result_class = kwargs["result_class"]

        if kwargs.get("models"):
            models = kwargs["models"]

        if not query_string:
            return {"results": results, "hits": hits}

        for model in models:
            if query_string == "*":
                qs = model.objects.all()
            else:
                # Start from "no matches" so that a query with no terms
                # (e.g. whitespace only) or a model without searchable
                # fields never leaves ``qs`` unbound.
                qs = []

                # The searchable fields do not depend on the term, so they
                # are collected once per model.
                field_names = [
                    field.name
                    for field in model._meta.fields
                    if not hasattr(field, "related")
                    and field.get_internal_type()
                    in ("TextField", "CharField", "SlugField")
                ]

                if field_names:
                    # NOTE(review): every term overwrites ``qs``, so only the
                    # last term decides the matches. This mirrors the
                    # existing behaviour; confirm whether terms were meant
                    # to be combined before changing it.
                    for term in query_string.split():
                        lookups = [
                            Q(**{"%s__icontains" % name: term})
                            for name in field_names
                        ]
                        qs = model.objects.filter(
                            reduce(lambda x, y: x | y, lookups)
                        )

            hits += len(qs)

            for match in qs:
                # Drop any "score" entry from the instance dict before it is
                # expanded into keyword arguments below -- presumably so it
                # cannot clash with the positional score value (0).
                match.__dict__.pop("score", None)
                app_label, model_name = get_model_ct_tuple(match)
                result = result_class(
                    app_label, model_name, match.pk, 0, **match.__dict__
                )
                # For efficiency.
                result._model = match.__class__
                result._object = match
                results.append(result)

        return {"results": results, "hits": hits}

    def prep_value(self, db_field, value):
        """Return ``value`` unchanged."""
        return value

    def more_like_this(
        self,
        model_instance,
        additional_query_string=None,
        start_offset=0,
        end_offset=None,
        limit_to_registered_models=None,
        result_class=None,
        **kwargs
    ):
        """Always return an empty result set (``{"results": [], "hits": 0}``)."""
        return {"results": [], "hits": 0}


class SimpleSearchQuery(BaseSearchQuery):
    """Query class that flattens the filter tree into one string of terms."""

    def build_query(self):
        """Return the flattened filter terms, or ``"*"`` when no filter is set."""
        if self.query_filter:
            return self._build_sub_query(self.query_filter)

        return "*"

    def _build_sub_query(self, search_node):
        """Render ``search_node`` and its descendants as space-separated terms."""

        def render(child):
            # Nested nodes are flattened recursively.
            if isinstance(child, SearchNode):
                return self._build_sub_query(child)

            # For a leaf, the value is the second item; anything that is not
            # already an input type gets wrapped in ``PythonData`` first.
            operand = child[1]
            wrapped = (
                operand
                if hasattr(operand, "input_type_name")
                else PythonData(operand)
            )
            return wrapped.prepare(self)

        rendered = [render(child) for child in search_node.children]
        return " ".join(str(piece) for piece in rendered)


class SimpleEngine(BaseEngine):
    """Engine that pairs the simple ORM-based backend with its query class."""

    # Backend class used by this engine.
    backend = SimpleSearchBackend
    # Query class used by this engine.
    query = SimpleSearchQuery