File: redis_loader.py

package info (click to toggle)
python-dynaconf 3.1.7-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 1,116 kB
  • sloc: python: 12,959; makefile: 4
file content (106 lines) | stat: -rw-r--r-- 3,493 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from dynaconf.utils import build_env_list
from dynaconf.utils import upperfy
from dynaconf.utils.parse_conf import parse_conf_data
from dynaconf.utils.parse_conf import unparse_conf_data

try:
    from redis import StrictRedis
except ImportError:
    StrictRedis = None

IDENTIFIER = "redis"


def load(obj, env=None, silent=True, key=None):
    """Reads and loads in to "settings" a single key or all keys from redis

    :param obj: the settings instance
    :param env: settings env default='DYNACONF'
    :param silent: if errors should raise
    :param key: if defined load a single key, else load all in env
    :return: None
    """
    if StrictRedis is None:
        raise ImportError(
            "redis package is not installed in your environment. "
            "`pip install dynaconf[redis]` or disable the redis loader with "
            "export REDIS_ENABLED_FOR_DYNACONF=false"
        )

    redis = StrictRedis(**obj.get("REDIS_FOR_DYNACONF"))
    prefix = obj.get("ENVVAR_PREFIX_FOR_DYNACONF")
    # prefix is added to env_list to keep backwards compatibility
    env_list = [prefix] + build_env_list(obj, env or obj.current_env)
    for env_name in env_list:
        holder = f"{prefix.upper()}_{env_name.upper()}"
        try:
            if key:
                value = redis.hget(holder.upper(), key)
                if value:
                    parsed_value = parse_conf_data(
                        value, tomlfy=True, box_settings=obj
                    )
                    if parsed_value:
                        obj.set(key, parsed_value)
            else:
                data = {
                    key: parse_conf_data(value, tomlfy=True, box_settings=obj)
                    for key, value in redis.hgetall(holder.upper()).items()
                }
                if data:
                    obj.update(data, loader_identifier=IDENTIFIER)
        except Exception:
            if silent:
                return False
            raise


def write(obj, data=None, **kwargs):
    """Write a value in to loader source

    :param obj: settings object
    :param data: vars to be stored
    :param kwargs: vars to be stored
    :return:
    """
    if obj.REDIS_ENABLED_FOR_DYNACONF is False:
        raise RuntimeError(
            "Redis is not configured \n"
            "export REDIS_ENABLED_FOR_DYNACONF=true\n"
            "and configure the REDIS_FOR_DYNACONF_* variables"
        )
    client = StrictRedis(**obj.REDIS_FOR_DYNACONF)
    holder = obj.get("ENVVAR_PREFIX_FOR_DYNACONF").upper()
    # add env to holder
    holder = f"{holder}_{obj.current_env.upper()}"

    data = data or {}
    data.update(kwargs)
    if not data:
        raise AttributeError("Data must be provided")
    redis_data = {
        upperfy(key): unparse_conf_data(value) for key, value in data.items()
    }
    client.hmset(holder.upper(), redis_data)
    load(obj)


def delete(obj, key=None):
    """
    Delete a single key if specified, or all env if key is none
    :param obj: settings object
    :param key: key to delete from store location
    :return: None
    """
    client = StrictRedis(**obj.REDIS_FOR_DYNACONF)
    holder = obj.get("ENVVAR_PREFIX_FOR_DYNACONF").upper()
    # add env to holder
    holder = f"{holder}_{obj.current_env.upper()}"

    if key:
        client.hdel(holder.upper(), upperfy(key))
        obj.unset(key)
    else:
        keys = client.hkeys(holder.upper())
        client.delete(holder.upper())
        obj.unset_all(keys)