"""
A very basic, ORM-based backend for simple search during tests.
"""
from functools import reduce
from warnings import warn
from django.db.models import Q
from haystack import connections
from haystack.backends import (
BaseEngine,
BaseSearchBackend,
BaseSearchQuery,
SearchNode,
log_query,
)
from haystack.inputs import PythonData
from haystack.models import SearchResult
from haystack.utils import get_model_ct_tuple
class SimpleSearchBackend(BaseSearchBackend):
    """
    Search backend that answers queries straight from the ORM.

    Nothing is ever indexed: ``update``, ``remove`` and ``clear`` only emit a
    warning, and ``search`` runs case-insensitive substring lookups against
    the text-like fields of each model.
    """

    def update(self, indexer, iterable, commit=True):
        """Warn that updating is unsupported; the arguments are ignored."""
        warn("update is not implemented in this backend")

    def remove(self, obj, commit=True):
        """Warn that removal is unsupported; the arguments are ignored."""
        warn("remove is not implemented in this backend")

    def clear(self, models=None, commit=True):
        """Warn that clearing is unsupported; the arguments are ignored."""
        warn("clear is not implemented in this backend")

    @log_query
    def search(self, query_string, **kwargs):
        """
        Look up ``query_string`` in the text-like fields of each model.

        :param query_string: ``"*"`` selects every row of every model; any
            other non-empty string is split on whitespace into terms. An
            empty or whitespace-only string matches nothing.
        :param kwargs: ``result_class`` (defaults to ``SearchResult``) and
            ``models`` (defaults to the models of this connection's unified
            index) are honoured when truthy; other keys are ignored.
        :returns: ``{"results": [...], "hits": <int>}`` where ``results``
            holds one ``result_class`` instance per matched row.
        """
        hits = 0
        results = []
        result_class = SearchResult
        models = (
            connections[self.connection_alias].get_unified_index().get_indexed_models()
        )

        if kwargs.get("result_class"):
            result_class = kwargs["result_class"]

        if kwargs.get("models"):
            models = kwargs["models"]

        if not query_string:
            return {"results": results, "hits": hits}

        match_everything = query_string == "*"
        # A whitespace-only string is truthy but yields no terms; it used to
        # leave ``qs`` unbound below. It is now treated as "no matches".
        terms = [] if match_everything else query_string.split()

        for model in models:
            if match_everything:
                qs = model.objects.all()
            else:
                # The searchable fields depend only on the model, so collect
                # them once instead of once per term.
                field_names = [
                    field.name
                    for field in model._meta.fields
                    if not hasattr(field, "related")
                    and field.get_internal_type()
                    in ("TextField", "CharField", "SlugField")
                ]

                if terms and field_names:
                    # NOTE(review): historically every term rebuilt the
                    # queryset and overwrote the previous one, so only the
                    # final term ever decided the matches. That behaviour is
                    # kept as-is; confirm whether terms were meant to be
                    # combined instead.
                    term = terms[-1]
                    lookups = [
                        Q(**{"%s__icontains" % name: term}) for name in field_names
                    ]
                    qs = model.objects.filter(reduce(lambda x, y: x | y, lookups))
                else:
                    qs = []

            hits += len(qs)

            for match in qs:
                # Drop any ``score`` attribute so it cannot collide with the
                # positional ``0`` below (presumably the score argument of
                # ``result_class`` -- confirm against its signature).
                match.__dict__.pop("score", None)
                app_label, model_name = get_model_ct_tuple(match)
                result = result_class(
                    app_label, model_name, match.pk, 0, **match.__dict__
                )
                # For efficiency.
                result._model = match.__class__
                result._object = match
                results.append(result)

        return {"results": results, "hits": hits}

    def prep_value(self, db_field, value):
        """Return ``value`` unchanged; ``db_field`` is not consulted."""
        return value

    def more_like_this(
        self,
        model_instance,
        additional_query_string=None,
        start_offset=0,
        end_offset=None,
        limit_to_registered_models=None,
        result_class=None,
        **kwargs
    ):
        """Always return an empty result set; every argument is ignored."""
        return {"results": [], "hits": 0}
class SimpleSearchQuery(BaseSearchQuery):
    """Query class that flattens the filter tree into a plain string."""

    def build_query(self):
        """
        Return the query string for the current filters.

        With a non-empty ``query_filter`` this is the flattened filter tree;
        otherwise it is ``"*"``.
        """
        if self.query_filter:
            return self._build_sub_query(self.query_filter)
        return "*"

    def _build_sub_query(self, search_node):
        """
        Join the prepared values under ``search_node`` with single spaces.

        Nested ``SearchNode`` children are flattened recursively. For any
        other child only its second item (the value) is used; the node's
        connector and the child's first item are not consulted.
        """
        pieces = []

        for child in search_node.children:
            if isinstance(child, SearchNode):
                pieces.append(self._build_sub_query(child))
                continue

            raw = child[1]
            # Values lacking ``input_type_name`` are wrapped in PythonData so
            # that every value can be prepared the same way.
            wrapped = raw if hasattr(raw, "input_type_name") else PythonData(raw)
            pieces.append(wrapped.prepare(self))

        return " ".join(str(piece) for piece in pieces)
class SimpleEngine(BaseEngine):
    """Engine that pairs the simple ORM-based backend with its query class."""

    # Backend class used by this engine.
    backend = SimpleSearchBackend
    # Query class used by this engine.
    query = SimpleSearchQuery
|