1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
|
import random
import warnings

import httpretty
import pytest
import reactivex as rx
from pandas import DataFrame
from pandas._libs.tslibs.timestamps import Timestamp
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.warnings import MissingPivotFunction
from influxdb_client.rest import ApiException
from tests.base_test import BaseTest, current_milli_time
class QueryDataFrameApi(BaseTest):
    """Tests for ``query_data_frame`` against an httpretty-mocked ``/api/v2/query`` endpoint.

    Every test registers a canned annotated-CSV body for the query endpoint,
    replaces ``self.client`` with a client pointed at ``http://localhost`` and
    then checks the pandas structures returned by ``query_data_frame``.
    """

    # Annotation rows plus the column header shared by every mocked CSV table.
    _CSV_HEADER = (
        '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n'
        '#group,false,false,true,true,false,false,true,true,true\n'
        '#default,_result,,,,,,,,\n'
        ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n'
    )

    # Table 0: the "used" field.
    _USED_TABLE = _CSV_HEADER + (
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n'
        ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n'
        '\n\n'
    )

    # Table 1: the "available" field.
    _AVAILABLE_TABLE = _CSV_HEADER + (
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n'
        ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n'
        '\n\n'
    )

    # Table 2: the "free" field. It closes the response, so it ends with a single blank line.
    _FREE_TABLE = _CSV_HEADER + (
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n'
        ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'
    )

    _THREE_TABLES = _USED_TABLE + _AVAILABLE_TABLE + _FREE_TABLE

    # Expected column layouts with the default index and with "_time" promoted to the index.
    _COLUMNS = ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"]
    _COLUMNS_WITHOUT_TIME = ["result", "table", "_start", "_stop", "_value", "_field", "_measurement", "host"]

    # The "_time" values carried by each of the five rows of every mocked table.
    _TIMES = [
        Timestamp('2019-11-12 08:09:05+0000'),
        Timestamp('2019-11-12 08:09:06+0000'),
        Timestamp('2019-11-12 08:09:07+0000'),
        Timestamp('2019-11-12 08:09:08+0000'),
        Timestamp('2019-11-12 08:09:09+0000'),
    ]

    _THREE_FIELDS_QUERY = (
        'from(bucket: "my-bucket") '
        '|> range(start: -5s, stop: now()) '
        '|> filter(fn: (r) => r._measurement == "mem") '
        '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")'
    )

    def setUp(self) -> None:
        """Run the base set-up, then switch on httpretty with a clean registry."""
        super(QueryDataFrameApi, self).setUp()
        httpretty.enable()
        httpretty.reset()

    def tearDown(self) -> None:
        """Close the client and switch httpretty off again."""
        try:
            self.client.close()
        finally:
            # Always undo the HTTP patching, even when closing the client fails,
            # so a failure here cannot leak the mock into later tests.
            httpretty.disable()

    def _mock_query_response(self, body):
        """Register ``body`` as the query response and point ``self.client`` at the mocked host."""
        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=body)
        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

    def _missing_pivot_warnings(self, query):
        """Run ``query`` and return the ``MissingPivotFunction`` warnings recorded while it ran."""
        with warnings.catch_warnings(record=True) as caught:
            # Make sure nothing is suppressed by an already-triggered "once" filter.
            warnings.simplefilter("always")
            self.client.query_api().query_data_frame(query, "my-org")
        return [item for item in caught if issubclass(item.category, MissingPivotFunction)]

    def test_one_table(self):
        """A single-table response is returned as one ``DataFrame`` with the expected cells."""
        self._mock_query_response(self._USED_TABLE)

        frame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "used")',
            "my-org")

        self.assertEqual(DataFrame, type(frame))
        self.assertListEqual(self._COLUMNS, list(frame.columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(frame.index))
        self.assertEqual(5, len(frame))

        # Columns whose value is identical in every row.
        constant_columns = {
            'result': "_result",
            'table': 0,
            '_start': Timestamp('2019-11-12 08:09:04.795385+0000'),
            '_stop': Timestamp('2019-11-12 08:09:09.795385+0000'),
            '_field': 'used',
            '_measurement': 'mem',
            'host': 'mac.local',
        }
        expected_values = [11125907456, 11127103488, 11127291904, 11126190080, 11127832576]

        for row in range(5):
            with self.subTest(row=row):
                for column, expected in constant_columns.items():
                    self.assertEqual(expected, frame[column][row])
                self.assertEqual(self._TIMES[row], frame['_time'][row])
                self.assertEqual(expected_values[row], frame['_value'][row])

    def test_more_table(self):
        """A three-table response is returned as a list of three default-indexed frames."""
        self._mock_query_response(self._THREE_TABLES)

        frames = self.client.query_api().query_data_frame(self._THREE_FIELDS_QUERY, "my-org")

        self.assertEqual(list, type(frames))
        self.assertEqual(len(frames), 3)
        for position, frame in enumerate(frames):
            with self.subTest(table=position):
                self.assertListEqual(self._COLUMNS, list(frame.columns))
                self.assertListEqual([0, 1, 2, 3, 4], list(frame.index))
                self.assertEqual(5, len(frame))

    def test_empty_data_set(self):
        """A blank response body yields an empty ``DataFrame``."""
        self._mock_query_response('\n')

        frame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "not_exit")',
            "my-org")

        self.assertEqual(DataFrame, type(frame))
        self.assertListEqual([], list(frame.columns))
        self.assertListEqual([], list(frame.index))

    def test_more_table_custom_index(self):
        """``data_frame_index=["_time"]`` moves ``_time`` out of the columns and into the index."""
        self._mock_query_response(self._THREE_TABLES)

        frames = self.client.query_api().query_data_frame(
            self._THREE_FIELDS_QUERY, "my-org", data_frame_index=["_time"])

        self.assertEqual(list, type(frames))
        self.assertEqual(len(frames), 3)
        for position, frame in enumerate(frames):
            with self.subTest(table=position):
                self.assertListEqual(self._COLUMNS_WITHOUT_TIME, list(frame.columns))
                self.assertListEqual(self._TIMES, list(frame.index))
                self.assertEqual(5, len(frame))

    def test_query_with_warning(self):
        """A query without a pivot step triggers exactly one ``MissingPivotFunction`` warning."""
        self._mock_query_response('\n')

        with pytest.warns(MissingPivotFunction) as recorded:
            self.client.query_api().query_data_frame(
                'from(bucket: "my-bucket")'
                '|> range(start: -5s, stop: now()) '
                '|> filter(fn: (r) => r._measurement == "mem") ',
                # The comma above matters: without it "my-org" is glued onto the Flux text.
                "my-org")
        self.assertEqual(1, len(recorded))

    def test_query_without_warning(self):
        """Queries using ``schema.fieldsAsCols()`` or ``pivot()`` raise no ``MissingPivotFunction``."""
        self._mock_query_response('\n')

        pivoted_queries = (
            'import "influxdata/influxdb/schema"'
            'from(bucket: "my-bucket")'
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> schema.fieldsAsCols() ',

            'from(bucket: "my-bucket")'
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")',
        )
        for query in pivoted_queries:
            with self.subTest(query=query):
                self.assertEqual(0, len(self._missing_pivot_warnings(query)))
class QueryDataFrameIntegrationApi(BaseTest):
    """Tests that call ``query_data_frame`` without any HTTP mocking.

    NOTE(review): ``self.client`` is presumably created by ``BaseTest.setUp`` and
    a server is expected at ``http://localhost:8086`` - confirm against BaseTest.
    """

    def test_large_amount_of_data(self):
        """Write 2000 random points in batches of 500 and query them back as data frames."""
        measurement = "data_frame_" + str(current_milli_time())

        def _create_point(index) -> Point:
            """Build one point with random tags/fields, timestamped ``index`` seconds after the base."""
            return Point(measurement) \
                .tag("deviceType", random.choice(['A', 'B'])) \
                .tag("name", random.choice(['A', 'B'])) \
                .field("uuid", random.randint(0, 10_000)) \
                .field("co2", random.randint(0, 10_000)) \
                .field("humid", random.randint(0, 10_000)) \
                .field("lux", random.randint(0, 10_000)) \
                .field("water", random.randint(0, 10_000)) \
                .field("shine", random.randint(0, 10_000)) \
                .field("temp", random.randint(0, 10_000)) \
                .field("voc", random.randint(0, 10_000)) \
                .time(time=(1583828781 + index), write_precision=WritePrecision.S)

        points = rx.range(0, 2_000).pipe(ops.map(_create_point))

        write_api = self.client.write_api(write_options=WriteOptions(batch_size=500))
        try:
            write_api.write(org="my-org", bucket="my-bucket", record=points, write_precision=WritePrecision.S)
        finally:
            # Close even when the write fails so the batching writer is not left open.
            write_api.close()

        query = 'from(bucket: "my-bucket")' \
                '|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
                f'|> filter(fn: (r) => r._measurement == "{measurement}")'

        result = self.client.query_api().query_data_frame(org="my-org", query=query)

        self.assertGreater(len(result), 1)

    def test_query_without_credentials(self):
        """Querying with a wrong token raises ``ApiException`` with status 401 / "Unauthorized"."""
        _client = InfluxDBClient(url="http://localhost:8086", token="my-token-wrong-credentials", org="my-org")
        try:
            query = 'from(bucket: "my-bucket")' \
                    '|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
                    '|> filter(fn: (r) => r._measurement == "my-measurement")'

            with self.assertRaises(ApiException) as ae:
                _client.query_api().query_data_frame(query=query)

            exception = ae.exception
            self.assertEqual(401, exception.status)
            self.assertEqual("Unauthorized", exception.reason)
        finally:
            # Release the client even when one of the assertions above fails.
            _client.close()
|