File: test_insert.py

package info (click to toggle)
siridb-server 2.0.53-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,612 kB
  • sloc: ansic: 47,501; python: 6,263; sh: 254; makefile: 149
file content (191 lines) | stat: -rw-r--r-- 6,072 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import asyncio
import functools
import random
import time
from testing import Client
from testing import default_test_setup
from testing import gen_data
from testing import gen_points
from testing import gen_series
from testing import InsertError
from testing import PoolError
from testing import QueryError
from testing import run_test
from testing import Series
from testing import Server
from testing import ServerError
from testing import SiriDB
from testing import TestBase
from testing import UserAuthError
from testing import parse_args


# Time precision shared by the database setup (default_test_setup) and by
# every gen_points() call in this test, so generated timestamps match the
# database. NOTE(review): 'ns' presumably means nanoseconds -- confirm
# against the `testing` helpers.
TIME_PRECISION = 'ns'


class TestInsert(TestBase):
    """Integration test for inserts and the responses they produce.

    The test inserts valid and invalid payloads through ``client0``, checks
    the returned success messages / raised ``InsertError``s, adds a second
    server and verifies that the same data is visible through ``client1``.

    ``self.client0``, ``self.client1``, ``self.server1`` and ``self.db`` are
    not created here; they are presumably provided by ``default_test_setup``
    (verify against the ``testing`` package).
    """
    title = 'Test inserts and response'

    # Point generator producing a single point per call, handed to
    # insert_some_series().  It is wrapped in staticmethod() so that
    # ``self.GEN_POINTS`` keeps returning the plain partial: Python 3.13
    # emits a FutureWarning for a bare functools.partial stored on a class,
    # because partial objects will become method descriptors in a future
    # Python version.
    GEN_POINTS = staticmethod(functools.partial(
        gen_points, n=1, time_precision=TIME_PRECISION))

    async def _test_series(self, client):
        """Assert that `client` sees the two reference series.

        Compares the points of "series float" and "series int" with
        ``self.series_float`` / ``self.series_int`` (set in ``run``) and
        checks the name, length, type, start and end reported by
        ``list series``.
        """
        result = await client.query('select * from "series float"')
        self.assertEqual(result['series float'], self.series_float)

        result = await client.query('select * from "series int"')
        self.assertEqual(result['series int'], self.series_int)

        result = await client.query(
            'list series name, length, type, start, end')
        # Sort the rows (by name, the first column) so the comparison does
        # not depend on the order in which the series are listed.
        result['series'].sort()
        self.assertEqual(
            result,
            {
                'columns': ['name', 'length', 'type', 'start', 'end'],
                'series': [
                    [
                        'series float',
                        10000, 'float',
                        self.series_float[0][0],
                        self.series_float[-1][0]],
                    [
                        'series int', 10000,
                        'integer',
                        self.series_int[0][0],
                        self.series_int[-1][0]],
                ]
            })

    async def insert(self, client, series, n, timeout=1):
        """Call ``client.insert_some_series`` `n` times, one second apart.

        `series` and `timeout` are passed through unchanged; points are
        generated with ``GEN_POINTS``.
        """
        for _ in range(n):
            await client.insert_some_series(
                series, timeout=timeout, points=self.GEN_POINTS)
            await asyncio.sleep(1.0)

    @default_test_setup(2, time_precision=TIME_PRECISION, compression=False)
    async def run(self):
        """Run the insert scenario; see the class docstring for an outline."""
        await self.client0.connect()

        # Empty payloads (map or array form) are accepted and insert nothing.
        self.assertEqual(
            await self.client0.insert({}),
            {'success_msg': 'Successfully inserted 0 point(s).'})

        self.assertEqual(
            await self.client0.insert([]),
            {'success_msg': 'Successfully inserted 0 point(s).'})

        # Generate the reference data and shuffle it so the points are
        # inserted out of order.
        self.series_float = gen_points(
            tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
        random.shuffle(self.series_float)
        self.series_int = gen_points(
            tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
        random.shuffle(self.series_int)

        self.assertEqual(
            await self.client0.insert({
                'series float': self.series_float,
                'series int': self.series_int
            }), {'success_msg': 'Successfully inserted 20000 point(s).'})

        # _test_series() compares against these lists directly, so restore
        # the sorted (timestamp) order before querying.
        self.series_float.sort()
        self.series_int.sort()

        await self._test_series(self.client0)

        # a string is not a valid insert payload
        with self.assertRaises(InsertError):
            await self.client0.insert('[]')

        # NOTE(review): this repeats the previous check verbatim; a
        # different payload may have been intended.
        with self.assertRaises(InsertError):
            await self.client0.insert('[]')

        # an array entry without name/points is rejected
        with self.assertRaises(InsertError):
            await self.client0.insert([{}])

        # None is not accepted as a point value
        with self.assertRaises(InsertError):
            await self.client0.insert({'None': [[1, None]]})

        # a series without points is rejected
        with self.assertRaises(InsertError):
            await self.client0.insert({'no points': []})

        # an empty point is rejected
        with self.assertRaises(InsertError):
            await self.client0.insert({'no points': [[]]})

        # timestamp zero is a valid timestamp
        self.assertEqual(
            await self.client0.insert({
                'ts_zero': [[0, 1]]
                }), {'success_msg': 'Successfully inserted 1 point(s).'})

        # Remove the helper series again so it does not show up in the
        # `list series` comparison performed by _test_series() later on.
        await self.client0.query('drop series "ts_zero"')

        # a series without points is rejected in the array form as well
        with self.assertRaises(InsertError):
            await self.client0.insert([{'name': 'no points', 'points': []}])

        # timestamps should be integer values
        with self.assertRaises(InsertError):
            await self.client0.insert({'invalid ts': [[0.5, 6]]})

        # timestamps should not be negative
        with self.assertRaises(InsertError):
            await self.client0.insert(
                {'invalid ts': [[-1, 6]]})

        # empty series name is not allowed
        with self.assertRaises(InsertError):
            await self.client0.insert({'': [[1, 0]]})

        # empty series name is not allowed
        with self.assertRaises(InsertError):
            await self.client0.insert([{'name': '', 'points': [[1, 0]]}])

        # NOTE(review): the second argument is presumably the pool id the
        # replica is added to -- confirm against SiriDB.add_replica.
        await self.db.add_replica(self.server1, 0, sleep=30)
        # await self.db.add_pool(self.server1, sleep=30)

        await self.assertIsRunning(self.db, self.client0, timeout=30)

        await self.client1.connect()

        # The data inserted through client0 must be visible on the new
        # server too.
        await self._test_series(self.client1)

        # Create some random series and start 25 insert task parallel
        series = gen_series(n=10000)
        tasks = [
            asyncio.ensure_future(
                self.client0.insert_some_series(
                    series,
                    timeout=0,
                    points=self.GEN_POINTS))
            for _ in range(25)]
        await asyncio.gather(*tasks)

        await asyncio.sleep(2)

        # Check the result
        await self.assertSeries(self.client0, series)
        await self.assertSeries(self.client1, series)

        # Fire the same drop statement five times concurrently.
        tasks = [
            asyncio.ensure_future(self.client0.query(
                    'drop series /.*/ set ignore_threshold true'))
            for _ in range(5)]

        await asyncio.gather(*tasks)

        tasks = [
            asyncio.ensure_future(self.client0.query(
                    'drop shards set ignore_threshold true'))
            for _ in range(5)]

        await asyncio.gather(*tasks)

        await asyncio.sleep(2)

        self.client0.close()
        self.client1.close()


if __name__ == '__main__':
    # Seed the PRNG with a fixed value so the random.shuffle() calls in
    # TestInsert.run() produce the same order on every run.
    random.seed(1)
    # NOTE(review): presumably parses the command line options used by the
    # `testing` helpers -- confirm against testing.parse_args.
    parse_args()
    run_test(TestInsert())