1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
import asyncio
import functools
import random
import time
from testing import Client
from testing import default_test_setup
from testing import gen_data
from testing import gen_points
from testing import gen_series
from testing import InsertError
from testing import PoolError
from testing import QueryError
from testing import run_test
from testing import Series
from testing import Server
from testing import ServerError
from testing import SiriDB
from testing import TestBase
from testing import UserAuthError
from testing import parse_args
TIME_PRECISION = 'ns'
class TestInsert(TestBase):
title = 'Test inserts and response'
GEN_POINTS = functools.partial(
gen_points, n=1, time_precision=TIME_PRECISION)
async def _test_series(self, client):
result = await client.query('select * from "series float"')
self.assertEqual(result['series float'], self.series_float)
result = await client.query('select * from "series int"')
self.assertEqual(result['series int'], self.series_int)
result = await client.query(
'list series name, length, type, start, end')
result['series'].sort()
self.assertEqual(
result,
{
'columns': ['name', 'length', 'type', 'start', 'end'],
'series': [
[
'series float',
10000, 'float',
self.series_float[0][0],
self.series_float[-1][0]],
[
'series int', 10000,
'integer',
self.series_int[0][0],
self.series_int[-1][0]],
]
})
async def insert(self, client, series, n, timeout=1):
for _ in range(n):
await client.insert_some_series(
series, timeout=timeout, points=self.GEN_POINTS)
await asyncio.sleep(1.0)
@default_test_setup(2, time_precision=TIME_PRECISION, compression=False)
async def run(self):
await self.client0.connect()
self.assertEqual(
await self.client0.insert({}),
{'success_msg': 'Successfully inserted 0 point(s).'})
self.assertEqual(
await self.client0.insert([]),
{'success_msg': 'Successfully inserted 0 point(s).'})
self.series_float = gen_points(
tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
random.shuffle(self.series_float)
self.series_int = gen_points(
tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
random.shuffle(self.series_int)
self.assertEqual(
await self.client0.insert({
'series float': self.series_float,
'series int': self.series_int
}), {'success_msg': 'Successfully inserted 20000 point(s).'})
self.series_float.sort()
self.series_int.sort()
await self._test_series(self.client0)
with self.assertRaises(InsertError):
await self.client0.insert('[]')
with self.assertRaises(InsertError):
await self.client0.insert('[]')
with self.assertRaises(InsertError):
await self.client0.insert([{}])
with self.assertRaises(InsertError):
await self.client0.insert({'None': [[1, None]]})
with self.assertRaises(InsertError):
await self.client0.insert({'no points': []})
with self.assertRaises(InsertError):
await self.client0.insert({'no points': [[]]})
self.assertEqual(
await self.client0.insert({
'ts_zero': [[0, 1]]
}), {'success_msg': 'Successfully inserted 1 point(s).'})
await self.client0.query('drop series "ts_zero"')
with self.assertRaises(InsertError):
await self.client0.insert([{'name': 'no points', 'points': []}])
# timestamps should be interger values
with self.assertRaises(InsertError):
await self.client0.insert({'invalid ts': [[0.5, 6]]})
# timestamps should be interger values
with self.assertRaises(InsertError):
await self.client0.insert(
{'invalid ts': [[-1, 6]]})
# empty series name is not allowed
with self.assertRaises(InsertError):
await self.client0.insert({'': [[1, 0]]})
# empty series name is not allowed
with self.assertRaises(InsertError):
await self.client0.insert([{'name': '', 'points': [[1, 0]]}])
await self.db.add_replica(self.server1, 0, sleep=30)
# await self.db.add_pool(self.server1, sleep=30)
await self.assertIsRunning(self.db, self.client0, timeout=30)
await self.client1.connect()
await self._test_series(self.client1)
# Create some random series and start 25 insert task parallel
series = gen_series(n=10000)
tasks = [
asyncio.ensure_future(
self.client0.insert_some_series(
series,
timeout=0,
points=self.GEN_POINTS))
for i in range(25)]
await asyncio.gather(*tasks)
await asyncio.sleep(2)
# Check the result
await self.assertSeries(self.client0, series)
await self.assertSeries(self.client1, series)
tasks = [
asyncio.ensure_future(self.client0.query(
'drop series /.*/ set ignore_threshold true'))
for i in range(5)]
await asyncio.gather(*tasks)
tasks = [
asyncio.ensure_future(self.client0.query(
'drop shards set ignore_threshold true'))
for i in range(5)]
await asyncio.gather(*tasks)
await asyncio.sleep(2)
self.client0.close()
self.client1.close()
if __name__ == '__main__':
random.seed(1)
parse_args()
run_test(TestInsert())
|