1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
import warnings

import redis

from ...asyncio.client import Pipeline as AsyncioPipeline
from .commands import (
    AGGREGATE_CMD,
    CONFIG_CMD,
    INFO_CMD,
    PROFILE_CMD,
    SEARCH_CMD,
    SPELLCHECK_CMD,
    SYNDUMP_CMD,
    AsyncSearchCommands,
    SearchCommands,
)
class Search(SearchCommands):
    """
    Create a client for talking to search.
    It abstracts the API of the module and lets you just use the engine.
    """

    class BatchIndexer:
        """
        A batch indexer allows you to automatically batch
        document indexing in pipelines, flushing it every N documents.

        Documents are queued on a non-transactional pipeline obtained from
        ``client.pipeline``. The pipeline is executed once ``chunk_size``
        documents have been queued since the last commit, when ``commit()``
        is called explicitly, or when the indexer is finalized with
        documents still pending.
        """

        def __init__(self, client, chunk_size=1000):
            """
            :param client: the search client whose ``_add_document`` /
                ``_add_document_hash`` helpers and ``pipeline()`` are used.
            :param chunk_size: number of queued documents that triggers an
                automatic ``commit()``.
            """
            self.client = client
            self.execute_command = client.execute_command
            self._pipeline = client.pipeline(transaction=False, shard_hint=None)
            # Documents queued over the indexer's lifetime.
            self.total = 0
            self.chunk_size = chunk_size
            # Documents queued since the last commit().
            self.current_chunk = 0

        def __del__(self):
            # Flush whatever is still queued when the indexer is finalized.
            # getattr() guards against __init__ having raised before
            # current_chunk was assigned (e.g. client.pipeline() failing),
            # which would otherwise turn into an AttributeError here.
            if getattr(self, "current_chunk", 0):
                self.commit()

        def add_document(
            self,
            doc_id,
            nosave=False,
            score=1.0,
            payload=None,
            replace=False,
            partial=False,
            no_create=False,
            **fields,
        ):
            """
            Add a document to the batch query.

            The document is queued on the indexer's pipeline through the
            client's ``_add_document`` helper; all arguments are forwarded
            unchanged. Flushes automatically when the chunk is full.
            """
            self.client._add_document(
                doc_id,
                conn=self._pipeline,
                nosave=nosave,
                score=score,
                payload=payload,
                replace=replace,
                partial=partial,
                no_create=no_create,
                **fields,
            )
            self._document_queued()

        def add_document_hash(self, doc_id, score=1.0, replace=False):
            """
            Add a hash to the batch query.

            The hash is queued on the indexer's pipeline through the client's
            ``_add_document_hash`` helper. Flushes automatically when the
            chunk is full.
            """
            self.client._add_document_hash(
                doc_id, conn=self._pipeline, score=score, replace=replace
            )
            self._document_queued()

        def _document_queued(self):
            """Count one queued document and commit once the chunk is full."""
            self.current_chunk += 1
            self.total += 1
            if self.current_chunk >= self.chunk_size:
                self.commit()

        def commit(self):
            """
            Manually commit and flush the batch indexing query.

            Executes the pipeline and resets the per-chunk counter; ``total``
            is left untouched.
            """
            self._pipeline.execute()
            self.current_chunk = 0

    def __init__(self, client, index_name="idx"):
        """
        Create a new Client for the given index_name.
        The default name is `idx`.

        :param client: an existing redis client. Its ``execute_command`` is
            reused directly, and its ``connection_pool`` backs the pipelines
            returned by ``pipeline()``.
        :param index_name: name of the search index this client targets.
        """
        # Response callbacks handed to pipelines built by pipeline();
        # intentionally empty here.
        self._MODULE_CALLBACKS = {}
        self.client = client
        self.index_name = index_name
        self.execute_command = client.execute_command
        # Bound pipeline factory of the underlying client (not an instance).
        self._pipeline = client.pipeline
        # Search command name -> reply parser. NOTE(review): presumably
        # consumed by SearchCommands when parsing RESP2 replies; confirm there.
        self._RESP2_MODULE_CALLBACKS = {
            INFO_CMD: self._parse_info,
            SEARCH_CMD: self._parse_search,
            AGGREGATE_CMD: self._parse_aggregate,
            PROFILE_CMD: self._parse_profile,
            SPELLCHECK_CMD: self._parse_spellcheck,
            CONFIG_CMD: self._parse_config_get,
            SYNDUMP_CMD: self._parse_syndump,
        }

    def pipeline(self, transaction=True, shard_hint=None):
        """Creates a pipeline for the SEARCH module, that can be used for executing
        SEARCH commands, as well as classic core commands.

        The pipeline shares this client's connection pool and is tagged with
        this client's ``index_name``.
        """
        p = Pipeline(
            connection_pool=self.client.connection_pool,
            response_callbacks=self._MODULE_CALLBACKS,
            transaction=transaction,
            shard_hint=shard_hint,
        )
        p.index_name = self.index_name
        return p
class AsyncSearch(Search, AsyncSearchCommands):
    """Asyncio variant of the search client; its pipelines must be awaited."""

    class BatchIndexer(Search.BatchIndexer):
        """
        A batch indexer allows you to automatically batch
        document indexing in pipelines, flushing it every N documents.

        In this async variant ``add_document``, ``add_document_hash`` and
        ``commit`` are coroutines and must be awaited. Pending documents are
        NOT flushed on garbage collection; call ``await commit()`` when done.
        """

        def __del__(self):
            # The inherited finalizer would call the async commit() and drop
            # the resulting coroutine unawaited, so the flush would never run.
            # A finalizer cannot await, so report the loss explicitly instead.
            pending = getattr(self, "current_chunk", 0)
            if pending:
                warnings.warn(
                    f"{type(self).__name__} was garbage-collected with "
                    f"{pending} uncommitted document(s); "
                    "call 'await commit()' before discarding it",
                    RuntimeWarning,
                )

        async def add_document(
            self,
            doc_id,
            nosave=False,
            score=1.0,
            payload=None,
            replace=False,
            partial=False,
            no_create=False,
            **fields,
        ):
            """
            Add a document to the batch query.

            The document is queued on the indexer's pipeline through the
            client's ``_add_document`` helper. The commit is awaited when the
            chunk is full.
            """
            self.client._add_document(
                doc_id,
                conn=self._pipeline,
                nosave=nosave,
                score=score,
                payload=payload,
                replace=replace,
                partial=partial,
                no_create=no_create,
                **fields,
            )
            self.current_chunk += 1
            self.total += 1
            if self.current_chunk >= self.chunk_size:
                await self.commit()

        async def add_document_hash(self, doc_id, score=1.0, replace=False):
            """
            Add a hash to the batch query.

            This overrides the inherited synchronous version. That version
            called the async ``commit()`` without awaiting it, so a full chunk
            was never flushed and the counter was never reset.
            """
            self.client._add_document_hash(
                doc_id, conn=self._pipeline, score=score, replace=replace
            )
            self.current_chunk += 1
            self.total += 1
            if self.current_chunk >= self.chunk_size:
                await self.commit()

        async def commit(self):
            """
            Manually commit and flush the batch indexing query.

            Awaits the pipeline execution and resets the per-chunk counter.
            """
            await self._pipeline.execute()
            self.current_chunk = 0

    def pipeline(self, transaction=True, shard_hint=None):
        """Creates a pipeline for the SEARCH module, that can be used for executing
        SEARCH commands, as well as classic core commands.

        Returns an ``AsyncPipeline`` sharing this client's connection pool and
        tagged with this client's ``index_name``.
        """
        p = AsyncPipeline(
            connection_pool=self.client.connection_pool,
            response_callbacks=self._MODULE_CALLBACKS,
            transaction=transaction,
            shard_hint=shard_hint,
        )
        p.index_name = self.index_name
        return p
class Pipeline(SearchCommands, redis.client.Pipeline):
    """Synchronous redis pipeline that also exposes the SearchCommands mixin."""
class AsyncPipeline(AsyncSearchCommands, AsyncioPipeline, Pipeline):
    """Asyncio redis pipeline that also exposes the AsyncSearchCommands mixin."""
|