File: testcase.py

package info (click to toggle)
python-clickhouse-driver 0.2.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,516 kB
  • sloc: python: 10,950; pascal: 42; makefile: 31; sh: 3
file content (130 lines) | stat: -rw-r--r-- 3,815 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import configparser
from contextlib import contextmanager
import subprocess
from unittest import TestCase

from clickhouse_driver.client import Client
from tests import log
from tests.util import skip_by_server_version


# Test-suite settings are loaded once, at import time, from ``setup.cfg``
# (path is relative to the current working directory).
file_config = configparser.ConfigParser()
file_config.read('setup.cfg')

# Logging for the tests is configured with the level from the ``[log]``
# section.
log.configure(file_config.get('log', 'level'))


class BaseTestCase(TestCase):
    """Base test case that talks to a server configured in ``setup.cfg``.

    Connection settings come from the ``[db]`` section of the config file.
    ``setUpClass`` recreates the configured database through the
    command-line client, ``setUp`` builds a fresh ``Client`` for every test
    and ``tearDownClass`` drops the database again.
    """

    # Minimal server version (tuple of ints) a test class needs; ``None``
    # disables the check performed in ``setUp``.
    required_server_version = None
    # Filled in by ``setUpClass`` from ``SELECT version()``.
    server_version = None

    clickhouse_client_binary = file_config.get('db', 'client')
    host = file_config.get('db', 'host')
    port = file_config.getint('db', 'port')
    database = file_config.get('db', 'database')
    user = file_config.get('db', 'user')
    password = file_config.get('db', 'password')

    # ``Client`` instance created in ``setUp``.
    client = None
    # Extra ``Client`` keyword arguments: a dict, or a callable invoked as
    # ``self.client_kwargs(self.server_version)`` that returns one.
    client_kwargs = None
    # Extra command-line options for ``create_table``: a dict, or a callable
    # invoked as ``self.cli_client_kwargs()`` that returns one (or a falsy
    # value for "nothing to add").
    cli_client_kwargs = None

    @classmethod
    def emit_cli(cls, statement, database=None, encoding='utf-8', **kwargs):
        """Run ``statement`` through the command-line client binary.

        :param statement: query text, passed with ``--query``.
        :param database: database passed with ``--database``; defaults to
                         ``cls.database``.
        :param encoding: codec used to decode the client's stdout.
        :param kwargs: extra options; every ``key=value`` pair is appended
                       to the command line as ``--key value``.
        :return: decoded stdout of the client process.
        :raises RuntimeError: if the client wrote anything to stderr or
                              exited with a non-zero status.
        """
        if database is None:
            database = cls.database

        args = [
            cls.clickhouse_client_binary,
            '--database', database,
            '--host', cls.host,
            '--port', str(cls.port),
            '--query', str(statement)
        ]

        for key, value in kwargs.items():
            args.extend(['--' + key, str(value)])

        process = subprocess.Popen(
            args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        out, err = process.communicate()

        if err:
            raise RuntimeError(
                'Error during communication. {}'.format(err)
            )

        # A failed run may leave stderr empty; do not treat it as success.
        if process.returncode != 0:
            raise RuntimeError(
                'Error during communication. '
                'Client exited with code {}'.format(process.returncode)
            )

        return out.decode(encoding)

    def _create_client(self, **kwargs):
        """Return a ``Client`` for the configured host.

        Port, database, user and password default to the class settings;
        any of them (and any other ``Client`` option) can be overridden
        through ``kwargs``.
        """
        client_kwargs = {
            'port': self.port,
            'database': self.database,
            'user': self.user,
            'password': self.password
        }
        client_kwargs.update(kwargs)
        return Client(self.host, **client_kwargs)

    def created_client(self, **kwargs):
        """Return a new ``Client``; see ``_create_client`` for ``kwargs``."""
        return self._create_client(**kwargs)

    @classmethod
    def setUpClass(cls):
        """Recreate the test database and record the server version."""
        # Issued against 'default' because the test database may not exist.
        cls.emit_cli(
            'DROP DATABASE IF EXISTS {}'.format(cls.database), 'default'
        )
        cls.emit_cli('CREATE DATABASE {}'.format(cls.database), 'default')

        # The dotted version string becomes a tuple of ints so it can be
        # compared with ``required_server_version``.
        version_str = cls.emit_cli('SELECT version()').strip()
        cls.server_version = tuple(int(x) for x in version_str.split('.'))

        super().setUpClass()

    def setUp(self):
        """Apply the server version requirement and create ``self.client``."""
        super().setUp()

        required = self.required_server_version

        if required and required > self.server_version:
            # Helper from tests.util; it is expected to skip the test.
            skip_by_server_version(self, required)

        if callable(self.client_kwargs):
            client_kwargs = self.client_kwargs(self.server_version)
        else:
            client_kwargs = self.client_kwargs
        client_kwargs = client_kwargs or {}
        self.client = self._create_client(**client_kwargs)

    def tearDown(self):
        """Disconnect the per-test client."""
        self.client.disconnect()
        super().tearDown()

    @classmethod
    def tearDownClass(cls):
        """Drop the test database created in ``setUpClass``."""
        cls.emit_cli('DROP DATABASE {}'.format(cls.database))
        super().tearDownClass()

    @contextmanager
    def create_table(self, columns, **kwargs):
        """Context manager providing a Memory table named ``test``.

        The table is created on entry and dropped on exit, whether or not
        the body raised.

        :param columns: column definitions placed between the parentheses
                        of the ``CREATE TABLE`` statement.
        :param kwargs: extra command-line options for the ``CREATE TABLE``
                       call; entries from ``cli_client_kwargs`` are merged
                       in and win on conflict.
        """
        extra = self.cli_client_kwargs
        if extra and callable(extra):
            extra = extra()
        if extra:
            kwargs.update(extra)

        self.emit_cli(
            'CREATE TABLE test ({}) ENGINE = Memory'.format(columns),
            **kwargs
        )
        try:
            yield
        finally:
            self.emit_cli('DROP TABLE test')