File: write_structured_data.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (66 lines) | stat: -rw-r--r-- 1,696 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS


class Sensor(namedtuple('Sensor', 'name location version pressure temperature timestamp')):
    """
    Named-tuple record for a single sensor reading.

    Fields, in positional order: ``name``, ``location``, ``version``,
    ``pressure``, ``temperature`` and ``timestamp``.
    """


@dataclass
class Car:
    """
    Dataclass record describing a car.

    Instances are built positionally or by keyword in the order
    ``engine``, ``type``, ``speed``.
    """
    engine: str  # engine identifier, e.g. '12V-BT'
    type: str  # car category, e.g. 'sport-cars'
    speed: float  # speed value, e.g. 125.25


"""
Initialize client
"""
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)

    """
    Sensor "current" state
    """
    sensor = Sensor(name="sensor_pt859",
                    location="warehouse_125",
                    version="2021.06.05.5874",
                    pressure=125,
                    temperature=10,
                    timestamp=datetime.utcnow())
    print(sensor)

    """
    Synchronous write
    """
    write_api.write(bucket="my-bucket",
                    record=sensor,
                    record_measurement_key="name",
                    record_time_key="timestamp",
                    record_tag_keys=["location", "version"],
                    record_field_keys=["pressure", "temperature"])

    """
    Car "current" speed
    """
    car = Car('12V-BT', 'sport-cars', 125.25)
    print(car)

    """
    Synchronous write
    """
    write_api.write(bucket="my-bucket",
                    record=car,
                    record_measurement_name="performance",
                    record_tag_keys=["engine", "type"],
                    record_field_keys=["speed"])