1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
import datetime
import os
import re
import time
import unittest
import influxdb_client
from influxdb_client import BucketRetentionRules, Organization, InfluxDBClient
current_milli_time = lambda: int(round(time.time() * 1000))
def generate_bucket_name():
return generate_name(key="bucket")
def generate_name(key: str):
return f"test_{key}_" + str(datetime.datetime.now().timestamp()) + "_IT"
class BaseTest(unittest.TestCase):
def setUp(self) -> None:
self.conf = influxdb_client.configuration.Configuration()
self.host = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
self.debug = False
self.auth_token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
self.client = InfluxDBClient(url=self.host, token=self.auth_token, debug=self.debug, org=self.org)
self.api_client = self.client.api_client
self.query_api = self.client.query_api()
self.buckets_api = self.client.buckets_api()
self.users_api = self.client.users_api()
self.organizations_api = self.client.organizations_api()
self.authorizations_api = self.client.authorizations_api()
self.labels_api = self.client.labels_api()
self.my_organization = self.find_my_org()
def tearDown(self) -> None:
self.client.close()
def create_test_bucket(self):
bucket_name = generate_bucket_name()
bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org=self.my_organization,
description=bucket_name + "description")
return bucket
def delete_test_bucket(self, bucket):
return self.buckets_api.delete_bucket(bucket)
def find_my_org(self) -> Organization:
return self.client.organizations_api().find_organizations(org=self.org)[0]
@staticmethod
def log(args):
print(">>>", args)
@staticmethod
def generate_name(prefix):
assert prefix != "" or prefix is not None
return prefix + str(datetime.datetime.now().timestamp()) + "-IT"
@classmethod
def retention_rule(cls) -> BucketRetentionRules:
return BucketRetentionRules(type='expire', every_seconds=3600)
def assertEqualIgnoringWhitespace(self, first, second, msg=None) -> None:
whitespace_pattern = re.compile(r"\s+")
self.assertEqual(whitespace_pattern.sub("", first), whitespace_pattern.sub("", second), msg=msg)
|