File: query.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (113 lines) | stat: -rw-r--r-- 3,492 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import datetime as datetime

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

# Example script: write two sample points into "my-bucket" and read them back
# through each query style used below (table structure, bind parameters,
# stream, CSV and Pandas DataFrame). The client is used as a context manager
# so it is closed when the block is left.
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:

    write_api = client.write_api(write_options=SYNCHRONOUS)

    #
    # Prepare data
    #
    _point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
    _point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
    write_api.write(bucket="my-bucket", record=[_point1, _point2])

    query_api = client.query_api()

    #
    # Query: using Table structure
    #
    tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

    for table in tables:
        print(table)
        for record in table.records:
            print(record.values)

    print()
    print()

    #
    # Query: using Bind parameters
    #
    # Every key of this dict is referenced by name inside the Flux query below.
    p = {"_start": datetime.timedelta(hours=-1),
         "_location": "Prague",
         "_desc": True,
         "_floatParam": 25.1,
         "_every": datetime.timedelta(minutes=5)
         }

    tables = query_api.query('''
        from(bucket:"my-bucket") |> range(start: _start)
            |> filter(fn: (r) => r["_measurement"] == "my_measurement")
            |> filter(fn: (r) => r["_field"] == "temperature")
            |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
            |> aggregateWindow(every: _every, fn: mean, createEmpty: true)        
            |> sort(columns: ["_time"], desc: _desc) 
    ''', params=p)

    for table in tables:
        print(table)
        for record in table.records:
            # f-string keeps the formatting consistent with the other sections
            # and does not require every column to already be a str.
            print(f'{record["_time"]} - {record["location"]}: {record["_value"]}')

    print()
    print()

    #
    # Query: using Stream
    #
    records = query_api.query_stream('''
    from(bucket:"my-bucket") 
        |> range(start: -10m)  
        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
    ''')

    for record in records:
        print(f'Temperature in {record["location"]} is {record["_value"]}')

    #
    # Interrupt a stream after retrieving the required data
    #
    large_stream = query_api.query_stream('''
    from(bucket:"my-bucket") 
        |> range(start: -100d) 
        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
    ''')

    # The stream is abandoned before it is exhausted, so it has to be closed
    # explicitly; ``finally`` guarantees that also happens when handling a
    # record raises.
    try:
        for record in large_stream:
            if record["location"] == "New York":
                print(f'New York temperature: {record["_value"]}')
                break
    finally:
        large_stream.close()

    print()
    print()

    #
    # Query: using csv library
    #
    csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
                                     dialect=Dialect(header=False, delimiter=",", comment_prefix="#", annotations=[],
                                                     date_time_format="RFC3339"))
    for csv_line in csv_result:
        # Skip empty rows (e.g. blank separator lines in the CSV response);
        # indexing them would raise IndexError.
        if not csv_line:
            continue
        # NOTE(review): indexes 9 and 6 are taken from the original script and
        # presumably address the "location" and "_value" columns - confirm
        # against the actual response layout.
        print(f'Temperature in {csv_line[9]} is {csv_line[6]}')

    print()
    print()

    #
    # Query: using Pandas DataFrame
    #
    data_frame = query_api.query_data_frame('''
    from(bucket:"my-bucket") 
        |> range(start: -10m) 
        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
        |> keep(columns: ["_time","location", "temperature"])
    ''')
    # query_data_frame is documented to return either a single DataFrame or a
    # list of DataFrames; normalise to a list so both shapes can be printed.
    frames = data_frame if isinstance(data_frame, list) else [data_frame]
    for frame in frames:
        print(frame.to_string())