File: db_helpers.py

package info (click to toggle)
pytest-django 4.11.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 604 kB
  • sloc: python: 4,006; makefile: 39; sh: 17
file content (186 lines) | stat: -rw-r--r-- 5,781 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
from __future__ import annotations

import os
import sqlite3
import subprocess
from typing import Mapping

import pytest
from django.conf import settings
from django.utils.encoding import force_str


# Construct names for the "inner" database used in runpytest tests
_settings = settings.DATABASES["default"]

DB_NAME: str = _settings["NAME"]
TEST_DB_NAME: str = _settings["TEST"]["NAME"]

if _settings["ENGINE"] == "django.db.backends.sqlite3" and TEST_DB_NAME is None:
    TEST_DB_NAME = ":memory:"
    SECOND_DB_NAME = ":memory:"
    SECOND_TEST_DB_NAME = ":memory:"
else:
    DB_NAME += "_inner"

    if TEST_DB_NAME is None:
        # No explicit test db name was given, construct a default one
        TEST_DB_NAME = f"test_{DB_NAME}_inner"
    else:
        # An explicit test db name was given, is that as the base name
        TEST_DB_NAME = f"{TEST_DB_NAME}_inner"

    SECOND_DB_NAME = DB_NAME + "_second" if DB_NAME is not None else None
    SECOND_TEST_DB_NAME = TEST_DB_NAME + "_second" if DB_NAME is not None else None


def get_db_engine() -> str:
    db_engine: str = _settings["ENGINE"].split(".")[-1]
    return db_engine


class CmdResult:
    def __init__(self, status_code: int, std_out: bytes, std_err: bytes) -> None:
        self.status_code = status_code
        self.std_out = std_out
        self.std_err = std_err


def run_cmd(*args: str, env: Mapping[str, str] | None = None) -> CmdResult:
    r = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env={**os.environ, **(env or {})},
    )
    stdoutdata, stderrdata = r.communicate()
    ret = r.wait()
    return CmdResult(ret, stdoutdata, stderrdata)


def run_psql(*args: str) -> CmdResult:
    env = {}
    user = _settings.get("USER")
    if user:  # pragma: no branch
        args = ("-U", user, *args)
    password = _settings.get("PASSWORD")
    if password:  # pragma: no branch
        env["PGPASSWORD"] = password
    host = _settings.get("HOST")
    if host:  # pragma: no branch
        args = ("-h", host, *args)
    return run_cmd("psql", *args, env=env)


def run_mysql(*args: str) -> CmdResult:
    user = _settings.get("USER")
    if user:  # pragma: no branch
        args = ("-u", user, *args)
    password = _settings.get("PASSWORD")
    if password:  # pragma: no branch
        # Note: "-ppassword" must be a single argument.
        args = ("-p" + password, *args)
    host = _settings.get("HOST")
    if host:  # pragma: no branch
        args = ("-h", host, *args)
    return run_cmd("mysql", *args)


def skip_if_sqlite_in_memory() -> None:
    if _settings["ENGINE"] == "django.db.backends.sqlite3" and _settings["TEST"]["NAME"] is None:
        pytest.skip("Do not test db reuse since database does not support it")


def _get_db_name(db_suffix: str | None = None) -> str:
    name = TEST_DB_NAME
    if db_suffix:
        name = f"{name}_{db_suffix}"
    return name


def drop_database(db_suffix: str | None = None) -> None:
    name = _get_db_name(db_suffix)
    db_engine = get_db_engine()

    if db_engine == "postgresql":
        r = run_psql("postgres", "-c", f"DROP DATABASE {name}")
        assert "DROP DATABASE" in force_str(r.std_out) or "does not exist" in force_str(r.std_err)
        return

    if db_engine == "mysql":
        r = run_mysql("-e", f"DROP DATABASE {name}")
        assert "database doesn't exist" in force_str(r.std_err) or r.status_code == 0
        return

    assert db_engine == "sqlite3", f"{db_engine} cannot be tested properly!"
    assert name != ":memory:", "sqlite in-memory database cannot be dropped!"
    if os.path.exists(name):  # pragma: no branch
        os.unlink(name)


def db_exists(db_suffix: str | None = None) -> bool:
    name = _get_db_name(db_suffix)
    db_engine = get_db_engine()

    if db_engine == "postgresql":
        r = run_psql(name, "-c", "SELECT 1")
        return r.status_code == 0

    if db_engine == "mysql":
        r = run_mysql(name, "-e", "SELECT 1")
        return r.status_code == 0

    assert db_engine == "sqlite3", f"{db_engine} cannot be tested properly!"
    assert TEST_DB_NAME != ":memory:", "sqlite in-memory database cannot be checked for existence!"
    return os.path.exists(name)


def mark_database() -> None:
    db_engine = get_db_engine()

    if db_engine == "postgresql":
        r = run_psql(TEST_DB_NAME, "-c", "CREATE TABLE mark_table();")
        assert r.status_code == 0
        return

    if db_engine == "mysql":
        r = run_mysql(TEST_DB_NAME, "-e", "CREATE TABLE mark_table(kaka int);")
        assert r.status_code == 0
        return

    assert db_engine == "sqlite3", f"{db_engine} cannot be tested properly!"
    assert TEST_DB_NAME != ":memory:", "sqlite in-memory database cannot be marked!"

    conn = sqlite3.connect(TEST_DB_NAME)
    try:
        with conn:
            conn.execute("CREATE TABLE mark_table(kaka int);")
    finally:  # Close the DB even if an error is raised
        conn.close()


def mark_exists() -> bool:
    db_engine = get_db_engine()

    if db_engine == "postgresql":
        r = run_psql(TEST_DB_NAME, "-c", "SELECT 1 FROM mark_table")

        return r.status_code == 0

    if db_engine == "mysql":
        r = run_mysql(TEST_DB_NAME, "-e", "SELECT 1 FROM mark_table")

        return r.status_code == 0

    assert db_engine == "sqlite3", f"{db_engine} cannot be tested properly!"
    assert TEST_DB_NAME != ":memory:", "sqlite in-memory database cannot be checked for mark!"

    conn = sqlite3.connect(TEST_DB_NAME)
    try:
        with conn:
            conn.execute("SELECT 1 FROM mark_table")
            return True
    except sqlite3.OperationalError:
        return False
    finally:  # Close the DB even if an error is raised
        conn.close()