File: test_buffer.py

package info (click to toggle)
siridb-server 2.0.53-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,612 kB
  • sloc: ansic: 47,501; python: 6,263; sh: 254; makefile: 149
file content (119 lines) | stat: -rw-r--r-- 3,952 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import asyncio
import functools
import random
import time
from testing import Client
from testing import default_test_setup
from testing import gen_data
from testing import gen_points
from testing import gen_series
from testing import InsertError
from testing import PoolError
from testing import QueryError
from testing import run_test
from testing import Series
from testing import Server
from testing import ServerError
from testing import SiriDB
from testing import TestBase
from testing import UserAuthError
from testing import parse_args


class TestBuffer(TestBase):
    title = 'Test buffer object'

    async def _add_points(self):
        for series_name in ['iris', 'db', 'ligo', 'sasha']:
            if series_name not in self.total:
                self.total[series_name] = []
            batches = sum([ord(c) for c in series_name]) % 100
            for i in range(batches):
                npoints = []
                n = int(i**0.5 * 10000 % 5) + 1
                for p in range(n):
                    self.ts += (n + 5000) if i % 2 else (n - 5000)
                    npoints.append([self.ts, i*1000+p])
                self.total[series_name].extend(npoints)
                self.total[series_name].sort()
                await self.client0.insert({series_name: npoints})

    async def _test_equal(self):
        for series_name, points in self.total.items():
            res = await self.client0.query(f'select * from "{series_name}"')
            res = res[series_name]
            self.assertEqual(len(points), len(res))
            self.assertEqual(points, res)

    async def _change_buf_size(self, buffer_size):
        self.client0.close()
        result = await self.server0.stop()
        self.assertTrue(result)
        self.server0.set_buffer_size(self.db, buffer_size)
        await self.server0.start(sleep=20)
        await self.client0.connect()
        res = await self.client0.query('show buffer_size')
        self.assertEqual(res['data'][0]['value'], buffer_size)
        await self._test_equal()

    async def _change_buf_path(self, buffer_path):
        self.client0.close()
        result = await self.server0.stop()
        self.assertTrue(result)
        self.server0.set_buffer_path(self.db, buffer_path)
        await self.server0.start(sleep=20)
        await self.client0.connect()
        res = await self.client0.query('show buffer_path')
        self.assertEqual(res['data'][0]['value'], buffer_path)
        res = await self.client0.query('show open_files')
        self.assertEqual(res['data'][0]['value'], 3)
        res = await self.client0.query(
            f'alter server {self.uuid} set backup_mode true')
        await asyncio.sleep(5)
        res = await self.client0.query('show open_files')
        self.assertEqual(res['data'][0]['value'], 0)
        res = await self.client0.query(
            f'alter server {self.uuid} set backup_mode false')
        await self._test_equal()

    @default_test_setup(1, time_precision='s', compression=False)
    async def run(self):
        await self.client0.connect()

        res = await self.client0.query('show uuid')
        self.uuid = res['data'][0]['value']

        self.ts = 1500000000
        self.total = {}

        await self._add_points()
        await self._test_equal()

        await self._change_buf_path(os.path.join(
            self.server0.dbpath,
            self.db.dbname,
            '../buf/'))

        await self._change_buf_size(8192)

        await self._add_points()
        await self._test_equal()

        await self._change_buf_size(8192)
        await self._change_buf_size(512)

        await self._add_points()
        await self._test_equal()

        await self._change_buf_size(1024)

        await self._change_buf_path(os.path.join(
            self.server0.dbpath,
            self.db.dbname,
            'buf/'))


if __name__ == '__main__':
    parse_args()
    run_test(TestBuffer())