1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
"""
How to check that connection credentials are suitable for queries and writes from/into specified bucket.
"""
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
"""
Define credentials
"""
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
bucket = "my-bucket"
def check_connection():
"""Check that the InfluxDB is running."""
print("> Checking connection ...", end=" ")
client.api_client.call_api('/ping', 'GET')
print("ok")
def check_query():
"""Check that the credentials has permission to query from the Bucket"""
print("> Checking credentials for query ...", end=" ")
try:
client.query_api().query(f"from(bucket:\"{bucket}\") |> range(start: -1m) |> limit(n:1)", org)
except ApiException as e:
# missing credentials
if e.status == 404:
raise Exception(f"The specified token doesn't have sufficient credentials to read from '{bucket}' "
f"or specified bucket doesn't exists.") from e
raise
print("ok")
def check_write():
"""Check that the credentials has permission to write into the Bucket"""
print("> Checking credentials for write ...", end=" ")
try:
client.write_api(write_options=SYNCHRONOUS).write(bucket, org, b"")
except ApiException as e:
# bucket does not exist
if e.status == 404:
raise Exception(f"The specified bucket does not exist.") from e
# insufficient permissions
if e.status == 403:
raise Exception(f"The specified token does not have sufficient credentials to write to '{bucket}'.") from e
# 400 (BadRequest) caused by empty LineProtocol
if e.status != 400:
raise
print("ok")
with InfluxDBClient(url=url, token=token, org=org) as client:
check_connection()
check_query()
check_write()
pass
|