1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
from dynaconf.utils import build_env_list
from dynaconf.utils import upperfy
from dynaconf.utils.parse_conf import parse_conf_data
from dynaconf.utils.parse_conf import unparse_conf_data
try:
from redis import StrictRedis
except ImportError:
StrictRedis = None
# Identifier of this loader; passed as ``loader_identifier`` when values read
# from redis are merged into the settings object.
IDENTIFIER = "redis"
def load(obj, env=None, silent=True, key=None):
    """Read a single key, or every key, from redis into the settings object.

    One redis hash is consulted per environment.  Hash names have the form
    ``<PREFIX>_<ENV>`` (upper-cased), where the prefix is read from
    ``ENVVAR_PREFIX_FOR_DYNACONF``.

    :param obj: the settings instance
    :param env: env to load; falls back to ``obj.current_env`` when falsy
    :param silent: when true, the first error stops loading and ``False`` is
        returned instead of the exception being raised
    :param key: if defined load a single key, else load all keys of each env
    :return: ``False`` when an error was silenced, otherwise ``None``
    :raises ImportError: if the redis package is not installed
    """
    if StrictRedis is None:
        raise ImportError(
            "redis package is not installed in your environment. "
            "`pip install dynaconf[redis]` or disable the redis loader with "
            "export REDIS_ENABLED_FOR_DYNACONF=false"
        )

    redis = StrictRedis(**obj.get("REDIS_FOR_DYNACONF"))
    prefix = obj.get("ENVVAR_PREFIX_FOR_DYNACONF")
    # prefix is added to env_list to keep backwards compatibility
    env_list = [prefix] + build_env_list(obj, env or obj.current_env)

    for env_name in env_list:
        holder = f"{prefix.upper()}_{env_name.upper()}".upper()
        try:
            if key:
                value = redis.hget(holder, key)
                if value:
                    parsed_value = parse_conf_data(
                        value, tomlfy=True, box_settings=obj
                    )
                    # Compare against None rather than truthiness so that
                    # legitimate falsy values (0, False, "") are applied,
                    # matching what the load-all branch below does.
                    if parsed_value is not None:
                        obj.set(key, parsed_value)
            else:
                data = {
                    name: parse_conf_data(raw, tomlfy=True, box_settings=obj)
                    for name, raw in redis.hgetall(holder).items()
                }
                if data:
                    obj.update(data, loader_identifier=IDENTIFIER)
        except Exception:
            # Best-effort mode: abort the remaining envs and report failure
            # through the return value instead of propagating.
            if silent:
                return False
            raise
def write(obj, data=None, **kwargs):
    """Write values to the redis hash of the current env and reload settings.

    The values from ``data`` and ``kwargs`` are merged (``kwargs`` wins on
    duplicate keys), keys are passed through ``upperfy`` and values through
    ``unparse_conf_data`` before being stored in the hash named
    ``<PREFIX>_<CURRENT_ENV>``.  Afterwards ``load(obj)`` is called so the
    settings object reflects what was written.

    :param obj: settings object
    :param data: vars to be stored (not modified by this function)
    :param kwargs: vars to be stored
    :return: None
    :raises RuntimeError: if ``REDIS_ENABLED_FOR_DYNACONF`` is ``False``
    :raises AttributeError: if neither ``data`` nor ``kwargs`` provide values
    """
    if obj.REDIS_ENABLED_FOR_DYNACONF is False:
        raise RuntimeError(
            "Redis is not configured \n"
            "export REDIS_ENABLED_FOR_DYNACONF=true\n"
            "and configure the REDIS_FOR_DYNACONF_* variables"
        )

    client = StrictRedis(**obj.REDIS_FOR_DYNACONF)
    holder = obj.get("ENVVAR_PREFIX_FOR_DYNACONF").upper()
    # add env to holder
    holder = f"{holder}_{obj.current_env.upper()}"

    # Merge into a fresh dict so the caller's ``data`` is never mutated.
    merged = dict(data or {})
    merged.update(kwargs)
    if not merged:
        raise AttributeError("Data must be provided")

    redis_data = {
        upperfy(key): unparse_conf_data(value) for key, value in merged.items()
    }
    # NOTE(review): newer redis-py releases deprecate ``hmset`` in favour of
    # ``hset(name, mapping=...)`` - confirm the minimum supported redis-py
    # version before switching.
    client.hmset(holder.upper(), redis_data)
    load(obj)
def delete(obj, key=None):
    """
    Remove one key, or the whole hash of the current env, from redis and
    drop the corresponding values from the settings object.

    :param obj: settings object
    :param key: key to delete from store location; when falsy every key
        stored for the current env is removed
    :return: None
    """
    redis_client = StrictRedis(**obj.REDIS_FOR_DYNACONF)
    prefix = obj.get("ENVVAR_PREFIX_FOR_DYNACONF").upper()
    # hash name is <PREFIX>_<CURRENT_ENV>, upper-cased
    hash_name = f"{prefix}_{obj.current_env.upper()}".upper()

    if not key:
        # Capture the field names first so they can still be unset from the
        # settings object after the hash itself is gone.
        stored_keys = redis_client.hkeys(hash_name)
        redis_client.delete(hash_name)
        obj.unset_all(stored_keys)
        return

    redis_client.hdel(hash_name, upperfy(key))
    obj.unset(key)
|