Using asyncio with Elasticsearch
================================

 .. py:module:: elasticsearch
    :no-index:

The ``elasticsearch`` package supports async/await with
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.
You can either install ``aiohttp`` directly or use the ``[async]`` extra:

 .. code-block:: bash

    $ python -m pip install elasticsearch aiohttp

    # - OR -

    $ python -m pip install elasticsearch[async]

Getting Started with Async
--------------------------

After installation, all async API endpoints are available via :class:`~elasticsearch.AsyncElasticsearch`
and are used in the same way as the synchronous APIs, just with an extra ``await``:

 .. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch

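    # Assumes a cluster listening on localhost:9200; adjust the URL for your setup.
    client = AsyncElasticsearch("http://localhost:9200")

    async def main():
        resp = await client.search(
            index="documents",
            query={"match_all": {}},
            size=20,
        )
        print(resp)
        # Release the underlying aiohttp connections before the event loop shuts down.
        await client.close()

    asyncio.run(main())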

All APIs that are available under the sync client are also available under the async client.

ASGI Applications and Elastic APM
---------------------------------

`ASGI <https://asgi.readthedocs.io>`_ (Asynchronous Server Gateway Interface) is a newer way to
serve Python web applications that uses async I/O to achieve better performance.
Examples of ASGI frameworks include FastAPI, Django 3.0+, and Starlette.
If you're using one of these frameworks along with Elasticsearch, you
should use :py:class:`~elasticsearch.AsyncElasticsearch` so that synchronous
network calls don't block the event loop and degrade performance.
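
To see why a blocking call hurts an ASGI application, the stdlib-only sketch below runs
three simulated requests concurrently. ``time.sleep`` stands in for a synchronous network
call and ``asyncio.sleep`` for an awaited one; the 0.1 second duration is an arbitrary
assumption chosen to make the difference measurable.

 .. code-block:: python

    import asyncio
    import time


    async def blocking_request() -> None:
        # Stand-in for a synchronous network call: the whole event loop stalls here.
        time.sleep(0.1)


    async def non_blocking_request() -> None:
        # Stand-in for an awaited network call: the loop can run other tasks meanwhile.
        await asyncio.sleep(0.1)


    async def run_three(request) -> float:
        """Run three requests concurrently and return the elapsed wall-clock time."""
        start = time.perf_counter()
        await asyncio.gather(request(), request(), request())
        return time.perf_counter() - start


    blocking_elapsed = asyncio.run(run_three(blocking_request))
    non_blocking_elapsed = asyncio.run(run_three(non_blocking_request))
    # The blocking version runs the requests one after another; the other overlaps them.
    print(f"blocking:     {blocking_elapsed:.2f}s")
    print(f"non-blocking: {non_blocking_elapsed:.2f}s")

In a real application every handler shares that event loop, so one blocking call delays
every other in-flight request, not just its own.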

`Elastic APM <https://www.elastic.co/guide/en/apm/agent/python/current/index.html>`_
also supports tracing async Elasticsearch queries just like
synchronous queries. For an example of how to configure ``AsyncElasticsearch`` with
the popular ASGI framework `FastAPI <https://fastapi.tiangolo.com/>`_ and APM tracing,
see the `pre-built example <https://github.com/elastic/elasticsearch-py/tree/master/examples/fastapi-apm>`_
in the ``examples/fastapi-apm`` directory.

Frequently Asked Questions
--------------------------

ValueError when initializing ``AsyncElasticsearch``?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you receive ``ValueError: You must have 'aiohttp' installed to use
AiohttpHttpNode`` when trying to use ``AsyncElasticsearch``, ensure that
``aiohttp`` is installed in your environment (check with ``$ python -m pip
freeze | grep aiohttp``). Async support is not available without it.
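
If you prefer to check from Python rather than the shell, the sketch below uses the standard
library's ``importlib.util.find_spec`` to report whether a top-level module can be found
without actually importing it. The helper name ``is_installed`` is hypothetical, written for
this page.

 .. code-block:: python

    import importlib.util


    def is_installed(module_name: str) -> bool:
        """Return True if the top-level module can be found on the import path."""
        # find_spec returns None when a top-level module cannot be located.
        return importlib.util.find_spec(module_name) is not None


    if not is_installed("aiohttp"):
        print("aiohttp is missing; install it with: python -m pip install aiohttp")

Run it with the same interpreter your application uses, since each virtual environment has
its own set of installed packages.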

What about the ``elasticsearch-async`` package?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Previously, asyncio was supported separately via the `elasticsearch-async <https://github.com/elastic/elasticsearch-py-async>`_
package. The ``elasticsearch-async`` package has been deprecated in favor of
``AsyncElasticsearch``, which the ``elasticsearch`` package provides
in v7.8 and later.

Receiving 'Unclosed client session / connector' warning?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This warning is emitted by ``aiohttp`` when an open HTTP connection is
garbage collected. You'll typically run into it when shutting down your application.
To resolve the issue, ensure that :meth:`~elasticsearch.AsyncElasticsearch.close`
is called before the :py:class:`~elasticsearch.AsyncElasticsearch` instance is garbage collected.

For example, if you're using FastAPI, that might look like this:

 .. code-block:: python

    import os
    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from elasticsearch import AsyncElasticsearch

    ELASTICSEARCH_URL = os.environ["ELASTICSEARCH_URL"]
    client = None

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        global client
        client = AsyncElasticsearch(ELASTICSEARCH_URL)
        yield
        await client.close()

    app = FastAPI(lifespan=lifespan)

    @app.get("/")
    async def main():
        return await client.info()

You can run this example by saving it to ``main.py`` and executing
``ELASTICSEARCH_URL=http://localhost:9200 uvicorn main:app``.


Async Helpers
-------------

Async variants of the ``bulk``, ``streaming_bulk``, ``scan``, and ``reindex`` helpers
are available in ``elasticsearch.helpers`` and are prefixed with ``async_``. Apart from
that, these APIs are identical to the ones in the sync :ref:`helpers` documentation.

All async helpers that accept an iterator or generator also accept async iterators
and async generators.
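
As an illustration of what accepting both kinds of iterables means, here is a minimal sketch
of a normalizing function. ``as_async_iter`` is hypothetical, written for this page, and is
not part of the ``elasticsearch`` API; it only shows the idea that plain iterables can be
wrapped so a helper can always consume its input with ``async for``.

 .. code-block:: python

    import asyncio
    from collections.abc import AsyncIterable


    async def as_async_iter(items):
        """Yield every element of ``items``, whether it is a sync or an async iterable."""
        if isinstance(items, AsyncIterable):
            # Async generators and other async iterables are consumed directly.
            async for item in items:
                yield item
        else:
            # Lists, tuples, and ordinary generators are wrapped.
            for item in items:
                yield item


    async def async_words():
        # An async generator, like the gendata() functions used with the bulk helpers.
        for word in ["foo", "bar"]:
            yield word


    async def collect(source):
        return [item async for item in as_async_iter(source)]


    from_list = asyncio.run(collect(["foo", "bar"]))
    from_async_gen = asyncio.run(collect(async_words()))
    print(from_list, from_async_gen)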

 .. py:module:: elasticsearch.helpers
    :no-index:

Bulk and Streaming Bulk
~~~~~~~~~~~~~~~~~~~~~~~

 .. autofunction:: async_bulk

 .. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

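    # Assumes a cluster listening on localhost:9200; adjust the URL for your setup.
    client = AsyncElasticsearch("http://localhost:9200")

    async def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            # Keys other than metadata fields such as "_index" form the document source.
            yield {
                "_index": "mywords",
                "word": word,
            }

    async def main():
        await async_bulk(client, gendata())
        await client.close()

    asyncio.run(main())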
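
Each action yielded to ``async_bulk`` is a dict. The operation defaults to ``index``, and the
keys that are not metadata fields such as ``_index`` or ``_id`` become the document. A
different operation can be chosen with ``_op_type``; for ``update`` actions the partial
document goes under ``doc``. The generator below is a sketch that only builds actions (the
``mywords`` index and the ids are assumptions), so it runs without a cluster; passing
``mixed_actions()`` to ``async_bulk(client, ...)`` would send them.

 .. code-block:: python

    import asyncio


    async def mixed_actions():
        # Index (the default operation): non-metadata keys become the document.
        yield {"_index": "mywords", "_id": "1", "word": "foo"}
        # Update: the partial document goes under "doc".
        yield {"_op_type": "update", "_index": "mywords", "_id": "1", "doc": {"word": "bar"}}
        # Delete: only metadata is needed.
        yield {"_op_type": "delete", "_index": "mywords", "_id": "2"}


    async def preview():
        # Collect the actions locally instead of sending them, for inspection.
        return [action async for action in mixed_actions()]


    actions = asyncio.run(preview())
    for action in actions:
        print(action.get("_op_type", "index"), action["_id"])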

 .. autofunction:: async_streaming_bulk

 .. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_streaming_bulk

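    # Assumes a cluster listening on localhost:9200; adjust the URL for your setup.
    client = AsyncElasticsearch("http://localhost:9200")

    async def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "word": word,
            }

    async def main():
        # raise_on_error=False yields failed items instead of raising BulkIndexError.
        async for ok, item in async_streaming_bulk(client, gendata(), raise_on_error=False):
            # Each item is a single-key dict mapping the operation to its result.
            action, result = item.popitem()
            if not ok:
                print("failed to %s document %s" % (action, result))
        await client.close()

    asyncio.run(main())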
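
With ``raise_on_error=False``, failed items are yielded with ``ok`` set to ``False`` instead
of raising, so you can count outcomes yourself. The sketch below does that for any async
iterable of ``(ok, item)`` pairs. ``fake_results`` is a hypothetical stand-in for
``async_streaming_bulk`` so the example runs without a cluster, and its items only mimic the
shape of bulk response items.

 .. code-block:: python

    import asyncio
    from collections import Counter


    async def tally(results) -> Counter:
        """Count successes and failures per operation from (ok, item) pairs."""
        counts = Counter()
        async for ok, item in results:
            # Each item is a single-key dict such as {"index": {...}}.
            action = next(iter(item))
            counts[(action, "ok" if ok else "failed")] += 1
        return counts


    async def fake_results():
        # Hypothetical stand-in for async_streaming_bulk(client, ..., raise_on_error=False).
        yield True, {"index": {"_index": "mywords", "_id": "1", "status": 201}}
        yield True, {"index": {"_index": "mywords", "_id": "2", "status": 201}}
        yield False, {"index": {"_index": "mywords", "_id": "3", "status": 400}}


    counts = asyncio.run(tally(fake_results()))
    print(dict(counts))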

Scan
~~~~

 .. autofunction:: async_scan

 .. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_scan

    # Assumes a cluster listening on localhost:9200; adjust the URL for your setup.
    client = AsyncElasticsearch("http://localhost:9200")

    async def main():
        async for doc in async_scan(
            client=client,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*",
        ):
            print(doc)
        await client.close()

    asyncio.run(main())
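
``async_scan`` yields raw search hits, so the document body sits under ``_source`` next to
metadata such as ``_id``. The sketch below collects just the bodies with an async
comprehension. ``fake_scan`` is a hypothetical stand-in for ``async_scan`` whose hits only
mimic the shape of real ones, so the example runs without a cluster.

 .. code-block:: python

    import asyncio


    async def fake_scan():
        # Hypothetical stand-in for async_scan(client, query=..., index="orders-*").
        yield {"_index": "orders-2024", "_id": "a1", "_source": {"title": "python basics"}}
        yield {"_index": "orders-2024", "_id": "a2", "_source": {"title": "async python"}}


    async def collect_sources(hits) -> dict:
        # Map each document id to its source, dropping the other hit metadata.
        return {hit["_id"]: hit["_source"] async for hit in hits}


    sources = asyncio.run(collect_sources(fake_scan()))
    print(sources)

For large result sets, process each hit inside the ``async for`` loop rather than building
the whole collection in memory.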

Reindex
~~~~~~~

 .. autofunction:: async_reindex


API Reference
-------------

 .. py:module:: elasticsearch
    :no-index:

The API of :class:`~elasticsearch.AsyncElasticsearch` is nearly identical
to the API of :class:`~elasticsearch.Elasticsearch` with the exception that
every API call like :py:func:`~elasticsearch.AsyncElasticsearch.search` is
an ``async`` function and requires an ``await`` to properly return the response
body.
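
Forgetting the ``await`` is an easy mistake: calling an ``async`` function only creates a
coroutine object and does not run the call. The stdlib sketch below shows the difference
using ``fake_search``, a hypothetical stand-in for
:py:func:`~elasticsearch.AsyncElasticsearch.search` that returns a canned body.

 .. code-block:: python

    import asyncio
    import inspect


    async def fake_search(index: str) -> dict:
        # Hypothetical stand-in for AsyncElasticsearch.search(); returns a canned body.
        return {"hits": {"total": {"value": 0, "relation": "eq"}, "hits": []}}


    async def main():
        pending = fake_search(index="documents")  # no await: nothing has run yet
        is_coroutine = inspect.iscoroutine(pending)
        resp = await pending  # awaiting runs the call and produces the response
        return is_coroutine, resp


    is_coroutine, resp = asyncio.run(main())
    print(is_coroutine)                    # True
    print(resp["hits"]["total"]["value"])  # 0

If a coroutine is never awaited, Python also emits a ``RuntimeWarning: coroutine ... was never
awaited`` when it is garbage collected, which is a useful hint when tracking down this mistake.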

AsyncElasticsearch
~~~~~~~~~~~~~~~~~~

 .. note::

    To reference Elasticsearch APIs that are namespaced like ``.indices.create()``
    refer to the sync API reference. These APIs are identical between sync and async.

 .. autoclass:: AsyncElasticsearch
   :members: