File: utils.py

package info (click to toggle)
mycli 1.66.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,276 kB
  • sloc: python: 12,440; makefile: 10
file content (105 lines) | stat: -rw-r--r-- 2,890 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# type: ignore

import multiprocessing
import os
import platform
import signal
import time

import pymysql
import pytest

from mycli.constants import (
    DEFAULT_CHARSET,
    DEFAULT_HOST,
    DEFAULT_PORT,
    DEFAULT_USER,
    TEST_DATABASE,
)
from mycli.main import special

DATABASE = TEST_DATABASE
PASSWORD = os.getenv("PYTEST_PASSWORD")
USER = os.getenv("PYTEST_USER", DEFAULT_USER)
HOST = os.getenv("PYTEST_HOST", DEFAULT_HOST)
PORT = int(os.getenv("PYTEST_PORT", DEFAULT_PORT))
CHARACTER_SET = os.getenv("PYTEST_CHARSET", DEFAULT_CHARSET)
SSH_USER = os.getenv("PYTEST_SSH_USER", None)
SSH_HOST = os.getenv("PYTEST_SSH_HOST", None)
SSH_PORT = int(os.getenv("PYTEST_SSH_PORT", "22"))
TEMPFILE_PREFIX = 'mycli_test_suite_'


def db_connection(dbname=None):
    conn = pymysql.connect(user=USER, host=HOST, port=PORT, database=dbname, password=PASSWORD, charset=CHARACTER_SET, local_infile=False)
    conn.autocommit = True
    return conn


try:
    db_connection()
    CAN_CONNECT_TO_DB = True
except Exception:
    CAN_CONNECT_TO_DB = False

dbtest = pytest.mark.skipif(not CAN_CONNECT_TO_DB, reason=f"Need a mysql instance at {DEFAULT_HOST} accessible by user '{DEFAULT_USER}'")


def create_db(dbname):
    with db_connection().cursor() as cur:
        try:
            cur.execute(f"DROP DATABASE IF EXISTS {TEST_DATABASE}")
            cur.execute(f"CREATE DATABASE {TEST_DATABASE}")
        except Exception:
            pass


def run(executor, sql, rows_as_list=True):
    """Return string output for the sql to be run."""
    results = []

    for result in executor.run(sql):
        rows = list(result.rows) if (rows_as_list and result.rows) else result.rows
        results.append({
            "preamble": result.preamble,
            "header": result.header,
            "rows": rows,
            "postamble": result.postamble,
            "status": result.status,
            "status_plain": result.status_plain,
        })

    return results


def set_expanded_output(is_expanded):
    """Pass-through for the tests."""
    return special.set_expanded_output(is_expanded)


def is_expanded_output():
    """Pass-through for the tests."""
    return special.is_expanded_output()


def send_ctrl_c_to_pid(pid, wait_seconds):
    """Sends a Ctrl-C like signal to the given `pid` after `wait_seconds`
    seconds."""
    time.sleep(wait_seconds)
    system_name = platform.system()
    if system_name == "Windows":
        os.kill(pid, signal.CTRL_C_EVENT)
    else:
        os.kill(pid, signal.SIGINT)


def send_ctrl_c(wait_seconds):
    """Create a process that sends a Ctrl-C like signal to the current process
    after `wait_seconds` seconds.

    Returns the `multiprocessing.Process` created.

    """
    ctrl_c_process = multiprocessing.Process(target=send_ctrl_c_to_pid, args=(os.getpid(), wait_seconds))
    ctrl_c_process.start()
    return ctrl_c_process