File: helpers.py

package info (click to toggle)
siridb-server 2.0.53-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,612 kB
  • sloc: ansic: 47,501; python: 6,263; sh: 254; makefile: 149
file content (79 lines) | stat: -rw-r--r-- 2,115 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import os
import logging
import shutil
import time
import random
import functools
import string
from .constants import TEST_DIR
from .series import Series

_MAP_TS = {
    's': 10**0,
    'ms': 10**3,
    'us': 10**6,
    'ns': 10**9
}


def cleanup():
    logging.info('Remove test dir')
    try:
        shutil.rmtree(os.path.join(TEST_DIR))
    except FileNotFoundError:
        pass
    os.mkdir(TEST_DIR)

    logging.info('Force kill all open siridb-server processes')
    os.system('pkill -9 siridb-server')

    logging.info('Force kill all open memcheck-amd64- processes')
    os.system('pkill -9 memcheck-amd64-')


def random_value(tp=float, mi=-100, ma=100):
    i = random.randrange(mi, ma)
    if tp == float:
        return i * random.random()
    elif tp == int:
        return i


def random_series_name(size=12, chars=string.ascii_letters + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def gen_points(
        n=100,
        time_precision='s',
        tp=float,
        mi=-100,
        ma=100,
        ts_gap=1,
        shuffle=True):
    if isinstance(ts_gap, str):
        if ts_gap.endswith('s'):
            ts_gap = int(ts_gap[:-1]) * _MAP_TS[time_precision]
        elif ts_gap.endswith('m'):
            ts_gap = int(ts_gap[:-1]) * _MAP_TS[time_precision] * 60
        elif ts_gap.endswith('h'):
            ts_gap = int(ts_gap[:-1]) * _MAP_TS[time_precision] * 3600
        elif ts_gap.endswith('d'):
            ts_gap = int(ts_gap[:-1]) * _MAP_TS[time_precision] * 3600 * 24
        elif ts_gap.endswith('w'):
            ts_gap = int(ts_gap[:-1]) * _MAP_TS[time_precision] * 3600 * 24 * 7

    end = int(time.time() * _MAP_TS[time_precision])
    start = end - (n * ts_gap)
    timestamps = list(range(start, end, ts_gap))
    if shuffle:
        random.shuffle(timestamps)
    return [[ts, random_value(tp, mi, ma)] for ts in timestamps]


def gen_data(points=functools.partial(gen_points), n=100):
    return {random_series_name(): points() for _ in range(n)}


def gen_series(n=10000):
    return [Series(random_series_name()) for _ in range(n)]