File: test_QueryApiDataFrame.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (342 lines) | stat: -rw-r--r-- 20,633 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
import random
import warnings

import httpretty
import pytest
import reactivex as rx
from pandas import DataFrame
from pandas._libs.tslibs.timestamps import Timestamp
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.warnings import MissingPivotFunction
from influxdb_client.rest import ApiException
from tests.base_test import BaseTest, current_milli_time


class QueryDataFrameApi(BaseTest):
    """Unit tests for ``QueryApi.query_data_frame`` against an httpretty-mocked ``/api/v2/query`` endpoint."""

    # Column order every returned DataFrame is expected to have (matches the CSV header row below).
    _COLUMNS = ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"]

    # Values of the "_time" column; identical for every table in the responses below.
    _EXPECTED_TIMES = [Timestamp('2019-11-12 08:09:05+0000'), Timestamp('2019-11-12 08:09:06+0000'),
                       Timestamp('2019-11-12 08:09:07+0000'), Timestamp('2019-11-12 08:09:08+0000'),
                       Timestamp('2019-11-12 08:09:09+0000')]

    # Annotated-CSV response holding one table: the "used" field of the "mem" measurement.
    _ONE_TABLE_RESPONSE = (
        '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n'
        '#group,false,false,true,true,false,false,true,true,true\n'
        '#default,_result,,,,,,,,\n'
        ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n'
        '\n\n'
    )

    # The table above followed by the "available" (table 1) and "free" (table 2) fields.
    _THREE_TABLES_RESPONSE = _ONE_TABLE_RESPONSE + (
        '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n'
        '#group,false,false,true,true,false,false,true,true,true\n'
        '#default,_result,,,,,,,,\n'
        ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n'
        '\n\n'
        '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n'
        '#group,false,false,true,true,false,false,true,true,true\n'
        '#default,_result,,,,,,,,\n'
        ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'
    )

    # Query used by both multi-table tests.
    _THREE_FIELDS_QUERY = (
        'from(bucket: "my-bucket") '
        '|> range(start: -5s, stop: now()) '
        '|> filter(fn: (r) => r._measurement == "mem") '
        '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")'
    )

    def setUp(self) -> None:
        super().setUp()

        httpretty.enable()
        httpretty.reset()

    def tearDown(self) -> None:
        try:
            self.client.close()
        finally:
            # Always restore real networking, even if closing the client fails,
            # so the mock does not leak into other test modules.
            httpretty.disable()

    def _mock_query_response(self, body: str) -> None:
        """Serve *body* for every POST to the query endpoint and point ``self.client`` at the mock.

        :param body: raw annotated-CSV text returned by the mocked ``/api/v2/query``.
        """
        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=body)
        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

    def _missing_pivot_warnings(self, query: str) -> list:
        """Run *query* through ``query_data_frame`` and return only the ``MissingPivotFunction`` warnings.

        :param query: Flux query text.
        :return: list of ``warnings.WarningMessage`` whose category is ``MissingPivotFunction``.
        """
        with warnings.catch_warnings(record=True) as recorded:
            # "always" so a warning already emitted earlier in the process is not suppressed.
            warnings.simplefilter("always")
            self.client.query_api().query_data_frame(query, "my-org")
        return [w for w in recorded if issubclass(w.category, MissingPivotFunction)]

    def test_one_table(self):
        """A response with one table is returned as a single DataFrame with a 0-based index."""
        self._mock_query_response(self._ONE_TABLE_RESPONSE)

        data_frame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "used")',
            "my-org")

        self.assertEqual(DataFrame, type(data_frame))
        self.assertListEqual(self._COLUMNS, list(data_frame.columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(data_frame.index))
        self.assertEqual(5, len(data_frame))

        # The expected _start/_stop values are the response timestamps truncated to microseconds.
        start = Timestamp('2019-11-12 08:09:04.795385+0000')
        stop = Timestamp('2019-11-12 08:09:09.795385+0000')
        expected_columns = {
            "result": ["_result"] * 5,
            "table": [0] * 5,
            "_start": [start] * 5,
            "_stop": [stop] * 5,
            "_time": self._EXPECTED_TIMES,
            "_value": [11125907456, 11127103488, 11127291904, 11126190080, 11127832576],
            "_field": ["used"] * 5,
            "_measurement": ["mem"] * 5,
            "host": ["mac.local"] * 5,
        }
        for column, expected in expected_columns.items():
            with self.subTest(column=column):
                self.assertListEqual(expected, list(data_frame[column]))

    def test_more_table(self):
        """A response with several tables is returned as a list holding one DataFrame per table."""
        self._mock_query_response(self._THREE_TABLES_RESPONSE)

        data_frames = self.client.query_api().query_data_frame(self._THREE_FIELDS_QUERY, "my-org")

        self.assertEqual(list, type(data_frames))
        self.assertEqual(3, len(data_frames))
        for table, data_frame in enumerate(data_frames):
            with self.subTest(table=table):
                self.assertListEqual(self._COLUMNS, list(data_frame.columns))
                self.assertListEqual([0, 1, 2, 3, 4], list(data_frame.index))
                self.assertEqual(5, len(data_frame))

    def test_empty_data_set(self):
        """An empty response yields an empty DataFrame with no columns and no rows."""
        self._mock_query_response('\n')

        data_frame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "not_exit")',
            "my-org")

        self.assertEqual(DataFrame, type(data_frame))
        self.assertListEqual([], list(data_frame.columns))
        self.assertListEqual([], list(data_frame.index))

    def test_more_table_custom_index(self):
        """``data_frame_index=["_time"]`` moves ``_time`` from the columns into the index of every DataFrame."""
        self._mock_query_response(self._THREE_TABLES_RESPONSE)

        data_frames = self.client.query_api().query_data_frame(
            self._THREE_FIELDS_QUERY, "my-org", data_frame_index=["_time"])

        self.assertEqual(list, type(data_frames))
        self.assertEqual(3, len(data_frames))

        expected_columns = [column for column in self._COLUMNS if column != "_time"]
        for table, data_frame in enumerate(data_frames):
            with self.subTest(table=table):
                self.assertListEqual(expected_columns, list(data_frame.columns))
                self.assertListEqual(self._EXPECTED_TIMES, list(data_frame.index))
                self.assertEqual(5, len(data_frame))

    def test_query_with_warning(self):
        """A DataFrame query that never pivots its fields emits exactly one MissingPivotFunction warning."""
        self._mock_query_response('\n')

        with pytest.warns(MissingPivotFunction) as recorded:
            # The org is passed as a separate argument, not concatenated into the query text.
            self.client.query_api().query_data_frame(
                'from(bucket: "my-bucket")'
                '|> range(start: -5s, stop: now()) '
                '|> filter(fn: (r) => r._measurement == "mem") ',
                "my-org")
        # Count only the warning under test; unrelated warnings may be recorded as well.
        self.assertEqual(1, len([w for w in recorded if issubclass(w.category, MissingPivotFunction)]))

    def test_query_without_warning(self):
        """Queries using schema.fieldsAsCols() or pivot() emit no MissingPivotFunction warning."""
        self._mock_query_response('\n')

        self.assertListEqual([], self._missing_pivot_warnings(
            'import "influxdata/influxdb/schema"\n\n'
            'from(bucket: "my-bucket")'
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> schema.fieldsAsCols() '))

        self.assertListEqual([], self._missing_pivot_warnings(
            'from(bucket: "my-bucket")'
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")'))


class QueryDataFrameIntegrationApi(BaseTest):
    """Tests of ``QueryApi.query_data_frame`` that talk to a real server (no HTTP mocking)."""

    def test_large_amount_of_data(self):
        """Write 2,000 points with eight fields each, then query them back with ``query_data_frame``."""
        measurement_name = "data_frame_" + str(current_milli_time())

        def _create_point(index) -> Point:
            # Tag and field values are random; only the timestamp (one second per index) is deterministic.
            return Point(measurement_name) \
                .tag("deviceType", random.choice(['A', 'B'])) \
                .tag("name", random.choice(['A', 'B'])) \
                .field("uuid", random.randint(0, 10_000)) \
                .field("co2", random.randint(0, 10_000)) \
                .field("humid", random.randint(0, 10_000)) \
                .field("lux", random.randint(0, 10_000)) \
                .field("water", random.randint(0, 10_000)) \
                .field("shine", random.randint(0, 10_000)) \
                .field("temp", random.randint(0, 10_000)) \
                .field("voc", random.randint(0, 10_000)) \
                .time(time=(1583828781 + index), write_precision=WritePrecision.S)

        data = rx.range(0, 2_000).pipe(ops.map(_create_point))

        write_api = self.client.write_api(write_options=WriteOptions(batch_size=500))
        try:
            write_api.write(org="my-org", bucket="my-bucket", record=data, write_precision=WritePrecision.S)
        finally:
            # Release the batching writer even if write() raises.
            write_api.close()

        query = 'from(bucket: "my-bucket")' \
                '|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
                f'|> filter(fn: (r) => r._measurement == "{measurement_name}")'

        result = self.client.query_api().query_data_frame(org="my-org", query=query)

        self.assertGreater(len(result), 1)

    def test_query_without_credentials(self):
        """A query sent with an invalid token fails with HTTP 401 Unauthorized."""
        _client = InfluxDBClient(url="http://localhost:8086", token="my-token-wrong-credentials", org="my-org")
        # Close the client even if the expected exception is not raised and the test fails.
        self.addCleanup(_client.close)

        query = 'from(bucket: "my-bucket")' \
                '|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
                '|> filter(fn: (r) => r._measurement == "my-measurement")'

        with self.assertRaises(ApiException) as ae:
            _client.query_api().query_data_frame(query=query)

        exception = ae.exception
        self.assertEqual(401, exception.status)
        self.assertEqual("Unauthorized", exception.reason)