File: write_api_callbacks.py

package info (click to toggle)
python-influxdb-client 1.40.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (49 lines) | stat: -rw-r--r-- 1,543 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""
How to use WriteApi's callbacks to be notified about the state of background batches.
"""

from typing import Tuple

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError

"""
Configuration
"""
# Connection settings handed to InfluxDBClient further down.
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
# Target bucket passed to write_api.write().
bucket = "my-bucket"

"""
Data
"""
# Two sample "my-temperature" points, one per location, built in the same
# order as they are listed here.
points = [
    Point("my-temperature").tag("location", place).field("temperature", reading)
    for place, reading in (("Prague", 25.3), ("New York", 18.4))
]


class BatchingCallback:
    """Print the outcome of each batch reported through the write callbacks.

    The three methods are passed to ``client.write_api`` as the success,
    error and retry callbacks. Each one only prints its arguments and
    returns nothing.
    """

    def success(self, conf: Tuple[str, str, str], data: str) -> None:
        """Report a successfully written batch.

        :param conf: three-string tuple describing the batch
            (presumably bucket, organization and precision -- confirm
            against the client documentation)
        :param data: the batch payload that was written
        """
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: Tuple[str, str, str], data: str, exception: InfluxDBError) -> None:
        """Report a batch that could not be written.

        :param conf: three-string tuple describing the batch
        :param data: the batch payload that failed
        :param exception: the error reported for the failed write
        """
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: Tuple[str, str, str], data: str, exception: InfluxDBError) -> None:
        """Report a retryable error for a batch.

        :param conf: three-string tuple describing the batch
        :param data: the batch payload being retried
        :param exception: the error that triggered the retry
        """
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


callback = BatchingCallback()

# Use batching API: open the client and a write API wired to the three
# callback methods in a single `with`. Leaving the block exits both context
# managers, innermost (write_api) first.
with InfluxDBClient(url=url, token=token, org=org) as client, client.write_api(
    success_callback=callback.success,
    error_callback=callback.error,
    retry_callback=callback.retry,
) as write_api:
    write_api.write(bucket=bucket, record=points)
    print()
    print("Wait to finishing ingesting...")
    print()