1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
"""Core functions."""
# remove when we don't support py38 anymore
from __future__ import annotations
import atexit
import logging
import os
from subprocess import Popen
from typing import NoReturn
# Module-level logger named after this module; used for the warning and
# debug messages emitted while parsing dotenv files.
logger = logging.getLogger(__name__)
# Escape sequences recognised inside double-quoted values, keyed by the
# character that follows the backslash.
_ESCAPE_SEQUENCES = {
    "a": "\a",
    "b": "\b",
    "f": "\f",
    "n": "\n",
    "r": "\r",
    "t": "\t",
    "v": "\v",
    "\\": "\\",
}


def _unescape(value: str) -> str:
    """Expand backslash escape sequences in a single left-to-right pass.

    A single pass is required so that an escaped backslash is consumed as a
    unit: ``\\\\n`` must become a literal backslash followed by ``n``, not a
    backslash followed by a newline (which is what chained ``str.replace``
    calls produce). Unknown escapes and a trailing lone backslash are kept
    verbatim.
    """
    if "\\" not in value:
        return value

    pieces = []
    chars = iter(value)
    for char in chars:
        if char != "\\":
            pieces.append(char)
            continue
        follower = next(chars, None)
        if follower is None:
            # lone backslash at the very end of the value
            pieces.append(char)
        elif follower in _ESCAPE_SEQUENCES:
            pieces.append(_ESCAPE_SEQUENCES[follower])
        else:
            # not an escape we know about, keep both characters untouched
            pieces.append(char + follower)
    return "".join(pieces)


def read_dotenv(filename: str) -> dict[str, str]:
    """Read dotenv file.

    Lines are stripped of surrounding whitespace. Lines starting with ``#``
    and lines without a ``=`` are ignored. The key may be prefixed with
    ``export``. A value wrapped in double quotes has the quotes removed and
    its backslash escape sequences expanded; a value wrapped in single
    quotes only has the quotes removed. When a key occurs more than once,
    the last occurrence wins.

    A missing file is not an error: a warning is logged and an empty
    mapping is returned.

    Parameters
    ----------
    filename
        path to the filename

    Returns
    -------
    dict
        mapping of variable names to their values

    """
    try:
        with open(filename) as fh:
            data = fh.read()
    except FileNotFoundError:
        logger.warning(
            "%s does not exist, continuing without "
            "setting environment variables.",
            filename,
        )
        data = ""

    res: dict[str, str] = {}
    for line in data.splitlines():
        logger.debug(line)
        line = line.strip()

        # ignore comments
        if line.startswith("#"):
            continue

        # ignore empty lines or lines w/o '='
        if "=" not in line:
            continue

        # only the first '=' separates key and value, so values may
        # themselves contain '='
        key, value = line.split("=", 1)

        # allow export
        if key.startswith("export "):
            key = key.split(" ", 1)[-1]

        key = key.strip()
        value = value.strip()

        if len(value) >= 2 and value[0] == value[-1] == '"':
            # double quotes: strip them and un-escape escape characters
            value = _unescape(value[1:-1])
        elif len(value) >= 2 and value[0] == value[-1] == "'":
            # single quotes: strip them, keep the content literal
            value = value[1:-1]

        res[key] = value

    logger.debug(res)
    return res
def run_dotenv(
filenames: list[str], command: list[str], replace: bool = False
) -> NoReturn | int:
"""Run dotenv.
This function executes the commands with the environment variables
parsed from filename.
Parameters
----------
filenames
paths to the .env files
command
command to execute
replace_env
Replace the current environment instead of updating it.
Returns
-------
NoReturn | int
The exit status code in Windows. In POSIX-compatible systems, the
function does not return normally.
"""
# read dotenv files
dotenv = {}
for filename in filenames:
dotenv.update(read_dotenv(filename))
if replace:
# replace env
env = dotenv
else:
# update env
env = os.environ.copy()
env.update(dotenv)
# in POSIX, we replace the current process with the command, execvpe does
# not return
if os.name == "posix":
os.execvpe(command[0], command, env)
# in Windows, we spawn a new process
# execute
proc = Popen(
command,
# stdin=PIPE,
# stdout=PIPE,
# stderr=STDOUT,
universal_newlines=True,
bufsize=0,
shell=False,
env=env,
)
def terminate_proc() -> None:
"""Kill child process.
All signals should be forwarded to the child processes
automatically, however child processes are also free to ignore
some of them. With this we make sure the child processes get
killed once dotenv exits.
"""
proc.kill()
# register
atexit.register(terminate_proc)
_, _ = proc.communicate()
# unregister
atexit.unregister(terminate_proc)
return proc.returncode
|