File: conftest.py

package info (click to toggle)
cachelib 0.13.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 428 kB
  • sloc: python: 1,555; makefile: 32; sh: 15
file content (93 lines) | stat: -rw-r--r-- 2,563 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os
import subprocess
import warnings
from pathlib import Path

import pytest
from xprocess import ProcessStarter


def under_uwsgi():
    """Report whether the ``uwsgi`` module can be imported.

    The import is attempted purely as a probe; the module itself is not
    used. Presumably ``uwsgi`` is only importable when the interpreter is
    embedded in a uWSGI process -- confirm against the uWSGI docs.

    Returns:
        ``True`` if ``import uwsgi`` succeeds, ``False`` if it raises
        ``ImportError``.
    """
    try:
        import uwsgi  # noqa: F401
    except ImportError:
        importable = False
    else:
        importable = True
    return importable


@pytest.hookimpl()
def pytest_sessionfinish(session, exitstatus):
    """Write the session exit status to a helper script when under uWSGI.

    When ``under_uwsgi()`` is true, a one-line Python script named
    ``return_pytest_exit_code.py`` is written into the directory given by
    the ``TMPDIR`` environment variable. Running that script exits with
    ``exitstatus``. If ``TMPDIR`` is not set, a warning is issued and
    nothing is written. Outside uWSGI the hook does nothing.

    Args:
        session: The pytest session object (unused here).
        exitstatus: The status pytest is about to exit with.
    """
    if not under_uwsgi():
        return

    tmpdir = os.environ.get("TMPDIR")
    if tmpdir is None:
        warnings.warn(
            "Pytest could not find tox 'TMPDIR' in the environment,"
            " make sure the variable is set in the project tox.ini"
            " file if you are running under tox.",
            stacklevel=2,
        )
        return

    exit_script = Path(tmpdir, "return_pytest_exit_code.py")
    exit_script.write_text(f"import sys; sys.exit({exitstatus})")


@pytest.fixture(scope="class")
def redis_server(xprocess):
    """Class-scoped fixture that keeps a ``redis-server`` process running.

    The fixture is skipped when the ``redis`` Python package cannot be
    imported. Otherwise it asks ``xprocess`` to ensure a server process
    registered under the name ``"redis"``, yields to the tests, and
    terminates that process afterwards.

    Args:
        xprocess: The fixture provided by pytest-xprocess.
    """
    process_name = "redis"
    pytest.importorskip(
        modname=process_name,
        reason=f"could not find python package {process_name}",
    )

    class Starter(ProcessStarter):
        """Launch settings for the redis server on port 6360."""

        # Presumably xprocess matches this regex against the process output
        # to detect start-up -- see the pytest-xprocess docs.
        pattern = "[Rr]eady to accept connections"
        args = ["redis-server", "--port 6360"]

        def startup_check(self):
            """Return True once ``redis-cli ping`` prints exactly ``PONG``."""
            ping_cmd = ["redis-cli", "-p", "6360", "ping"]
            reply = subprocess.run(ping_cmd, stdout=subprocess.PIPE).stdout
            return reply == b"PONG\n"

    xprocess.ensure(process_name, Starter)
    yield
    # Teardown: stop the server that was ensured above.
    xprocess.getinfo(process_name).terminate()


@pytest.fixture(scope="class")
def memcached_server(xprocess):
    """Class-scoped fixture that keeps a ``memcached`` process running.

    The fixture is skipped when the ``pylibmc`` Python package cannot be
    imported. Otherwise it asks ``xprocess`` to ensure a server process
    registered under the name ``"pylibmc"``, yields to the tests, and
    terminates that process afterwards.

    Args:
        xprocess: The fixture provided by pytest-xprocess.
    """
    process_name = "pylibmc"
    pytest.importorskip(
        modname=process_name,
        reason=f"could not find python package {process_name}",
    )

    class Starter(ProcessStarter):
        """Launch settings for a verbose memcached instance."""

        # Presumably xprocess matches this against the process output to
        # detect start-up -- see the pytest-xprocess docs.
        pattern = "server listening"
        args = ["memcached", "-vv"]

        def startup_check(self):
            """Probe readiness by launching a second ``memcached``.

            Returns True when the probe's stderr contains
            ``Address already``, which is taken to mean the first instance
            is already bound.
            """
            # NOTE(review): no timeout is set, so this call blocks until the
            # probe process exits -- confirm that is acceptable.
            probe_stderr = subprocess.run(
                ["memcached"], stderr=subprocess.PIPE
            ).stderr
            return b"Address already" in probe_stderr

    xprocess.ensure(process_name, Starter)
    yield
    # Teardown: stop the server that was ensured above.
    xprocess.getinfo(process_name).terminate()


class TestData:
    """Shared sample values for the test suite, kept in a single place."""

    # Integers of varying magnitude, deliberately unsorted.
    sample_numbers = [
        0,
        10,
        1_024_000,
        9,
        5_000_000_000_000,
        99,
        738,
        2_000_000,
    ]

    # String keys (some of them numeric-looking) mapped to values of mixed
    # types: booleans, strings, ints and empty or nested containers.
    sample_pairs = {
        "128": False,
        "beef": True,
        "crevettes": {},
        "1024": "spam",
        "bacon": "eggs",
        "sausage": 2048,
        "3072": [],
        "brandy": [{}, "fried eggs"],
        "lobster": ["baked beans", [512]],
        "4096": {"sauce": [], 256: "truffle"},
    }