File: client.py

package info (click to toggle)
siridb-server 2.0.53-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,612 kB
  • sloc: ansic: 47,501; python: 6,263; sh: 254; makefile: 149
file content (75 lines) | stat: -rw-r--r-- 2,206 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import sys
import asyncio
import functools
import logging
import random
import time
from siridb.connector import SiriDBClient
from siridb.connector.lib.exceptions import AuthenticationError
from siridb.connector.lib.exceptions import InsertError
from siridb.connector.lib.exceptions import PoolError
from siridb.connector.lib.exceptions import QueryError
from siridb.connector.lib.exceptions import ServerError
from siridb.connector.lib.exceptions import UserAuthError
from .helpers import gen_points
from .server import Server


class Client:
    def __init__(self, db, servers, username='iris', password='siri'):
        self.db = db

        if isinstance(servers, Server):
            servers = [servers]

        self.cluster = SiriDBClient(
            username=username,
            password=password,
            dbname=db.dbname,
            hostlist=[
                (server.server_address, server.listen_client_port)
                for server in servers
            ])

        self.query = self.cluster.query
        self.insert = self.cluster.insert

    async def connect(self):
        logging.info('Create client connection to database {}'.format(
            self.db.dbname))
        await self.cluster.connect()

    def close(self):
        logging.info('Closing connection to database {}'.format(
            self.db.dbname))
        self.cluster.close()

    async def insert_some_series(self,
                                 series,
                                 n=0.01,
                                 timeout=None,
                                 points=functools.partial(gen_points, n=1)):
        random.shuffle(series)

        n = int(len(series) * n)

        assert (n <= len(series) and n > 0)

        data = {s.name: s.add_points(points()) for s in series[:n]}

        while True:
            try:
                await self.insert(data)
            except PoolError as e:
                if not timeout:
                    raise e
                timeout -= 1
                await asyncio.sleep(1.0)
            else:
                break

        if timeout is not None:
            time.sleep(0.1)

        for s in series[:n]:
            s.commit_points()