File: asynchronous_retry.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (47 lines) | stat: -rw-r--r-- 1,805 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""
How to use `aiohttp-retry` with async client.

This example depends on `aiohttp_retry <https://github.com/inyutin/aiohttp_retry>`_.
Install ``aiohttp_retry`` with: ``pip install aiohttp-retry``.

"""
import asyncio

from aiohttp_retry import ExponentialRetry, RetryClient

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    """
    Write two points with a retry-enabled async client and stream them back.

    The client is created with ``client_session_type=RetryClient`` and the
    retry options are supplied through ``client_session_kwargs``.
    For more info about configuring retries see
    https://github.com/inyutin/aiohttp_retry
    """
    # Configure retries: ``ExponentialRetry`` limited by ``attempts=3``.
    retry_options = ExponentialRetry(attempts=3)
    async with InfluxDBClientAsync(
        url="http://localhost:8086",
        token="my-token",
        org="my-org",
        client_session_type=RetryClient,
        client_session_kwargs={"retry_options": retry_options},
    ) as client:
        # Write data
        print("\n------- Written data: -------\n")
        write_api = client.write_api()
        points = [
            Point("async_m").tag("location", "Prague").field("temperature", 25.3),
            Point("async_m").tag("location", "New York").field("temperature", 24.3),
        ]
        successfully = await write_api.write(bucket="my-bucket", record=points)
        print(f" > successfully: {successfully}")

        # Query: stream of FluxRecords
        print("\n------- Query: Stream of FluxRecords -------\n")
        query_api = client.query_api()
        # Adjacent literals are concatenated into a single Flux query string.
        flux_query = (
            'from(bucket:"my-bucket") '
            '|> range(start: -10m) '
            '|> filter(fn: (r) => r["_measurement"] == "async_m")'
        )
        records = await query_api.query_stream(flux_query)
        async for record in records:
            print(record)


if __name__ == "__main__":
    # Run the example only when executed as a script, not when imported.
    asyncio.run(main())