File: __init__.py

package info (click to toggle)
siridb-server 2.0.53-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,612 kB
  • sloc: ansic: 47,501; python: 6,263; sh: 254; makefile: 149
file content (45 lines) | stat: -rw-r--r-- 1,132 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
import logging
from .client import Client
from .client import InsertError
from .client import PoolError
from .client import QueryError
from .client import ServerError
from .client import UserAuthError
from .helpers import cleanup
from .helpers import gen_data
from .helpers import gen_points
from .helpers import gen_series
from .server import Server
from .siridb import SiriDB
from .testbase import default_test_setup
from .testbase import TestBase
from .series import Series
from .pipe_client import PipeClient as SiriDBAsyncUnixConnection
from .args import parse_args
from .task import Task


async def _run_test(test, loglevel):
    logger = logging.getLogger()
    logger.setLevel(loglevel)
    task = Task(test.title)

    try:
        await test.run()
    except Exception as e:
        task.stop(success=False)
        raise e
    else:
        task.stop(success=True)

    logger.setLevel('CRITICAL')

    await task.task


def run_test(test, loglevel='CRITICAL'):
    assert isinstance(test, TestBase)
    loop = asyncio.new_event_loop()
    cleanup()
    loop.run_until_complete(_run_test(test, loglevel))