1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
import configparser
from contextlib import contextmanager
import subprocess
from unittest import TestCase
from clickhouse_driver.client import Client
from tests import log
from tests.util import skip_by_server_version
file_config = configparser.ConfigParser()
file_config.read(['setup.cfg'])
log.configure(file_config.get('log', 'level'))
class BaseTestCase(TestCase):
required_server_version = None
server_version = None
clickhouse_client_binary = file_config.get('db', 'client')
host = file_config.get('db', 'host')
port = file_config.getint('db', 'port')
database = file_config.get('db', 'database')
user = file_config.get('db', 'user')
password = file_config.get('db', 'password')
client = None
client_kwargs = None
cli_client_kwargs = None
@classmethod
def emit_cli(cls, statement, database=None, encoding='utf-8', **kwargs):
if database is None:
database = cls.database
args = [
cls.clickhouse_client_binary,
'--database', database,
'--host', cls.host,
'--port', str(cls.port),
'--query', str(statement)
]
for key, value in kwargs.items():
args.extend(['--' + key, str(value)])
process = subprocess.Popen(
args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
output = process.communicate()
out, err = output
if err:
raise RuntimeError(
'Error during communication. {}'.format(err)
)
return out.decode(encoding)
def _create_client(self, **kwargs):
client_kwargs = {
'port': self.port,
'database': self.database,
'user': self.user,
'password': self.password
}
client_kwargs.update(kwargs)
return Client(self.host, **client_kwargs)
def created_client(self, **kwargs):
return self._create_client(**kwargs)
@classmethod
def setUpClass(cls):
cls.emit_cli(
'DROP DATABASE IF EXISTS {}'.format(cls.database), 'default'
)
cls.emit_cli('CREATE DATABASE {}'.format(cls.database), 'default')
version_str = cls.emit_cli('SELECT version()').strip()
cls.server_version = tuple(int(x) for x in version_str.split('.'))
super(BaseTestCase, cls).setUpClass()
def setUp(self):
super(BaseTestCase, self).setUp()
required = self.required_server_version
if required and required > self.server_version:
skip_by_server_version(self, self.required_server_version)
if callable(self.client_kwargs):
client_kwargs = self.client_kwargs(self.server_version)
else:
client_kwargs = self.client_kwargs
client_kwargs = client_kwargs or {}
self.client = self._create_client(**client_kwargs)
def tearDown(self):
self.client.disconnect()
super(BaseTestCase, self).tearDown()
@classmethod
def tearDownClass(cls):
cls.emit_cli('DROP DATABASE {}'.format(cls.database))
super(BaseTestCase, cls).tearDownClass()
@contextmanager
def create_table(self, columns, **kwargs):
if self.cli_client_kwargs:
if callable(self.cli_client_kwargs):
cli_client_kwargs = self.cli_client_kwargs()
if cli_client_kwargs:
kwargs.update(cli_client_kwargs)
else:
kwargs.update(self.cli_client_kwargs)
self.emit_cli(
'CREATE TABLE test ({}) ''ENGINE = Memory'.format(columns),
**kwargs
)
try:
yield
except Exception:
raise
finally:
self.emit_cli('DROP TABLE test')
|