"""Example: write structured records (a namedtuple and a dataclass) to InfluxDB."""
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
class Sensor(namedtuple('Sensor', 'name location version pressure temperature timestamp')):
    """Named-tuple record for a sensor.

    Fields, in positional order: ``name``, ``location``, ``version``,
    ``pressure``, ``temperature``, ``timestamp``.
    """
@dataclass(init=True, repr=True, eq=True)
class Car:
    """Dataclass record for a car: engine, type and speed."""

    engine: str
    # Field name kept as-is: it is part of the record's public interface.
    type: str
    speed: float
"""
Initialize client
"""
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
"""
Sensor "current" state
"""
sensor = Sensor(name="sensor_pt859",
location="warehouse_125",
version="2021.06.05.5874",
pressure=125,
temperature=10,
timestamp=datetime.utcnow())
print(sensor)
"""
Synchronous write
"""
write_api.write(bucket="my-bucket",
record=sensor,
record_measurement_key="name",
record_time_key="timestamp",
record_tag_keys=["location", "version"],
record_field_keys=["pressure", "temperature"])
"""
Car "current" speed
"""
car = Car('12V-BT', 'sport-cars', 125.25)
print(car)
"""
Synchronous write
"""
write_api.write(bucket="my-bucket",
record=car,
record_measurement_name="performance",
record_tag_keys=["engine", "type"],
record_field_keys=["speed"])
|