File: test_redis.py

package info (click to toggle)
python-dynaconf 3.2.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,900 kB
  • sloc: python: 21,464; sh: 9; makefile: 4
file content (124 lines) | stat: -rw-r--r-- 4,284 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from __future__ import annotations

import os

import pytest

from dynaconf import LazySettings
from dynaconf.loaders.redis_loader import delete
from dynaconf.loaders.redis_loader import load
from dynaconf.loaders.redis_loader import write
from dynaconf.utils.inspect import get_history


def custom_checker(ip_address, port):
    """Placeholder readiness check handed to ``wait_for_service``.

    Both ``ip_address`` and ``port`` are accepted but currently ignored; the
    function unconditionally reports the service as ready.

    TODO: perform a real check that the redis server is online and ready,
    e.g. write a value with ``write(settings, {"SECRET": "redis_works"})``
    and read it back with ``load(settings, key="SECRET")``.
    """
    return True


# Optional override: when this environment variable is set (and non-empty),
# the ``docker_redis`` fixture simply hands back its value instead of
# starting a container through ``docker_services``.
DYNACONF_TEST_REDIS_URL = os.environ.get("DYNACONF_TEST_REDIS_URL")
if not DYNACONF_TEST_REDIS_URL:

    @pytest.fixture(scope="module")
    def docker_redis(docker_services):
        """Start the "redis" docker service and return its URL.

        Waits for port 6379 of the service (using ``custom_checker`` as the
        readiness probe) and builds the URL from the docker IP and the port
        returned by ``wait_for_service``.
        """
        docker_services.start("redis")
        mapped_port = docker_services.wait_for_service(
            "redis", 6379, check_server=custom_checker
        )
        return f"http://{docker_services.docker_ip}:{mapped_port}"

else:

    @pytest.fixture(scope="module")
    def docker_redis():
        """Return the URL supplied through ``DYNACONF_TEST_REDIS_URL``."""
        return DYNACONF_TEST_REDIS_URL


@pytest.mark.integration
def test_redis_not_configured():
    """Writing without redis configured must raise a ``RuntimeError``.

    The error message is expected to tell the user to export
    ``REDIS_ENABLED_FOR_DYNACONF=true``.
    """
    with pytest.raises(RuntimeError) as raised:
        conf = LazySettings(environments=True)
        write(conf, {"OTHER_SECRET": "redis_works"})
    message = str(raised.value)
    assert "export REDIS_ENABLED_FOR_DYNACONF=true" in message


@pytest.mark.integration
def test_write_redis_without_data(docker_redis):
    """Calling ``write`` with no data must raise ``AttributeError``.

    The error message is expected to contain "Data must be provided".
    """
    # Set the redis-related environment variables (enabled flag, host, port).
    os.environ.update(
        {
            "REDIS_ENABLED_FOR_DYNACONF": "1",
            "REDIS_HOST_FOR_DYNACONF": "localhost",
            "REDIS_PORT_FOR_DYNACONF": "6379",
        }
    )
    conf = LazySettings(environments=True)
    with pytest.raises(AttributeError) as raised:
        write(conf)
    message = str(raised.value)
    assert "Data must be provided" in message


@pytest.mark.integration
def test_write_to_redis(docker_redis):
    """Write ``SECRET`` to redis, load it by key and check the value."""
    # Set the redis-related environment variables (enabled flag, host, port).
    os.environ.update(
        {
            "REDIS_ENABLED_FOR_DYNACONF": "1",
            "REDIS_HOST_FOR_DYNACONF": "localhost",
            "REDIS_PORT_FOR_DYNACONF": "6379",
        }
    )
    conf = LazySettings(environments=True)
    expected = "redis_works_with_docker"

    write(conf, {"SECRET": expected})
    load(conf, key="SECRET")
    assert conf.get("SECRET") == expected


@pytest.mark.integration
def test_load_from_redis_with_key(docker_redis):
    """Load ``SECRET`` by key into fresh settings and check its value.

    NOTE(review): nothing is written here, so this presumably relies on the
    value stored by ``test_write_to_redis`` still being in redis - confirm.
    """
    # Set the redis-related environment variables (enabled flag, host, port).
    os.environ.update(
        {
            "REDIS_ENABLED_FOR_DYNACONF": "1",
            "REDIS_HOST_FOR_DYNACONF": "localhost",
            "REDIS_PORT_FOR_DYNACONF": "6379",
        }
    )
    conf = LazySettings(environments=True)
    load(conf, key="SECRET")
    loaded = conf.get("SECRET")
    assert loaded == "redis_works_with_docker"


@pytest.mark.integration
def test_write_and_load_from_redis_without_key(docker_redis):
    """Write ``SECRET``, call ``load`` without a key and check the value."""
    # Set the redis-related environment variables (enabled flag, host, port).
    os.environ.update(
        {
            "REDIS_ENABLED_FOR_DYNACONF": "1",
            "REDIS_HOST_FOR_DYNACONF": "localhost",
            "REDIS_PORT_FOR_DYNACONF": "6379",
        }
    )
    conf = LazySettings(environments=True)
    expected = "redis_works_perfectly"
    write(conf, {"SECRET": expected})
    load(conf)
    assert conf.get("SECRET") == expected


@pytest.mark.integration
def test_delete_from_redis(docker_redis):
    """Write ``OTHER_SECRET``, confirm it loads, then delete it by key.

    After the delete, ``load`` for that key is expected to return ``None``.
    """
    # Set the redis-related environment variables (enabled flag, host, port).
    os.environ.update(
        {
            "REDIS_ENABLED_FOR_DYNACONF": "1",
            "REDIS_HOST_FOR_DYNACONF": "localhost",
            "REDIS_PORT_FOR_DYNACONF": "6379",
        }
    )
    conf = LazySettings(environments=True)

    # Store the value and make sure it is visible before removing it.
    write(conf, {"OTHER_SECRET": "redis_works"})
    load(conf)
    assert conf.get("OTHER_SECRET") == "redis_works"

    delete(conf, key="OTHER_SECRET")
    reload_result = load(conf, key="OTHER_SECRET")
    assert reload_result is None


@pytest.mark.integration
def test_delete_all_from_redis(docker_redis):
    """Call ``delete`` without a key, then load ``OTHER_SECRET`` by key.

    The subsequent ``load`` call is expected to return ``None``.

    NOTE(review): unlike the sibling tests this one does not set the
    ``REDIS_*_FOR_DYNACONF`` environment variables itself - presumably it
    relies on values already present in ``os.environ``; confirm.
    """
    conf = LazySettings(environments=True)
    delete(conf)
    reload_result = load(conf, key="OTHER_SECRET")
    assert reload_result is None


@pytest.mark.integration
def test_redis_has_proper_source_metadata(docker_redis):
    """Check the history entry recorded for values loaded from redis.

    After writing and loading ``SECRET``, the first history entry whose
    loader is "redis" must carry the "development" env and the written value.
    """
    # Set the redis-related environment variables (enabled flag, host, port).
    os.environ.update(
        {
            "REDIS_ENABLED_FOR_DYNACONF": "1",
            "REDIS_HOST_FOR_DYNACONF": "localhost",
            "REDIS_PORT_FOR_DYNACONF": "6379",
        }
    )
    conf = LazySettings(environments=True)
    write(conf, {"SECRET": "redis_works_perfectly"})
    load(conf)

    def _loaded_by_redis(source):
        # Keep only history entries whose loader is "redis".
        return source.loader == "redis"

    history = get_history(conf, filter_callable=_loaded_by_redis)
    first_entry = history[0]
    # "development" is the default env when environments=True.
    assert first_entry["env"] == "development"
    assert first_entry["value"]["SECRET"] == "redis_works_perfectly"