File: asynchronous.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (85 lines) | stat: -rw-r--r-- 3,381 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
How to use asyncio with the InfluxDB client.
"""
import asyncio
from datetime import datetime, timezone

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    """Walk through the asynchronous InfluxDB client API end to end.

    Connects to ``http://localhost:8086``, prints the server version, writes
    two ``async_m`` points into ``my-bucket``, reads them back four ways
    (list of FluxTables, stream of FluxRecords, Pandas DataFrame, raw string)
    and finally deletes the points tagged ``location = "Prague"``.

    Each step prints its result to stdout; the coroutine returns ``None``.
    """
    # Base Flux query shared by every read example below.
    flux = ('from(bucket:"my-bucket") '
            '|> range(start: -10m) '
            '|> filter(fn: (r) => r["_measurement"] == "async_m")')

    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Check the version of the InfluxDB
        version = await client.version()
        print("\n------- Version -------\n")
        print(f"InfluxDB: {version}")

        # Prepare data
        print("\n------- Write data by async API: -------\n")
        write_api = client.write_api()
        _point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
        _point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)
        successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])
        print(f" > successfully: {successfully}")

        # One query API instance serves all of the read examples.
        query_api = client.query_api()

        # Query: List of FluxTables
        print("\n------- Query: List of FluxTables -------\n")
        tables = await query_api.query(flux)
        for table in tables:
            for record in table.records:
                print(f'Temperature in {record["location"]} is {record["_value"]}')

        # Query: Stream of FluxRecords
        print("\n------- Query: Stream of FluxRecords -------\n")
        records = await query_api.query_stream(flux)
        async for record in records:
            print(record)

        # Query: Pandas DataFrame
        print("\n------- Query: Pandas DataFrame -------\n")
        dataframe = await query_api.query_data_frame(flux + ' |> group()')
        print(dataframe)

        # Query: String output
        print("\n------- Query: String output -------\n")
        raw = await query_api.query_raw(flux)
        print(raw)

        # Delete data
        print("\n------- Delete data with location = 'Prague' -------\n")
        # Both bounds are timezone-aware UTC so the delete window is
        # unambiguous whatever the host's local timezone is; a naive
        # ``datetime.now()`` would be local wall-clock time and could place
        # ``stop`` before the points that were just written.
        epoch = datetime.fromtimestamp(0, tz=timezone.utc)
        now = datetime.now(tz=timezone.utc)
        successfully = await client.delete_api().delete(start=epoch, stop=now,
                                                        predicate="location = \"Prague\"", bucket="my-bucket")
        print(f" > successfully: {successfully}")


# Run the example only when this file is executed as a script (not on import).
if __name__ == "__main__":
    # asyncio.run() runs the main() coroutine to completion on a new event loop.
    asyncio.run(main())