File: connection_check.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (61 lines) | stat: -rw-r--r-- 1,971 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""
How to check that connection credentials are suitable for queries and writes from/into specified bucket.
"""

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException

"""
Define credentials
"""
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
bucket = "my-bucket"


def check_connection():
    """Check that the InfluxDB is running."""
    print("> Checking connection ...", end=" ")
    client.api_client.call_api('/ping', 'GET')
    print("ok")


def check_query():
    """Check that the credentials has permission to query from the Bucket"""
    print("> Checking credentials for query ...", end=" ")
    try:
        client.query_api().query(f"from(bucket:\"{bucket}\") |> range(start: -1m) |> limit(n:1)", org)
    except ApiException as e:
        # missing credentials
        if e.status == 404:
            raise Exception(f"The specified token doesn't have sufficient credentials to read from '{bucket}' "
                            f"or specified bucket doesn't exists.") from e
        raise
    print("ok")


def check_write():
    """Check that the credentials has permission to write into the Bucket"""
    print("> Checking credentials for write ...", end=" ")
    try:
        client.write_api(write_options=SYNCHRONOUS).write(bucket, org, b"")
    except ApiException as e:
        # bucket does not exist
        if e.status == 404:
            raise Exception(f"The specified bucket does not exist.") from e
        # insufficient permissions
        if e.status == 403:
            raise Exception(f"The specified token does not have sufficient credentials to write to '{bucket}'.") from e
        # 400 (BadRequest) caused by empty LineProtocol
        if e.status != 400:
            raise
    print("ok")


with InfluxDBClient(url=url, token=token, org=org) as client:
    check_connection()
    check_query()
    check_write()
    pass