File: test_MultiprocessingWriter.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (72 lines) | stat: -rw-r--r-- 3,167 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import unittest
from datetime import datetime, timezone

from influxdb_client import WritePrecision, InfluxDBClient
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
from influxdb_client.client.write_api import SYNCHRONOUS


# noinspection PyMethodMayBeStatic
class MultiprocessingWriterTest(unittest.TestCase):

    def setUp(self) -> None:
        self.url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
        self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
        self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
        self.writer = None

    def tearDown(self) -> None:
        if self.writer:
            self.writer.__del__()

    def test_write_without_start(self):
        self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
                                            write_options=SYNCHRONOUS)

        with self.assertRaises(AssertionError) as ve:
            self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5")

        self.assertEqual('Cannot write data: the writer is not started.', f'{ve.exception}')

    def test_write_after_terminate(self):
        self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
                                            write_options=SYNCHRONOUS)
        self.writer.start()
        self.writer.__del__()

        with self.assertRaises(AssertionError) as ve:
            self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5")

        self.assertEqual('Cannot write data: the writer is closed.', f'{ve.exception}')

    def test_terminate_twice(self):
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            writer.__del__()
            writer.terminate()
            writer.terminate()
            writer.__del__()

    def test_use_context_manager(self):
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            self.assertIsNotNone(writer)

    def test_pass_parameters(self):
        unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0))

        # write data
        with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
            writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S)

        # query data
        with InfluxDBClient(url=self.url, token=self.token, org=self.org) as client:
            query_api = client.query_api()
            tables = query_api.query(
                f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "mem_{unique}")',
                self.org)
            record = tables[0].records[0]
            self.assertIsNotNone(record)
            self.assertEqual("a", record["tag"])
            self.assertEqual(5, record["_value"])
            self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"])